Ethernet MQTT client (experimental)
This commit is contained in:
		
							parent
							
								
									10616ffc52
								
							
						
					
					
						commit
						73ee2df3d4
					
				
							
								
								
									
										23
									
								
								include/mqttclient.h
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								include/mqttclient.h
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,23 @@ | |||||||
|  | #ifndef _MQTTCLIENT_H | ||||||
|  | #define _MQTTCLIENT_H | ||||||
|  | 
 | ||||||
|  | #include "globals.h" | ||||||
|  | #include "rcommand.h" | ||||||
|  | #include <ETH.h> | ||||||
|  | #include <PubSubClient.h> | ||||||
|  | 
 | ||||||
|  | #define MQTT_NAME "paxcounter" | ||||||
|  | #define MQTT_INTOPIC "rcommand" | ||||||
|  | #define MQTT_PORT 1883 | ||||||
|  | 
 | ||||||
|  | extern TaskHandle_t mqttTask; | ||||||
|  | 
 | ||||||
|  | void mqtt_enqueuedata(MessageBuffer_t *message); | ||||||
|  | void mqtt_queuereset(void); | ||||||
|  | void mqtt_client_task(void *param); | ||||||
|  | void mqtt_connect(IPAddress mqtt_host, uint16_t mqtt_port); | ||||||
|  | void mqtt_callback(char *topic, byte *payload, unsigned int length); | ||||||
|  | void WiFiEvent(WiFiEvent_t event); | ||||||
|  | esp_err_t mqtt_init(void); | ||||||
|  | 
 | ||||||
|  | #endif // _MQTTCLIENT_H
 | ||||||
| @ -2,6 +2,7 @@ | |||||||
| #define _SENDDATA_H | #define _SENDDATA_H | ||||||
| 
 | 
 | ||||||
| #include "spislave.h" | #include "spislave.h" | ||||||
|  | #include "mqttclient.h" | ||||||
| #include "cyclic.h" | #include "cyclic.h" | ||||||
| #include "sensor.h" | #include "sensor.h" | ||||||
| #include "lorawan.h" | #include "lorawan.h" | ||||||
|  | |||||||
| @ -10,16 +10,16 @@ | |||||||
| // enable only if you want to store a local paxcount table on the device
 | // enable only if you want to store a local paxcount table on the device
 | ||||||
| #define HAS_SDCARD  2      // this board has an SD-card-reader/writer
 | #define HAS_SDCARD  2      // this board has an SD-card-reader/writer
 | ||||||
| 
 | 
 | ||||||
|  | // enable only if you want to send paxcount via ethernet port to mqtt server
 | ||||||
|  | #define HAS_MQTT 1  // use MQTT on ethernet interface
 | ||||||
|  | 
 | ||||||
| #define DISABLE_BROWNOUT 1 // comment out if you want to keep brownout feature
 | #define DISABLE_BROWNOUT 1 // comment out if you want to keep brownout feature
 | ||||||
| 
 | 
 | ||||||
| //#define BAT_MEASURE_ADC ADC1_GPIO35_CHANNEL // battery probe GPIO pin -> ADC1_CHANNEL_7
 | //#define BAT_MEASURE_ADC ADC1_GPIO35_CHANNEL // battery probe GPIO pin -> ADC1_CHANNEL_7
 | ||||||
|  | //#define BAT_VOLTAGE_DIVIDER 2 // voltage divider 470k/470k on board
 | ||||||
| #define BAT_MEASURE_ADC ADC1_GPIO39_CHANNEL // external power probe GPIO pin
 | #define BAT_MEASURE_ADC ADC1_GPIO39_CHANNEL // external power probe GPIO pin
 | ||||||
| #define BAT_VOLTAGE_DIVIDER 2.1277f // voltage divider 47k/442k on board
 | #define BAT_VOLTAGE_DIVIDER 2.1277f // voltage divider 47k/442k on board
 | ||||||
| 
 | 
 | ||||||
| #define EXT_POWER_SW GPIO_NUM_12 // switches PoE power, Vext control 0 = off / 1 = on
 |  | ||||||
| #define EXT_POWER_ON    1 |  | ||||||
| //#define EXT_POWER_OFF   1
 |  | ||||||
| 
 |  | ||||||
| #define HAS_BUTTON KEY_BUILTIN // on board button
 | #define HAS_BUTTON KEY_BUILTIN // on board button
 | ||||||
