RabbitMQ工作队列(work queues)
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,造成消息堆积。利用多个消费者同时消费队列中不同的消息,处理速度快的消费者多分配一些消息,相反,处理速度慢的消费者少分配一下消息。当您运行许多工作任务时,任务将在他们之间共享。所有的消费者将 (注意:不是消费同一条消息,而是在默认情况下将一些消息平均分配给不同的消费者消费,类似于nginx的负载均衡) 消费不同的消息,开启消息自动确认机制时平均分配的,如果关闭自动消息确认机制,利用手动确认,则会实现能者多劳的消费模式,具体详情代码说明。
代码实现
默认平均分配
生产者
package com.mahy.notes.workqueues;import com.mahy.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;/*** @Auther : mahy* @Date : 2020/7/27 21:10* @Description : RabbitMQ的工作队列 参数说明请查看直连模式中的说明*/
public class Provider {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("workqueues",true,false,false,null);for (int i= 1;i<=10;i++){channel.basicPublish("","workqueues",null,"WorkQueues is OK!".getBytes());}RabbitMQUtils.closeConnection(channel,connection);}
}
消费者
消费者1
package com.mahy.workqueue;import com.mahy.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;/*** @Auther : mahy* @Date : 2020/7/27 21:17* @Description : 工作队列消费者1.*/
public class Customer001 {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("workqueuees",true,false,false,null);//消费消息channel.basicConsume("workqueues",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1消费:" + new String(body));}});}
}
消费者2
package com.mahy.workqueue;import com.mahy.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;/*** @Auther : mahy* @Date : 2020/7/27 21:17* @Description : 工作队列消费者1.*/
public class Customer002 {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("workqueuees",true,false,false,null);//消费消息channel.basicConsume("workqueues",true,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2消费:" + new String(body));}});}
}
注意:消费者代码一样。
运行结果
消费者1消费的消息:
消费者2消费的消息:
说明:通过上面的结果可以看出,工作队列默认情况下是平均分配消息给消费者的,但是又会出现另外一种情况:如果一个消费者消费的速度快,另外一个慢,怎么提高消息的消费效率,而不是平均分配消息给每个消费者。下面时如何让工作队列处于能者多劳模式:
能者多劳模式(提高消息被消费的效率)
步骤:
- 关闭消息自动确认机制
- 设置通道每次消费的消息为一条
- 使用手动消息确认机制
代码实现
生产者
package com.mahy.notes.workqueues;import com.mahy.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;/*** @Auther : mahy* @Date : 2020/7/27 21:10* @Description : RabbitMQ的工作队列 代码没有变化.*/
public class Provider {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare("workqueues",true,false,false,null);for (int i= 1;i<=20;i++){channel.basicPublish("","workqueues",null,(i + "WorkQueues is OK!").getBytes());}RabbitMQUtils.closeConnection(channel,connection);}
}
消费者
消费者1
package com.mahy.notes.workqueues;import com.mahy.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;/*** @Auther : mahy* @Date : 2020/7/27 21:17* @Description : 工作队列消费者1.*/
public class Customer001 {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//每次只能消费一条消息channel.basicQos(1);channel.queueDeclare("workqueuees",true,false,false,null);//消费消息//关闭消息自动自动确认机制:参数二设置为falsechannel.basicConsume("workqueues",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//睡眠1秒方便观看效果try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者1消费:" + new String(body));/*** 手动确认消息* 参数一: 具体那条消息的标志* 参数二: 是否开启多条消息同时确认(由于上面指定了一次只能消费一条消息 所以关闭)*/channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
消费者2
package com.mahy.notes.workqueues;import com.mahy.utils.RabbitMQUtils;
import com.rabbitmq.client.*;
import java.io.IOException;/*** @Auther : mahy* @Date : 2020/7/27 21:17* @Description : 工作队列消费者1.*/
public class Customer002 {public static void main(String[] args) throws IOException {Connection connection = RabbitMQUtils.getConnection();Channel channel = connection.createChannel();//每次只能消费一条消息channel.basicQos(1);channel.queueDeclare("workqueuees",true,false,false,null);//消费消息//关闭消息自动自动确认机制:参数二设置为falsechannel.basicConsume("workqueues",false,new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//睡眠3秒方便观看效果try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者2消费:" + new String(body));/*** 手动确认消息* 参数一: 具体那条消息的标志* 参数二: 是否开启多条消息同时确认(由于上面指定了一次只能消费一条消息 所以关闭)*/channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
结果
消费者1的结果
消费者2的结果
这样就能实现能者多劳的功能了,这就是工作队列。
最后说明
这是观看BILIBILI上“编程不良人”大神的视频学习记录的笔记,如果眼花有记错的地方,希望能够提出,大家共同努力,谢谢。