1. Maven 依赖
<!-- Spring for Apache Kafka: supplies the KafkaTemplate and @KafkaListener types used by the code below. -->
<!-- NOTE(review): no <version> is declared here; presumably it is managed by a parent POM/BOM - confirm. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2.yml配置
# Kafka client settings for the producer test and the consumer listener.
spring:
  kafka:
    # Broker(s) the clients connect to.
    bootstrap-servers: 192.168.1.3:9092
    producer:
      # Keys and values are sent as plain strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Consumer group used by listeners that do not declare their own group id.
      group-id: test
      # Offsets are committed automatically by the client.
      enable-auto-commit: true
      # No unit suffix given; NOTE(review): Spring Boot is expected to read this as milliseconds - confirm.
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

# HTTP port of the embedded web server (top-level key, not part of spring.*).
server:
  port: 8081
3.代码
生产者
package com.example.bootkafka;

import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * Kafka producer test.
 *
 * <p>Boots the application context of {@code BootKafkaProducerApplication} and publishes a
 * single string message to {@code test_topic}.
 *
 * @author chenye
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {BootKafkaProducerApplication.class})
public class ProducerTest {

    /** Upper bound, in seconds, on how long the test waits for the send to complete. */
    private static final long SEND_TIMEOUT_SECONDS = 10L;

    /**
     * Template used to publish messages. Typed as String/String to match the configured
     * String serializers. Injection is required, so a missing bean fails at context start-up
     * instead of surfacing later as a NullPointerException inside the test.
     */
    @Autowired
    private KafkaTemplate<String, String> defaultKafkaTemplate;

    /**
     * Sends one message to {@code test_topic} and waits for the send to complete.
     *
     * @throws Exception if the send fails, is interrupted, or does not complete within
     *     {@link #SEND_TIMEOUT_SECONDS} seconds
     */
    @Test
    public void testDefaultKafkaTemplate() throws Exception {
        // send() is asynchronous; block on the returned future so a failed delivery
        // fails the test instead of being silently dropped.
        defaultKafkaTemplate
                .send("test_topic", "I`m send msg to default topic")
                .get(SEND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /** Prints a marker before each test method. */
    @Before
    public void testBefore() {
        System.out.println("before");
    }

    /** Prints a marker after each test method. */
    @After
    public void testAfter() {
        System.out.println("after");
    }
}
消费者
package com.example.bootkafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer test.
 *
 * <p>Spring component whose listener method is subscribed to {@code test_topic} and prints
 * each received record to standard output.
 *
 * @author chenye
 */
@Component
public class ConsumerTest {

    /**
     * Prints the topic, offset and value of a record received from {@code test_topic}.
     *
     * @param record the consumed record; key and value types are left unconstrained
     */
    @KafkaListener(topics = "test_topic")
    public void listen(ConsumerRecord<?, ?> record) {
        final String sourceTopic = record.topic();
        final long position = record.offset();
        final Object payload = record.value();

        System.out.format("topic = %s, offset = %d, value = %s \n", sourceTopic, position, payload);
    }
}