diff --git a/include/irqhandler.h b/include/irqhandler.h index 04e62d93..712e5e41 100644 --- a/include/irqhandler.h +++ b/include/irqhandler.h @@ -11,6 +11,7 @@ #define BME_IRQ 0x080 #define MATRIX_DISPLAY_IRQ 0x100 #define PMU_IRQ 0x200 +#define MQTT_IRQ 0x400 #include "globals.h" #include "button.h" @@ -20,10 +21,12 @@ #include "bmesensor.h" #include "power.h" #include "ledmatrixdisplay.h" +#include "mqttclient.h" void irqHandler(void *pvParameters); void mask_user_IRQ(); void unmask_user_IRQ(); +void doIRQ(int irq); #ifdef HAS_DISPLAY void IRAM_ATTR DisplayIRQ(); @@ -41,5 +44,4 @@ void IRAM_ATTR ButtonIRQ(); void IRAM_ATTR PMUIRQ(); #endif - #endif \ No newline at end of file diff --git a/include/mqttclient.h b/include/mqttclient.h index 9bb90364..cf04543a 100644 --- a/include/mqttclient.h +++ b/include/mqttclient.h @@ -6,17 +6,20 @@ #include #include -#define MQTT_INTOPIC "paxcounter_in/" -#define MQTT_OUTTOPIC "paxcounter_out/" +#define MQTT_INTOPIC "paxcounter/in" +#define MQTT_OUTTOPIC "paxcounter/out" #define MQTT_PORT 1883 #define MQTT_SERVER "broker.hivemq.com" -#define MQTT_RETRYSEC 10 // retry reconnect every 10 seconds +//#define MQTT_SERVER "test.mosquitto.org" +#define MQTT_RETRYSEC 20 // retry reconnect every 20 seconds +#define MQTT_KEEPALIVE 10 // seconds to keep alive server connections extern TaskHandle_t mqttTask; -extern PubSubClient mqttClient; void mqtt_enqueuedata(MessageBuffer_t *message); void mqtt_queuereset(void); +void mqtt_irq(void); +void mqtt_loop(void); void mqtt_client_task(void *param); int mqtt_connect(const char *my_host, const uint16_t my_port); void mqtt_callback(char *topic, byte *payload, unsigned int length); diff --git a/src/cyclic.cpp b/src/cyclic.cpp index 77e7439d..96521a61 100644 --- a/src/cyclic.cpp +++ b/src/cyclic.cpp @@ -55,7 +55,6 @@ void doHousekeeping() { #ifdef HAS_MQTT ESP_LOGD(TAG, "MQTTloop %d bytes left | Taskstate = %d", uxTaskGetStackHighWaterMark(mqttTask), eTaskGetState(mqttTask)); - mqttClient.loop(); #endif #if (defined HAS_DCF77 || defined HAS_IF482) @@ -152,7 +151,6 @@ uint32_t getFreeRAM() { void reset_counters() { #if ((WIFICOUNTER) || (BLECOUNTER)) macs.clear(); // clear all macs container - macs_total = 0; // reset all counters macs_wifi = 0; macs_ble = 0; #ifdef HAS_DISPLAY diff --git a/src/irqhandler.cpp b/src/irqhandler.cpp index 196debed..528a8187 100644 --- a/src/irqhandler.cpp +++ b/src/irqhandler.cpp @@ -69,6 +69,14 @@ void irqHandler(void *pvParameters) { } #endif +// MQTT loop due? +#if (HAS_MQTT) + if (InterruptStatus & MQTT_IRQ) { + mqtt_loop(); + InterruptStatus &= ~MQTT_IRQ; + } +#endif + // are cyclic tasks due? if (InterruptStatus & CYCLIC_IRQ) { doHousekeeping(); @@ -91,53 +99,30 @@ void irqHandler(void *pvParameters) { } // for } // irqHandler() -// esp32 hardware timer triggered interrupt service routines +// timer triggered interrupt service routines // they notify the irq handler task -#ifdef HAS_DISPLAY -void IRAM_ATTR DisplayIRQ() { +void IRAM_ATTR doIRQ(int irq) { BaseType_t xHigherPriorityTaskWoken = pdFALSE; - - xTaskNotifyFromISR(irqHandlerTask, DISPLAY_IRQ, eSetBits, - &xHigherPriorityTaskWoken); + xTaskNotifyFromISR(irqHandlerTask, irq, eSetBits, &xHigherPriorityTaskWoken); if (xHigherPriorityTaskWoken) portYIELD_FROM_ISR(); } + +#ifdef HAS_DISPLAY +void IRAM_ATTR DisplayIRQ() { doIRQ(DISPLAY_IRQ); } #endif #ifdef HAS_MATRIX_DISPLAY -void IRAM_ATTR MatrixDisplayIRQ() { - BaseType_t xHigherPriorityTaskWoken = pdFALSE; - - xTaskNotifyFromISR(irqHandlerTask, MATRIX_DISPLAY_IRQ, eSetBits, - &xHigherPriorityTaskWoken); - if (xHigherPriorityTaskWoken) - portYIELD_FROM_ISR(); -} +void IRAM_ATTR MatrixDisplayIRQ() { doIRQ(MATRIX_DISPLAY_IRQ); } #endif #ifdef HAS_BUTTON -void IRAM_ATTR ButtonIRQ() { - BaseType_t xHigherPriorityTaskWoken = pdFALSE; - - xTaskNotifyFromISR(irqHandlerTask, BUTTON_IRQ, eSetBits, - &xHigherPriorityTaskWoken); - - if (xHigherPriorityTaskWoken) - portYIELD_FROM_ISR(); -} +void IRAM_ATTR ButtonIRQ() { doIRQ(BUTTON_IRQ); } #endif #ifdef HAS_PMU -void IRAM_ATTR PMUIRQ() { - BaseType_t xHigherPriorityTaskWoken = pdFALSE; - - xTaskNotifyFromISR(irqHandlerTask, PMU_IRQ, eSetBits, - &xHigherPriorityTaskWoken); - - if (xHigherPriorityTaskWoken) - portYIELD_FROM_ISR(); -} +void IRAM_ATTR PMUIRQ() { doIRQ(PMU_IRQ); } #endif void mask_user_IRQ() { xTaskNotify(irqHandlerTask, MASK_IRQ, eSetBits); } diff --git a/src/mqttclient.cpp b/src/mqttclient.cpp index bfd11319..deb6ce0b 100644 --- a/src/mqttclient.cpp +++ b/src/mqttclient.cpp @@ -7,8 +7,10 @@ static const char TAG[] = __FILE__; QueueHandle_t MQTTSendQueue; TaskHandle_t mqttTask; -WiFiClient EthClient; -PubSubClient mqttClient(EthClient); +Ticker mqttTimer; + +WiFiClient NetClient; +PubSubClient mqttClient(NetClient); void NetworkEvent(WiFiEvent_t event) { switch (event) { @@ -16,22 +18,26 @@ void NetworkEvent(WiFiEvent_t event) { ESP_LOGI(TAG, "Ethernet link layer started"); ETH.setHostname(ETH.macAddress().c_str()); break; + case SYSTEM_EVENT_ETH_STOP: + ESP_LOGI(TAG, "Ethernet link layer stopped"); + break; case SYSTEM_EVENT_ETH_CONNECTED: + case SYSTEM_EVENT_STA_CONNECTED: ESP_LOGI(TAG, "Network link connected"); break; + case SYSTEM_EVENT_ETH_DISCONNECTED: + case SYSTEM_EVENT_STA_DISCONNECTED: + ESP_LOGI(TAG, "Network link disconnected"); + break; case SYSTEM_EVENT_ETH_GOT_IP: ESP_LOGI(TAG, "ETH MAC: %s", ETH.macAddress().c_str()); ESP_LOGI(TAG, "IPv4: %s", ETH.localIP().toString().c_str()); ESP_LOGI(TAG, "Link Speed: %d Mbps %s", ETH.linkSpeed(), ETH.fullDuplex() ? "full duplex" : "half duplex"); + case SYSTEM_EVENT_STA_GOT_IP: mqtt_connect(MQTT_SERVER, MQTT_PORT); break; - case SYSTEM_EVENT_ETH_DISCONNECTED: - ESP_LOGI(TAG, "Network link disconnected"); - break; - case SYSTEM_EVENT_ETH_STOP: - ESP_LOGI(TAG, "Ethernet link layer stopped"); - break; + default: break; } @@ -40,10 +46,12 @@ void NetworkEvent(WiFiEvent_t event) { int mqtt_connect(const char *my_host, const uint16_t my_port) { IPAddress mqtt_server_ip; - static String clientId = "paxcounter-" + ETH.macAddress(); + // static String clientId = "paxcounter-" + ETH.macAddress(); + static String clientId = "paxcounter-" + String(random(0xffff), HEX); + ESP_LOGI(TAG, "MQTT name is %s", clientId.c_str()); - // resolve server + // resolve server host name if (WiFi.hostByName(my_host, mqtt_server_ip)) { ESP_LOGI(TAG, "Attempting to connect to %s [%s]", my_host, mqtt_server_ip.toString().c_str()); @@ -53,9 +61,10 @@ int mqtt_connect(const char *my_host, const uint16_t my_port) { } // attempt to connect to MQTT server - if (EthClient.connect(mqtt_server_ip, my_port, HOMECYCLE * 2 * 1000)) { + if (NetClient.connect(mqtt_server_ip, my_port, HOMECYCLE * 2 * 1000)) { + NetClient.setTimeout(MQTT_KEEPALIVE); // seconds mqttClient.setServer(mqtt_server_ip, my_port); - mqttClient.setKeepAlive(HOMECYCLE * 2); + mqttClient.setKeepAlive(MQTT_KEEPALIVE); mqttClient.setCallback(mqtt_callback); if (mqttClient.connect(clientId.c_str())) { @@ -92,23 +101,30 @@ void mqtt_client_task(void *param) { mqttClient.write('/'); mqttClient.write(msg.Message, msg.MessageSize); if (mqttClient.endPublish()) { - ESP_LOGI(TAG, "%d byte(s) sent to MQTT", msg.MessageSize + 2); - continue; // while(1) - } else - goto reconnect; + ESP_LOGI(TAG, "%d byte(s) sent to MQTT server", msg.MessageSize + 2); + continue; + } else { + mqtt_enqueuedata(&msg); // postpone the undelivered message + ESP_LOGD(TAG, + "Couldn't sent message to MQTT server, message postponed"); + } } else { - // attempt to reconnect to MQTT server - reconnect: + ESP_LOGD(TAG, "MQTT client reconnecting..."); mqtt_enqueuedata(&msg); // postpone the undelivered message delay(MQTT_RETRYSEC * 1000); mqtt_connect(MQTT_SERVER, MQTT_PORT); } + } // while(1) } esp_err_t mqtt_init(void) { + + WiFi.onEvent(NetworkEvent); + ETH.begin(); + assert(SEND_QUEUE_SIZE); MQTTSendQueue = xQueueCreate(SEND_QUEUE_SIZE, sizeof(MessageBuffer_t)); if (MQTTSendQueue == 0) { @@ -119,10 +135,8 @@ esp_err_t mqtt_init(void) { SEND_QUEUE_SIZE * PAYLOAD_BUFFER_SIZE); ESP_LOGI(TAG, "Starting MQTTloop..."); - xTaskCreate(mqtt_client_task, "mqttloop", 4096, (void *)NULL, 2, &mqttTask); - - WiFi.onEvent(NetworkEvent); - ETH.begin(); + mqttTimer.attach(MQTT_KEEPALIVE, mqtt_irq); + xTaskCreate(mqtt_client_task, "mqttloop", 4096, (void *)NULL, 1, &mqttTask); return ESP_OK; } @@ -151,13 +165,17 @@ void mqtt_enqueuedata(MessageBuffer_t *message) { } void mqtt_callback(char *topic, byte *payload, unsigned int length) { - String s = ""; - for (int i = 0; i < length; i++) - s += (char)payload[i]; - ESP_LOGD(TAG, "MQTT: Received %u byte(s) of payload [%s]", length, s); - // rcommand(payload, length); + + char buffer[11]; + snprintf(buffer, 10, "%X02 %X02 %X02 %X02 %X02 %X02 %X02 %X02 %X02 %X02", + payload); + ESP_LOGI(TAG, "MQTT: Received %u byte(s) of payload [%s]", length, buffer); + + rcommand(payload, length); } void mqtt_queuereset(void) { xQueueReset(MQTTSendQueue); } +void mqtt_irq(void) { xTaskNotify(irqHandlerTask, MQTT_IRQ, eSetBits); } +void mqtt_loop(void) { mqttClient.loop(); } #endif // HAS_MQTT \ No newline at end of file