当前位置: 代码迷 >> 综合 >> SpringCloud-Stream
  详细解决方案

SpringCloud-Stream

热度:59   发布时间:2023-09-05 18:32:22.0

1.什么是消息驱动

SpringCloud Stream消息驱动可以简化开发人员对消息中间件的使用复杂度,让系统开发人员更多尽力专注与核心业务逻辑的开发。SpringCloud Stream基于SpringBoot实现,自动配置化的功能可以帮助我们快速上手学习,类似与我们之前学习的orm框架,可以平滑的切换多种不同的数据库。
目前SpringCloud Stream 目前只支持 rabbitMQ和kafka

2.消息驱动原理

绑定器
通过定义绑定器作为中间层,实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通过,是的应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。
SpringCloud-Stream

在该模型图上有如下几个核心概念:
? Source: 当需要发送消息时,我们就需要通过Source,Source将会把我们所要发送的消息(POJO对象)进行序列化(默认转换成JSON格式字符串),然后将这些数据发送到Channel中;
? Sink: 当我们需要监听消息时就需要通过Sink来,Sink负责从消息通道中获取消息,并将消息反序列化成消息对象(POJO对象),然后交给具体的消息监听处理进行业务处理;
? Channel: 消息通道是Stream的抽象之一。通常我们向消息中间件发送消息或者监听消息时需要指定主题(Topic)/消息队列名称,但这样一旦我们需要变更主题名称的时候需要修改消息发送或者消息监听的代码,但是通过Channel抽象,我们的业务代码只需要对Channel就可以了,具体这个Channel对应的是那个主题,就可以在配置文件中来指定,这样当主题变更的时候我们就不用对代码做任何修改,从而实现了与具体消息中间件的解耦;
? Binder: Stream中另外一个抽象层。通过不同的Binder可以实现与不同消息中间件的整合,比如上面的示例我们所使用的就是针对Kafka的Binder,通过Binder提供统一的消息收发接口,从而使得我们可以根据实际需要部署不同的消息中间件,或者根据实际生产中所部署的消息中间件来调整我们的配置。

3.SpringCloud-Stream创建生产者

依赖

	<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.1.RELEASE</version></parent><dependencies><!-- SpringBoot整合Web组件 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- springcloud stream 就是对springboot 整合 rabbit mq 在做了一层封装 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId><version>2.0.1.RELEASE</version></dependency></dependencies></project>

配置文件

server:port: 9000
spring:application:name: spingcloud-stream-producer
#####默认是本地配置 
# rabbitmq:
# host: 127.0.0.1
# port: 5672
# username: guest
# password: guest

创建发送通道的接口

// 创建 发送消息通道
public interface SendMessageInterface {// 1.创建发送消息通道,@Output("my_stream_channel")SubscribableChannel sendMsg();
}

controller

@RestController
public class SendMsgController {@Autowiredprivate SendMessageInterface sendMessageInterface;// 生产者流程:// 1.创建发送消息通道// 2.生产投递消息(生产者往通道中发送消息)@RequestMapping("/sendMsg")public String sendMsg() {String msg = UUID.randomUUID().toString();System.out.println("生产者发送内容msg:" + msg);Message build = MessageBuilder.withPayload(msg.getBytes()).build();sendMessageInterface.sendMsg().send(build);return "success";}// 3.开启绑定(结合)}

启动类

@SpringBootApplication
@EnableBinding(SendMessageInterface.class)
public class AppProducer {public static void main(String[] args) {SpringApplication.run(AppProducer.class, args);}// 思考: 在rabbit 有交换机 队列// 默认以通道名称 创建交换机,消费者启动的时候 随机创建一个队列名称
}

4.SpringCloud-Stream创建消费者

	<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.1.RELEASE</version></parent><dependencies><!-- SpringBoot整合Web组件 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- springcloud stream 就是对springboot 整合 rabbit mq 在做了一层封装 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId><version>2.0.1.RELEASE</version></dependency></dependencies>
</project>

配置文件

server:port: 8002
spring:application:name: spring-cloud-stream
# rabbitmq:
# host: 192.168.174.128
# port: 5672
# username: guest
# password: guestcloud:stream:bindings:my_stream_channel: ###指定 管道名称#指定该应用实例属于 stream 消费组group: stream

定义消费者

@Component
public class Consumer {@Value("${server.port}")private String serverPort;@StreamListener("my_stream_channel")public void redMsg(String msg) {System.out.println("消费者获取到生产者投递的消息:" + msg + ",端口号:" + serverPort);}}
public interface RedMsgInterface {@Input("my_stream_channel")SubscribableChannel redMsg();
}

启动类

@SpringBootApplication
@EnableBinding(RedMsgInterface.class)
public class AppConsumer {public static void main(String[] args) {SpringApplication.run(AppConsumer.class, args);}// 消费者队列 底层自动创建一个队列 绑定my_stream_channel
}

可以看到,我们上面并创建队列,而是他自动帮我创建的

5.更改环境为kafka

Maven依赖

		<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>2.0.1.RELEASE</version></dependency>

生产者配置

server:port: 9000
spring:cloud:stream:# 设置成使用kafkakafka:binder:# Kafka的服务端列表,默认localhostbrokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092# Kafka服务端连接的ZooKeeper节点列表,默认localhostzkNodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181minPartitionCount: 1autoCreateTopics: trueautoAddPartitions: true

消费者配置

server:port: 8000
spring:application:name: springcloud_kafka_consumercloud:instance-count: 1instance-index: 0stream:kafka:binder:brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092zk-nodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181auto-add-partitions: trueauto-create-topics: truemin-partition-count: 1bindings:input:destination: my_msggroup: s1consumer:autoCommitOffset: falseconcurrency: 1partitioned: false
  相关解决方案