首先看看AMQP
协议,对RabbitMQ
的架构会更了解。
深入理解AMQP协议
创建一个Maven
项目,根据自己服务器RabbitMQ的版本导入相应的包。
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency>
直连交换机
直连交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应绑定键(我先定义为binding Key,其实是队列与交换机绑定的routing key)的队列。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:
1)将一个队列绑定到某个交换机上时,赋予该绑定一个绑定键(binding Key),假设为R。
2)当一个携带着路由键(routing Key)为R的消息被发送给直连交换机时,交换机会把它路由给绑定键为R的队列。
直连交换机的队列通常是循环分发任务给多个消费者(我们称之为轮询)。比如说有3个消费者,4个任务。分别分发每个消费者一个任务后,第4个任务又分发给了第一个消费者。综上,我们很容易得出一个结论,在 AMQP 0-9-1 中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。
直连交换机图例:
当生产者(P)发送消息时 Rotuing key=booking 时,这时候将消息传送给 Exchange,Exchange 获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的 Queue,这时发现 Queue1 和 Queue2 都符合,就会将消息传送给这两个队列。
如果我们以 Rotuing key=create 和 Rotuing key=confirm 发送消息时,这时消息只会被推送到 Queue2 队列中,其他 Routing Key 的消息将会被丢弃。
生产端
package com.kaven.rabbitmq.exchange.directExchange;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class DirectProducer {
// 自己服务器的IPprivate static String ip = "IP";// RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口private static int port = 5672;// RabbitMQ有一个 "/" 的虚拟主机private static String virtualHost = "/";// direct exchange ,RabbitMQ提供的direct exchangeprivate static String exchangeName = "amq.direct";// exchange typeprivate static String exchangeType= "direct";// 交换机路由的routingKeyprivate static String routingKey = "test";public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建一个连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(ip);connectionFactory.setPort(port);connectionFactory.setVirtualHost(virtualHost);// 2 创建连接Connection connection = connectionFactory.newConnection();// 3 创建ChannelChannel channel = connection.createChannel();// 4 发送消息String msg = "RabbitMQ:Direct Exchange 发送数据";channel.basicPublish(exchangeName ,routingKey ,null, msg.getBytes());// 5 关闭连接channel.close();connection.close();}
}
消费端
package com.kaven.rabbitmq.exchange.directExchange;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class DirectConsumer {
// 自己服务器的IPprivate static String ip = "IP";// RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口private static int port = 5672;// RabbitMQ有一个 "/" 的虚拟主机private static String virtualHost = "/";// direct exchange ,RabbitMQ提供的direct exchangeprivate static String exchangeName = "amq.direct";// exchange typeprivate static String exchangeType= "direct";// 队列名private static String queueName = "queue";// 队列与交换机绑定的routingKeyprivate static String routingKey = "test";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1 创建一个连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(ip);connectionFactory.setPort(port);connectionFactory.setVirtualHost(virtualHost);// 2 创建连接Connection connection = connectionFactory.newConnection();// 3 创建ChannelChannel channel = connection.createChannel();// 4 定义Queue ,将Queue绑定到direct exchangechannel.queueDeclare(queueName,true , false , false , null);channel.queueBind(queueName , exchangeName , routingKey);// 5 创建消费者QueueingConsumer consumer = new QueueingConsumer(channel);// 6 设置channel.basicConsume(queueName , true , consumer);// 7 接收消息while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println(msg);}}
}
测试
因为这里使用的是RabbitMQ
已经给我们提供好了的交换机,所以无论先启动生产端还是消费端,消费端都可以成功收到消息,如果使用的交换机之前没有定义过,我们需要先定义好交换机,才能生产和消费消息。
RabbitMQ
给我们提供了如下图所示的交换机。
当然我们也可以自己来定义一个交换机,方法名和参数看下图,这里就不展开了。
无论先启动生产端还是消费端,我们都可以成功收到消息。
看看RabbitMQ Management
。