ESP32 MQTT对接巴法云平台
MQTT(Message Queuing Telemetry Transport)是一种轻量级的 发布/订阅(Publish/Subscribe) 消息传输协议,专为 低带宽、高延迟、不稳定网络环境 设计,是物联网(IoT)领域的核心通信协议之一。
MQTT 核心特性
特性说明轻量级协议头最小仅 2字节,适合嵌入式设备发布/订阅模型设备不直接通信,通过 Broker(代理服务器) 中转,解耦性强低功耗适合电池供电设备,支持 心跳包(Keep Alive) 维持长连接QoS 质量等级支持 0/1/2 三级消息可靠性保证主题(Topic)分层式消息路由(如 home/living_room/temperature),支持通配符 + 和 #MQTT 与 HTTP 对比
对比维度MQTTHTTP协议开销极低(适合高频小数据)高(Header 冗余)通信模式双向实时推送单向请求-响应连接开销长连接(减少握手延迟)短连接(每次请求需重建连接)适用场景物联网、移动推送、实时监控Web 服务、API 接口MQTT 核心概念
- Broker(代理服务器)
- 核心枢纽,负责消息路由(如 Mosquitto、EMQX、HiveMQ)
- 实现消息存储转发、客户端管理、安全认证
- Topic(主题)
- 消息分类的层级路径(例如:factory/machine1/status)
- 通配符:
- +:单级匹配(home/+/temperature 匹配 home/living_room/temperature)
- #:多级匹配(home/# 匹配 home/living_room/light)
- QoS(服务质量)
- QoS 0:最多一次(尽力交付,可能丢失)
- QoS 1:至少一次(确保送达,可能重复)
- QoS 2:恰好一次(严格保证,无重复)
micro python可行性先行验证
- # 导入必要的库
- from umqtt.simple import MQTTClient # MQTT协议客户端库
- import time # 时间相关函数
- from machine import Timer # 硬件定时器控制
- #################### 用户可修改配置区域 ####################
- wifiName = "jianzhiji02" # WiFi名称(仅支持2.4G网络)
- wifiPassword = "8765432111" # WiFi密码
- clientID = "" # 设备密钥,从巴法云控制台获取
- subTopic = 'OTALX' # 订阅的主题(接收指令)
- pubTopic = 'bafa' # 发布的主题(发送数据)
- #################### 固定配置区域 ########################
- serverIP = "bemfa.com" # MQTT服务器地址
- port = 9501 # MQTT端口号
- ping_interval = 300 # 心跳包间隔(秒)
- ################# WiFi连接函数 ##########################
- def do_connect():
- """连接WiFi网络"""
- import network
- sta_if = network.WLAN(network.STA_IF)
- if not sta_if.isconnected():
- print('正在连接网络...')
- sta_if.active(True)
- sta_if.connect(wifiName, wifiPassword)
- while not sta_if.isconnected():
- pass
- print('WiFi连接成功')
- print('网络配置:', sta_if.ifconfig())
- ################# MQTT消息回调函数 #######################
- def msg_callback(topic, msg):
- """处理接收到的MQTT消息"""
- global client # 声明全局客户端对象
-
- print("[消息到达] 主题:", topic.decode(), "| 载荷:", msg.decode())
-
- # 判断是否是订阅的主题
- if topic.decode() == subTopic:
- # 根据指令执行操作
- if msg == b"on":
- print("执行开机操作")
- # 示例:控制GPIO输出高电平
- # pin.value(1)
-
- # 发送状态确认(推送消息)
- client.publish(pubTopic, "设备已开启")
-
- elif msg == b"off":
- print("执行关机操作")
- # 示例:控制GPIO输出低电平
- # pin.value(0)
-
- # 发送状态确认(推送消息)
- client.publish(pubTopic, "设备已关闭")
- ################ MQTT连接与订阅函数 #####################
- def connect_mqtt():
- """建立MQTT连接并订阅主题"""
- global client # 声明全局客户端对象
-
- try:
- # 创建客户端实例
- client = MQTTClient(clientID, serverIP, port)
- client.set_callback(msg_callback) # 设置回调函数
-
- # 建立连接
- client.connect()
- print(f"成功连接到MQTT服务器 {serverIP}:{port}")
-
- # 订阅主题
- client.subscribe(subTopic)
- print(f"已订阅主题: {subTopic}")
-
- # 设置心跳间隔(可选)
- client.keepalive = ping_interval
-
- return client
-
- except Exception as e:
- print("MQTT连接失败:", e)
- restart_and_reconnect()
- ################ 异常处理函数 ##########################
- def restart_and_reconnect():
- """重启设备并重连"""
- print("10秒后重启设备...")
- time.sleep(10)
- machine.reset()
- ################ 定时推送函数 ##########################
- def timed_publish(timer):
- """定时发布设备状态(示例)"""
- global client
- try:
- # 示例数据(可替换为传感器数据)
- status = "11"
- client.publish(pubTopic, f" {status}")
- print(f"定时推送: {status}")
- except:
- print("定时推送失败")
- ################# 主程序流程 ###########################
- if __name__ == "__main__":
- # 连接WiFi
- do_connect()
-
- # 初始化MQTT连接
- connect_mqtt()
-
- # 设置定时器(每30秒发送心跳)
- timer = Timer(-1) # 创建虚拟定时器
- timer.init(
- period=3000, # 间隔3秒(3000毫秒)
- mode=Timer.PERIODIC,
- callback=timed_publish
- )
- print("定时推送已启用")
-
- # 主循环
- try:
- while True:
- try:
- client.check_msg() # 检查新消息
- time.sleep(1) # 降低CPU占用
-
- except Exception as e:
- print("运行错误:", e)
- restart_and_reconnect()
-
- except KeyboardInterrupt:
- print("程序终止")
- client.disconnect()
- timer.deinit()
复制代码 终端打印
成功连接云平台,成功推送,订阅主题。可行性验证成功。
云平台
巴法云物联网平台_MQTT设备云
推送内容控制led
接受ESP32推送内容
Esp-idf+C语言实现
重要代码段分析
MQTT推送
- //----------------MQTT推送-----------------------------------------------------------------//
- //推送字符
- const char *status = "putting";
- //mqtt句柄,推送目标主题,推送内容,长度,服务质量,发布消息的标志位(通常设置为0)
- esp_mqtt_client_publish(mqtt_client, PUB_TOPIC, status, strlen(status), 0, 0);
- //推送数字
- num+=3;//推送运行时间
- snprintf(num_str, sizeof(num_str), "%d", num); // 将整数转为字符串
- esp_mqtt_client_publish(mqtt_client, PUB_TOPIC2, num_str, strlen(num_str), 0, 0);
复制代码 MQTT推送函数
esp_mqtt_client_publish:这是ESP-IDF提供的函数,用于通过MQTT客户端发布消息。
- mqtt_client:这是MQTT客户端的句柄,通常是在初始化MQTT客户端时创建的。
- PUB_TOPIC:这是您要发布消息的主题名称,通常是一个字符串。
- status:这是您要发布的消息内容,通常是一个字符串。
- strlen(status):这是消息内容的长度,strlen函数用于计算字符串的长度。
:这是QoS(Quality of Service)等级,表示服务质量。MQTT协议定义了三种QoS等级:
- 0:最多一次("fire-and-forget")
- 1:至少一次
- 2:只有一次
- 0:这是发布消息的标志位,通常设置为0。
MQTT订阅(MQTT成功连接)
- // MQTT事件处理
- static void mqtt_event_handler(void *handler_args, esp_event_base_t base,
- int32_t event_id, void *event_data) {
- esp_mqtt_event_handle_t event = event_data;
- switch (event->event_id) {
- case MQTT_EVENT_CONNECTED://<MQTT连接事件>---------->MQTT成功连接则订阅主题
- ESP_LOGI(TAG, "MQTT Connected");
- mqtt_connected = true;
- //----------------MQTT订阅(MQTT成功后)-----------------------------------------
- //mqtt句柄,订阅的主题,服务等级(0)
- esp_mqtt_client_subscribe(mqtt_client, SUB_TOPIC, 0);//订阅主题SUB_TOPIC(ledctrl)
-
-
- case MQTT_EVENT_DATA: {//<MQTT接收数据事件>
- // 处理接收数据
- //----------------判断接收主题-----------------------------------------------------------------//
- char topic[event->topic_len + 1];
- memcpy(topic, event->topic, event->topic_len);
- topic[event->topic_len] = '\0';
- //----------------判断接收数据-----------------------------------------------------------------//
- char data[event->data_len + 1];
- memcpy(data, event->data, event->data_len);
- data[event->data_len] = '\0';
- //----------------打印接收主题+数据-----------------------------------------------------------------//
- ESP_LOGI(TAG, "Received: Topic=%s, Data=%s", topic, data);
- if (strcmp(topic, SUB_TOPIC) == 0) {//判断接收主题(ledctrl)数据——>控制led灯->推送led状态信息
- if (strcmp(data, "on") == 0) {
- gpio_set_level(OUTPUT_PIN, 0);
- //推送led状态信息到主题ledstate
- esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已开启", 0, 0, 0);//同时推送状态到主题ledstate
- } else if (strcmp(data, "off") == 0) {
- gpio_set_level(OUTPUT_PIN, 1);
- esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已关闭", 0, 0, 0);
- }
- }
- break;
- }
- default:
- break;
- }
- }
复制代码 运行逻辑:
MQTT成功连接(MQTT事件)->订阅主题
MQTT接收数据事件->判断接收内容
MQTT订阅函数
esp_mqtt_client_subscribe(mqtt_client, SUB_TOPIC, 0); // 订阅主题SUB_TOPIC(ledctrl)
- esp_mqtt_client_subscribe:这是ESP-IDF提供的函数,用于订阅MQTT主题。
- mqtt_client:这是MQTT客户端的句柄,通常是在初始化MQTT客户端时创建的。
- SUB_TOPIC:这是您要订阅的主题名称,通常是一个字符串。在这个例子中,主题名称是ledctrl,表示用于控制LED灯的主题。
- 0:这是QoS(Quality of Service)等级,表示服务质量。MQTT协议定义了三种QoS等级:
- 0:最多一次("fire-and-forget")
- 1:至少一次
- 2:只有一次
终端现象分析
成功连接WIFI
成功连接MQTT服务器,并推送主题+内容
接受到MQTT服务器订阅的主题内容
完整代码:
- #include <string.h>
- #include "freertos/FreeRTOS.h"
- #include "freertos/task.h"
- #include "esp_system.h"
- #include "esp_log.h"
- #include "esp_event.h"
- #include "esp_netif.h"
- #include "nvs_flash.h"
- #include "protocol_examples_common.h"
- #include "mqtt_client.h"
- #include "driver/gpio.h"
- #include "esp_timer.h"
- #include "esp_wifi.h"
- // 用户配置参数
- #define WIFI_SSID "jianzhiji02"//wifi名称
- #define WIFI_PASS "8765432111"//wifi密码
- #define MQTT_CLIENT_ID ""//巴法云平台秘钥
- #define SUB_TOPIC "ledctrl"//订阅主题(控制核心板led)
- #define PUB_TOPIC "bafa"//推送主题1(测试推送字符)
- #define PUB_TOPIC2 "Time"//推送主题2(测试推送数字)
- #define PUB_TOPIC3 "ledstate"//推送主题2(推送led状态)
- // #define BROKER_URI "mqtt://bemfa.com:9501"//服务器URL(需DNS解析)
- #define BROKER_URI "mqtt://119.91.109.180:9501"//服务器ip地址+端口
- static const char *TAG = "MQTT_DEMO";
- static esp_mqtt_client_handle_t mqtt_client = NULL;//mqtt句柄
- static esp_timer_handle_t status_timer;//状态定时器句柄
- static bool mqtt_connected = false;//mqtt连接状态
- #define OUTPUT_PIN GPIO_NUM_48//led 引脚(低电平点亮)
- // GPIO初始化
- static void init_gpio(void) {
- gpio_config_t io_conf = {
- .pin_bit_mask = (1ULL << OUTPUT_PIN),
- .mode = GPIO_MODE_OUTPUT,
- .pull_up_en = GPIO_PULLUP_DISABLE,
- .pull_down_en = GPIO_PULLDOWN_DISABLE,
- .intr_type = GPIO_INTR_DISABLE
- };
- gpio_config(&io_conf);
- gpio_set_level(OUTPUT_PIN, 1);
- }
- // 定义静态变量 // 缓冲区用于存储转换后的字符串
- static int num = 0;static char num_str[16];
- // 定时器回调函数(1s周期)
- //----------------MQTT推送-----------------------------------------------------------------//
- static void publish_status(void *arg) {
- if (mqtt_connected) {
- const char *status = "putting";//推送主题
- esp_mqtt_client_publish(mqtt_client, PUB_TOPIC, status, strlen(status), 0, 0);
- ESP_LOGI(TAG, "Published status: %s", status);
-
- num+=3;//推送运行时间
- snprintf(num_str, sizeof(num_str), "%d", num); // 将整数转为字符串
- esp_mqtt_client_publish(mqtt_client, PUB_TOPIC2, num_str, strlen(num_str), 0, 0);
- ESP_LOGI(TAG, "num: %d", num); // 正确打印整型
- }
- }
- // MQTT事件处理
- static void mqtt_event_handler(void *handler_args, esp_event_base_t base,
- int32_t event_id, void *event_data) {
- esp_mqtt_event_handle_t event = event_data;
- switch (event->event_id) {
- case MQTT_EVENT_CONNECTED://<MQTT连接事件>
- ESP_LOGI(TAG, "MQTT Connected");
- mqtt_connected = true;
- //----------------MQTT订阅-----------------------------------------------------------------//
- esp_mqtt_client_subscribe(mqtt_client, SUB_TOPIC, 0);//订阅主题ledctrl
- // 创建状态定时器(3秒周期)
- esp_timer_create_args_t timer_cfg = {
- .callback = &publish_status,
- .name = "status_timer"
- };
- esp_timer_create(&timer_cfg, &status_timer);
- esp_timer_start_periodic(status_timer, 3000000);
- break;
- case MQTT_EVENT_DISCONNECTED:
- ESP_LOGI(TAG, "MQTT Disconnected");
- mqtt_connected = false;
- esp_timer_stop(status_timer);
- break;
- case MQTT_EVENT_DATA: {//<MQTT接收数据事件>
- // 处理接收数据
- //----------------判断接收主题-----------------------------------------------------------------//
- char topic[event->topic_len + 1];
- memcpy(topic, event->topic, event->topic_len);
- topic[event->topic_len] = '\0';
- //----------------判断接收数据-----------------------------------------------------------------//
- char data[event->data_len + 1];
- memcpy(data, event->data, event->data_len);
- data[event->data_len] = '\0';
- //----------------打印接收主题+数据-----------------------------------------------------------------//
- ESP_LOGI(TAG, "Received: Topic=%s, Data=%s", topic, data);
- if (strcmp(topic, SUB_TOPIC) == 0) {//判断接收主题(ledctrl)数据——>控制led灯->推送led状态信息
- if (strcmp(data, "on") == 0) {
- gpio_set_level(OUTPUT_PIN, 0);
- //推送led状态信息到主题ledstate
- esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已开启", 0, 0, 0);//同时推送状态到主题ledstate
- } else if (strcmp(data, "off") == 0) {
- gpio_set_level(OUTPUT_PIN, 1);
- esp_mqtt_client_publish(mqtt_client, PUB_TOPIC3, "led设备已关闭", 0, 0, 0);
- }
- }
- break;
- }
- case MQTT_EVENT_ERROR:
- ESP_LOGE(TAG, "MQTT Error");
- break;
- default:
- break;
- }
- }
- // WiFi事件处理
- static void wifi_event_handler(void *arg, esp_event_base_t event_base,
- int32_t event_id, void *event_data) {
- if (event_id == IP_EVENT_STA_GOT_IP) {
- // WiFi连接成功后启动MQTT
- esp_mqtt_client_config_t mqtt_cfg = {
- .broker.address.uri = BROKER_URI,
- .credentials.client_id = MQTT_CLIENT_ID
- };
- mqtt_client = esp_mqtt_client_init(&mqtt_cfg);
- esp_mqtt_client_register_event(mqtt_client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);
- esp_mqtt_client_start(mqtt_client);
- }
- }
- void app_main(void) {
- // 初始化NVS(堆栈存储wifi连接数据等。。。)
- esp_err_t ret = nvs_flash_init();
- if (ret == ESP_ERR_NVS_NO_FREE_PAGES || ret == ESP_ERR_NVS_NEW_VERSION_FOUND) {
- ESP_ERROR_CHECK(nvs_flash_erase());
- ret = nvs_flash_init();
- }
- ESP_ERROR_CHECK(ret);
- // 初始化网络接口
- ESP_ERROR_CHECK(esp_netif_init());
- ESP_ERROR_CHECK(esp_event_loop_create_default());
-
- // 配置WiFi
- esp_netif_create_default_wifi_sta();
- wifi_init_config_t cfg = WIFI_INIT_CONFIG_DEFAULT();
- ESP_ERROR_CHECK(esp_wifi_init(&cfg));
-
- // 注册WiFi事件
- esp_event_handler_instance_t instance_any_id;
- ESP_ERROR_CHECK(esp_event_handler_instance_register(IP_EVENT,
- IP_EVENT_STA_GOT_IP,
- &wifi_event_handler,
- NULL,
- &instance_any_id));
- // 设置WiFi参数
- wifi_config_t wifi_config = {
- .sta = {
- .ssid = WIFI_SSID,
- .password = WIFI_PASS,
- },
- };
- ESP_ERROR_CHECK(esp_wifi_set_mode(WIFI_MODE_STA));//设置为STA模式
- ESP_ERROR_CHECK(esp_wifi_set_config(WIFI_IF_STA, &wifi_config));//设置WiFi参数
- ESP_ERROR_CHECK(esp_wifi_start());//启动WiFi
- ESP_ERROR_CHECK(esp_wifi_connect());//连接WiFi
- // 初始化GPIO(led)
- init_gpio();
- // 保持主任务运行
- while (1) {
- vTaskDelay(pdMS_TO_TICKS(1000));
- }
- }
复制代码如果文章对你有所帮助,可以帮我点一下左下角推荐该文,万分感谢
博主目前在广州,深圳找实习。
如有大佬(HR,BOSS)能推荐实习,恳请私信小弟,给个机会,帮帮小弟,万分感谢!!!
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |