找回密码
 立即注册
首页 业界区 安全 Springboo下的MQTT多broker实现

Springboo下的MQTT多broker实现

姥恫 2025-11-18 17:10:00
参考项目:GitHub - mqtt-spring-boot-starter
背景说明:
和原作者一样,也是身处IOT物联网公司,身不由己,哈哈
实现功能:
1、多broker连接,这是原作者造好的轮子
2、指定信息发布,源码上有些问题,后面做了修复处理,已经可以正常使用了。
并且由于业务场景不一样,将原作者的点对点发送,修改为订阅|发布模式。
具体位置位于 MqttClientConfiguration文件的initializeMqttClient方法中的对应【入站通道】
3、ssl连接,目前没用上,先删除了
业务代码:
1.gif
2.gif
  1. package com.smart.common.mqtt;
  2. import org.springframework.stereotype.Component;
  3. import java.lang.annotation.*;
  4. /**
  5. * <p>
  6. * MqttClient
  7. * </p >
  8. *
  9. * @author TL
  10. * @version 1.0.0
  11. */
  12. @Target(ElementType.TYPE)
  13. @Retention(RetentionPolicy.RUNTIME)
  14. @Documented
  15. @Component
  16. public @interface MqttClient {
  17.     /**
  18.      * MQTT客户端ID
  19.      */
  20.     String value() default "";
  21.     /**
  22.      * 客户端名称,用于配置中引用
  23.      */
  24.     String name();
  25. }
复制代码
@MqttClient
3.gif
4.gif
  1. package com.smart.common.mqtt;
  2. import org.springframework.core.annotation.AliasFor;
  3. import java.lang.annotation.*;
  4. /**
  5. * <p>
  6. * MqttSubscribe
  7. * </p >
  8. *
  9. * @author TL
  10. * @version 1.0.0
  11. */
  12. @Target(ElementType.METHOD)
  13. @Retention(RetentionPolicy.RUNTIME)
  14. @Documented
  15. public @interface MqttSubscribe {
  16.     /**
  17.      * 订阅的主题
  18.      */
  19.     @AliasFor("topic")
  20.     String value() default "";
  21.     /**
  22.      * 订阅的主题
  23.      */
  24.     @AliasFor("value")
  25.     String topic() default "";
  26.     /**
  27.      * QoS质量等级:0, 1, 2
  28.      */
  29.     int qos() default 1;
  30.     /**
  31.      * 客户端名称,用于指定哪个MQTT客户端处理该订阅
  32.      */
  33.     String client() default "default";
  34. }
复制代码
@MqttSubscribe
5.gif
6.gif
  1. package com.smart.common.mqtt;
  2. import lombok.Data;
  3. import org.springframework.boot.context.properties.ConfigurationProperties;
  4. import org.springframework.boot.context.properties.NestedConfigurationProperty;
  5. import java.util.HashMap;
  6. import java.util.Map;
  7. /**
  8. * <p>
  9. * MqttProperties
  10. * </p >
  11. *
  12. * @author TL
  13. * @version 1.0.0
  14. */
  15. @Data
  16. @ConfigurationProperties(prefix = "mqtt")
  17. public class MqttProperties {
  18.     /**
  19.      * 是否启用MQTT
  20.      */
  21.     private boolean enabled = true;
  22.     /**
  23.      * 线程池配置
  24.      */
  25.     @NestedConfigurationProperty
  26.     private MqttSchedulerConfig threadPool = new MqttSchedulerConfig();
  27.     /**
  28.      * 默认客户端配置
  29.      */
  30.     @NestedConfigurationProperty
  31.     private ClientConfig defaultClient = new ClientConfig();
  32.     /**
  33.      * 多客户端配置,key为客户端名称
  34.      */
  35.     private Map<String, ClientConfig> clients = new HashMap<>();
  36.     @Data
  37.     public static class ClientConfig {
  38.         /**
  39.          * MQTT服务器地址,例如:tcp://localhost:1883或ssl://localhost:8883
  40.          */
  41.         private String serverUri = "tcp://localhost:1883";
  42.         /**
  43.          * 客户端ID
  44.          */
  45.         private String clientId = "mqtt-client-" + System.currentTimeMillis();
  46.         /**
  47.          * 用户名
  48.          */
  49.         private String username;
  50.         /**
  51.          * 密码
  52.          */
  53.         private String password;
  54.         /**
  55.          * 清除会话
  56.          */
  57.         private boolean cleanSession = true;
  58.         /**
  59.          * 连接超时时间(秒)
  60.          */
  61.         private int connectionTimeout = 30;
  62.         /**
  63.          * 保持连接心跳时间(秒)
  64.          */
  65.         private int keepAliveInterval = 60;
  66.         /**
  67.          * 是否自动重连
  68.          */
  69.         private boolean automaticReconnect = true;
  70.         /**
  71.          * 默认的QoS级别
  72.          */
  73.         private int defaultQos = 1;
  74.         /**
  75.          * 默认主题
  76.          */
  77.         private String defaultTopic;
  78.     }
  79. }