| #define HAS_LED NOT_A_PIN // no on board LED
 | #define HAS_LED NOT_A_PIN // no on board LED
 | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -330,6 +330,12 @@ void setup() { | |||||||
|   assert(spi_init() == ESP_OK); |   assert(spi_init() == ESP_OK); | ||||||
| #endif | #endif | ||||||
| 
 | 
 | ||||||
|  | // initialize MQTT
 | ||||||
|  | #ifdef HAS_MQTT | ||||||
|  |   strcat_P(features, " MQTT"); | ||||||
|  |   assert(mqtt_init() == ESP_OK); | ||||||
|  | #endif | ||||||
|  | 
 | ||||||
| #ifdef HAS_SDCARD | #ifdef HAS_SDCARD | ||||||
|   if (sdcard_init()) |   if (sdcard_init()) | ||||||
|     strcat_P(features, " SD"); |     strcat_P(features, " SD"); | ||||||
|  | |||||||
							
								
								
									
										134
									
								
								src/mqttclient.cpp
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										134
									
								
								src/mqttclient.cpp
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,134 @@ | |||||||
|  | #ifdef HAS_MQTT | ||||||
|  | 
 | ||||||
|  | #include "mqttclient.h" | ||||||
|  | 
 | ||||||
|  | static const char TAG[] = __FILE__; | ||||||
|  | 
 | ||||||
|  | IPAddress mqtt_server_ip(192, 168, 11, 57); | ||||||
|  | 
 | ||||||
|  | QueueHandle_t MQTTSendQueue; | ||||||
|  | TaskHandle_t mqttTask; | ||||||
|  | 
 | ||||||
|  | WiFiClient ipClient; | ||||||
|  | PubSubClient client(ipClient); | ||||||
|  | 
 | ||||||
|  | void WiFiEvent(WiFiEvent_t event) { | ||||||
|  |   switch (event) { | ||||||
|  |   case SYSTEM_EVENT_ETH_START: | ||||||
|  |     ESP_LOGI(TAG, "ETH Started"); | ||||||
|  |     ETH.setHostname(MQTT_NAME); | ||||||
|  |     break; | ||||||
|  |   case SYSTEM_EVENT_ETH_CONNECTED: | ||||||
|  |     ESP_LOGI(TAG, "ETH Connected"); | ||||||
|  |     break; | ||||||
|  |   case SYSTEM_EVENT_ETH_GOT_IP: | ||||||
|  |     ESP_LOGI(TAG, "ETH MAC: %s", ETH.macAddress()); | ||||||
|  |     ESP_LOGI(TAG, "IPv4: %s", ETH.localIP()); | ||||||
|  |     ESP_LOGI(TAG, "Link Speed %d Mbps %s", ETH.linkSpeed(), | ||||||
|  |              ETH.fullDuplex() ? "full duplex" : "half duplex"); | ||||||
|  |     mqtt_connect(mqtt_server_ip, MQTT_PORT); | ||||||
|  |     break; | ||||||
|  |   case SYSTEM_EVENT_ETH_DISCONNECTED: | ||||||
|  |     ESP_LOGI(TAG, "ETH Disconnected"); | ||||||
|  |     break; | ||||||
|  |   case SYSTEM_EVENT_ETH_STOP: | ||||||
|  |     ESP_LOGI(TAG, "ETH Stopped"); | ||||||
|  |     break; | ||||||
|  |   default: | ||||||
|  |     break; | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | void mqtt_connect(IPAddress mqtt_host, uint16_t mqtt_port) { | ||||||
|  |   // attempt to connect to MQTT server
 | ||||||
|  |   if (ipClient.connect(mqtt_server_ip, MQTT_PORT)) { | ||||||
|  |     if (client.connect(MQTT_NAME)) { | ||||||
|  |       ESP_LOGW(TAG, "MQTT server connected, subscribing"); | ||||||
|  |       client.subscribe(MQTT_INTOPIC); | ||||||
|  |     } else { | ||||||
|  |       ESP_LOGW(TAG, "MQTT server not responding, retrying later"); | ||||||
|  |     } | ||||||
|  |   } else | ||||||
|  |     ESP_LOGW(TAG, "MQTT server not connected, retrying later"); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | void mqtt_client_task(void *param) { | ||||||
|  |   while (1) { | ||||||
|  |     MessageBuffer_t msg; | ||||||
|  |     char cPort[4], cMsg[PAYLOAD_BUFFER_SIZE + 1]; | ||||||
|  | 
 | ||||||
|  |     // fetch next or wait for payload to send from queue
 | ||||||
|  |     if (xQueueReceive(MQTTSendQueue, &msg, portMAX_DELAY) != pdTRUE) { | ||||||
|  |       ESP_LOGE(TAG, "Premature return from xQueueReceive() with no data!"); | ||||||
|  |       continue; | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     // send data
 | ||||||
|  |     if (client.connected()) { | ||||||
|  |       snprintf(cPort, sizeof(cPort), "%d", msg.MessagePort); | ||||||
|  |       snprintf(cMsg, sizeof(cMsg), "%s", msg.Message); | ||||||
|  |       client.publish(cPort, cMsg); | ||||||
|  |       client.loop(); | ||||||
|  |       ESP_LOGI(TAG, "%d byte(s) sent to MQTT", msg.MessageSize); | ||||||
|  |     } else { | ||||||
|  |       mqtt_enqueuedata(&msg); // re-enqueue the undelivered message
 | ||||||
|  |       delay(10000); | ||||||
|  |       // attempt to reconnect to MQTT server
 | ||||||
|  |       mqtt_connect(mqtt_server_ip, MQTT_PORT); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | esp_err_t mqtt_init(void) { | ||||||
|  |   assert(SEND_QUEUE_SIZE); | ||||||
|  |   MQTTSendQueue = xQueueCreate(SEND_QUEUE_SIZE, sizeof(MessageBuffer_t)); | ||||||
|  |   if (MQTTSendQueue == 0) { | ||||||
|  |     ESP_LOGE(TAG, "Could not create MQTT send queue. Aborting."); | ||||||
|  |     return ESP_FAIL; | ||||||
|  |   } | ||||||
|  |   ESP_LOGI(TAG, "MQTT send queue created, size %d Bytes", | ||||||
|  |            SEND_QUEUE_SIZE * PAYLOAD_BUFFER_SIZE); | ||||||
|  | 
 | ||||||
|  |   WiFi.onEvent(WiFiEvent); | ||||||
|  |   assert(ETH.begin()); | ||||||
|  | 
 | ||||||
|  |   client.setServer(mqtt_server_ip, MQTT_PORT); | ||||||
|  |   client.setCallback(mqtt_callback); | ||||||
|  | 
 | ||||||
|  |   ESP_LOGI(TAG, "Starting MQTTloop..."); | ||||||
|  |   xTaskCreate(mqtt_client_task, "mqttloop", 4096, (void *)NULL, 2, &mqttTask); | ||||||
|  | 
 | ||||||
|  |   return ESP_OK; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | void mqtt_enqueuedata(MessageBuffer_t *message) { | ||||||
|  |   // enqueue message in MQTT send queue
 | ||||||
|  |   BaseType_t ret; | ||||||
|  |   MessageBuffer_t DummyBuffer; | ||||||
|  |   sendprio_t prio = message->MessagePrio; | ||||||
|  | 
 | ||||||
|  |   switch (prio) { | ||||||
|  |   case prio_high: | ||||||
|  |     // clear space in queue if full, then fallthrough to normal
 | ||||||
|  |     if (!uxQueueSpacesAvailable(MQTTSendQueue)) | ||||||
|  |       xQueueReceive(MQTTSendQueue, &DummyBuffer, (TickType_t)0); | ||||||
|  |   case prio_normal: | ||||||
|  |     ret = xQueueSendToFront(MQTTSendQueue, (void *)message, (TickType_t)0); | ||||||
|  |     break; | ||||||
|  |   case prio_low: | ||||||
|  |   default: | ||||||
|  |     ret = xQueueSendToBack(MQTTSendQueue, (void *)message, (TickType_t)0); | ||||||
|  |     break; | ||||||
|  |   } | ||||||
|  |   if (ret != pdTRUE) | ||||||
|  |     ESP_LOGW(TAG, "MQTT sendqueue is full"); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | void mqtt_queuereset(void) { xQueueReset(MQTTSendQueue); } | ||||||
|  | 
 | ||||||
|  | void mqtt_callback(char *topic, byte *payload, unsigned int length) { | ||||||
|  |   if ((length) && (topic == MQTT_INTOPIC)) | ||||||
|  |     rcommand(payload, length); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #endif // HAS_MQTT
 | ||||||
| @ -50,6 +50,9 @@ void SendPayload(uint8_t port, sendprio_t prio) { | |||||||
| #ifdef HAS_SPI | #ifdef HAS_SPI | ||||||
|   spi_enqueuedata(&SendBuffer); |   spi_enqueuedata(&SendBuffer); | ||||||
| #endif | #endif | ||||||
|  | #ifdef HAS_MQTT | ||||||
|  |   mqtt_enqueuedata(&SendBuffer); | ||||||
|  | #endif | ||||||
| 
 | 
 | ||||||
| // write data to sdcard, if present
 | // write data to sdcard, if present
 | ||||||
| #ifdef HAS_SDCARD | #ifdef HAS_SDCARD | ||||||
| @ -179,4 +182,7 @@ void flushQueues() { | |||||||
| #ifdef HAS_SPI | #ifdef HAS_SPI | ||||||
|   spi_queuereset(); |   spi_queuereset(); | ||||||
| #endif | #endif | ||||||
|  | #ifdef HAS_MQTT | ||||||
|  |   mqtt_queuereset(); | ||||||
|  | #endif | ||||||
| } | } | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user