当前位置: 代码迷 >> 综合 >> kafka - Producer acks应答
  详细解决方案

kafka - Producer acks应答

热度:2   发布时间:2024-01-28 20:41:25.0

 

 第一种acks

/*** 生产者发送消息** @param key  推送数据的key* @param data 推送数据的data*/private void send(String key, String data) {// topic key名称 data消息数据ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send("cqTestTopic", key, data);// 开启消息监听//发送成功后回调SuccessCallback<SendResult<String,String>> successCallback = new SuccessCallback<SendResult<String,String>>() {@Overridepublic void onSuccess(SendResult<String,String> result) {System.out.println("发送消息成功 - "+"kay:"+key+"  value:"+data);}};//发送失败回调FailureCallback failureCallback = new FailureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送消息失败-添加消息到发送失败日志中 - "+"kay:"+key+"  value:"+data);}};listenableFuture.addCallback(successCallback,failureCallback);}

第二种acks 

    /*** acks* 消息发送的监听器,用于回调返回信息** 使用:在需要监听Producer推送消息回调的kafkaTemplate.send(*)方法下引入*/public void acks() {kafkaTemplate.setProducerListener(new org.springframework.kafka.support.ProducerListener<String, String>() {@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {System.out.println("acks onSuccess >> topic:" +producerRecord.topic() + "  key:" +producerRecord.key() + "  value:" +producerRecord.value() + "   timestamp:" +producerRecord.timestamp());}@Overridepublic void onError(ProducerRecord<String, String> producerRecord, Exception exception) {System.out.println("acks onError >> topic:" +producerRecord.topic() + "  key:" +producerRecord.key()+ "  value:" +producerRecord.value());}});}