复制代码
MqttProperties
7.gif
8.gif
  1. package com.smart.common.mqtt;
  2. import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
  3. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  4. import org.springframework.boot.context.properties.EnableConfigurationProperties;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  8. /**
  9. * <p>
  10. * MqttAutoConfiguration
  11. * </p >
  12. *
  13. * @author TL
  14. * @version 1.0.0
  15. */
  16. @Configuration
  17. @EnableConfigurationProperties(MqttProperties.class)
  18. @ConditionalOnProperty(prefix = "mqtt", name = "enabled", havingValue = "true", matchIfMissing = true)
  19. public class MqttAutoConfiguration {
  20.     @Bean
  21.     @ConditionalOnMissingBean
  22.     public MqttClientFactory mqttClientFactory() {
  23.         return new MqttPahoClientFactoryImpl();
  24.     }
  25. /**
  26. * 配置MQTT消息处理器Bean
  27. * 当Spring容器中不存在该Bean时才会创建
  28. *
  29. * @return MqttMessageHandler 返回一个默认的MQTT消息处理器实例
  30. */
  31.     @Bean    // 条注解:当Spring容器中不存在相同类型的Bean时,才会创建这个Bean
  32.     @ConditionalOnMissingBean
  33.     public MqttMessageHandler mqttMessageHandler() {    // 创建并返回一个默认的MQTT消息处理器实例
  34.         return new DefaultMqttMessageHandler();
  35.     }
  36.     @Bean
  37.     public MqttClientConfiguration mqttClientConfiguration(MqttProperties mqttProperties,
  38.                                                            MqttClientFactory mqttClientFactory,
  39.                                                            MqttMessageHandler mqttMessageHandler,
  40.                                                            ThreadPoolTaskScheduler mqttTaskScheduler) { // 注入调度器
  41.         return new MqttClientConfiguration(mqttProperties, mqttClientFactory, mqttMessageHandler);
  42.     }
  43.     @Bean
  44.     public MqttTemplate mqttTemplate(MqttClientConfiguration mqttClientConfiguration) {
  45.         return new MqttTemplate(
  46.                 mqttClientConfiguration.getOutboundHandlers(),
  47.                 mqttClientConfiguration.getClientConfigs());
  48.     }
  49. }
