]> git.sur5r.net Git - freertos/blobdiff - FreeRTOS-Plus/Source/FreeRTOS-IoT-Libraries/c_sdk/standard/mqtt/src/iot_mqtt_network.c
Files as per 190725_FreeRTOS_IoT_Libs_Task_Pool_and_MQTT_Preview interim release.
[freertos] / FreeRTOS-Plus / Source / FreeRTOS-IoT-Libraries / c_sdk / standard / mqtt / src / iot_mqtt_network.c
index 169a292df791479bd14cad9e48514ad49929348b..42843d28c5e2809c323816e0641037705f857728 100644 (file)
@@ -43,6 +43,9 @@
 /* Platform layer includes. */\r
 #include "platform/iot_threads.h"\r
 \r
+/* Atomics include. */\r
+#include "iot_atomic.h"\r
+\r
 /*-----------------------------------------------------------*/\r
 \r
 /**\r
@@ -89,6 +92,22 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
 static void _sendPuback( _mqttConnection_t * pMqttConnection,\r
                          uint16_t packetIdentifier );\r
 \r
+/**\r
+ * @brief Flush a packet from the stream of incoming data.\r
+ *\r
+ * This function is called when memory for a packet cannot be allocated. The\r
+ * packet is flushed from the stream of incoming data so that the next packet\r
+ * may be read.\r
+ *\r
+ * @param[in] pNetworkConnection Network connection to use for receive, which\r
+ * may be different from the network connection associated with the MQTT connection.\r
+ * @param[in] pMqttConnection The associated MQTT connection.\r
+ * @param[in] length The length of the packet to flush.\r
+ */\r
+static void _flushPacket( void * pNetworkConnection,\r
+                          const _mqttConnection_t * pMqttConnection,\r
+                          size_t length );\r
+\r
 /*-----------------------------------------------------------*/\r
 \r
 static bool _incomingPacketValid( uint8_t packetType )\r
@@ -201,6 +220,14 @@ static IotMqttError_t _getIncomingPacket( void * pNetworkConnection,
 \r
         if( pIncomingPacket->pRemainingData == NULL )\r
         {\r
+            IotLogError( "(MQTT connection %p) Failed to allocate buffer of length "\r
+                         "%lu for incoming packet type %lu.",\r
+                         pMqttConnection,\r
+                         ( unsigned long ) pIncomingPacket->remainingLength,\r
+                         ( unsigned long ) pIncomingPacket->type );\r
+\r
+            _flushPacket( pNetworkConnection, pMqttConnection, pIncomingPacket->remainingLength );\r
+\r
             IOT_SET_AND_GOTO_CLEANUP( IOT_MQTT_NO_MEMORY );\r
         }\r
         else\r
@@ -593,22 +620,18 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
 \r
             if( status == IOT_MQTT_SUCCESS )\r
             {\r
-                IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
-\r
-                if( pMqttConnection->keepAliveFailure == false )\r
+                if( Atomic_CompareAndSwap_u32( &( pMqttConnection->pingreq.u.operation.periodic.ping.failure ),\r
+                                               0,\r
+                                               1 ) == 1 )\r
                 {\r
-                    IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",\r
-                                pMqttConnection );\r
+                    IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",\r
+                                 pMqttConnection );\r
                 }\r
                 else\r
                 {\r
-                    IotLogDebug( "(MQTT connection %p) PINGRESP successfully parsed.",\r
-                                 pMqttConnection );\r
-\r
-                    pMqttConnection->keepAliveFailure = false;\r
+                    IotLogWarn( "(MQTT connection %p) Unexpected PINGRESP received.",\r
+                                pMqttConnection );\r
                 }\r
