当前位置: 代码迷 >> 综合 >> RocketMQ-求生之路
  详细解决方案

RocketMQ-求生之路

热度:57   发布时间:2023-11-17 01:29:55.0

目录

特点

术语词汇


特点

  1. 支持事务消息
  2. 支持顺序消息
  3. 支持consumer端tag过滤(还没研究)

术语词汇

  1. Name Server:服务注册发现,用于生产者获取Broker节点信息 端口号:9876
  2. Broker:存储和负责转发消息 默认10911
  3. Producer :生产者
  4. Consumer:消费者

单机版本安装

  1. 官网下载压缩包,解压配置文件:   unzip rocketmq-all-4.6.0-bin-release.zip
  2. 内存修改,由于默认比较大,找到# runserver.sh,# runbroker.sh两个文件修改内存为                             JAVA_OPT="${JAVA_OPT} -server –Xms128m –Xmx128m –Xmn128m"
  3. 启动NameServer: nohup sh bin/mqnamesrv &
  4. 启动mqbroker: nohup sh bin/mqbroker -c ./conf/broker.conf -n 127.0.0.1:9876 &
  5. 查看是否成功: cat nohup.out   ,出现success字样为成功
  6. systemctl stop firewalld关闭防火墙(不关会访问不通)
  7. 关闭命令:

    关闭namesrv服务:sh bin/mqshutdown namesrv    关闭broker服务 :sh bin/mqshutdown broker

  8. 使用Rocketmq-console(可视化控制台)可以更好的管理,git有项目,直接d就可以

集群部署

环境准备

  1. 两台已经安装单机版完毕的机器,(我的ip分别为 192.168.199.153,192.168.199.152)

  2. systemctl stop firewalld  关闭防火墙(不关会访问不通)

  3. 在192.168.199.153找到conf/broker.conf,修改内容

    #集群名
    brokerClusterName = zdxCluster
    #Broker名
    brokerName = broker-a
    #0:Master,1:Slave
    brokerId = 0
    #删除动作,默认值04表示凌晨4 点
    deleteWhen = 04
    #两天触发一次
    fileReservedTime = 48
    #ASYNC_MASTER 异步复制Master,SYNC_MASTER 同步双写Master - SLAVE
    brokerRole = ASYNC_MASTER
    #刷盘方式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘
    flushDiskType = ASYNC_FLUSH
  4. 在192.168.199.152找到conf/broker.conf,修改内容

    #集群名
    brokerClusterName = zdxCluster
    #Broker名
    brokerName = broker-b
    #0:Master,1:Slave
    brokerId = 0
    #删除动作,默认值04表示凌晨4 点
    deleteWhen = 04
    #两天触发一次
    fileReservedTime = 48
    #ASYNC_MASTER 异步复制Master,SYNC_MASTER 同步双写Master - SLAVE
    brokerRole = ASYNC_MASTER
    #刷盘方式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘
    flushDiskType = ASYNC_FLUSH
  5. 启动方式在153上:nohup sh bin/mqnamesrv &

  6. 启动方式在153上:nohup sh bin/mqbroker -c ./conf/broker.conf -n 127.0.0.1:9876 &
  7. (将152的broker注册在152的nameServer上)启动方式在152上:nohup sh bin/mqbroker -c ./conf/broker.conf -n 192.168.199.153:9876 &

  8. Rocketmq-console观看效果

SpringBoot整合 Rocketmq

Maven依赖

 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version></parent><!-- springboot-web组件 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 连接rocketmq组件 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.3</version></dependency><!-- set get组件 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>

配置文件

application.yml
rocketmq:###连接地址nameServername-server: 192.168.199.153:9876;###生产者命名producer:group: zdx_producer
server:port: 8088

建一个TOPIC(主题)(我用的主题名为zdxTtopic,如果命名不同,下面代码请酌情修改)

实体类

@Data
public class OrderEntity implements Serializable {private String orderId;private String orderName;public OrderEntity(String orderId, String orderName) {this.orderId = orderId;this.orderName = orderName;}public OrderEntity() {}@Overridepublic String toString() {return "OrderEntity{" +"orderId='" + orderId + '\'' +", orderName='" + orderName + '\'' +'}';}

生产者

public class ProducerSend {public void sendMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {// 创建分组DefaultMQProducer producer = new DefaultMQProducer();// 生产者名称producer.setProducerGroup("zdxGroup");// 连接地址producer.setNamesrvAddr("192.168.199.153:9876");producer.start();Message msg=new Message("zdxTtopic","晚上一起去烧烤".getBytes());long orderId=System.currentTimeMillis();SendResult result = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message message, Object o) {//手动将消息存放在我们的队列return mqs.get(0);}}, orderId);}public static void main(String[] args) {try {new ProducerSend().sendMsg();} catch (InterruptedException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQClientException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();}}}

消费者

@Service
@RocketMQMessageListener(topic = "zdxTtopic", consumerGroup = "zdxConsumer")
public class OrdeConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String t) {System.out.println("这是个消费者:" + t);}
}

启动项目可以看到效果

(RocketMQ除了要开放9876和10911端口外,还需要开放10909和10912端口)