复制代码
MqttAutoConfiguration
9.gif
10.gif
  1. package com.smart.common.mqtt;
  2. import lombok.Getter;
  3. import lombok.extern.slf4j.Slf4j;
  4. import org.springframework.aop.framework.AopProxyUtils;
  5. import org.springframework.aop.support.AopUtils;
  6. import org.springframework.beans.BeansException;
  7. import org.springframework.beans.factory.BeanFactory;
  8. import org.springframework.beans.factory.BeanFactoryAware;
  9. import org.springframework.beans.factory.DisposableBean;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.beans.factory.config.BeanPostProcessor;
  12. import org.springframework.context.ApplicationListener;
  13. import org.springframework.context.event.ContextRefreshedEvent;
  14. import org.springframework.core.annotation.AnnotationUtils;
  15. import org.springframework.integration.channel.AbstractSubscribableChannel;
  16. import org.springframework.integration.channel.DirectChannel;
  17. import org.springframework.integration.channel.PublishSubscribeChannel;
  18. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  19. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  20. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  21. import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
  22. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  23. import org.springframework.util.Assert;
  24. import org.springframework.util.ReflectionUtils;
  25. import java.lang.reflect.Method;
  26. import java.util.ArrayList;
  27. import java.util.HashMap;
  28. import java.util.List;
  29. import java.util.Map;
  30. import java.util.concurrent.ConcurrentHashMap;
  31. /**
  32. * <p>
  33. * MqttClientConfiguration
  34. * </p >
  35. *
  36. * @author TL
  37. * @version 1.0.0
  38. */
  39. @Slf4j
  40. public class MqttClientConfiguration implements BeanPostProcessor,
  41.         ApplicationListener<ContextRefreshedEvent>,
  42.         DisposableBean,
  43.         BeanFactoryAware {
  44.     private final MqttProperties mqttProperties;
  45.     private final MqttClientFactory mqttClientFactory;
  46.     private final MqttMessageHandler defaultMqttMessageHandler;
  47.     @Autowired
  48.     private ThreadPoolTaskScheduler mqttTaskScheduler;
  49.     // 存储客户端工厂
  50.     private final Map<String, MqttPahoClientFactory> clientFactories = new ConcurrentHashMap<>();
  51.     // 存储MQTT出站处理器
  52.     @Getter
  53.     private final Map<String, MqttPahoMessageHandler> outboundHandlers = new ConcurrentHashMap<>();
  54.     // 存储MQTT入站适配器
  55.     private final Map<String, MqttPahoMessageDrivenChannelAdapter> inboundAdapters = new ConcurrentHashMap<>();
  56.     // 存储消息通道
  57.     private final Map<String, AbstractSubscribableChannel> channels = new ConcurrentHashMap<>();
  58.     // 存储订阅信息
  59.     private final Map<String, List<SubscriptionInfo>> subscriptions = new ConcurrentHashMap<>();
  60.     private boolean initialized = false;
  61.     @Autowired
  62.     public MqttClientConfiguration(MqttProperties mqttProperties,
  63.                                    MqttClientFactory mqttClientFactory,
  64.                                    MqttMessageHandler defaultMqttMessageHandler) {
  65.         this.mqttProperties = mqttProperties;
  66.         this.mqttClientFactory = mqttClientFactory;
  67.         this.defaultMqttMessageHandler = defaultMqttMessageHandler;
  68.     }
  69.     @Override
  70.     public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
  71.     }
  72.     @Override
  73.     public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
  74.         return bean;
  75.     }
  76.     @Override
  77.     public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
  78.         Class<?> targetClass = AopUtils.isAopProxy(bean) ?
  79.                 AopProxyUtils.ultimateTargetClass(bean) : bean.getClass();
  80.         // 处理MqttClient注解
  81.         MqttClient mqttClientAnnotation = AnnotationUtils.findAnnotation(targetClass, MqttClient.class);
  82.         if (mqttClientAnnotation != null) {
  83.             log.info("Found MQTT client: {}", mqttClientAnnotation.name());
  84.         }
  85.         // 查找带有MqttSubscribe注解的方法
  86.         ReflectionUtils.doWithMethods(targetClass, method -> {
  87.             MqttSubscribe mqttSubscribe = AnnotationUtils.findAnnotation(method, MqttSubscribe.class);
  88.             if (mqttSubscribe != null) {
  89.                 registerSubscription(bean, method, mqttSubscribe);
  90.             }
  91.         });
  92.         return bean;
  93.     }
  94.     private void registerSubscription(Object bean, Method method, MqttSubscribe mqttSubscribe) {
  95.         String topic = mqttSubscribe.topic().isEmpty() ? mqttSubscribe.value() : mqttSubscribe.topic();
  96.         Assert.hasText(topic, "Topic must be specified in @MqttSubscribe annotation");
  97.         String clientName = mqttSubscribe.client();
  98.         int qos = mqttSubscribe.qos();
  99.         log.info("Registering MQTT subscription: topic={}, qos={}, client={}, method={}.{}",
  100.                 topic, qos, clientName, bean.getClass().getSimpleName(), method.getName());
  101.         // 将订阅信息存储起来,等待context刷新后统一处理
  102.         SubscriptionInfo subscriptionInfo = new SubscriptionInfo(bean, method, topic, qos);
  103.         subscriptions.computeIfAbsent(clientName, k -> new ArrayList<>()).add(subscriptionInfo);
  104.     }
  105.     @Override
  106.     public void onApplicationEvent(ContextRefreshedEvent event) {
  107.         if (initialized || !mqttProperties.isEnabled()) {
  108.             return;
  109.         }
  110.         try {
  111.             // 初始化所有MQTT客户端
  112.             initializeMqttClients();
  113.             // 处理所有订阅
  114.             processSubscriptions();
  115.             initialized = true;
  116.             log.info("MQTT clients initialized successfully");
  117.         } catch (Exception e) {
  118.             log.error("Failed to initialize MQTT clients", e);
  119.             throw new RuntimeException("Failed to initialize MQTT clients", e);
  120.         }
  121.     }
  122.     private void initializeMqttClients() throws Exception {
  123.         // 初始化默认客户端
  124.         initializeMqttClient("default", mqttProperties.getDefaultClient());
  125.         // 初始化其他客户端
  126.         for (Map.Entry<String, MqttProperties.ClientConfig> entry : mqttProperties.getClients().entrySet()) {
  127.             initializeMqttClient(entry.getKey(), entry.getValue());
  128.         }
  129.     }
  130.     private void initializeMqttClient(String clientName, MqttProperties.ClientConfig config) throws Exception {
  131.         // 创建MQTT客户端工厂
  132.         MqttPahoClientFactory clientFactory = mqttClientFactory.createClientFactory(config);
  133.         clientFactories.put(clientName, clientFactory);
  134.         // 创建入站通道
  135.         PublishSubscribeChannel inboundChannel = new PublishSubscribeChannel();
  136.         channels.put(clientName + "-inbound", inboundChannel);
  137.         // 创建出站通道
  138.         DirectChannel outboundChannel = new DirectChannel();
  139.         channels.put(clientName + "-outbound", outboundChannel);
  140.         // 创建出站处理器
  141.         MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
  142.                 config.getClientId() + "-outbound", clientFactory);
  143.         messageHandler.setAsync(true);
  144.         if (config.getDefaultTopic() != null) {
  145.             messageHandler.setDefaultTopic(config.getDefaultTopic());
  146.         }
  147.         messageHandler.setDefaultQos(config.getDefaultQos());
  148.         messageHandler.setConverter(new DefaultPahoMessageConverter());
  149.         outboundHandlers.put(clientName, messageHandler);
  150.         log.debug("Initialized MQTT client: {}", clientName);
  151.     }
  152.     private void processSubscriptions() {
  153.         // 为每个客户端创建订阅适配器
  154.         for (Map.Entry<String, List<SubscriptionInfo>> entry : subscriptions.entrySet()) {
  155.             String clientName = entry.getKey();
  156.             List<SubscriptionInfo> clientSubscriptions = entry.getValue();
  157.             if (clientSubscriptions.isEmpty()) {
  158.                 continue;
  159.             }
  160.             // 获取客户端配置
  161.             MqttProperties.ClientConfig config = clientName.equals("default") ?
  162.                     mqttProperties.getDefaultClient() : mqttProperties.getClients().get(clientName);
  163.             if (config == null) {
  164.                 log.warn("No configuration found for MQTT client: {}, skipping subscriptions", clientName);
  165.                 continue;
  166.             }
  167.             // 获取客户端工厂
  168.             MqttPahoClientFactory clientFactory = clientFactories.get(clientName);
  169.             if (clientFactory == null) {
  170.                 log.warn("No factory found for MQTT client: {}, skipping subscriptions", clientName);
  171.                 continue;
  172.             }
  173.             // 获取入站通道
  174.             AbstractSubscribableChannel inboundChannel = channels.get(clientName + "-inbound");
  175.             // 创建入站适配器
  176.             String[] topics = clientSubscriptions.stream()
  177.                     .map(SubscriptionInfo::getTopic)
  178.                     .distinct()
  179.                     .toArray(String[]::new);
  180.             int[] qos = clientSubscriptions.stream()
  181.                     .map(SubscriptionInfo::getQos)
  182.                     .mapToInt(Integer::intValue)
  183.                     .toArray();
  184.             MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
  185.                     config.getClientId() + "-inbound", clientFactory, topics);
  186.             adapter.setQos(qos);
  187.             adapter.setConverter(new DefaultPahoMessageConverter());
  188.             adapter.setOutputChannel(inboundChannel);
  189.             adapter.setCompletionTimeout(5000);
  190.             adapter.setTaskScheduler(mqttTaskScheduler);
  191.             // 启动适配器
  192.             adapter.start();
  193.             inboundAdapters.put(clientName, adapter);
  194.             // 添加消息处理器
  195.             inboundChannel.subscribe(message -> {
  196.                 String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
  197.                 // 调用默认处理器
  198.                 defaultMqttMessageHandler.handleMessage(message, topic, clientName);
  199.                 // 调用特定的订阅方法
  200.                 clientSubscriptions.stream()
  201.                         .filter(subscription -> {
  202.                             assert topic != null;
  203.                             return topicMatches(subscription.getTopic(), topic);
  204.                         })
  205.                         .forEach(subscription -> {
  206.                             try {
  207.                                 ReflectionUtils.makeAccessible(subscription.getMethod());
  208.                                 if (subscription.getMethod().getParameterCount() == 1) {
  209.                                     subscription.getMethod().invoke(subscription.getBean(), message.getPayload());
  210.                                 } else if (subscription.getMethod().getParameterCount() == 2) {
  211.                                     subscription.getMethod().invoke(subscription.getBean(),
  212.                                             message.getPayload(), topic);
  213.                                 } else if (subscription.getMethod().getParameterCount() == 3) {
  214.                                     subscription.getMethod().invoke(subscription.getBean(),
  215.                                             message.getPayload(), topic, clientName);
  216.                                 } else {
  217.                                     subscription.getMethod().invoke(subscription.getBean());
  218.                                 }
  219.                             } catch (Exception e) {
  220.                                 log.error("Error invoking subscription method: {}",
  221.                                         subscription.getMethod().getName(), e);
  222.                             }
  223.                         });
  224.             });
  225.             log.info("Started MQTT subscription adapter for client: {} with topics: {}",
  226.                     clientName, String.join(", ", topics));
  227.         }
  228.     }
  229.     private boolean topicMatches(String subscription, String actualTopic) {
  230.         // 将主题分割为段
  231.         String[] subParts = subscription.split("/");
  232.         String[] topicParts = actualTopic.split("/");
  233.         // 如果订阅主题以 # 结尾,并且前面的所有部分都匹配,则匹配
  234.         if (subParts.length > 0 && subParts[subParts.length - 1].equals("#")) {
  235.             if (topicParts.length < subParts.length - 1) {
  236.                 return false;
  237.             }
  238.             for (int i = 0; i < subParts.length - 1; i++) {
  239.                 if (!subParts[i].equals("+") && !subParts[i].equals(topicParts[i])) {
  240.                     return false;
  241.                 }
  242.             }
  243.             return true;
  244.         }
  245.         // 如果段数不同且不是 # 结尾,则不匹配
  246.         if (subParts.length != topicParts.length) {
  247.             return false;
  248.         }
  249.         // 检查每个段是否匹配
  250.         for (int i = 0; i < subParts.length; i++) {
  251.             if (!subParts[i].equals("+") && !subParts[i].equals(topicParts[i])) {
  252.                 return false;
  253.             }
  254.         }
  255.         return true;
  256.     }
  257.     @Override
  258.     public void destroy() throws Exception {
  259.         // 关闭所有入站适配器
  260.         for (MqttPahoMessageDrivenChannelAdapter adapter : inboundAdapters.values()) {
  261.             try {
  262.                 adapter.stop();
  263.             } catch (Exception e) {
  264.                 log.warn("Error stopping MQTT adapter", e);
  265.             }
  266.         }
  267.         log.info("MQTT clients destroyed");
  268.     }
  269.     public Map<String, MqttProperties.ClientConfig> getClientConfigs() {
  270.         Map<String, MqttProperties.ClientConfig> configs = new HashMap<>();
  271.         configs.put("default", mqttProperties.getDefaultClient());
  272.         configs.putAll(mqttProperties.getClients());
  273.         return configs;
  274.     }
  275.     // 订阅信息内部类
  276.     @Getter
  277.     private static class SubscriptionInfo {
  278.         private final Object bean;
  279.         private final Method method;
  280.         private final String topic;
  281.         private final int qos;
  282.         public SubscriptionInfo(Object bean, Method method, String topic, int qos) {
  283.             this.bean = bean;
  284.             this.method = method;
  285.             this.topic = topic;
  286.             this.qos = qos;
  287.         }
  288.     }
  289. }
复制代码
MqttClientConfiguration
11.gif
12.gif
  1. package com.smart.common.mqtt;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  6. /**
  7. * <p>
  8. * MqttSchedulerConfig
  9. * </p >
  10. *
  11. * @author TL
  12. * @version 1.0.0
  13. */
  14. @Configuration
  15. public class MqttSchedulerConfig {
  16.     @Value("${mqtt.thread-pool.size:10}")
  17.     private int size;
  18.     /**
  19.      * MQTT适配器需要一个TaskScheduler来管理连接和心跳
  20.      */
  21.     @Bean
  22.     public ThreadPoolTaskScheduler mqttTaskScheduler() {
  23.         ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
  24.         scheduler.setPoolSize(size);
  25.         scheduler.setThreadNamePrefix("mqtt-scheduler-");
  26.         scheduler.setWaitForTasksToCompleteOnShutdown(true);
  27.         scheduler.setAwaitTerminationSeconds(60);
  28.         scheduler.setRemoveOnCancelPolicy(true);
  29.         return scheduler;
  30.     }
  31. }
复制代码
MqttSchedulerConfig
13.gif
14.gif
  1. package com.smart.common.mqtt;
  2. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  3. /**
  4. * <p>
  5. * MqttClientFactory
  6. * </p >
  7. *
  8. * @author TL
  9. * @version 1.0.0
  10. */
  11. public interface MqttClientFactory {
  12.     /**
  13.      * 创建MQTT客户端工厂
  14.      *
  15.      * @param clientConfig 客户端配置
  16.      * @return MQTT客户端工厂
  17.      */
  18.     MqttPahoClientFactory createClientFactory(MqttProperties.ClientConfig clientConfig) throws Exception;
  19. }
复制代码
MqttClientFactory
15.gif
16.gif
  1. package com.smart.common.mqtt;
  2. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  3. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  4. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  5. /**
  6. * <p>
  7. * MqttPahoClientFactoryImpl
  8. * </p >
  9. *
  10. * @author TL
  11. * @version 1.0.0
  12. */
  13. public class MqttPahoClientFactoryImpl implements MqttClientFactory {
  14.     @Override
  15.     public MqttPahoClientFactory createClientFactory(MqttProperties.ClientConfig clientConfig) throws Exception {
  16.         DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  17.         MqttConnectOptions options = new MqttConnectOptions();
  18.         // 设置基本连接属性
  19.         options.setServerURIs(new String[]{clientConfig.getServerUri()});
  20.         if (clientConfig.getUsername() != null) {
  21.             options.setUserName(clientConfig.getUsername());
  22.         }
  23.         if (clientConfig.getPassword() != null) {
  24.             options.setPassword(clientConfig.getPassword().toCharArray());
  25.         }
  26.         options.setCleanSession(clientConfig.isCleanSession());
  27.         options.setConnectionTimeout(clientConfig.getConnectionTimeout());
  28.         options.setKeepAliveInterval(clientConfig.getKeepAliveInterval());
  29.         options.setAutomaticReconnect(clientConfig.isAutomaticReconnect());
  30.         factory.setConnectionOptions(options);
  31.         return factory;
  32.     }
  33. }
