static void _mqttOperation_tryDestroy( void * pData );\r
\r
/**\r
- * @brief Create a keep-alive job for an MQTT connection.\r
+ * @brief Initialize the keep-alive operation for an MQTT connection.\r
*\r
* @param[in] pNetworkInfo User-provided network information for the new\r
* connection.\r
*\r
* @return `true` if the keep-alive job was successfully created; `false` otherwise.\r
*/\r
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
- uint16_t keepAliveSeconds,\r
- _mqttConnection_t * pMqttConnection );\r
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+ uint16_t keepAliveSeconds,\r
+ _mqttConnection_t * pMqttConnection );\r
\r
/**\r
* @brief Creates a new MQTT connection and initializes its members.\r
size_t subscriptionCount,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pOperationReference );\r
+ IotMqttOperation_t * const pOperationReference );\r
\r
/*-----------------------------------------------------------*/\r
\r
\r
/*-----------------------------------------------------------*/\r
\r
-static bool _createKeepAliveJob( const IotMqttNetworkInfo_t * pNetworkInfo,\r
- uint16_t keepAliveSeconds,\r
- _mqttConnection_t * pMqttConnection )\r
+static bool _createKeepAliveOperation( const IotMqttNetworkInfo_t * pNetworkInfo,\r
+ uint16_t keepAliveSeconds,\r
+ _mqttConnection_t * pMqttConnection )\r
{\r
bool status = true;\r
IotMqttError_t serializeStatus = IOT_MQTT_SUCCESS;\r
IotMqttError_t ( * serializePingreq )( uint8_t **,\r
size_t * ) = _IotMqtt_SerializePingreq;\r
\r
+ /* Set PINGREQ operation members. */\r
+ pMqttConnection->pingreq.u.operation.type = IOT_MQTT_PINGREQ;\r
+\r
/* Convert the keep-alive interval to milliseconds. */\r
- pMqttConnection->keepAliveMs = keepAliveSeconds * 1000;\r
- pMqttConnection->nextKeepAliveMs = pMqttConnection->keepAliveMs;\r
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = keepAliveSeconds * 1000;\r
+ pMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs = keepAliveSeconds * 1000;\r
\r
/* Choose a PINGREQ serializer function. */\r
#if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
#endif /* if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1 */\r
\r
/* Generate a PINGREQ packet. */\r
- serializeStatus = serializePingreq( &( pMqttConnection->pPingreqPacket ),\r
- &( pMqttConnection->pingreqPacketSize ) );\r
+ serializeStatus = serializePingreq( &( pMqttConnection->pingreq.u.operation.pMqttPacket ),\r
+ &( pMqttConnection->pingreq.u.operation.packetSize ) );\r
\r
if( serializeStatus != IOT_MQTT_SUCCESS )\r
{\r
/* Create the task pool job that processes keep-alive. */\r
jobStatus = IotTaskPool_CreateJob( _IotMqtt_ProcessKeepAlive,\r
pMqttConnection,\r
- &( pMqttConnection->keepAliveJobStorage ),\r
- &( pMqttConnection->keepAliveJob ) );\r
+ &( pMqttConnection->pingreq.jobStorage ),\r
+ &( pMqttConnection->pingreq.job ) );\r
\r
/* Task pool job creation for a pre-allocated job should never fail.\r
* Abort the program if it does. */\r
/* Check if keep-alive is active for this connection. */\r
if( keepAliveSeconds != 0 )\r
{\r
- if( _createKeepAliveJob( pNetworkInfo,\r
- keepAliveSeconds,\r
- pMqttConnection ) == false )\r
+ if( _createKeepAliveOperation( pNetworkInfo,\r
+ keepAliveSeconds,\r
+ pMqttConnection ) == false )\r
{\r
IOT_SET_AND_GOTO_CLEANUP( false );\r
}\r
{\r
IotNetworkError_t networkStatus = IOT_NETWORK_SUCCESS;\r
\r
+ /* Default free packet function. */\r
+ void (* freePacket)( uint8_t * ) = _IotMqtt_FreePacket;\r
+\r
/* Clean up keep-alive if still allocated. */\r
- if( pMqttConnection->keepAliveMs != 0 )\r
+ if( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
{\r
IotLogDebug( "(MQTT connection %p) Cleaning up keep-alive.", pMqttConnection );\r
\r
- _IotMqtt_FreePacket( pMqttConnection->pPingreqPacket );\r
+ /* Choose a function to free the packet. */\r
+ #if IOT_MQTT_ENABLE_SERIALIZER_OVERRIDES == 1\r
+ if( pMqttConnection->pSerializer != NULL )\r
+ {\r
+ if( pMqttConnection->pSerializer->freePacket != NULL )\r
+ {\r
+ freePacket = pMqttConnection->pSerializer->freePacket;\r
+ }\r
+ }\r
+ #endif\r
+\r
+ freePacket( pMqttConnection->pingreq.u.operation.pMqttPacket );\r
\r
/* Clear data about the keep-alive. */\r
- pMqttConnection->keepAliveMs = 0;\r
- pMqttConnection->pPingreqPacket = NULL;\r
- pMqttConnection->pingreqPacketSize = 0;\r
+ pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs = 0;\r
+ pMqttConnection->pingreq.u.operation.pMqttPacket = NULL;\r
+ pMqttConnection->pingreq.u.operation.packetSize = 0;\r
\r
/* Decrement reference count. */\r
pMqttConnection->references--;\r
/* A connection to be destroyed should have no keep-alive and at most 1\r
* reference. */\r
IotMqtt_Assert( pMqttConnection->references <= 1 );\r
- IotMqtt_Assert( pMqttConnection->keepAliveMs == 0 );\r
- IotMqtt_Assert( pMqttConnection->pPingreqPacket == NULL );\r
- IotMqtt_Assert( pMqttConnection->pingreqPacketSize == 0 );\r
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs == 0 );\r
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.pMqttPacket == NULL );\r
+ IotMqtt_Assert( pMqttConnection->pingreq.u.operation.packetSize == 0 );\r
\r
/* Remove all subscriptions. */\r
IotMutex_Lock( &( pMqttConnection->subscriptionMutex ) );\r
size_t subscriptionCount,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pOperationReference )\r
+ IotMqttOperation_t * const pOperationReference )\r
{\r
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
_mqttOperation_t * pSubscriptionOperation = NULL;\r
\r
/* Check the subscription operation data and set the operation type. */\r
IotMqtt_Assert( pSubscriptionOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
- IotMqtt_Assert( pSubscriptionOperation->u.operation.retry.limit == 0 );\r
+ IotMqtt_Assert( pSubscriptionOperation->u.operation.periodic.retry.limit == 0 );\r
pSubscriptionOperation->u.operation.type = operation;\r
\r
/* Generate a subscription packet from the subscription list. */\r
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
== IOT_MQTT_FLAG_WAITABLE );\r
- IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
\r
/* Set the operation type. */\r
pOperation->u.operation.type = IOT_MQTT_CONNECT;\r
if( status == IOT_MQTT_SUCCESS )\r
{\r
/* Check if a keep-alive job should be scheduled. */\r
- if( pNewMqttConnection->keepAliveMs != 0 )\r
+ if( pNewMqttConnection->pingreq.u.operation.periodic.ping.keepAliveMs != 0 )\r
{\r
IotLogDebug( "Scheduling first MQTT keep-alive job." );\r
\r
taskPoolStatus = IotTaskPool_ScheduleDeferred( IOT_SYSTEM_TASKPOOL,\r
- pNewMqttConnection->keepAliveJob,\r
- pNewMqttConnection->nextKeepAliveMs );\r
+ pNewMqttConnection->pingreq.job,\r
+ pNewMqttConnection->pingreq.u.operation.periodic.ping.nextPeriodMs );\r
\r
if( taskPoolStatus != IOT_TASKPOOL_SUCCESS )\r
{\r
IotMqtt_Assert( pOperation->u.operation.status == IOT_MQTT_STATUS_PENDING );\r
IotMqtt_Assert( ( pOperation->u.operation.flags & IOT_MQTT_FLAG_WAITABLE )\r
== IOT_MQTT_FLAG_WAITABLE );\r
- IotMqtt_Assert( pOperation->u.operation.retry.limit == 0 );\r
+ IotMqtt_Assert( pOperation->u.operation.periodic.retry.limit == 0 );\r
\r
/* Set the operation type. */\r
pOperation->u.operation.type = IOT_MQTT_DISCONNECT;\r
size_t subscriptionCount,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pSubscribeOperation )\r
+ IotMqttOperation_t * const pSubscribeOperation )\r
{\r
return _subscriptionCommon( IOT_MQTT_SUBSCRIBE,\r
mqttConnection,\r
size_t subscriptionCount,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pUnsubscribeOperation )\r
+ IotMqttOperation_t * const pUnsubscribeOperation )\r
{\r
return _subscriptionCommon( IOT_MQTT_UNSUBSCRIBE,\r
mqttConnection,\r
const IotMqttPublishInfo_t * pPublishInfo,\r
uint32_t flags,\r
const IotMqttCallbackInfo_t * pCallbackInfo,\r
- IotMqttOperation_t * pPublishOperation )\r
+ IotMqttOperation_t * const pPublishOperation )\r
{\r
IOT_FUNCTION_ENTRY( IotMqttError_t, IOT_MQTT_SUCCESS );\r
_mqttOperation_t * pOperation = NULL;\r
/* A QoS 0 PUBLISH may not be retried. */\r
if( pPublishInfo->qos != IOT_MQTT_QOS_0 )\r
{\r
- pOperation->u.operation.retry.limit = pPublishInfo->retryLimit;\r
- pOperation->u.operation.retry.nextPeriod = pPublishInfo->retryMs;\r
+ pOperation->u.operation.periodic.retry.limit = pPublishInfo->retryLimit;\r
+ pOperation->u.operation.periodic.retry.nextPeriodMs = pPublishInfo->retryMs;\r
}\r
else\r
{\r