目录
特点
术语词汇
特点
- 支持事务消息
- 支持顺序消息
- 支持consumer端tag过滤(还没研究)
术语词汇
- Name Server:服务注册发现,用于生产者获取Broker节点信息 端口号:9876
- Broker:存储和负责转发消息 默认10911
- Producer :生产者
- Consumer:消费者
单机版本安装
- 官网下载压缩包,解压配置文件: unzip rocketmq-all-4.6.0-bin-release.zip
- 内存修改,由于默认比较大,找到# runserver.sh,# runbroker.sh两个文件修改内存为 JAVA_OPT="${JAVA_OPT} -server –Xms128m –Xmx128m –Xmn128m"
- 启动NameServer: nohup sh bin/mqnamesrv &
- 启动mqbroker: nohup sh bin/mqbroker -c ./conf/broker.conf -n 127.0.0.1:9876 &
- 查看是否成功: cat nohup.out ,出现success字样为成功
- systemctl stop firewalld关闭防火墙(不关会访问不通)
- 关闭命令:
关闭namesrv服务:sh bin/mqshutdown namesrv 关闭broker服务 :sh bin/mqshutdown broker
-
使用Rocketmq-console(可视化控制台)可以更好的管理,git有项目,直接d就可以
集群部署
环境准备
-
两台已经安装单机版完毕的机器,(我的ip分别为 192.168.199.153,192.168.199.152)
-
systemctl stop firewalld 关闭防火墙(不关会访问不通)
-
在192.168.199.153找到conf/broker.conf,修改内容
#集群名
brokerClusterName = zdxCluster
#Broker名
brokerName = broker-a
#0:Master,1:Slave
brokerId = 0
#删除动作,默认值04表示凌晨4 点
deleteWhen = 04
#两天触发一次
fileReservedTime = 48
#ASYNC_MASTER 异步复制Master,SYNC_MASTER 同步双写Master - SLAVE
brokerRole = ASYNC_MASTER
#刷盘方式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘
flushDiskType = ASYNC_FLUSH -
在192.168.199.152找到conf/broker.conf,修改内容
#集群名
brokerClusterName = zdxCluster
#Broker名
brokerName = broker-b
#0:Master,1:Slave
brokerId = 0
#删除动作,默认值04表示凌晨4 点
deleteWhen = 04
#两天触发一次
fileReservedTime = 48
#ASYNC_MASTER 异步复制Master,SYNC_MASTER 同步双写Master - SLAVE
brokerRole = ASYNC_MASTER
#刷盘方式 - ASYNC_FLUSH 异步刷盘 - SYNC_FLUSH 同步刷盘
flushDiskType = ASYNC_FLUSH -
启动方式在153上:nohup sh bin/mqnamesrv &
- 启动方式在153上:nohup sh bin/mqbroker -c ./conf/broker.conf -n 127.0.0.1:9876 &
-
(将152的broker注册在152的nameServer上)启动方式在152上:nohup sh bin/mqbroker -c ./conf/broker.conf -n 192.168.199.153:9876 &
-
Rocketmq-console观看效果
SpringBoot整合 Rocketmq
Maven依赖
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version></parent><!-- springboot-web组件 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 连接rocketmq组件 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.3</version></dependency><!-- set get组件 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency>
配置文件
application.yml
rocketmq:###连接地址nameServername-server: 192.168.199.153:9876;###生产者命名producer:group: zdx_producer
server:port: 8088
建一个TOPIC(主题)(我用的主题名为zdxTtopic,如果命名不同,下面代码请酌情修改)
实体类
@Data
public class OrderEntity implements Serializable {private String orderId;private String orderName;public OrderEntity(String orderId, String orderName) {this.orderId = orderId;this.orderName = orderName;}public OrderEntity() {}@Overridepublic String toString() {return "OrderEntity{" +"orderId='" + orderId + '\'' +", orderName='" + orderName + '\'' +'}';}
生产者
public class ProducerSend {public void sendMsg() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {// 创建分组DefaultMQProducer producer = new DefaultMQProducer();// 生产者名称producer.setProducerGroup("zdxGroup");// 连接地址producer.setNamesrvAddr("192.168.199.153:9876");producer.start();Message msg=new Message("zdxTtopic","晚上一起去烧烤".getBytes());long orderId=System.currentTimeMillis();SendResult result = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message message, Object o) {//手动将消息存放在我们的队列return mqs.get(0);}}, orderId);}public static void main(String[] args) {try {new ProducerSend().sendMsg();} catch (InterruptedException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQClientException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();}}}
消费者
@Service
@RocketMQMessageListener(topic = "zdxTtopic", consumerGroup = "zdxConsumer")
public class OrdeConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String t) {System.out.println("这是个消费者:" + t);}
}
启动项目可以看到效果
(RocketMQ除了要开放9876和10911端口外,还需要开放10909和10912端口)