1.简介
? ? ? ? ?RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP高级消息队列,说白了就是一个开源的消息中间件。它能解决不同组件、模块、系统间消息通信。
?
2.系统架构
RabbitMQ Server: 也叫broker server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。
Producer:数据的发送方,create messages and publish (send) them to a broker server (RabbitMQ)。
Consumer:数据的接收方,Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。
Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。
?
Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。
那么,为什么使用Channel,而不是直接使用TCP连接?
?
?
? ? 对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。
?
对于一个数据从Producer到Consumer的正确传递,还有三个概念需要明确:exchanges, queues and bindings。
?
? ? ? ? Exchanges are where producers publish their messages.
?
? ? ? ? Queues are where the messages end up and are received by consumers
?
?
? ? ? ? Bindings are how the messages get routed from the exchange to particular queues.
?
Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。
? ? ?有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。
Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。
Fanout exchange: 会向响应的queue广播。
Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。
?
Consumer和Procuder都可以通过 queue.declare 创建queue。如果queue已经存在,也不会报错。如果没有,要么发送不了消息,要么取不到消息,所以还是都创建吧。
?
Bindings就是将通过Exchange将queue和routing keys绑定。
?
3.应用开发测试
我们使用直接交换(direct exchange)模式,这种方式有效实现点对点发送。比如发送方:系统分别给每个组织机构或用户发送信息,接收方:每个组织机构或用户各自接收自己的消息。
RabbitMQ服务端搭建(windows环境)参考附件:
a.它由erlang开发,要安装erlang依赖otp_win32_R16B02.exe
b.rabbitmq服务端rabbitmq-server-3.2.0.exe,默认端口5672,要改的话,代码里得显示指定。
c.RabbitMQ客户端对种语言支持良好,这里我用Java,下载java开发包rabbitmq-client.jar,commons-cli-1.1.jar,commons-io-1.2.jar
producer代码:
?
import java.io.BufferedReader;import java.io.InputStreamReader;import java.util.Date;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;/** * 测试RabbitMQ发送 */public class SendTest { private static String HOST = "172.16.6.180"; private static String EXCHANGE_NAME = "temp"; private static String[] BINDINGS_QUEUE_NAMES = { "user001","user002","user003","user004","user005","user006","user007","user008","user009","user010"}; public static void main(String[] args) throws Exception { BufferedReader br = null; String message = null; String flag = ""; // 建立连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); System.out.println("连接成功"); System.out.println("声明持久化的direct交换机...."); System.out.println("声明持久化队列并绑定..."); // 声明此交换器为全广播并且持久化 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) { channel.queueDeclare(BINDINGS_QUEUE_NAMES[i], true, false, false, null); channel.basicQos(1); channel.queueBind(BINDINGS_QUEUE_NAMES[i], EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i]); System.out.println("队列" + BINDINGS_QUEUE_NAMES[i] + "绑定成功!"); } while (true) { System.out.println("请选择发送方式:1:全部发送;2:指定发送;"); br = new BufferedReader(new InputStreamReader(System.in)); flag = br.readLine(); if ("1".equals(flag)) { System.out.println("发送内容为:这是一条测试数据"); for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) { // 发送消息 byte[] buffer = ("这是一条测试数据"+BINDINGS_QUEUE_NAMES[i]+new Date()).getBytes("utf-8"); channel.basicPublish(EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i], MessageProperties.PERSISTENT_TEXT_PLAIN, buffer); String ss = new String(buffer, "utf-8"); System.out.println(ss); } System.out.println("发送完毕!"); } else if("2".equals(flag)) { System.out.println("请选择您需要发送的队列序号,以','隔开:"); for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) { System.out.println((i+1) + ":" + BINDINGS_QUEUE_NAMES[i]); } br = new BufferedReader(new InputStreamReader(System.in)); String[] indexs = br.readLine().split(","); System.out.println("请输入您要发送的消息:"); br = new BufferedReader(new InputStreamReader(System.in)); message = br.readLine(); for (int i = 0; i < indexs.length; i++) { // 发送消息 channel.basicPublish(EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[Integer.valueOf(indexs[i])-1], MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); } System.out.println("发送完毕!"); } } } }
发送方控制台:
?
连接成功声明持久化的direct交换机....声明持久化队列并绑定...队列user001绑定成功!队列user002绑定成功!队列user003绑定成功!队列user004绑定成功!队列user005绑定成功!队列user006绑定成功!队列user007绑定成功!队列user008绑定成功!队列user009绑定成功!队列user010绑定成功!请选择发送方式:1:全部发送;2:指定发送。1发送内容为:这是一条测试数据这是一条测试数据user001Wed Apr 22 10:37:00 CST 2015这是一条测试数据user002Wed Apr 22 10:37:00 CST 2015这是一条测试数据user003Wed Apr 22 10:37:00 CST 2015这是一条测试数据user004Wed Apr 22 10:37:00 CST 2015这是一条测试数据user005Wed Apr 22 10:37:00 CST 2015这是一条测试数据user006Wed Apr 22 10:37:00 CST 2015这是一条测试数据user007Wed Apr 22 10:37:00 CST 2015这是一条测试数据user008Wed Apr 22 10:37:00 CST 2015这是一条测试数据user009Wed Apr 22 10:37:00 CST 2015这是一条测试数据user010Wed Apr 22 10:37:00 CST 2015发送完毕!请选择发送方式:1:全部发送;2:指定发送。
?RabbitMQ可视化查看:http://localhost:15672/#/queues
?
Receive代码:
?
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;/** * 测试RabbitMQ接收 */public class ReceTest { private static String HOST = "172.16.6.180"; private static String EXCHANGE_NAME = "temp"; private static String[] BINDINGS_QUEUE_NAMES = { "user001","user002","user003","user004","user005","user006","user007","user008","user009","user010"}; public static void main(String[] args) throws Exception { // 建立连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); System.out.println("连接成功"); System.out.println("声明持久化的direct交换机...."); System.out.println("声明持久化队列并绑定..."); // 声明交换器,与服务保持一致 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) { channel.queueDeclare(BINDINGS_QUEUE_NAMES[i], true, false, false, null); channel.basicQos(1); channel.queueBind(BINDINGS_QUEUE_NAMES[i], EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i]); System.out.println("队列" + BINDINGS_QUEUE_NAMES[i] + "绑定成功!"); } System.out.println("开始接收数据..."); for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) { final String queue = BINDINGS_QUEUE_NAMES[i]; new Thread(){ public void run() { try { receive(channel, queue); } catch (Exception e) { e.printStackTrace(); } } }.start(); } } private static void receive(Channel channel,String QUEUE_NAME) throws Exception { // 声明消费者 QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer); while (true) { // 等待队列推送消息 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody(), "UTF-8"); System.out.println(QUEUE_NAME + " Received '" + message + "'"); // 反馈给服务器表示收到信息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }}
?
?接收方控制台:
?
连接成功声明持久化的direct交换机....声明持久化队列并绑定...队列user001绑定成功!队列user002绑定成功!队列user003绑定成功!队列user004绑定成功!队列user005绑定成功!队列user006绑定成功!队列user007绑定成功!队列user008绑定成功!队列user009绑定成功!队列user010绑定成功!开始接收数据...user010 Received '这是一条测试数据user010Wed Apr 22 10:37:00 CST 2015'user001 Received '这是一条测试数据user001Wed Apr 22 10:37:00 CST 2015'user007 Received '这是一条测试数据user007Wed Apr 22 10:37:00 CST 2015'user006 Received '这是一条测试数据user006Wed Apr 22 10:37:00 CST 2015'user004 Received '这是一条测试数据user004Wed Apr 22 10:37:00 CST 2015'user005 Received '这是一条测试数据user005Wed Apr 22 10:37:00 CST 2015'user002 Received '这是一条测试数据user002Wed Apr 22 10:37:00 CST 2015'user008 Received '这是一条测试数据user008Wed Apr 22 10:37:00 CST 2015'user009 Received '这是一条测试数据user009Wed Apr 22 10:37:00 CST 2015'user003 Received '这是一条测试数据user003Wed Apr 22 10:37:00 CST 2015'
?再看?http://localhost:15672/#/queues
?关于RabbitMQ服务端搭建,及可视化页面配置,请参考附件RabbitMQ安装与配置。
参考网站:
http://blog.csdn.net/anzhsoft/article/details/19563091
http://my.oschina.net/OpenSourceBO/blog/379732