当前位置: 代码迷 >> 综合 >> Redis发布订阅模式(publish/subscribe)
  详细解决方案

Redis发布订阅模式(publish/subscribe)

热度:46   发布时间:2024-02-19 15:51:21.0

目录

 

一、概念

二、原理

三、推送的消息格式

四、发布订阅命令

1.发送消息

2.订阅频道

3.模式匹配

4.取消订阅

五、使用Jedis发布订阅命令

六、缺点

七、参考文档


 

 

一、概念

Redis发布/订阅(Pub/Sub)是一种通信机制,将数据推到某个信息管道中,其他客户端可通过订阅这些管道来获取推送信息,以此用于消息的传输。

 

由三部分组成:发布者(Publisher)、频道(Channel)、订阅者(Subscriber)。

 

发布者发布的消息分到不同的频道,不需要知道什么样的订阅者者订阅。订阅者对一个或多个频道感兴趣,只需要接收感兴趣的消息,不需要知道什么样的发布者发布。主要目的是解除消息的发布者与订阅者之间的耦合关系。

 

发布者和订阅者都是Redis客户端,频道则是服务器端。

 

二、原理

Redis通过SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE和PUNSUBSCRIBE等命令实现发布和订阅功能。

在Redis底层结构中,客户端和频道的订阅关系是通过一个字典加链表的结构保存的,如下图:

 

 

在 Redis 的底层结构中,redis服务器中定义了一个pubsub_channels字典,用于保存所有频道的订阅关系,在这个字典中,key为所有频道名称,value结构是一个链表,其中存放的是所有订阅这个频道的订阅者客户端。subscribe命令的实质即为在key中添加value的订阅链。

若频道首次被订阅说明在字典中并不存在该渠道的信息,那么程序首先要新建一个对应的 key,并且要赋值一个空链表,然后将对应的客户端加入到链表中。此时链表只有一个元素。若该渠道已经被其他客户端订阅过:这个时候就直接找到key值对应的value客户端信息添加到链表的末尾即可。

 

三、推送的消息格式

所有订阅接收的消息均为由三个元素组成的多块响应。

第一个元素是消息类型,有三种类型:

  • subscribe:

该类型表示成功订阅到频道响应,此时第二个元素为订阅的频道名称,第三个元素为已订阅的频道数量,例:

redis:6379> SUBSCRIBE myChannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "myChannel"
3) (integer) 1
  • unsubscribe:

该类型表示成功取消订阅到的频道响应,此时第二个元素为订阅的频道名称,第三个元素为已订阅的频道数量,例:

redis:6379> UNSUBSCRIBE myChannel
1) "unsubscribe" 
2) "myChannel"  
3) (integer) 0 
  • message:

该类型表示订阅者客户端接收到其他客户端发出的发布命令结果,此时第二个元素表示来源频道的名称,第三个元素是实际的消息内容,例:

redis:6379> SUBSCRIBE myChannel  myChannel2
1) "subscribe" 
2) "myChannel"  
3) (integer) 1
1) "subscribe"
2) "myChannel2"
3) (integer) 2
1) "message"
2) "myChannel1"
3) "Hello World"

注:以上三种消息类型均有对应的按模式订阅指令psubscribe,punsubscribe和pmessage,消息格式的第一个元素变为对应类型。

 

四、发布订阅命令

 

1.发送消息

PUBLISH命令为发送信息,返回值为接收到该消息的订阅者数量。

redis:4379> PUBLISH mychannel "Hello"
(integer) 1
redis:4379> PUBLISH mychannel "World"
(integer) 1

2.订阅频道

SUBSCRIBE命令为订阅频道,返回值消息格式如上所述。

redis:6379> SUBSCRIBE myChannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "myChannel"
3) (integer) 1
1) "message"
2) "myChannel"
3) "Hello"
1) "message"
2) "myChannel"
3) "World"

3.模式匹配

PSUBSCRIBE 命令订阅一个或多个符合给定模式的频道。

每个模式以 * 作为匹配符,例如news.* 匹配所有以 news. 开头的频道( news.one、news.global.today 等等)。

redis:6379> PSUBSCRIBE new.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "new.*"
3) (integer) 1
1) "pmessage"
2) "new.*"      # 消息匹配的模式
3) "new.one"    # 消息本身的目标频道
4) "Hi"

注:当订阅客户端同时订阅某种模式和符合该模式的具体某个频道时,那么会接收到发布者推送的信息两次,两次接收的信息格式不同,一个为message类型,另一个为pmessage类型,消息内容一致。

4.取消订阅

UNSUBSCRIBE命令为取消订阅频道,与订阅频道命令相同,也有对应匹配模式PUNSUBSCRIBE。

如果没有填写指定频道,即一个无参数的UNSUBSCRIBE被调用执行,那么该客户端订阅的所有频道都会被退订。

由于Redis的订阅操作是阻塞式的,因此一旦客户端订阅了某个频道或模式,就将会一直处于订阅状态直到退出。在SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE和PUNSUBSCRIBE命令中,其返回值都包含了该客户端当前订阅的频道和模式的数量,当这个数量变为0时,该客户端会自动退出订阅状态。

 

