简介
本文主要介绍如何使用 ESP8266 连接MQTT服务器,利用 PubSubClient
库,实现客户端与 MQTT 服务器的连接、订阅、收发消息等功能。服务器使用EMQX为例。部分代码使用 PubSubClient
库自带的示例。
连接到MQTT服务器分为TCP方式连接和TLS/SSL方式连接,使用EMQX的公共服务器的话是TCP方式连接,使用EMQX的私有服务器为TLS/SSL方式连接。TCP和TLS/SSL连接的区别在于定义espClient的方式不同,连接服务器的端口不同,以及TLS/SSL需要填写服务器指纹,并使用espClient.setFingerprint(fingerprint);
设置指纹
EMQX私有服务器链接,使用说明,私有服务器有一定的免费额度,每月1百万连接分钟数(大约23个设备持续在线连接1个月)和1G流量。
首先先在Arduino IDE的 管理库 中添加 PubSubClient
库
常用函数
WiFiClient espClient; //TCP连接
WiFiClientSecure espClient; //TLS/SSL连接
PubSubClient client(espClient); //定义client
espClient.setFingerprint(fingerprint); //TLS/SSL连接,设置指纹
client.setServer(mqtt_server, mqtt_port); //设置服务器地址和端口
client.setCallback(callback); //设置接收消息的函数
client.connect(clientId.c_str()); //连接服务器,不使用用户名和密码
client.connect(clientId.c_str() , mqtt_name , mqtt_password); //连接服务器,使用用户名和密码
client.connected(); //返回是否连接成功,0为失败,1为成功
client.state(); //返回连接状态码
client.publish(topic, "connected"); //向主题发布消息,可以不用订阅主题
client.subscribe(topic); //订阅主题,用于接收消息,qos默认为0
client.subscribe(topic,qos); //订阅主题,用于接收消息,并填写qos
client.unsubscribe(topic); //取消订阅
client.loop();
头文件
#include <ESP8266WiFi.h>
#include <PubSubClient.h>
定义基本信息
const char* ssid = ""; //WiFi名
const char* password = ""; //WiFi密码
const char* mqtt_server = ""; //MQTT服务器地址
const int mqtt_port = 1883; //服务器连接端口
const char* mqtt_name = ""; //MQTT连接用户名(可选)
const char* mqtt_password = ""; //MQTT连接密码(可选)
const char* topic=""; //订阅、发布的主题
MQTT连接用户名和密码都是可选的,具体要根据MQTT服务器。一般来说,使用TCP连接服务器,端口为1883;使用TLS/SSL连接服务器,端口为8883。
如果使用 EMQX的公共服务器,使用TCP连接,服务器地址为broker.emqx.io
,端口为1883,连接用户名为emqx
,密码为public
,也可以不填。
如果使用私有服务器,使用TLS/SSL连接,服务器地址根据控制台具体信息,端口为8883,用户名和密码自己在控制台里定义。
定义 espClient
和 client
。如果使用TCP连接,则按以下定义:
WiFiClient espClient;
PubSubClient client(espClient);
如果使用TLS/SSL连接,则按以下连接:
const char* fingerprint = ""; //服务器指纹
WiFiClientSecure espClient;
PubSubClient client(espClient);
根据官方文档 “EMQX 服务器的常见指纹,仅供参考。如果你没有使用 Serverless 部署或公共 broker、 你需要计算出你的服务器证书的 sha1 指纹并更新’fingerprint’变量。”
服务器地址为 broker.emqx.io
时:
const char* fingerprint = "B6 C6 FF 82 C6 59 09 BB D6 39 80 7F E7 BC 10 C9 19 C8 21 8E";
服务器地址为 *.emqxsl.com
时:
const char* fingerprint = "42:AE:D8:A3:42:F1:C4:1F:CD:64:9C:D7:4B:A1:EE:5B:5E:D7:E2:B5";
服务器地址为 *.emqxsl.cn
时:
const char* fingerprint = "7E:52:D3:84:48:3C:5A:9F:A4:39:9A:8B:27:01:B1:F8:C6:AD:D4:47";
setup
函数
如果TLS/SSL连接,则需要使用 Client.setFingerprint(fingerprint);
设置指纹
void setup() {
Serial.begin(115200, SERIAL_8N1, SERIAL_TX_ONLY);
pinMode(BUILTIN_LED, OUTPUT);
Serial.begin(115200);
setup_wifi(); //连接WiFi
//espClient.setFingerprint(fingerprint);
client.setServer(mqtt_server, mqtt_port); //设置服务器地址和端口
client.setCallback(callback); //设置接收消息的函数
}
建立WiFi连接
void setup_wifi() {
digitalWrite(BUILTIN_LED, LOW);
delay(10);
// We start by connecting to a WiFi network
Serial.println();
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
randomSeed(micros());
Serial.println("");
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
digitalWrite(BUILTIN_LED, HIGH);
}
建立MQTT连接
建立MQTT连接以及断开后的重连,这里需要定义设备名, 设备名可以自己定义,也可以使用XX-随机数,XX-IP地址等,例如ESP8266-1898,ESP8266-123.456.7.891
如果存在相同的设备ID,那么所有拥有相同设备ID的设备将会不断的断开重连。所以建议在设备ID后添加上IP地址。
前面提过,连接用户名和密码可填可不填,如果使用用户名和密码,则使用 client.connect(clientId.c_str() , mqtt_name , mqtt_password)
来连接MQTT服务器,如果不使用用户名和密码,则使用 client.connect(clientId.c_str())
来连接。
void reconnect() {
//循环直到回连成功
while (!client.connected())
{
Serial.print("Attempting MQTT connection...");
String clientId = "ESP8266-"; //设备名
// clientId += String(random(0xffff), HEX); 使用XX-随机数
clientId += WiFi.localIP().toString().c_str(); //XX-IP地址
// 尝试连接
if (client.connect(clientId.c_str() , mqtt_name , mqtt_password)) //client.connect(clientId.c_str())
{
Serial.println("connected"); //连接成功
client.publish(topic, "connected"); //连接成功后,向主题发布消息,也可以不发布
client.subscribe(topic); //订阅主题
}
else
{
Serial.print("failed, rc="); //连接失败,并打印状态码
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000); //5秒后再次尝试连接
}
}
}
PubSubClient
库中 PubSubClient.h
对于状态码的定义
// Possible values for client.state()
#define MQTT_CONNECTION_TIMEOUT -4
#define MQTT_CONNECTION_LOST -3
#define MQTT_CONNECT_FAILED -2
#define MQTT_DISCONNECTED -1
#define MQTT_CONNECTED 0
#define MQTT_CONNECT_BAD_PROTOCOL 1
#define MQTT_CONNECT_BAD_CLIENT_ID 2
#define MQTT_CONNECT_UNAVAILABLE 3
#define MQTT_CONNECT_BAD_CREDENTIALS 4
#define MQTT_CONNECT_UNAUTHORIZED 5
接收订阅的消息
void callback(char* topic, byte* payload, unsigned int length) //主题,消息,消息长度
{
Serial.print("Message arrived [");
Serial.print(topic); //打印主题
Serial.print("] ");
char msg[length];
for (int i = 0; i < length; i++)
msg[i]=(char)payload[i]; //将消息转存到msg中
snprintf (msg, length, msg);
Serial.println(msg); //打印消息
if (!strcmp(msg, "on")) digitalWrite(BUILTIN_LED, LOW); //如果接收到on,则将esp8266板载的led点亮,注意,将BUILTIN_LED置为低时,LED会被点亮
if (!strcmp(msg, "off")) digitalWrite(BUILTIN_LED, HIGH); //如果接收到on,则将led熄灭,注意,将BUILTIN_LED置为高时,LED会被熄灭
}
loop函数
void loop()
{
if (!client.connected())
//如果断连,则进行重连
{
reconnect();
}
client.loop();
unsigned long now = millis();
if (now - lastMsg > 2000) {
lastMsg = now;
++value;
snprintf (msg, MSG_BUFFER_SIZE, "hello world #%ld", value);
Serial.print("Publish message: ");
Serial.println(msg);
client.publish(topic, msg);
//每2秒发送一次消息
}
}
完整代码示例
#include <ESP8266WiFi.h>
#include <PubSubClient.h>
const char* ssid = ""; //WiFi名
const char* password = ""; //WiFi密码
const char* mqtt_server = "broker.emqx.io"; //MQTT服务器地址
const int mqtt_port = 1883; //服务器连接端口
const char* mqtt_name = "emqx"; //MQTT连接用户名(可选)
const char* mqtt_password = "public"; //MQTT连接密码(可选)
const char* topic="testtopic"; //订阅、发布的主题
//const char* fingerprint = "";//服务器指纹
//WiFiClientSecure espClient;
WiFiClient espClient;
PubSubClient client(espClient);
unsigned long lastMsg = 0;
#define MSG_BUFFER_SIZE (50)
char msg[MSG_BUFFER_SIZE];
int value = 0;
void setup() {
Serial.begin(115200, SERIAL_8N1, SERIAL_TX_ONLY);
pinMode(BUILTIN_LED, OUTPUT);
Serial.begin(115200);
setup_wifi(); //连接WiFi
//espClient.setFingerprint(fingerprint);
client.setServer(mqtt_server, mqtt_port); //设置服务器地址和端口
client.setCallback(callback); //设置接收消息的函数
}
void setup_wifi() {
digitalWrite(BUILTIN_LED, LOW);
delay(10);
// We start by connecting to a WiFi network
Serial.println();
Serial.print("Connecting to ");
Serial.println(ssid);
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
randomSeed(micros());
Serial.println("");
Serial.println("WiFi connected");
Serial.println("IP address: ");
Serial.println(WiFi.localIP());
digitalWrite(BUILTIN_LED, HIGH);
}
void reconnect() {
//循环直到回连成功
while (!client.connected())
{
Serial.print("Attempting MQTT connection...");
String clientId = "ESP8266-"; //设备名
// clientId += String(random(0xffff), HEX); 使用XX-随机数
clientId += WiFi.localIP().toString().c_str(); //XX-IP地址
// 尝试连接
if (client.connect(clientId.c_str() , mqtt_name , mqtt_password)) //client.connect(clientId.c_str())
{
Serial.println("connected"); //连接成功
client.publish(topic, "connected"); //连接成功后,向主题发布消息,也可以不发布
client.subscribe(topic); //订阅主题
}
else
{
Serial.print("failed, rc="); //连接失败,并打印状态码
Serial.print(client.state());
Serial.println(" try again in 5 seconds");
delay(5000); //5秒后再次尝试连接
}
}
}
void callback(char* topic, byte* payload, unsigned int length) //主题,消息,消息长度
{
Serial.print("Message arrived [");
Serial.print(topic); //打印主题
Serial.print("] ");
char msg[length];
for (int i = 0; i < length; i++)
msg[i]=(char)payload[i]; //将消息转存到msg中
snprintf (msg, length, msg);
Serial.println(msg); //打印消息
if (!strcmp(msg, "on")) digitalWrite(BUILTIN_LED, LOW); //如果接收到on,则将esp8266板载的led点亮,注意,将BUILTIN_LED置为低时,LED会被点亮
if (!strcmp(msg, "off")) digitalWrite(BUILTIN_LED, HIGH); //如果接收到on,则将led熄灭,注意,将BUILTIN_LED置为高时,LED会被熄灭
}
void loop()
{
if (!client.connected()) //如果断连,则进行重连
{
reconnect();
}
client.loop();
unsigned long now = millis();
if (now - lastMsg > 2000) {
lastMsg = now;
++value;
snprintf (msg, MSG_BUFFER_SIZE, "hello world #%ld", value);
Serial.print("Publish message: ");
Serial.println(msg);
client.publish(topic, msg); //每2秒发送一次消息
}
}