复制代码
MqttPahoClientFactoryImpl
17.gif
18.gif
  1. package com.smart.common.mqtt;
  2. import org.springframework.messaging.Message;
  3. /**
  4. * <p>
  5. * MqttMessageHandler
  6. * </p >
  7. *
  8. * @author TL
  9. * @version 1.0.0
  10. */
  11. public interface MqttMessageHandler {
  12.     /**
  13.      * 处理MQTT消息
  14.      *
  15.      * @param message 消息
  16.      * @param topic 主题
  17.      * @param clientName 客户端名称
  18.      */
  19.     void handleMessage(Message<?> message, String topic, String clientName);
  20. }
复制代码
MqttMessageHandler
19.gif
20.gif
  1. package com.smart.common.mqtt;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.messaging.Message;
  4. /**
  5. * <p>
  6. * DefaultMqttMessageHandler
  7. * </p >
  8. *
  9. * @author TL
  10. * @version 1.0.0
  11. */
  12. @Slf4j
  13. public class DefaultMqttMessageHandler implements MqttMessageHandler {
  14.     @Override
  15.     public void handleMessage(Message<?> message, String topic, String clientName) {
  16.         log.info("Received message from client [{}] on topic [{}]: {}",
  17.                 clientName, topic, message.getPayload());
  18.     }
  19. }
