当前位置: 代码迷 >> 综合 >> springboot webflux websocket+redis实现集群聊天室
  详细解决方案

springboot webflux websocket+redis实现集群聊天室

热度:60   发布时间:2024-02-20 13:10:39.0

由于 WebSocket 的 session 无法直接序列化后存入 Redis 之类的缓存数据库来实现 session 共享,本例改用 Redis 的发布/订阅机制实现集群聊天室。原理是:某个节点收到客户端消息后(消息中带有房间和用户信息,用于判断用户连接在哪个节点),把消息发布到约定的 channel;所有订阅了该 channel 的节点都会收到这条消息,各节点再判断目标用户是否缓存在本节点,并做相应的消息处理。下面直接上代码。

pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.example</groupId>
    <artifactId>webflux-websocket-chat</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>webflux-websocket-chat</name>
    <description>Demo project for Spring Boot</description>

    <properties>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.44</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.tuckey</groupId>
            <artifactId>urlrewritefilter</artifactId>
            <version>4.0.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <version>2.3.3.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>4.6.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

配置文件application.yml

# Server port
server:
  port: 8081

# Redis host and port
spring:
  redis:
    host: 127.0.0.1
    port: 6379

# Redis topic used for chat messages
chat:
  topic: chatTopic

# Logging
logging:
  level:
    ROOT: info

redis工具类 RedisUtil

package com.example.webflux.util;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.util.Map;/*** @program: webflux-websocket-chat* @description:* @author: 71ang~* @create: 2020-07-14 14:45* @vsersion: V1.0*/
// Static convenience wrappers around the injected RedisTemplate, so that classes
// which are not Spring beans can still reach Redis.
@Component
public class RedisUtil {

    @Autowired
    private RedisTemplate redisTemplate;

    // Template used by the static helpers; it is null until getRedisTemplate() has run.
    public static RedisTemplate redis;

    /**
     * Copies the injected template into the static field once injection is complete.
     */
    @PostConstruct
    public void getRedisTemplate() {
        redis = this.redisTemplate;
    }

    /**
     * Reads every field of the hash stored at {@code key}.
     *
     * @param key hash key
     * @return the hash entries as returned by the template
     */
    public static Map getOfMap(String key) {
        return redis.opsForHash().entries(key);
    }

    /**
     * Writes all entries of {@code map} into the hash stored at {@code key}.
     *
     * @param key hash key
     * @param map entries to store
     */
    public static void putOfMap(String key, Map map) {
        redis.opsForHash().putAll(key, map);
    }

    /**
     * Deletes {@code key}.
     *
     * @param key key to delete
     * @return {@code true} only when the template reports the key as removed
     */
    public static boolean delete(String key) {
        // RedisTemplate.delete returns a nullable Boolean (null inside a pipeline or
        // transaction); auto-unboxing it directly would throw NullPointerException.
        return Boolean.TRUE.equals(redis.delete(key));
    }

    /**
     * Publishes {@code message} to {@code channel}.
     *
     * @param channel target channel name
     * @param message payload, serialized by the template
     */
    public static void convertAndSend(String channel, Object message) {
        redis.convertAndSend(channel, message);
    }
}

channel配置类

package com.example.webflux.redis;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;/*** @program: webflux-websocket-chat* @description: 聊天配置* @author: Yang Mingqiang* @create: 2020-09-17 14:58* @vsersion: V1.0*/
// Holds the settings bound from configuration properties under the "chat" prefix.
@Data
@Component
@ConfigurationProperties(prefix = "chat")
public class ChatConfig {
    // Bound from the "chat.topic" property.
    private String topic;
}

redis监听容器

package com.example.webflux.redis;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;/*** @program: webflux-websocket-chat* @description:* @author: 71ang~* @create: 2020-07-14 19:00* @vsersion: V1.0*/
@Component
public class RedisListener {
    @Autowiredprivate ChatConfig chatConfig;@BeanRedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 所有订阅该主题的节点都能收到消息container.addMessageListener(listenerAdapter, new PatternTopic(chatConfig.getTopic()));return container;}
}

redis监听消息处理

