1.依赖类
<!-- mqtt --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId></dependency><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
2.application.yml 配置文件
mqtt:hostUrl: tcp://****:1883username: adminpassword: publicclientId: ***cleanSession: truereconnect: truetimeout: 100keepAlive: 100defaultTopic: client:report:1isOpen: trueqos: 2
3. MqttProperties 配置文件类
@Component
@ConfigurationProperties("mqtt")
@Data
public class MqttProperties {/*** 用户名*/private String username;/*** 密码*/private String password;/*** 连接地址*/private String hostUrl;/*** 默认连接主题*/private String defaultTopic;/*** 超时时间*/private int timeout;/*** 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端* 发送个消息判断客户端是否在线,但这个方法并没有重连的机制*/private int keepAlive;/*** 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连* 接记录,这里设置为true表示每次连接到服务器都以新的身份连接*/private Boolean cleanSession;/*** 客户端Id,同一台服务器下,不允许出现重复的客户端id*/private String clientId;/*** 是否断线重连*/private Boolean reconnect;/*** 启动的时候是否关闭mqtt*/private Boolean isOpen;/*** 连接方式*/private Integer qos;
}
4. MqttConfig 根据环境是否启动 mqtt
@Configuration
public class MqttConfig {@Autowiredprivate MqttAcceptClient mqttAcceptClient;/*** 订阅mqtt* @Conditional 按照一定的条件进行判断,满足条件给容器注册bean。* @return*/@Conditional(MqttCondition.class)@Beanpublic MqttAcceptClient getMqttPushClient() {mqttAcceptClient.connect();return mqttAcceptClient;}}
5. springboot 启动配置 mqtt类
public class MqttCondition implements Condition {@Overridepublic boolean matches(ConditionContext context, AnnotatedTypeMetadata annotatedTypeMetadata) {//获取当前环境信息Environment environment = context.getEnvironment();String isOpen = environment.getProperty("mqtt.isOpen");return Boolean.parseBoolean(isOpen);}
}
6.mqtt客户端类
@Component
public class MqttAcceptClient {private static final Logger logger = LoggerFactory.getLogger(MqttAcceptClient.class);public static MqttClient client;@Autowiredprivate MqttAcceptCallback mqttAcceptCallback;@Autowiredprivate MqttProperties mqttProperties;private static void setClient(MqttClient client) {MqttAcceptClient.client = client;}/*** 客户端连接*/public void connect() {MqttClient client;try {client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(mqttProperties.getUsername());options.setPassword(mqttProperties.getPassword().toCharArray());options.setConnectionTimeout(mqttProperties.getTimeout());options.setKeepAliveInterval(mqttProperties.getKeepAlive());options.setAutomaticReconnect(mqttProperties.getReconnect());options.setCleanSession(mqttProperties.getCleanSession());MqttAcceptClient.setClient(client);// 设置回调client.setCallback(mqttAcceptCallback);client.connect(options);} catch (Exception e) {logger.error("[客户端连接初始化异常:{}]", e.toString());}}/*** 重新连接*/public void reconnection() {//try {while (true) {client.close();this.connect();if (client.isConnected()) {logger.info("MQTT重新连接成功:" + client);break;}Thread.sleep(10000);}} catch (MqttException | InterruptedException e) {e.printStackTrace();}}/*** 订阅某个主题** @param topic 主题* @param qos 连接方式*/public void subscribe(String topic, int qos) {logger.info("==============开始订阅主题==============" + topic);try {client.subscribe(topic, qos);} catch (MqttException e) {logger.error("");}}/*** 取消订阅某个主题** @param topic 主题*/public void unsubscribe(String topic) {logger.info("==============开始取消订阅主题==============" + topic);try {client.unsubscribe(topic);} catch (MqttException e) {e.printStackTrace();}}
}
7. mqtt消费端回调 类
@Component
public class MqttAcceptCallback implements MqttCallbackExtended {private static final Logger logger = LoggerFactory.getLogger(MqttAcceptCallback.class);@Autowiredprivate MqttAcceptClient mqttAcceptClient;@Autowiredprivate AlertVehicleService alertVehicleService;/*** 客户端断开后触发** @param throwable 异常信息*/@Overridepublic void connectionLost(Throwable throwable) {logger.info("连接断开,可以做重连");if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) {logger.info("emqx重新连接....................................................");mqttAcceptClient.reconnection();}}/*** 客户端收到消息触发** @param topic 主题* @param mqttMessage 消息*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) {logger.info("接收消息主题 : " + topic);String payLoad = new String(mqttMessage.getPayload());logger.info("接收消息 : " + payLoad );}/*** 发布消息成功** @param token token*/@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {String[] topics = token.getTopics();for (String topic : topics) {logger.info("向主题:" + topic + "发送消息成功!");}try {MqttMessage message = token.getMessage();byte[] payload = message.getPayload();String s = new String(payload, StandardCharsets.UTF_8);logger.info("消息的内容是:" + s);} catch (MqttException e) {e.printStackTrace();}}/*** 连接emq服务器后触发** @param reconnect If true, the connection was the result of automatic reconnect* @param serverUri the server uri that the connection was made to*/@Overridepublic void connectComplete(boolean reconnect, String serverUri) {logger.info("--------------------ClientId:"+ MqttAcceptClient.client.getClientId() + "客户端连接成功!--------------------");// 以/#结尾表示订阅所有以test开头的主题//需要填写你的主题//需要填写你的主题//需要填写你的主题//需要填写你的主题//需要填写你的主题mqttAcceptClient.subscribe(主题名称, 1);}
}