- QueueingConsumer实现
- maven依赖
<!--rabbitmq--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.6.0</version></dependency><!-- https://mvnrepository.com/artifact/com.rabbitmq/rabbitmq-client --><dependency><groupId>com.rabbitmq</groupId><artifactId>rabbitmq-client</artifactId><version>1.3.0</version></dependency>
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** RabbitMQ 生产者*/
public class Producer {//队列名称private final static String QUEUE_NAME = "Queue";public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = null;// 建立到代理服务器到连接Connection connection = null;// 获得通道Channel channel = null;try {factory = new ConnectionFactory();//设置用户名和密码factory.setUsername("xwy");factory.setPassword("123456");// 设置 RabbitMQ 地址factory.setHost("192.168.1.218");factory.setPort(5672);// 建立到代理服务器到连接connection = factory.newConnection();channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "hello world .....";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("发送 message[" + message + "] to " + QUEUE_NAME + " success!");} catch (Exception e) {e.printStackTrace();} finally {try {// 关闭资源channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}}
}
import com.rabbitmq.client.*;/*** RabbitMQ 消费者*/
public class Consumer {//队列名称private final static String QUEUE_NAME = "Queue";public static void main(String[] args) {// 创建连接工厂ConnectionFactory factory = null;// 建立到代理服务器到连接Connection connection = null;// 获得通道Channel channel = null;try {factory = new ConnectionFactory();factory.setUsername("xwy");factory.setPassword("123456");factory.setHost("192.168.1.218");factory.setPort(5672);// 建立到代理服务器到连接connection = factory.newConnection();channel = connection.createChannel();// 1.队列名2.是否持久化,3是否局限与链接,4不再使用是否删除,5其他的属性channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 声明一个消费者,配置好获取消息的方式QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, true, consumer);// 循环获取消息while (true) {// 循环获取信息// 指向下一个消息,如果没有会一直阻塞QueueingConsumer.Delivery delivery = consumer.nextDelivery();String msg = new String(delivery.getBody());System.out.println("接收 message[" + msg + "] from " + QUEUE_NAME);}} catch (Exception e) {e.printStackTrace();} finally {try {// 关闭资源channel.close();connection.close();} catch (Exception e) {e.printStackTrace();}}}
}