package com.example.webflux.redis;import com.alibaba.fastjson.JSONObject;
import com.example.webflux.websocket.ChatHandler;
import com.example.webflux.websocket.WebSocketClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;import java.util.Arrays;
import java.util.Map;/*** @program: webflux-websocket-chat* @description: redis消息订阅发布* @author: 71ang~* @create: 2020-07-14 18:53* @vsersion: V1.0*/
@Slf4j
@Component
public class RedisListenerHandler extends MessageListenerAdapter {
    @Autowiredprivate RedisTemplate<String, String> redisTemplate;/*** 消息订阅处理* @param message* @param pattern*/@Overridepublic void onMessage(Message message, byte[] pattern) {
    JdkSerializationRedisSerializer serializer = new JdkSerializationRedisSerializer();byte[] body = message.getBody();String rawMsg;try {
    rawMsg = String.valueOf(serializer.deserialize(body));JSONObject msgObj = JSONObject.parseObject(rawMsg);String roomName = msgObj.getString("roomName");String userId = msgObj.getString("userId");String msg = msgObj.getString("message");// 发送消息ChatHandler.sendToAll(roomName,userId,msg);} catch (Exception e) {
    return;}}
}

spring上下文工具类

package com.example.webflux.util;import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;/*** @program: vue-cli-rest* @description: spring上下文工具类* @author: Yang Mingqiang* @create: 2020-07-06 09:56* @vsersion: V1.0*/
@Component
public class SpringContextUtil implements ApplicationContextAware {
    private static ApplicationContext applicationContext;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    SpringContextUtil.applicationContext = applicationContext;}/*** Description:* 〈获取applicationContext〉*[]* @return : org.springframework.context.ApplicationContext*/public static ApplicationContext getApplicationContext() {
    return applicationContext;}/*** Description:* 〈通过name获取 Bean.〉*[name]* @return : java.lang.Object*/public static Object getBean(String name) {
    return getApplicationContext().getBean(name);}/*** Description:* 〈通过class获取Bean.〉*[clazz]* @return : T*/public static <T> T getBean(Class<T> clazz) {
    return getApplicationContext().getBean(clazz);}/*** Description:* 〈通过name,以及Clazz返回指定的Bean〉*[name, clazz]* @return : T*/public static <T> T getBean(String name, Class<T> clazz) {
    return getApplicationContext().getBean(name, clazz);}
}

websocket配置类

package com.example.webflux.websocket;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;import java.util.HashMap;
import java.util.Map;@Configuration
public class WebSocketConfig {

    /**
     * Maps the {@code /chat} path to a {@link ChatHandler}.
     *
     * @return the URL handler mapping, registered with order -1
     */
    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> routes = new HashMap<>();
        routes.put("/chat", new ChatHandler());

        SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
        urlMapping.setOrder(-1);
        urlMapping.setUrlMap(routes);
        return urlMapping;
    }

    /**
     * @return the adapter that lets WebFlux invoke {@link WebSocketHandler} instances
     */
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        WebSocketHandlerAdapter adapter = new WebSocketHandlerAdapter();
        return adapter;
    }
}

WebSocketClient

package com.example.webflux.websocket;import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.FluxSink;import java.io.Serializable;/*** @program: webflux-websocket-chat-websocket-chat* @description:* @author: 71ang~* @create: 2020-07-14 13:39* @vsersion: V1.0*/
// Pairs a WebSocket session with the sink that text messages for it are pushed into.
@Slf4j
@Data
public class WebSocketClient implements Serializable {

    private static final long serialVersionUID = 3126044575672218399L;

    private FluxSink<WebSocketMessage> sink;

    private WebSocketSession session;

    /**
     * @param sink    sink that receives the messages to send
     * @param session session used to create the messages
     */
    public WebSocketClient(FluxSink<WebSocketMessage> sink, WebSocketSession session) {
        this.session = session;
        this.sink = sink;
    }

    /**
     * Wraps {@code data} in a text message and emits it on the sink.
     *
     * @param data text payload
     */
    public void sendData(String data) {
        WebSocketMessage textFrame = session.textMessage(data);
        sink.next(textFrame);
    }
}

ChatHandler