-\r
-                IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
             }\r
             else\r
             {\r
@@ -637,15 +660,13 @@ static IotMqttError_t _deserializeIncomingPacket( _mqttConnection_t * pMqttConne
 static void _sendPuback( _mqttConnection_t * pMqttConnection,\r
                          uint16_t packetIdentifier )\r
 {\r
-    IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
-    uint8_t * pPuback = NULL;\r
-    size_t pubackSize = 0, bytesSent = 0;\r
+    IotMqttError_t status = IOT_MQTT_STATUS_PENDING;\r
+    _mqttOperation_t * pPubackOperation = NULL;\r
 \r
-    /* Default PUBACK serializer and free packet functions. */\r
+    /* Default PUBACK serializer function. */\r
     IotMqttError_t ( * serializePuback )( uint16_t,\r
                                           uint8_t **,\r
                                           size_t * ) = _IotMqtt_SerializePuback;\r
-    void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
 \r
     IotLogDebug( "(MQTT connection %p) Sending PUBACK for received PUBLISH %hu.",\r
                  pMqttConnection,\r
@@ -669,57 +690,82 @@ static void _sendPuback( _mqttConnection_t * pMqttConnection,
             EMPTY_ELSE_MARKER;\r
         }\r
     #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
-    #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
-        if( pMqttConnection->pSerializer != NULL )\r
-        {\r
-            if( pMqttConnection->pSerializer->freePacket != NULL )\r
-            {\r
-                freePacket = pMqttConnection->pSerializer->freePacket;\r
-            }\r
-            else\r
-            {\r
-                EMPTY_ELSE_MARKER;\r
-            }\r
-        }\r
-        else\r
-        {\r
-            EMPTY_ELSE_MARKER;\r
-        }\r
-    #endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
+\r
+    /* Create a PUBACK operation. */\r
+    status = _IotMqtt_CreateOperation( pMqttConnection,\r
+                                       0,\r
+                                       NULL,\r
+                                       &pPubackOperation );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+\r
+    /* Set the operation type. */\r
+    pPubackOperation->u.operation.type = IOT_MQTT_PUBACK;\r
 \r
     /* Generate a PUBACK packet from the packet identifier. */\r
-    serializeStatus = serializePuback( packetIdentifier,\r
-                                       &pPuback,\r
-                                       &pubackSize );\r
+    status = serializePuback( packetIdentifier,\r
+                              &( pPubackOperation->u.operation.pMqttPacket ),\r
+                              &( pPubackOperation->u.operation.packetSize ) );\r
+\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        IOT_GOTO_CLEANUP();\r
+    }\r
+\r
+    /* Add the PUBACK operation to the send queue for network transmission. */\r
+    status = _IotMqtt_ScheduleOperation( pPubackOperation,\r
+                                         _IotMqtt_ProcessSend,\r
+                                         0 );\r
 \r
-    if( serializeStatus != IOT_MQTT_SUCCESS )\r
+    if( status != IOT_MQTT_SUCCESS )\r
     {\r
-        IotLogWarn( "(MQTT connection %p) Failed to generate PUBACK packet for "\r
-                    "received PUBLISH %hu.",\r
-                    pMqttConnection,\r
-                    packetIdentifier );\r
+        IotLogError( "(MQTT connection %p) Failed to enqueue PUBACK for sending.",\r
+                     pMqttConnection );\r
+\r
+        IOT_GOTO_CLEANUP();\r
     }\r
     else\r
     {\r
-        bytesSent = pMqttConnection->pNetworkInterface->send( pMqttConnection->pNetworkConnection,\r
-                                                              pPuback,\r
-                                                              pubackSize );\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+\r
+    /* Clean up on error. */\r
+    IOT_FUNCTION_CLEANUP_BEGIN();\r
 \r
-        if( bytesSent != pubackSize )\r
+    if( status != IOT_MQTT_SUCCESS )\r
+    {\r
+        if( pPubackOperation != NULL )\r
         {\r
-            IotLogWarn( "(MQTT connection %p) Failed to send PUBACK for received"\r
-                        " PUBLISH %hu.",\r
-                        pMqttConnection,\r
-                        packetIdentifier );\r
+            _IotMqtt_DestroyOperation( pPubackOperation );\r
         }\r
         else\r
         {\r
-            IotLogDebug( "(MQTT connection %p) PUBACK for received PUBLISH %hu sent.",\r
-                         pMqttConnection,\r
-                         packetIdentifier );\r
+            EMPTY_ELSE_MARKER;\r
         }\r
+    }\r
+    else\r
+    {\r
+        EMPTY_ELSE_MARKER;\r
+    }\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
 \r
-        freePacket( pPuback );\r
+static void _flushPacket( void * pNetworkConnection,\r
+                          const _mqttConnection_t * pMqttConnection,\r
+                          size_t length )\r
+{\r
+    size_t bytesFlushed = 0;\r
+    uint8_t receivedByte = 0;\r
+\r
+    for( bytesFlushed = 0; bytesFlushed < length; bytesFlushed++ )\r
+    {\r
+        ( void ) _IotMqtt_GetNextByte( pNetworkConnection,\r
+                                       pMqttConnection->pNetworkInterface,\r
+                                       &receivedByte );\r
     }\r
 }\r
 \r
@@ -761,17 +807,27 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
     IotTaskPoolError_t taskPoolStatus = IOT_TASKPOOL_SUCCESS;\r
     IotNetworkError_t closeStatus = IOT_NETWORK_SUCCESS;\r
     IotMqttCallbackParam_t callbackParam = { .u.message = { 0 } };\r
+    void * pNetworkConnection = NULL, * pDisconnectCallbackContext = NULL;\r
+\r
+    /* Disconnect callback function. */\r
+    void ( * disconnectCallback )( void *,\r
+                                   IotMqttCallbackParam_t * ) = NULL;\r
+\r
+    /* Network close function. */\r
+    IotNetworkError_t ( * closeConnection) ( void * ) = NULL;\r
+\r
+    /* Default free packet function. */\r
+    void ( * freePacket )( uint8_t * ) = _IotMqtt_FreePacket;\r
 \r
     /* Mark the MQTT connection as disconnected and the keep-alive as failed. */\r
     IotMutex_Lock( &( pMqttConnection->referencesMutex ) );\r
     pMqttConnection->disconnected = true;\r
-    pMqttConnection->keepAliveFailure = true;\r
 \r
-    if( pMqttConnection->keepAliveMs != 0 )\r
+    if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
     {\r
         /* Keep-alive must have a PINGREQ allocated. */\r
-        IotMqtt_Assert( pMqttConnection->pPingreqPacket != NULL );\r
-        IotMqtt_Assert( pMqttConnection->pingreqPacketSize != 0 );\r
+        IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket != NULL );\r
+        IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize != 0 );\r
 \r
         /* PINGREQ provides a reference to the connection, so reference count must\r
          * be nonzero. */\r
@@ -779,7 +835,7 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
 \r
         /* Attempt to cancel the keep-alive job. */\r
         taskPoolStatus = IotTaskPool_TryCancel( IOT_SYSTEM_TASKPOOL,\r
-                                                pMqttConnection->keepAliveJob,\r
+                                                pMqttConnection->pingreq.job,\r
                                                 NULL );\r
 \r
         /* If the keep-alive job was not canceled, it must be already executing.\r
@@ -791,13 +847,23 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
          * the executing keep-alive job will clean up itself. */\r
         if( taskPoolStatus == IOT_TASKPOOL_SUCCESS )\r
         {\r
-            /* Clean up PINGREQ packet and job. */\r
-            _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );\r
+            /* Choose a function to free the packet. */\r
+            #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+                if( pMqttConnection->pSerializer != NULL )\r
+                {\r
+                    if( pMqttConnection->pSerializer->freePacket != NULL )\r
+                    {\r
+                        freePacket = pMqttConnection->pSerializer->freePacket;\r
+                    }\r
+                }\r
+            #endif\r
+\r
+            freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );\r
 \r
             /* Clear data about the keep-alive. */\r
-            pMqttConnection->keepAliveMs = 0;\r
-            pMqttConnection->pPingreqPacket = NULL;\r
-            pMqttConnection->pingreqPacketSize = 0;\r
+            pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;\r
+            pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;\r
+            pMqttConnection->pingreq.u.operation.packetSize = 0;\r
 \r
             /* Keep-alive is cleaned up; decrement reference count. Since this\r
              * function must be followed with a call to DISCONNECT, a check to\r
@@ -817,12 +883,20 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
         EMPTY_ELSE_MARKER;\r
     }\r
 \r
+    /* Copy the function pointers and contexts, as the MQTT connection may be\r
+     * modified after the mutex is released. */\r
+    disconnectCallback = pMqttConnection->disconnectCallback.function;\r
+    pDisconnectCallbackContext = pMqttConnection->disconnectCallback.pCallbackContext;\r
+\r
+    closeConnection = pMqttConnection->pNetworkInterface->close;\r
+    pNetworkConnection = pMqttConnection->pNetworkConnection;\r
+\r
     IotMutex_Unlock( &( pMqttConnection->referencesMutex ) );\r
 \r
     /* Close the network connection. */\r
-    if( pMqttConnection->pNetworkInterface->close != NULL )\r
+    if( closeConnection != NULL )\r
     {\r
-        closeStatus = pMqttConnection->pNetworkInterface->close( pMqttConnection->pNetworkConnection );\r
+        closeStatus = closeConnection( pNetworkConnection );\r
 \r
         if( closeStatus == IOT_NETWORK_SUCCESS )\r
         {\r
@@ -842,14 +916,14 @@ void _IotMqtt_CloseNetworkConnection( IotMqttDisconnectReason_t disconnectReason
     }\r
 \r
     /* Invoke the disconnect callback. */\r
-    if( pMqttConnection->disconnectCallback.function != NULL )\r
+    if( disconnectCallback != NULL )\r
     {\r
         /* Set the members of the callback parameter. */\r
         callbackParam.mqttConnection = pMqttConnection;\r
         callbackParam.u.disconnectReason = disconnectReason;\r
 \r
-        pMqttConnection->disconnectCallback.function( pMqttConnection->disconnectCallback.pCallbackContext,\r
-                                                      &callbackParam );\r
+        disconnectCallback( pDisconnectCallbackContext,\r
+                            &callbackParam );\r
     }\r
     else\r
     {\r