启动流程
// Create a producer instance; the constructor argument is the producer group name.
DefaultMQProducer producer = new DefaultMQProducer("mq-group");
创建生产者实例
先看DefaultMQProducer的类结构
MQAdmin
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.client;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * Base interface for MQ management: topic creation, offset lookup and
 * message queries.
 */
public interface MQAdmin {

    /**
     * Creates a topic.
     *
     * @param key access key
     * @param newTopic name of the topic
     * @param queueNum number of queues of the topic
     * @throws MQClientException declared by the contract; conditions depend on the implementation
     */
    void createTopic(String key, String newTopic, int queueNum) throws MQClientException;

    /**
     * Creates a topic with an explicit system flag.
     *
     * @param key access key
     * @param newTopic name of the topic
     * @param queueNum number of queues of the topic
     * @param topicSysFlag topic system flag
     * @throws MQClientException declared by the contract; conditions depend on the implementation
     */
    void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
        throws MQClientException;

    /**
     * Looks up the offset of a message queue for a point in time given in
     * milliseconds. Call with care: the lookup carries extra IO overhead.
     *
     * @param mq the message queue to inspect
     * @param timestamp point in time, in milliseconds
     * @return the offset
     * @throws MQClientException declared by the contract; conditions depend on the implementation
     */
    long searchOffset(MessageQueue mq, long timestamp) throws MQClientException;

    /**
     * Returns the maximum offset of a message queue.
     *
     * @param mq the message queue to inspect
     * @return the max offset
     * @throws MQClientException declared by the contract; conditions depend on the implementation
     */
    long maxOffset(MessageQueue mq) throws MQClientException;

    /**
     * Returns the minimum offset of a message queue.
     *
     * @param mq the message queue to inspect
     * @return the minimum offset
     * @throws MQClientException declared by the contract; conditions depend on the implementation
     */
    long minOffset(MessageQueue mq) throws MQClientException;

    /**
     * Returns the store time of the earliest message in a message queue.
     *
     * @param mq the message queue to inspect
     * @return the time; the original documentation states microseconds
     *         (NOTE(review): possibly milliseconds - confirm against the implementation)
     * @throws MQClientException declared by the contract; conditions depend on the implementation
     */
    long earliestMsgStoreTime(MessageQueue mq) throws MQClientException;

    /**
     * Queries a single message by its message id.
     *
     * @param offsetMsgId message id
     * @return the message
     */
    MessageExt viewMessage(String offsetMsgId)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

    /**
     * Queries messages of a topic by key within a time range.
     *
     * @param topic message topic
     * @param key message key index word
     * @param maxNum maximum number of messages
     * @param begin start of the time range
     * @param end end of the time range
     * @return the query result
     */
    QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
        throws MQClientException, InterruptedException;

    /**
     * Queries a single message of a topic by message id.
     *
     * @param topic message topic
     * @param msgId message id
     * @return the {@code MessageExt} of the given msgId
     */
    MessageExt viewMessage(String topic, String msgId)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}
可以看出来,生产者具备创建topic、查询消息、获取偏移量等功能
MQProducer
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.client.producer;import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * Producer contract: lifecycle control plus the family of {@code send}
 * overloads (with result, with callback, one-way, queue-targeted,
 * selector-based, transactional and batch).
 *
 * <p>NOTE(review): every {@code timeout} parameter is presumably in
 * milliseconds - confirm against the implementation.
 */
public interface MQProducer extends MQAdmin {

    /** Starts the producer. */
    void start() throws MQClientException;

    /** Shuts the producer down. */
    void shutdown();

    /**
     * Fetches the message queues that can be published to for a topic.
     *
     * @param topic topic name
     * @return the message queues of the topic
     */
    List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;

    /** Sends a message and returns the send result. */
    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

    /** Sends a message with a timeout and returns the send result. */
    SendResult send(final Message msg, final long timeout) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;

    /** Sends a message; the outcome is handed to {@code sendCallback}. */
    void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
        RemotingException, InterruptedException;

    /** Sends a message with a timeout; the outcome is handed to {@code sendCallback}. */
    void send(final Message msg, final SendCallback sendCallback, final long timeout)
        throws MQClientException, RemotingException, InterruptedException;

    /** One-way send: no result is returned and no callback is taken. */
    void sendOneway(final Message msg) throws MQClientException, RemotingException,
        InterruptedException;

    /** Sends a message to the given message queue and returns the send result. */
    SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;

    /** Sends a message to the given message queue with a timeout. */
    SendResult send(final Message msg, final MessageQueue mq, final long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

    /** Sends a message to the given message queue; the outcome is handed to {@code sendCallback}. */
    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
        throws MQClientException, RemotingException, InterruptedException;

    /** Sends a message to the given message queue with a callback and a timeout. */
    void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException;

    /** One-way send to the given message queue. */
    void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
        RemotingException, InterruptedException;

    /**
     * Sends a message to a queue chosen by {@code selector}.
     *
     * @param msg message to send
     * @param selector strategy that picks the target queue
     * @param arg argument made available to the selector
     */
    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

    /** Selector-based send with a timeout. */
    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

    /** Selector-based send; the outcome is handed to {@code sendCallback}. */
    void send(final Message msg, final MessageQueueSelector selector, final Object arg,
        final SendCallback sendCallback) throws MQClientException, RemotingException,
        InterruptedException;

    /** Selector-based send with a callback and a timeout. */
    void send(final Message msg, final MessageQueueSelector selector, final Object arg,
        final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
        InterruptedException;

    /** Selector-based one-way send. */
    void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
        throws MQClientException, RemotingException, InterruptedException;

    /** Sends a transactional message using an explicit local transaction executer. */
    TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;

    /** Sends a transactional message. */
    TransactionSendResult sendMessageInTransaction(final Message msg,
        final Object arg) throws MQClientException;

    // for batch
    // The four declarations below were previously fused onto the same physical
    // line as the comment above, which commented them out.

    /** Sends a batch of messages. */
    SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

    /** Sends a batch of messages with a timeout. */
    SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;

    /** Sends a batch of messages to the given message queue. */
    SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException;

    /** Sends a batch of messages to the given message queue with a timeout. */
    SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
各种发送消息的方法
发送消息,指定超时时间,设置callback,事务消息,发送集合批量消息等,还可以设置选择消息队列的发送规则。
?
ClientConfig
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.client;import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.TlsSystemConfig;
import org.apache.rocketmq.remoting.protocol.LanguageCode;/*** Client Common configuration*/
public class ClientConfig {
//快速通道,broker启动还会启动一个快速通道端口public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";//nameServer地址private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));//本地ipprivate String clientIP = RemotingUtil.getLocalAddress();//producer实例名称private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");//回消息线程数private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();/*** Pulling topic information interval from the named server*///对nameServer投票设置private int pollNameServerInterval = 1000 * 30;/*** Heartbeat interval in microseconds with message broker*///针对心跳配置private int heartbeatBrokerInterval = 1000 * 30;/*** Offset persistent interval for consumer*///持久化消费者队列,消费者配置private int persistConsumerOffsetInterval = 1000 * 5;private boolean unitMode = false;private String unitName;//是否开启vip快速通道private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));private boolean useTLS = TlsSystemConfig.tlsEnable;private LanguageCode language = LanguageCode.JAVA;public String buildMQClientId() {
StringBuilder sb = new StringBuilder();sb.append(this.getClientIP());sb.append("@");sb.append(this.getInstanceName());if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");sb.append(this.unitName);}return sb.toString();}public String getClientIP() {
return clientIP;}public void setClientIP(String clientIP) {
this.clientIP = clientIP;}public String getInstanceName() {
return instanceName;}public void setInstanceName(String instanceName) {
this.instanceName = instanceName;}public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = String.valueOf(UtilAll.getPid());}}public void resetClientConfig(final ClientConfig cc) {
this.namesrvAddr = cc.namesrvAddr;this.clientIP = cc.clientIP;this.instanceName = cc.instanceName;this.clientCallbackExecutorThreads = cc.clientCallbackExecutorThreads;this.pollNameServerInterval = cc.pollNameServerInterval;this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;this.unitMode = cc.unitMode;this.unitName = cc.unitName;this.vipChannelEnabled = cc.vipChannelEnabled;this.useTLS = cc.useTLS;this.language = cc.language;}public ClientConfig cloneClientConfig() {
ClientConfig cc = new ClientConfig();cc.namesrvAddr = namesrvAddr;cc.clientIP = clientIP;cc.instanceName = instanceName;cc.clientCallbackExecutorThreads = clientCallbackExecutorThreads;cc.pollNameServerInterval = pollNameServerInterval;cc.heartbeatBrokerInterval = heartbeatBrokerInterval;cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;cc.unitMode = unitMode;cc.unitName = unitName;cc.vipChannelEnabled = vipChannelEnabled;cc.useTLS = useTLS;cc.language = language;return cc;}public String getNamesrvAddr() {
return namesrvAddr;}public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;}public int getClientCallbackExecutorThreads() {
return clientCallbackExecutorThreads;}public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;}public int getPollNameServerInterval() {
return pollNameServerInterval;}public void setPollNameServerInterval(int pollNameServerInterval) {
this.pollNameServerInterval = pollNameServerInterval;}public int getHeartbeatBrokerInterval() {
return heartbeatBrokerInterval;}public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
this.heartbeatBrokerInterval = heartbeatBrokerInterval;}public int getPersistConsumerOffsetInterval() {
return persistConsumerOffsetInterval;}public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;}public String getUnitName() {
return unitName;}public void setUnitName(String unitName) {
this.unitName = unitName;}public boolean isUnitMode() {
return unitMode;}public void setUnitMode(boolean unitMode) {
this.unitMode = unitMode;}public boolean isVipChannelEnabled() {
return vipChannelEnabled;}public void setVipChannelEnabled(final boolean vipChannelEnabled) {
this.vipChannelEnabled = vipChannelEnabled;}public boolean isUseTLS() {
return useTLS;}public void setUseTLS(boolean useTLS) {
this.useTLS = useTLS;}public LanguageCode getLanguage() {
return language;}public void setLanguage(LanguageCode language) {
this.language = language;}@Overridepublic String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval+ ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="+ persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + "]";}
}
接下来看实现
DefaultMQProducer
构造,传入生产者组名称
/**
 * Creates a producer for the given producer group.
 *
 * @param producerGroup name of the producer group
 */
public DefaultMQProducer(final String producerGroup) {
    // Delegates to the two-argument constructor, passing null as the RPC hook.
    this(producerGroup, null);
}
/**
 * Creates a producer for the given producer group and builds the backing
 * {@code DefaultMQProducerImpl}.
 *
 * @param producerGroup name of the producer group
 * @param rpcHook hook handed on to the implementation
 */
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
    this.producerGroup = producerGroup;
    // The implementation receives this instance together with the hook.
    this.defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}
DefaultMQProducerImpl
只实现了一个接口
MQProducerInner
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.client.impl.producer;import java.util.Set;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;

/**
 * Internal producer-side contract: topic publish information, transaction
 * state checks and unit mode.
 */
public interface MQProducerInner {

    /** Returns the topics this producer publishes to. */
    Set<String> getPublishTopicList();

    /**
     * Tells whether the publish information of a topic needs to be updated.
     *
     * @param topic topic name
     */
    boolean isPublishTopicNeedUpdate(String topic);

    /** Returns the {@code TransactionCheckListener}. */
    TransactionCheckListener checkListener();

    /** Returns the {@code TransactionListener}. */
    TransactionListener getCheckListener();

    /**
     * Checks the transaction state of a message.
     *
     * @param addr address associated with the check request
     * @param msg the message whose transaction state is checked
     * @param checkRequestHeader header of the check request
     */
    void checkTransactionState(
        String addr,
        MessageExt msg,
        CheckTransactionStateRequestHeader checkRequestHeader);

    /**
     * Updates the publish information of a topic.
     *
     * @param topic topic name
     * @param info new publish information
     */
    void updateTopicPublishInfo(String topic, TopicPublishInfo info);

    /** Tells whether unit mode is on. */
    boolean isUnitMode();
}
关于topic的接口信息
?
回来继续看DefaultMQProducerImpl属性
private final InternalLogger log = ClientLogger.getLog();
private final Random random = new Random();
// The DefaultMQProducer instance this implementation belongs to.
private final DefaultMQProducer defaultMQProducer;
// Publish information maintained per topic (map key: topic name).
private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =
    new ConcurrentHashMap<String, TopicPublishInfo>();
// SendMessageHook list (the original comment in front of this field was left empty).
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final RPCHook rpcHook;
protected BlockingQueue<Runnable> checkRequestQueue;
protected ExecutorService checkExecutor;
// Initial state; start(boolean) switches on this value.
private ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
// Compression level, read from the system property MixAll.MESSAGE_COMPRESS_LEVEL (default "5").
private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
// Work queue and executor for async sending; both are assigned in the constructor.
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
private ExecutorService asyncSenderExecutor;
顺着程序执行的顺序看
先看构造函数
?
/**
 * Stores the owning producer and the RPC hook, then prepares the default
 * executor used for asynchronous sending.
 *
 * @param defaultMQProducer the producer this implementation belongs to
 * @param rpcHook RPC hook to keep
 */
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
    this.defaultMQProducer = defaultMQProducer;
    this.rpcHook = rpcHook;

    // Work queue of the async sender pool, bounded at 50000 entries.
    this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);

    // Names the pool threads AsyncSenderExecutor_1, AsyncSenderExecutor_2, ...
    final ThreadFactory senderThreadFactory = new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            final int sequence = this.threadIndex.incrementAndGet();
            return new Thread(r, "AsyncSenderExecutor_" + sequence);
        }
    };

    // Core size and maximum size both equal the available processor count.
    this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.asyncSenderThreadPoolQueue,
        senderThreadFactory);
}
/**
 * Hook with one callback before a request and one after its response.
 */
public interface RPCHook {

    /**
     * Callback for the point before a request.
     *
     * @param remoteAddr remote address of the request
     * @param request the request command
     */
    void doBeforeRequest(String remoteAddr, RemotingCommand request);

    /**
     * Callback for the point after a response.
     *
     * @param remoteAddr remote address of the request
     * @param request the request command
     * @param response the response command
     */
    void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response);
}
RPCHook是一个接口,可以通过内部类(匿名类)传入实现;其中doBeforeRequest在发送请求前执行,doAfterResponse在收到响应后执行。
// Queue handed to the async sender thread pool, bounded at 50000 entries.
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
异步发送使用的任务队列是一个容量为50000的LinkedBlockingQueue,它会作为下面线程池的工作队列
构建线程池
// Names the pool threads AsyncSenderExecutor_1, AsyncSenderExecutor_2, ...
final ThreadFactory senderThreadFactory = new ThreadFactory() {
    private AtomicInteger threadIndex = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        final int sequence = this.threadIndex.incrementAndGet();
        return new Thread(r, "AsyncSenderExecutor_" + sequence);
    }
};

// Core size and maximum size both equal the available processor count;
// keep-alive is 1000 * 60 milliseconds.
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(),
    Runtime.getRuntime().availableProcessors(),
    1000 * 60,
    TimeUnit.MILLISECONDS,
    this.asyncSenderThreadPoolQueue,
    senderThreadFactory);
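核心线程数和最大线程数都取机器可用的处理器数(Runtime.getRuntime().availableProcessors());keepAliveTime设置为60秒,但由于核心线程数等于最大线程数,而核心线程默认不会因空闲被回收(除非开启allowCoreThreadTimeOut),所以这个参数在默认情况下并不会生效;最后传入自定义的线程工厂,用来给线程命名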
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
看start方法
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;this.checkConfig();//设置实例名称为PIDif (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();}this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());if (startFactory) {
mQClientFactory.start();}log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The producer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();}
启动后进入CREATE_JUST分支
org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
/**
 * Returns the client instance registered under the client id built from
 * {@code clientConfig}, creating and registering one when none exists yet.
 *
 * @param clientConfig configuration used to build the client id; a clone of it is
 *        passed to a newly created instance
 * @param rpcHook hook passed to a newly created instance
 * @return the instance stored in the factory table for the client id
 */
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    final String clientId = clientConfig.buildMQClientId();

    final MQClientInstance existing = this.factoryTable.get(clientId);
    if (existing != null) {
        return existing;
    }

    final MQClientInstance created = new MQClientInstance(
        clientConfig.cloneClientConfig(),
        this.factoryIndexGenerator.getAndIncrement(),
        clientId,
        rpcHook);

    // putIfAbsent returns a non-null value when an entry for the id was stored in the meantime.
    final MQClientInstance previous = this.factoryTable.putIfAbsent(clientId, created);
    if (previous == null) {
        log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        return created;
    }

    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    return previous;
}
MQClientInstance
创建MQClientInstance
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;this.instanceIndex = instanceIndex;this.nettyClientConfig = new NettyClientConfig();this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());this.clientRemotingProcessor = new ClientRemotingProcessor(this);this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());}this.clientId = clientId;this.mQAdminImpl = new MQAdminImpl(this);this.pullMessageService = new PullMessageService(this);this.rebalanceService = new RebalanceService(this);this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);this.defaultMQProducer.resetClientConfig(clientConfig);this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",this.instanceIndex,this.clientId,this.clientConfig,MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());}
NettyClientConfig
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.remoting.netty;

/**
 * Settings of the Netty-based remoting client. A plain mutable bean: every
 * field has a default and is exposed through a getter/setter pair.
 */
public class NettyClientConfig {

    /** Worker thread number. */
    private int clientWorkerThreads = 4;
    /** Callback executor thread count; defaults to the available processor count. */
    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
    /** Default taken from {@code NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE}. */
    private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
    /** Default taken from {@code NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE}. */
    private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
    /** Connect timeout in milliseconds. */
    private int connectTimeoutMillis = 3000;
    private long channelNotActiveInterval = 1000 * 60;

    /**
     * IdleStateEvent will be triggered when neither read nor write was performed for
     * the specified period of this time. Specify {@code 0} to disable.
     */
    private int clientChannelMaxIdleTimeSeconds = 120;

    /** Default taken from {@code NettySystemConfig.socketSndbufSize}. */
    private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    /** Default taken from {@code NettySystemConfig.socketRcvbufSize}. */
    private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean clientPooledByteBufAllocatorEnable = false;
    private boolean clientCloseSocketIfTimeout = false;

    private boolean useTLS;

    // ---- accessors, grouped in field order ----

    public int getClientWorkerThreads() {
        return clientWorkerThreads;
    }

    public void setClientWorkerThreads(int clientWorkerThreads) {
        this.clientWorkerThreads = clientWorkerThreads;
    }

    public int getClientCallbackExecutorThreads() {
        return clientCallbackExecutorThreads;
    }

    public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
        this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
    }

    public int getClientOnewaySemaphoreValue() {
        return clientOnewaySemaphoreValue;
    }

    public void setClientOnewaySemaphoreValue(int clientOnewaySemaphoreValue) {
        this.clientOnewaySemaphoreValue = clientOnewaySemaphoreValue;
    }

    public int getClientAsyncSemaphoreValue() {
        return clientAsyncSemaphoreValue;
    }

    public void setClientAsyncSemaphoreValue(int clientAsyncSemaphoreValue) {
        this.clientAsyncSemaphoreValue = clientAsyncSemaphoreValue;
    }

    public int getConnectTimeoutMillis() {
        return connectTimeoutMillis;
    }

    public void setConnectTimeoutMillis(int connectTimeoutMillis) {
        this.connectTimeoutMillis = connectTimeoutMillis;
    }

    public long getChannelNotActiveInterval() {
        return channelNotActiveInterval;
    }

    public void setChannelNotActiveInterval(long channelNotActiveInterval) {
        this.channelNotActiveInterval = channelNotActiveInterval;
    }

    public int getClientChannelMaxIdleTimeSeconds() {
        return clientChannelMaxIdleTimeSeconds;
    }

    public void setClientChannelMaxIdleTimeSeconds(int clientChannelMaxIdleTimeSeconds) {
        this.clientChannelMaxIdleTimeSeconds = clientChannelMaxIdleTimeSeconds;
    }

    public int getClientSocketSndBufSize() {
        return clientSocketSndBufSize;
    }

    public void setClientSocketSndBufSize(int clientSocketSndBufSize) {
        this.clientSocketSndBufSize = clientSocketSndBufSize;
    }

    public int getClientSocketRcvBufSize() {
        return clientSocketRcvBufSize;
    }

    public void setClientSocketRcvBufSize(int clientSocketRcvBufSize) {
        this.clientSocketRcvBufSize = clientSocketRcvBufSize;
    }

    public boolean isClientPooledByteBufAllocatorEnable() {
        return clientPooledByteBufAllocatorEnable;
    }

    public void setClientPooledByteBufAllocatorEnable(boolean clientPooledByteBufAllocatorEnable) {
        this.clientPooledByteBufAllocatorEnable = clientPooledByteBufAllocatorEnable;
    }

    public boolean isClientCloseSocketIfTimeout() {
        return clientCloseSocketIfTimeout;
    }

    public void setClientCloseSocketIfTimeout(final boolean clientCloseSocketIfTimeout) {
        this.clientCloseSocketIfTimeout = clientCloseSocketIfTimeout;
    }

    public boolean isUseTLS() {
        return useTLS;
    }

    public void setUseTLS(boolean useTLS) {
        this.useTLS = useTLS;
    }
}
关于netty的一些配置
NettyRequestProcessor
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.remoting.netty;import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

/**
 * Common remoting command processor.
 */
public interface NettyRequestProcessor {

    /**
     * Processes a request command.
     *
     * @param ctx channel handler context the request arrived on
     * @param request the request command
     * @return the resulting command
     * @throws Exception if processing fails
     */
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws Exception;

    /**
     * Tells whether requests are rejected.
     *
     * @return {@code true} to reject
     */
    boolean rejectRequest();
}
请求处理或者拒绝接口
?
ClientRemotingProcessor
客户端请求处理实现
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.client.impl;import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody;
import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;

/**
 * Client-side {@link NettyRequestProcessor}. {@link #processRequest} dispatches on the request
 * code to one of the handler methods in this class; unknown codes yield {@code null}.
 */
public class ClientRemotingProcessor implements NettyRequestProcessor {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mqClientFactory;

    public ClientRemotingProcessor(final MQClientInstance mqClientFactory) {
        this.mqClientFactory = mqClientFactory;
    }

    /**
     * Routes the request to the handler matching its code.
     *
     * @return the handler's response, or {@code null} when the handler produces none or the
     *         code is not one of the six handled here
     */
    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.CHECK_TRANSACTION_STATE:
                return this.checkTransactionState(ctx, request);
            case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
                return this.notifyConsumerIdsChanged(ctx, request);
            case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
                return this.resetOffset(ctx, request);
            case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
                return this.getConsumeStatus(ctx, request);
            case RequestCode.GET_CONSUMER_RUNNING_INFO:
                return this.getConsumerRunningInfo(ctx, request);
            case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                return this.consumeMessageDirectly(ctx, request);
            default:
                break;
        }
        return null;
    }

    /** This processor never rejects a request. */
    @Override
    public boolean rejectRequest() {
        return false;
    }

    /**
     * Decodes the message carried in the request body, looks up the producer registered for the
     * message's producer group and asks it to check the transaction state. Always returns
     * {@code null}; failures to decode or to find a producer are only logged.
     */
    public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final CheckTransactionStateRequestHeader requestHeader =
            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
        final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
        final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
        if (messageExt != null) {
            // Use the unique client message id, when present, as the transaction id.
            String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                messageExt.setTransactionId(transactionId);
            }
            final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
            if (group != null) {
                MQProducerInner producer = this.mqClientFactory.selectProducer(group);
                if (producer != null) {
                    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    producer.checkTransactionState(addr, messageExt, requestHeader);
                } else {
                    log.debug("checkTransactionState, pick producer by group[{}] failed", group);
                }
            } else {
                log.warn("checkTransactionState, pick producer group failed");
            }
        } else {
            log.warn("checkTransactionState, decode message failed");
        }

        return null;
    }

    /**
     * Logs the notification and triggers an immediate rebalance. Any exception is logged and
     * swallowed; the method always returns {@code null}.
     */
    public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        try {
            final NotifyConsumerIdsChangedRequestHeader requestHeader =
                (NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
            log.info("receive broker's notification[{}], the consumer group: {} changed, rebalance immediately",
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                requestHeader.getConsumerGroup());
            this.mqClientFactory.rebalanceImmediately();
        } catch (Exception e) {
            // The message needs a placeholder; without it the description argument is never rendered.
            log.error("notifyConsumerIdsChanged exception: {}", RemotingHelper.exceptionSimpleDesc(e));
        }
        return null;
    }

    /**
     * Passes the offset table decoded from the request body (or an empty table when the body is
     * absent) to {@code mqClientFactory.resetOffset}. Always returns {@code null}.
     */
    public RemotingCommand resetOffset(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final ResetOffsetRequestHeader requestHeader =
            (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
        log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}",
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
            requestHeader.getTimestamp());
        Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
        if (request.getBody() != null) {
            ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class);
            offsetTable = body.getOffsetTable();
        }
        this.mqClientFactory.resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), offsetTable);
        return null;
    }

    /**
     * Returns a SUCCESS response whose body is the encoded consumer status table for the
     * requested topic and group.
     */
    @Deprecated
    public RemotingCommand getConsumeStatus(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetConsumerStatusRequestHeader requestHeader =
            (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);

        Map<MessageQueue, Long> offsetTable = this.mqClientFactory.getConsumerStatus(requestHeader.getTopic(), requestHeader.getGroup());
        GetConsumerStatusBody body = new GetConsumerStatusBody();
        body.setMessageQueueTable(offsetTable);
        response.setBody(body.encode());
        response.setCode(ResponseCode.SUCCESS);
        return response;
    }

    /**
     * Returns the running info of the requested consumer group, optionally with a thread dump
     * attached, or a SYSTEM_ERROR response when the group is unknown to this client.
     */
    private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetConsumerRunningInfoRequestHeader requestHeader =
            (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);

        ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
        if (null != consumerRunningInfo) {
            if (requestHeader.isJstackEnable()) {
                Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
                String jstack = UtilAll.jstack(map);
                consumerRunningInfo.setJstack(jstack);
            }

            response.setCode(ResponseCode.SUCCESS);
            response.setBody(consumerRunningInfo.encode());
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));
        }

        return response;
    }

    /**
     * Decodes the message in the request body, hands it to
     * {@code mqClientFactory.consumeMessageDirectly} and returns the encoded result, or a
     * SYSTEM_ERROR response when that call returns {@code null}.
     */
    private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final ConsumeMessageDirectlyResultRequestHeader requestHeader =
            (ConsumeMessageDirectlyResultRequestHeader) request.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);

        final MessageExt msg = MessageDecoder.decode(ByteBuffer.wrap(request.getBody()));

        ConsumeMessageDirectlyResult result =
            this.mqClientFactory.consumeMessageDirectly(msg, requestHeader.getConsumerGroup(), requestHeader.getBrokerName());

        if (null != result) {
            response.setCode(ResponseCode.SUCCESS);
            response.setBody(result.encode());
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));
        }

        return response;
    }
}
具体功能后面看
MQClientAPIImpl
创建MQClientAPIImpl
ublic MQClientAPIImpl(final NettyClientConfig nettyClientConfig,final ClientRemotingProcessor clientRemotingProcessor,RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);this.clientRemotingProcessor = clientRemotingProcessor;this.remotingClient.registerRPCHook(rpcHook);this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);}
为上述各个请求码注册的处理器都是同一个clientRemotingProcessor
MQAdminImpl
this.mQAdminImpl = new MQAdminImpl(this);
和其名称相同, 该类实现了对mq的管理,管理topic,消息队列操作,消息查询
PullMessageService
ServiceThread
实现了Runnable接口,同时内置Thread线程对象,执行run方法,同时还具有暂停(等待)线程的能力
?
/**
 * Service loop: until the service is stopped, takes the next {@code PullRequest} from
 * {@code pullRequestQueue} (blocking) and passes it to {@code pullMessage}.
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
            // Deliberately ignored: the loop condition re-checks isStopped().
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
如果服务没有停止(isStopped()为false),就不断从队列中取出拉取请求并执行拉取。
?
RebalanceService
同样,也是在消费者才有用到的
/**
 * Service loop: until the service is stopped, waits via {@code waitForRunning(waitInterval)}
 * and then calls {@code mqClientFactory.doRebalance()}.
 */
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);this.defaultMQProducer.resetClientConfig(clientConfig);this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",this.instanceIndex,this.clientId,this.clientConfig,MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
又创建了一个defaultMQProducer,回头看发送消息流程看这个到底有什么用。
org.apache.rocketmq.client.impl.factory.MQClientInstance#sendHeartbeatToAllBrokerWithLock
/**
 * Sends heartbeats to all brokers and uploads filter class sources, but only if the heartbeat
 * lock can be acquired without waiting; otherwise logs a warning and returns. Exceptions from
 * the two calls are logged, not propagated, and the lock is always released.
 */
public void sendHeartbeatToAllBrokerWithLock() {
    final boolean acquired = this.lockHeartbeat.tryLock();
    if (!acquired) {
        log.warn("lock heartBeat, but failed.");
        return;
    }

    try {
        this.sendHeartbeatToAllBroker();
        this.uploadFilterClassSource();
    } catch (final Exception e) {
        log.error("sendHeartbeatToAllBroker exception", e);
    } finally {
        this.lockHeartbeat.unlock();
    }
}
IO/线程模型
接收消息
NettyRemotingClient
NettyClientHandler
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Overrideprotected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);}}
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:processRequestCommand(ctx, cmd);`break;case RESPONSE_COMMAND:processResponseCommand(ctx, cmd);break;default:break;}}}
REQUEST_COMMAND代表对端发来的请求,RESPONSE_COMMAND代表本端之前所发请求的回复
先看REQUEST_COMMAND
进入org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
/**
 * Handles an incoming request command.
 *
 * <p>Looks up the processor/executor pair registered for the request code (falling back to
 * {@code defaultRequestProcessor}), wraps the actual processing in a task and submits it to the
 * pair's executor. If no pair is available, replies with REQUEST_CODE_NOT_SUPPORTED. If the
 * processor rejects requests, or the executor rejects the task, replies with SYSTEM_BUSY.
 *
 * @param ctx channel context the request arrived on and the reply is written to
 * @param cmd the request command
 */
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    // Replies carry the same opaque value as the request.
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

                    // One-way requests never get a reply; a null response is silently skipped.
                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                log.error("process request over, but response failed", e);
                                log.error(cmd.toString());
                                log.error(response.toString());
                            }
                        }
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            // Only logs when the current millisecond timestamp is a multiple of 10000.
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}
processorTable注册processor
/**
 * Registers {@code processor} for {@code requestCode} in {@code processorTable}.
 *
 * @param requestCode request code the processor handles
 * @param processor   processor to invoke for that code
 * @param executor    executor to run the processor on; when {@code null},
 *                    {@code publicExecutor} is used instead
 */
@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
    ExecutorService executorThis = executor;
    if (null == executor) {
        executorThis = this.publicExecutor;
    }

    Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, ExecutorService>(processor, executorThis);
    this.processorTable.put(requestCode, pair);
}
publicExecutor是一个线程数根据CPU核心数确定的固定大小线程池
Pair
/**
 * Mutable holder for two values of independent types.
 *
 * @param <T1> type of the first value
 * @param <T2> type of the second value
 */
public class Pair<T1, T2> {
    private T1 object1;
    private T2 object2;

    /**
     * @param object1 initial first value
     * @param object2 initial second value
     */
    public Pair(T1 object1, T2 object2) {
        this.object1 = object1;
        this.object2 = object2;
    }

    /** @return the first value */
    public T1 getObject1() {
        return this.object1;
    }

    /** @param object1 new first value */
    public void setObject1(T1 object1) {
        this.object1 = object1;
    }

    /** @return the second value */
    public T2 getObject2() {
        return this.object2;
    }

    /** @param object2 new second value */
    public void setObject2(T2 object2) {
        this.object2 = object2;
    }
}
Pair是一个二元组,存的分别是processor,和对应执行的线程池,如果指定processor想要通过特定线程池执行,也可以传入自定义线程池代替publicExecutor
最后根据code,pair放入 processorTable
创建一个Runnable任务,使用pair中的线程池执行,相当于worker线程至此就完成任务了,把任务交给了业务线程池
org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest
/**
 * Routes the request to the handler matching its code and returns that handler's result.
 * Codes other than the six listed here produce {@code null}.
 */
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final int code = request.getCode();
    if (code == RequestCode.CHECK_TRANSACTION_STATE) {
        return this.checkTransactionState(ctx, request);
    }
    if (code == RequestCode.NOTIFY_CONSUMER_IDS_CHANGED) {
        return this.notifyConsumerIdsChanged(ctx, request);
    }
    if (code == RequestCode.RESET_CONSUMER_CLIENT_OFFSET) {
        return this.resetOffset(ctx, request);
    }
    if (code == RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT) {
        return this.getConsumeStatus(ctx, request);
    }
    if (code == RequestCode.GET_CONSUMER_RUNNING_INFO) {
        return this.getConsumerRunningInfo(ctx, request);
    }
    if (code == RequestCode.CONSUME_MESSAGE_DIRECTLY) {
        return this.consumeMessageDirectly(ctx, request);
    }
    return null;
}
处理完毕之后返回结果,如果不是oneWay,需要给一个回复
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);response.markResponseType();try {
ctx.writeAndFlush(response);} catch (Throwable e) {
log.error("process request over, but response failed", e);log.error(cmd.toString());log.error(response.toString());}} else {
}}
异步发送消息线程
异步线程池构建
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors(),1000 * 60,TimeUnit.MILLISECONDS,this.asyncSenderThreadPoolQueue,new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());}});
异步发送
/**
 * Asynchronous send: submits the actual send to the async sender executor and returns.
 *
 * <p>The time spent waiting in the executor queue counts against {@code timeout}; if the budget
 * is already used up when the task runs, {@code sendCallback.onException} is invoked with a
 * {@code RemotingTooMuchRequestException} instead of sending. Exceptions thrown by the send
 * itself are also reported through {@code sendCallback.onException}.
 *
 * @throws MQClientException when the executor rejects the task
 */
@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, InterruptedException {
    final long beginStartTime = System.currentTimeMillis();
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        // Only the remaining part of the timeout is handed to the real send.
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                    } catch (Exception e) {
                        sendCallback.onException(e);
                    }
                } else {
                    sendCallback.onException(
                        new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                }
            }
        });
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }
}
直接交给线程池就返回了
发送流程
Message
创建消息
private String topic;private int flag;private Map<String, String> properties;private byte[] body;private String transactionId;
body是消息体,
properties存储了额外信息,比如延时消息等级,消息key
同步消息发送
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, long)
/**
 * Synchronous send with an explicit timeout; delegates to {@code defaultMQProducerImpl}.
 *
 * @param msg     message to send
 * @param timeout timeout passed through to the implementation
 * @return the result returned by the implementation
 */
@Override
public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg, timeout);
}
最终进入org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo
/**
 * Looks up the publish info for {@code topic}, refreshing it from the name server when the
 * cached entry is missing or not ok. If the entry still has neither route info nor an ok
 * state, refreshes once more using the default-topic variant and returns whatever the table
 * then holds for the topic.
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    TopicPublishInfo cached = this.topicPublishInfoTable.get(topic);

    final boolean usable = cached != null && cached.ok();
    if (!usable) {
        // Make sure an entry exists before asking the name server for this topic's route.
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        cached = this.topicPublishInfoTable.get(topic);
    }

    if (cached.isHaveTopicRouterInfo() || cached.ok()) {
        return cached;
    }

    // Second attempt: isDefault = true, passing this producer.
    this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    return this.topicPublishInfoTable.get(topic);
}
来到这里发现topicPublishInfoTable中已经有一个TBW102这个topic
在start时放进去的,后面看这个topic有什么作用
如果topicPublishInfoTable中没有此topic,新建放入,并且进入
org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
?
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}}} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);}if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);boolean changed = topicRouteDataIsChange(old, topicRouteData);if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);}if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);}} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);}} finally {
this.lockNamesrv.unlock();}} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);}} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);}return false;}
org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer(java.lang.String, long, boolean)
/**
 * Synchronously requests the route data of {@code topic} (the address argument passed to
 * {@code invokeSync} is {@code null}).
 *
 * @param topic              topic to look up
 * @param timeoutMillis      timeout for the synchronous call
 * @param allowTopicNotExist when {@code true}, a TOPIC_NOT_EXIST reply is logged as a warning
 *                           (except for the auto-create key topic); the exception is thrown
 *                           either way
 * @return the decoded route data when the reply is SUCCESS with a non-null body
 * @throws MQClientException for every other outcome, carrying the reply's code and remark
 */
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    assert response != null;

    final int replyCode = response.getCode();
    if (replyCode == ResponseCode.TOPIC_NOT_EXIST) {
        if (allowTopicNotExist && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
            log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
        }
    } else if (replyCode == ResponseCode.SUCCESS) {
        byte[] payload = response.getBody();
        if (payload != null) {
            return TopicRouteData.decode(payload, TopicRouteData.class);
        }
        // SUCCESS without a body is treated like any other failure below.
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}
nio如何实现消息的同步发送
构建请求消息。同步发送
org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync
/**
 * Sends {@code request} to {@code addr} and blocks for the response.
 *
 * <p>Runs the before-hooks, deducts the time already spent from {@code timeoutMillis}, performs
 * the call via {@code invokeSyncImpl}, then runs the after-hooks.
 *
 * @throws RemotingConnectException     when no active channel is available for {@code addr}
 * @throws RemotingSendRequestException rethrown after closing the channel
 * @throws RemotingTimeoutException     when the budget is exhausted before sending, or rethrown
 *                                      from the call (the channel is closed first only if the
 *                                      client config asks for it)
 */
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
    throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
    long beginStartTime = System.currentTimeMillis();
    final Channel channel = this.getAndCreateChannel(addr);

    if (channel == null || !channel.isActive()) {
        this.closeChannel(addr, channel);
        throw new RemotingConnectException(addr);
    }

    try {
        doBeforeRpcHooks(addr, request);

        final long elapsed = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < elapsed) {
            throw new RemotingTimeoutException("invokeSync call timeout");
        }

        final long remaining = timeoutMillis - elapsed;
        RemotingCommand response = this.invokeSyncImpl(channel, request, remaining);
        doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
        return response;
    } catch (RemotingSendRequestException e) {
        log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
        this.closeChannel(addr, channel);
        throw e;
    } catch (RemotingTimeoutException e) {
        if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
            this.closeChannel(addr, channel);
            log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
        }
        log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
        throw e;
    }
}
判断超时时间,执行消息前置钩子,和消息后置钩子,钩子就是构建生产者时传入的
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl
/**
 * Writes {@code request} to {@code channel} and blocks until the matching response arrives or
 * {@code timeoutMillis} elapses.
 *
 * <p>A {@code ResponseFuture} keyed by the request's opaque value is placed in
 * {@code responseTable} before the write and is always removed again in the {@code finally}
 * block.
 *
 * @return the response command
 * @throws RemotingTimeoutException     when no response arrived although the write was
 *                                      reported as successful
 * @throws RemotingSendRequestException when no response arrived and the write failed
 */
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                // Write failed: drop the pending entry and release the waiter with a null response.
                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });

        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
private static AtomicInteger requestId = new AtomicInteger(0);
先看ResponseFuture
// 请求标识相当于idprivate final int opaque; //netty channel连接private final Channel processChannel;//超时时间private final long timeoutMillis;
// 回调接口private final InvokeCallback invokeCallback;private final long beginTimestamp = System.currentTimeMillis();private final CountDownLatch countDownLatch = new CountDownLatch(1);private final SemaphoreReleaseOnlyOnce once;private final AtomicBoolean executeCallbackOnlyOnce = new AtomicBoolean(false);private volatile RemotingCommand responseCommand;private volatile boolean sendRequestOK = true;private volatile Throwable cause;
opaque生成规则
private static AtomicInteger requestId = new AtomicInteger(0);
private int opaque = requestId.getAndIncrement();
原子自增,保证每条请求的opaque都不同
放入responseTable中
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Overridepublic void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);return;} else {
responseFuture.setSendRequestOK(false);}responseTable.remove(opaque);responseFuture.setCause(f.cause());responseFuture.putResponse(null);log.warn("send a request command to channel <" + addr + "> failed.");}});
利用netty发送消息并添加监听器,如果发送失败,则从responseTable中移除该请求,并设置错误原因
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
/**
 * Blocks on {@code countDownLatch} for at most {@code timeoutMillis} milliseconds and then
 * returns the current value of {@code responseCommand}. The result of {@code await} is not
 * inspected, so the return value is whatever the field holds once the wait ends.
 *
 * @param timeoutMillis maximum time to wait, in milliseconds
 * @return the value of {@code responseCommand} after the wait
 * @throws InterruptedException propagated from {@code countDownLatch.await}
 */
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);return this.responseCommand;}
当收到消息回复
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand
/**
 * Matches a response to its pending request by opaque value.
 *
 * <p>When a pending {@code ResponseFuture} exists it is removed from {@code responseTable};
 * if it has an invoke callback the callback is executed, otherwise the response is handed to
 * the future via {@code putResponse} and the future is released. Unmatched responses are only
 * logged.
 */
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    final ResponseFuture pending = responseTable.get(opaque);

    if (pending == null) {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
        return;
    }

    pending.setResponseCommand(cmd);
    responseTable.remove(opaque);

    if (pending.getInvokeCallback() == null) {
        pending.putResponse(cmd);
        pending.release();
    } else {
        executeInvokeCallback(pending);
    }
}
如果responseFuture设置了callback(异步调用),则执行callback;否则(同步调用)执行putResponse并release
/**
 * Stores {@code responseCommand} in the field of the same name and counts down
 * {@code countDownLatch}.
 *
 * @param responseCommand the command to store
 */
public void putResponse(final RemotingCommand responseCommand) {
this.responseCommand = responseCommand;this.countDownLatch.countDown();}
执行putResponse,其中的countDown操作会唤醒在countDownLatch.await处阻塞的线程(即waitResponse的调用方)
?
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}return responseCommand;
设置结果并返回
?
TopicRouteData
private String orderTopicConf;private List<QueueData> queueDatas;private List<BrokerData> brokerDatas;private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
queueDatas为从nameServer上读到的队列信息
brokerDatas为broker信息,包含broker的IP地址
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// Update Pub info{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();MQProducerInner impl = entry.getValue();if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);}}}// Update sub info{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();MQConsumerInner impl = entry.getValue();if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);}}}log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}
将broker及其地址放入brokerAddrTable
topicRouteData2TopicPublishInfo
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);info.getMessageQueueList().add(mq);}
**Update Pub info** 是生产者需要执行的发布操作
将可写的队列放入TopicPublishInfo中
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateTopicPublishInfo
将topic和publishInfo保存起来
@Overridepublic void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
if (info != null && topic != null) {
TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);if (prev != null) {
log.info("updateTopicPublishInfo prev is not null, " + prev.toString());}}
Update sub info是消费者执行,暂时不看
?
回到发送消息的方法
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {
·······
循环发送,如果未能发送成功,则再重试;默认重试2次(不算第一次发送,即最多发送3次)。只有同步消息会在这里重试,其他模式的timesTotal为1
?
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
同步消息默认的队列选择方式
/**
 * Picks the next queue in rotation: takes the current value of {@code sendWhichQueue}
 * (incrementing it), reduces its absolute value modulo the queue count, and returns the queue
 * at that position. A negative position is clamped to 0.
 */
public MessageQueue selectOneMessageQueue() {
    final int next = this.sendWhichQueue.getAndIncrement();
    final int slot = Math.abs(next) % this.messageQueueList.size();
    // Math.abs can still yield a negative value, so clamp before indexing.
    return this.messageQueueList.get(Math.max(slot, 0));
}
通过自增计数器取模的方式轮询选择队列并发送
sendKernelImpl
private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {
//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);}int sysFlag = 0;boolean msgBodyCompressed = false;if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;}final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}if (this.hasSendMessageHook()) {
context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {
case ASYNC:Message tmpMessage = msg;if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);msg.setBody(prevBody);}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {
msg.setBody(prevBody);}}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}
设置请求头:生产者组、topic、默认 topic(TBW102)、消息队列 id、是否为批量消息等信息,最终
case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:
发送出去
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage
/**
 * Builds the remoting request for a message and dispatches it in the requested
 * communication mode.
 *
 * <p>A V2 header is used when {@code sendSmartMsg} is set or the message is a
 * {@link MessageBatch}; batches additionally use the batch request code.
 * ONEWAY and ASYNC return {@code null}; SYNC returns the result of
 * {@code sendMessageSync}.
 *
 * @param timeoutMillis remaining budget in milliseconds; time spent here is subtracted
 * @return the send result for SYNC, otherwise {@code null}
 * @throws RemotingException    on remoting failure or when the budget is already used up
 * @throws MQBrokerException    when the broker reports an error
 * @throws InterruptedException when the calling thread is interrupted
 */
public SendResult sendMessage(
    final String addr,
    final String brokerName,
    final Message msg,
    final SendMessageRequestHeader requestHeader,
    final long timeoutMillis,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
    final long startedAt = System.currentTimeMillis();

    final RemotingCommand command;
    if (sendSmartMsg || msg instanceof MessageBatch) {
        final SendMessageRequestHeaderV2 compactHeader =
            SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
        final int code = msg instanceof MessageBatch
            ? RequestCode.SEND_BATCH_MESSAGE
            : RequestCode.SEND_MESSAGE_V2;
        command = RemotingCommand.createRequestCommand(code, compactHeader);
    } else {
        command = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
    }
    command.setBody(msg.getBody());

    switch (communicationMode) {
        case ONEWAY: {
            this.remotingClient.invokeOneway(addr, command, timeoutMillis);
            return null;
        }
        case ASYNC: {
            final AtomicInteger attempts = new AtomicInteger();
            final long spent = System.currentTimeMillis() - startedAt;
            if (timeoutMillis < spent) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - spent, command, sendCallback,
                topicPublishInfo, instance, retryTimesWhenSendFailed, attempts, context, producer);
            return null;
        }
        case SYNC: {
            final long spent = System.currentTimeMillis() - startedAt;
            if (timeoutMillis < spent) {
                throw new RemotingTooMuchRequestException("sendMessage call timeout");
            }
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - spent, command);
        }
        default:
            assert false;
            break;
    }

    return null;
}
进入SYNC中的org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageSync
/**
 * Sends the request with {@code invokeSync} and converts the broker's
 * response into a {@link SendResult} via {@code processSendResponse}.
 *
 * @param addr          broker address
 * @param brokerName    broker name passed on to {@code processSendResponse}
 * @param msg           the message that was sent
 * @param timeoutMillis timeout handed to {@code invokeSync}
 * @param request       fully built remoting request
 * @return the processed send result
 */
private SendResult sendMessageSync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
    final RemotingCommand reply = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert reply != null;
    return this.processSendResponse(brokerName, msg, reply);
}
invokeSync跟之前的从nameServer上拉取broker信息的过程相同,不再研究
processSendResponse处理返回数据
至此,同步消息发送完毕
同步消息指定队列
org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.MessageQueueSelector, java.lang.Object)
?
MessageQueueSelector
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.client.producer;import java.util.List;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

/**
 * Callback that chooses the queue a message is sent to.
 */
public interface MessageQueueSelector {
    /**
     * Selects one queue out of the candidates.
     *
     * @param mqs candidate queues to choose from
     * @param msg the message about to be sent
     * @param arg caller-supplied argument; presumably the object passed to
     *            {@code send(msg, selector, arg)} — confirm against the caller
     * @return the queue to send the message to
     */
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
producer.send(msg,(messageQueues,message,arg)-> {
return messageQueues.get(0);},null);
可以这样发送消息,指定队列进行发送
?
事务消息(一定是同步消息)
TransactionMQProducer
/** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements. See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License. You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.rocketmq.client.producer;import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;public class TransactionMQProducer extends DefaultMQProducer {
private TransactionCheckListener transactionCheckListener;private int checkThreadPoolMinSize = 1;private int checkThreadPoolMaxSize = 1;private int checkRequestHoldMax = 2000;private ExecutorService executorService;private TransactionListener transactionListener;public TransactionMQProducer() {
}public TransactionMQProducer(final String producerGroup) {
super(producerGroup);}public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) {
super(producerGroup, rpcHook);}@Overridepublic void start() throws MQClientException {
this.defaultMQProducerImpl.initTransactionEnv();super.start();}@Overridepublic void shutdown() {
super.shutdown();this.defaultMQProducerImpl.destroyTransactionEnv();}/*** This method will be removed in the version 5.0.0, method <code>sendMessageInTransaction(Message,Object)</code>}* is recommended.*/@Override@Deprecatedpublic TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
if (null == this.transactionCheckListener) {
throw new MQClientException("localTransactionBranchCheckListener is null", null);}return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);}@Overridepublic TransactionSendResult sendMessageInTransaction(final Message msg,final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);}return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);}public TransactionCheckListener getTransactionCheckListener() {
return transactionCheckListener;}/*** This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.*/@Deprecatedpublic void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;}public int getCheckThreadPoolMinSize() {
return checkThreadPoolMinSize;}/*** This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.*/@Deprecatedpublic void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
this.checkThreadPoolMinSize = checkThreadPoolMinSize;}public int getCheckThreadPoolMaxSize() {
return checkThreadPoolMaxSize;}/*** This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.*/@Deprecatedpublic void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;}public int getCheckRequestHoldMax() {
return checkRequestHoldMax;}/*** This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.*/@Deprecatedpublic void setCheckRequestHoldMax(int checkRequestHoldMax) {
this.checkRequestHoldMax = checkRequestHoldMax;}public ExecutorService getExecutorService() {
return executorService;}public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;}public TransactionListener getTransactionListener() {
return transactionListener;}public void setTransactionListener(TransactionListener transactionListener) {
this.transactionListener = transactionListener;}
}
在父类DefaultMQProducer的基础上增加了事务消息的相关操作
demo
/**
 * Demo: sends five transactional messages and then keeps the process alive so
 * the broker can check back on transactions left in the unknown state.
 *
 * <p>The producer is shut down in {@code finally}, so it is released even when
 * start-up fails or the waiting thread is interrupted.
 */
public static void main(String[] args) {
    TransactionMQProducer producer = new TransactionMQProducer("transactionMQProducer");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.setTransactionListener(new SelfTransactionListener());
    try {
        producer.start();
        for (int i = 1; i < 6; i++) {
            Message message = new Message("TransactionTopic", "transactionTest", "msg-" + i,
                ("Hello" + ":" + i).getBytes());
            try {
                SendResult result = producer.sendMessageInTransaction(message, "Hello" + ":" + i);
                System.out.printf("Topic:%s send success, msgId is:%s%n", message.getTopic(), result.getMsgId());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // Keep the JVM alive so broker check-back requests can still be answered.
        Thread.sleep(Integer.MAX_VALUE);
    } catch (MQClientException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        producer.shutdown();
    }
}
需要设置transactionListener
TransactionListener
看一个示例
package com.mq.study.mqpro.test;import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;public class SelfTransactionListener implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);private AtomicInteger checkTimes = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();/*** 执行本地事务** @param message* @param o* @return*/@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {
String msgKey = message.getKeys();System.out.println("start execute local transaction " + msgKey);LocalTransactionState state;if (msgKey.contains("1")) {
// 第一条消息让他通过state = LocalTransactionState.COMMIT_MESSAGE;} else if (msgKey.contains("2")) {
// 第二条消息模拟异常,明确回复回滚操作state = LocalTransactionState.ROLLBACK_MESSAGE;} else {
// 第三条消息无响应,让它调用回查事务方法state = LocalTransactionState.UNKNOW;// 给剩下3条消息,放1,2,3三种状态localTrans.put(msgKey, transactionIndex.incrementAndGet());}System.out.println("executeLocalTransaction:" + message.getKeys() + ",execute state:" + state + ",current time:" + System.currentTimeMillis());return state;}/*** 回查本地事务结果** @param messageExt* @return*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
String msgKey = messageExt.getKeys();System.out.println("start check local transaction " + msgKey);Integer state = localTrans.get(msgKey);switch (state) {
case 1:System.out.println("check result unknown 回查次数" + checkTimes.incrementAndGet());return LocalTransactionState.UNKNOW;case 2:System.out.println("check result commit message, 回查次数" + checkTimes.incrementAndGet());return LocalTransactionState.COMMIT_MESSAGE;case 3:System.out.println("check result rollback message, 回查次数" + checkTimes.incrementAndGet());return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.COMMIT_MESSAGE;}}
}
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
/**
 * Sends a half (prepared) message, runs the local transaction and reports the
 * outcome to the broker.
 *
 * <p>The message is tagged as transaction-prepared and with the producer group
 * before it is sent. When the send status is SEND_OK the local transaction is
 * executed through the legacy executer if one is given, otherwise through the
 * transaction listener. The flush/slave failure statuses map to rollback.
 * A failure in {@code endTransaction} is only logged.
 *
 * @param msg                      message to send
 * @param localTransactionExecuter legacy executer, may be {@code null}
 * @param arg                      argument handed to the local transaction
 * @return result carrying the send status and the local transaction state
 * @throws MQClientException when neither executer nor listener is available,
 *                           or when sending the half message fails
 */
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    final TransactionListener listener = getCheckListener();
    if (localTransactionExecuter == null && listener == null) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);

    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

    final SendResult halfResult;
    try {
        halfResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState outcome = LocalTransactionState.UNKNOW;
    Throwable failure = null;
    switch (halfResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (halfResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", halfResult.getTransactionId());
                }
                final String uniqueId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (uniqueId != null && !"".equals(uniqueId)) {
                    msg.setTransactionId(uniqueId);
                }

                if (localTransactionExecuter != null) {
                    outcome = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (listener != null) {
                    log.debug("Used new transaction API");
                    outcome = listener.executeLocalTransaction(msg, arg);
                }
                if (outcome == null) {
                    outcome = LocalTransactionState.UNKNOW;
                }

                if (outcome != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", outcome);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                failure = e;
            }
            break;
        }
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            outcome = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        this.endTransaction(halfResult, outcome, failure);
    } catch (Exception e) {
        log.warn("local transaction execute " + outcome + ", but end broker transaction failed", e);
    }

    final TransactionSendResult result = new TransactionSendResult();
    result.setSendStatus(halfResult.getSendStatus());
    result.setMessageQueue(halfResult.getMessageQueue());
    result.setMsgId(halfResult.getMsgId());
    result.setQueueOffset(halfResult.getQueueOffset());
    result.setTransactionId(halfResult.getTransactionId());
    result.setLocalTransactionState(outcome);
    return result;
}
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
设置属性事务消息,设置生产者组
同步发送完毕之后进入switch
如果发送成功,执行本地事务
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);}if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) {
log.debug("Used new transaction API");localTransactionState = transactionListener.executeLocalTransaction(msg, arg);}if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
执行本地事务
如果返回 COMMIT_MESSAGE,则提交事务消息
如果返回 ROLLBACK_MESSAGE 则回滚掉已经发出去的半消息
如果返回 UNKNOW,则暂时没有操作,broker 则会反查 producer,判断回滚还是提交
?
?
org.apache.rocketmq.client.producer.TransactionListener#checkLocalTransaction
是broker来反查生产者端事务是否已经提交的接口
?
?
事务消息发送完毕,并执行完毕本地事务后
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction
/**
 * Tells the broker how to finish a transactional message.
 *
 * <p>The commit-log offset is decoded from the offset message id when present,
 * otherwise from the message id. The local transaction state is translated
 * into the matching commit / rollback / not-type flag and the request is sent
 * one-way to the broker that owns the message queue.
 *
 * @param sendResult            result of sending the half message
 * @param localTransactionState outcome of the local transaction
 * @param localException        exception raised by the local transaction, or {@code null}
 */
public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final String encodedId = sendResult.getOffsetMsgId() != null
        ? sendResult.getOffsetMsgId()
        : sendResult.getMsgId();
    final MessageId decodedId = MessageDecoder.decodeMessageId(encodedId);

    final String transactionId = sendResult.getTransactionId();
    final String brokerAddr =
        this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());

    final EndTransactionRequestHeader header = new EndTransactionRequestHeader();
    header.setTransactionId(transactionId);
    header.setCommitLogOffset(decodedId.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            header.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            header.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            header.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    header.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    header.setTranStateTableOffset(sendResult.getQueueOffset());
    header.setMsgId(sendResult.getMsgId());

    String remark = null;
    if (localException != null) {
        remark = "executeLocalTransactionBranch exception: " + localException.toString();
    }
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, header, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
根据事务执行的不同结果向broker发送不同的消息,注意,这里采用的是oneWay发送,如果没发出去,也不用担心,因为broker会回查。
?
回查逻辑
org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest
/**
 * Dispatches a broker-initiated request to the matching handler by request code.
 *
 * @param ctx     channel context the request arrived on
 * @param request the incoming command
 * @return the handler's response, or {@code null} for an unrecognised code
 * @throws RemotingCommandException when a handler fails to decode the request
 */
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.CHECK_TRANSACTION_STATE:
            return this.checkTransactionState(ctx, request);
        case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
            return this.notifyConsumerIdsChanged(ctx, request);
        case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
            return this.resetOffset(ctx, request);
        case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
            return this.getConsumeStatus(ctx, request);
        case RequestCode.GET_CONSUMER_RUNNING_INFO:
            return this.getConsumerRunningInfo(ctx, request);
        case RequestCode.CONSUME_MESSAGE_DIRECTLY:
            return this.consumeMessageDirectly(ctx, request);
        default:
            break;
    }
    return null;
}
最终进入org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState
/**
 * Handles a broker check-back for a transactional message.
 *
 * <p>The work is wrapped in a {@link Runnable} and submitted to
 * {@code checkExecutor}. The task asks the legacy
 * {@code TransactionCheckListener} if present, otherwise the
 * {@code TransactionListener}, for the local transaction state and reports it
 * back to the broker with a one-way end-transaction request.
 *
 * @param addr   address of the broker that asked
 * @param msg    the message whose transaction state is being checked
 * @param header request header carrying the offsets and transaction id
 */
public void checkTransactionState(final String addr, final MessageExt msg,
    final CheckTransactionStateRequestHeader header) {
    Runnable request = new Runnable() {
        private final String brokerAddr = addr;
        private final MessageExt message = msg;
        private final CheckTransactionStateRequestHeader checkRequestHeader = header;
        private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

        @Override
        public void run() {
            TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
            TransactionListener transactionListener = getCheckListener();
            if (transactionCheckListener != null || transactionListener != null) {
                LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                Throwable exception = null;
                try {
                    if (transactionCheckListener != null) {
                        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
                    } else if (transactionListener != null) {
                        log.debug("Used new check API in transaction message");
                        localTransactionState = transactionListener.checkLocalTransaction(message);
                    } else {
                        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
                    }
                } catch (Throwable e) {
                    // A failing user callback leaves the state as UNKNOW; the error is
                    // forwarded to the broker as a remark.
                    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                    exception = e;
                }

                this.processTransactionState(
                    localTransactionState,
                    group,
                    exception);
            } else {
                log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
            }
        }

        /**
         * Builds the end-transaction header for the checked state and sends it one-way.
         */
        private void processTransactionState(
            final LocalTransactionState localTransactionState,
            final String producerGroup,
            final Throwable exception) {
            final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
            thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
            thisHeader.setProducerGroup(producerGroup);
            thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
            thisHeader.setFromTransactionCheck(true);

            // Prefer the client-side unique id; fall back to the message id.
            String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (uniqueKey == null) {
                uniqueKey = message.getMsgId();
            }
            thisHeader.setMsgId(uniqueKey);
            thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
            switch (localTransactionState) {
                case COMMIT_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                    break;
                case ROLLBACK_MESSAGE:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                    log.warn("when broker check, client rollback this transaction, {}", thisHeader);
                    break;
                case UNKNOW:
                    thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                    log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
                    break;
                default:
                    break;
            }

            String remark = null;
            if (exception != null) {
                remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
            }

            try {
                DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                    3000);
            } catch (Exception e) {
                log.error("endTransactionOneway exception", e);
            }
        }
    };

    this.checkExecutor.submit(request);
}
进而执行用户自定义的checkLocalTransaction方法
异步消息
demo
package com.mq.study.mqpro.test;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageType;import java.util.ArrayList;
import java.util.List;import static java.lang.Thread.sleep;//rocketMq默认同步发送就是局部有序的 ,加上一selector就是全局有序
public class MQProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("mq-group");
// producer.setNamesrvAddr("123.207.63.192:9876");producer.setNamesrvAddr("127.0.0.1:9876");producer.setInstanceName("producer");producer.start();try {
for (int i = 0; i < 1; i++) {
// Thread.sleep(1000); //MQ每隔一秒发送一条消息Message msg = new Message("TopicA-test24",// topic"TagA",// tag("RocketMQ message"+i).getBytes()// body);msg.setKeys("i"+1);producer.send(msg,new SendCallback() {
//异步消息,发送消息交给线程池去做@Overridepublic void onSuccess(SendResult sendResult) {
System.out.printf("%s%n",sendResult);}@Overridepublic void onException(Throwable throwable) {
throwable.printStackTrace();}},10000000);//发送消息
// producer.sendOneway(msg);
// producer.send(msg,(messageQueues,message,arg)-> {
// return messageQueues.get(0);
// },null
// );
// String topic = "BatchTest";
// List<Message> messages = new ArrayList<>();
// messages.add(new Message(topic, "TagA", "Order1", "Hello world 0".getBytes()));
// messages.add(new Message(topic, "TagA", "Order2", "Hello world 1".getBytes()));
// messages.add(new Message(topic, "TagA", "Order3", "Hello world 2".getBytes()));
// producer.send(messages,10000000);}} catch (Exception e) {
e.printStackTrace();}Thread.sleep(3000); //延迟主线程的执行时间producer.shutdown();//关闭消息生产者}
}
注意
Thread.sleep(3000);
?
//延迟主线程的执行时间,否则会导致异步消息尚未发送完毕,主线程就已经关闭了生产者
producer.shutdown();
?
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.SendCallback, long)
/**
 * Sends a message asynchronously.
 *
 * <p>The send is submitted to the async sender executor. Time spent waiting in
 * the executor is subtracted from the timeout; if the budget is already used
 * up when the task runs, the callback receives a
 * {@link RemotingTooMuchRequestException} instead. Any exception thrown by the
 * send itself is also delivered through the callback.
 *
 * @param msg          message to send
 * @param sendCallback callback notified of the outcome
 * @param timeout      total budget in milliseconds
 * @throws MQClientException when the executor rejects the task
 */
@Deprecated
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, InterruptedException {
    final long beginStartTime = System.currentTimeMillis();
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                    } catch (Exception e) {
                        sendCallback.onException(e);
                    }
                } else {
                    sendCallback.onException(
                        new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                }
            }
        });
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }
}
异步发送最终进入
switch (communicationMode) {
case ASYNC:Message tmpMessage = msg;if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);msg.setBody(prevBody);}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;
case ASYNC:final AtomicInteger times = new AtomicInteger();long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");}this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, context, producer);return null;
org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessageAsync
最终进入org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeAsyncImpl
/**
 * Writes a request to the channel without waiting for the response.
 *
 * <p>A permit is acquired from {@code semaphoreAsync} (waiting up to
 * {@code timeoutMillis}). On success a {@link ResponseFuture} holding the
 * callback is registered in {@code responseTable} under the request's opaque
 * id, and the request is written and flushed. The write listener marks the
 * future as sent on success or calls {@code requestFail} otherwise.
 *
 * @param channel        channel to write to
 * @param request        request to send
 * @param timeoutMillis  budget in milliseconds, also used for acquiring the permit
 * @param invokeCallback callback stored in the response future
 * @throws RemotingTooMuchRequestException when no permit is available and the timeout is not positive
 * @throws RemotingTimeoutException        when the permit could not be acquired in time, or the
 *                                         budget was used up while acquiring it
 * @throws RemotingSendRequestException    when writing to the channel throws
 */
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
    final InvokeCallback invokeCallback)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    long beginStartTime = System.currentTimeMillis();
    final int opaque = request.getOpaque();
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            once.release();
            throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
        }

        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                    timeoutMillis,
                    this.semaphoreAsync.getQueueLength(),
                    this.semaphoreAsync.availablePermits()
                );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
与同步发送相似,不同的是这里不需要阻塞
回调执行时机
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived
/**
 * Dispatches an inbound command by its type: request commands go to
 * {@code processRequestCommand}, response commands to {@code processResponseCommand}.
 * A {@code null} message and any other type are ignored.
 *
 * @param ctx channel handler context the command arrived on
 * @param msg inbound command, may be {@code null}
 * @throws Exception declared by the signature; nothing is thrown explicitly here
 */
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    if (msg == null) {
        return;
    }

    switch (msg.getType()) {
        case REQUEST_COMMAND:
            processRequestCommand(ctx, msg);
            break;
        case RESPONSE_COMMAND:
            processResponseCommand(ctx, msg);
            break;
        default:
            break;
    }
}
进入org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processResponseCommand
/**
 * Matches a response command to the pending {@link ResponseFuture} registered under the same
 * opaque id, removes that entry from {@code responseTable}, and completes it: through
 * {@code executeInvokeCallback} when a callback is attached, otherwise by storing the response
 * on the future and releasing it. Responses with no matching entry are only logged.
 *
 * @param ctx channel handler context the response arrived on
 * @param cmd response command
 */
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int requestId = cmd.getOpaque();
    final ResponseFuture pending = responseTable.get(requestId);

    if (pending == null) {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
        return;
    }

    pending.setResponseCommand(cmd);
    responseTable.remove(requestId);

    if (pending.getInvokeCallback() == null) {
        // No callback attached: store the response on the future, then release it.
        pending.putResponse(cmd);
        pending.release();
        return;
    }

    executeInvokeCallback(pending);
}
如果有回调,那么进入 executeInvokeCallback 执行回调函数;如果没有回调,则把响应写入 responseFuture 并调用 release()。
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#executeInvokeCallback
/**
 * Runs the callback attached to {@code responseFuture}, preferring the executor returned by
 * {@code getCallbackExecutor()}. If there is no executor, or submitting the task throws, the
 * callback runs on the calling thread instead. On both paths {@code responseFuture.release()}
 * is called in a {@code finally} block, so it happens even when the callback throws.
 *
 * @param responseFuture pending response whose callback should be executed
 */
private void executeInvokeCallback(final ResponseFuture responseFuture) {
    boolean runInThisThread = false;
    ExecutorService executor = this.getCallbackExecutor();
    if (executor != null) {
        try {
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        responseFuture.executeInvokeCallback();
                    } catch (Throwable e) {
                        log.warn("execute callback in executor exception, and callback throw", e);
                    } finally {
                        responseFuture.release();
                    }
                }
            });
        } catch (Exception e) {
            // Submission itself failed: fall back to the current thread.
            runInThisThread = true;
            log.warn("execute callback in executor exception, maybe executor busy", e);
        }
    } else {
        runInThisThread = true;
    }

    if (runInThisThread) {
        try {
            responseFuture.executeInvokeCallback();
        } catch (Throwable e) {
            log.warn("executeInvokeCallback Exception", e);
        } finally {
            responseFuture.release();
        }
    }
}
同样是优先丢给回调线程池执行,不一直占用 worker 线程;如果没有回调线程池,或者提交任务失败,才会在当前线程中直接执行回调。
?
oneWay消息
单向发送
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
只发送请求,不等待服务器应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImpl
/**
 * Sends {@code request} as a one-way RPC: the command is marked one-way and written to the
 * channel, and no pending-response entry is registered. A permit is taken from
 * {@code semaphoreOneway} before the write and released as soon as the write completes
 * (successfully or not), or immediately if the write call itself throws.
 *
 * @param channel       channel the request is written to
 * @param request       command to send; marked as one-way by this method
 * @param timeoutMillis maximum time in milliseconds to wait for a permit
 * @throws InterruptedException            declared for the timed {@code tryAcquire} call
 * @throws RemotingTooMuchRequestException if no permit was obtained and {@code timeoutMillis <= 0}
 * @throws RemotingTimeoutException        if no permit was obtained within a positive timeout
 * @throws RemotingSendRequestException    if writing to the channel throws
 */
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
    throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    request.markOnewayRPC();
    boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
        try {
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    // The write has finished either way, so the permit is returned first.
                    once.release();
                    if (!f.isSuccess()) {
                        log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                    }
                }
            });
        } catch (Exception e) {
            once.release();
            log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
        } else {
            // NOTE(review): the label below says "semaphoreAsyncValue" although the value comes from
            // semaphoreOneway; the message text is left untouched here.
            String info = String.format(
                "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                timeoutMillis,
                this.semaphoreOneway.getQueueLength(),
                this.semaphoreOneway.availablePermits()
            );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
sendOneWay非常简单,就是直接发送出去。
注意这里有一个
SemaphoreReleaseOnlyOnce
异步消息和 oneWay 都有这个,它配合信号量(semaphoreAsync / semaphoreOneway)限制同时在途的请求数
对于异步消息,发送前先获取一个信号量许可(在途请求数+1),发送出错或者回调执行完成后释放许可(在途请求数-1)
// Excerpt of executeInvokeCallback: only the part that hands the callback to the callback
// executor is shown; the snippet stops before the method is closed.
// NOTE(review): "@Overridepublic" below looks like "@Override public" with the whitespace lost
// when the code was pasted - confirm against the original source.
private void executeInvokeCallback(final ResponseFuture responseFuture) {
boolean runInThisThread = false;ExecutorService executor = this.getCallbackExecutor();if (executor != null) {
try {
executor.submit(new Runnable() {
@Overridepublic void run() {
try {
responseFuture.executeInvokeCallback();} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);} finally {
// release() sits in the finally block, so it runs whether or not the callback threw.
// NOTE(review): presumably this is where the async permit is returned - confirm in ResponseFuture.
responseFuture.release();}}});} catch (Exception e) {
// Submitting to the executor failed; the flag records that the callback was not handed off.
runInThisThread = true;log.warn("execute callback in executor exception, maybe executor busy", e);}} else {
// No callback executor is available.
runInThisThread = true;}
对于 oneWay,发送前获取一个信号量许可(在途请求数+1),写出完成后(无论成功与否)立即释放许可(在途请求数-1)
// Excerpt of invokeOnewayImpl: take a permit before the write, return it once the write
// completes (or immediately if the write call throws).
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
    final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
    try {
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                // Returned as soon as the write finishes, whether it succeeded or not.
                once.release();
                if (!f.isSuccess()) {
                    log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
                }
            }
        });
    } catch (Exception e) {
        once.release();
        log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
        throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
    }
}