文章目录
- 一 生产者属性
- 二 默认topic作用
-
- 1 Broker初始化TBW102
- 2 生产者获取默认topic信息
- 三 发送消息重试
- 四 Broker接收消息分析
-
- 1 Broker启动注册Processor
- 2 SendMessageProcessor
-
- 2.1 Broker创建topic
- 2.2 消息存储
- 五 Producer负载均衡
一 生产者属性
/**
 * Producer group name.
 */
private String producerGroup;

/**
 * Name of the default topic. When another topic does not exist yet, the queue and broker
 * information of this default topic is fetched first and used to send the message; the
 * missing topic is then created on the first send.
 */
private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;

/**
 * Default number of queues used when a topic is created.
 */
private volatile int defaultTopicQueueNums = 4;

/**
 * Timeout for sending a message (presumably in milliseconds -- TODO confirm the unit).
 */
private int sendMsgTimeout = 3000;

/**
 * Compress the message body once it exceeds 4 KB.
 */
private int compressMsgBodyOverHowmuch = 1024 * 4;

/**
 * Number of retries after a failed send, synchronous mode.
 */
private int retryTimesWhenSendFailed = 2;

/**
 * Number of retries after a failed send, asynchronous mode.
 */
private int retryTimesWhenSendAsyncFailed = 2;

/**
 * Maximum message size: 4 MB.
 */
private int maxMessageSize = 1024 * 1024 * 4;
二 默认topic作用
private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = “TBW102”;
1 Broker初始化TBW102
Broker启动会定时上报topic的信息到nameSrv。
/**
 * Periodically reports this broker's topic data to the name server.
 * The first run happens after 10 seconds; the period is the configured
 * register period clamped to the range [10000, 60000] milliseconds.
 */
this.scheduledExecutorService.scheduleAtFixedRate(
    new Runnable() {
        @Override
        public void run() {
            try {
                BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
            } catch (Throwable e) {
                log.error("registerBrokerAll Exception", e);
            }
        }
    },
    1000 * 10,
    Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)),
    TimeUnit.MILLISECONDS);

/**
 * Reports topic information to the name server. According to the article, the topic
 * information originates from the topics.json file in the config directory.
 *
 * <p>If the broker is not both writable and readable, every topic config in the wrapper is
 * replaced by a copy that carries the broker's own permission before it is reported.
 *
 * @param checkOrderConfig forwarded unchanged to {@code doRegisterBrokerAll}
 * @param oneway forwarded unchanged to {@code doRegisterBrokerAll}
 * @param forceRegister when {@code true}, registration happens without consulting {@code needRegister}
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper wrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    // Same short-circuit order as "!writeable || !readable", expressed through De Morgan.
    boolean readableAndWriteable = PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
        && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());
    if (!readableAndWriteable) {
        ConcurrentHashMap<String, TopicConfig> downgraded = new ConcurrentHashMap<>();
        for (TopicConfig source : wrapper.getTopicConfigTable().values()) {
            TopicConfig copy = new TopicConfig(
                source.getTopicName(),
                source.getReadQueueNums(),
                source.getWriteQueueNums(),
                this.brokerConfig.getBrokerPermission());
            downgraded.put(source.getTopicName(), copy);
        }
        wrapper.setTopicConfigTable(downgraded);
    }

    boolean shouldRegister = forceRegister
        || needRegister(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills());
    if (shouldRegister) {
        doRegisterBrokerAll(checkOrderConfig, oneway, wrapper);
    }
}
Topic配置文件topics.json,默认topic为TBW102
{
"dataVersion":{
"counter":1,"timestamp":1643214418621},"topicConfigTable":{
......"TBW102":{
"order":false,"perm":7,"readQueueNums":8,"topicFilterType":"SINGLE_TAG","topicName":"TBW102","topicSysFlag":0,"writeQueueNums":8},......}
}
2 生产者获取默认topic信息
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 第一次返回默认topic信息, 这里不返回下面的判断逻辑进不去TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {
......// 真正发送消息的方法sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);......}}/*** 第一次尝试从nameSrc获取默认topic的信息并返回,保证消息能够正常发送* 默认topic TWB12的信息已经在Broker启动时上报到了nameSrv* * @param topic* @return*/private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
/*** 第一次进来topic肯定是没有创建的,这里为NULL*/TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;} else {
/*** 当topic不存在时获取默认topic的信息并返回*/this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}}
三 发送消息重试
// Number of retries after a failed send in synchronous mode.
private int retryTimesWhenSendFailed = 2;
/**
 * Excerpt of the send entry point showing the retry loop. Elided parts of the upstream
 * method (variable declarations, exception handling, result handling) are marked with
 * "......".
 */
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // ......

    // Total attempts: one send plus the configured retries in SYNC mode, a single attempt otherwise.
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    String[] brokersSent = new String[timesTotal];

    // Keep (re)sending until the attempts are used up or the call has timed out.
    for (; times < timesTotal; times++) {
        String lastBrokerName = null == mq ? null : mq.getBrokerName();
        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
        if (mqSelected != null) {
            mq = mqSelected;
            brokersSent[times] = mq.getBrokerName();
            try {
                beginTimestampPrev = System.currentTimeMillis();
                if (times > 0) {
                    // Reset topic with namespace during resend.
                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                }
                long costTime = beginTimestampPrev - beginTimestampFirst;
                if (timeout < costTime) {
                    // The time already spent exceeds the timeout: stop retrying.
                    callTimeout = true;
                    break;
                }
                // Send logic; goes through the Netty RemotingClient.
                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
            }
            // .......
        }
    }
}
四 Broker接收消息分析
1 Broker启动注册Processor
public class BrokerStartup {
// ......public static void main(String[] args) {
/*** createBrokerController创建BrokerController*/start(createBrokerController(args));}public static BrokerController createBrokerController(String[] args) {
// ....../*** 初始化BrokerController*/boolean initResult = controller.initialize();// ......}......
}public class BrokerController {
/*** 初始化BrokerController时,注册Processor处理器* * @return* @throws CloneNotSupportedException*/public boolean initialize() throws CloneNotSupportedException {
// ......this.registerProcessor();// ......}/*** 注册请求处理器,通过RequestCode处理不同的请求*/public void registerProcessor() {
/*** SendMessageProcessor*/SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/*** PullMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** ReplyMessageProcessor*/ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);replyMessageProcessor.registerSendMessageHook(sendMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);/*** 
QueryMessageProcessor*/NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*/ClientManageProcessor clientProcessor = new ClientManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor*/ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, 
this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** EndTransactionProcessor*/this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);/*** Default*/AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);}// ......
}
2 SendMessageProcessor
2.1 Broker创建topic
/**
 * Excerpt of the broker-side processor for send-message requests. The "......" markers
 * stand for code elided from the upstream source.
 */
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {
        ......
        /**
         * Handling logic for a message received by the broker.
         */
        return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
        ......
    }

    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {
        /**
         * Message pre-processing.
         */
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        ......
    }

    private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {
        ......
        /**
         * Message check (implemented in the parent class).
         */
        super.msgCheck(ctx, requestHeader, response);
        ......
    }
}

/**
 * Excerpt of the base class that holds the message check, including on-demand topic creation.
 */
public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
        ......
        TopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        if (null == topicConfig) {
            ......
            /**
             * During the message check, if the topic does not exist it is created, and the
             * creation is reported to the name server. Once that report has succeeded, the
             * next send can obtain this topic's own information instead of falling back to
             * the default topic's information.
             */
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(requestHeader.getTopic(),requestHeader.getDefaultTopic(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
            if (null == topicConfig) {
                // Creation returned nothing: topics whose name starts with the retry-group
                // prefix are created through the send-back path instead.
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    topicConfig =this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,topicSysFlag);
                }
            }
            ......
        }
        ......
    }
}
2.2 消息存储
- 所有的消息存储在store/commitLog
- store/consumerQueue下面是topic目录,topic目录下面是队列id编号目录,队列id编号目录下面是索引文件,用来记录消息的commitLogOffset、msgSize、tagsCode。
- store/index文件记录了其它的一些索引。
- commitLog,consumerQueue可以指定大小。
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
// 省略部分把消息写入到了内存中....../*** 刷盘,同步刷盘或者异步刷盘。同步刷盘直接持久化到磁盘,异步刷盘则定时持久化*/CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);/*** HA机制,主从关系下,需要向从节点复制数据。常用部署通常是异步刷盘,通过主从节点同步复制来保证数据可靠性。*/CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",msg.getTopic(), msg.getTags(), msg.getBornHostNameString());}}return putMessageResult;});}
五 Producer负载均衡
// 选择可用的队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);// 第一次会选择一个队列,发送失败重试,会再次选择并且避开之前同broker的队列public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);// 直到选的队列和之前的不在同一个broker为止, broker宕机处理策略。宕机恢复或者主从切换,需要定时任务同步最新broker信息if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;}}// 再次选择return selectOneMessageQueue();}}