Kafka Study Notes (4): Sending and Receiving Messages with a spring-kafka Producer and Consumer

Published: 2023-10-17 18:16:00.0

1. Maven dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. YAML configuration

spring:
  kafka:
    bootstrap-servers: 192.168.1.3:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
server:
  port: 8081
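To make the configuration above easier to read, the following sketch lists the plain Kafka client property keys that these Spring Boot settings correspond to. It uses only the JDK, so it does not start a producer or consumer; the class name KafkaPropsSketch and its two methods are hypothetical and exist only for illustration. The values are copied from the YAML above, and a bare auto-commit-interval of 1000 is assumed to mean milliseconds.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: mirrors the YAML settings as raw Kafka client keys.
public class KafkaPropsSketch {

    // Same broker address as spring.kafka.bootstrap-servers.
    static final String BOOTSTRAP_SERVERS = "192.168.1.3:9092";

    // Keys corresponding to the spring.kafka.producer.* block.
    public static Map<String, String> producerProps() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Keys corresponding to the spring.kafka.consumer.* block.
    public static Map<String, String> consumerProps() {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        // Assumption: the bare number 1000 in the YAML is read as milliseconds.
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("producer:");
        producerProps().forEach((k, v) -> System.out.println("  " + k + " = " + v));
        System.out.println("consumer:");
        consumerProps().forEach((k, v) -> System.out.println("  " + k + " = " + v));
    }
}
```

With enable.auto.commit set to true, the consumer commits offsets in the background at the configured interval, so a message may be marked as consumed before the listener has finished processing it.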

3. Code

Producer

package com.example.bootkafka;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Kafka producer test.
 *
 * @author chenye
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {BootKafkaProducerApplication.class})
public class ProducerTest {

    // Typed to match the String serializers configured in the YAML.
    @Autowired(required = false)
    private KafkaTemplate<String, String> defaultKafkaTemplate;

    @Test
    public void testDefaultKafkaTemplate() {
        // send() is asynchronous; flush() pushes the record out before the test ends.
        defaultKafkaTemplate.send("test_topic", "I`m send msg to default topic");
        defaultKafkaTemplate.flush();
    }

    @Before
    public void testBefore() {
        System.out.println("before");
    }

    @After
    public void testAfter() {
        System.out.println("after");
    }
}

 

Consumer

package com.example.bootkafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer test.
 *
 * @author chenye
 */
@Component
public class ConsumerTest {

    // Invoked for every record on test_topic; the consumer group comes from the YAML (group-id: test).
    @KafkaListener(topics = "test_topic")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.printf("topic = %s, offset = %d, value = %s \n",
                record.topic(), record.offset(), record.value());
    }
}
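
The listener above prints each record directly, which makes its output hard to check without a running broker. One possible variation, shown here as a JDK-only sketch, moves the formatting into a small helper that a listener could call with record.topic(), record.offset() and record.value(). The class RecordFormatSketch and the method formatRecord are hypothetical names, not part of spring-kafka, and the helper drops the trailing space and newline of the original format string.

```java
// Hypothetical helper: builds the same log line as the listener's printf call,
// minus the trailing space and newline.
public class RecordFormatSketch {

    public static String formatRecord(String topic, long offset, Object value) {
        return String.format("topic = %s, offset = %d, value = %s", topic, offset, value);
    }

    public static void main(String[] args) {
        // Sample values only; a real listener would pass the fields of the received record.
        System.out.println(formatRecord("test_topic", 42L, "hello"));
        // prints "topic = test_topic, offset = 42, value = hello"
    }
}
```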