package com.example.webflux.websocket;import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSONObject;
import com.example.webflux.redis.ChatConfig;
import com.example.webflux.util.RedisUtil;
import com.example.webflux.util.SpringContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;@Slf4j
public class ChatHandler implements WebSocketHandler {
    public static ConcurrentHashMap<String,Map<String, WebSocketClient>> roomCacheMap = new ConcurrentHashMap<>();@Overridepublic Mono<Void> handle(WebSocketSession session) {
    HandshakeInfo handshakeInfo = session.getHandshakeInfo();InetSocketAddress remoteAddress = handshakeInfo.getRemoteAddress();String params = handshakeInfo.getUri().getQuery();// Map<String, String> paramMap = Stream.of(params.split("&")).collect(Collectors.toMap(param -> param.split("=")[0], param -> param.split("=")[1]));HashMap<String, String> paramMap = HttpUtil.decodeParamMap(params, "UTF-8");String roomName = paramMap.get("roomName");String userId = paramMap.get("userId");//出站Mono<Void> output = session.send(Flux.create(sink -> handleClient(roomName, userId, new WebSocketClient(sink, session))));//入站Mono<Void> input = session.receive().doOnSubscribe(conn -> {
    log.info("建立连接:{},用户ip:{},房间号:{},用户:{}", session.getId(),remoteAddress.getHostName(), roomName, userId);}).doOnNext(msg -> {
    String message = msg.getPayloadAsText();broadcast(roomName, userId, message);}).doOnComplete(() -> {
    log.info("关闭连接:{}", session.getId());exitRoom(session, roomName, userId);}).doOnCancel(() -> {
    log.info("关闭连接:{}", session.getId());exitRoom(session, roomName, userId);}).then();return Mono.zip(input, output).then();}private void exitRoom(WebSocketSession session, String roomName, String userId) {
    session.close().toProcessor().then();broadcast(roomName, userId, "退出房间!");removeUser(roomName, userId);}private void removeUser(String roomName, String userId) {
    log.info("用户:{},退出房间:{}!", userId, roomName);Map<String, WebSocketClient> socketClientCacheMap = roomCacheMap.get(roomName);socketClientCacheMap.remove(userId);if (socketClientCacheMap.isEmpty()) {
    log.info("房间:{}没人了,关闭房间!", roomName);roomCacheMap.remove(roomName);}}private void handleClient(String roomName, String userId, WebSocketClient client) {
    if (!roomCacheMap.containsKey(roomName)) {
    log.info("用户:{},创建房间:{}", userId, roomName);Map<String, WebSocketClient> socketClientCacheMap = new HashMap<>();socketClientCacheMap.put(userId, client);roomCacheMap.put(roomName, socketClientCacheMap);} else {
    Map<String, WebSocketClient> socketClientCacheMap = roomCacheMap.get(roomName);if (!socketClientCacheMap.containsKey(userId)) {
    log.info("用户:{},进入房间:{}", userId, roomName);socketClientCacheMap.put(userId, client);}}}/*** 发布消息广播*/public void broadcast(String roomName, String userId, String message) {
    JSONObject msgObj = new JSONObject();msgObj.put("roomName",roomName);msgObj.put("userId",userId);msgObj.put("message",message);ChatConfig chatConfig = SpringContextUtil.getBean(ChatConfig.class);RedisUtil.convertAndSend(chatConfig.getTopic(),msgObj.toJSONString());sendToAll(roomName, userId, message);}/*** 发送消息给除了自己的所有用户* @param roomName* @param userId* @param message*/public static void sendToAll(String roomName, String userId, String message) {
    Map<String, WebSocketClient> clients = roomCacheMap.get(roomName);clients.forEach((user, client) -> {
    if (!userId.equals(user)) {
    log.info("用户:{}发送消息:{}",userId,message);client.sendData(userId + ":" + message);}});}
}

在线websocket测试

在这里插入图片描述
在这里插入图片描述

源码github地址

注意事项

1、环境为jdk11
2、其中使用了lombok,要安装lombok插件
3、其中有一些依赖可能会下载不到,推荐两个maven镜像地址
在maven的setting.xml里添加:

<mirror><id>alimaven</id><name>aliyun maven</name><url>http://maven.aliyun.com/nexus/content/groups/public/</url><mirrorOf>central</mirrorOf></mirror>
<mirror><id>repo2</id><name>Mirror from Maven Repo2</name><url>http://repo2.maven.org/maven2/</url><mirrorOf>central</mirrorOf>
</mirror>