当前位置: 代码迷 >> Exchange >> RabbitMQ安装使用(直接互换direct exchange)
  详细解决方案

RabbitMQ安装使用(直接互换direct exchange)

热度:481   发布时间:2016-05-02 06:33:27.0
RabbitMQ安装使用(直接交换direct exchange)

1.简介

? ? ? ? ?RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP高级消息队列,说白了就是一个开源的消息中间件。它能解决不同组件、模块、系统间消息通信。

?

2.系统架构

RabbitMQ Server: 也叫broker server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。

Producer:数据的发送方,create messages and publish (send) them to a broker server (RabbitMQ)。

Consumer:数据的接收方,Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。

Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。

?

Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

那么,为什么使用Channel,而不是直接使用TCP连接?

?

?

? ? 对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。

?

对于一个数据从Producer到Consumer的正确传递,还有三个概念需要明确:exchanges, queues and bindings。

?

? ? ? ? Exchanges are where producers publish their messages.

?

? ? ? ? Queues are where the messages end up and are received by consumers

?

?

? ? ? ? Bindings are how the messages get routed from the exchange to particular queues.

?

Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。

? ? ?有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。

Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。

Fanout exchange: 会向响应的queue广播。

Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。

?

Consumer和Procuder都可以通过 queue.declare 创建queue。如果queue已经存在,也不会报错。如果没有,要么发送不了消息,要么取不到消息,所以还是都创建吧。

?

Bindings就是将通过Exchange将queue和routing keys绑定。

?

3.应用开发测试

我们使用直接交换(direct exchange)模式,这种方式有效实现点对点发送。比如发送方:系统分别给每个组织机构或用户发送信息,接收方:每个组织机构或用户各自接收自己的消息。

RabbitMQ服务端搭建(windows环境)参考附件:

a.它由erlang开发,要安装erlang依赖otp_win32_R16B02.exe

b.rabbitmq服务端rabbitmq-server-3.2.0.exe,默认端口5672,要改的话,代码里得显示指定。

c.RabbitMQ客户端对种语言支持良好,这里我用Java,下载java开发包rabbitmq-client.jar,commons-cli-1.1.jar,commons-io-1.2.jar

producer代码:

?

