当前位置: 代码迷 >> 综合 >> 【Sprint Boot论坛项目】5、Kafka,构建TB级异步消息系统
  详细解决方案

【Sprint Boot论坛项目】5、Kafka,构建TB级异步消息系统

热度:31   发布时间:2023-12-29 22:29:19.0

这一章主要解决发送系统级消息或通知的问题

目录

  • 1 阻塞队列
  • 2 Kafka入门
  • 3 Spring整合Kafka
  • 4 发送系统通知
    • 4.1 封装事件(实体)
    • 4.2 写生产者与消费者
    • 4.3 在对应事件发生时调用生产者(Controller里修改)
    • 4.4 启动
  • 5 显示系统通知
    • 5.1 通知列表
    • 5.2 通知详情
    • 5.3 未读消息

1 阻塞队列

在这里插入图片描述
BlockingQueue是个接口,不能直接使用,只能调用它的实现类

package com.nowcoder.community;import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class BlockingQueueTests {
    public static void main(String[] args) {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);new Thread(new Producer(queue)).start();new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();}
}class Producer implements Runnable{
    //要把阻塞队列传进来private BlockingQueue<Integer> queue;public Producer(BlockingQueue<Integer> queue){
    //有参构造this.queue=queue;}@Overridepublic void run() {
    //实现该接口要重写run方法try{
    for(int i=0;i<100;i++){
    Thread.sleep(20);queue.put(i);//把生产的数据存入阻塞队列中System.out.println(Thread.currentThread().getName()+"生产:"+queue.size());}}catch (Exception e){
    e.printStackTrace();}}
}class Consumer implements Runnable{
    //要把阻塞队列传进来private BlockingQueue<Integer> queue;public Consumer(BlockingQueue<Integer> queue){
    //有参构造this.queue=queue;}@Overridepublic void run() {
    //实现该接口要重写run方法try{
    while(true){
    Thread.sleep(new Random().nextInt(1000));queue.take();System.out.println(Thread.currentThread().getName()+"消费:"+queue.size());}}catch (Exception e){
    e.printStackTrace();}}
}

2 Kafka入门

在这里插入图片描述
http://kafka.apache.org/
Broker:Kafka中的每台服务器记为一个Broker
Zookeeper:用来管理集群
Topic:主题,生产者-消费者模式是“发布订阅”模式,生产者发布消息的地方就是Topic
Partition:分区,对主题的消息分区,可以增强服务器的并发能力(如上图右侧)
Offset:消息在分区内 存放的索引
Replication:副本,对消息备份,Kafka是分布式消息引擎
Leader Replic主副本,可以响应查询

配置zookeeper:
zookeeper.properties

# the directory where the snapshot is stored.
dataDir=d:/code/data/zookeeper

server.properties

# A comma separated list of directories under which to store log files
log.dirs=d:/code/data/kafka-logs

开启Kafka的命令:(两个窗口都在安装目录下)

bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties

不想打字的话直接把文件拖到窗口即可

3 Spring整合Kafka

在这里插入图片描述
生产者:传入主题,以及要发的具体消息
消费者:传入要监听的主题,再把消息包装成ConsumerRecode传入方法体里
配置

#Kafka
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
#是否自动提交消费者的偏移量
spring.kafka.consumer.enable-auto-commit=true
#自动提交间隔
spring.kafka.consumer.auto-commit-interval=3000

生产者发消息是我们主动调用(需要注入),消费者取消息是自动的(不需要注入)

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
    @Autowiredprivate KafkaProducer kafkaProducer;@Testpublic void testKafka(){
    kafkaProducer.sendMessage("test","你好!");kafkaProducer.sendMessage("test","hello!");try {
    Thread.sleep(1000*10);} catch (InterruptedException e) {
    e.printStackTrace();}}
}
@Component
class KafkaProducer{
    @Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic,String content){
    kafkaTemplate.send(topic, content);}
}@Component
class KafkaConsumer{
    @KafkaListener(topics = {
    "test"})public void handleMessage(ConsumerRecord record){
    System.out.println(record.value());}
}

【此处运行时电脑的小电扇嗷嗷吹然后运行失败了,提示找不到topic,我开动我的小脑袋瓜子想是不是因为我把启动Kafka的命令行关掉了?打开之后再运行就成功了~
开启Kafka的命令:(两个窗口都在安装目录下)

bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties

不想打字的话直接把文件拖到窗口即可

4 发送系统通知

在这里插入图片描述
系统发布通知是非常频繁的行为,重点要考虑性能问题
要用到Kafka的消息队列解决问题
为什么要用消息队列解决问题呢?
评论、点赞、关注是三类不同的通知,可以用三个不同的主题,某件事发生后,就把消息扔到队列里,当前线程(生产者)就可以去处理别的事情。后续的业务由消费者处理。
这种并发方式为异步方式

业务角度:以事件为主体来解决

触发事件:评论、点赞、关注
处理事件:

  • 将事件包含的所有信息封装起来;
    private String topic;private int userId;private int entityType;private int entityId;private int entityUserId;//使程序具有扩展性private Map<String, Object> data = new HashMap<>();
  • 开发事件的生产者(4.2)
  • 开发事件的消费者(4.2)

面向事件编程

4.1 封装事件(实体)

知识点:
1.private Map<String, Object> data = new HashMap<>();//实体最后有个map属性,使程序具有扩展性
2.set方法将无返回值改为返回自身,可以循环调用

public Event setTopic(String topic) {
    //返回自身可以循环调用this.topic = topic;return this;}

代码:

package com.nowcoder.community.entity;import java.util.HashMap;
import java.util.Map;public class Event {
    private String topic;//不同的类型:评论、点赞、关注private int userId;//事件触发的人private int entityType;private int entityId;//被触发的实体对象private int entityUserId;//实体作者private Map<String, Object> data = new HashMap<>();//具有扩展性public String getTopic() {
    return topic;}public Event setTopic(String topic) {
    //返回自身可以循环调用this.topic = topic;return this;}public int getUserId() {
    return userId;}public Event setUserId(int userId) {
    this.userId = userId;return this;}public int getEntityType() {
    return entityType;}public Event setEntityType(int entityType) {
    this.entityType = entityType;return this;}public int getEntityId() {
    return entityId;}public Event setEntityId(int entityId) {
    this.entityId = entityId;return this;}public int getEntityUserId() {
    return entityUserId;}public Event setEntityUserId(int entityUserId) {
    this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {
    return data;}public Event setData(String key,Object value) {
    this.data.put(key, value);return this;}
}

4.2 写生产者与消费者

@Component
public class EventProducer {
    @Autowiredprivate KafkaTemplate kafkaTemplate;//处理事件public void fireEvent(Event event){
    //将事件发布到指定的主题kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));}
}
@Component
public class EventConsumer implements CommunityConstant {
    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@KafkaListener(topics = {
    TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){
    if(record == null || record.value()==null){
    logger.error("消息的内容为空");return;}Event event = JSONObject.parseObject(record.value().toString(),Event.class);if(event==null){
    logger.error("消息格式错误");return ;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USER_ID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());Map<String, Object> content = new HashMap<>();content.put("userId", event.getUserId());content.put("entityType", event.getEntityType());content.put("entityId", event.getEntityId());if(!event.getData().isEmpty()){
    for(Map.Entry<String,Object> entry : event.getData().entrySet()){
    content.put(entry.getKey(), entry.getValue());}}message.setContent(JSONObject.toJSONString(content));messageService.addMessage(message);}
}

4.3 在对应事件发生时调用生产者(Controller里修改)

消费者是被动出发的,只要topic里有数据就会自动调用,我们需要主动调用生产者
在触发事件的三个Controller里调用:CommentController、LikeController、FollowController
1.注入生产者

	@Autowiredprivate EventProducer eventProducer;

2.在触发事件后封装事件

commentService.addComment(comment);//添加评论之后触发事件!
Event event = new Event().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId", discussPostId);

3.处理事件

eventProducer.fireEvent(event);

4.4 启动

Kafka如果启动报错什么锁死,把kafka-logs删掉再重启(上面配置过日志存放的位置)
在这里插入图片描述
报错了,看第一句话报的什么错
在这里插入图片描述
往下拉看到自己编写的程序,点进去看报什么错(因为底层代码一般比较严谨不会出错)

5 显示系统通知

在这里插入图片描述

5.1 通知列表

数据层-业务层-视图层-动态模板,分别加方法

5.2 通知详情

5.3 未读消息

  相关解决方案