当前位置: 代码迷 >> 综合 >> RabbitMQ之Publish/Subscribe
  详细解决方案

RabbitMQ之Publish/Subscribe

热度:90   发布时间:2023-12-08 17:42:53.0

在这部分我们将做一些完全不一样的事情—我们将分发一条消息到多个消费者。这种模式被称作”publish/subscribe”。

为了阐明这种模式,我们打算创建一个简单的日志系统。它将由两部分程序组成—第一部分发送日志信息,第二部分将接收日志信息并打印它们。

在我们的日志系统中每一个接收程序的运行副本都将得到这消息。这样我们将能够运行一个接收程序将日志信息保存到磁盘,同时我们能够运行其它的接收程序并在屏幕上看这些日志信息。

本来,已发布的日志信息就是打算对所有的接收者进行广播的。

Exchanges
在前面的那篇文章中我们发送消息到队列中并从队列中接收消息。现在是时候介绍RabbitMQ中完整的消息发送模型。

让我们快速回顾前篇文章中涉及到的部分
一个生产者是一个发送消息的应用
一个队列是一个存储消息的缓存
一个消费者是一个接收消息的应用

RabbitMQ中消息发送模型的核心理念是生成者从不直接发送消息到队列中。实际上,生产者甚至经常不知道消息是否已经被分发到其队列。

生产者仅能够发送消息到一个exchange。exchange是一个非常简单的事物。在它的一侧从生产者接收消息,在它的另一侧将这些消息添加到队列中。exchange必须准确的了解如何处理它接收到的消息。它应该被追加到一个指定的队列中?还是应该被添加到多个队列中?又或者它应该被丢弃。这些规则是由exchange的类型类决定的。

这里写图片描述

exchange 的类型有以下几种:
directtopicheadersfanout
这篇文章我们主要关注最后一个类型fanout。现在我们创建一个fanout类型的exchange,并将它命名为logs

channel.exchangeDeclare("logs","fanout");

fanout类型的exchange是非常简单的,它仅广播它接收的全部消息到它已知的所有队列中

无名的exchange
在前面的文章中我们对exchange一无所知,但仍然能成功的发送消息到队列中。之所以可以是因为我们利用了一个默认的exchange,在代码中我们利用空字符串(“”)表示这个exchange。

回忆我们前面是怎么发布消息的

channel.basicPublish("",QUEUE_NAME,null,message.getByte());

这方法的第一个参数就是exchange的名称,这空字符串表示默认或无名的exchange。如果routingKey参数存在,则消息将根据它指定的路径发送到对应的队列中。

现在我们能发布消息到我们命名的exchange

channel.basicPublish("logs", "", null, message.getBytes());

临时队列
你可能还记得我们前面利用的队列都有一个特殊的名称。能够为队列命名对我们将工作者指定到相同的队列是重要的。当我们需要在生成者、消费者之间共享一个队列时,得到一个队列名称是重要的 。

但在我们的日志系统没有这种情况,我们想获取到所有的日志信息,而不是这些信息集的子集。我们仅对当前信息流感兴趣,而不是旧的那些。为了解决这种情况我们需要做两件事。

首先,无论何时我们连接Rabbit我们都需要一个新的、空的队列。为了这样做我们可以利用一个随机的名称创建一个队列,更好的是,让RabbitMQ 服务为我们选着一个随机的队列名称。

其次,一旦我们断开消费者,队列应该自动被删掉。
在java客户端,我们提供无参的queueDeclare()创建未持久化、唯一的、自动删除的队列并得到一个生成的名称

String queueName = channel.queueDeclare().getQueue();

queueName包含一个随机的队列名称,例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg.

绑定
这里写图片描述

我们已经创建了一个fanout类型的exchange和一个队列。现在我们需要告诉exchange发送消息到我们的队列中。介于exchange和队列之间的关系称作binding

channel.queueBind(queueNama, "logs","");

现在logs exchange 将追加信息到队列中

这里写图片描述

生产者发送日志信息的过程看起来与前篇文件中的过程有许多不同的。这最重要的调整是我们现在想发送日志信息到logs exchange 替代 无名的exchange 。当发送的时候我们需要提供一个routingKey,但它的值被fanout类型的exchange忽略了。


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;public class EmitLog {
    private final static String EXCHANGE_NAME  = "logs";public static void main(String[] args) throws java.io.IOException,java.util.concurrent.TimeoutException{ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = getMessage(args);channel.basicPublish(EXCHANGE_NAME,"", null, message.getBytes());System.out.println("[x] Sent'"+message+"'");channel.close();connection.close();}private static String getMessage(String[] args) {if(args.length < 1) {return "Hello World!";}return joinString(args," ");}private static String joinString(String[] args, String delimiter) {int len = args.length;if(len==0) return "";StringBuilder sbd = new StringBuilder(args[0]);for(int i=1; i<len; i++) {sbd.append(delimiter).append(args[i]);          }return sbd.toString();}}

如你看到的那样,在建立连接之后我们声明了exchange,这一步是必须的,发送消息到一个不存在的exchange上是不被允许的。

如果队列还没有被绑定到这个exchange上,则消息将会丢失,但这对我们是没有问题的。如果消费者还没有开始监听则我们可以安全的删除消息。


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.lang.InterruptedException;import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;public class ReceiveLogs {
    private final static String EXCHANGE_NAME = "logs";public static void main(String[] args)throws java.io.IOException,java.util.concurrent.TimeoutException,java.lang.InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME,"fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {String message = new String(body,"UTF-8");System.out.println(" [x] Received '" + message + "'");}};boolean autoAsk = true;channel.basicConsume(queueName, autoAsk, consumer);}private static void doWork(String task) throws InterruptedException{for (char ch: task.toCharArray()) {if (ch=='.') Thread.sleep(1000);}}
}

编译代码

javac -cp %CP% EmitLog.java ReceiveLogs.java

如果你想保存日志到文件中,可以打开cmd命令行执行如下命令

java -cp %CP% ReceiveLogs > logs_from_rabbit.log

如果你想在屏幕中看日志信息

java -cp %CP% ReceiveLogs

新开一个cmd命令窗口执行如下命令

E:\RabbitMQ>java -cp %CP% EmitLog test
[x] Sent'test'

两个消费者窗口中看到的效果
这里写图片描述

  相关解决方案