import java.io.BufferedReader;import java.io.InputStreamReader;import java.util.Date;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;/** * 测试RabbitMQ发送 */public class SendTest {		private static String HOST = "172.16.6.180";	private static String EXCHANGE_NAME = "temp";	private static String[] BINDINGS_QUEUE_NAMES = { "user001","user002","user003","user004","user005","user006","user007","user008","user009","user010"};	public static void main(String[] args) throws Exception {		BufferedReader br = null;		String message = null;		String flag = "";		// 建立连接		ConnectionFactory factory = new ConnectionFactory();		factory.setHost(HOST);		Connection connection = factory.newConnection();		Channel channel = connection.createChannel();		System.out.println("连接成功");		System.out.println("声明持久化的direct交换机....");		System.out.println("声明持久化队列并绑定...");		// 声明此交换器为全广播并且持久化		channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);		for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {			channel.queueDeclare(BINDINGS_QUEUE_NAMES[i], true, false, false, null);			channel.basicQos(1);			channel.queueBind(BINDINGS_QUEUE_NAMES[i], EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i]);			System.out.println("队列" + BINDINGS_QUEUE_NAMES[i] + "绑定成功!");		}				while (true) {			System.out.println("请选择发送方式:1:全部发送;2:指定发送;");			br = new BufferedReader(new InputStreamReader(System.in));			flag = br.readLine();			if ("1".equals(flag)) {				System.out.println("发送内容为:这是一条测试数据");				for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {					// 发送消息					byte[] buffer = ("这是一条测试数据"+BINDINGS_QUEUE_NAMES[i]+new Date()).getBytes("utf-8");					channel.basicPublish(EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i], MessageProperties.PERSISTENT_TEXT_PLAIN, buffer);					String ss = new String(buffer, "utf-8");					System.out.println(ss);				}				System.out.println("发送完毕!");			} else if("2".equals(flag)) {				System.out.println("请选择您需要发送的队列序号,以','隔开:");				for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {					System.out.println((i+1) + ":" + BINDINGS_QUEUE_NAMES[i]);				}				br = new BufferedReader(new InputStreamReader(System.in));				String[] indexs = br.readLine().split(",");				System.out.println("请输入您要发送的消息:");				br = new BufferedReader(new InputStreamReader(System.in));				message = br.readLine();				for (int i = 0; i < indexs.length; i++) {					// 发送消息					channel.basicPublish(EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[Integer.valueOf(indexs[i])-1], MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));				}				System.out.println("发送完毕!");			}		}			}	}

发送方控制台:

?

连接成功声明持久化的direct交换机....声明持久化队列并绑定...队列user001绑定成功!队列user002绑定成功!队列user003绑定成功!队列user004绑定成功!队列user005绑定成功!队列user006绑定成功!队列user007绑定成功!队列user008绑定成功!队列user009绑定成功!队列user010绑定成功!请选择发送方式:1:全部发送;2:指定发送。1发送内容为:这是一条测试数据这是一条测试数据user001Wed Apr 22 10:37:00 CST 2015这是一条测试数据user002Wed Apr 22 10:37:00 CST 2015这是一条测试数据user003Wed Apr 22 10:37:00 CST 2015这是一条测试数据user004Wed Apr 22 10:37:00 CST 2015这是一条测试数据user005Wed Apr 22 10:37:00 CST 2015这是一条测试数据user006Wed Apr 22 10:37:00 CST 2015这是一条测试数据user007Wed Apr 22 10:37:00 CST 2015这是一条测试数据user008Wed Apr 22 10:37:00 CST 2015这是一条测试数据user009Wed Apr 22 10:37:00 CST 2015这是一条测试数据user010Wed Apr 22 10:37:00 CST 2015发送完毕!请选择发送方式:1:全部发送;2:指定发送。

?RabbitMQ可视化查看:http://localhost:15672/#/queues


?

Receive代码:

?

import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;/** * 测试RabbitMQ接收 */public class ReceTest {		private static String HOST = "172.16.6.180";	private static String EXCHANGE_NAME = "temp";	private static String[] BINDINGS_QUEUE_NAMES = { "user001","user002","user003","user004","user005","user006","user007","user008","user009","user010"};	public static void main(String[] args) throws Exception {		// 建立连接		ConnectionFactory factory = new ConnectionFactory();		factory.setHost(HOST);		Connection connection = factory.newConnection();		final Channel channel = connection.createChannel();		System.out.println("连接成功");		System.out.println("声明持久化的direct交换机....");		System.out.println("声明持久化队列并绑定...");		// 声明交换器,与服务保持一致		channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);		for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {			channel.queueDeclare(BINDINGS_QUEUE_NAMES[i], true, false, false, null);			channel.basicQos(1);			channel.queueBind(BINDINGS_QUEUE_NAMES[i], EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i]);			System.out.println("队列" + BINDINGS_QUEUE_NAMES[i] + "绑定成功!");		}		System.out.println("开始接收数据...");		for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {			final String queue = BINDINGS_QUEUE_NAMES[i];			new Thread(){				public void run() {					try {						receive(channel, queue);					} catch (Exception e) {						e.printStackTrace();					}				}			}.start();		}	}		private static void receive(Channel channel,String QUEUE_NAME) throws Exception {		// 声明消费者		QueueingConsumer consumer = new QueueingConsumer(channel);		channel.basicConsume(QUEUE_NAME, false, consumer);		while (true) {			// 等待队列推送消息			QueueingConsumer.Delivery delivery = consumer.nextDelivery();			String message = new String(delivery.getBody(), "UTF-8");			System.out.println(QUEUE_NAME + " Received '" + message + "'");			// 反馈给服务器表示收到信息			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);		}	}}

?

?接收方控制台:

?

连接成功声明持久化的direct交换机....声明持久化队列并绑定...队列user001绑定成功!队列user002绑定成功!队列user003绑定成功!队列user004绑定成功!队列user005绑定成功!队列user006绑定成功!队列user007绑定成功!队列user008绑定成功!队列user009绑定成功!队列user010绑定成功!开始接收数据...user010 Received '这是一条测试数据user010Wed Apr 22 10:37:00 CST 2015'user001 Received '这是一条测试数据user001Wed Apr 22 10:37:00 CST 2015'user007 Received '这是一条测试数据user007Wed Apr 22 10:37:00 CST 2015'user006 Received '这是一条测试数据user006Wed Apr 22 10:37:00 CST 2015'user004 Received '这是一条测试数据user004Wed Apr 22 10:37:00 CST 2015'user005 Received '这是一条测试数据user005Wed Apr 22 10:37:00 CST 2015'user002 Received '这是一条测试数据user002Wed Apr 22 10:37:00 CST 2015'user008 Received '这是一条测试数据user008Wed Apr 22 10:37:00 CST 2015'user009 Received '这是一条测试数据user009Wed Apr 22 10:37:00 CST 2015'user003 Received '这是一条测试数据user003Wed Apr 22 10:37:00 CST 2015'

?再看?http://localhost:15672/#/queues



?关于RabbitMQ服务端搭建,及可视化页面配置,请参考附件RabbitMQ安装与配置。

参考网站:

http://blog.csdn.net/anzhsoft/article/details/19563091

http://my.oschina.net/OpenSourceBO/blog/379732

1 楼 freezingsky 20 小时前  
难得看开一两篇讲得不错的文章!
  相关解决方案