消费者启动流程
DefaultMQPushConsumer
从继承结构来看
可以对topic操作,查询消息,订阅指定topic实现push模式
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.consumerGroup = consumerGroup;this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);}
默认AllocateMessageQueueAveragely
@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}int index = cidAll.indexOf(currentCID);int mod = mqAll.size() % cidAll.size();int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;int range = Math.min(averageSize, mqAll.size() - startIndex);for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}
DefaultMQPushConsumerImpl
大致看其具有的方法
String groupName();MessageModel messageModel();ConsumeType consumeType();ConsumeFromWhere consumeFromWhere();Set<SubscriptionData> subscriptions();void doRebalance();void persistConsumerOffset();void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info);boolean isSubscribeTopicNeedUpdate(final String topic);boolean isUnitMode();ConsumerRunningInfo consumerRunningInfo();
订阅,负载均衡消费消息等接口
] consumer.subscribe("TopicA-test24", "*");
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscribe(java.lang.String, java.lang.String)
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subExpression);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}} catch (Exception e) {
throw new MQClientException("subscription exception", e);}}
@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0);try {
byte[] body = messageExt.getBody();String tags = messageExt.getTags();String topic = messageExt.getTopic();String keys = messageExt.getKeys();System.out.println("body:"+ JSON.parseObject(new String(body, RemotingHelper.DEFAULT_CHARSET),MqData.class)+" tags:"+tags+" topic:"+topic+" keys:"+keys);} catch (UnsupportedEncodingException e) {
e.printStackTrace();//处理出现异常,获取重试次数.达到某个次数的时候可以记录日志,做补偿处理int reconsumeTimes = messageExt.getReconsumeTimes();if(reconsumeTimes == 3){
return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;//检查配置,是否有配置非法的配置this.checkConfig();this.copySubscription();if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();}this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}this.consumeMessageService.start();boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown();throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();this.mQClientFactory.rebalanceImmediately();}
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription
private void copySubscription() throws MQClientException {
try {
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();final String subString = entry.getValue();SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subString);this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);}}if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();}switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:break;case CLUSTERING:final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),retryTopic, SubscriptionData.SUB_ALL);this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);break;default:break;}} catch (Exception e) {
throw new MQClientException("subscription exception", e);}}
该方法功能为创建retry topic,放入org.apache.rocketmq.client.impl.consumer.RebalanceImpl#subscriptionInner
org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {
instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {
instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance;}
创建MQClientInstance
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;this.instanceIndex = instanceIndex;this.nettyClientConfig = new NettyClientConfig();this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());this.clientRemotingProcessor = new ClientRemotingProcessor(this);this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());}this.clientId = clientId;this.mQAdminImpl = new MQAdminImpl(this);this.pullMessageService = new PullMessageService(this);this.rebalanceService = new RebalanceService(this);this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);this.defaultMQProducer.resetClientConfig(clientConfig);this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",this.instanceIndex,this.clientId,this.clientConfig,MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());}
生产者同样要初始化这个客户端,主要初始化客户端的网络,拉取消息,负载均衡等实体类
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
负载配置,设置消费者组,集群还是广播消费,负载策略,MQClientInstance
this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
拉取消息的实现
?
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();
如果是广播消费,偏移量offset从本地文件中加载
如果是集群消费,偏移量采用broker中维护的偏移量
OffsetStore
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.client.consumer.store;import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;/*** Offset store interface*/
public interface OffsetStore {
/*** Load*/void load() throws MQClientException;/*** Update the offset,store it in memory*/void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);/*** Get offset from local storage** @return The fetched offset*/long readOffset(final MessageQueue mq, final ReadOffsetType type);/*** Persist all offsets,may be in local storage or remote name server*/void persistAll(final Set<MessageQueue> mqs);/*** Persist the offset,may be in local storage or remote name server*/void persist(final MessageQueue mq);/*** Remove offset*/void removeOffset(MessageQueue mq);/*** @return The cloned offset table of given topic*/Map<MessageQueue, Long> cloneOffsetTable(String topic);/*** @param mq* @param offset* @param isOneway*/void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,MQBrokerException, InterruptedException, MQClientException;
}
管理消费偏移量的接口
有两个实现,分别是本地存储的offset,对应的是广播消费,和broker存储的offset,对应的是集群消费
后面先以集群消费的方式来看
?
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}
选择顺序消费还是并发消费
?
ConsumeMessageService
同样对应了两种实现,看并发的这种实现
public void start() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Overridepublic void run() {
cleanExpireMsg();}}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);}
?
默认15分支执行一次cleanExpireMsg
?
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#cleanExpireMsg
private void cleanExpireMsg() {
Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();while (it.hasNext()) {
Map.Entry<MessageQueue, ProcessQueue> next = it.next();ProcessQueue pq = next.getValue();pq.cleanExpiredMsg(this.defaultMQPushConsumer);}}
cleanExpireMsg方法定时对ProcessQueue进行处理,至于ProcessQueue存储的是什么,后面看
?
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown();throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}
注册消费者
mQClientFactory.start();
switch (this.serviceState) {
case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channel//启动netty客户端this.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start();// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;
pullMessageService.start
public void run() {
log.info(this.getServiceName() + " service started");while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);}}
RebalanceService.start
@Overridepublic void run() {
log.info(this.getServiceName() + " service started");while (!this.isStopped()) {
this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}
//拉取topic信息,获取broker中的队列信息this.updateTopicSubscribeInfoWhenSubscriptionChanged();//this.mQClientFactory.checkClientInBroker();//消费者向生产者发送心跳this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();//立即做一次负载 this.mQClientFactory.rebalanceImmediately();
?
public void rebalanceImmediately() {
this.rebalanceService.wakeup();}
@Overridepublic void run() {
log.info(this.getServiceName() + " service started");while (!this.isStopped()) {
this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}
this.mQClientFactory.rebalanceImmediately();会解除阻塞,进行rebalance
?
消费者拉取消息流程
doRebalance
@Overridepublic void run() {
log.info(this.getServiceName() + " service started");while (!this.isStopped()) {
this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();try {
this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();}
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {
allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);}boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {
log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}}
如果是集群消费
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
先根据topic,获取到broker。再从broker上获取到消费者列表(心跳时会注册消费者到broker,org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBroker)、
使用指定策略分配消息队列,默认时消费者平均分配消息队列
返回返回allocateResult本消费者分配到的队列
思考:为什么消费者要固定需要消费的队列?,如果多个消费者在同时消费同一队列,那么为了保证偏移量的正确,就需要在消费期间,加锁,不允许另一个消费者进行消费。反之如果只需要保证一个队列在同一时间只能被一个消费者消费,则效率大大提升
?
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#computePullFromWhere
public long computePullFromWhere(MessageQueue mq) {
long result = -1;final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:case CONSUME_FROM_MIN_OFFSET:case CONSUME_FROM_MAX_OFFSET:case CONSUME_FROM_LAST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {
result = lastOffset;}// First start,no offsetelse if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {
result = -1;}}} else {
result = -1;}break;}case CONSUME_FROM_FIRST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {
result = lastOffset;} else if (-1 == lastOffset) {
result = 0L;} else {
result = -1;}break;}case CONSUME_FROM_TIMESTAMP: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);if (lastOffset >= 0) {
result = lastOffset;} else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);} catch (MQClientException e) {
result = -1;}} else {
try {
long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),UtilAll.YYYYMMDDHHMMSS).getTime();result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);} catch (MQClientException e) {
result = -1;}}} else {
result = -1;}break;}default:break;}return result;}
判断队列从何处开始消费
三种情况
- ConsumeFromLastOffset:从队列最大偏移量量进行消费,从上一次消费的位置开始
- ConsumeFromFirstOffset:从头开始消费,如果消费过会出现重复消费的问题
- ConsumeFromTimestamp:第一次启动从给定时间戳消费,后续启动接着上一次消费的位置继续
以上的前提都是一个新的消费者组第一次启动消费者
如果是重试队列,都从0开始消费
processQueueTable存放,消息队列和ProcessQueue
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);
生成pullRequest
this.dispatchPullRequest(pullRequestList);
分发pullRequest
org.apache.rocketmq.client.impl.consumer.PullMessageService#executePullRequestImmediately
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);}}
将pullRequest放入pullRequestQueue中,这事一个阻塞队列
可见,在rebancel过程中,如果队列发生变化了,会把新的pullRequest重新分发,去拉取数据。如果队列已经存在目前的订阅processQueueTable,则不会生成pullRequest,。默认rebance间隔未20秒
private static long waitInterval =Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
@Overridepublic void run() {
log.info(this.getServiceName() + " service started");while (!this.isStopped()) {
this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
···}···}
再看org.apache.rocketmq.client.impl.consumer.PullMessageService#run
@Overridepublic void run() {
log.info(this.getServiceName() + " service started");while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;impl.pullMessage(pullRequest);} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);}}
进入org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
可以看出来rocketMq默认采用的就是push模式进行消息的拉取
?
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
记录最后一次拉取时间
long cachedMessageCount = processQueue.getMsgCount().get();long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
分别代表消费者缓存的消息数和消息大小
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}
/*** Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit*/private int pullThresholdForQueue = 1000;/*** Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,* Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit** <p>* The size of a message only measured by message body, so it's not accurate*/private int pullThresholdSizeForQueue = 100;
如果缓存消息数大于1000或者消息大于100M,那么稍后再拉取,防止消息堆积在消费者。
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#executePullRequestLater
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Overridepublic void run() {
PullMessageService.this.executePullRequestImmediately(pullRequest);}}, timeDelay, TimeUnit.MILLISECONDS);} else {
log.warn("PullMessageServiceScheduledThread has shutdown");}}
放入一个单线程延时线程池,延时3秒后重试
private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
PullCallback pullCallback = new PullCallback() {
@Overridepublic void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {
case FOUND:long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT = System.currentTimeMillis() - beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}}if (pullResult.getNextBeginOffset() < prevRequestOffset|| firstMsgOffset < prevRequestOffset) {
log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",pullResult.getNextBeginOffset(),firstMsgOffset,prevRequestOffset);}break;case NO_NEW_MSG:pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case NO_MATCHED_MSG:pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case OFFSET_ILLEGAL:log.warn("the pull request offset illegal, {} {}",pullRequest.toString(), pullResult.toString());pullRequest.setNextOffset(pullResult.getNextBeginOffset());pullRequest.getProcessQueue().setDropped(true);DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Overridepublic void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),pullRequest.getNextOffset(), false);DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());log.warn("fix the pull request offset, {}", pullRequest);} catch (Throwable e) {
log.error("executeTaskLater Exception", e);}}}, 10000);break;default:break;}}}@Overridepublic void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);}DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);}};
拉取消息回调
boolean commitOffsetEnable = false;long commitOffsetValue = 0L;if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);if (commitOffsetValue > 0) {
commitOffsetEnable = true;}}
如果是集群消费,需要提交消费偏移量
this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(),subExpression,subscriptionData.getExpressionType(),subscriptionData.getSubVersion(),pullRequest.getNextOffset(),this.defaultMQPushConsumer.getPullBatchSize(),sysFlag,commitOffsetValue,BROKER_SUSPEND_MAX_TIME_MILLIS,CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,CommunicationMode.ASYNC,pullCallback);
拉取消息
public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final String expressionType,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}if (findBrokerResult != null) {
{
// check versionif (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);}PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}
FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);
根据brokerName+queueId找到对应的broker地址
?
进行实际的消息拉取
org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
private void pullMessageAsync(final String addr,final RemotingCommand request,final long timeoutMillis,final PullCallback pullCallback) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Overridepublic void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);assert pullResult != null;pullCallback.onSuccess(pullResult);} catch (Exception e) {
pullCallback.onException(e);}} else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));} else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,responseFuture.getCause()));} else {
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));}}}});}
收到回复后调用callBack.onSuccess
?
broker对应拉取消息操作
PullMessageProcessor是
?
?
final GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
使用消费者发送过来的queueId,偏移量,最大消息数(默认32)
查询后返回消费者,还通知消费者下次开始消费的偏移量
执行回调org.apache.rocketmq.client.consumer.PullCallback#onSuccess
PullCallback pullCallback = new PullCallback() {
@Overridepublic void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {
case FOUND:long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());long pullRT = System.currentTimeMillis() - beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullRT);long firstMsgOffset = Long.MAX_VALUE;if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}}if (pullResult.getNextBeginOffset() < prevRequestOffset|| firstMsgOffset < prevRequestOffset) {
log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",pullResult.getNextBeginOffset(),firstMsgOffset,prevRequestOffset);}break;case NO_NEW_MSG:pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case NO_MATCHED_MSG:pullRequest.setNextOffset(pullResult.getNextBeginOffset());DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);break;case OFFSET_ILLEGAL:log.warn("the pull request offset illegal, {} {}",pullRequest.toString(), pullResult.toString());pullRequest.setNextOffset(pullResult.getNextBeginOffset());pullRequest.getProcessQueue().setDropped(true);DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Overridepublic void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),pullRequest.getNextOffset(), false);DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());log.warn("fix the pull request offset, {}", pullRequest);} catch (Throwable e) {
log.error("executeTaskLater Exception", e);}}}, 10000);break;default:break;}}}
三种情况
发现新消息FOUND,
long prevRequestOffset = pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());
更新下次消费的偏移量(broker返回),
long pullRT = System.currentTimeMillis() - beginTimestamp;DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullRT);
public void addValue(final String statsKey, final int incValue, final int incTimes) {
StatsItem statsItem = this.getAndCreateStatsItem(statsKey);statsItem.getValue().addAndGet(incValue);statsItem.getTimes().addAndGet(incTimes);}
累加拉取消息次数和拉取消息时间
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
累加拉取消息个数
org.apache.rocketmq.client.impl.consumer.ProcessQueue#putMessage
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false;try {
this.lockTreeMap.writeLock().lockInterruptibly();try {
int validMsgCnt = 0;for (MessageExt msg : msgs) {
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);if (null == old) {
validMsgCnt++;this.queueOffsetMax = msg.getQueueOffset();msgSize.addAndGet(msg.getBody().length);}}msgCount.addAndGet(validMsgCnt);if (!msgTreeMap.isEmpty() && !this.consuming) {
dispatchToConsume = true;this.consuming = true;}if (!msgs.isEmpty()) {
MessageExt messageExt = msgs.get(msgs.size() - 1);String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);if (property != null) {
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();if (accTotal > 0) {
this.msgAccCnt = accTotal;}}}} finally {
this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {
log.error("putMessage exception", e);}return dispatchToConsume;}
int validMsgCnt = 0;for (MessageExt msg : msgs) {
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);if (null == old) {
validMsgCnt++;this.queueOffsetMax = msg.getQueueOffset();msgSize.addAndGet(msg.getBody().length);}}msgCount.addAndGet(validMsgCnt);
把消息放入msgTreeMap ,key为偏移量value为消息,如果是新消息,新增msgCount
?
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);
提交任务
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
public void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);try {
this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);}} else {
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));} else {
break;}}ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);try {
this.consumeExecutor.submit(consumeRequest);} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));}this.submitConsumeRequestLater(consumeRequest);}}}}
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
一次消费的数量,默认为1
如果消息数量不大于配置的批量数量
直接提交
否则分批次提交请求,每批最大为consumeBatchSize
this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000 * 60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,new ThreadFactoryImpl("ConsumeMessageThread_"));
最小和最大分别为
/*** Minimum consumer thread number*/private int consumeThreadMin = 20;/*** Max consumer thread number*/private int consumeThreadMax = 64;
提交任务后,继续加入拉取消息任务,接下来继续拉取mq的新数据
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
@Overridepublic void run() {
if (this.processQueue.isDropped()) {
log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);return;}MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();boolean hasException = false;ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;try {
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));}}status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);hasException = true;}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;} else {
returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",ConsumeMessageConcurrentlyService.this.consumerGroup,msgs,messageQueue);status = ConsumeConcurrentlyStatus.RECONSUME_LATER;}if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageConcurrentlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);}}
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);ConsumeConcurrentlyStatus status = null;ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setProps(new HashMap<String, String>());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}
获取listener,组装ConsumeMessageContextx
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
调用自定义消费接口
?
返回ConsumeConcurrentlyStatus
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.client.consumer.listener;public enum ConsumeConcurrentlyStatus {
/*** Success consumption*/CONSUME_SUCCESS,/*** Failure consumption,later try to consume*/RECONSUME_LATER;
}
NO.1 获得我们注入的回调函数 MessageListenerConcurrently
?
NO.2 执行钩子函数的before
?
NO.3 执行回调函数的 consumeMessage 方法
?
NO.4 判断回调函数返回的状态,如果是空则设置为 ConsumeConcurrentlyStatus.RECONSUME_LATER;
?
NO.5 执行钩子函数的after方法
?
NO.6 处理执行结果(消费异常ack与消费进度管理)
?
消费异常ack与本地消费进度保存核心方法是:
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
public void processConsumeResult(final ConsumeConcurrentlyStatus status,final ConsumeConcurrentlyContext context,final ConsumeRequest consumeRequest) {
int ackIndex = context.getAckIndex();if (consumeRequest.getMsgs().isEmpty())return;switch (status) {
case CONSUME_SUCCESS:if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;}int ok = ackIndex + 1;int failed = consumeRequest.getMsgs().size() - ok;this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);break;case RECONSUME_LATER:ackIndex = -1;this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());break;default:break;}switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;case CLUSTERING:List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);boolean result = this.sendMessageBack(msg, context);if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}}
这里主要通过变量
?
int ackIndex = context.getAckIndex(); 的设置来处理是否将消息回送到broker;大致逻辑是如果是中正常消费结束 ackindex会被设置成消息大size-1,那么后面代码不会进入循环进而不会将消息发送回broker;如果有异常就会将对应的消息通过retry的形式发回broker,如果发送失败则消费端会再次消费之前被标记失败的消息;
NO.1 将消费失败的消息发回broker,发送消息的cmd是:RequestCode.CONSUMER_SEND_MSG_BACK
?
NO.2 将发回失败的消息remove掉
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequestLater
private void submitConsumeRequestLater(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue) {
this.scheduledExecutorService.schedule(new Runnable() {
@Overridepublic void run() {
ConsumeMessageConcurrentlyService.this.submitConsumeRequest(msgs, processQueue, messageQueue, true);}}, 5000, TimeUnit.MILLISECONDS);}
如果消费失败,提交稍后再次消费的请求,并且放入重试队列中,消费者同时也订阅了这个队列,此时不会将消息移除
NO.3 如果1过程失败,client会再次重试消费该消息
?
NO.4 这里会获得最小的一个已被确认消费消息的偏移量(这个方法有可能会引起集群模式下的重复消费问题)
?
NO.5 将第四步的消费进度返回值存入本地缓存中
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);}
先看removeMessage
public long removeMessage(final List<MessageExt> msgs) {
long result = -1;final long now = System.currentTimeMillis();try {
this.lockTreeMap.writeLock().lockInterruptibly();this.lastConsumeTimestamp = now;try {
if (!msgTreeMap.isEmpty()) {
result = this.queueOffsetMax + 1;int removedCnt = 0;for (MessageExt msg : msgs) {
MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());if (prev != null) {
removedCnt--;msgSize.addAndGet(0 - msg.getBody().length);}}msgCount.addAndGet(removedCnt);if (!msgTreeMap.isEmpty()) {
result = msgTreeMap.firstKey();}}} finally {
this.lockTreeMap.writeLock().unlock();}} catch (Throwable t) {
log.error("removeMessage exception", t);}return result;}
将已经确认消费成功的从msgTreeMap删除。减少消息个数
如果,把提交的消息删除之后,如果msgTreeMap中没有数据了,那就直接提交偏移量为queueOffsetMax否则取队列中的最小偏移量,msgTreeMap为treeMap,因此很容易获取到最小偏移量。
又因为是并发消费(这个方法有可能会引起集群模式下的重复消费问题,比如一个批次有消息m1、m2、m3,
每次都消费一条消息 , 如果m1比m3先消费完成,这个函数返回剩下最小的偏移量,会是m1的偏移量而不是m3的,那么如果此时确认的的就是m1的偏移量,所以自定义程序要保证程序幂等)而且同步偏移量到broker是一个异步的过程如果进程断,掉电都会导致消费重复
?
消费失败,并且发送到retry队列失败(同步发送,要确定发送失败还是成功)才会放到msgBackFailed中,否则认为是已经从指定topic消费完成,删除消息,更新offset
queueOffsetMax的含义
putMessage()
用来初始化属性:msgTreeMap(存放拉取到的消息)、queueOffsetMax(消费到的最大offset)、msgSize(ProcessQueue中的总消息长度)、msgCount(ProcessQueue中的总消息数量)、msgAccCnt(表示broker端还有多少消息未被消费)。
public boolean putMessage(final List<MessageExt> msgs) {
// 这个只有在顺序消费的时候才会遇到,并发消费不会用到boolean dispatchToConsume = false;try {
this.lockTreeMap.writeLock().lockInterruptibly();try {
//有效消息数量int validMsgCnt = 0;for (MessageExt msg : msgs) {
// 把传过来的消息都都放在msgTreeMap中,以消息在queue中的offset作为key,msg做为valueMessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);// 正常情况,说明原本msgTreeMap中不包含此条消息if (null == old) {
validMsgCnt++;// 将最后一个消息的offset赋值给queueOffsetMaxthis.queueOffsetMax = msg.getQueueOffset();// 把当前消息的长度加到msgSize中msgSize.addAndGet(msg.getBody().length);}}// 增加有效消息数量msgCount.addAndGet(validMsgCnt);// msgTreeMap不为空(含有消息),并且不是正在消费状态// // 这个值在放消息的时候会设置为true,在顺序消费模式,取不到消息则设置为falseif (!msgTreeMap.isEmpty() && !this.consuming) {
// 将ProcessQueue置为正在被消费状态// 有消息,且为未消费状态,则顺序消费模式可以消费dispatchToConsume = true;this.consuming = true;}if (!msgs.isEmpty()) {
// 拿到最后一条消息MessageExt messageExt = msgs.get(msgs.size() - 1);// 获取broker端(拉取消息时)queue里最大的offset,maxOffset会存在每条消息里String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);// 计算broker端还有多少条消息没有被消费if (property != null) {
// broker端的最大偏移量 - 当前ProcessQueue中处理的最大消息偏移量long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();if (accTotal > 0) {
this.msgAccCnt = accTotal;}}}} finally {
this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {
log.error("putMessage exception", e);}return dispatchToConsume;
}
queueOffsetMax代表消费者拉取到消息后存储的最新的那一条数据的偏移量,
pullResult.getMsgFoundList()总能保持拉取过来的一个队列中的消息有序
因此queueOffsetMax总能保持消费到的最大offset.
重试队列
在自定义接口消费消息失败后就会放入重试队列,消费者本身就订阅了这个topic,同样可以再进行消费。这样只针对消费失败的消息重新消费就行了,而不需要一直等待消费失败的消息,无法继续消费对应topic的队列
?
广播消息
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}
集群消息采用LocalFileOffsetStore本地路径类似~.rocketmq_offsets\10.30.101.161@DEFAULT\mq-group\offsets.json
因为采用了本地偏移量,所以,在broker中的队列偏移量始终都没有被更新,更新的只是本地偏移量,也因此实现消息的广播
?
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
case BROADCASTING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {
allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);}boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {
log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}}
如果是广播消费直接使用全部的mq,而不用再使用负载策略
?
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());}break;case CLUSTERING:List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);boolean result = this.sendMessageBack(msg, context);if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);msgBackFailed.add(msg);}}if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());}break;default:break;}
可以看到,广播消息消费失败直接就丢弃不管了
?
顺序消息
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start
f (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;this.messageListener = messageListener;this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000 * 60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,new ThreadFactoryImpl("ConsumeMessageThread_"));this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));}
和并发的构造相同
org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#start
并发消费启动
public void start() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Overridepublic void run() {
cleanExpireMsg();}}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);}
org.apache.rocketmq.client.impl.consumer.ProcessQueue#cleanExpiredMsg
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
return;}int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;for (int i = 0; i < loop; i++) {
MessageExt msg = null;try {
this.lockTreeMap.readLock().lockInterruptibly();try {
if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
msg = msgTreeMap.firstEntry().getValue();} else {
break;}} finally {
this.lockTreeMap.readLock().unlock();}} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);}try {
pushConsumer.sendMessageBack(msg, 3);log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());try {
this.lockTreeMap.writeLock().lockInterruptibly();try {
if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
try {
removeMessage(Collections.singletonList(msg));} catch (Exception e) {
log.error("send expired msg exception", e);}}} finally {
this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);}} catch (Exception e) {
log.error("send expired msg exception", e);}}}
如果消息存在processQueue超过15分钟,那么发回重试队列,删除本地消息
pushConsumer.sendMessageBack(msg, 3);removeMessage(Collections.singletonList(msg));
顺序消费启动
public void start() {
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Overridepublic void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();}}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);}}
private HashMap<String/* brokerName */, Set<MessageQueue>> buildProcessQueueTableByBrokerName() {
HashMap<String, Set<MessageQueue>> result = new HashMap<String, Set<MessageQueue>>();for (MessageQueue mq : this.processQueueTable.keySet()) {
Set<MessageQueue> mqs = result.get(mq.getBrokerName());if (null == mqs) {
mqs = new HashSet<MessageQueue>();result.put(mq.getBrokerName(), mqs);}mqs.add(mq);}return result;}
遍历processQueueTable,将属于一个broker上的mq,放入map的对应brokerName的key,值中
?
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);}return;}} else {
if (processQueue.isLocked()) {
if (!pullRequest.isLockedFirst()) {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());boolean brokerBusy = offset < pullRequest.getNextOffset();log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",pullRequest, offset, brokerBusy);if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",pullRequest, offset);}pullRequest.setLockedFirst(true);pullRequest.setNextOffset(offset);}}
如果时顺序消费,必须保证队列是本消费者锁住的。同时锁住处理的pullRequest,并且每次都从新从broker获取该队列的消费进度
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest
@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);this.consumeExecutor.submit(consumeRequest);}}
这里没有像并发消费那样
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService.ConsumeRequest
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;private final MessageQueue messageQueue;public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;this.messageQueue = messageQueue;}public ProcessQueue getProcessQueue() {
return processQueue;}public MessageQueue getMessageQueue() {
return messageQueue;}@Overridepublic void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);break;}if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}long interval = System.currentTimeMillis() - beginTime;if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);break;}final int consumeBatchSize =ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);ConsumeOrderlyStatus status = null;ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);// init the consume context typeconsumeMessageContext.setProps(new HashMap<String, String>());ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;boolean hasException = false;try {
this.processQueue.getLockConsume().lock();if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue);break;}status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue);hasException = true;} finally {
this.processQueue.getLockConsume().unlock();}if (null == status|| ConsumeOrderlyStatus.ROLLBACK == status|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue);}long consumeRT = System.currentTimeMillis() - beginTimestamp;if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;} else {
returnType = ConsumeReturnType.RETURNNULL;}} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;}if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());}if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);}ConsumeMessageOrderlyService.this.getConsumerStatsManager().incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);} else {
continueConsume = false;}}} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);return;}ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);}}}}
执行回调
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);synchronized (objLock) {
····}
为每个队列分配一个对象,通过对象加锁
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);break;}if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);break;}long interval = System.currentTimeMillis() - beginTime;if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);break;}
再次判断队列是否上锁,锁是否过期,如果不满足条机,不执行,重新提交任务ConsumeRequest。
final int consumeBatchSize =ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);ConsumeOrderlyStatus status = null;ConsumeMessageContext consumeMessageContext = null;if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());consumeMessageContext.setMq(messageQueue);consumeMessageContext.setMsgList(msgs);consumeMessageContext.setSuccess(false);// init the consume context typeconsumeMessageContext.setProps(new HashMap<String, String>());ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);}long beginTimestamp = System.currentTimeMillis();ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;boolean hasException = false;try {
this.processQueue.getLockConsume().lock();if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue);break;}status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);}
与并发消费不同的是,顺序消费每次取出固定的消息数,交由线程执行,最大为consumeBatchSize,不会多线程去消费
注意 this.processQueue.getLockConsume().lock()
与之对应的是
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance
if (!mqSet.contains(mq)) {
pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {
it.remove();changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}}
如果需要移除每个队列先把setDropped设置为true
再移除前会加对应的锁
@Overridepublic boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);if (this.defaultMQPushConsumerImpl.isConsumeOrderly()&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
try {
if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
return this.unlockDelay(mq, pq);} finally {
pq.getLockConsume().unlock();}} else {
log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",mq,pq.getTryUnlockTimes());pq.incTryUnlockTimes();}} catch (Exception e) {
log.error("removeUnnecessaryMessageQueue Exception", e);}return false;}return true;}
与之对应的
try {
this.processQueue.getLockConsume().lock();if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",this.messageQueue);break;}status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);}
这样可以确保在没有完成消费之前,不能释放出队列给其他消费者。
顺序消息返回
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#processConsumeResult
public boolean processConsumeResult(final List<MessageExt> msgs,final ConsumeOrderlyStatus status,final ConsumeOrderlyContext context,final ConsumeRequest consumeRequest) {
boolean continueConsume = true;long commitOffset = -1L;if (context.isAutoCommit()) {
switch (status) {
case COMMIT:case ROLLBACK:log.warn("the message queue consume result is illegal, we think you want to ack these message {}",consumeRequest.getMessageQueue());case SUCCESS:commitOffset = consumeRequest.getProcessQueue().commit();this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());break;case SUSPEND_CURRENT_QUEUE_A_MOMENT:this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);this.submitConsumeRequestLater(consumeRequest.getProcessQueue(),consumeRequest.getMessageQueue(),context.getSuspendCurrentQueueTimeMillis());continueConsume = false;} else {
commitOffset = consumeRequest.getProcessQueue().commit();}break;default:break;}} else {
switch (status) {
case SUCCESS:this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());break;case COMMIT:commitOffset = consumeRequest.getProcessQueue().commit();break;case ROLLBACK:consumeRequest.getProcessQueue().rollback();this.submitConsumeRequestLater(consumeRequest.getProcessQueue(),consumeRequest.getMessageQueue(),context.getSuspendCurrentQueueTimeMillis());continueConsume = false;break;case SUSPEND_CURRENT_QUEUE_A_MOMENT:this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);this.submitConsumeRequestLater(consumeRequest.getProcessQueue(),consumeRequest.getMessageQueue(),context.getSuspendCurrentQueueTimeMillis());continueConsume = false;}break;default:break;}}if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);}return continueConsume;}
如果返回_SUCCESS_,执行commit
public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();try {
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0 - msg.getBody().length);}this.consumingMsgOrderlyTreeMap.clear();if (offset != null) {
return offset + 1;}} finally {
this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {
log.error("commit exception", e);}return -1;}
减少消息个数,清空consumingMsgOrderlyTreeMap
这里有个问题,放消息的TreeMap怎么没清空,原来在takeMessage的时候就已经删掉了,加入到了consumingMsgOrderlyTreeMap
public List<MessageExt> takeMessags(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);final long now = System.currentTimeMillis();try {
this.lockTreeMap.writeLock().lockInterruptibly();this.lastConsumeTimestamp = now;try {
if (!this.msgTreeMap.isEmpty()) {
for (int i = 0; i < batchSize; i++) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();if (entry != null) {
result.add(entry.getValue());consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());} else {
break;}}}if (result.isEmpty()) {
consuming = false;}} finally {
this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {
log.error("take Messages exception", e);}return result;}
更新偏移量offset。
如果返回消费失败
如果返回_SUSPEND_CURRENT_QUEUE_A_MOMENT_
case SUSPEND_CURRENT_QUEUE_A_MOMENT:this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);this.submitConsumeRequestLater(consumeRequest.getProcessQueue(),consumeRequest.getMessageQueue(),context.getSuspendCurrentQueueTimeMillis());continueConsume = false;} else {
commitOffset = consumeRequest.getProcessQueue().commit();}break;
public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
try {
this.lockTreeMap.writeLock().lockInterruptibly();try {
for (MessageExt msg : msgs) {
this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());this.msgTreeMap.put(msg.getQueueOffset(), msg);}} finally {
this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {
log.error("makeMessageToCosumeAgain exception", e);}}
把失败消息 重新放回msgTreeMap重新提交顺序消费消息请求,这样回头再takeMessage,依然取出的失败的这几条消息,这样只能消费这几条失败 而不会消费的之后的消息,这样队列就相当于挂起了, 不能再继续消费了, 也不会投放重试队列
?
思考消息重新提交线程 还能保证消息顺序吗?
**先看 **
提交org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService#submitConsumeRequest
?
@Overridepublic void submitConsumeRequest(final List<MessageExt> msgs,final ProcessQueue processQueue,final MessageQueue messageQueue,final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);this.consumeExecutor.submit(consumeRequest);}}
入参dispathToConsume为true才会提交任务
再看dispathToConsume
if (!msgTreeMap.isEmpty() && !this.consuming) {
dispatchToConsume = true;this.consuming = true;}
只有当msgTreeMap不空切 没有正在消费的时候才能为true,才会分发
那么consuming什么时候变为false
再看takeMessage
public List<MessageExt> takeMessags(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);final long now = System.currentTimeMillis();try {
this.lockTreeMap.writeLock().lockInterruptibly();this.lastConsumeTimestamp = now;try {
if (!this.msgTreeMap.isEmpty()) {
for (int i = 0; i < batchSize; i++) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();if (entry != null) {
result.add(entry.getValue());consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());} else {
break;}}}if (result.isEmpty()) {
consuming = false;}} finally {
this.lockTreeMap.writeLock().unlock();}} catch (InterruptedException e) {
log.error("take Messages exception", e);}return result;}
只有当msgTreeMap为空,已经取不出数据的时候, consuming = false;
这样才能继续分发处理消费消息
因此一旦分发消息之后必须等到队列中的消费消费完毕才能再次分发
for (boolean continueConsume = true; continueConsume; ) {
···}
总结:对于顺序消费的三把锁:
1)首先在ConsumeMessageOrderlyService类中定义了定时任务每隔20秒执行一次lockMQPeriodically()方法,获取该Consumer端在Broker端锁住的MessageQueue集合(即分布式锁),并将RebalanceImpl.processQueueTable:ConcurrentHashMap<MessageQueue, ProcessQueue>集合中获得分布式锁的MessageQueue对象(消费队列)对应的ProcessQueue对象(消费处理队列)加上本地锁(即该对象的lock等于ture)以及加锁的时间,目的是为了在消费时在本地检查消费队列是否锁住;
2)在进行消息队列的消费过程中,对MessageQueue对象进行本地同步锁,保证同一时间只允许一个线程消息一个ConsumeQueue队列;
3)在回调业务层定义的ConsumeMessageOrderlyService.messageListener:MessageListenerOrderly类的consumeMessage方法之前获取ProcessQueue.lockConsume:ReentrantLock变量的锁即消费处理队列的锁,该锁的粒度比消息队列的同步锁粒度更小,该锁的目的是保证在消费的过程中不会被解锁。
?
并发消费可能会发生,消费者正在消费一个队列时,突然重新负载,这个队列被分配到了别的消费者哪里,这样就会发多个消费者消费一个队列,不仅会重复,还有可能导致乱序,因此顺序消费,需要在对队列上分布式锁。
同样在本地也需要上锁,同一时刻只能有一个线程消费同一队列,同时要保证在消费的过程中不会被解锁。还要保证同时只能有一个任务被分发到线程池执行消费任务即只有一个consumeRequest在被执行(因为锁的存在,多个消费请求也是没有意义的)
Pull模式
1 Pull模式的使用特点
在rocketMQ里面一般有两种获取消息的模式,一种是push, 一种是pull ,其实本质上都是pull ,只不过在于两者实现的机制不太一样,在之前的文章中介绍过push模式,此处不再做赘述。
pull消息模式呢,取消息的过程需要用户自己写,获取topic的消息队列,然后循环队列获取消息,上报offset, 直到最后取完了,换下一个队列,
如果看过笔者之前写的一篇文章RocketMQ系列之push(推)消息模式(六) 应该就会很明显的感受到,pull模式下的消息就是用户自己写,自己拿topic的messageQueue的集合去broker里面拉取消息,而push模式下全部是rocketMq帮我们做好了
pull模式是获取当前consumer里面负载到的messageQueue, 然后循环拉取每个消息队列里面的消息内容,上报offset的进度,
push模式以每个messageQueue构建一个队列任务,后台线程异步的去拉取, 根据borker阻塞的时间可以实现长轮询和短轮询
每个MessageQueue 对应了封装成了一个PullRequest,因为拉取数据是以每个Broker下面的Queue为单位,同时里面还一个ProcessQueue,每个MessageQueue也同样对应一个ProcessQueue,保存了这个MessageQueue消息处理状态的快照;还有nextOffset用来标识读取的位置;继续看一段pullMessage中的内容,给服务端发送请求的头内容:
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.consumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType); String brokerAddr = findBrokerResult.getBrokerAddr(); if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr); } PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr, requestHeader, timeoutMillis, communicationMode, pullCallback); return pullResult;
其中有一个参数是SuspendTimeoutMillis,作用是设置Broker的最长阻塞时间,默认为15秒,前提是没有消息的情况下,有消息会立刻返回;
2 Java中PullConsumer的几种实现
1.2.1 DefaultMQPullConsumer
获取消息队列
/*** 根据Topic获取该Topic的所有消息队列,用于遍历消息队列,从每个消息队列中获取消息** @param topic Topic名称* @return 该Topic所有的消息队列*/
@Override
public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
return this.defaultMQPullConsumerImpl.fetchSubscribeMessageQueues(withNamespace(topic));
}
同步拉去消息
/*** 同步拉取消息** @param mq 消息队列* @param subExpression 消息tag过滤表达式* @param offset 消费组offset(从哪里开始拉去)* @param maxNums 一次最大拉去消息数量* @param timeout 超时时间* @return 存储了拉取状态以及消息*/
@Override
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, timeout);
}
异步拉取消息
/*** 异步拉取消息* @param mq 消息队列* @param subExpression 消息tag过滤表达式* @param offset 消费组offset(从哪里开始拉去)* @param maxNums 一次最大拉去消息数量* @param timeout 超时时间* @param pullCallback 异步回调函数* @param timeout * @throws MQClientException* @throws RemotingException* @throws InterruptedException*/
@Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,long timeout)throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(queueWithNamespace(mq), subExpression, offset, maxNums, pullCallback, timeout);
}
同步阻塞拉取消息
/*** 拉取消息,若没有找到消息,则阻塞一段时间** @param mq 消息队列* @param subExpression tag过滤* @param offset 消费组offset* @param maxNums 一次最大拉取数量* @return*/
@Override
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(queueWithNamespace(mq), subExpression, offset, maxNums);
}
pullBlockIfNotFound 和 pull区别是: 前者在没有找到消息的时候会阻塞一段时间以便等待后续消息进入,后者则会直接返回 NOT_FOUND 。
获取队列的消费Offset
/*** 获取队列的消费Offset* @param mq 队列* @param fromStore 是否从存储获取,true: 从当前内存中获取,false:从远程broker获取* @return 消费offset*/
@Override
public long fetchConsumeOffset(MessageQueue mq, boolean fromStore) throws MQClientException {
return this.defaultMQPullConsumerImpl.fetchConsumeOffset(queueWithNamespace(mq), fromStore);
}
更新消费组Offset
/*** 更新消费组的Offset,注意:只会在本地内存中更新,并不会同步到远程Broker,至少现在版本不会(4.4)* @param mq 消息队列* @param offset 消费进度*/
@Override
public void updateConsumeOffset(MessageQueue mq, long offset) throws MQClientException {
this.defaultMQPullConsumerImpl.updateConsumeOffset(queueWithNamespace(mq), offset);
}
org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#start
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;this.checkConfig();this.copySubscription();if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPullConsumer.changeInstanceNameToPID();}this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPullConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();} else {
switch (this.defaultMQPullConsumer.getMessageModel()) {
case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());break;default:break;}this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}mQClientFactory.start();log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PullConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}}
可以看出来pull模式没有针对是否为顺序消息进行处理,也就少了,ConsumeMessageService那就丧了自动拉取消息的能力,需要用户自己实现
DefaultMQPullConsumer的使用样例
import java.util.HashMap;
import java.util.Map;
import java.util.Set;import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;public class PullConsumer {
// 记录每个队列的消费进度private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {
// 1. 创建DefaultMQPullConsumer实例DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");// 2. 设置NameServerconsumer.setNamesrvAddr("127.0.0.1:9876");consumer.start();// 3. 获取Topic的所有队列Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");// 4. 遍历所有队列for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);SINGLE_MQ:while (true) {
try {
// 5. 拉取消息,arg1=消息队列,arg2=tag消息过滤,arg3=消息队列,arg4=一次最大拉去消息数量PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);System.out.printf("%s%n", pullResult);// 6. 将消息放入hash表中,存储该队列的消费进度putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {
case FOUND: // 找到消息,输出System.out.println(pullResult.getMsgFoundList().get(0));break;case NO_MATCHED_MSG: // 没有匹配tag的消息System.out.println("无匹配消息");break;case NO_NEW_MSG: // 该队列没有新消息,消费offset=最大offsetSystem.out.println("没有新消息");break SINGLE_MQ; // 跳出该队列遍历case OFFSET_ILLEGAL: // offset不合法System.out.println("Offset不合法");break;default:break;}} catch (Exception e) {
e.printStackTrace();}}}// 7. 关闭Consumerconsumer.shutdown();}/*** 从Hash表中获取当前队列的消费offset* @param mq 消息队列* @return long类型 offset*/private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);if (offset != null)return offset;return 0;}/*** 将消费进度更新到Hash表* @param mq 消息队列* @param offset offset*/private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);}
}
拉取消息线程模型分析
org.apache.rocketmq.client.impl.consumer.RebalanceService#run
?
单线程进行队列负载工作
@Overridepublic void run() {
log.info(this.getServiceName() + " service started");while (!this.isStopped()) {
this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}
默认间隔20s
org.apache.rocketmq.client.impl.consumer.PullMessageService#run
@Overridepublic void run() {
log.info(this.getServiceName() + " service started");while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();this.pullMessage(pullRequest);} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);}}log.info(this.getServiceName() + " service end");}
单线程从pullRequestQueue取出pullRequest执行拉取操作
每次执行pullRequest过程中需要发送消息
this.pullAPIWrapper.pullKernelImpl
然后等到broker回复,由相关消息对应的线程执行回调
如果发现新消息
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);}
把拉取到的消息存到processQueue中
开启下一轮消息拉取
consumeExecutor
根据配置的线程池线程个数,批量消息的个数,执行ConsumeRequest,业务定义的listener接口
?
?
?
?
rocketMq负载
Broker端水平扩展
Broker负载均衡
Broker是以group为单位提供服务。一个group里面分master和slave,master和slave存储的数据一样,slave从master同步数据(同步双写或异步复制看配置)。
通过nameserver暴露给客户端后,只是客户端关心(注册或发送)一个个的topic路由信息。路由信息中会细化为message queue的路由信息。而message queue会分布在不同的broker group。所以对于客户端来说,分布在不同broker group的message queue为成为一个服务集群,但客户端会把请求分摊到不同的queue。
而由于压力分摊到了不同的queue,不同的queue实际上分布在不同的Broker group,也就是说压力会分摊到不同的broker进程,这样消息的存储和转发均起到了负载均衡的作用。
Broker一旦需要横向扩展,只需要增加broker group,然后把对应的topic建上,客户端的message queue集合即会变大,这样对于broker的负载则由更多的broker group来进行分担。
并且由于每个group下面的topic的配置都是独立的,也就说可以让group1下面的那个topic的queue数量是4,其他group下的topic queue数量是2,这样group1则得到更大的负载。
commit log
虽然每个topic下面有很多message queue,但是message queue本身并不存储消息。真正的消息存储会写在CommitLog的文件,message queue只是存储CommitLog中对应的位置信息,方便通过message queue找到对应存储在CommitLog的消息。
不同的topic,message queue都是写到相同的CommitLog 文件,也就是说CommitLog完全的顺序写。
具体如下图:
Producer
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:
Consumer负载均衡
集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。
默认的分配算法是AllocateMessageQueueAveragely,如下图:
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:
也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:
当消息队列个数小于可消费客户端时,消息队列与客户端对应情况如左侧图;当消息队列个数大于可消费客户端时,消息队列与客户端对应情况如右侧图:
广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
在实现上,其中一个不同就是在consumer分配queue的时候,会所有consumer都分到所有的queue。