当前位置: 代码迷 >> 综合 >> RabbitMQ:交换机(fanout exchange)
  详细解决方案

RabbitMQ:交换机(fanout exchange)

热度:66   发布时间:2023-12-01 20:11:09.0

首先看看AMQP协议,对RabbitMQ的架构会更了解。

深入理解AMQP协议

创建一个Maven项目,根据自己服务器RabbitMQ的版本导入相应的包。
在这里插入图片描述

		<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency>

扇型交换机

扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列。

因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以它的应用案例都极其相似:

  • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件。
  • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端。
  • 分发系统使用它来广播各种状态和配置更新。
  • 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP 没有内置 presence 的概念,因此 XMPP 可能会是个更好的选择)。

扇型交换机图例:
在这里插入图片描述
上图所示,生产者(P)生产消息 1 ,将消息 1 推送到 Exchange,由于 Exchange Type=fanout ,这时候会遵循 fanout exchange的路由规则,将消息推送到所有与它绑定的 Queue,也就是图上的两个 Queue, 最后由监听对应Queue的消费者消费 。

生产端

routingKey = "",因为fanout exchange的路由规则不关心routingKey的值(但是不能为空)。

package com.kaven.rabbitmq.exchange.fanoutExchange;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class FanoutProducer {
    // 自己服务器的IPprivate static String ip = "IP";// RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口private static int port = 5672;// RabbitMQ有一个 "/" 的虚拟主机private static String virtualHost = "/";// fanout exchange ,RabbitMQ提供的fanout exchangeprivate static String exchangeName = "amq.fanout";// exchange typeprivate static String exchangeType= "fanout";// 交换机路由的routingKeyprivate static String routingKey = "";public static void main(String[] args) throws IOException, TimeoutException {
    // 1 创建一个连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(ip);connectionFactory.setPort(port);connectionFactory.setVirtualHost(virtualHost);// 2 创建连接Connection connection = connectionFactory.newConnection();// 3 创建ChannelChannel channel = connection.createChannel();// 4 发送消息String msg = "RabbitMQ:Fanout Exchange 发送数据";channel.basicPublish(exchangeName ,routingKey ,null, msg.getBytes());// 5 关闭连接channel.close();connection.close();}
}

消费端

routingKey = "test",和生产端的routingKey不一样,主要为了看看fanout exchange的路由规则是否如上面解释的一样,这里只创建一个消费者,大家可以试一试多个消费者的情况,其实是一样的。

package com.kaven.rabbitmq.exchange.fanoutExchange;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class FanoutConsumer {
    // 自己服务器的IPprivate static String ip = "IP";// RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口private static int port = 5672;// RabbitMQ有一个 "/" 的虚拟主机private static String virtualHost = "/";// fanout exchange ,RabbitMQ提供的fanout exchangeprivate static String exchangeName = "amq.fanout";// exchange typeprivate static String exchangeType= "fanout";// 队列名private static String queueName = "queue";// 队列与交换机绑定的routingKeyprivate static String routingKey = "test";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
    // 1 创建一个连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(ip);connectionFactory.setPort(port);connectionFactory.setVirtualHost(virtualHost);// 2 创建连接Connection connection = connectionFactory.newConnection();// 3 创建ChannelChannel channel = connection.createChannel();// 4 定义Queue ,将Queue绑定到direct exchangechannel.queueDeclare(queueName,true , false , false , null);channel.queueBind(queueName , exchangeName , routingKey);// 5 创建消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 6 设置channel.basicConsume(queueName , true , consumer);// 7 接收消息while(true){
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println(msg);}}
}

测试

因为这里使用的是RabbitMQ提供给我们的fanout exchange,所以我们无需自己定义。
因为交换机已经定义好了,所以无论先启动生产端还是消费端,消费端都可以成功收到消息。
消费端输出如下:

RabbitMQFanout Exchange 发送数据

看看RabbitMQ Management
在这里插入图片描述

  相关解决方案