找回密码
 立即注册
首页 业界区 业界 ESP32 MQTT对接巴法云平台

ESP32 MQTT对接巴法云平台

施婉秀 2025-6-2 23:34:52
ESP32 MQTT对接巴法云平台

MQTT(Message Queuing Telemetry Transport)是一种轻量级的 发布/订阅(Publish/Subscribe) 消息传输协议,专为 低带宽、高延迟、不稳定网络环境 设计,是物联网(IoT)领域的核心通信协议之一。
MQTT 核心特性

特性说明轻量级协议头最小仅 2字节,适合嵌入式设备发布/订阅模型设备不直接通信,通过 Broker(代理服务器) 中转,解耦性强低功耗适合电池供电设备,支持 心跳包(Keep Alive) 维持长连接QoS 质量等级支持 0/1/2 三级消息可靠性保证主题(Topic)分层式消息路由(如 home/living_room/temperature),支持通配符 + 和 #MQTT 与 HTTP 对比

对比维度MQTTHTTP协议开销极低(适合高频小数据)高(Header 冗余)通信模式双向实时推送单向请求-响应连接开销长连接(减少握手延迟)短连接(每次请求需重建连接)适用场景物联网、移动推送、实时监控Web 服务、API 接口MQTT 核心概念


  • Broker(代理服务器)

    • 核心枢纽,负责消息路由(如 Mosquitto、EMQX、HiveMQ)
    • 实现消息存储转发、客户端管理、安全认证

  • Topic(主题)

    • 消息分类的层级路径(例如:factory/machine1/status)
    • 通配符:

      • +:单级匹配(home/+/temperature 匹配 home/living_room/temperature)
      • #:多级匹配(home/# 匹配 home/living_room/light)


  • QoS(服务质量)

    • QoS 0:最多一次(尽力交付,可能丢失)
    • QoS 1:至少一次(确保送达,可能重复)
    • QoS 2:恰好一次(严格保证,无重复)

micro python可行性先行验证
  1. # 导入必要的库
  2. from umqtt.simple import MQTTClient  # MQTT协议客户端库
  3. import time                          # 时间相关函数
  4. from machine import Timer            # 硬件定时器控制
  5. #################### 用户可修改配置区域 ####################
  6. wifiName = "jianzhiji02"             # WiFi名称(仅支持2.4G网络)
  7. wifiPassword = "8765432111"          # WiFi密码
  8. clientID = ""  # 设备密钥,从巴法云控制台获取
  9. subTopic = 'OTALX'                   # 订阅的主题(接收指令)
  10. pubTopic = 'bafa'               # 发布的主题(发送数据)
  11. #################### 固定配置区域 ########################
  12. serverIP = "bemfa.com"               # MQTT服务器地址
  13. port = 9501                          # MQTT端口号
  14. ping_interval = 300                  # 心跳包间隔(秒)
  15. ################# WiFi连接函数 ##########################
  16. def do_connect():
  17.     """连接WiFi网络"""
  18.     import network
  19.     sta_if = network.WLAN(network.STA_IF)
  20.     if not sta_if.isconnected():
  21.         print('正在连接网络...')
  22.         sta_if.active(True)
  23.         sta_if.connect(wifiName, wifiPassword)
  24.         while not sta_if.isconnected():
  25.             pass
  26.     print('WiFi连接成功')
  27.     print('网络配置:', sta_if.ifconfig())
  28. ################# MQTT消息回调函数 #######################
  29. def msg_callback(topic, msg):
  30.     """处理接收到的MQTT消息"""
  31.     global client  # 声明全局客户端对象
  32.    
  33.     print("[消息到达] 主题:", topic.decode(), "| 载荷:", msg.decode())
  34.    
  35.     # 判断是否是订阅的主题
  36.     if topic.decode() == subTopic:
  37.         # 根据指令执行操作
  38.         if msg == b"on":
  39.             print("执行开机操作")
  40.             # 示例:控制GPIO输出高电平
  41.             # pin.value(1)
  42.             
  43.             # 发送状态确认(推送消息)
  44.             client.publish(pubTopic, "设备已开启")
  45.             
  46.         elif msg == b"off":
  47.             print("执行关机操作")
  48.             # 示例:控制GPIO输出低电平
  49.             # pin.value(0)
  50.             
  51.             # 发送状态确认(推送消息)
  52.             client.publish(pubTopic, "设备已关闭")
  53. ################ MQTT连接与订阅函数 #####################
  54. def connect_mqtt():
  55.     """建立MQTT连接并订阅主题"""
  56.     global client  # 声明全局客户端对象
  57.    
  58.     try:
  59.         # 创建客户端实例
  60.         client = MQTTClient(clientID, serverIP, port)
  61.         client.set_callback(msg_callback)  # 设置回调函数
  62.         
  63.         # 建立连接
  64.         client.connect()
  65.         print(f"成功连接到MQTT服务器 {serverIP}:{port}")
  66.         
  67.         # 订阅主题
  68.         client.subscribe(subTopic)
  69.         print(f"已订阅主题: {subTopic}")
  70.         
  71.         # 设置心跳间隔(可选)
  72.         client.keepalive = ping_interval
  73.         
  74.         return client
  75.    
  76.     except Exception as e:
  77.         print("MQTT连接失败:", e)
  78.         restart_and_reconnect()
  79. ################ 异常处理函数 ##########################
  80. def restart_and_reconnect():
  81.     """重启设备并重连"""
  82.     print("10秒后重启设备...")
  83.     time.sleep(10)
  84.     machine.reset()
  85. ################ 定时推送函数 ##########################
  86. def timed_publish(timer):
  87.     """定时发布设备状态(示例)"""
  88.     global client
  89.     try:
  90.         # 示例数据(可替换为传感器数据)
  91.         status = "11"
  92.         client.publish(pubTopic, f" {status}")
  93.         print(f"定时推送: {status}")
  94.     except:
  95.         print("定时推送失败")
  96. ################# 主程序流程 ###########################
  97. if __name__ == "__main__":
  98.     # 连接WiFi
  99.     do_connect()
  100.    
  101.     # 初始化MQTT连接
  102.     connect_mqtt()
  103.    
  104.     # 设置定时器(每30秒发送心跳)
  105.     timer = Timer(-1)  # 创建虚拟定时器
  106.     timer.init(
  107.         period=3000,    # 间隔3秒(3000毫秒)
  108.         mode=Timer.PERIODIC,
  109.         callback=timed_publish
  110.     )
  111.     print("定时推送已启用")
  112.    
  113.     # 主循环
  114.     try:
  115.         while True:
  116.             try:
  117.                 client.check_msg()  # 检查新消息
  118.                 time.sleep(1)       # 降低CPU占用
  119.                
  120.             except Exception as e:
  121.                 print("运行错误:", e)
  122.                 restart_and_reconnect()
  123.                
  124.     except KeyboardInterrupt:
  125.         print("程序终止")
  126.         client.disconnect()
  127.         timer.deinit()
复制代码
终端打印
1.png

成功连接云平台,成功推送,订阅主题。可行性验证成功。
云平台

巴法云物联网平台_MQTT设备云
推送内容控制led

2.png
接受ESP32推送内容

3.png
Esp-idf+C语言实现

重要代码段分析

MQTT推送
  1. //----------------MQTT推送-----------------------------------------------------------------//
  2. //推送字符
  3.         const char *status = "putting";
  4.         //mqtt句柄,推送目标主题,推送内容,长度,服务质量,发布消息的标志位(通常设置为0)
  5.         esp_mqtt_client_publish(mqtt_client, PUB_TOPIC, status, strlen(status), 0, 0);
  6.   //推送数字
  7.         num+=3;//推送运行时间
  8.         snprintf(num_str, sizeof(num_str), "%d", num); // 将整数转为字符串
  9.         esp_mqtt_client_publish(mqtt_client, PUB_TOPIC2, num_str, strlen(num_str), 0, 0);
复制代码
MQTT推送函数

esp_mqtt_client_publish:这是ESP-IDF提供的函数,用于通过MQTT客户端发布消息。


  • mqtt_client:这是MQTT客户端的句柄,通常是在初始化MQTT客户端时创建的。
  • PUB_TOPIC:这是您要发布消息的主题名称,通常是一个字符串。
  • status:这是您要发布的消息内容,通常是一个字符串。
  • strlen(status):这是消息内容的长度,strlen函数用于计算字符串的长度。
    :这是QoS(Quality of Service)等级,表示服务质量。MQTT协议定义了三种QoS等级:

    • 0:最多一次("fire-and-forget")
    • 1:至少一次
    • 2:只有一次

  • 0:这是发布消息的标志位,通常设置为0。
