From c5c9e933d4052b9a1df7b7959b8ca3fa1a2d9e26 Mon Sep 17 00:00:00 2001 From: Klaus K Wilting Date: Sun, 17 May 2020 23:07:38 +0200 Subject: [PATCH] MQTT client (experimental) --- include/mqttclient.h | 8 +++-- src/mqttclient.cpp | 80 +++++++++++++++++++++++++++----------------- 2 files changed, 55 insertions(+), 33 deletions(-) diff --git a/include/mqttclient.h b/include/mqttclient.h index 28013301..4e366aba 100644 --- a/include/mqttclient.h +++ b/include/mqttclient.h @@ -6,16 +6,18 @@ #include #include -#define MQTT_NAME "paxcounter" -#define MQTT_INTOPIC "rcommand" +#define MQTT_CLIENT "paxcounter" +#define MQTT_INTOPIC "pax_IN" +#define MQTT_OUTTOPIC "pax_OUT" #define MQTT_PORT 1883 +#define MQTT_SERVER "broker.hivemq.com" extern TaskHandle_t mqttTask; void mqtt_enqueuedata(MessageBuffer_t *message); void mqtt_queuereset(void); void mqtt_client_task(void *param); -void mqtt_connect(IPAddress mqtt_host, uint16_t mqtt_port); +int mqtt_connect(IPAddress mqtt_host, uint16_t mqtt_port); void mqtt_callback(char *topic, byte *payload, unsigned int length); void WiFiEvent(WiFiEvent_t event); esp_err_t mqtt_init(void); diff --git a/src/mqttclient.cpp b/src/mqttclient.cpp index 7b521658..a113b7f5 100644 --- a/src/mqttclient.cpp +++ b/src/mqttclient.cpp @@ -4,58 +4,81 @@ static const char TAG[] = __FILE__; -IPAddress mqtt_server_ip(192, 168, 11, 57); +IPAddress mqtt_server_ip; QueueHandle_t MQTTSendQueue; TaskHandle_t mqttTask; WiFiClient ipClient; -PubSubClient client(ipClient); +PubSubClient mqttClient(ipClient); void WiFiEvent(WiFiEvent_t event) { switch (event) { case SYSTEM_EVENT_ETH_START: - ESP_LOGI(TAG, "ETH Started"); - ETH.setHostname(MQTT_NAME); + ESP_LOGI(TAG, "Ethernet link layer started"); + ETH.setHostname(MQTT_CLIENT); break; case SYSTEM_EVENT_ETH_CONNECTED: - ESP_LOGI(TAG, "ETH Connected"); + ESP_LOGI(TAG, "Network link connected"); break; case SYSTEM_EVENT_ETH_GOT_IP: - ESP_LOGI(TAG, "ETH MAC: %s", ETH.macAddress()); - ESP_LOGI(TAG, "IPv4: %s", ETH.localIP()); + ESP_LOGI(TAG, "ETH MAC: %s", ETH.macAddress().c_str()); + ESP_LOGI(TAG, "IPv4: %s", ETH.localIP().toString().c_str()); ESP_LOGI(TAG, "Link Speed %d Mbps %s", ETH.linkSpeed(), ETH.fullDuplex() ? "full duplex" : "half duplex"); mqtt_connect(mqtt_server_ip, MQTT_PORT); break; case SYSTEM_EVENT_ETH_DISCONNECTED: - ESP_LOGI(TAG, "ETH Disconnected"); + ESP_LOGI(TAG, "Network link disconnected"); break; case SYSTEM_EVENT_ETH_STOP: - ESP_LOGI(TAG, "ETH Stopped"); + ESP_LOGI(TAG, "Ethernet link layer stopped"); break; default: break; } } -void mqtt_connect(IPAddress mqtt_host, uint16_t mqtt_port) { +int mqtt_connect(IPAddress mqtt_host, uint16_t mqtt_port) { + // resolve server + if (WiFi.hostByName(MQTT_SERVER, mqtt_server_ip)) { + ESP_LOGI(TAG, "Attempting to connect to %s [%s]", MQTT_SERVER, + mqtt_server_ip.toString().c_str()); + } else { + ESP_LOGI(TAG, "Could not resolve %s", MQTT_SERVER); + return -1; + } + // attempt to connect to MQTT server - if (ipClient.connect(mqtt_server_ip, MQTT_PORT)) { - if (client.connect(MQTT_NAME)) { - ESP_LOGW(TAG, "MQTT server connected, subscribing"); - client.subscribe(MQTT_INTOPIC); + if (ipClient.connect(mqtt_host, mqtt_port)) { + + mqttClient.setServer(mqtt_server_ip, MQTT_PORT); + mqttClient.setCallback(mqtt_callback); + + String clientId = "Paxcounter-"; + clientId += String(random(0xffff), HEX); + + if (mqttClient.connect(clientId.c_str())) { + ESP_LOGI(TAG, "MQTT server connected, subscribing..."); + mqttClient.publish(MQTT_OUTTOPIC, "hello world"); + mqttClient.subscribe(MQTT_INTOPIC); + ESP_LOGI(TAG, "MQTT topic subscribed"); } else { ESP_LOGW(TAG, "MQTT server not responding, retrying later"); + return -1; } - } else + } else { ESP_LOGW(TAG, "MQTT server not connected, retrying later"); + return -1; + } } void mqtt_client_task(void *param) { + + MessageBuffer_t msg; + char cPort[4], cMsg[PAYLOAD_BUFFER_SIZE + 1]; + while (1) { - MessageBuffer_t msg; - char cPort[4], cMsg[PAYLOAD_BUFFER_SIZE + 1]; // fetch next or wait for payload to send from queue if (xQueueReceive(MQTTSendQueue, &msg, portMAX_DELAY) != pdTRUE) { @@ -63,12 +86,12 @@ void mqtt_client_task(void *param) { continue; } - // send data - if (client.connected()) { - snprintf(cPort, sizeof(cPort), "%d", msg.MessagePort); + // send data to mqtt server + if (mqttClient.connected()) { + snprintf(cPort, sizeof(cPort), "Port_%d", msg.MessagePort); snprintf(cMsg, sizeof(cMsg), "%s", msg.Message); - client.publish(cPort, cMsg); - client.loop(); + mqttClient.publish(cPort, cMsg); + mqttClient.loop(); ESP_LOGI(TAG, "%d byte(s) sent to MQTT", msg.MessageSize); } else { mqtt_enqueuedata(&msg); // re-enqueue the undelivered message @@ -76,7 +99,7 @@ void mqtt_client_task(void *param) { // attempt to reconnect to MQTT server mqtt_connect(mqtt_server_ip, MQTT_PORT); } - } + } // while(1) } esp_err_t mqtt_init(void) { @@ -89,15 +112,12 @@ esp_err_t mqtt_init(void) { ESP_LOGI(TAG, "MQTT send queue created, size %d Bytes", SEND_QUEUE_SIZE * PAYLOAD_BUFFER_SIZE); - WiFi.onEvent(WiFiEvent); - assert(ETH.begin()); - - client.setServer(mqtt_server_ip, MQTT_PORT); - client.setCallback(mqtt_callback); - ESP_LOGI(TAG, "Starting MQTTloop..."); xTaskCreate(mqtt_client_task, "mqttloop", 4096, (void *)NULL, 2, &mqttTask); + WiFi.onEvent(WiFiEvent); + ETH.begin(); + return ESP_OK; } @@ -127,7 +147,7 @@ void mqtt_enqueuedata(MessageBuffer_t *message) { void mqtt_queuereset(void) { xQueueReset(MQTTSendQueue); } void mqtt_callback(char *topic, byte *payload, unsigned int length) { - if ((length) && (topic == MQTT_INTOPIC)) + if ((length >= 1) && (topic == MQTT_INTOPIC)) rcommand(payload, length); }