diff --git a/include/mqttclient.h b/include/mqttclient.h new file mode 100644 index 00000000..28013301 --- /dev/null +++ b/include/mqttclient.h @@ -0,0 +1,23 @@ +#ifndef _MQTTCLIENT_H +#define _MQTTCLIENT_H + +#include "globals.h" +#include "rcommand.h" +#include +#include + +#define MQTT_NAME "paxcounter" +#define MQTT_INTOPIC "rcommand" +#define MQTT_PORT 1883 + +extern TaskHandle_t mqttTask; + +void mqtt_enqueuedata(MessageBuffer_t *message); +void mqtt_queuereset(void); +void mqtt_client_task(void *param); +void mqtt_connect(IPAddress mqtt_host, uint16_t mqtt_port); +void mqtt_callback(char *topic, byte *payload, unsigned int length); +void WiFiEvent(WiFiEvent_t event); +esp_err_t mqtt_init(void); + +#endif // _MQTTCLIENT_H \ No newline at end of file diff --git a/include/senddata.h b/include/senddata.h index 7be85016..fd31f884 100644 --- a/include/senddata.h +++ b/include/senddata.h @@ -2,6 +2,7 @@ #define _SENDDATA_H #include "spislave.h" +#include "mqttclient.h" #include "cyclic.h" #include "sensor.h" #include "lorawan.h" diff --git a/src/hal/olimexpoeiso.h b/src/hal/olimexpoeiso.h index 8035a887..227473f9 100644 --- a/src/hal/olimexpoeiso.h +++ b/src/hal/olimexpoeiso.h @@ -10,16 +10,16 @@ // enable only if you want to store a local paxcount table on the device #define HAS_SDCARD 2 // this board has an SD-card-reader/writer +// enable only if you want to send paxcount via ethernet port to mqtt server +#define HAS_MQTT 1 // use MQTT on ethernet interface + #define DISABLE_BROWNOUT 1 // comment out if you want to keep brownout feature //#define BAT_MEASURE_ADC ADC1_GPIO35_CHANNEL // battery probe GPIO pin -> ADC1_CHANNEL_7 +//#define BAT_VOLTAGE_DIVIDER 2 // voltage divider 470k/470k on board #define BAT_MEASURE_ADC ADC1_GPIO39_CHANNEL // external power probe GPIO pin #define BAT_VOLTAGE_DIVIDER 2.1277f // voltage divider 47k/442k on board -#define EXT_POWER_SW GPIO_NUM_12 // switches PoE power, Vext control 0 = off / 1 = on -#define EXT_POWER_ON 1 -//#define EXT_POWER_OFF 1 - #define HAS_BUTTON KEY_BUILTIN // on board button #define HAS_LED NOT_A_PIN // no on board LED diff --git a/src/main.cpp b/src/main.cpp index 66d9c3cb..ffb9f68c 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -330,6 +330,12 @@ void setup() { assert(spi_init() == ESP_OK); #endif +// initialize MQTT +#ifdef HAS_MQTT + strcat_P(features, " MQTT"); + assert(mqtt_init() == ESP_OK); +#endif + #ifdef HAS_SDCARD if (sdcard_init()) strcat_P(features, " SD"); diff --git a/src/mqttclient.cpp b/src/mqttclient.cpp new file mode 100644 index 00000000..7b521658 --- /dev/null +++ b/src/mqttclient.cpp @@ -0,0 +1,134 @@ +#ifdef HAS_MQTT + +#include "mqttclient.h" + +static const char TAG[] = __FILE__; + +IPAddress mqtt_server_ip(192, 168, 11, 57); + +QueueHandle_t MQTTSendQueue; +TaskHandle_t mqttTask; + +WiFiClient ipClient; +PubSubClient client(ipClient); + +void WiFiEvent(WiFiEvent_t event) { + switch (event) { + case SYSTEM_EVENT_ETH_START: + ESP_LOGI(TAG, "ETH Started"); + ETH.setHostname(MQTT_NAME); + break; + case SYSTEM_EVENT_ETH_CONNECTED: + ESP_LOGI(TAG, "ETH Connected"); + break; + case SYSTEM_EVENT_ETH_GOT_IP: + ESP_LOGI(TAG, "ETH MAC: %s", ETH.macAddress()); + ESP_LOGI(TAG, "IPv4: %s", ETH.localIP()); + ESP_LOGI(TAG, "Link Speed %d Mbps %s", ETH.linkSpeed(), + ETH.fullDuplex() ? "full duplex" : "half duplex"); + mqtt_connect(mqtt_server_ip, MQTT_PORT); + break; + case SYSTEM_EVENT_ETH_DISCONNECTED: + ESP_LOGI(TAG, "ETH Disconnected"); + break; + case SYSTEM_EVENT_ETH_STOP: + ESP_LOGI(TAG, "ETH Stopped"); + break; + default: + break; + } +} + +void mqtt_connect(IPAddress mqtt_host, uint16_t mqtt_port) { + // attempt to connect to MQTT server + if (ipClient.connect(mqtt_server_ip, MQTT_PORT)) { + if (client.connect(MQTT_NAME)) { + ESP_LOGW(TAG, "MQTT server connected, subscribing"); + client.subscribe(MQTT_INTOPIC); + } else { + ESP_LOGW(TAG, "MQTT server not responding, retrying later"); + } + } else + ESP_LOGW(TAG, "MQTT server not connected, retrying later"); +} + +void mqtt_client_task(void *param) { + while (1) { + MessageBuffer_t msg; + char cPort[4], cMsg[PAYLOAD_BUFFER_SIZE + 1]; + + // fetch next or wait for payload to send from queue + if (xQueueReceive(MQTTSendQueue, &msg, portMAX_DELAY) != pdTRUE) { + ESP_LOGE(TAG, "Premature return from xQueueReceive() with no data!"); + continue; + } + + // send data + if (client.connected()) { + snprintf(cPort, sizeof(cPort), "%d", msg.MessagePort); + snprintf(cMsg, sizeof(cMsg), "%s", msg.Message); + client.publish(cPort, cMsg); + client.loop(); + ESP_LOGI(TAG, "%d byte(s) sent to MQTT", msg.MessageSize); + } else { + mqtt_enqueuedata(&msg); // re-enqueue the undelivered message + delay(10000); + // attempt to reconnect to MQTT server + mqtt_connect(mqtt_server_ip, MQTT_PORT); + } + } +} + +esp_err_t mqtt_init(void) { + assert(SEND_QUEUE_SIZE); + MQTTSendQueue = xQueueCreate(SEND_QUEUE_SIZE, sizeof(MessageBuffer_t)); + if (MQTTSendQueue == 0) { + ESP_LOGE(TAG, "Could not create MQTT send queue. Aborting."); + return ESP_FAIL; + } + ESP_LOGI(TAG, "MQTT send queue created, size %d Bytes", + SEND_QUEUE_SIZE * PAYLOAD_BUFFER_SIZE); + + WiFi.onEvent(WiFiEvent); + assert(ETH.begin()); + + client.setServer(mqtt_server_ip, MQTT_PORT); + client.setCallback(mqtt_callback); + + ESP_LOGI(TAG, "Starting MQTTloop..."); + xTaskCreate(mqtt_client_task, "mqttloop", 4096, (void *)NULL, 2, &mqttTask); + + return ESP_OK; +} + +void mqtt_enqueuedata(MessageBuffer_t *message) { + // enqueue message in MQTT send queue + BaseType_t ret; + MessageBuffer_t DummyBuffer; + sendprio_t prio = message->MessagePrio; + + switch (prio) { + case prio_high: + // clear space in queue if full, then fallthrough to normal + if (!uxQueueSpacesAvailable(MQTTSendQueue)) + xQueueReceive(MQTTSendQueue, &DummyBuffer, (TickType_t)0); + case prio_normal: + ret = xQueueSendToFront(MQTTSendQueue, (void *)message, (TickType_t)0); + break; + case prio_low: + default: + ret = xQueueSendToBack(MQTTSendQueue, (void *)message, (TickType_t)0); + break; + } + if (ret != pdTRUE) + ESP_LOGW(TAG, "MQTT sendqueue is full"); +} + +void mqtt_queuereset(void) { xQueueReset(MQTTSendQueue); } + +void mqtt_callback(char *topic, byte *payload, unsigned int length) { + if ((length) && (topic == MQTT_INTOPIC)) + rcommand(payload, length); +} + +#endif // HAS_MQTT \ No newline at end of file diff --git a/src/senddata.cpp b/src/senddata.cpp index 07de525a..d6067131 100644 --- a/src/senddata.cpp +++ b/src/senddata.cpp @@ -50,6 +50,9 @@ void SendPayload(uint8_t port, sendprio_t prio) { #ifdef HAS_SPI spi_enqueuedata(&SendBuffer); #endif +#ifdef HAS_MQTT + mqtt_enqueuedata(&SendBuffer); +#endif // write data to sdcard, if present #ifdef HAS_SDCARD @@ -179,4 +182,7 @@ void flushQueues() { #ifdef HAS_SPI spi_queuereset(); #endif +#ifdef HAS_MQTT + mqtt_queuereset(); +#endif }