MQTT订阅(MQTT成功连接)
  1. // MQTT事件处理
  2. static void mqtt_event_handler(void *handler_args, esp_event_base_t base,
  3.                               int32_t event_id, void *event_data) {
  4.     esp_mqtt_event_handle_t event = event_data;
  5.     switch (event->event_id) {
  6.         case MQTT_EVENT_CONNECTED://<MQTT连接事件>---------->MQTT成功连接则订阅主题
  7.             ESP_LOGI(TAG, "MQTT Connected");
  8.             mqtt_connected = true;
  9.             //----------------MQTT订阅(MQTT成功后)-----------------------------------------
  10.             //mqtt句柄,订阅的主题,服务等级(0)
  11.             esp_mqtt_client_subscribe(mqtt_client, SUB_TOPIC, 0);//订阅主题SUB_TOPIC(ledctrl)
  12.             
  13.             
  14.         case MQTT_EVENT_DATA: {//<MQTT接收数据事件>
  15.             // 处理接收数据
  16.             //----------------判断接收主题-----------------------------------------------------------------//
  17.             char topic[event->topic_len + 1];
  18.             memcpy(topic, event->topic, event->topic_len);
  19.             topic[event->topic_len] = '\0';
  20.             //----------------判断接收数据-----------------------------------------------------------------//
  21.             char data[event->data_len + 1];
  22.             memcpy(data, event->data, event->data_len);
  23.             data[event->data_len] = '\0';
  24.             //----------------打印接收主题+数据-----------------------------------------------------------------//
  25.             ESP_LOGI(TAG, "Received: Topic=%s, Data=%s", topic, data);
  26.             if (strcmp(topic, SUB_TOPIC) == 0) {//判断接收主题(ledctrl)数据——>控制led灯->推送led状态信息
  27.                 if (strcmp(data, "on") == 0) {
  28.                     gpio_set_level(OUTPUT_PIN, 0);
  29.                     //推送led状态信息到主题ledstate
  30.                     esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已开启", 0, 0, 0);//同时推送状态到主题ledstate
  31.                 } else if (strcmp(data, "off") == 0) {
  32.                     gpio_set_level(OUTPUT_PIN, 1);
  33.                     esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已关闭", 0, 0, 0);
  34.                 }
  35.             }
  36.             break;
  37.         }
  38.         default:
  39.             break;
  40.     }
  41. }
复制代码
运行逻辑
MQTT成功连接(MQTT事件)->订阅主题
MQTT接收数据事件->判断接收内容
MQTT订阅函数

esp_mqtt_client_subscribe(mqtt_client, SUB_TOPIC, 0); // 订阅主题SUB_TOPIC(ledctrl)


  • esp_mqtt_client_subscribe:这是ESP-IDF提供的函数,用于订阅MQTT主题。
  • mqtt_client:这是MQTT客户端的句柄,通常是在初始化MQTT客户端时创建的。
  • SUB_TOPIC:这是您要订阅的主题名称,通常是一个字符串。在这个例子中,主题名称是ledctrl,表示用于控制LED灯的主题。
  • 0:这是QoS(Quality of Service)等级,表示服务质量。MQTT协议定义了三种QoS等级:

    • 0:最多一次("fire-and-forget")
    • 1:至少一次
    • 2:只有一次

终端现象分析

成功连接WIFI

4.png

成功连接MQTT服务器,并推送主题+内容

5.png

接受到MQTT服务器订阅的主题内容

6.png

