这一章主要解决发送系统级消息或通知的问题
目录
- 1 阻塞队列
- 2 Kafka入门
- 3 Spring整合Kafka
- 4 发送系统通知
-
- 4.1 封装事件(实体)
- 4.2 写生产者与消费者
- 4.3 在对应事件发生时调用生产者(Controller里修改)
- 4.4 启动
- 5 显示系统通知
-
- 5.1 通知列表
- 5.2 通知详情
- 5.3 未读消息
1 阻塞队列
BlockingQueue是个接口,不能直接使用,只能调用它的实现类
package com.nowcoder.community;import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);new Thread(new Producer(queue)).start();new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();new Thread(new Consumer(queue)).start();}
}class Producer implements Runnable{
//要把阻塞队列传进来private BlockingQueue<Integer> queue;public Producer(BlockingQueue<Integer> queue){
//有参构造this.queue=queue;}@Overridepublic void run() {
//实现该接口要重写run方法try{
for(int i=0;i<100;i++){
Thread.sleep(20);queue.put(i);//把生产的数据存入阻塞队列中System.out.println(Thread.currentThread().getName()+"生产:"+queue.size());}}catch (Exception e){
e.printStackTrace();}}
}class Consumer implements Runnable{
//要把阻塞队列传进来private BlockingQueue<Integer> queue;public Consumer(BlockingQueue<Integer> queue){
//有参构造this.queue=queue;}@Overridepublic void run() {
//实现该接口要重写run方法try{
while(true){
Thread.sleep(new Random().nextInt(1000));queue.take();System.out.println(Thread.currentThread().getName()+"消费:"+queue.size());}}catch (Exception e){
e.printStackTrace();}}
}
2 Kafka入门
http://kafka.apache.org/
Broker:Kafka中的每台服务器记为一个Broker
Zookeeper:用来管理集群
Topic:主题,生产者-消费者模式是“发布订阅”模式,生产者发布消息的地方就是Topic
Partition:分区,对主题的消息分区,可以增强服务器的并发能力(如上图右侧)
Offset:消息在分区内 存放的索引
Replication:副本,对消息备份,Kafka是分布式消息引擎
Leader Replic主副本,可以响应查询
配置zookeeper:
zookeeper.properties
# the directory where the snapshot is stored.
dataDir=d:/code/data/zookeeper
server.properties
# A comma separated list of directories under which to store log files
log.dirs=d:/code/data/kafka-logs
开启Kafka的命令:(两个窗口都在安装目录下)
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
不想打字的话直接把文件拖到窗口即可
3 Spring整合Kafka
生产者:传入主题,以及要发的具体消息
消费者:传入要监听的主题,再把消息包装成ConsumerRecode传入方法体里
配置
#Kafka
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
#是否自动提交消费者的偏移量
spring.kafka.consumer.enable-auto-commit=true
#自动提交间隔
spring.kafka.consumer.auto-commit-interval=3000
生产者发消息是我们主动调用(需要注入),消费者取消息是自动的(不需要注入)
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowiredprivate KafkaProducer kafkaProducer;@Testpublic void testKafka(){
kafkaProducer.sendMessage("test","你好!");kafkaProducer.sendMessage("test","hello!");try {
Thread.sleep(1000*10);} catch (InterruptedException e) {
e.printStackTrace();}}
}
@Component
class KafkaProducer{
@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic,String content){
kafkaTemplate.send(topic, content);}
}@Component
class KafkaConsumer{
@KafkaListener(topics = {
"test"})public void handleMessage(ConsumerRecord record){
System.out.println(record.value());}
}
【此处运行时电脑的小电扇嗷嗷吹然后运行失败了,提示找不到topic,我开动我的小脑袋瓜子想是不是因为我把启动Kafka的命令行关掉了?打开之后再运行就成功了~】
开启Kafka的命令:(两个窗口都在安装目录下)
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
不想打字的话直接把文件拖到窗口即可
4 发送系统通知
系统发布通知是非常频繁的行为,重点要考虑性能问题
要用到Kafka的消息队列解决问题
为什么要用消息队列解决问题呢?
评论、点赞、关注是三类不同的通知,可以用三个不同的主题,某件事发生后,就把消息扔到队列里,当前线程(生产者)就可以去处理别的事情。后续的业务由消费者处理。
这种并发方式为异步方式
业务角度:以事件为主体来解决
触发事件:评论、点赞、关注
处理事件:
- 将事件包含的所有信息封装起来;
private String topic;private int userId;private int entityType;private int entityId;private int entityUserId;//使程序具有扩展性private Map<String, Object> data = new HashMap<>();
- 开发事件的生产者(4.2)
- 开发事件的消费者(4.2)
面向事件编程
4.1 封装事件(实体)
知识点:
1.private Map<String, Object> data = new HashMap<>();
//实体最后有个map属性,使程序具有扩展性
2.set方法将无返回值改为返回自身,可以循环调用
public Event setTopic(String topic) {
//返回自身可以循环调用this.topic = topic;return this;}
代码:
package com.nowcoder.community.entity;import java.util.HashMap;
import java.util.Map;public class Event {
private String topic;//不同的类型:评论、点赞、关注private int userId;//事件触发的人private int entityType;private int entityId;//被触发的实体对象private int entityUserId;//实体作者private Map<String, Object> data = new HashMap<>();//具有扩展性public String getTopic() {
return topic;}public Event setTopic(String topic) {
//返回自身可以循环调用this.topic = topic;return this;}public int getUserId() {
return userId;}public Event setUserId(int userId) {
this.userId = userId;return this;}public int getEntityType() {
return entityType;}public Event setEntityType(int entityType) {
this.entityType = entityType;return this;}public int getEntityId() {
return entityId;}public Event setEntityId(int entityId) {
this.entityId = entityId;return this;}public int getEntityUserId() {
return entityUserId;}public Event setEntityUserId(int entityUserId) {
this.entityUserId = entityUserId;return this;}public Map<String, Object> getData() {
return data;}public Event setData(String key,Object value) {
this.data.put(key, value);return this;}
}
4.2 写生产者与消费者
@Component
public class EventProducer {
@Autowiredprivate KafkaTemplate kafkaTemplate;//处理事件public void fireEvent(Event event){
//将事件发布到指定的主题kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));}
}
@Component
public class EventConsumer implements CommunityConstant {
private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);@Autowiredprivate MessageService messageService;@KafkaListener(topics = {
TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})public void handleCommentMessage(ConsumerRecord record){
if(record == null || record.value()==null){
logger.error("消息的内容为空");return;}Event event = JSONObject.parseObject(record.value().toString(),Event.class);if(event==null){
logger.error("消息格式错误");return ;}//发送站内通知Message message = new Message();message.setFromId(SYSTEM_USER_ID);message.setToId(event.getEntityUserId());message.setConversationId(event.getTopic());message.setCreateTime(new Date());Map<String, Object> content = new HashMap<>();content.put("userId", event.getUserId());content.put("entityType", event.getEntityType());content.put("entityId", event.getEntityId());if(!event.getData().isEmpty()){
for(Map.Entry<String,Object> entry : event.getData().entrySet()){
content.put(entry.getKey(), entry.getValue());}}message.setContent(JSONObject.toJSONString(content));messageService.addMessage(message);}
}
4.3 在对应事件发生时调用生产者(Controller里修改)
消费者是被动出发的,只要topic里有数据就会自动调用,我们需要主动调用生产者
在触发事件的三个Controller里调用:CommentController、LikeController、FollowController
1.注入生产者
@Autowiredprivate EventProducer eventProducer;
2.在触发事件后封装事件
commentService.addComment(comment);//添加评论之后触发事件!
Event event = new Event().setTopic(TOPIC_COMMENT).setUserId(hostHolder.getUser().getId()).setEntityType(comment.getEntityType()).setEntityId(comment.getEntityId()).setData("postId", discussPostId);
3.处理事件
eventProducer.fireEvent(event);
4.4 启动
Kafka如果启动报错什么锁死,把kafka-logs删掉再重启(上面配置过日志存放的位置)
报错了,看第一句话报的什么错
往下拉看到自己编写的程序,点进去看报什么错(因为底层代码一般比较严谨不会出错)
5 显示系统通知
5.1 通知列表
数据层-业务层-视图层-动态模板,分别加方法