2 * AWS IoT Jobs V1.0.0
\r
3 * Copyright (C) 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
24 * @file aws_iot_jobs_subscription.c
\r
25 * @brief Implements functions for interacting with the Jobs library's
\r
26 * subscription list.
\r
29 /* The config header is always included first. */
\r
30 #include "iot_config.h"
\r
32 /* Standard includes. */
\r
35 /* Jobs internal include. */
\r
36 #include "private/aws_iot_jobs_internal.h"
\r
38 /* Error handling include. */
\r
39 #include "iot_error.h"
\r
41 /* Platform layer includes. */
\r
42 #include "platform/iot_threads.h"
\r
45 #include "iot_mqtt.h"
\r
47 /*-----------------------------------------------------------*/
\r
50 * @brief Match two #_jobsSubscription_t by Thing Name.
\r
52 * @param[in] pSubscriptionLink Pointer to the link member of a #_jobsSubscription_t
\r
53 * containing the Thing Name to check.
\r
54 * @param[in] pMatch Pointer to a `AwsIotThingName_t`.
\r
56 * @return `true` if the Thing Names match; `false` otherwise.
\r
58 static bool _jobsSubscription_match( const IotLink_t * pSubscriptionLink,
\r
61 /*-----------------------------------------------------------*/
\r
64 * @brief List of active Jobs subscriptions objects.
\r
66 IotListDouble_t _AwsIotJobsSubscriptions = { 0 };
\r
69 * @brief Protects #_AwsIotJobsSubscriptions from concurrent access.
\r
71 IotMutex_t _AwsIotJobsSubscriptionsMutex;
\r
73 /*-----------------------------------------------------------*/
\r
75 static bool _jobsSubscription_match( const IotLink_t * pSubscriptionLink,
\r
80 /* Because this function is called from a container function, the given link
\r
81 * must never be NULL. */
\r
82 AwsIotJobs_Assert( pSubscriptionLink != NULL );
\r
84 const _jobsSubscription_t * pSubscription = IotLink_Container( _jobsSubscription_t,
\r
87 const AwsIotThingName_t * pThingName = ( AwsIotThingName_t * ) pMatch;
\r
89 if( pThingName->thingNameLength == pSubscription->thingNameLength )
\r
91 /* Check for matching Thing Names. */
\r
92 match = ( strncmp( pThingName->pThingName,
\r
93 pSubscription->pThingName,
\r
94 pThingName->thingNameLength ) == 0 );
\r
100 /*-----------------------------------------------------------*/
\r
102 _jobsSubscription_t * _AwsIotJobs_FindSubscription( const char * pThingName,
\r
103 size_t thingNameLength,
\r
104 bool createIfNotFound )
\r
106 _jobsSubscription_t * pSubscription = NULL;
\r
107 IotLink_t * pSubscriptionLink = NULL;
\r
108 AwsIotThingName_t thingName = { 0 };
\r
110 thingName.pThingName = pThingName;
\r
111 thingName.thingNameLength = thingNameLength;
\r
113 /* Search the list for an existing subscription for Thing Name. */
\r
114 pSubscriptionLink = IotListDouble_FindFirstMatch( &( _AwsIotJobsSubscriptions ),
\r
116 _jobsSubscription_match,
\r
119 /* Check if a subscription was found. */
\r
120 if( pSubscriptionLink == NULL )
\r
122 if( createIfNotFound == true )
\r
124 /* No subscription found. Allocate a new subscription. */
\r
125 pSubscription = AwsIotJobs_MallocSubscription( sizeof( _jobsSubscription_t ) + thingNameLength );
\r
127 if( pSubscription != NULL )
\r
129 /* Clear the new subscription. */
\r
130 ( void ) memset( pSubscription, 0x00, sizeof( _jobsSubscription_t ) + thingNameLength );
\r
132 /* Set the Thing Name length and copy the Thing Name into the new subscription. */
\r
133 pSubscription->thingNameLength = thingNameLength;
\r
134 ( void ) memcpy( pSubscription->pThingName, pThingName, thingNameLength );
\r
136 /* Add the new subscription to the subscription list. */
\r
137 IotListDouble_InsertHead( &( _AwsIotJobsSubscriptions ),
\r
138 &( pSubscription->link ) );
\r
140 IotLogDebug( "Created new Jobs subscriptions object for %.*s.",
\r
146 IotLogError( "Failed to allocate memory for %.*s Jobs subscriptions.",
\r
154 IotLogDebug( "Found existing Jobs subscriptions object for %.*s.",
\r
158 pSubscription = IotLink_Container( _jobsSubscription_t, pSubscriptionLink, link );
\r
161 return pSubscription;
\r
164 /*-----------------------------------------------------------*/
\r
166 void _AwsIotJobs_RemoveSubscription( _jobsSubscription_t * pSubscription,
\r
167 _jobsSubscription_t ** pRemovedSubscription )
\r
169 IOT_FUNCTION_ENTRY( bool, true );
\r
170 int32_t i = 0, callbackIndex = 0;
\r
172 IotLogDebug( "Checking if subscription object for %.*s can be removed.",
\r
173 pSubscription->thingNameLength,
\r
174 pSubscription->pThingName );
\r
176 /* Check for active operations. If any Jobs operation's subscription
\r
177 * reference count is not 0, then the subscription cannot be removed. */
\r
178 for( i = 0; i < JOBS_OPERATION_COUNT; i++ )
\r
180 if( pSubscription->operationReferences[ i ] > 0 )
\r
182 IotLogDebug( "Reference count %ld for %.*s subscription object. "
\r
183 "Subscription cannot be removed yet.",
\r
184 ( long int ) pSubscription->operationReferences[ i ],
\r
185 pSubscription->thingNameLength,
\r
186 pSubscription->pThingName );
\r
188 IOT_SET_AND_GOTO_CLEANUP( false );
\r
190 else if( pSubscription->operationReferences[ i ] == AWS_IOT_PERSISTENT_SUBSCRIPTION )
\r
192 IotLogDebug( "Subscription object for %.*s has persistent subscriptions. "
\r
193 "Subscription will not be removed.",
\r
194 pSubscription->thingNameLength,
\r
195 pSubscription->pThingName );
\r
197 IOT_SET_AND_GOTO_CLEANUP( false );
\r
201 /* Check for active subscriptions. If any Jobs callbacks are active, then the
\r
202 * subscription cannot be removed. */
\r
203 if( pSubscription->callbackReferences > 0 )
\r
205 IotLogDebug( "Notify callbacks are using %.*s subscription object. "
\r
206 "Subscription cannot be removed yet.",
\r
207 pSubscription->thingNameLength,
\r
208 pSubscription->pThingName );
\r
210 IOT_SET_AND_GOTO_CLEANUP( false );
\r
213 for( i = 0; i < JOBS_CALLBACK_COUNT; i++ )
\r
215 for( callbackIndex = 0; callbackIndex < AWS_IOT_JOBS_NOTIFY_CALLBACKS; callbackIndex++ )
\r
217 if( pSubscription->callbacks[ i ][ callbackIndex ].function != NULL )
\r
219 IotLogDebug( "Found active Jobs %s callback for %.*s subscription object. "
\r
220 "Subscription cannot be removed yet.",
\r
221 _pAwsIotJobsCallbackNames[ i ],
\r
222 pSubscription->thingNameLength,
\r
223 pSubscription->pThingName );
\r
225 IOT_SET_AND_GOTO_CLEANUP( false );
\r
230 /* Remove the subscription if unused. */
\r
231 IOT_FUNCTION_CLEANUP_BEGIN();
\r
233 if( status == true )
\r
235 /* No Jobs operation subscription references or active Jobs callbacks.
\r
236 * Remove the subscription object. */
\r
237 IotListDouble_Remove( &( pSubscription->link ) );
\r
239 IotLogDebug( "Removed subscription object for %.*s.",
\r
240 pSubscription->thingNameLength,
\r
241 pSubscription->pThingName );
\r
243 /* If the caller requested the removed subscription, set the output parameter.
\r
244 * Otherwise, free the memory used by the subscription. */
\r
245 if( pRemovedSubscription != NULL )
\r
247 *pRemovedSubscription = pSubscription;
\r
251 _AwsIotJobs_DestroySubscription( pSubscription );
\r
256 /*-----------------------------------------------------------*/
\r
258 void _AwsIotJobs_DestroySubscription( void * pData )
\r
260 _jobsSubscription_t * pSubscription = ( _jobsSubscription_t * ) pData;
\r
262 /* Free the topic buffer if allocated. */
\r
263 if( pSubscription->pTopicBuffer != NULL )
\r
265 AwsIotJobs_FreeString( pSubscription->pTopicBuffer );
\r
268 /* Free memory used by subscription. */
\r
269 AwsIotJobs_FreeSubscription( pSubscription );
\r
272 /*-----------------------------------------------------------*/
\r
274 AwsIotJobsError_t _AwsIotJobs_IncrementReferences( _jobsOperation_t * pOperation,
\r
275 char * pTopicBuffer,
\r
276 uint16_t operationTopicLength,
\r
277 AwsIotMqttCallbackFunction_t callback )
\r
279 IOT_FUNCTION_ENTRY( AwsIotJobsError_t, AWS_IOT_JOBS_SUCCESS );
\r
280 const _jobsOperationType_t type = pOperation->type;
\r
281 _jobsSubscription_t * pSubscription = pOperation->pSubscription;
\r
282 IotMqttError_t subscriptionStatus = IOT_MQTT_STATUS_PENDING;
\r
283 AwsIotSubscriptionInfo_t subscriptionInfo = { 0 };
\r
285 /* Do nothing if this operation has persistent subscriptions. */
\r
286 if( pSubscription->operationReferences[ type ] == AWS_IOT_PERSISTENT_SUBSCRIPTION )
\r
288 IotLogDebug( "Jobs %s for %.*s has persistent subscriptions. Reference "
\r
289 "count will not be incremented.",
\r
290 _pAwsIotJobsOperationNames[ type ],
\r
291 pSubscription->thingNameLength,
\r
292 pSubscription->pThingName );
\r
294 IOT_GOTO_CLEANUP();
\r
297 /* When persistent subscriptions are not present, the reference count must
\r
298 * not be negative. */
\r
299 AwsIotJobs_Assert( pSubscription->operationReferences[ type ] >= 0 );
\r
301 /* Check if there are any existing references for this operation. */
\r
302 if( pSubscription->operationReferences[ type ] == 0 )
\r
304 /* Set the parameters needed to add subscriptions. */
\r
305 subscriptionInfo.mqttConnection = pOperation->mqttConnection;
\r
306 subscriptionInfo.callbackFunction = callback;
\r
307 subscriptionInfo.timeout = _AwsIotJobsMqttTimeoutMs;
\r
308 subscriptionInfo.pTopicFilterBase = pTopicBuffer;
\r
309 subscriptionInfo.topicFilterBaseLength = operationTopicLength;
\r
311 subscriptionStatus = AwsIot_ModifySubscriptions( IotMqtt_SubscribeSync,
\r
312 &subscriptionInfo );
\r
314 /* Convert MQTT return code to Jobs return code. */
\r
315 switch( subscriptionStatus )
\r
317 case IOT_MQTT_SUCCESS:
\r
318 status = AWS_IOT_JOBS_SUCCESS;
\r
321 case IOT_MQTT_NO_MEMORY:
\r
322 status = AWS_IOT_JOBS_NO_MEMORY;
\r
326 status = AWS_IOT_JOBS_MQTT_ERROR;
\r
330 if( status != AWS_IOT_JOBS_SUCCESS )
\r
332 IOT_GOTO_CLEANUP();
\r
336 /* Increment the number of subscription references for this operation when
\r
337 * the keep subscriptions flag is not set. */
\r
338 if( ( pOperation->flags & AWS_IOT_JOBS_FLAG_KEEP_SUBSCRIPTIONS ) == 0 )
\r
340 ( pSubscription->operationReferences[ type ] )++;
\r
342 IotLogDebug( "Jobs %s subscriptions for %.*s now has count %d.",
\r
343 _pAwsIotJobsOperationNames[ type ],
\r
344 pSubscription->thingNameLength,
\r
345 pSubscription->pThingName,
\r
346 pSubscription->operationReferences[ type ] );
\r
348 /* Otherwise, set the persistent subscriptions flag. */
\r
351 pSubscription->operationReferences[ type ] = AWS_IOT_PERSISTENT_SUBSCRIPTION;
\r
353 IotLogDebug( "Set persistent subscriptions flag for Jobs %s of %.*s.",
\r
354 _pAwsIotJobsOperationNames[ type ],
\r
355 pSubscription->thingNameLength,
\r
356 pSubscription->pThingName );
\r
359 IOT_FUNCTION_EXIT_NO_CLEANUP();
\r
362 /*-----------------------------------------------------------*/
\r
364 void _AwsIotJobs_DecrementReferences( _jobsOperation_t * pOperation,
\r
365 char * pTopicBuffer,
\r
366 _jobsSubscription_t ** pRemovedSubscription )
\r
368 const _jobsOperationType_t type = pOperation->type;
\r
369 _jobsSubscription_t * pSubscription = pOperation->pSubscription;
\r
370 uint16_t operationTopicLength = 0;
\r
371 AwsIotSubscriptionInfo_t subscriptionInfo = { 0 };
\r
372 AwsIotJobsRequestInfo_t requestInfo = AWS_IOT_JOBS_REQUEST_INFO_INITIALIZER;
\r
374 /* Do nothing if this Jobs operation has persistent subscriptions. */
\r
375 if( pSubscription->operationReferences[ type ] != AWS_IOT_PERSISTENT_SUBSCRIPTION )
\r
377 /* Decrement the number of subscription references for this operation.
\r
378 * Ensure that it's positive. */
\r
379 ( pSubscription->operationReferences[ type ] )--;
\r
380 AwsIotJobs_Assert( pSubscription->operationReferences[ type ] >= 0 );
\r
382 /* Check if the number of references has reached 0. */
\r
383 if( pSubscription->operationReferences[ type ] == 0 )
\r
385 IotLogDebug( "Reference count for %.*s %s is 0. Unsubscribing.",
\r
386 pSubscription->thingNameLength,
\r
387 pSubscription->pThingName,
\r
388 _pAwsIotJobsOperationNames[ type ] );
\r
390 /* Subscription must have a topic buffer. */
\r
391 AwsIotJobs_Assert( pSubscription->pTopicBuffer != NULL );
\r
393 /* Set the parameters needed to generate a Jobs topic. */
\r
394 requestInfo.pThingName = pSubscription->pThingName;
\r
395 requestInfo.thingNameLength = pSubscription->thingNameLength;
\r
396 requestInfo.pJobId = pOperation->pJobId;
\r
397 requestInfo.jobIdLength = pOperation->jobIdLength;
\r
399 /* Generate the prefix of the Jobs topic. This function will not
\r
400 * fail when given a buffer. */
\r
401 ( void ) _AwsIotJobs_GenerateJobsTopic( ( _jobsOperationType_t ) type,
\r
403 &( pSubscription->pTopicBuffer ),
\r
404 &operationTopicLength );
\r
406 /* Set the parameters needed to remove subscriptions. */
\r
407 subscriptionInfo.mqttConnection = pOperation->mqttConnection;
\r
408 subscriptionInfo.timeout = _AwsIotJobsMqttTimeoutMs;
\r
409 subscriptionInfo.pTopicFilterBase = pTopicBuffer;
\r
410 subscriptionInfo.topicFilterBaseLength = operationTopicLength;
\r
412 ( void ) AwsIot_ModifySubscriptions( IotMqtt_UnsubscribeSync,
\r
413 &subscriptionInfo );
\r
416 /* Check if this subscription should be deleted. */
\r
417 _AwsIotJobs_RemoveSubscription( pSubscription,
\r
418 pRemovedSubscription );
\r
422 IotLogDebug( "Jobs %s for %.*s has persistent subscriptions. Reference "
\r
423 "count will not be decremented.",
\r
424 _pAwsIotJobsOperationNames[ type ],
\r
425 pSubscription->thingNameLength,
\r
426 pSubscription->pThingName );
\r
430 /*-----------------------------------------------------------*/
\r
432 AwsIotJobsError_t AwsIotJobs_RemovePersistentSubscriptions( const AwsIotJobsRequestInfo_t * pRequestInfo,
\r
435 IOT_FUNCTION_ENTRY( AwsIotJobsError_t, AWS_IOT_JOBS_SUCCESS );
\r
437 uint16_t operationTopicLength = 0;
\r
438 IotMqttError_t unsubscribeStatus = IOT_MQTT_STATUS_PENDING;
\r
439 AwsIotSubscriptionInfo_t subscriptionInfo = { 0 };
\r
440 _jobsSubscription_t * pSubscription = NULL;
\r
441 IotLink_t * pSubscriptionLink = NULL;
\r
442 AwsIotThingName_t thingName = { 0 };
\r
444 thingName.pThingName = pRequestInfo->pThingName;
\r
445 thingName.thingNameLength = pRequestInfo->thingNameLength;
\r
447 IotLogInfo( "Removing persistent subscriptions for %.*s.",
\r
448 pRequestInfo->thingNameLength,
\r
449 pRequestInfo->pThingName );
\r
451 /* Check parameters. */
\r
452 if( pRequestInfo->mqttConnection == IOT_MQTT_CONNECTION_INITIALIZER )
\r
454 IotLogError( "MQTT connection is not initialized." );
\r
456 IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_JOBS_BAD_PARAMETER );
\r
459 if( AwsIot_ValidateThingName( pRequestInfo->pThingName,
\r
460 pRequestInfo->thingNameLength ) == false )
\r
462 IotLogError( "Thing Name is not valid." );
\r
464 IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_JOBS_BAD_PARAMETER );
\r
467 if( ( ( flags & AWS_IOT_JOBS_FLAG_REMOVE_DESCRIBE_SUBSCRIPTIONS ) != 0 ) ||
\r
468 ( ( flags & AWS_IOT_JOBS_FLAG_REMOVE_UPDATE_SUBSCRIPTIONS ) != 0 ) )
\r
470 if( ( pRequestInfo->pJobId == NULL ) || ( pRequestInfo->jobIdLength == 0 ) )
\r
472 IotLogError( "Job ID must be set." );
\r
474 IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_JOBS_BAD_PARAMETER );
\r
477 if( pRequestInfo->jobIdLength > JOBS_MAX_ID_LENGTH )
\r
479 IotLogError( "Job ID cannot be longer than %d.",
\r
480 JOBS_MAX_ID_LENGTH );
\r
482 IOT_SET_AND_GOTO_CLEANUP( AWS_IOT_JOBS_BAD_PARAMETER );
\r
486 IotMutex_Lock( &( _AwsIotJobsSubscriptionsMutex ) );
\r
488 /* Search the list for an existing subscription for Thing Name. */
\r
489 pSubscriptionLink = IotListDouble_FindFirstMatch( &( _AwsIotJobsSubscriptions ),
\r
491 _jobsSubscription_match,
\r
494 if( pSubscriptionLink != NULL )
\r
496 IotLogDebug( "Found subscription object for %.*s. Checking for persistent "
\r
497 "subscriptions to remove.",
\r
498 pRequestInfo->thingNameLength,
\r
499 pRequestInfo->pThingName );
\r
501 pSubscription = IotLink_Container( _jobsSubscription_t, pSubscriptionLink, link );
\r
503 for( i = 0; i < JOBS_OPERATION_COUNT; i++ )
\r
505 if( ( flags & ( 0x1UL << i ) ) != 0 )
\r
507 IotLogDebug( "Removing %.*s %s persistent subscriptions.",
\r
508 pRequestInfo->thingNameLength,
\r
509 pRequestInfo->pThingName,
\r
510 _pAwsIotJobsOperationNames[ i ] );
\r
512 /* Subscription must have a topic buffer. */
\r
513 AwsIotJobs_Assert( pSubscription->pTopicBuffer != NULL );
\r
515 if( pSubscription->operationReferences[ i ] == AWS_IOT_PERSISTENT_SUBSCRIPTION )
\r
517 /* Generate the prefix of the Jobs topic. This function will not
\r
518 * fail when given a buffer. */
\r
519 ( void ) _AwsIotJobs_GenerateJobsTopic( ( _jobsOperationType_t ) i,
\r
521 &( pSubscription->pTopicBuffer ),
\r
522 &operationTopicLength );
\r
524 /* Set the parameters needed to remove subscriptions. */
\r
525 subscriptionInfo.mqttConnection = pRequestInfo->mqttConnection;
\r
526 subscriptionInfo.timeout = _AwsIotJobsMqttTimeoutMs;
\r
527 subscriptionInfo.pTopicFilterBase = pSubscription->pTopicBuffer;
\r
528 subscriptionInfo.topicFilterBaseLength = operationTopicLength;
\r
530 unsubscribeStatus = AwsIot_ModifySubscriptions( IotMqtt_UnsubscribeSync,
\r
531 &subscriptionInfo );
\r
533 /* Convert MQTT return code to Shadow return code. */
\r
534 switch( unsubscribeStatus )
\r
536 case IOT_MQTT_SUCCESS:
\r
537 status = AWS_IOT_JOBS_SUCCESS;
\r
540 case IOT_MQTT_NO_MEMORY:
\r
541 status = AWS_IOT_JOBS_NO_MEMORY;
\r
545 status = AWS_IOT_JOBS_MQTT_ERROR;
\r
549 if( status != AWS_IOT_JOBS_SUCCESS )
\r
554 /* Clear the persistent subscriptions flag and check if the
\r
555 * subscription can be removed. */
\r
556 pSubscription->operationReferences[ i ] = 0;
\r
557 _AwsIotJobs_RemoveSubscription( pSubscription, NULL );
\r
561 IotLogDebug( "%.*s %s does not have persistent subscriptions.",
\r
562 pRequestInfo->thingNameLength,
\r
563 pRequestInfo->pThingName,
\r
564 _pAwsIotJobsOperationNames[ i ] );
\r
571 IotLogWarn( "No subscription object found for %.*s",
\r
572 pRequestInfo->thingNameLength,
\r
573 pRequestInfo->pThingName );
\r
576 IotMutex_Unlock( &( _AwsIotJobsSubscriptionsMutex ) );
\r
578 IOT_FUNCTION_EXIT_NO_CLEANUP();
\r
581 /*-----------------------------------------------------------*/
\r