完整代码:
  1. #include <string.h>
  2. #include "freertos/FreeRTOS.h"
  3. #include "freertos/task.h"
  4. #include "esp_system.h"
  5. #include "esp_log.h"
  6. #include "esp_event.h"
  7. #include "esp_netif.h"
  8. #include "nvs_flash.h"
  9. #include "protocol_examples_common.h"
  10. #include "mqtt_client.h"
  11. #include "driver/gpio.h"
  12. #include "esp_timer.h"
  13. #include "esp_wifi.h"
  14. // 用户配置参数
  15. #define WIFI_SSID      "jianzhiji02"//wifi名称
  16. #define WIFI_PASS      "8765432111"//wifi密码
  17. #define MQTT_CLIENT_ID ""//巴法云平台秘钥
  18. #define SUB_TOPIC      "ledctrl"//订阅主题(控制核心板led)
  19. #define PUB_TOPIC      "bafa"//推送主题1(测试推送字符)
  20. #define PUB_TOPIC2     "Time"//推送主题2(测试推送数字)
  21. #define PUB_TOPIC3     "ledstate"//推送主题2(推送led状态)
  22. // #define BROKER_URI     "mqtt://bemfa.com:9501"//服务器URL(需DNS解析)
  23. #define BROKER_URI     "mqtt://119.91.109.180:9501"//服务器ip地址+端口
  24. static const char *TAG = "MQTT_DEMO";
  25. static esp_mqtt_client_handle_t mqtt_client = NULL;//mqtt句柄
  26. static esp_timer_handle_t status_timer;//状态定时器句柄
  27. static bool mqtt_connected = false;//mqtt连接状态
  28. #define OUTPUT_PIN     GPIO_NUM_48//led 引脚(低电平点亮)
  29. // GPIO初始化
  30. static void init_gpio(void) {
  31.     gpio_config_t io_conf = {
  32.         .pin_bit_mask = (1ULL << OUTPUT_PIN),
  33.         .mode = GPIO_MODE_OUTPUT,
  34.         .pull_up_en = GPIO_PULLUP_DISABLE,
  35.         .pull_down_en = GPIO_PULLDOWN_DISABLE,
  36.         .intr_type = GPIO_INTR_DISABLE
  37.     };
  38.     gpio_config(&io_conf);
  39.     gpio_set_level(OUTPUT_PIN, 1);
  40. }
  41. // 定义静态变量 // 缓冲区用于存储转换后的字符串
  42. static int num = 0;static char num_str[16];
  43. // 定时器回调函数(1s周期)
  44. //----------------MQTT推送-----------------------------------------------------------------//
  45. static void publish_status(void *arg) {
  46.     if (mqtt_connected) {
  47.         const char *status = "putting";//推送主题
  48.         esp_mqtt_client_publish(mqtt_client, PUB_TOPIC, status, strlen(status), 0, 0);
  49.         ESP_LOGI(TAG, "Published status: %s", status);
  50.    
  51.         num+=3;//推送运行时间
  52.         snprintf(num_str, sizeof(num_str), "%d", num); // 将整数转为字符串
  53.         esp_mqtt_client_publish(mqtt_client, PUB_TOPIC2, num_str, strlen(num_str), 0, 0);
  54.         ESP_LOGI(TAG, "num: %d", num);  // 正确打印整型
  55.     }
  56. }
  57. // MQTT事件处理
  58. static void mqtt_event_handler(void *handler_args, esp_event_base_t base,
  59.                               int32_t event_id, void *event_data) {
  60.     esp_mqtt_event_handle_t event = event_data;
  61.     switch (event->event_id) {
  62.         case MQTT_EVENT_CONNECTED://<MQTT连接事件>
  63.             ESP_LOGI(TAG, "MQTT Connected");
  64.             mqtt_connected = true;
  65.             //----------------MQTT订阅-----------------------------------------------------------------//
  66.             esp_mqtt_client_subscribe(mqtt_client, SUB_TOPIC, 0);//订阅主题ledctrl
  67.             // 创建状态定时器(3秒周期)
  68.             esp_timer_create_args_t timer_cfg = {
  69.                 .callback = &publish_status,
  70.                 .name = "status_timer"
  71.             };
  72.             esp_timer_create(&timer_cfg, &status_timer);
  73.             esp_timer_start_periodic(status_timer, 3000000);
  74.             break;
  75.         case MQTT_EVENT_DISCONNECTED:
  76.             ESP_LOGI(TAG, "MQTT Disconnected");
  77.             mqtt_connected = false;
  78.             esp_timer_stop(status_timer);
  79.             break;
  80.         case MQTT_EVENT_DATA: {//<MQTT接收数据事件>
  81.             // 处理接收数据
  82.             //----------------判断接收主题-----------------------------------------------------------------//
  83.             char topic[event->topic_len + 1];
  84.             memcpy(topic, event->topic, event->topic_len);
  85.             topic[event->topic_len] = '\0';
  86.             //----------------判断接收数据-----------------------------------------------------------------//
  87.             char data[event->data_len + 1];
  88.             memcpy(data, event->data, event->data_len);
  89.             data[event->data_len] = '\0';
  90.             //----------------打印接收主题+数据-----------------------------------------------------------------//
  91.             ESP_LOGI(TAG, "Received: Topic=%s, Data=%s", topic, data);
  92.             if (strcmp(topic, SUB_TOPIC) == 0) {//判断接收主题(ledctrl)数据——>控制led灯->推送led状态信息
  93.                 if (strcmp(data, "on") == 0) {
  94.                     gpio_set_level(OUTPUT_PIN, 0);
  95.                     //推送led状态信息到主题ledstate
  96.                     esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已开启", 0, 0, 0);//同时推送状态到主题ledstate
  97.                 } else if (strcmp(data, "off") == 0) {
  98.                     gpio_set_level(OUTPUT_PIN, 1);
  99.                     esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已关闭", 0, 0, 0);
  100.                 }
  101.             }
  102.             break;
  103.         }
  104.         case MQTT_EVENT_ERROR:
  105.             ESP_LOGE(TAG, "MQTT Error");
  106.             break;
  107.         default:
  108.             break;
  109.     }
  110. }
  111. // WiFi事件处理
  112. static void wifi_event_handler(void *arg, esp_event_base_t event_base,
  113.                               int32_t event_id, void *event_data) {
  114.     if (event_id == IP_EVENT_STA_GOT_IP) {
  115.         // WiFi连接成功后启动MQTT
  116.         esp_mqtt_client_config_t mqtt_cfg = {
  117.             .broker.address.uri = BROKER_URI,
  118.             .credentials.client_id = MQTT_CLIENT_ID
  119.         };
  120.         mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
  121.         esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);
  122.         esp_mqtt_client_start(mqtt_client);
  123.     }
  124. }
  125. void app_main(void) {
  126.     // 初始化NVS(堆栈存储wifi连接数据等。。。)
  127.     esp_err_t ret = nvs_flash_init();
  128.     if (ret == ESP_ERR_NVS_NO_FREE_PAGES || ret == ESP_ERR_NVS_NEW_VERSION_FOUND) {
  129.         ESP_ERROR_CHECK(nvs_flash_erase());
  130.         ret = nvs_flash_init();
  131.     }
  132.     ESP_ERROR_CHECK(ret);
  133.     // 初始化网络接口
  134.     ESP_ERROR_CHECK(esp_netif_init());
  135.     ESP_ERROR_CHECK(esp_event_loop_create_default());
  136.    
  137.     // 配置WiFi
  138.     esp_netif_create_default_wifi_sta();
  139.     wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
  140.     ESP_ERROR_CHECK(esp_wifi_init(&cfg));
  141.    
  142.     // 注册WiFi事件
  143.     esp_event_handler_instance_t instance_any_id;
  144.     ESP_ERROR_CHECK(esp_event_handler_instance_register(IP_EVENT,
  145.                                                         IP_EVENT_STA_GOT_IP,
  146.                                                         &wifi_event_handler,
  147.                                                         NULL,
  148.                                                         &instance_any_id));
  149.     // 设置WiFi参数
  150.     wifi_config_t wifi_config = {
  151.         .sta = {
  152.             .ssid = WIFI_SSID,
  153.             .password = WIFI_PASS,
  154.         },
  155.     };
  156.     ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));//设置为STA模式
  157.     ESP_ERROR_CHECK(esp_wifi_set_config(WIFI_IF_STA, &wifi_config));//设置WiFi参数
  158.     ESP_ERROR_CHECK(esp_wifi_start());//启动WiFi
  159.     ESP_ERROR_CHECK(esp_wifi_connect());//连接WiFi
  160.     // 初始化GPIO(led)
  161.     init_gpio();
  162.     // 保持主任务运行
  163.     while (1) {
  164.         vTaskDelay(pdMS_TO_TICKS(1000));
  165.     }
  166. }
复制代码
如果文章对你有所帮助,可以帮我点一下左下角推荐该文,万分感谢
博主目前在广州,深圳找实习。
如有大佬(HR,BOSS)能推荐实习,恳请私信小弟,给个机会,帮帮小弟,万分感谢!!!

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册