From a30eab54f79dc2f3368bcbc034d74b3ac352ec8f Mon Sep 17 00:00:00 2001 From: cyberman54 Date: Wed, 23 Dec 2020 16:31:31 +0100 Subject: [PATCH] mqttclient reworked (experimental) --- include/irqhandler.h | 2 -- include/mqttclient.h | 13 +++++-------- src/irqhandler.cpp | 8 -------- src/main.cpp | 1 - src/mqttclient.cpp | 45 ++++++++++++++++++++------------------------ 5 files changed, 25 insertions(+), 44 deletions(-) diff --git a/include/irqhandler.h b/include/irqhandler.h index 712e5e41..958acc4e 100644 --- a/include/irqhandler.h +++ b/include/irqhandler.h @@ -11,7 +11,6 @@ #define BME_IRQ 0x080 #define MATRIX_DISPLAY_IRQ 0x100 #define PMU_IRQ 0x200 -#define MQTT_IRQ 0x400 #include "globals.h" #include "button.h" @@ -21,7 +20,6 @@ #include "bmesensor.h" #include "power.h" #include "ledmatrixdisplay.h" -#include "mqttclient.h" void irqHandler(void *pvParameters); void mask_user_IRQ(); diff --git a/include/mqttclient.h b/include/mqttclient.h index 804ee0f3..6e351e2a 100644 --- a/include/mqttclient.h +++ b/include/mqttclient.h @@ -6,17 +6,15 @@ #include #include -#define MQTT_ETHERNET 0 // set to 0 to run on Wifi +#define MQTT_ETHERNET 0 // select PHY: set 0 for Wifi, 1 for ethernet #define MQTT_INTOPIC "paxin" #define MQTT_OUTTOPIC "paxout" #define MQTT_PORT 1883 -#define MQTT_SERVER "broker.shiftr.io" -//#define MQTT_CLIENTNAME "arduino" -#define MQTT_USER "try" -#define MQTT_PASSWD "try" +#define MQTT_SERVER "paxcounter.cloud.shiftr.io" +#define MQTT_USER "public" +#define MQTT_PASSWD "public" #define MQTT_RETRYSEC 20 // retry reconnect every 20 seconds #define MQTT_KEEPALIVE 10 // keep alive interval in seconds -#define MQTT_TIMEOUT 1000 // timeout for all mqtt commands in milliseconds #ifndef MQTT_CLIENTNAME #define MQTT_CLIENTNAME clientId.c_str() @@ -27,13 +25,12 @@ extern TaskHandle_t mqttTask; void mqtt_enqueuedata(MessageBuffer_t *message); uint32_t mqtt_queuewaiting(void); void mqtt_queuereset(void); -void setMqttIRQ(void); -void mqtt_loop(void); void mqtt_client_task(void *param); int mqtt_connect(const char *my_host, const uint16_t my_port); void mqtt_callback(MQTTClient *client, char topic[], char payload[], int length); void NetworkEvent(WiFiEvent_t event); esp_err_t mqtt_init(void); +void mqtt_deinit(void); #endif // _MQTTCLIENT_H \ No newline at end of file diff --git a/src/irqhandler.cpp b/src/irqhandler.cpp index 687a6965..244910ad 100644 --- a/src/irqhandler.cpp +++ b/src/irqhandler.cpp @@ -69,14 +69,6 @@ void irqHandler(void *pvParameters) { } #endif -// MQTT loop due? -#if (HAS_MQTT) - if (InterruptStatus & MQTT_IRQ) { - mqtt_loop(); - InterruptStatus &= ~MQTT_IRQ; - } -#endif - // are cyclic tasks due? if (InterruptStatus & CYCLIC_IRQ) { doHousekeeping(); diff --git a/src/main.cpp b/src/main.cpp index 2ce13018..b52ce977 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -68,7 +68,6 @@ TIMESYNC_IRQ -> setTimeSyncIRQ() CYCLIC_IRQ -> setCyclicIRQ() SENDCYCLE_IRQ -> setSendIRQ() BME_IRQ -> setBMEIRQ() -MQTT_IRQ -> setMqttIRQ() ClockTask (Core 1), see timekeeper.cpp diff --git a/src/mqttclient.cpp b/src/mqttclient.cpp index 7a1f3af7..867f06db 100644 --- a/src/mqttclient.cpp +++ b/src/mqttclient.cpp @@ -11,13 +11,17 @@ Ticker mqttTimer; WiFiClient netClient; MQTTClient mqttClient; +void mqtt_deinit(void) { + mqttClient.onMessageAdvanced(NULL); + mqttClient.disconnect(); + vTaskDelete(mqttTask); +} + esp_err_t mqtt_init(void) { // setup network connection WiFi.onEvent(NetworkEvent); ETH.begin(); - // WiFi.mode(WIFI_STA); - // WiFi.begin("SSID", "PASSWORD"); // setup mqtt client mqttClient.begin(MQTT_SERVER, MQTT_PORT, netClient); @@ -33,7 +37,6 @@ esp_err_t mqtt_init(void) { SEND_QUEUE_SIZE * PAYLOAD_BUFFER_SIZE); ESP_LOGI(TAG, "Starting MQTTloop..."); - mqttTimer.attach(MQTT_KEEPALIVE, setMqttIRQ); xTaskCreate(mqtt_client_task, "mqttloop", 4096, (void *)NULL, 1, &mqttTask); return ESP_OK; @@ -113,16 +116,19 @@ void mqtt_client_task(void *param) { while (1) { - // fetch next or wait for payload to send from queue - // do not delete item from queue until it is transmitted - if (xQueuePeek(MQTTSendQueue, &msg, portMAX_DELAY) != pdTRUE) { - ESP_LOGE(TAG, "Premature return from xQueueReceive() with no data!"); - continue; - } - - // send data to mqtt server, if we are connected if (mqttClient.connected()) { + // check for incoming messages + mqttClient.loop(); + + // fetch next or wait for payload to send from queue + // do not delete item from queue until it is transmitted + // consider mqtt timeout while waiting + if (xQueuePeek(MQTTSendQueue, &msg, + MQTT_KEEPALIVE * 1000 / portTICK_PERIOD_MS) != pdTRUE) + continue; + + // send data to mqtt server char buffer[PAYLOAD_BUFFER_SIZE + 3]; snprintf(buffer, msg.MessageSize + 3, "%u/%s", msg.MessagePort, msg.Message); @@ -131,20 +137,17 @@ void mqtt_client_task(void *param) { ESP_LOGI(TAG, "%d byte(s) sent to MQTT server", msg.MessageSize + 2); // delete sent item from queue xQueueReceive(MQTTSendQueue, &msg, (TickType_t)0); - continue; } else ESP_LOGD(TAG, "Couldn't sent message to MQTT server"); - } else { // attempt to reconnect to MQTT server - ESP_LOGD(TAG, "MQTT client reconnecting..."); - ESP_LOGD(TAG, "MQTT last_error = %d / rc = %d", mqttClient.lastError(), + ESP_LOGD(TAG, "MQTT error = %d / rc = %d", mqttClient.lastError(), mqttClient.returnCode()); + ESP_LOGD(TAG, "MQTT client reconnecting..."); delay(MQTT_RETRYSEC * 1000); mqtt_connect(MQTT_SERVER, MQTT_PORT); } - - } // while(1) + } // while (1) } void mqtt_enqueuedata(MessageBuffer_t *message) { @@ -159,18 +162,10 @@ void mqtt_callback(MQTTClient *client, char topic[], char payload[], rcommand((const uint8_t *)payload, (const uint8_t)length); } -void mqtt_loop(void) { - if (!mqttClient.loop()) - ESP_LOGD(TAG, "MQTT last_error = %d / rc = %d", mqttClient.lastError(), - mqttClient.returnCode()); -} - void mqtt_queuereset(void) { xQueueReset(MQTTSendQueue); } uint32_t mqtt_queuewaiting(void) { return uxQueueMessagesWaiting(MQTTSendQueue); } -void setMqttIRQ(void) { xTaskNotify(irqHandlerTask, MQTT_IRQ, eSetBits); } - #endif // HAS_MQTT \ No newline at end of file