复制代码
DefaultMqttMessageHandler
21.gif
22.gif
  1. package com.smart.common.mqtt;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  4. import org.springframework.integration.mqtt.support.MqttHeaders;
  5. import org.springframework.integration.support.MessageBuilder;
  6. import org.springframework.messaging.Message;
  7. import org.springframework.messaging.MessagingException;
  8. import org.springframework.util.Assert;
  9. import java.util.Map;
  10. /**
  11. * <p>
  12. * MqttTemplate
  13. * </p >
  14. *
  15. * @author TL
  16. * @version 1.0.0
  17. */
  18. @Slf4j
  19. public class MqttTemplate {
  20.     private final Map<String, MqttPahoMessageHandler> messageHandlers;
  21.     private final Map<String, MqttProperties.ClientConfig> clientConfigs;
  22.     public MqttTemplate(Map<String, MqttPahoMessageHandler> messageHandlers,
  23.                         Map<String, MqttProperties.ClientConfig> clientConfigs) {
  24.         this.messageHandlers = messageHandlers;
  25.         this.clientConfigs = clientConfigs;
  26.     }
  27.     /**
  28.      * 发送消息到默认主题
  29.      *
  30.      * @param payload 消息内容
  31.      * @param clientName 客户端名称
  32.      */
  33.     public void sendToDefaultTopic(Object payload, String clientName) {
  34.         MqttProperties.ClientConfig config = getClientConfig(clientName);
  35.         Assert.hasText(config.getDefaultTopic(),
  36.                 "Default topic not configured for client: " + clientName);
  37.         send(payload, config.getDefaultTopic(), config.getDefaultQos(), clientName);
  38.     }
  39.     /**
  40.      * 发送消息到指定主题
  41.      *
  42.      * @param payload 消息内容
  43.      * @param topic 主题
  44.      * @param clientName 客户端名称
  45.      */
  46.     public void send(Object payload, String topic, String clientName) {
  47.         MqttProperties.ClientConfig config = getClientConfig(clientName);
  48.         send(payload, topic, config.getDefaultQos(), clientName);
  49.     }
  50.     /**
  51.      * 发送消息到指定主题,并指定QoS
  52.      *
  53.      * @param payload 消息内容
  54.      * @param topic 主题
  55.      * @param qos QoS等级
  56.      * @param clientName 客户端名称
  57.      */
  58.     public void send(Object payload, String topic, int qos, String clientName) {
  59.         MqttPahoMessageHandler messageHandler = messageHandlers.get(clientName);
  60.         if (messageHandler == null) {
  61.             throw new IllegalStateException("No MQTT client found with name: " + clientName);
  62.         }
  63.         Message<?> message = MessageBuilder.withPayload(payload)
  64.                 .setHeader(MqttHeaders.TOPIC, topic)
  65.                 .setHeader(MqttHeaders.QOS, qos)
  66.                 .setHeader(MqttHeaders.RETAINED, true)  // 设置消息保留
  67.                 .build();
  68.         try {
  69.             messageHandler.handleMessage(message);
  70.             log.debug("Sent message to topic [{}] with client [{}]", topic, clientName);
  71.         } catch (MessagingException e) {
  72.             log.error("Failed to send message to topic [{}] with client [{}]", topic, clientName, e);
  73.             throw e;
  74.         }
  75.     }
  76.     private MqttProperties.ClientConfig getClientConfig(String clientName) {
  77.         MqttProperties.ClientConfig config = clientConfigs.get(clientName);
  78.         if (config == null) {
  79.             throw new IllegalStateException("No MQTT client configuration found with name: " + clientName);
  80.         }
  81.         return config;
  82.     }
  83. }
