当前位置: 代码迷 >> 综合 >> Pulsar - Producer is already present on the connection解决方案
  详细解决方案

Pulsar - Producer is already present on the connection解决方案

热度:31   发布时间:2024-01-28 19:21:13.0

一、前言

服务端使用的是2.4.1版本的Pulsar,客户端使用Pulsar Proxy发送消息到Topic中。前几个月运行稳定,最近一直偶然出现 Producer is already present on the connection 问题。在初步尝试修改producer名后,仍然会偶现这个异常。在遍访众多大神文章、github后找到了原因以及解决办法。

二、出现原因

前置条件:

  • pulsar开启了负载均衡

  • 负载均衡会在下面的情况下触发

    • List item

触发后 server端发送disconnect到client端,并处理一些列善后操作,client端接收disconnect后 会触发重连机制,一直重试连接 。server端收到请求后先执行topic.addProducer 里面会检查checkownership,此时 发现服务端中有相同producerId对应的ProducerFuture(其实是善后工作还未完成) 随即抛出异常 。但是该版本的pulsar (2.4.1)
处理逻辑是这样的:

....try {//--- 本案例 这里会抛出 RuntimeExceptiontopic.addProducer(producer);if (isActive()) {if (producerFuture.complete(producer)) {log.info("[{}] Created new producer: {}", remoteAddress, producer);ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,producer.getLastSequenceId(), producer.getSchemaVersion()));return;} else {// The producer's future was completed before by// a close commandproducer.closeNow();log.info("[{}] Cleared producer created after timeout on client side {}",remoteAddress, producer);}} else {producer.closeNow();log.info("[{}] Cleared producer created after connection was closed: {}",remoteAddress, producer);producerFuture.completeExceptionally(new IllegalStateException("Producer created after connection was closed"));}// 重点是这里 代码中只捕获处理BrokerServiceException这个异常 。 导致RuntimeException 未被处理。 最终导致 producerFuture 处于dangling} catch (BrokerServiceException ise) {log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,ise.getMessage());ctx.writeAndFlush(Commands.newError(requestId,BrokerServiceException.getClientErrorCode(ise), ise.getMessage()));producerFuture.completeExceptionally(ise);}

所以再后续的client重连producer,server端检查到有问相同的任务,才一直报错。

详细原因介绍 请看这里,原作者是基于2.5.0版本操作体验

三、解决办法

升级服务端Pulsar版本>=2.5.2 ,就是这么简单。至截稿时(2020年07月15日),官方早已推出2.6.0, 推荐这个版本。

原因: 2020年5月6号,官方已合并bug解决方案 , O(∩_∩)O哈哈~ 下面就是修改的地方。
在这里插入图片描述

  相关解决方案