一、订阅模型简介
在之前的操作中,生产者生产的消息每次在被其中一个消费者消费后,都会删除掉该信息数据,如果说我想要我一条消息让多个消费者都可以使用呢?所以订阅模式应运而生。
x:交换机/转发器 exchange
…
含义:
1、一个生产者,多个消费者
2、之前的生产者产生消息是直接发送给消息队列,由消息队列发送至各个消费者;现在的模式是生产者将消息发送给 交换机 ,再通过 交换机 发送给各项 消息队列。
3、以前是一个消息队列,并且一个消息队列发送消息给多个消费者;现在是一个消费者一个消息队列。
应用场景举栗:
注册验证码–发送邮件、短信
二、类比
与之前的模式(work queue、简单模式)的区别:
相比较而言,发现多了一个转发器。
为什么会多了个转发器,而不是消息队列保存数据呢?
消息队列中的消息是保存在内存中的,每当消息队列确认消费者收到消息(autoAck = false)或者消息队列发送消息给消费者(autoAck = true)后,会将消息从消息队列中移除,如果要保证多个消费者都能够收到相同的消息,就需要给每个消费者分配一个消息队列,每个消息队列中的消息都一样,所以需要采取==转发器(exchange)==实现功能。
他的交换机类型如下所示
三、实现demo
以前我们是消息生产者,生产消息后,直接发送消息至消息队列中,但我们使用消息订阅模式后,则是先向转发器exchange中推送消息,所以此时的通道为转发器通道,所以按照以前的方式来说,我们需要把通道设置要进行变动。
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import cn.linkpower.util.MqConnectUtil;public class Send {
private static final String exchange_name = "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {
//创建连接对象Connection mqConnection = MqConnectUtil.getMqConnection();//创建通信管道Channel channel = mqConnection.createChannel();//申明交换机(分发类型)channel.exchangeDeclare(exchange_name, BuiltinExchangeType.FANOUT);//发送消息String msg = "hello world";// 只需要向“交换机”中发数据,至于数据会被推送到哪个队列,由交换机决定channel.basicPublish(exchange_name, "", null, msg.getBytes());//关闭流channel.close();mqConnection.close();}
}
上述代码运行之后,我们可以查看消息信息。
- 访问控制台
- 找到我们设定的转发器
但是此时我们发现,我们的消息没有了。。。。
在rabbitmq中,消息转发器exchange并没有消息的保存能力,只有消息队列才有存储能力。
再加上没有消息队列绑定到这个消息转发器上,所以消息丢失了。
那我们现在就创建一个消费者绑定消息队列和消息交换器。
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import cn.linkpower.util.MqConnectUtil;public class GetMsg1 {
private static final String queue_name = "test_queue_name_email";private static final String exchange_name = "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接对象Connection mqConnection = MqConnectUtil.getMqConnection();// 创建通信管道final Channel channel = mqConnection.createChannel();// 声明队列channel.queueDeclare(queue_name, false, false, false, null);// 绑定消息交换机(将消息交换器和消息队列进行绑定)channel.queueBind(queue_name, exchange_name, "");// 获取消息// 公平分发---每次只分发一个消息channel.basicQos(1);// 4、信访室接受消息DefaultConsumer consumer = new DefaultConsumer(channel) {
@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {
String message = new String(body, "UTF-8");System.out.println(" get msg "+queue_name+" = " + message);try {
Thread.sleep(1000);} catch (InterruptedException e) {
e.printStackTrace();} finally {
System.out.println("get msg "+queue_name+" done");// 公平分发--- 消费完成后,需要做相关的回执信息channel.basicAck(envelope.getDeliveryTag(), false);}}};// 5、创建监听// 公平分发--- 同时需要关闭自动“应答”channel.basicConsume(queue_name, false, consumer);}
}
创建第二个消息消费者
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import cn.linkpower.util.MqConnectUtil;public class GetMsg2 {
private static final String queue_name = "test_queue_name_sms";private static final String exchange_name = "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接对象Connection mqConnection = MqConnectUtil.getMqConnection();// 创建通信管道final Channel channel = mqConnection.createChannel();// 声明队列channel.queueDeclare(queue_name, false, false, false, null);// 绑定消息交换机(将消息交换器和消息队列进行绑定)channel.queueBind(queue_name, exchange_name, "");// 获取消息// 公平分发---每次只分发一个消息channel.basicQos(1);// 4、信访室接受消息DefaultConsumer consumer = new DefaultConsumer(channel) {
@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)throws IOException {
String message = new String(body, "UTF-8");System.out.println(" get msg "+queue_name+" = " + message);try {
Thread.sleep(1000);} catch (InterruptedException e) {
e.printStackTrace();} finally {
System.out.println("get msg "+queue_name+" done");// 公平分发--- 消费完成后,需要做相关的回执信息channel.basicAck(envelope.getDeliveryTag(), false);}}};// 5、创建监听// 公平分发--- 同时需要关闭自动“应答”channel.basicConsume(queue_name, false, consumer);}
}
启动 GetMsg1和GetMsg2,再启动Send,发现两边消息队列名称不一致时,依旧可以获得一样的消息。
在消息消费者中有个很重要的参数设置,
channel.queueBind(queue_name, exchange_name, "");
这里是将消息队列和消息转发器进行绑定操作。
查看控制台发现绑定信息: