当前位置: 代码迷 >> 综合 >> RocketMQ Producer
  详细解决方案

RocketMQ Producer

热度:71   发布时间:2023-10-13 15:15:04.0

文章目录

  • 一 生产者属性
  • 二 默认topic作用
    • 1 Broker初始化TBW102
    • 2 生产者获取默认topic信息
  • 三 发送消息重试
  • 四 Broker接收消息分析
    • 1 Broker启动注册Processor
    • 2 SendMessageProcessor
      • 2.1 Broker创建topic
      • 2.2 消息存储
  • 五 Producer负载均衡

一 生产者属性

    /*** 生产者组名称*/private String producerGroup;/*** 默认topic名称,其它topic不存在时,会先获取默认topic的队列和broker信息。* 然后发送消息,topic不存在时,会在第一次发送消息时创建。*/private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;/*** 创建topic时默认的队列数量*/private volatile int defaultTopicQueueNums = 4;/*** 发送消息超时时间*/private int sendMsgTimeout = 3000;/*** 当消息超过4kb时,进行压缩*/private int compressMsgBodyOverHowmuch = 1024 * 4;/*** 消息发送失败重试发消息次数,同步模式*/private int retryTimesWhenSendFailed = 2;/*** 消息发送失败发消息次数,异步模式*/private int retryTimesWhenSendAsyncFailed = 2;/*** 消息最大为4M*/private int maxMessageSize = 1024 * 1024 * 4;

二 默认topic作用

private String createTopicKey = TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC;
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = “TBW102”;

1 Broker初始化TBW102

Broker启动会定时上报topic的信息到nameSrv。

        /*** 定时向nameSrv上报topic数据*/this.scheduledExecutorService.scheduleAtFixedRate(() -> {
    try {
    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {
    log.error("registerBrokerAll Exception", e);}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);/*** 上报topic信息,topic信息来自config目录的topics.json文件* * @param checkOrderConfig* @param oneway* @param forceRegister*/public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
    ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
    TopicConfig tmp =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),this.brokerConfig.getBrokerPermission());topicConfigTable.put(topicConfig.getTopicName(), tmp);}topicConfigWrapper.setTopicConfigTable(topicConfigTable);}if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills())) {
    doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}}

Topic配置文件topics.json,默认topic为TBW102

{
    "dataVersion":{
    "counter":1,"timestamp":1643214418621},"topicConfigTable":{
    ......"TBW102":{
    "order":false,"perm":7,"readQueueNums":8,"topicFilterType":"SINGLE_TAG","topicName":"TBW102","topicSysFlag":0,"writeQueueNums":8},......}
}

2 生产者获取默认topic信息

    private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 第一次返回默认topic信息, 这里不返回下面的判断逻辑进不去TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {
    ......// 真正发送消息的方法sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);......}}/*** 第一次尝试从nameSrc获取默认topic的信息并返回,保证消息能够正常发送* 默认topic TWB12的信息已经在Broker启动时上报到了nameSrv* * @param topic* @return*/private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    /*** 第一次进来topic肯定是没有创建的,这里为NULL*/TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    return topicPublishInfo;} else {
    /*** 当topic不存在时获取默认topic的信息并返回*/this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}}

三 发送消息重试

private int retryTimesWhenSendFailed = 2;

    private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //....../*** 获取重试次数*/int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];/*** 如果不是超时,则重发消息*/for (; times < timesTotal; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {
    mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {
    beginTimestampPrev = System.currentTimeMillis();if (times > 0) {
    //Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {
    callTimeout = true;break;}// 发送消息逻辑,Netty RemotingClientsendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);}// .......}}}

四 Broker接收消息分析

1 Broker启动注册Processor

public class BrokerStartup {
    // ......public static void main(String[] args) {
    /*** createBrokerController创建BrokerController*/start(createBrokerController(args));}public static BrokerController createBrokerController(String[] args) {
    // ....../*** 初始化BrokerController*/boolean initResult = controller.initialize();// ......}......
}public class BrokerController {
    /*** 初始化BrokerController时,注册Processor处理器* * @return* @throws CloneNotSupportedException*/public boolean initialize() throws CloneNotSupportedException {
    // ......this.registerProcessor();// ......}/*** 注册请求处理器,通过RequestCode处理不同的请求*/public void registerProcessor() {
    /*** SendMessageProcessor*/SendMessageProcessor sendProcessor = new SendMessageProcessor(this);sendProcessor.registerSendMessageHook(sendMessageHookList);sendProcessor.registerConsumeMessageHook(consumeMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/*** PullMessageProcessor*/this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** ReplyMessageProcessor*/ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);replyMessageProcessor.registerSendMessageHook(sendMessageHookList);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);/*** QueryMessageProcessor*/NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*/ClientManageProcessor clientProcessor = new ClientManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor*/ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** EndTransactionProcessor*/this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);/*** Default*/AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);}// ......
}

2 SendMessageProcessor

2.1 Broker创建topic

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {
    ....../*** Broker接收到消息处理逻辑*/return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);......}private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,SendMessageContext mqtraceContext,SendMessageRequestHeader requestHeader) {
    /*** 消息预处理*/final RemotingCommand response = preSend(ctx, request, requestHeader);......}private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,SendMessageRequestHeader requestHeader) {
    ....../*** 消息检查*/super.msgCheck(ctx, requestHeader, response);......}
}public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
    ......TopicConfig topicConfig =this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (null == topicConfig) {
    ....../*** 消息检查时,如果topic不存在,则创建topic,创建时会向nameSrv上报。* 上报成功后,下一次发送消息,则可以获取到topic相关的信息,而不是用默认的topic信息*/topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(requestHeader.getTopic(),requestHeader.getDefaultTopic(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.getDefaultTopicQueueNums(), topicSysFlag);if (null == topicConfig) {
    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    topicConfig =this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,topicSysFlag);}}......}......}
}

2.2 消息存储

  • 所有的消息存储在store/commitLog
  • store/consumerQueue下面是topic目录,topic目录下面是队列id标号目录,队列id编号下面是索引文件,用来记录消息commitLogOffset、msgSize、tagsCode。
  • store/index文件记录了其它的一些索引。
  • commitLog,consumerQueue可以指定大小。

RocketMQ Producer

    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // 省略部分把消息写入到了内存中....../*** 刷盘,同步刷盘或者异步刷盘。同步刷盘直接持久化到磁盘,异步刷盘则定时持久化*/CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);/*** HA机制,主从关系下,需要向从节点复制数据。常用部署通常是异步刷盘,通过主从节点同步复制来保证数据可靠性。*/CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    if (flushStatus != PutMessageStatus.PUT_OK) {
    putMessageResult.setPutMessageStatus(flushStatus);}if (replicaStatus != PutMessageStatus.PUT_OK) {
    putMessageResult.setPutMessageStatus(replicaStatus);if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
    log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",msg.getTopic(), msg.getTags(), msg.getBornHostNameString());}}return putMessageResult;});}

RocketMQ Producer

五 Producer负载均衡

// 选择可用的队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);// 第一次会选择一个队列,发送失败重试,会再次选择并且避开之前同broker的队列public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
    return selectOneMessageQueue();} else {
    for (int i = 0; i < this.messageQueueList.size(); i++) {
    int index = this.sendWhichQueue.incrementAndGet();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);// 直到选的队列和之前的不在同一个broker为止, broker宕机处理策略。宕机恢复或者主从切换,需要定时任务同步最新broker信息if (!mq.getBrokerName().equals(lastBrokerName)) {
    return mq;}}// 再次选择return selectOneMessageQueue();}}