参考项目:GitHub - mqtt-spring-boot-starter
背景说明:
和原作者一样,也是身处IOT物联网公司,身不由己,哈哈
实现功能:
1、多broker连接,这是原作者造好的轮子
2、指定信息发布,源码上有些问题,后面做了修复处理,已经可以正常使用了。
并且由于业务场景不一样,将原作者的点对点发送,修改为订阅|发布模式。
具体位置位于 MqttClientConfiguration文件的initializeMqttClient方法中的对应【入站通道】
3、ssl连接,目前没用上,先删除了
业务代码:
- package com.smart.common.mqtt;
- import org.springframework.stereotype.Component;
- import java.lang.annotation.*;
- /**
- * <p>
- * MqttClient
- * </p >
- *
- * @author TL
- * @version 1.0.0
- */
- @Target(ElementType.TYPE)
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- @Component
- public @interface MqttClient {
- /**
- * MQTT客户端ID
- */
- String value() default "";
- /**
- * 客户端名称,用于配置中引用
- */
- String name();
- }
复制代码 @MqttClient- package com.smart.common.mqtt;
- import org.springframework.core.annotation.AliasFor;
- import java.lang.annotation.*;
- /**
- * <p>
- * MqttSubscribe
- * </p >
- *
- * @author TL
- * @version 1.0.0
- */
- @Target(ElementType.METHOD)
- @Retention(RetentionPolicy.RUNTIME)
- @Documented
- public @interface MqttSubscribe {
- /**
- * 订阅的主题
- */
- @AliasFor("topic")
- String value() default "";
- /**
- * 订阅的主题
- */
- @AliasFor("value")
- String topic() default "";
- /**
- * QoS质量等级:0, 1, 2
- */
- int qos() default 1;
- /**
- * 客户端名称,用于指定哪个MQTT客户端处理该订阅
- */
- String client() default "default";
- }
复制代码 @MqttSubscribe
- package com.smart.common.mqtt;
- import lombok.Data;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.boot.context.properties.NestedConfigurationProperty;
- import java.util.HashMap;
- import java.util.Map;
- /**
- * <p>
- * MqttProperties
- * </p >
- *
- * @author TL
- * @version 1.0.0
- */
- @Data
- @ConfigurationProperties(prefix = "mqtt")
- public class MqttProperties {
- /**
- * 是否启用MQTT
- */
- private boolean enabled = true;
- /**
- * 线程池配置
- */
- @NestedConfigurationProperty
- private MqttSchedulerConfig threadPool = new MqttSchedulerConfig();
- /**
- * 默认客户端配置
- */
- @NestedConfigurationProperty
- private ClientConfig defaultClient = new ClientConfig();
- /**
- * 多客户端配置,key为客户端名称
- */
- private Map<String, ClientConfig> clients = new HashMap<>();
- @Data
- public static class ClientConfig {
- /**
- * MQTT服务器地址,例如:tcp://localhost:1883或ssl://localhost:8883
- */
- private String serverUri = "tcp://localhost:1883";
- /**
- * 客户端ID
- */
- private String clientId = "mqtt-client-" + System.currentTimeMillis();
- /**
- * 用户名
- */
- private String username;
- /**
- * 密码
- */
- private String password;
- /**
- * 清除会话
- */
- private boolean cleanSession = true;
- /**
- * 连接超时时间(秒)
- */
- private int connectionTimeout = 30;
- /**
- * 保持连接心跳时间(秒)
- */
- private int keepAliveInterval = 60;
- /**
- * 是否自动重连
- */
- private boolean automaticReconnect = true;
- /**
- * 默认的QoS级别
- */
- private int defaultQos = 1;
- /**
- * 默认主题
- */
- private String defaultTopic;
- }
- }
复制代码 MqttProperties
- package com.smart.common.mqtt;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.boot.context.properties.EnableConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
- /**
- * <p>
- * MqttAutoConfiguration
- * </p >
- *
- * @author TL
- * @version 1.0.0
- */
- @Configuration
- @EnableConfigurationProperties(MqttProperties.class)
- @ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true", matchIfMissing = true)
- public class MqttAutoConfiguration {
- @Bean
- @ConditionalOnMissingBean
- public MqttClientFactory mqttClientFactory() {
- return new MqttPahoClientFactoryImpl();
- }
- /**
- * 配置MQTT消息处理器Bean
- * 当Spring容器中不存在该Bean时才会创建
- *
- * @return MqttMessageHandler 返回一个默认的MQTT消息处理器实例
- */
- @Bean // 条注解:当Spring容器中不存在相同类型的Bean时,才会创建这个Bean
- @ConditionalOnMissingBean
- public MqttMessageHandler mqttMessageHandler() { // 创建并返回一个默认的MQTT消息处理器实例
- return new DefaultMqttMessageHandler();
- }
- @Bean
- public MqttClientConfiguration mqttClientConfiguration(MqttProperties mqttProperties,
- MqttClientFactory mqttClientFactory,
- MqttMessageHandler mqttMessageHandler,
- ThreadPoolTaskScheduler mqttTaskScheduler) { // 注入调度器
- return new MqttClientConfiguration(mqttProperties, mqttClientFactory, mqttMessageHandler);
- }
- @Bean
- public MqttTemplate mqttTemplate(MqttClientConfiguration mqttClientConfiguration) {
- return new MqttTemplate(
- mqttClientConfiguration.getOutboundHandlers(),
- mqttClientConfiguration.getClientConfigs());
- }
- }
复制代码 MqttAutoConfiguration
- package com.smart.common.mqtt;
- import lombok.Getter;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.aop.framework.AopProxyUtils;
- import org.springframework.aop.support.AopUtils;
- import org.springframework.beans.BeansException;
- import org.springframework.beans.factory.BeanFactory;
- import org.springframework.beans.factory.BeanFactoryAware;
- import org.springframework.beans.factory.DisposableBean;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.config.BeanPostProcessor;
- import org.springframework.context.ApplicationListener;
- import org.springframework.context.event.ContextRefreshedEvent;
- import org.springframework.core.annotation.AnnotationUtils;
- import org.springframework.integration.channel.AbstractSubscribableChannel;
- import org.springframework.integration.channel.DirectChannel;
- import org.springframework.integration.channel.PublishSubscribeChannel;
- import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
- import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
- import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
- import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
- import org.springframework.util.Assert;
- import org.springframework.util.ReflectionUtils;
- import java.lang.reflect.Method;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ConcurrentHashMap;
- /**
- * <p>
- * MqttClientConfiguration
- * </p >
- *
- * @author TL
- * @version 1.0.0
- */
- @Slf4j
- public class MqttClientConfiguration implements BeanPostProcessor,
- ApplicationListener<ContextRefreshedEvent>,
- DisposableBean,
- BeanFactoryAware {
- private final MqttProperties mqttProperties;
- private final MqttClientFactory mqttClientFactory;
- private final MqttMessageHandler defaultMqttMessageHandler;
- @Autowired
- private ThreadPoolTaskScheduler mqttTaskScheduler;
- // 存储客户端工厂
- private final Map<String, MqttPahoClientFactory> clientFactories = new ConcurrentHashMap<>();
- // 存储MQTT出站处理器
- @Getter
- private final Map<String, MqttPahoMessageHandler> outboundHandlers = new ConcurrentHashMap<>();
- // 存储MQTT入站适配器
- private final Map<String, MqttPahoMessageDrivenChannelAdapter> inboundAdapters = new ConcurrentHashMap<>();
- // 存储消息通道
- private final Map<String, AbstractSubscribableChannel> channels = new ConcurrentHashMap<>();
- // 存储订阅信息
- private final Map<String, List<SubscriptionInfo>> subscriptions = new ConcurrentHashMap<>();
- private boolean initialized = false;
- @Autowired
- public MqttClientConfiguration(MqttProperties mqttProperties,
- MqttClientFactory mqttClientFactory,
- MqttMessageHandler defaultMqttMessageHandler) {
- this.mqttProperties = mqttProperties;
- this.mqttClientFactory = mqttClientFactory;
- this.defaultMqttMessageHandler = defaultMqttMessageHandler;
- }
- @Override
- public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
- }
- @Override
- public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
- return bean;
- }
- @Override
- public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
- Class<?> targetClass = AopUtils.isAopProxy(bean) ?
- AopProxyUtils.ultimateTargetClass(bean) : bean.getClass();
- // 处理MqttClient注解
- MqttClient mqttClientAnnotation = AnnotationUtils.findAnnotation(targetClass, MqttClient.class);
- if (mqttClientAnnotation != null) {
- log.info("Found MQTT client: {}", mqttClientAnnotation.name());
- }
- // 查找带有MqttSubscribe注解的方法
- ReflectionUtils.doWithMethods(targetClass, method -> {
- MqttSubscribe mqttSubscribe = AnnotationUtils.findAnnotation(method, MqttSubscribe.class);
- if (mqttSubscribe != null) {
- registerSubscription(bean, method, mqttSubscribe);
- }
- });
- return bean;
- }
- private void registerSubscription(Object bean, Method method, MqttSubscribe mqttSubscribe) {
- String topic = mqttSubscribe.topic().isEmpty() ? mqttSubscribe.value() : mqttSubscribe.topic();
- Assert.hasText(topic, "Topic must be specified in @MqttSubscribe annotation");
- String clientName = mqttSubscribe.client();
- int qos = mqttSubscribe.qos();
- log.info("Registering MQTT subscription: topic={}, qos={}, client={}, method={}.{}",
- topic, qos, clientName, bean.getClass().getSimpleName(), method.getName());
- // 将订阅信息存储起来,等待context刷新后统一处理
- SubscriptionInfo subscriptionInfo = new SubscriptionInfo(bean, method, topic, qos);
- subscriptions.computeIfAbsent(clientName, k -> new ArrayList<>()).add(subscriptionInfo);
- }
- @Override
- public void onApplicationEvent(ContextRefreshedEvent event) {
- if (initialized || !mqttProperties.isEnabled()) {
- return;
- }
- try {
- // 初始化所有MQTT客户端
- initializeMqttClients();
- // 处理所有订阅
- processSubscriptions();
- initialized = true;
- log.info("MQTT clients initialized successfully");
- } catch (Exception e) {
- log.error("Failed to initialize MQTT clients", e);
- throw new RuntimeException("Failed to initialize MQTT clients", e);
- }
- }
- private void initializeMqttClients() throws Exception {
- // 初始化默认客户端
- initializeMqttClient("default", mqttProperties.getDefaultClient());
- // 初始化其他客户端
- for (Map.Entry<String, MqttProperties.ClientConfig> entry : mqttProperties.getClients().entrySet()) {
- initializeMqttClient(entry.getKey(), entry.getValue());
- }
- }
- private void initializeMqttClient(String clientName, MqttProperties.ClientConfig config) throws Exception {
- // 创建MQTT客户端工厂
- MqttPahoClientFactory clientFactory = mqttClientFactory.createClientFactory(config);
- clientFactories.put(clientName, clientFactory);
- // 创建入站通道
- PublishSubscribeChannel inboundChannel = new PublishSubscribeChannel();
- channels.put(clientName + "-inbound", inboundChannel);
- // 创建出站通道
- DirectChannel outboundChannel = new DirectChannel();
- channels.put(clientName + "-outbound", outboundChannel);
- // 创建出站处理器
- MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
- config.getClientId() + "-outbound", clientFactory);
- messageHandler.setAsync(true);
- if (config.getDefaultTopic() != null) {
- messageHandler.setDefaultTopic(config.getDefaultTopic());
- }
- messageHandler.setDefaultQos(config.getDefaultQos());
- messageHandler.setConverter(new DefaultPahoMessageConverter());
- outboundHandlers.put(clientName, messageHandler);
- log.debug("Initialized MQTT client: {}", clientName);
- }
- private void processSubscriptions() {
- // 为每个客户端创建订阅适配器
- for (Map.Entry<String, List<SubscriptionInfo>> entry : subscriptions.entrySet()) {
- String clientName = entry.getKey();
- List<SubscriptionInfo> clientSubscriptions = entry.getValue();
- if (clientSubscriptions.isEmpty()) {
- continue;
- }
- // 获取客户端配置
- MqttProperties.ClientConfig config = clientName.equals("default") ?
- mqttProperties.getDefaultClient() : mqttProperties.getClients().get(clientName);
- if (config == null) {
- log.warn("No configuration found for MQTT client: {}, skipping subscriptions", clientName);
- continue;
- }
- // 获取客户端工厂
- MqttPahoClientFactory clientFactory = clientFactories.get(clientName);
- if (clientFactory == null) {
- log.warn("No factory found for MQTT client: {}, skipping subscriptions", clientName);
- continue;
- }
- // 获取入站通道
- AbstractSubscribableChannel inboundChannel = channels.get(clientName + "-inbound");
- // 创建入站适配器
- String[] topics = clientSubscriptions.stream()
- .map(SubscriptionInfo::getTopic)
- .distinct()
- .toArray(String[]::new);
- int[] qos = clientSubscriptions.stream()
- .map(SubscriptionInfo::getQos)
- .mapToInt(Integer::intValue)
- .toArray();
- MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
- config.getClientId() + "-inbound", clientFactory, topics);
- adapter.setQos(qos);
- adapter.setConverter(new DefaultPahoMessageConverter());
- adapter.setOutputChannel(inboundChannel);
- adapter.setCompletionTimeout(5000);
- adapter.setTaskScheduler(mqttTaskScheduler);
- // 启动适配器
- adapter.start();
- inboundAdapters.put(clientName, adapter);
- // 添加消息处理器
- inboundChannel.subscribe(message -> {
- String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
- // 调用默认处理器
- defaultMqttMessageHandler.handleMessage(message, topic, clientName);
- // 调用特定的订阅方法
- clientSubscriptions.stream()
- .filter(subscription -> {
- assert topic != null;
- return topicMatches(subscription.getTopic(), topic);
- })
- .forEach(subscription -> {
- try {
- ReflectionUtils.makeAccessible(subscription.getMethod());
- if (subscription.getMethod().getParameterCount() == 1) {
- subscription.getMethod().invoke(subscription.getBean(), message.getPayload());
- } else if (subscription.getMethod().getParameterCount() == 2) {
- subscription.getMethod().invoke(subscription.getBean(),
- message.getPayload(), topic);
- } else if (subscription.getMethod().getParameterCount() == 3) {
- subscription.getMethod().invoke(subscription.getBean(),
- message.getPayload(), topic, clientName);
- } else {
- subscription.getMethod().invoke(subscription.getBean());
- }
- } catch (Exception e) {
- log.error("Error invoking subscription method: {}",
- subscription.getMethod().getName(), e);
- }
- });
- });
- log.info("Started MQTT subscription adapter for client: {} with topics: {}",
- clientName, String.join(", ", topics));
- }
- }
- private boolean topicMatches(String subscription, String actualTopic) {
- // 将主题分割为段
- String[] subParts = subscription.split("/");
- String[] topicParts = actualTopic.split("/");
- // 如果订阅主题以 # 结尾,并且前面的所有部分都匹配,则匹配
- if (subParts.length > 0 && subParts[subParts.length - 1].equals("#")) {
- if (topicParts.length < subParts.length - 1) {
- return false;
- }
- for (int i = 0; i < subParts.length - 1; i++) {
- if (!subParts[i].equals("+") && !subParts[i].equals(topicParts[i])) {
- return false;
- }
- }
- return true;
- }
- // 如果段数不同且不是 # 结尾,则不匹配
- if (subParts.length != topicParts.length) {
- return false;
- }
- // 检查每个段是否匹配
- for (int i = 0; i < subParts.length; i++) {
- if (!subParts[i].equals("+") && !subParts[i].equals(topicParts[i])) {
- return false;
- }
- }
- return true;
- }
- @Override
- public void destroy() throws Exception {
- // 关闭所有入站适配器
- for (MqttPahoMessageDrivenChannelAdapter adapter : inboundAdapters.values()) {
- try {
- adapter.stop();
- } catch (Exception e) {
- log.warn("Error stopping MQTT adapter", e);
- }
- }
- log.info("MQTT clients destroyed");
- }
- public Map<String, MqttProperties.ClientConfig> getClientConfigs() {
- Map<String, MqttProperties.ClientConfig> configs = new HashMap<>();
- configs.put("default", mqttProperties.getDefaultClient());
- configs.putAll(mqttProperties.getClients());
- return configs;
- }
- // 订阅信息内部类
- @Getter
- private static class SubscriptionInfo {
- private final Object bean;
- private final Method method;
- private final String topic;
- private final int qos;
- public SubscriptionInfo(Object bean, Method method, String topic, int qos) {
- this.bean = bean;
- this.method = method;
- this.topic = topic;
- this.qos = qos;
- }
- }
- }
复制代码 MqttClientConfiguration
- package com.smart.common.mqtt;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
- /**
- * <p>
- * MqttSchedulerConfig
- * </p >
- *
- * @author TL
- * @version 1.0.0
- */
- @Configuration
- public class MqttSchedulerConfig {
- @Value("${mqtt.thread-pool.size:10}")
- private int size;
- /**
- * MQTT适配器需要一个TaskScheduler来管理连接和心跳
- */
- @Bean
- public ThreadPoolTaskScheduler mqttTaskScheduler() {
- ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
- scheduler.setPoolSize(size);
- scheduler.setThreadNamePrefix("mqtt-scheduler-");
- scheduler.setWaitForTasksToCompleteOnShutdown(true);
- scheduler.setAwaitTerminationSeconds(60);
- scheduler.setRemoveOnCancelPolicy(true);
- return scheduler;
- }
- }
复制代码 MqttSchedulerConfig
- package com.smart.common.mqtt;
- import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
- /**
- * <p>
- * MqttClientFactory
- * </p >
- *
- * @author TL
- * @version 1.0.0
- */
- public interface MqttClientFactory {
- /**
- * 创建MQTT客户端工厂
- *
- * @param clientConfig 客户端配置
- * @return MQTT客户端工厂
- */
- MqttPahoClientFactory createClientFactory(MqttProperties.ClientConfig clientConfig) throws Exception;
- }
复制代码 MqttClientFactory
- package com.smart.common.mqtt;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
- import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
- /**
- * <p>
- * MqttPahoClientFactoryImpl
- * </p >
- *
- * @author TL
- * @version 1.0.0
- */
- public class MqttPahoClientFactoryImpl implements MqttClientFactory {
- @Override
- public MqttPahoClientFactory createClientFactory(MqttProperties.ClientConfig clientConfig) throws Exception {
- DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
- MqttConnectOptions options = new MqttConnectOptions();
- // 设置基本连接属性
- options.setServerURIs(new String[]{clientConfig.getServerUri()});
- if (clientConfig.getUsername() != null) {
- options.setUserName(clientConfig.getUsername());
- }
- if (clientConfig.getPassword() != null) {
- options.setPassword(clientConfig.getPassword().toCharArray());
- }
- options.setCleanSession(clientConfig.isCleanSession());
- options.setConnectionTimeout(clientConfig.getConnectionTimeout());
- options.setKeepAliveInterval(clientConfig.getKeepAliveInterval());
- options.setAutomaticReconnect(clientConfig.isAutomaticReconnect());
- factory.setConnectionOptions(options);
- return factory;
- }
- }
复制代码 MqttPahoClientFactoryImpl
- package com.smart.common.mqtt;
- import org.springframework.messaging.Message;
- /**
- * <p>
- * MqttMessageHandler
- * </p >
- *
- * @author TL
- * @version 1.0.0
- */
- public interface MqttMessageHandler {
- /**
- * 处理MQTT消息
- *
- * @param message 消息
- * @param topic 主题
- * @param clientName 客户端名称
- */
- void handleMessage(Message<?> message, String topic, String clientName);
- }
复制代码 MqttMessageHandler- package com.smart.common.mqtt;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.messaging.Message;
- /**
- * <p>
- * DefaultMqttMessageHandler
- * </p >
- *
- * @author TL
- * @version 1.0.0
- */
- @Slf4j
- public class DefaultMqttMessageHandler implements MqttMessageHandler {
- @Override
- public void handleMessage(Message<?> message, String topic, String clientName) {
- log.info("Received message from client [{}] on topic [{}]: {}",
- clientName, topic, message.getPayload());
- }
- }
复制代码 DefaultMqttMessageHandler
- package com.smart.common.mqtt;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
- import org.springframework.integration.mqtt.support.MqttHeaders;
- import org.springframework.integration.support.MessageBuilder;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.MessagingException;
- import org.springframework.util.Assert;
- import java.util.Map;
- /**
- * <p>
- * MqttTemplate
- * </p >
- *
- * @author TL
- * @version 1.0.0
- */
- @Slf4j
- public class MqttTemplate {
- private final Map<String, MqttPahoMessageHandler> messageHandlers;
- private final Map<String, MqttProperties.ClientConfig> clientConfigs;
- public MqttTemplate(Map<String, MqttPahoMessageHandler> messageHandlers,
- Map<String, MqttProperties.ClientConfig> clientConfigs) {
- this.messageHandlers = messageHandlers;
- this.clientConfigs = clientConfigs;
- }
- /**
- * 发送消息到默认主题
- *
- * @param payload 消息内容
- * @param clientName 客户端名称
- */
- public void sendToDefaultTopic(Object payload, String clientName) {
- MqttProperties.ClientConfig config = getClientConfig(clientName);
- Assert.hasText(config.getDefaultTopic(),
- "Default topic not configured for client: " + clientName);
- send(payload, config.getDefaultTopic(), config.getDefaultQos(), clientName);
- }
- /**
- * 发送消息到指定主题
- *
- * @param payload 消息内容
- * @param topic 主题
- * @param clientName 客户端名称
- */
- public void send(Object payload, String topic, String clientName) {
- MqttProperties.ClientConfig config = getClientConfig(clientName);
- send(payload, topic, config.getDefaultQos(), clientName);
- }
- /**
- * 发送消息到指定主题,并指定QoS
- *
- * @param payload 消息内容
- * @param topic 主题
- * @param qos QoS等级
- * @param clientName 客户端名称
- */
- public void send(Object payload, String topic, int qos, String clientName) {
- MqttPahoMessageHandler messageHandler = messageHandlers.get(clientName);
- if (messageHandler == null) {
- throw new IllegalStateException("No MQTT client found with name: " + clientName);
- }
- Message<?> message = MessageBuilder.withPayload(payload)
- .setHeader(MqttHeaders.TOPIC, topic)
- .setHeader(MqttHeaders.QOS, qos)
- .setHeader(MqttHeaders.RETAINED, true) // 设置消息保留
- .build();
- try {
- messageHandler.handleMessage(message);
- log.debug("Sent message to topic [{}] with client [{}]", topic, clientName);
- } catch (MessagingException e) {
- log.error("Failed to send message to topic [{}] with client [{}]", topic, clientName, e);
- throw e;
- }
- }
- private MqttProperties.ClientConfig getClientConfig(String clientName) {
- MqttProperties.ClientConfig config = clientConfigs.get(clientName);
- if (config == null) {
- throw new IllegalStateException("No MQTT client configuration found with name: " + clientName);
- }
- return config;
- }
- }
复制代码 MqttTemplateMAVEN依赖:- <dependency>
- <groupId>org.springframework.integration</groupId>
- spring-integration-mqtt</artifactId>
- </dependency>
复制代码 配置文件:- mqtt:
- enabled: true
- thread-pool:
- size: 10
- default-client:
- server-uri: tcp://secondary-broker:1883
- username: another-user
- password: another-password
- client-id: event-${random.uuid}
- default-topic: event
- default-qos: 1
- clients:
- secondary:
- server-uri: tcp://secondary-broker:1883
- username: another-user
- password: another-password
- default-topic: secondary/topic
- monitoring:
- server-uri: tcp://secondary-broker:1883
- username: another-user
- password: another-password
- default-topic: secondary/topic
复制代码 使用案例:- package com.smart.web.controller.biz;
- import cn.hutool.json.JSONUtil;
- import com.smart.common.core.domain.AjaxResult;
- import com.smart.common.mqtt.MqttSubscribe;
- import com.smart.common.mqtt.MqttTemplate;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.web.bind.annotation.*;
- @RestController
- @RequestMapping("/mqtt")
- @RequiredArgsConstructor
- @Slf4j
- public class MultiMqttController {
- private final MqttTemplate mqttService;
- /**
- * 向指定broker的topic主题发送信息
- * @param broker
- * @param topic
- * @param message
- * @return
- */
- @PostMapping("/publish/{broker}/{topic}")
- public AjaxResult publish(
- @PathVariable("broker") String broker,
- @PathVariable("topic") String topic,
- @RequestBody Object message) {
- mqttService.send(JSONUtil.toJsonStr(message),topic,broker);
- return AjaxResult.success("Message published to "+topic + " :"+message);
- }
- /**
- * 订阅测试
- * @param message
- */
- @MqttSubscribe(topic = "event", client = "default")
- public void handle(String message) {
- log.info("[{}]-[{}] Received message: {}","default","event", message);
- }
- }
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |