/* Platform layer includes. */\r
#include "platform/iot_threads.h"\r
\r
+/* Atomics include. */\r
+#include "iot_atomic.h"\r
+\r
/*-----------------------------------------------------------*/\r
\r
/**\r
static void _sendPuback( _mqttConnection_t * pMqttConnection,\r
uint16_t packetIdentifier );\r
\r
+/**\r
+ * @brief Flush a packet from the stream of incoming data.\r
+ *\r
+ * This function is called when memory for a packet cannot be allocated. The\r
+ * packet is flushed from the stream of incoming data so that the next packet\r
+ * may be read.\r
+ *\r
+ * @param[in] pNetworkConnection Network connection to use for receive, which\r
+ * may be different from the network connection associated with the MQTT connection.\r
+ * @param[in] pMqttConnection The associated MQTT connection.\r
+ * @param[in] length The length of the packet to flush.\r
+ */\r
+static void _flushPacket( void * pNetworkConnection,\r
+ const _mqttConnection_t * pMqttConnection,\r
+ size_t length );\r
+\r
/*-----------------------------------------------------------*/\r
\r
static bool _incomingPacketValid( uint8_t packetType )\r
\r
if( pIncomingPacket->pRemainingData == NULL )\r
{\r
+ IotLogError( "(MQTT connection %p) Failed to allocate buffer of length "\r
+ "%lu for incoming packet type %lu.",\r
+ pMqttConnection,\r
+ ( unsigned long ) pIncomingPacket->remainingLength,\r
+ ( unsigned long ) pIncomingPacket->type );\r
+\r
+ _flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength );\r
+\r
IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
}\r
else\r
\r
if( status == IOT_MQTT_SUCCESS )\r
{\r
- IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
-\r
- if( pMqttConnection->keepAliveFailure == false )\r
+ if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ),\r
+ 0,\r
+ 1 ) == 1 )\r
{\r
- IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",\r
- pMqttConnection );\r
+ IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",\r
+ pMqttConnection );\r
}\r
else\r
{\r
- IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",\r
- pMqttConnection );\r
-\r
- pMqttConnection->keepAliveFailure = false;\r
+ IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",\r
+ pMqttConnection );\r
}\r
-\r
- IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
}\r
else\r
{\r
static void _sendPuback( _mqttConnection_t * pMqttConnection,\r
uint16_t packetIdentifier )\r
{\r
- IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
- uint8_t * pPuback = NULL;\r
- size_t pubackSize = 0, bytesSent = 0;\r
+ IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+ _mqttOperation_t * pPubackOperation = NULL;\r
\r
- /* Default PUBACK serializer and free packet functions. */\r
+ /* Default PUBACK serializer function. */\r
IotMqttError_t ( * serializePuback )( uint16_t,\r
uint8_t **,\r
size_t * ) = _IotMqtt_SerializePuback;\r
- void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
\r
IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",\r
pMqttConnection,\r
EMPTY_ELSE_MARKER;\r
}\r
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
- #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
- if( pMqttConnection->pSerializer != NULL )\r
- {\r
- if( pMqttConnection->pSerializer->freePacket != NULL )\r
- {\r
- freePacket = pMqttConnection->pSerializer->freePacket;\r
- }\r
- else\r
- {\r
- EMPTY_ELSE_MARKER;\r
- }\r
- }\r
- else\r
- {\r
- EMPTY_ELSE_MARKER;\r
- }\r
- #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+ /* Create a PUBACK operation. */\r
+ status = _IotMqtt_CreateOperation( pMqttConnection,\r
+ 0,\r
+ NULL,\r
+ &pPubackOperation );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+\r
+ /* Set the operation type. */\r
+ pPubackOperation->u.operation.type = IOT_MQTT_PUBACK;\r
\r
/* Generate a PUBACK packet from the packet identifier. */\r
- serializeStatus = serializePuback( packetIdentifier,\r
- &pPuback,\r
- &pubackSize );\r
+ status = serializePuback( packetIdentifier,\r
+ &( pPubackOperation->u.operation.pMqttPacket ),\r
+ &( pPubackOperation->u.operation.packetSize ) );\r
+\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ IOT_GOTO_CLEANUP();\r
+ }\r
+\r
+ /* Add the PUBACK operation to the send queue for network transmission. */\r
+ status = _IotMqtt_ScheduleOperation( pPubackOperation,\r
+ _IotMqtt_ProcessSend,\r
+ 0 );\r
\r
- if( serializeStatus != IOT_MQTT_SUCCESS )\r
+ if( status != IOT_MQTT_SUCCESS )\r
{\r
- IotLogWarn( "(MQTT connection %p) Failed to generate PUBACK packet for "\r
- "received PUBLISH %hu.",\r
- pMqttConnection,\r
- packetIdentifier );\r
+ IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.",\r
+ pMqttConnection );\r
+\r
+ IOT_GOTO_CLEANUP();\r
}\r
else\r
{\r
- bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
- pPuback,\r
- pubackSize );\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+\r
+ /* Clean up on error. */\r
+ IOT_FUNCTION_CLEANUP_BEGIN();\r
\r
- if( bytesSent != pubackSize )\r
+ if( status != IOT_MQTT_SUCCESS )\r
+ {\r
+ if( pPubackOperation != NULL )\r
{\r
- IotLogWarn( "(MQTT connection %p) Failed to send PUBACK for received"\r
- " PUBLISH %hu.",\r
- pMqttConnection,\r
- packetIdentifier );\r
+ _IotMqtt_DestroyOperation( pPubackOperation );\r
}\r
else\r
{\r
- IotLogDebug( "(MQTT connection %p) PUBACK for received PUBLISH %hu sent.",\r
- pMqttConnection,\r
- packetIdentifier );\r
+ EMPTY_ELSE_MARKER;\r
}\r
+ }\r
+ else\r
+ {\r
+ EMPTY_ELSE_MARKER;\r
+ }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
\r
- freePacket( pPuback );\r
+static void _flushPacket( void * pNetworkConnection,\r
+ const _mqttConnection_t * pMqttConnection,\r
+ size_t length )\r
+{\r
+ size_t bytesFlushed = 0;\r
+ uint8_t receivedByte = 0;\r
+\r
+ for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ )\r
+ {\r
+ ( void ) _IotMqtt_GetNextByte( pNetworkConnection,\r
+ pMqttConnection->pNetworkInterface,\r
+ &receivedByte );\r
}\r
}\r
\r
IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;\r
IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };\r
+ void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL;\r
+\r
+ /* Disconnect callback function. */\r
+ void ( * disconnectCallback )( void *,\r
+ IotMqttCallbackParam_t * ) = NULL;\r
+\r
+ /* Network close function. */\r
+ IotNetworkError_t ( * closeConnection) ( void * ) = NULL;\r
+\r
+ /* Default free packet function. */\r
+ void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
\r
/* Mark the MQTT connection as disconnected and the keep-alive as failed. */\r
IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
pMqttConnection->disconnected = true;\r
- pMqttConnection->keepAliveFailure = true;\r
\r
- if( pMqttConnection->keepAliveMs != 0 )\r
+ if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
{\r
/* Keep-alive must have a PINGREQ allocated. */\r
- IotMqtt_Assert( pMqttConnection->pPingreqPacket != NULL );\r
- IotMqtt_Assert( pMqttConnection->pingreqPacketSize != 0 );\r
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL );\r
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 );\r
\r
/* PINGREQ provides a reference to the connection, so reference count must\r
* be nonzero. */\r
\r
/* Attempt to cancel the keep-alive job. */\r
taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
- pMqttConnection->keepAliveJob,\r
+ pMqttConnection->pingreq.job,\r
NULL );\r
\r
/* If the keep-alive job was not canceled, it must be already executing.\r
* the executing keep-alive job will clean up itself. */\r
if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
{\r
- /* Clean up PINGREQ packet and job. */\r
- _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );\r
+ /* Choose a function to free the packet. */\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( pMqttConnection->pSerializer != NULL )\r
+ {\r
+ if( pMqttConnection->pSerializer->freePacket != NULL )\r
+ {\r
+ freePacket = pMqttConnection->pSerializer->freePacket;\r
+ }\r
+ }\r
+ #endif\r
+\r
+ freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );\r
\r
/* Clear data about the keep-alive. */\r
- pMqttConnection->keepAliveMs = 0;\r
- pMqttConnection->pPingreqPacket = NULL;\r
- pMqttConnection->pingreqPacketSize = 0;\r
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;\r
+ pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;\r
+ pMqttConnection->pingreq.u.operation.packetSize = 0;\r
\r
/* Keep-alive is cleaned up; decrement reference count. Since this\r
* function must be followed with a call to DISCONNECT, a check to\r
EMPTY_ELSE_MARKER;\r
}\r
\r
+ /* Copy the function pointers and contexts, as the MQTT connection may be\r
+ * modified after the mutex is released. */\r
+ disconnectCallback = pMqttConnection->disconnectCallback.function;\r
+ pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext;\r
+\r
+ closeConnection = pMqttConnection->pNetworkInterface->close;\r
+ pNetworkConnection = pMqttConnection->pNetworkConnection;\r
+\r
IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
\r
/* Close the network connection. */\r
- if( pMqttConnection->pNetworkInterface->close != NULL )\r
+ if( closeConnection != NULL )\r
{\r
- closeStatus = pMqttConnection->pNetworkInterface->close( pMqttConnection->pNetworkConnection );\r
+ closeStatus = closeConnection( pNetworkConnection );\r
\r
if( closeStatus == IOT_NETWORK_SUCCESS )\r
{\r
}\r
\r
/* Invoke the disconnect callback. */\r
- if( pMqttConnection->disconnectCallback.function != NULL )\r
+ if( disconnectCallback != NULL )\r
{\r
/* Set the members of the callback parameter. */\r
callbackParam.mqttConnection = pMqttConnection;\r
callbackParam.u.disconnectReason = disconnectReason;\r
\r
- pMqttConnection->disconnectCallback.function( pMqttConnection->disconnectCallback.pCallbackContext,\r
- &callbackParam );\r
+ disconnectCallback( pDisconnectCallbackContext,\r
+ &callbackParam );\r
}\r
else\r
{\r