mqttclient reworked (experimental)

This commit is contained in:
cyberman54 2020-12-23 16:31:31 +01:00
parent 1950650bfb
commit a30eab54f7
5 changed files with 25 additions and 44 deletions

View File

@ -11,7 +11,6 @@
#define BME_IRQ 0x080 #define BME_IRQ 0x080
#define MATRIX_DISPLAY_IRQ 0x100 #define MATRIX_DISPLAY_IRQ 0x100
#define PMU_IRQ 0x200 #define PMU_IRQ 0x200
#define MQTT_IRQ 0x400
#include "globals.h" #include "globals.h"
#include "button.h" #include "button.h"
@ -21,7 +20,6 @@
#include "bmesensor.h" #include "bmesensor.h"
#include "power.h" #include "power.h"
#include "ledmatrixdisplay.h" #include "ledmatrixdisplay.h"
#include "mqttclient.h"
void irqHandler(void *pvParameters); void irqHandler(void *pvParameters);
void mask_user_IRQ(); void mask_user_IRQ();

View File

@ -6,17 +6,15 @@
#include <MQTT.h> #include <MQTT.h>
#include <ETH.h> #include <ETH.h>
#define MQTT_ETHERNET 0 // set to 0 to run on Wifi #define MQTT_ETHERNET 0 // select PHY: set 0 for Wifi, 1 for ethernet
#define MQTT_INTOPIC "paxin" #define MQTT_INTOPIC "paxin"
#define MQTT_OUTTOPIC "paxout" #define MQTT_OUTTOPIC "paxout"
#define MQTT_PORT 1883 #define MQTT_PORT 1883
#define MQTT_SERVER "broker.shiftr.io" #define MQTT_SERVER "paxcounter.cloud.shiftr.io"
//#define MQTT_CLIENTNAME "arduino" #define MQTT_USER "public"
#define MQTT_USER "try" #define MQTT_PASSWD "public"
#define MQTT_PASSWD "try"
#define MQTT_RETRYSEC 20 // retry reconnect every 20 seconds #define MQTT_RETRYSEC 20 // retry reconnect every 20 seconds
#define MQTT_KEEPALIVE 10 // keep alive interval in seconds #define MQTT_KEEPALIVE 10 // keep alive interval in seconds
#define MQTT_TIMEOUT 1000 // timeout for all mqtt commands in milliseconds
#ifndef MQTT_CLIENTNAME #ifndef MQTT_CLIENTNAME
#define MQTT_CLIENTNAME clientId.c_str() #define MQTT_CLIENTNAME clientId.c_str()
@ -27,13 +25,12 @@ extern TaskHandle_t mqttTask;
void mqtt_enqueuedata(MessageBuffer_t *message); void mqtt_enqueuedata(MessageBuffer_t *message);
uint32_t mqtt_queuewaiting(void); uint32_t mqtt_queuewaiting(void);
void mqtt_queuereset(void); void mqtt_queuereset(void);
void setMqttIRQ(void);
void mqtt_loop(void);
void mqtt_client_task(void *param); void mqtt_client_task(void *param);
int mqtt_connect(const char *my_host, const uint16_t my_port); int mqtt_connect(const char *my_host, const uint16_t my_port);
void mqtt_callback(MQTTClient *client, char topic[], char payload[], void mqtt_callback(MQTTClient *client, char topic[], char payload[],
int length); int length);
void NetworkEvent(WiFiEvent_t event); void NetworkEvent(WiFiEvent_t event);
esp_err_t mqtt_init(void); esp_err_t mqtt_init(void);
void mqtt_deinit(void);
#endif // _MQTTCLIENT_H #endif // _MQTTCLIENT_H

View File

