MqttClient.h
Go to the documentation of this file.
1 /****
2  * Sming Framework Project - Open Source framework for high efficiency native ESP8266 development.
3  * Created 2015 by Skurydin Alexey
4  * http://github.com/SmingHub/Sming
5  * All files of the Sming Core are provided under the LGPL v3 license.
6  *
7  * MqttClient.h
8  *
9  ****/
10 
11 #pragma once
12 
13 #include "TcpClient.h"
14 #include "Url.h"
15 #include <BitManipulations.h>
16 #include <WString.h>
17 #include <WHashMap.h>
18 #include <Data/ObjectQueue.h>
19 #include <Platform/Timers.h>
20 #include "Mqtt/MqttPayloadParser.h"
21 #include "mqtt-codec/src/message.h"
22 #include "mqtt-codec/src/serialiser.h"
23 #include "mqtt-codec/src/parser.h"
24 
32 
33 #ifndef MQTT_REQUEST_POOL_SIZE
34 #define MQTT_REQUEST_POOL_SIZE 10
35 #endif
36 
37 #define MQTT_CLIENT_CONNECTED bit(1)
38 
39 #define MQTT_FLAG_RETAINED 1
40 
41 class MqttClient;
42 
43 using MqttDelegate = Delegate<int(MqttClient& client, mqtt_message_t* message)>;
45 
46 class MqttClient : protected TcpClient
47 {
48 public:
49  MqttClient(bool withDefaultPayloadParser = true, bool autoDestruct = false);
50 
51  ~MqttClient();
52 
57  void setKeepAlive(uint16_t seconds) //send to broker
58  {
59  keepAlive = seconds;
60  if(seconds < pingRepeatTime) {
61  setPingRepeatTime(seconds);
62  }
63  }
64 
70  {
71  seconds = std::min(keepAlive, seconds);
72  if(seconds != pingRepeatTime) {
73  pingRepeatTime = seconds;
74  pingTimer.reset(seconds);
75  }
76  }
77 
85  bool setWill(const String& topic, const String& message, uint8_t flags = 0);
86 
92  bool connect(const Url& url, const String& uniqueClientName);
93 
101  bool publish(const String& topic, const String& message, uint8_t flags = 0);
102 
110  bool publish(const String& topic, IDataSourceStream* stream, uint8_t flags = 0);
111 
117  bool subscribe(const String& topic);
118 
124  bool unsubscribe(const String& topic);
125 
131  void setEventHandler(mqtt_type_t type, MqttDelegate handler)
132  {
133  eventHandlers[type] = handler;
134  }
135 
142  void setPayloadParser(MqttPayloadParser payloadParser = nullptr)
143  {
144  this->payloadParser = payloadParser;
145  }
146 
147  /* [ Convenience methods ] */
148 
157  static uint8_t getFlags(mqtt_qos_t QoS, mqtt_retain_t retain = MQTT_RETAIN_FALSE, mqtt_dup_t dup = MQTT_DUP_FALSE)
158  {
159  return (retain + (QoS << 1) + (dup << 3));
160  }
161 
168  {
169  eventHandlers[MQTT_TYPE_CONNACK] = handler;
170  }
171 
179  {
180  eventHandlers[MQTT_TYPE_PUBACK] = handler;
181  eventHandlers[MQTT_TYPE_PUBREC] = handler;
182  }
183 
190  {
191  eventHandlers[MQTT_TYPE_PUBLISH] = handler;
192  }
193 
200  {
202  }
203 
204  using TcpClient::getSsl;
206 
208 
211 
214 
215 protected:
216  void onReadyToSendData(TcpConnectionEvent sourceEvent) override;
217  void onFinished(TcpClientState finishState) override;
218 
219 private:
220  // TCP methods
221  virtual bool onTcpReceive(TcpClient& client, char* data, int size);
222 
223  // MQTT parser methods
224  static int staticOnMessageBegin(void* user_data, mqtt_message_t* message);
225  static int staticOnDataBegin(void* user_data, mqtt_message_t* message);
226  static int staticOnDataPayload(void* user_data, mqtt_message_t* message, const char* data, size_t length);
227  static int staticOnDataEnd(void* user_data, mqtt_message_t* message);
228  static int staticOnMessageEnd(void* user_data, mqtt_message_t* message);
229  int onMessageEnd(mqtt_message_t* message);
230 
231 private:
232  Url url;
233 
234  // callbacks
235  using HandlerMap = HashMap<mqtt_type_t, MqttDelegate>;
236  HandlerMap eventHandlers;
237  MqttPayloadParser payloadParser = nullptr;
238 
239  // states
240  MqttClientState state = eMCS_Ready;
241  MqttPayloadParserState payloadState = {};
242 
243  // keep-alive and ping
244  uint16_t keepAlive = 60;
245  uint16_t pingRepeatTime = 20;
247 
248  // messages
249  MqttRequestQueue requestQueue;
250  mqtt_message_t connectMessage;
251  bool connectQueued = false;
252  mqtt_message_t* outgoingMessage = nullptr;
253  mqtt_message_t incomingMessage;
254 
255  // parsers and serializers
256  mqtt_serialiser_t serialiser;
257  static const mqtt_parser_callbacks_t callbacks;
258  mqtt_parser_t parser;
259 
260  // client flags
261  uint8_t flags = 0;
262  /* 7 8 6 5 4 3 2 1 0
263  * |
264  * --- set when connected ...
265  */
266 };
267 
static uint8_t getFlags(mqtt_qos_t QoS, mqtt_retain_t retain=MQTT_RETAIN_FALSE, mqtt_dup_t dup=MQTT_DUP_FALSE)
Compute the flags value.
Definition: MqttClient.h:157
Base class for read-only stream.
Definition: DataSourceStream.h:45
TcpClientState getConnectionState()
Definition: TcpClient.h:126
void onFinished(TcpClientState finishState) override
void onReadyToSendData(TcpConnectionEvent sourceEvent) override
bool unsubscribe(const String &topic)
Unsubscribe from a topic.
IDataSourceStream * stream
The currently active stream being sent.
Definition: TcpClient.h:165
void setCompleteDelegate(TcpClientCompleteDelegate completeCb=nullptr)
Set or clear the callback for connection close.
Definition: TcpClient.h:97
void setEventHandler(mqtt_type_t type, MqttDelegate handler)
Register a callback function to be invoked on incoming event notification.
Definition: MqttClient.h:131
Class to manage URL instance.
Definition: Url.h:66
void setConnectedHandler(MqttDelegate handler)
Sets a handler to be called after successful MQTT connection.
Definition: MqttClient.h:167
Template class to implement a polled timer.
Definition: PolledTimer.h:67
bool isProcessing()
Definition: TcpClient.h:121
The String class.
Definition: WString.h:136
TcpConnectionEvent
Definition: TcpConnection.h:26
bool publish(const String &topic, const String &message, uint8_t flags=0)
Publish a message.
Definition: MqttPayloadParser.h:29
Definition: MqttClient.h:46
Definition: TcpClient.h:45
void setDisconnectHandler(TcpClientCompleteDelegate handler)
Sets a handler to be called on disconnect from the server.
Definition: MqttClient.h:199
Ssl::Session * getSsl()
Get a pointer to the current SSL session object.
Definition: TcpConnection.h:148
@ eMCS_Ready
Definition: MqttClient.h:31
void setPayloadParser(MqttPayloadParser payloadParser=nullptr)
Sets or clears a payload parser (for PUBLISH messages from the server to us)
Definition: MqttClient.h:142
IpAddress getRemoteIp() const
Definition: TcpConnection.h:102
MqttClient(bool withDefaultPayloadParser=true, bool autoDestruct=false)
void setMessageHandler(MqttDelegate handler)
Sets a handler to be called after receiving a PUBLISH message from the server.
Definition: MqttClient.h:189
bool subscribe(const String &topic)
Subscribe to a topic.
void setPingRepeatTime(uint16_t seconds)
Definition: MqttClient.h:69
bool setWill(const String &topic, const String &message, uint8_t flags=0)
uint16_t getRemotePort() const
Definition: TcpConnection.h:107
TcpClientState
Definition: TcpClient.h:28
bool connect(const Url &url, const String &uniqueClientName)
Connect to a MQTT server.
@ eMCS_SendingData
Definition: MqttClient.h:31
MqttClientState
Definition: MqttClient.h:31
void setKeepAlive(uint16_t seconds)
Sets keep-alive time. That information is sent during connection to the server.
Definition: MqttClient.h:57
Definition: Delegate.h:20
void setSslInitHandler(Ssl::Session::InitDelegate handler)
Set the SSL session initialisation callback.
Definition: TcpConnection.h:125
void setPublishedHandler(MqttDelegate handler)
Sets a handler to be called after receiving confirmation from the server for a published message from...
Definition: MqttClient.h:178