复制代码
MqttTemplateMAVEN依赖:
  1. <dependency>
  2.        <groupId>org.springframework.integration</groupId>
  3.        spring-integration-mqtt</artifactId>
  4. </dependency>
复制代码
配置文件:
  1. mqtt:
  2.     enabled: true
  3.     thread-pool:
  4.         size: 10
  5.     default-client:
  6.         server-uri: tcp://secondary-broker:1883
  7.         username: another-user
  8.         password: another-password
  9.         client-id: event-${random.uuid}
  10.         default-topic: event
  11.         default-qos: 1
  12.     clients:
  13.         secondary:
  14.             server-uri: tcp://secondary-broker:1883
  15.             username: another-user
  16.             password: another-password
  17.             default-topic: secondary/topic
  18.         monitoring:
  19.             server-uri: tcp://secondary-broker:1883
  20.             username: another-user
  21.             password: another-password
  22.             default-topic: secondary/topic
复制代码
使用案例:
  1. package com.smart.web.controller.biz;
  2. import cn.hutool.json.JSONUtil;
  3. import com.smart.common.core.domain.AjaxResult;
  4. import com.smart.common.mqtt.MqttSubscribe;
  5. import com.smart.common.mqtt.MqttTemplate;
  6. import lombok.RequiredArgsConstructor;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.web.bind.annotation.*;
  9. @RestController
  10. @RequestMapping("/mqtt")
  11. @RequiredArgsConstructor
  12. @Slf4j
  13. public class MultiMqttController {
  14.     private final MqttTemplate mqttService;
  15.     /**
  16.      * 向指定broker的topic主题发送信息
  17.      * @param broker
  18.      * @param topic
  19.      * @param message
  20.      * @return
  21.      */
  22.     @PostMapping("/publish/{broker}/{topic}")
  23.     public AjaxResult publish(
  24.             @PathVariable("broker") String broker,
  25.             @PathVariable("topic") String topic,
  26.             @RequestBody Object message) {
  27.         mqttService.send(JSONUtil.toJsonStr(message),topic,broker);
  28.         return AjaxResult.success("Message published to "+topic + " :"+message);
  29.     }
  30.     /**
  31.      * 订阅测试
  32.      * @param message
  33.      */
  34.     @MqttSubscribe(topic = "event", client = "default")
  35.     public void handle(String message) {
  36.         log.info("[{}]-[{}] Received message: {}","default","event", message);
  37.     }
  38. }
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

2025-11-30 01:18:29

举报

您需要登录后才可以回帖 登录 | 立即注册