@ -69,14 +69,6 @@ void irqHandler(void *pvParameters) {
} }
#endif #endif
// MQTT loop due?
#if (HAS_MQTT)
if (InterruptStatus & MQTT_IRQ) {
mqtt_loop();
InterruptStatus &= ~MQTT_IRQ;
}
#endif
// are cyclic tasks due? // are cyclic tasks due?
if (InterruptStatus & CYCLIC_IRQ) { if (InterruptStatus & CYCLIC_IRQ) {
doHousekeeping(); doHousekeeping();

View File

@ -68,7 +68,6 @@ TIMESYNC_IRQ -> setTimeSyncIRQ()
CYCLIC_IRQ -> setCyclicIRQ() CYCLIC_IRQ -> setCyclicIRQ()
SENDCYCLE_IRQ -> setSendIRQ() SENDCYCLE_IRQ -> setSendIRQ()
BME_IRQ -> setBMEIRQ() BME_IRQ -> setBMEIRQ()
MQTT_IRQ -> setMqttIRQ()
ClockTask (Core 1), see timekeeper.cpp ClockTask (Core 1), see timekeeper.cpp

View File

@ -11,13 +11,17 @@ Ticker mqttTimer;
WiFiClient netClient; WiFiClient netClient;
MQTTClient mqttClient; MQTTClient mqttClient;
void mqtt_deinit(void) {
mqttClient.onMessageAdvanced(NULL);
mqttClient.disconnect();
vTaskDelete(mqttTask);
}
esp_err_t mqtt_init(void) { esp_err_t mqtt_init(void) {
// setup network connection // setup network connection
WiFi.onEvent(NetworkEvent); WiFi.onEvent(NetworkEvent);
ETH.begin(); ETH.begin();
// WiFi.mode(WIFI_STA);
// WiFi.begin("SSID", "PASSWORD");
// setup mqtt client // setup mqtt client
mqttClient.begin(MQTT_SERVER, MQTT_PORT, netClient); mqttClient.begin(MQTT_SERVER, MQTT_PORT, netClient);
@ -33,7 +37,6 @@ esp_err_t mqtt_init(void) {
SEND_QUEUE_SIZE * PAYLOAD_BUFFER_SIZE); SEND_QUEUE_SIZE * PAYLOAD_BUFFER_SIZE);
ESP_LOGI(TAG, "Starting MQTTloop..."); ESP_LOGI(TAG, "Starting MQTTloop...");
mqttTimer.attach(MQTT_KEEPALIVE, setMqttIRQ);
xTaskCreate(mqtt_client_task, "mqttloop", 4096, (void *)NULL, 1, &mqttTask); xTaskCreate(mqtt_client_task, "mqttloop", 4096, (void *)NULL, 1, &mqttTask);
return ESP_OK; return ESP_OK;
@ -113,16 +116,19 @@ void mqtt_client_task(void *param) {
while (1) { while (1) {
// fetch next or wait for payload to send from queue
// do not delete item from queue until it is transmitted
if (xQueuePeek(MQTTSendQueue, &msg, portMAX_DELAY) != pdTRUE) {
ESP_LOGE(TAG, "Premature return from xQueueReceive() with no data!");
continue;
}
// send data to mqtt server, if we are connected
if (mqttClient.connected()) { if (mqttClient.connected()) {
// check for incoming messages
mqttClient.loop();
// fetch next or wait for payload to send from queue
// do not delete item from queue until it is transmitted
// consider mqtt timeout while waiting
if (xQueuePeek(MQTTSendQueue, &msg,
MQTT_KEEPALIVE * 1000 / portTICK_PERIOD_MS) != pdTRUE)
continue;
// send data to mqtt server
char buffer[PAYLOAD_BUFFER_SIZE + 3]; char buffer[PAYLOAD_BUFFER_SIZE + 3];
snprintf(buffer, msg.MessageSize + 3, "%u/%s", msg.MessagePort, snprintf(buffer, msg.MessageSize + 3, "%u/%s", msg.MessagePort,
msg.Message); msg.Message);
@ -131,19 +137,16 @@ void mqtt_client_task(void *param) {
ESP_LOGI(TAG, "%d byte(s) sent to MQTT server", msg.MessageSize + 2); ESP_LOGI(TAG, "%d byte(s) sent to MQTT server", msg.MessageSize + 2);
// delete sent item from queue // delete sent item from queue
xQueueReceive(MQTTSendQueue, &msg, (TickType_t)0); xQueueReceive(MQTTSendQueue, &msg, (TickType_t)0);
continue;
} else } else
ESP_LOGD(TAG, "Couldn't sent message to MQTT server"); ESP_LOGD(TAG, "Couldn't sent message to MQTT server");
} else { } else {
// attempt to reconnect to MQTT server // attempt to reconnect to MQTT server
ESP_LOGD(TAG, "MQTT client reconnecting..."); ESP_LOGD(TAG, "MQTT error = %d / rc = %d", mqttClient.lastError(),
ESP_LOGD(TAG, "MQTT last_error = %d / rc = %d", mqttClient.lastError(),
mqttClient.returnCode()); mqttClient.returnCode());
ESP_LOGD(TAG, "MQTT client reconnecting...");
delay(MQTT_RETRYSEC * 1000); delay(MQTT_RETRYSEC * 1000);
mqtt_connect(MQTT_SERVER, MQTT_PORT); mqtt_connect(MQTT_SERVER, MQTT_PORT);
} }
} // while (1) } // while (1)
} }
@ -159,18 +162,10 @@ void mqtt_callback(MQTTClient *client, char topic[], char payload[],
rcommand((const uint8_t *)payload, (const uint8_t)length); rcommand((const uint8_t *)payload, (const uint8_t)length);
} }
void mqtt_loop(void) {
if (!mqttClient.loop())
ESP_LOGD(TAG, "MQTT last_error = %d / rc = %d", mqttClient.lastError(),
mqttClient.returnCode());
}
void mqtt_queuereset(void) { xQueueReset(MQTTSendQueue); } void mqtt_queuereset(void) { xQueueReset(MQTTSendQueue); }
uint32_t mqtt_queuewaiting(void) { uint32_t mqtt_queuewaiting(void) {
return uxQueueMessagesWaiting(MQTTSendQueue); return uxQueueMessagesWaiting(MQTTSendQueue);
} }
void setMqttIRQ(void) { xTaskNotify(irqHandlerTask, MQTT_IRQ, eSetBits); }
#endif // HAS_MQTT #endif // HAS_MQTT