第一种acks
/*** 生产者发送消息** @param key 推送数据的key* @param data 推送数据的data*/private void send(String key, String data) {// topic key名称 data消息数据ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send("cqTestTopic", key, data);// 开启消息监听//发送成功后回调SuccessCallback<SendResult<String,String>> successCallback = new SuccessCallback<SendResult<String,String>>() {@Overridepublic void onSuccess(SendResult<String,String> result) {System.out.println("发送消息成功 - "+"kay:"+key+" value:"+data);}};//发送失败回调FailureCallback failureCallback = new FailureCallback() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送消息失败-添加消息到发送失败日志中 - "+"kay:"+key+" value:"+data);}};listenableFuture.addCallback(successCallback,failureCallback);}
第二种acks
/*** acks* 消息发送的监听器,用于回调返回信息** 使用:在需要监听Producer推送消息回调的kafkaTemplate.send(*)方法下引入*/public void acks() {kafkaTemplate.setProducerListener(new org.springframework.kafka.support.ProducerListener<String, String>() {@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {System.out.println("acks onSuccess >> topic:" +producerRecord.topic() + " key:" +producerRecord.key() + " value:" +producerRecord.value() + " timestamp:" +producerRecord.timestamp());}@Overridepublic void onError(ProducerRecord<String, String> producerRecord, Exception exception) {System.out.println("acks onError >> topic:" +producerRecord.topic() + " key:" +producerRecord.key()+ " value:" +producerRecord.value());}});}