3 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
24 * @file iot_mqtt_internal.h
\r
25 * @brief Internal header of MQTT library. This header should not be included in
\r
26 * typical application code.
\r
29 #ifndef IOT_MQTT_INTERNAL_H_
\r
30 #define IOT_MQTT_INTERNAL_H_
\r
32 /* The config header is always included first. */
\r
33 #include "iot_config.h"
\r
35 /* Linear containers (lists and queues) include. */
\r
36 #include "iot_linear_containers.h"
\r
39 #include "iot_mqtt.h"
\r
41 /* Task pool include. */
\r
42 #include "iot_taskpool_freertos.h"
\r
45 * @def IotMqtt_Assert( expression )
\r
46 * @brief Assertion macro for the MQTT library.
\r
48 * Set @ref IOT_MQTT_ENABLE_ASSERTS to `1` to enable assertions in the MQTT
\r
51 * @param[in] expression Expression to be evaluated.
\r
53 #if IOT_MQTT_ENABLE_ASSERTS == 1
\r
54 #ifndef IotMqtt_Assert
\r
55 #ifdef Iot_DefaultAssert
\r
56 #define IotMqtt_Assert( expression ) Iot_DefaultAssert( expression )
\r
58 #error "Asserts are enabled for MQTT, but IotMqtt_Assert is not defined"
\r
62 #define IotMqtt_Assert( expression )
\r
65 /* Configure logs for MQTT functions. */
\r
66 #ifdef IOT_LOG_LEVEL_MQTT
\r
67 #define LIBRARY_LOG_LEVEL IOT_LOG_LEVEL_MQTT
\r
69 #ifdef IOT_LOG_LEVEL_GLOBAL
\r
70 #define LIBRARY_LOG_LEVEL IOT_LOG_LEVEL_GLOBAL
\r
72 #define LIBRARY_LOG_LEVEL IOT_LOG_NONE
\r
76 #define LIBRARY_LOG_NAME ( "MQTT" )
\r
77 #include "iot_logging_setup.h"
\r
80 * Provide default values for undefined memory allocation functions based on
\r
81 * the usage of dynamic memory allocation.
\r
83 #if IOT_STATIC_MEMORY_ONLY == 1
\r
84 #include "iot_static_memory.h"
\r
87 * @brief Allocate an #_mqttConnection_t. This function should have the same
\r
88 * signature as [malloc]
\r
89 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/malloc.html).
\r
91 void * IotMqtt_MallocConnection( size_t size );
\r
94 * @brief Free an #_mqttConnection_t. This function should have the same
\r
95 * signature as [free]
\r
96 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
\r
98 void IotMqtt_FreeConnection( void * ptr );
\r
101 * @brief Allocate memory for an MQTT packet. This function should have the
\r
102 * same signature as [malloc]
\r
103 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/malloc.html).
\r
105 #define IotMqtt_MallocMessage Iot_MallocMessageBuffer
\r
108 * @brief Free an MQTT packet. This function should have the same signature
\r
110 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
\r
112 #define IotMqtt_FreeMessage Iot_FreeMessageBuffer
\r
115 * @brief Allocate an #_mqttOperation_t. This function should have the same
\r
116 * signature as [malloc]
\r
117 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/malloc.html).
\r
119 void * IotMqtt_MallocOperation( size_t size );
\r
122 * @brief Free an #_mqttOperation_t. This function should have the same
\r
123 * signature as [free]
\r
124 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
\r
126 void IotMqtt_FreeOperation( void * ptr );
\r
129 * @brief Allocate an #_mqttSubscription_t. This function should have the
\r
130 * same signature as [malloc]
\r
131 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/malloc.html).
\r
133 void * IotMqtt_MallocSubscription( size_t size );
\r
136 * @brief Free an #_mqttSubscription_t. This function should have the same
\r
137 * signature as [free]
\r
138 * (http://pubs.opengroup.org/onlinepubs/9699919799/functions/free.html).
\r
140 void IotMqtt_FreeSubscription( void * ptr );
\r
141 #else /* if IOT_STATIC_MEMORY_ONLY == 1 */
\r
142 #ifndef IotMqtt_MallocConnection
\r
143 #ifdef Iot_DefaultMalloc
\r
144 #define IotMqtt_MallocConnection Iot_DefaultMalloc
\r
146 #error "No malloc function defined for IotMqtt_MallocConnection"
\r
150 #ifndef IotMqtt_FreeConnection
\r
151 #ifdef Iot_DefaultFree
\r
152 #define IotMqtt_FreeConnection Iot_DefaultFree
\r
154 #error "No free function defined for IotMqtt_FreeConnection"
\r
158 #ifndef IotMqtt_MallocMessage
\r
159 #ifdef Iot_DefaultMalloc
\r
160 #define IotMqtt_MallocMessage Iot_DefaultMalloc
\r
162 #error "No malloc function defined for IotMqtt_MallocMessage"
\r
166 #ifndef IotMqtt_FreeMessage
\r
167 #ifdef Iot_DefaultFree
\r
168 #define IotMqtt_FreeMessage Iot_DefaultFree
\r
170 #error "No free function defined for IotMqtt_FreeMessage"
\r
174 #ifndef IotMqtt_MallocOperation
\r
175 #ifdef Iot_DefaultMalloc
\r
176 #define IotMqtt_MallocOperation Iot_DefaultMalloc
\r
178 #error "No malloc function defined for IotMqtt_MallocOperation"
\r
182 #ifndef IotMqtt_FreeOperation
\r
183 #ifdef Iot_DefaultFree
\r
184 #define IotMqtt_FreeOperation Iot_DefaultFree
\r
186 #error "No free function defined for IotMqtt_FreeOperation"
\r
190 #ifndef IotMqtt_MallocSubscription
\r
191 #ifdef Iot_DefaultMalloc
\r
192 #define IotMqtt_MallocSubscription Iot_DefaultMalloc
\r
194 #error "No malloc function defined for IotMqtt_MallocSubscription"
\r
198 #ifndef IotMqtt_FreeSubscription
\r
199 #ifdef Iot_DefaultFree
\r
200 #define IotMqtt_FreeSubscription Iot_DefaultFree
\r
202 #error "No free function defined for IotMqtt_FreeSubscription"
\r
205 #endif /* if IOT_STATIC_MEMORY_ONLY == 1 */
\r
208 * @cond DOXYGEN_IGNORE
\r
209 * Doxygen should ignore this section.
\r
211 * Provide default values for undefined configuration constants.
\r
213 #ifndef AWS_IOT_MQTT_ENABLE_METRICS
\r
214 #define AWS_IOT_MQTT_ENABLE_METRICS ( 1 )
\r
216 #ifndef IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES
\r
217 #define IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES ( 0 )
\r
219 #ifndef IOT_MQTT_RESPONSE_WAIT_MS
\r
220 #define IOT_MQTT_RESPONSE_WAIT_MS ( 1000 )
\r
222 #ifndef IOT_MQTT_RETRY_MS_CEILING
\r
223 #define IOT_MQTT_RETRY_MS_CEILING ( 60000 )
\r
228 * @brief Marks the empty statement of an `else` branch.
\r
230 * Does nothing, but allows test coverage to detect branches not taken. By default,
\r
231 * this is defined to nothing. When running code coverage testing, this is defined
\r
232 * to an assembly NOP.
\r
234 #ifndef EMPTY_ELSE_MARKER
\r
235 #define EMPTY_ELSE_MARKER
\r
238 #define MQTT_SERVER_MAX_CLIENTID_LENGTH ( ( uint16_t ) 23 ) /**< @brief Optional maximum length of client identifier specified by MQTT 3.1.1. */
\r
239 #define MQTT_SERVER_MAX_PUBLISH_PAYLOAD_LENGTH ( ( size_t ) ( 268435456 ) ) /**< @brief Maximum publish payload length supported by MQTT 3.1.1. */
\r
240 #define MQTT_SERVER_MAX_LWT_PAYLOAD_LENGTH ( ( size_t ) UINT16_MAX ) /**< @brief Maximum LWT payload length supported by MQTT 3.1.1. */
\r
243 * Constants related to limits defined in AWS Service Limits.
\r
246 * https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html
\r
248 * Used to validate parameters when connecting to an AWS IoT MQTT server.
\r
250 #define AWS_IOT_MQTT_SERVER_MAX_CLIENTID_LENGTH ( ( uint16_t ) 128 ) /**< @brief Maximum length of client identifier accepted by AWS IoT. */
\r
251 #define AWS_IOT_MQTT_SERVER_MAX_TOPIC_LENGTH ( ( uint16_t ) 256 ) /**< @brief Maximum length of topic names or filters accepted by AWS IoT. */
\r
252 #define AWS_IOT_MQTT_SERVER_MAX_TOPIC_FILTERS_PER_SUBSCRIBE ( ( size_t ) 8 ) /**< @brief Maximum number of topic filters in a single SUBSCRIBE packet. */
\r
253 #define AWS_IOT_MQTT_SERVER_MAX_PUBLISH_PAYLOAD_LENGTH ( ( size_t ) ( 131072 ) ) /**< @brief Maximum publish payload length accepted by AWS IoT. */
\r
256 * MQTT control packet type and flags. Always the first byte of an MQTT
\r
260 * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/csprd02/mqtt-v3.1.1-csprd02.html#_Toc385349757
\r
262 #define MQTT_PACKET_TYPE_CONNECT ( ( uint8_t ) 0x10U ) /**< @brief CONNECT (client-to-server). */
\r
263 #define MQTT_PACKET_TYPE_CONNACK ( ( uint8_t ) 0x20U ) /**< @brief CONNACK (server-to-client). */
\r
264 #define MQTT_PACKET_TYPE_PUBLISH ( ( uint8_t ) 0x30U ) /**< @brief PUBLISH (bi-directional). */
\r
265 #define MQTT_PACKET_TYPE_PUBACK ( ( uint8_t ) 0x40U ) /**< @brief PUBACK (server-to-client). */
\r
266 #define MQTT_PACKET_TYPE_SUBSCRIBE ( ( uint8_t ) 0x82U ) /**< @brief SUBSCRIBE (client-to-server). */
\r
267 #define MQTT_PACKET_TYPE_SUBACK ( ( uint8_t ) 0x90U ) /**< @brief SUBACK (server-to-client). */
\r
268 #define MQTT_PACKET_TYPE_UNSUBSCRIBE ( ( uint8_t ) 0xa2U ) /**< @brief UNSUBSCRIBE (client-to-server). */
\r
269 #define MQTT_PACKET_TYPE_UNSUBACK ( ( uint8_t ) 0xb0U ) /**< @brief UNSUBACK (server-to-client). */
\r
270 #define MQTT_PACKET_TYPE_PINGREQ ( ( uint8_t ) 0xc0U ) /**< @brief PINGREQ (client-to-server). */
\r
271 #define MQTT_PACKET_TYPE_PINGRESP ( ( uint8_t ) 0xd0U ) /**< @brief PINGRESP (server-to-client). */
\r
272 #define MQTT_PACKET_TYPE_DISCONNECT ( ( uint8_t ) 0xe0U ) /**< @brief DISCONNECT (client-to-server). */
\r
275 * @brief A value that represents an invalid remaining length.
\r
277 * This value is greater than what is allowed by the MQTT specification.
\r
279 #define MQTT_REMAINING_LENGTH_INVALID ( ( size_t ) 268435456 )
\r
282 * @brief When this flag is passed, MQTT functions will execute jobs on the calling
\r
283 * thread, bypassing the task pool.
\r
285 * This flag is used along with @ref mqtt_constants_flags, but is intended for internal
\r
286 * use only. Nevertheless, its value must be bitwise exclusive of all conflicting
\r
287 * @ref mqtt_constants_flags.
\r
289 #define MQTT_INTERNAL_FLAG_BLOCK_ON_SEND ( 0x80000000 )
\r
292 * @brief When calling #_IotMqtt_RemoveSubscriptionByPacket, use this value
\r
293 * for `order` to delete all subscriptions for the packet.
\r
295 * This flag is used along with @ref mqtt_constants_flags, but is intended for internal
\r
298 * @ref mqtt_constants_flags.
\r
300 #define MQTT_REMOVE_ALL_SUBSCRIPTIONS ( -1 )
\r
302 /*---------------------- MQTT internal data structures ----------------------*/
\r
305 * @cond DOXYGEN_IGNORE
\r
306 * Doxygen should ignore this section.
\r
308 * Forward declaration of MQTT connection type.
\r
310 struct _mqttConnection;
\r
314 * @brief Internal structure representing a single MQTT operation, such as
\r
315 * CONNECT, SUBSCRIBE, PUBLISH, etc.
\r
317 * Queues of these structures keep track of all in-progress MQTT operations.
\r
319 typedef struct _mqttOperation
\r
321 /* Pointers to neighboring queue elements. */
\r
322 IotLink_t link; /**< @brief List link member. */
\r
324 bool incomingPublish; /**< @brief Set to true if this operation is an incoming PUBLISH. */
\r
325 struct _mqttConnection * pMqttConnection; /**< @brief MQTT connection associated with this operation. */
\r
327 IotTaskPoolJobStorage_t jobStorage; /**< @brief Task pool job storage associated with this operation. */
\r
328 IotTaskPoolJob_t job; /**< @brief Task pool job associated with this operation. */
\r
332 /* If incomingPublish is false, this struct is valid. */
\r
335 /* Basic operation information. */
\r
336 int32_t jobReference; /**< @brief Tracks if a job is using this operation. Must always be 0, 1, or 2. */
\r
337 IotMqttOperationType_t type; /**< @brief What operation this structure represents. */
\r
338 uint32_t flags; /**< @brief Flags passed to the function that created this operation. */
\r
339 uint16_t packetIdentifier; /**< @brief The packet identifier used with this operation. */
\r
341 /* Serialized packet and size. */
\r
342 uint8_t * pMqttPacket; /**< @brief The MQTT packet to send over the network. */
\r
343 uint8_t * pPacketIdentifierHigh; /**< @brief The location of the high byte of the packet identifier in the MQTT packet. */
\r
344 size_t packetSize; /**< @brief Size of `pMqttPacket`. */
\r
346 /* How to notify of an operation's completion. */
\r
349 IotSemaphore_t waitSemaphore; /**< @brief Semaphore to be used with @ref mqtt_function_wait. */
\r
350 IotMqttCallbackInfo_t callback; /**< @brief User-provided callback function and parameter. */
\r
351 } notify; /**< @brief How to notify of this operation's completion. */
\r
352 IotMqttError_t status; /**< @brief Result of this operation. This is reported once a response is received. */
\r
358 uint32_t count; /**< @brief Current number of retries. */
\r
359 uint32_t limit; /**< @brief Maximum number of retries allowed. */
\r
360 uint32_t nextPeriodMs; /**< @brief Next retry period. */
\r
361 } retry; /**< @brief Additional information for PUBLISH retry. */
\r
365 uint32_t failure; /**< @brief Flag tracking keep-alive status. */
\r
366 uint32_t keepAliveMs; /**< @brief Keep-alive interval in milliseconds. Its max value (per spec) is 65,535,000. */
\r
367 uint32_t nextPeriodMs; /**< @brief Relative delay for next keep-alive job. */
\r
368 } ping; /**< @brief Additional information for keep-alive pings. */
\r
369 } periodic; /**< @brief Additional information for periodic operations. */
\r
372 /* If incomingPublish is true, this struct is valid. */
\r
375 IotMqttPublishInfo_t publishInfo; /**< @brief Deserialized PUBLISH. */
\r
376 const void * pReceivedData; /**< @brief Any buffer associated with this PUBLISH that should be freed. */
\r
378 } u; /**< @brief Valid member depends on _mqttOperation_t.incomingPublish. */
\r
379 } _mqttOperation_t;
\r
382 * @brief Represents an MQTT connection.
\r
384 typedef struct _mqttConnection
\r
386 bool awsIotMqttMode; /**< @brief Specifies if this connection is to an AWS IoT MQTT server. */
\r
387 bool ownNetworkConnection; /**< @brief Whether this MQTT connection owns its network connection. */
\r
388 void * pNetworkConnection; /**< @brief References the transport-layer network connection. */
\r
389 const IotNetworkInterface_t * pNetworkInterface; /**< @brief Network interface provided to @ref mqtt_function_connect. */
\r
390 IotMqttCallbackInfo_t disconnectCallback; /**< @brief A function to invoke when this connection is disconnected. */
\r
392 const IotMqttSerializer_t * pSerializer; /**< @brief MQTT packet serializer overrides. */
\r
394 bool disconnected; /**< @brief Tracks if this connection has been disconnected. */
\r
395 IotMutex_t referencesMutex; /**< @brief Recursive mutex. Grants access to connection state and operation lists. */
\r
396 int32_t references; /**< @brief Counts callbacks and operations using this connection. */
\r
397 IotListDouble_t pendingProcessing; /**< @brief List of operations waiting to be processed by a task pool routine. */
\r
398 IotListDouble_t pendingResponse; /**< @brief List of processed operations awaiting a server response. */
\r
400 IotListDouble_t subscriptionList; /**< @brief Holds subscriptions associated with this connection. */
\r
401 IotMutex_t subscriptionMutex; /**< @brief Grants exclusive access to the subscription list. */
\r
403 _mqttOperation_t pingreq; /**< @brief Operation used for MQTT keep-alive. */
\r
404 } _mqttConnection_t;
\r
407 * @brief Represents a subscription stored in an MQTT connection.
\r
409 typedef struct _mqttSubscription
\r
411 IotLink_t link; /**< @brief List link member. */
\r
413 int32_t references; /**< @brief How many subscription callbacks are using this subscription. */
\r
416 * @brief Tracks whether an unsubscribe function has been called for
\r
417 * this subscription.
\r
419 * If there are active subscription callbacks, this subscription cannot be removed.
\r
420 * Instead, this flag will be set, which schedules the removal of this subscription
\r
421 * once all subscription callbacks terminate.
\r
427 uint16_t identifier; /**< @brief Packet identifier. */
\r
428 size_t order; /**< @brief Order in the packet's list of subscriptions. */
\r
429 } packetInfo; /**< @brief Information about the SUBSCRIBE packet that registered this subscription. */
\r
431 IotMqttCallbackInfo_t callback; /**< @brief Callback information for this subscription. */
\r
433 uint16_t topicFilterLength; /**< @brief Length of #_mqttSubscription_t.pTopicFilter. */
\r
434 char pTopicFilter[]; /**< @brief The subscription topic filter. */
\r
435 } _mqttSubscription_t;
\r
438 * @brief Represents an MQTT packet received from the network.
\r
440 * This struct is used to hold parameters for the deserializers so that all
\r
441 * deserializers have the same function signature.
\r
443 typedef struct _mqttPacket
\r
448 * @brief (Input) MQTT connection associated with this packet. Only used
\r
449 * when deserializing SUBACKs.
\r
451 _mqttConnection_t * pMqttConnection;
\r
454 * @brief (Output) Operation representing an incoming PUBLISH. Only used
\r
455 * when deserializing PUBLISHes.
\r
457 _mqttOperation_t * pIncomingPublish;
\r
458 } u; /**< @brief Valid member depends on packet being decoded. */
\r
460 uint8_t * pRemainingData; /**< @brief (Input) The remaining data in MQTT packet. */
\r
461 size_t remainingLength; /**< @brief (Input) Length of the remaining data in the MQTT packet. */
\r
462 uint16_t packetIdentifier; /**< @brief (Output) MQTT packet identifier. */
\r
463 uint8_t type; /**< @brief (Input) A value identifying the packet type. */
\r
466 /*-------------------- MQTT struct validation functions ---------------------*/
\r
469 * @brief Check that an #IotMqttConnectInfo_t is valid.
\r
471 * @param[in] pConnectInfo The #IotMqttConnectInfo_t to validate.
\r
473 * @return `true` if `pConnectInfo` is valid; `false` otherwise.
\r
475 bool _IotMqtt_ValidateConnect( const IotMqttConnectInfo_t * pConnectInfo );
\r
478 * @brief Check that an #IotMqttPublishInfo_t is valid.
\r
480 * @param[in] awsIotMqttMode Specifies if this PUBLISH packet is being sent to
\r
481 * an AWS IoT MQTT server.
\r
482 * @param[in] pPublishInfo The #IotMqttPublishInfo_t to validate.
\r
484 * @return `true` if `pPublishInfo` is valid; `false` otherwise.
\r
486 bool _IotMqtt_ValidatePublish( bool awsIotMqttMode,
\r
487 const IotMqttPublishInfo_t * pPublishInfo );
\r
490 * @brief Check that an #IotMqttPublishInfo_t is valid for an LWT publish
\r
492 * @param[in] awsIotMqttMode Specifies if this PUBLISH packet is being sent to
\r
493 * an AWS IoT MQTT server.
\r
494 * @param[in] pLwtPublishInfo The #IotMqttPublishInfo_t to validate.
\r
496 * @return `true` if `pLwtPublishInfo` is valid; `false` otherwise.
\r
498 bool _IotMqtt_ValidateLwtPublish( bool awsIotMqttMode,
\r
499 const IotMqttPublishInfo_t * pLwtPublishInfo );
\r
502 * @brief Check that an #IotMqttOperation_t is valid and waitable.
\r
504 * @param[in] operation The #IotMqttOperation_t to validate.
\r
506 * @return `true` if `operation` is valid; `false` otherwise.
\r
508 bool _IotMqtt_ValidateOperation( IotMqttOperation_t operation );
\r
511 * @brief Check that a list of #IotMqttSubscription_t is valid.
\r
513 * @param[in] operation Either #IOT_MQTT_SUBSCRIBE or #IOT_MQTT_UNSUBSCRIBE.
\r
514 * Some parameters are not validated for #IOT_MQTT_UNSUBSCRIBE.
\r
515 * @param[in] awsIotMqttMode Specifies if this SUBSCRIBE packet is being sent to
\r
516 * an AWS IoT MQTT server.
\r
517 * @param[in] pListStart First element of the list to validate.
\r
518 * @param[in] listSize Number of elements in the subscription list.
\r
520 * @return `true` if every element in the list is valid; `false` otherwise.
\r
522 bool _IotMqtt_ValidateSubscriptionList( IotMqttOperationType_t operation,
\r
523 bool awsIotMqttMode,
\r
524 const IotMqttSubscription_t * pListStart,
\r
527 /*-------------------- MQTT packet serializer functions ---------------------*/
\r
530 * @brief Get the MQTT packet type from a stream of bytes off the network.
\r
532 * @param[in] pNetworkConnection Reference to the network connection.
\r
533 * @param[in] pNetworkInterface Function pointers used to interact with the
\r
536 * @return One of the server-to-client MQTT packet types.
\r
538 * @note This function is only used for incoming packets, and may not work
\r
539 * correctly for outgoing packets.
\r
541 uint8_t _IotMqtt_GetPacketType( void * pNetworkConnection,
\r
542 const IotNetworkInterface_t * pNetworkInterface );
\r
545 * @brief Get the remaining length from a stream of bytes off the network.
\r
547 * @param[in] pNetworkConnection Reference to the network connection.
\r
548 * @param[in] pNetworkInterface Function pointers used to interact with the
\r
551 * @return The remaining length; #MQTT_REMAINING_LENGTH_INVALID on error.
\r
553 size_t _IotMqtt_GetRemainingLength( void * pNetworkConnection,
\r
554 const IotNetworkInterface_t * pNetworkInterface );
\r
557 * @brief Get the remaining length from a stream of bytes, using a caller-provided function to read each byte.
\r
559 * @param[in] pNetworkConnection Reference to the network connection.
\r
560 * @param[in] getNextByte Function pointer used to interact with the
\r
561 * network to get next byte.
\r
563 * @return The remaining length; #MQTT_REMAINING_LENGTH_INVALID on error.
\r
565 * @note This function is similar to _IotMqtt_GetRemainingLength() but it uses
\r
566 * user provided getNextByte function to parse the stream instead of using
\r
567 * _IotMqtt_GetNextByte(). pNetworkConnection is implementation dependent and
\r
568 * user provided function makes use of it.
\r
571 size_t _IotMqtt_GetRemainingLength_Generic( void * pNetworkConnection,
\r
572 IotMqttGetNextByte_t getNextByte );
\r
575 * @brief Generate a CONNECT packet from the given parameters.
\r
577 * @param[in] pConnectInfo User-provided CONNECT information.
\r
578 * @param[out] pConnectPacket Where the CONNECT packet is written.
\r
579 * @param[out] pPacketSize Size of the packet written to `pConnectPacket`.
\r
581 * @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
\r
583 IotMqttError_t _IotMqtt_SerializeConnect( const IotMqttConnectInfo_t * pConnectInfo,
\r
584 uint8_t ** pConnectPacket,
\r
585 size_t * pPacketSize );
\r
588 * @brief Deserialize a CONNACK packet.
\r
590 * Converts the packet from a stream of bytes to an #IotMqttError_t. Also
\r
591 * prints out debug log messages about the packet.
\r
593 * @param[in,out] pConnack Pointer to an MQTT packet struct representing a CONNACK.
\r
595 * @return #IOT_MQTT_SUCCESS if CONNACK specifies that CONNECT was accepted;
\r
596 * #IOT_MQTT_SERVER_REFUSED if CONNACK specifies that CONNECT was rejected;
\r
597 * #IOT_MQTT_BAD_RESPONSE if the CONNACK packet doesn't follow MQTT spec.
\r
599 IotMqttError_t _IotMqtt_DeserializeConnack( _mqttPacket_t * pConnack );
\r
602 * @brief Generate a PUBLISH packet from the given parameters.
\r
604 * @param[in] pPublishInfo User-provided PUBLISH information.
\r
605 * @param[out] pPublishPacket Where the PUBLISH packet is written.
\r
606 * @param[out] pPacketSize Size of the packet written to `pPublishPacket`.
\r
607 * @param[out] pPacketIdentifier The packet identifier generated for this PUBLISH.
\r
608 * @param[out] pPacketIdentifierHigh Where the high byte of the packet identifier
\r
611 * @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
\r
613 IotMqttError_t _IotMqtt_SerializePublish( const IotMqttPublishInfo_t * pPublishInfo,
\r
614 uint8_t ** pPublishPacket,
\r
615 size_t * pPacketSize,
\r
616 uint16_t * pPacketIdentifier,
\r
617 uint8_t ** pPacketIdentifierHigh );
\r
620 * @brief Set the DUP bit in a QoS 1 PUBLISH packet.
\r
622 * @param[in] pPublishPacket Pointer to the PUBLISH packet to modify.
\r
623 * @param[in] pPacketIdentifierHigh The high byte of any packet identifier to modify.
\r
624 * @param[out] pNewPacketIdentifier Since AWS IoT does not support the DUP flag,
\r
625 * a new packet identifier is generated and should be written here. This parameter
\r
626 * is only used when connected to an AWS IoT MQTT server.
\r
628 * @note See #IotMqttPublishInfo_t for caveats with retransmission to the
\r
629 * AWS IoT MQTT server.
\r
631 void _IotMqtt_PublishSetDup( uint8_t * pPublishPacket,
\r
632 uint8_t * pPacketIdentifierHigh,
\r
633 uint16_t * pNewPacketIdentifier );
\r
636 * @brief Deserialize a PUBLISH packet received from the server.
\r
638 * Converts the packet from a stream of bytes to an #IotMqttPublishInfo_t and
\r
639 * extracts the packet identifier. Also prints out debug log messages about the
\r
642 * @param[in,out] pPublish Pointer to an MQTT packet struct representing a PUBLISH.
\r
644 * @return #IOT_MQTT_SUCCESS if PUBLISH is valid; #IOT_MQTT_BAD_RESPONSE
\r
645 * if the PUBLISH packet doesn't follow MQTT spec.
\r
647 IotMqttError_t _IotMqtt_DeserializePublish( _mqttPacket_t * pPublish );
\r
650 * @brief Generate a PUBACK packet for the given packet identifier.
\r
652 * @param[in] packetIdentifier The packet identifier to place in PUBACK.
\r
653 * @param[out] pPubackPacket Where the PUBACK packet is written.
\r
654 * @param[out] pPacketSize Size of the packet written to `pPubackPacket`.
\r
656 * @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
\r
658 IotMqttError_t _IotMqtt_SerializePuback( uint16_t packetIdentifier,
\r
659 uint8_t ** pPubackPacket,
\r
660 size_t * pPacketSize );
\r
663 * @brief Deserialize a PUBACK packet.
\r
665 * Converts the packet from a stream of bytes to an #IotMqttError_t and extracts
\r
666 * the packet identifier. Also prints out debug log messages about the packet.
\r
668 * @param[in,out] pPuback Pointer to an MQTT packet struct representing a PUBACK.
\r
670 * @return #IOT_MQTT_SUCCESS if PUBACK is valid; #IOT_MQTT_BAD_RESPONSE
\r
671 * if the PUBACK packet doesn't follow MQTT spec.
\r
673 IotMqttError_t _IotMqtt_DeserializePuback( _mqttPacket_t * pPuback );
\r
676 * @brief Generate a SUBSCRIBE packet from the given parameters.
\r
678 * @param[in] pSubscriptionList User-provided array of subscriptions.
\r
679 * @param[in] subscriptionCount Size of `pSubscriptionList`.
\r
680 * @param[out] pSubscribePacket Where the SUBSCRIBE packet is written.
\r
681 * @param[out] pPacketSize Size of the packet written to `pSubscribePacket`.
\r
682 * @param[out] pPacketIdentifier The packet identifier generated for this SUBSCRIBE.
\r
684 * @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
\r
686 IotMqttError_t _IotMqtt_SerializeSubscribe( const IotMqttSubscription_t * pSubscriptionList,
\r
687 size_t subscriptionCount,
\r
688 uint8_t ** pSubscribePacket,
\r
689 size_t * pPacketSize,
\r
690 uint16_t * pPacketIdentifier );
\r
693 * @brief Deserialize a SUBACK packet.
\r
695 * Converts the packet from a stream of bytes to an #IotMqttError_t and extracts
\r
696 * the packet identifier. Also prints out debug log messages about the packet.
\r
698 * @param[in,out] pSuback Pointer to an MQTT packet struct representing a SUBACK.
\r
700 * @return #IOT_MQTT_SUCCESS if SUBACK is valid; #IOT_MQTT_BAD_RESPONSE
\r
701 * if the SUBACK packet doesn't follow MQTT spec.
\r
703 IotMqttError_t _IotMqtt_DeserializeSuback( _mqttPacket_t * pSuback );
\r
706 * @brief Generate an UNSUBSCRIBE packet from the given parameters.
\r
708 * @param[in] pSubscriptionList User-provided array of subscriptions to remove.
\r
709 * @param[in] subscriptionCount Size of `pSubscriptionList`.
\r
710 * @param[out] pUnsubscribePacket Where the UNSUBSCRIBE packet is written.
\r
711 * @param[out] pPacketSize Size of the packet written to `pUnsubscribePacket`.
\r
712 * @param[out] pPacketIdentifier The packet identifier generated for this UNSUBSCRIBE.
\r
714 * @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
\r
716 IotMqttError_t _IotMqtt_SerializeUnsubscribe( const IotMqttSubscription_t * pSubscriptionList,
\r
717 size_t subscriptionCount,
\r
718 uint8_t ** pUnsubscribePacket,
\r
719 size_t * pPacketSize,
\r
720 uint16_t * pPacketIdentifier );
\r
723 * @brief Deserialize an UNSUBACK packet.
\r
725 * Converts the packet from a stream of bytes to an #IotMqttError_t and extracts
\r
726 * the packet identifier. Also prints out debug log messages about the packet.
\r
728 * @param[in,out] pUnsuback Pointer to an MQTT packet struct representing an UNSUBACK.
\r
730 * @return #IOT_MQTT_SUCCESS if UNSUBACK is valid; #IOT_MQTT_BAD_RESPONSE
\r
731 * if the UNSUBACK packet doesn't follow MQTT spec.
\r
733 IotMqttError_t _IotMqtt_DeserializeUnsuback( _mqttPacket_t * pUnsuback );
\r
736 * @brief Generate a PINGREQ packet.
\r
738 * @param[out] pPingreqPacket Where the PINGREQ packet is written.
\r
739 * @param[out] pPacketSize Size of the packet written to `pPingreqPacket`.
\r
741 * @return Always returns #IOT_MQTT_SUCCESS.
\r
743 IotMqttError_t _IotMqtt_SerializePingreq( uint8_t ** pPingreqPacket,
\r
744 size_t * pPacketSize );
\r
747 * @brief Deserialize a PINGRESP packet.
\r
749 * Converts the packet from a stream of bytes to an #IotMqttError_t. Also
\r
750 * prints out debug log messages about the packet.
\r
752 * @param[in,out] pPingresp Pointer to an MQTT packet struct representing a PINGRESP.
\r
754 * @return #IOT_MQTT_SUCCESS if PINGRESP is valid; #IOT_MQTT_BAD_RESPONSE
\r
755 * if the PINGRESP packet doesn't follow MQTT spec.
\r
757 IotMqttError_t _IotMqtt_DeserializePingresp( _mqttPacket_t * pPingresp );
\r
760 * @brief Generate a DISCONNECT packet.
\r
762 * @param[out] pDisconnectPacket Where the DISCONNECT packet is written.
\r
763 * @param[out] pPacketSize Size of the packet written to `pDisconnectPacket`.
\r
765 * @return Always returns #IOT_MQTT_SUCCESS.
\r
767 IotMqttError_t _IotMqtt_SerializeDisconnect( uint8_t ** pDisconnectPacket,
\r
768 size_t * pPacketSize );
\r
771 * @brief Free a packet generated by the serializer.
\r
773 * @param[in] pPacket The packet to free.
\r
775 void _IotMqtt_FreePacket( uint8_t * pPacket );
\r
777 /*-------------------- MQTT operation record functions ----------------------*/
\r
780 * @brief Create a record for a new in-progress MQTT operation.
\r
782 * @param[in] pMqttConnection The MQTT connection to associate with the operation.
\r
783 * @param[in] flags Flags variable passed to a user-facing MQTT function.
\r
784 * @param[in] pCallbackInfo User-provided callback function and parameter.
\r
785 * @param[out] pNewOperation Set to point to the new operation on success.
\r
787 * @return #IOT_MQTT_SUCCESS, #IOT_MQTT_BAD_PARAMETER, or #IOT_MQTT_NO_MEMORY.
\r
789 IotMqttError_t _IotMqtt_CreateOperation( _mqttConnection_t * pMqttConnection,
\r
791 const IotMqttCallbackInfo_t * pCallbackInfo,
\r
792 _mqttOperation_t ** pNewOperation );
\r
795 * @brief Decrement the job reference count of an MQTT operation and optionally
\r
798 * Checks if the operation may be destroyed afterwards.
\r
800 * @param[in] pOperation The MQTT operation with the job to cancel.
\r
801 * @param[in] cancelJob Whether to attempt cancellation of the operation's job.
\r
803 * @return `true` if the operation may be safely destroyed; `false` otherwise.
\r
805 bool _IotMqtt_DecrementOperationReferences( _mqttOperation_t * pOperation,
\r
809 * @brief Free resources used to record an MQTT operation. This is called when
\r
810 * the operation completes.
\r
812 * @param[in] pOperation The operation which completed.
\r
814 void _IotMqtt_DestroyOperation( _mqttOperation_t * pOperation );
\r
817 * @brief Task pool routine for processing an MQTT connection's keep-alive.
\r
819 * @param[in] pTaskPool Pointer to the system task pool.
\r
820 * @param[in] pKeepAliveJob Pointer to an MQTT connection's keep-alive job.
\r
821 * @param[in] pContext Pointer to an MQTT connection, passed as an opaque context.
\r
823 void _IotMqtt_ProcessKeepAlive( IotTaskPool_t pTaskPool,
\r
824 IotTaskPoolJob_t pKeepAliveJob,
\r
828 * @brief Task pool routine for processing an incoming PUBLISH message.
\r
830 * @param[in] pTaskPool Pointer to the system task pool.
\r
831 * @param[in] pPublishJob Pointer to the incoming PUBLISH operation's job.
\r
832 * @param[in] pContext Pointer to the incoming PUBLISH operation, passed as an opaque context.
\r
835 void _IotMqtt_ProcessIncomingPublish( IotTaskPool_t pTaskPool,
\r
836 IotTaskPoolJob_t pPublishJob,
\r
840 * @brief Task pool routine for processing an MQTT operation to send.
\r
842 * @param[in] pTaskPool Pointer to the system task pool.
\r
843 * @param[in] pSendJob Pointer to an operation's job.
\r
844 * @param[in] pContext Pointer to the operation to send, passed as an opaque context.
\r
847 void _IotMqtt_ProcessSend( IotTaskPool_t pTaskPool,
\r
848 IotTaskPoolJob_t pSendJob,
\r
852 * @brief Task pool routine for processing a completed MQTT operation.
\r
854 * @param[in] pTaskPool Pointer to the system task pool.
\r
855 * @param[in] pOperationJob Pointer to the completed operation's job.
\r
856 * @param[in] pContext Pointer to the completed operation, passed as an opaque context.
\r
859 void _IotMqtt_ProcessCompletedOperation( IotTaskPool_t pTaskPool,
\r
860 IotTaskPoolJob_t pOperationJob,
\r
864 * @brief Schedule an operation for processing, either immediately or after a delay.
\r
866 * @param[in] pOperation The operation to schedule.
\r
867 * @param[in] jobRoutine The routine to run for the job. Must be either
\r
868 * #_IotMqtt_ProcessSend, #_IotMqtt_ProcessCompletedOperation, or
\r
869 * #_IotMqtt_ProcessIncomingPublish.
\r
870 * @param[in] delay A delay before the operation job should be executed. Pass
\r
871 * `0` to execute ASAP.
\r
873 * @return #IOT_MQTT_SUCCESS or #IOT_MQTT_SCHEDULING_ERROR.
\r
875 IotMqttError_t _IotMqtt_ScheduleOperation( _mqttOperation_t * pOperation,
\r
876 IotTaskPoolRoutine_t jobRoutine,
\r
880 * @brief Search a list of MQTT operations pending responses using an operation
\r
881 * type and packet identifier. Removes a matching operation from the list if found.
\r
883 * @param[in] pMqttConnection The connection associated with the operation.
\r
884 * @param[in] type The operation type to look for.
\r
885 * @param[in] pPacketIdentifier A packet identifier to match. Pass `NULL` to ignore.
\r
887 * @return Pointer to any matching operation; `NULL` if no match was found.
\r
889 _mqttOperation_t * _IotMqtt_FindOperation( _mqttConnection_t * pMqttConnection,
\r
890 IotMqttOperationType_t type,
\r
891 const uint16_t * pPacketIdentifier );
\r
894 * @brief Notify of a completed MQTT operation.
\r
896 * @param[in] pOperation The MQTT operation which completed.
\r
898 * Depending on the parameters passed to a user-facing MQTT function, the
\r
899 * notification will cause @ref mqtt_function_wait to return or invoke a
\r
900 * user-provided callback.
\r
902 void _IotMqtt_Notify( _mqttOperation_t * pOperation );
\r
904 /*----------------- MQTT subscription management functions ------------------*/
\r
907 * @brief Add an array of subscriptions to the subscription manager.
\r
909 * @param[in] pMqttConnection The MQTT connection associated with the subscriptions.
\r
910 * @param[in] subscribePacketIdentifier Packet identifier for the subscriptions'
\r
911 * SUBSCRIBE packet.
\r
912 * @param[in] pSubscriptionList The first element in the array.
\r
913 * @param[in] subscriptionCount Number of elements in `pSubscriptionList`.
\r
915 * @return #IOT_MQTT_SUCCESS or #IOT_MQTT_NO_MEMORY.
\r
917 IotMqttError_t _IotMqtt_AddSubscriptions( _mqttConnection_t * pMqttConnection,
\r
918 uint16_t subscribePacketIdentifier,
\r
919 const IotMqttSubscription_t * pSubscriptionList,
\r
920 size_t subscriptionCount );
\r
923 * @brief Process a received PUBLISH from the server, invoking any subscription
\r
924 * callbacks that have a matching topic filter.
\r
926 * @param[in] pMqttConnection The MQTT connection associated with the received PUBLISH.
\r
928 * @param[in] pCallbackParam The parameter to pass to a PUBLISH callback.
\r
930 void _IotMqtt_InvokeSubscriptionCallback( _mqttConnection_t * pMqttConnection,
\r
931 IotMqttCallbackParam_t * pCallbackParam );
\r
934 * @brief Remove a single subscription from the subscription manager by
\r
935 * packetIdentifier and order.
\r
937 * @param[in] pMqttConnection The MQTT connection associated with the subscriptions.
\r
938 * @param[in] packetIdentifier The packet identifier associated with the subscription's
\r
939 * SUBSCRIBE packet.
\r
940 * @param[in] order The order of the subscription in the SUBSCRIBE packet.
\r
941 * Pass #MQTT_REMOVE_ALL_SUBSCRIPTIONS to ignore order and remove all subscriptions for `packetIdentifier`.
\r
943 void _IotMqtt_RemoveSubscriptionByPacket( _mqttConnection_t * pMqttConnection,
\r
944 uint16_t packetIdentifier,
\r
948 * @brief Remove an array of subscriptions from the subscription manager by topic filter.
\r
951 * @param[in] pMqttConnection The MQTT connection associated with the subscriptions.
\r
952 * @param[in] pSubscriptionList The first element in the array.
\r
953 * @param[in] subscriptionCount Number of elements in `pSubscriptionList`.
\r
955 void _IotMqtt_RemoveSubscriptionByTopicFilter( _mqttConnection_t * pMqttConnection,
\r
956 const IotMqttSubscription_t * pSubscriptionList,
\r
957 size_t subscriptionCount );
\r
959 /*------------------ MQTT connection management functions -------------------*/
\r
962 * @brief Attempt to increment the reference count of an MQTT connection.
\r
964 * @param[in] pMqttConnection The referenced MQTT connection.
\r
966 * @return `true` if the reference count was incremented; `false` otherwise. The
\r
967 * reference count will not be incremented for a disconnected connection.
\r
969 bool _IotMqtt_IncrementConnectionReferences( _mqttConnection_t * pMqttConnection );
\r
972 * @brief Decrement the reference count of an MQTT connection.
\r
974 * Also destroys an unreferenced MQTT connection.
\r
976 * @param[in] pMqttConnection The referenced MQTT connection.
\r
978 void _IotMqtt_DecrementConnectionReferences( _mqttConnection_t * pMqttConnection );
\r
981 * @brief Read the next available byte on a network connection.
\r
983 * @param[in] pNetworkConnection Reference to the network connection.
\r
984 * @param[in] pNetworkInterface Function pointers used to interact with the network.
\r
986 * @param[out] pIncomingByte The byte read from the network.
\r
988 * @return `true` if a byte was successfully received from the network; `false` otherwise.
\r
991 bool _IotMqtt_GetNextByte( void * pNetworkConnection,
\r
992 const IotNetworkInterface_t * pNetworkInterface,
\r
993 uint8_t * pIncomingByte );
\r
996 * @brief Closes the network connection associated with an MQTT connection.
\r
998 * A network disconnect function must be set in the network interface for the
\r
999 * network connection to be closed.
\r
1001 * @param[in] disconnectReason A reason to pass to the connection's disconnect
\r
1003 * @param[in] pMqttConnection The MQTT connection with the network connection to close.
\r
1006 void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason,
\r
1007 _mqttConnection_t * pMqttConnection );
\r
1009 #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1
\r
1012 * @brief Utility macro for creating serializer override selector functions
\r
1014 #define _SERIALIZER_OVERRIDE_SELECTOR( _funcType_t, _funcName, _defaultFunc, _serializerMember ) \
\r
1015 static _funcType_t _funcName( const IotMqttSerializer_t * pSerializer ); \
\r
1016 static _funcType_t _funcName( const IotMqttSerializer_t * pSerializer ) \
\r
1018 _funcType_t _returnValue = _defaultFunc; \
\r
1019 if( pSerializer != NULL ) \
\r
1021 if( pSerializer->_serializerMember != NULL ) \
\r
1023 _returnValue = pSerializer->_serializerMember; \
\r
1027 EMPTY_ELSE_MARKER; \
\r
1032 EMPTY_ELSE_MARKER; \
\r
1034 return _returnValue; \
\r
1036 #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */
\r
1038 #endif /* ifndef IOT_MQTT_INTERNAL_H_ */
\r