五、使用Jedis发布订阅命令

要使用JedisPublish/Subscribe功能,必须编写对JedisPubSub的自己的实现。

public class PubSubListener extends JedisPubSub{// 取得订阅的消息后的处理public void onMessage(String channel, String message) {//TODO:接收订阅频道消息后,业务处理逻辑System.out.println(channel + "=" + message);}// 初始化订阅时候的处理public void onSubscribe(String channel, int subscribedChannels) {System.out.println(channel + "=" + subscribedChannels);}// 取消订阅时候的处理public void onUnsubscribe(String channel, int subscribedChannels) {System.out.println(channel + "=" + subscribedChannels);}// 初始化按表达式的方式订阅时候的处理public void onPSubscribe(String pattern, int subscribedChannels) {System.out.println(pattern + "=" + subscribedChannels);}// 取消按表达式的方式订阅时候的处理public void onPUnsubscribe(String pattern, int subscribedChannels) {System.out.println(pattern + "=" + subscribedChannels);}// 取得按表达式的方式订阅的消息后的处理public void onPMessage(String pattern, String channel, String message) {System.out.println(pattern + "=" + channel + "=" + message);}}

Jedis有两种订阅模式:subsribe(一般模式设置频道)和psubsribe(使用模式匹配来设置频道)。不管是那种模式都可以设置个数不定的频道。订阅得到信息在将会lister的onMessage(…)方法或者onPMessage(…)中进行进行处理,这里我们只是做了简单的输出。 具体代码见GitHup:https://github.com/granett/Redis/tree/master/src/main/java/com/redis/pubsub

 

public class Subscribe {private Jedis jedis = new Jedis("192.168.1.207",6379);/*** SUBSCRIBE channel [channel ...]* 订阅给定的一个或多个频道的信息*/@Testpublic void subscribe(){final PubSubListener listener = new PubSubListener();jedis.subscribe(listener, "channel");}/*** UNSUBSCRIBE [channel [channel ...]]* 指示客户端退订给定的频道* 如果没有频道被指定,即一个无参数的 UNSUBSCRIBE 调用被执行,* 那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。* 在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。*/@Testpublic void unsubscribe(){final PubSubListener listener = new PubSubListener();listener.unsubscribe("channel");}/*** PSUBSCRIBE pattern [pattern ...]* 订阅一个或多个符合给定模式的频道* 每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等),* news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。*/@Testpublic void psubscribe(){final PubSubListener listener = new PubSubListener();jedis.psubscribe(listener, "ch*");}/*** PUNSUBSCRIBE [pattern [pattern ...]]* 指示客户端退订所有给定模式* 如果没有模式被指定,即一个无参数的 PUNSUBSCRIBE 调用被执行,* 那么客户端使用 PSUBSCRIBE 命令订阅的所有模式都会被退订。* 在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。*/@Testpublic void punsubscribe(){final PubSubListener listener = new PubSubListener();listener.punsubscribe("ch*");}/*** PUBLISH channel message* 将信息 message 发送到指定的频道 channel* 返回值:接收到信息 message 的订阅者数量*/@Testpublic void publish(){jedis.publish("channel", "bar123");System.out.println("发布消息");}/*** PUBSUB CHANNELS [pattern]* 列出当前的活跃频道。 活跃频道指的是那些至少有一个订阅者的频道, 订阅模式的客户端不计算在内。* pattern 参数是可选的:*    如果不给出 pattern 参数,那么列出订阅与发布系统中的所有活跃频道。*    如果给出 pattern 参数,那么只列出和给定模式 pattern 相匹配的那些活跃频道。*/@Testpublic void PUBSUB(){List<String> list = jedis.pubsubChannels("*");System.out.println(list);}/*** PUBSUB NUMSUB [channel-1 ... channel-N]* 返回给定频道的订阅者数量, 订阅模式的客户端不计算在内。*/@Testpublic void pubsubNumSub(){Map<String,String> map = jedis.pubsubNumSub();System.out.println(map);}/*** PUBSUB NUMPAT* 返回订阅模式的数量。* 注意, 这个命令返回的不是订阅模式的客户端的数量, 而是客户端订阅的所有模式的数量总和。*/@Testpublic void pubsubNumPat(){Long count = jedis.pubsubNumPat();System.out.println(count);}}

 

六、缺点

Redis可以提供基本的发布订阅功能,但毕竟不像消息队列那种专业级别,所以会存在以下缺点:

  1. redis无法对消息持久化存储,消息一旦被发送,如果没有订阅者接收,数据会丢失
  2. 消息队列提供了消息传输保障,当客户端连接超时或事物回滚的等情况发生时,消息会重新发布给订阅者,redis没有该保障,导致的结果就是在订阅者断线超时或其他异常情况时,将会丢失所有发布者发布的信息
  3. 若订阅者订阅了频道,但自己读取消息的速度很慢的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃

 

七、参考文档

Redis的Pub/Sub (发布/订阅)

官网文章