2 * Amazon FreeRTOS Common V1.0.0
\r
3 * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
\r
5 * Permission is hereby granted, free of charge, to any person obtaining a copy of
\r
6 * this software and associated documentation files (the "Software"), to deal in
\r
7 * the Software without restriction, including without limitation the rights to
\r
8 * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
\r
9 * the Software, and to permit persons to whom the Software is furnished to do so,
\r
10 * subject to the following conditions:
\r
12 * The above copyright notice and this permission notice shall be included in all
\r
13 * copies or substantial portions of the Software.
\r
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
\r
16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
\r
17 * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
\r
18 * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
\r
19 * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
\r
20 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\r
22 * http://aws.amazon.com/freertos
\r
23 * http://www.FreeRTOS.org
\r
26 * @file iot_taskpool.c
\r
27 * @brief Implements the task pool functions in iot_taskpool.h
\r
30 /* The config header is always included first. */
\r
31 #include "iot_config.h"
\r
33 /* Standard includes. */
\r
34 #include <stdbool.h>
\r
39 /* Platform layer includes. */
\r
40 #include "platform/iot_threads.h"
\r
41 #include "platform/iot_clock.h"
\r
43 /* Task pool internal include. */
\r
44 #include "private/iot_taskpool_internal.h"
\r
47 * @brief Enter a critical section by disabling interrupts.
\r
50 #define TASKPOOL_ENTER_CRITICAL() taskENTER_CRITICAL()
\r
53 * @brief Enter a critical section from within an interrupt service routine.
\r
56 #define TASKPOOL_ENTER_CRITICAL_FROM_ISR() taskENTER_CRITICAL_FROM_ISR()
\r
59 * @brief Exit a critical section by re-enabling interrupts.
\r
62 #define TASKPOOL_EXIT_CRITICAL() taskEXIT_CRITICAL()
\r
65 * @brief Exit a critical section that was entered from within an interrupt service routine.
\r
68 #define TASKPOOL_EXIT_CRITICAL_FROM_ISR( x ) taskEXIT_CRITICAL_FROM_ISR( x )
\r
71 * @brief Maximum semaphore value for wait operations.
\r
73 #define TASKPOOL_MAX_SEM_VALUE 0xFFFF
\r
76 * @brief Reschedule delay in milliseconds for deferred jobs.
\r
78 #define TASKPOOL_JOB_RESCHEDULE_DELAY_MS ( 10ULL )
\r
80 /* ---------------------------------------------------------------------------------- */
\r
83 * Doxygen should ignore this section.
\r
85 * @brief The system task pool handle for all libraries to use.
\r
86 * User application can use the system task pool as well knowing that the usage will be shared with
\r
87 * the system libraries as well. The system task pool needs to be initialized before any library is used or
\r
88 * before any code that posts jobs to the task pool runs.
\r
90 _taskPool_t _IotSystemTaskPool = { .dispatchQueue = IOT_DEQUEUE_INITIALIZER };
\r
92 /* -------------- Convenience functions to create/recycle/destroy jobs -------------- */
\r
95 * @brief Initializes one instance of a Task pool cache.
\r
97 * @param[in] pCache The pre-allocated instance of the cache to initialize.
\r
99 static void _initJobsCache( _taskPoolCache_t * const pCache );
\r
102 * @brief Initialize a job.
\r
104 * @param[in] pJob The job to initialize.
\r
105 * @param[in] userCallback The user callback for the job.
\r
106 * @param[in] pUserContext The context to be passed to the callback.
\r
107 * @param[in] isStatic A flag to indicate whether the job is statically or dynamically allocated.
\r
109 static void _initializeJob( _taskPoolJob_t * const pJob,
\r
110 IotTaskPoolRoutine_t userCallback,
\r
111 void * pUserContext,
\r
115 * @brief Extracts and initializes one instance of a job from the cache or, if there is none available, it allocates and initialized a new one.
\r
117 * @param[in] pCache The instance of the cache to extract the job from.
\r
119 static _taskPoolJob_t * _fetchOrAllocateJob( _taskPoolCache_t * const pCache );
\r
122 * Recycles one instance of a job into the cache or, if the cache is full, it destroys it.
\r
124 * @param[in] pCache The instance of the cache to recycle the job into.
\r
125 * @param[in] pJob The job to recycle.
\r
128 static void _recycleJob( _taskPoolCache_t * const pCache,
\r
129 _taskPoolJob_t * const pJob );
\r
132 * Destroys one instance of a job.
\r
134 * @param[in] pJob The job to destroy.
\r
137 static void _destroyJob( _taskPoolJob_t * const pJob );
\r
139 /* -------------- The worker thread procedure for a task pool thread -------------- */
\r
142 * The procedure for a task pool worker thread.
\r
144 * @param[in] pUserContext The user context.
\r
147 static void _taskPoolWorker( void * pUserContext );
\r
149 /* -------------- Convenience functions to handle timer events -------------- */
\r
152 * Comparer for the time list.
\r
154 * param[in] pTimerEventLink1 The link to the first timer event.
\r
155 * param[in] pTimerEventLink2 The link to the second timer event.
\r
157 static int32_t _timerEventCompare( const IotLink_t * const pTimerEventLink1,
\r
158 const IotLink_t * const pTimerEventLink2 );
\r
161 * Reschedules the timer for handling deferred jobs to the next timeout.
\r
163 * param[in] timer The timer to reschedule.
\r
164 * param[in] pFirstTimerEvent The timer event that carries the timeout and job information.
\r
166 static void _rescheduleDeferredJobsTimer( TimerHandle_t const timer,
\r
167 _taskPoolTimerEvent_t * const pFirstTimerEvent );
\r
170 * The task pool timer procedure for scheduling deferred jobs.
\r
172 * param[in] timer The timer to handle.
\r
174 static void _timerThread( TimerHandle_t xTimer );
\r
176 /* -------------- Convenience functions to create/initialize/destroy the task pool -------------- */
\r
179 * Parameter validation for a task pool initialization.
\r
181 * @param[in] pInfo The initialization information for the task pool.
\r
184 static IotTaskPoolError_t _performTaskPoolParameterValidation( const IotTaskPoolInfo_t * const pInfo );
\r
187 * Initializes a pre-allocated instance of a task pool.
\r
189 * @param[in] pInfo The initialization information for the task pool.
\r
190 * @param[in] pTaskPool The pre-allocated instance of the task pool to initialize.
\r
193 static IotTaskPoolError_t _initTaskPoolControlStructures( const IotTaskPoolInfo_t * const pInfo,
\r
194 _taskPool_t * const pTaskPool );
\r
197 * Initializes a pre-allocated instance of a task pool.
\r
199 * @param[in] pInfo The initialization information for the task pool.
\r
200 * @param[out] pTaskPool A pointer to the task pool data structure to initialize.
\r
203 static IotTaskPoolError_t _createTaskPool( const IotTaskPoolInfo_t * const pInfo,
\r
204 _taskPool_t * const pTaskPool );
\r
207 * Destroys one instance of a task pool.
\r
209 * @param[in] pTaskPool The task pool to destroy.
\r
212 static void _destroyTaskPool( _taskPool_t * const pTaskPool );
\r
215 * Check for the exit condition.
\r
217 * @param[in] pTaskPool The task pool to destroy.
\r
220 static bool _IsShutdownStarted( const _taskPool_t * const pTaskPool );
\r
223 * Set the exit condition.
\r
225 * @param[in] pTaskPool The task pool to destroy.
\r
226 * @param[in] threads The number of threads active in the task pool at shutdown time.
\r
229 static void _signalShutdown( _taskPool_t * const pTaskPool,
\r
230 uint32_t threads );
\r
233 * Places a job in the dispatch queue.
\r
235 * @param[in] pTaskPool The task pool to schedule the job with.
\r
236 * @param[in] pJob The job to schedule.
\r
237 * @param[in] flags The job flags.
\r
240 static IotTaskPoolError_t _scheduleInternal( _taskPool_t * const pTaskPool,
\r
241 _taskPoolJob_t * const pJob,
\r
245 * Matches a deferred job in the timer queue with its timer event wrapper.
\r
247 * @param[in] pLink A pointer to the timer event link in the timer queue.
\r
248 * @param[in] pMatch A pointer to the job to match.
\r
251 static bool _matchJobByPointer( const IotLink_t * const pLink,
\r
255 * Tries to cancel a job.
\r
257 * @param[in] pTaskPool The task pool to cancel an operation against.
\r
258 * @param[in] pJob The job to cancel.
\r
259 * @param[out] pStatus The status of the job at the time of cancellation.
\r
262 static IotTaskPoolError_t _tryCancelInternal( _taskPool_t * const pTaskPool,
\r
263 _taskPoolJob_t * const pJob,
\r
264 IotTaskPoolJobStatus_t * const pStatus );
\r
266 /* ---------------------------------------------------------------------------------------------- */
\r
268 IotTaskPool_t IotTaskPool_GetSystemTaskPool( void )
\r
270 return &_IotSystemTaskPool;
\r
273 /*-----------------------------------------------------------*/
\r
275 IotTaskPoolError_t IotTaskPool_CreateSystemTaskPool( const IotTaskPoolInfo_t * const pInfo )
\r
277 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
279 /* Parameter checking. */
\r
280 TASKPOOL_ON_ERROR_GOTO_CLEANUP( _performTaskPoolParameterValidation( pInfo ) );
\r
282 /* Create the system task pool pool. */
\r
283 TASKPOOL_SET_AND_GOTO_CLEANUP( _createTaskPool( pInfo, &_IotSystemTaskPool ) );
\r
285 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
288 /*-----------------------------------------------------------*/
\r
290 IotTaskPoolError_t IotTaskPool_Create( const IotTaskPoolInfo_t * const pInfo,
\r
291 IotTaskPool_t * const pTaskPool )
\r
293 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
295 _taskPool_t * pTempTaskPool = NULL;
\r
297 /* Verify that the task pool storage is valid. */
\r
298 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );
\r
300 /* Parameter checking. */
\r
301 TASKPOOL_ON_ERROR_GOTO_CLEANUP( _performTaskPoolParameterValidation( pInfo ) );
\r
303 /* Allocate the memory for the task pool */
\r
304 pTempTaskPool = ( _taskPool_t * ) IotTaskPool_MallocTaskPool( sizeof( _taskPool_t ) );
\r
306 if( pTempTaskPool == NULL )
\r
308 TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
\r
311 memset( pTempTaskPool, 0x00, sizeof( _taskPool_t ) );
\r
313 TASKPOOL_SET_AND_GOTO_CLEANUP( _createTaskPool( pInfo, pTempTaskPool ) );
\r
315 TASKPOOL_FUNCTION_CLEANUP();
\r
317 if( TASKPOOL_FAILED( status ) )
\r
319 if( pTempTaskPool != NULL )
\r
321 IotTaskPool_FreeTaskPool( pTempTaskPool );
\r
326 *pTaskPool = pTempTaskPool;
\r
329 TASKPOOL_FUNCTION_CLEANUP_END();
\r
332 /*-----------------------------------------------------------*/
\r
334 IotTaskPoolError_t IotTaskPool_Destroy( IotTaskPool_t taskPoolHandle )
\r
336 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
339 bool completeShutdown = true;
\r
341 _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
\r
343 /* Track how many threads the task pool owns. */
\r
344 uint32_t activeThreads;
\r
346 /* Parameter checking. */
\r
347 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );
\r
349 /* Destroying the task pool should be safe, and therefore we will grab the task pool lock.
\r
350 * No worker thread or application thread should access any data structure
\r
351 * in the task pool while the task pool is being destroyed. */
\r
352 TASKPOOL_ENTER_CRITICAL();
\r
354 IotLink_t * pItemLink;
\r
356 /* Record how many active threads in the task pool. */
\r
357 activeThreads = pTaskPool->activeThreads;
\r
359 /* Destroying a Task pool happens in six (6) stages: First, (1) we clear the job queue and (2) the timer queue.
\r
360 * Then (3) we clear the jobs cache. We will then (4) wait for all worker threads to signal exit,
\r
361 * before (5) setting the exit condition and wake up all active worker threads. Finally (6) destroying
\r
362 * all task pool data structures and release the associated memory.
\r
365 /* (1) Clear the job queue. */
\r
370 pItemLink = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );
\r
372 if( pItemLink != NULL )
\r
374 _taskPoolJob_t * pJob = IotLink_Container( _taskPoolJob_t, pItemLink, link );
\r
376 _destroyJob( pJob );
\r
378 } while( pItemLink );
\r
380 /* (2) Clear the timer queue. */
\r
382 _taskPoolTimerEvent_t * pTimerEvent;
\r
384 /* A deferred job may have fired already. Since deferred jobs will go through the same mutex
\r
385 * the shutdown sequence is holding at this stage, there is no risk for race conditions. Yet, we
\r
386 * need to let the deferred job to destroy the task pool. */
\r
388 pItemLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
\r
390 if( pItemLink != NULL )
\r
392 TickType_t now = xTaskGetTickCount();
\r
394 pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pItemLink, link );
\r
396 if( pTimerEvent->expirationTime <= now )
\r
398 IotLogDebug( "Shutdown will be deferred to the timer thread" );
\r
400 /* Timer may have fired already! Let the timer thread destroy
\r
401 * complete the taskpool destruction sequence. */
\r
402 completeShutdown = false;
\r
405 /* Remove all timers from the timeout list. */
\r
408 pItemLink = IotListDouble_RemoveHead( &pTaskPool->timerEventsList );
\r
410 if( pItemLink == NULL )
\r
415 pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pItemLink, link );
\r
417 _destroyJob( pTimerEvent->job );
\r
419 IotTaskPool_FreeTimerEvent( pTimerEvent );
\r
424 /* (3) Clear the job cache. */
\r
429 pItemLink = IotListDouble_RemoveHead( &pTaskPool->jobsCache.freeList );
\r
431 if( pItemLink != NULL )
\r
433 _taskPoolJob_t * pJob = IotLink_Container( _taskPoolJob_t, pItemLink, link );
\r
435 _destroyJob( pJob );
\r
437 } while( pItemLink );
\r
439 /* (4) Set the exit condition. */
\r
440 _signalShutdown( pTaskPool, activeThreads );
\r
442 TASKPOOL_EXIT_CRITICAL();
\r
444 /* (5) Wait for all active threads to reach the end of their life-span. */
\r
445 for( count = 0; count < activeThreads; ++count )
\r
447 xSemaphoreTake( pTaskPool->startStopSignal, portMAX_DELAY );
\r
450 IotTaskPool_Assert( uxSemaphoreGetCount( pTaskPool->startStopSignal ) == 0 );
\r
451 IotTaskPool_Assert( pTaskPool->activeThreads == 0 );
\r
453 /* (6) Destroy all signaling objects. */
\r
454 if( completeShutdown == true )
\r
456 _destroyTaskPool( pTaskPool );
\r
458 /* Do not free the system task pool which is statically allocated. */
\r
459 if( pTaskPool != &_IotSystemTaskPool )
\r
461 IotTaskPool_FreeTaskPool( pTaskPool );
\r
465 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
468 /*-----------------------------------------------------------*/
\r
470 IotTaskPoolError_t IotTaskPool_SetMaxThreads( IotTaskPool_t taskPoolHandle,
\r
471 uint32_t maxThreads )
\r
473 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
475 _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
\r
477 /* Parameter checking. */
\r
478 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );
\r
479 TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( maxThreads < 1UL );
\r
481 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
484 /*-----------------------------------------------------------*/
\r
486 IotTaskPoolError_t IotTaskPool_CreateJob( IotTaskPoolRoutine_t userCallback,
\r
487 void * pUserContext,
\r
488 IotTaskPoolJobStorage_t * const pJobStorage,
\r
489 IotTaskPoolJob_t * const ppJob )
\r
491 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
493 /* Parameter checking. */
\r
494 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( userCallback );
\r
495 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJobStorage );
\r
496 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( ppJob );
\r
498 /* Build a job around the user-provided storage. */
\r
499 _initializeJob( ( _taskPoolJob_t * ) pJobStorage, userCallback, pUserContext, true );
\r
501 *ppJob = ( IotTaskPoolJob_t ) pJobStorage;
\r
503 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
506 /*-----------------------------------------------------------*/
\r
508 IotTaskPoolError_t IotTaskPool_CreateRecyclableJob( IotTaskPool_t taskPoolHandle,
\r
509 IotTaskPoolRoutine_t userCallback,
\r
510 void * pUserContext,
\r
511 IotTaskPoolJob_t * const ppJob )
\r
513 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
515 _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
\r
516 _taskPoolJob_t * pTempJob = NULL;
\r
518 /* Parameter checking. */
\r
519 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
\r
520 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( userCallback );
\r
521 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( ppJob );
\r
523 TASKPOOL_ENTER_CRITICAL();
\r
525 /* Bail out early if this task pool is shutting down. */
\r
526 pTempJob = _fetchOrAllocateJob( &pTaskPool->jobsCache );
\r
528 TASKPOOL_EXIT_CRITICAL();
\r
530 if( pTempJob == NULL )
\r
532 IotLogInfo( "Failed to allocate a job." );
\r
534 TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
\r
537 _initializeJob( pTempJob, userCallback, pUserContext, false );
\r
541 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
544 /*-----------------------------------------------------------*/
\r
546 IotTaskPoolError_t IotTaskPool_DestroyRecyclableJob( IotTaskPool_t taskPoolHandle,
\r
547 IotTaskPoolJob_t pJobHandle )
\r
549 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
551 _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
\r
552 _taskPoolJob_t * pJob = ( _taskPoolJob_t * ) pJobHandle;
\r
554 /* Parameter checking. */
\r
555 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
\r
556 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJobHandle );
\r
558 IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );
\r
560 _destroyJob( pJob );
\r
562 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
565 /*-----------------------------------------------------------*/
\r
567 IotTaskPoolError_t IotTaskPool_RecycleJob( IotTaskPool_t taskPoolHandle,
\r
568 IotTaskPoolJob_t pJob )
\r
570 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
572 _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
\r
574 /* Parameter checking. */
\r
575 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
\r
576 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );
\r
578 TASKPOOL_ENTER_CRITICAL();
\r
580 IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );
\r
582 _recycleJob( &pTaskPool->jobsCache, pJob );
\r
584 TASKPOOL_EXIT_CRITICAL();
\r
586 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
589 /*-----------------------------------------------------------*/
\r
591 IotTaskPoolError_t IotTaskPool_Schedule( IotTaskPool_t taskPoolHandle,
\r
592 IotTaskPoolJob_t pJob,
\r
595 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
597 _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
\r
599 /* Parameter checking. */
\r
600 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
\r
601 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );
\r
602 TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( ( flags != 0UL ) && ( flags != IOT_TASKPOOL_JOB_HIGH_PRIORITY ) );
\r
604 TASKPOOL_ENTER_CRITICAL();
\r
606 _scheduleInternal( pTaskPool, pJob, flags );
\r
608 TASKPOOL_EXIT_CRITICAL();
\r
610 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
613 /*-----------------------------------------------------------*/
\r
615 IotTaskPoolError_t IotTaskPool_ScheduleDeferred( IotTaskPool_t taskPoolHandle,
\r
616 IotTaskPoolJob_t job,
\r
619 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
621 _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
\r
623 /* Parameter checking. */
\r
624 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
\r
625 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( job );
\r
627 if( timeMs == 0UL )
\r
629 TASKPOOL_SET_AND_GOTO_CLEANUP( IotTaskPool_Schedule( pTaskPool, job, 0 ) );
\r
632 TASKPOOL_ENTER_CRITICAL();
\r
634 _taskPoolTimerEvent_t * pTimerEvent = IotTaskPool_MallocTimerEvent( sizeof( _taskPoolTimerEvent_t ) );
\r
636 if( pTimerEvent == NULL )
\r
638 TASKPOOL_EXIT_CRITICAL();
\r
640 TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
\r
643 IotLink_t * pTimerEventLink;
\r
645 TickType_t now = xTaskGetTickCount();
\r
647 pTimerEvent->link.pNext = NULL;
\r
648 pTimerEvent->link.pPrevious = NULL;
\r
649 pTimerEvent->expirationTime = now + pdMS_TO_TICKS( timeMs );
\r
650 pTimerEvent->job = job;
\r
652 /* Append the timer event to the timer list. */
\r
653 IotListDouble_InsertSorted( &pTaskPool->timerEventsList, &pTimerEvent->link, _timerEventCompare );
\r
655 /* Update the job status to 'scheduled'. */
\r
656 job->status = IOT_TASKPOOL_STATUS_DEFERRED;
\r
658 /* Peek the first event in the timer event list. There must be at least one,
\r
659 * since we just inserted it. */
\r
660 pTimerEventLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
\r
661 IotTaskPool_Assert( pTimerEventLink != NULL );
\r
663 /* If the event we inserted is at the front of the queue, then
\r
664 * we need to reschedule the underlying timer. */
\r
665 if( pTimerEventLink == &pTimerEvent->link )
\r
667 pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pTimerEventLink, link );
\r
669 _rescheduleDeferredJobsTimer( pTaskPool->timer, pTimerEvent );
\r
672 TASKPOOL_EXIT_CRITICAL();
\r
674 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
677 /*-----------------------------------------------------------*/
\r
679 IotTaskPoolError_t IotTaskPool_GetStatus( IotTaskPool_t taskPoolHandle,
\r
680 IotTaskPoolJob_t job,
\r
681 IotTaskPoolJobStatus_t * const pStatus )
\r
683 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
685 _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
\r
687 /* Parameter checking. */
\r
688 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
\r
689 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( job );
\r
690 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pStatus );
\r
691 *pStatus = IOT_TASKPOOL_STATUS_UNDEFINED;
\r
693 TASKPOOL_ENTER_CRITICAL();
\r
695 *pStatus = job->status;
\r
697 TASKPOOL_EXIT_CRITICAL();
\r
699 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
702 /*-----------------------------------------------------------*/
\r
704 IotTaskPoolError_t IotTaskPool_TryCancel( IotTaskPool_t taskPoolHandle,
\r
705 IotTaskPoolJob_t job,
\r
706 IotTaskPoolJobStatus_t * const pStatus )
\r
708 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
710 _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
\r
712 /* Parameter checking. */
\r
713 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
\r
714 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( job );
\r
716 if( pStatus != NULL )
\r
718 *pStatus = IOT_TASKPOOL_STATUS_UNDEFINED;
\r
721 TASKPOOL_ENTER_CRITICAL();
\r
723 status = _tryCancelInternal( pTaskPool, job, pStatus );
\r
725 TASKPOOL_EXIT_CRITICAL();
\r
727 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
730 /*-----------------------------------------------------------*/
\r
732 IotTaskPoolJobStorage_t * IotTaskPool_GetJobStorageFromHandle( IotTaskPoolJob_t pJob )
\r
734 return ( IotTaskPoolJobStorage_t * ) pJob;
\r
737 /*-----------------------------------------------------------*/
\r
739 const char * IotTaskPool_strerror( IotTaskPoolError_t status )
\r
741 const char * pMessage = NULL;
\r
745 case IOT_TASKPOOL_SUCCESS:
\r
746 pMessage = "SUCCESS";
\r
749 case IOT_TASKPOOL_BAD_PARAMETER:
\r
750 pMessage = "BAD PARAMETER";
\r
753 case IOT_TASKPOOL_ILLEGAL_OPERATION:
\r
754 pMessage = "ILLEGAL OPERATION";
\r
757 case IOT_TASKPOOL_NO_MEMORY:
\r
758 pMessage = "NO MEMORY";
\r
761 case IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS:
\r
762 pMessage = "SHUTDOWN IN PROGRESS";
\r
765 case IOT_TASKPOOL_CANCEL_FAILED:
\r
766 pMessage = "CANCEL FAILED";
\r
770 pMessage = "INVALID STATUS";
\r
777 /* ---------------------------------------------------------------------------------------------- */
\r
778 /* ---------------------------------------------------------------------------------------------- */
\r
779 /* ---------------------------------------------------------------------------------------------- */
\r
781 static IotTaskPoolError_t _performTaskPoolParameterValidation( const IotTaskPoolInfo_t * const pInfo )
\r
783 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
785 /* Check input values for consistency. */
\r
786 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pInfo );
\r
787 TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->minThreads > pInfo->maxThreads );
\r
788 TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->minThreads < 1UL );
\r
789 TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->maxThreads < 1UL );
\r
791 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
794 static IotTaskPoolError_t _initTaskPoolControlStructures( const IotTaskPoolInfo_t * const pInfo,
\r
795 _taskPool_t * const pTaskPool )
\r
797 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
799 bool semStartStopInit = false;
\r
800 bool semDispatchInit = false;
\r
802 /* Initialize a job data structures that require no de-initialization.
\r
803 * All other data structures carry a value of 'NULL' before initailization.
\r
805 IotDeQueue_Create( &pTaskPool->dispatchQueue );
\r
806 IotListDouble_Create( &pTaskPool->timerEventsList );
\r
808 _initJobsCache( &pTaskPool->jobsCache );
\r
810 /* Initialize the semaphore to ensure all threads have started. */
\r
811 pTaskPool->startStopSignal = xSemaphoreCreateCountingStatic( pInfo->minThreads, 0, &pTaskPool->startStopSignalBuffer );
\r
813 if( pTaskPool->startStopSignal != NULL )
\r
815 semStartStopInit = true;
\r
817 /* Initialize the semaphore for waiting for incoming work. */
\r
818 pTaskPool->dispatchSignal = xSemaphoreCreateCountingStatic( TASKPOOL_MAX_SEM_VALUE, 0, &pTaskPool->dispatchSignalBuffer );
\r
820 if( pTaskPool->dispatchSignal != NULL )
\r
822 semDispatchInit = true;
\r
826 TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
\r
831 TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
\r
834 TASKPOOL_FUNCTION_CLEANUP();
\r
836 if( TASKPOOL_FAILED( status ) )
\r
838 if( semStartStopInit )
\r
840 vSemaphoreDelete( &pTaskPool->startStopSignal );
\r
843 if( semDispatchInit )
\r
845 vSemaphoreDelete( &pTaskPool->dispatchSignal );
\r
849 TASKPOOL_FUNCTION_CLEANUP_END();
\r
852 static IotTaskPoolError_t _createTaskPool( const IotTaskPoolInfo_t * const pInfo,
\r
853 _taskPool_t * const pTaskPool )
\r
855 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
858 uint32_t threadsCreated = 0;
\r
860 /* Check input values for consistency. */
\r
861 TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );
\r
863 /* Zero out all data structures. */
\r
864 memset( ( void * ) pTaskPool, 0x00, sizeof( _taskPool_t ) );
\r
866 /* Initialize all internal data structure prior to creating all threads. */
\r
867 TASKPOOL_ON_ERROR_GOTO_CLEANUP( _initTaskPoolControlStructures( pInfo, pTaskPool ) );
\r
869 /* Create the timer used for scheduling deferred jobs. */
\r
870 pTaskPool->timer = xTimerCreate( NULL, portMAX_DELAY, pdFALSE, ( void * ) pTaskPool, _timerThread );
\r
872 if( pTaskPool->timer == NULL )
\r
874 IotLogError( "Failed to create timer for task pool." );
\r
876 TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
\r
879 /* The task pool will initialize the minimum number of threads requested by the user upon start. */
\r
880 /* When a thread is created, it will signal a semaphore to signify that it is about to wait on incoming */
\r
881 /* jobs. A thread can be woken up for exit or for new jobs only at that point in time. */
\r
882 /* The exit condition is setting the maximum number of threads to 0. */
\r
884 /* Create the minimum number of threads specified by the user, and if one fails shutdown and return error. */
\r
885 for( ; threadsCreated < pInfo->minThreads; )
\r
887 TaskHandle_t task = NULL;
\r
889 BaseType_t res = xTaskCreate( _taskPoolWorker,
\r
896 /* Create one thread. */
\r
897 if( res == pdFALSE )
\r
899 IotLogError( "Could not create worker thread! Exiting..." );
\r
901 /* If creating one thread fails, set error condition and exit the loop. */
\r
902 TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
\r
905 /* Upon successful thread creation, increase the number of active threads. */
\r
906 pTaskPool->activeThreads++;
\r
907 IotTaskPool_Assert( task != NULL );
\r
912 TASKPOOL_FUNCTION_CLEANUP();
\r
914 /* Wait for threads to be ready to wait on the condition, so that threads are actually able to receive messages. */
\r
915 for( count = 0; count < threadsCreated; ++count )
\r
917 xSemaphoreTake( pTaskPool->startStopSignal, portMAX_DELAY );
\r
920 /* In case of failure, wait on the created threads to exit. */
\r
921 if( TASKPOOL_FAILED( status ) )
\r
923 /* Set the exit condition for the newly created threads. */
\r
924 _signalShutdown( pTaskPool, threadsCreated );
\r
926 /* Signal all threads to exit. */
\r
927 for( count = 0; count < threadsCreated; ++count )
\r
929 xSemaphoreTake( pTaskPool->startStopSignal, portMAX_DELAY );
\r
932 _destroyTaskPool( pTaskPool );
\r
935 pTaskPool->running = true;
\r
937 TASKPOOL_FUNCTION_CLEANUP_END();
\r
940 /*-----------------------------------------------------------*/
\r
942 static void _destroyTaskPool( _taskPool_t * const pTaskPool )
\r
944 if( pTaskPool->timer != NULL )
\r
946 xTimerDelete( pTaskPool->timer, 0 );
\r
949 if( pTaskPool->dispatchSignal != NULL )
\r
951 vSemaphoreDelete( pTaskPool->dispatchSignal );
\r
954 if( pTaskPool->startStopSignal != NULL )
\r
956 vSemaphoreDelete( pTaskPool->startStopSignal );
\r
960 /* ---------------------------------------------------------------------------------------------- */
\r
962 static void _taskPoolWorker( void * pUserContext )
\r
964 IotTaskPool_Assert( pUserContext != NULL );
\r
966 IotTaskPoolRoutine_t userCallback = NULL;
\r
967 bool running = true;
\r
969 /* Extract pTaskPool pointer from context. */
\r
970 _taskPool_t * pTaskPool = ( _taskPool_t * ) pUserContext;
\r
972 /* Signal that this worker completed initialization and it is ready to receive notifications. */
\r
973 ( void ) xSemaphoreGive( pTaskPool->startStopSignal );
\r
975 /* OUTER LOOP: it controls the lifetiem of the worker thread: exit condition for a worker thread
\r
976 * is setting maxThreads to zero. A worker thread is running until the maximum number of allowed
\r
977 * threads is not zero and the active threads are less than the maximum number of allowed threads.
\r
981 IotLink_t * pFirst = NULL;
\r
982 _taskPoolJob_t * pJob = NULL;
\r
984 /* Wait on incoming notifications... */
\r
985 xSemaphoreTake( pTaskPool->dispatchSignal, portMAX_DELAY );
\r
987 /* Acquire the lock to check the exit condition, and release the lock if the exit condition is verified,
\r
988 * or before waiting for incoming notifications.
\r
990 TASKPOOL_ENTER_CRITICAL();
\r
992 /* If the exit condition is verified, update the number of active threads and exit the loop. */
\r
993 if( _IsShutdownStarted( pTaskPool ) )
\r
995 IotLogDebug( "Worker thread exiting because exit condition was set." );
\r
997 /* Decrease the number of active threads. */
\r
998 pTaskPool->activeThreads--;
\r
1000 TASKPOOL_EXIT_CRITICAL();
\r
1002 /* Signal that this worker is exiting. */
\r
1003 xSemaphoreGive( pTaskPool->startStopSignal );
\r
1005 /* On shutdown, abandon the OUTER LOOP immediately. */
\r
1009 /* Dequeue the first job in FIFO order. */
\r
1010 pFirst = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );
\r
1012 /* If there is indeed a job, then update status under lock, and release the lock before processing the job. */
\r
1013 if( pFirst != NULL )
\r
1015 /* Extract the job from its link. */
\r
1016 pJob = IotLink_Container( _taskPoolJob_t, pFirst, link );
\r
1018 /* Update status to 'executing'. */
\r
1019 pJob->status = IOT_TASKPOOL_STATUS_COMPLETED;
\r
1020 userCallback = pJob->userCallback;
\r
1023 TASKPOOL_EXIT_CRITICAL();
\r
1025 /* INNER LOOP: it controls the execution of jobs: the exit condition is the lack of a job to execute. */
\r
1026 while( pJob != NULL )
\r
1028 /* Process the job by invoking the associated callback with the user context.
\r
1029 * This task pool thread will not be available until the user callback returns.
\r
1032 IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );
\r
1033 IotTaskPool_Assert( userCallback != NULL );
\r
1035 userCallback( pTaskPool, pJob, pJob->pUserContext );
\r
1037 /* This job is finished, clear its pointer. */
\r
1039 userCallback = NULL;
\r
1041 /* If this thread exceeded the quota, then let it terminate. */
\r
1042 if( running == false )
\r
1044 /* Abandon the INNER LOOP. Execution will tranfer back to the OUTER LOOP condition. */
\r
1049 /* Acquire the lock before updating the job status. */
\r
1050 TASKPOOL_ENTER_CRITICAL();
\r
1052 /* Try and dequeue the next job in the dispatch queue. */
\r
1053 IotLink_t * pItem = NULL;
\r
1055 /* Dequeue the next job from the dispatch queue. */
\r
1056 pItem = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );
\r
1058 /* If there is no job left in the dispatch queue, update the worker status and leave. */
\r
1059 if( pItem == NULL )
\r
1061 TASKPOOL_EXIT_CRITICAL();
\r
1063 /* Abandon the INNER LOOP. Execution will tranfer back to the OUTER LOOP condition. */
\r
1068 pJob = IotLink_Container( _taskPoolJob_t, pItem, link );
\r
1070 userCallback = pJob->userCallback;
\r
1073 pJob->status = IOT_TASKPOOL_STATUS_COMPLETED;
\r
1075 TASKPOOL_EXIT_CRITICAL();
\r
1077 } while( running == true );
\r
1079 vTaskDelete( NULL );
\r
1082 /* ---------------------------------------------------------------------------------------------- */
\r
1084 static void _initJobsCache( _taskPoolCache_t * const pCache )
\r
1086 IotDeQueue_Create( &pCache->freeList );
\r
1088 pCache->freeCount = 0;
\r
1091 /*-----------------------------------------------------------*/
\r
1093 static void _initializeJob( _taskPoolJob_t * const pJob,
\r
1094 IotTaskPoolRoutine_t userCallback,
\r
1095 void * pUserContext,
\r
1098 pJob->link.pNext = NULL;
\r
1099 pJob->link.pPrevious = NULL;
\r
1100 pJob->userCallback = userCallback;
\r
1101 pJob->pUserContext = pUserContext;
\r
1105 pJob->flags = IOT_TASK_POOL_INTERNAL_STATIC;
\r
1106 pJob->status = IOT_TASKPOOL_STATUS_READY;
\r
1110 pJob->status = IOT_TASKPOOL_STATUS_READY;
\r
1114 static _taskPoolJob_t * _fetchOrAllocateJob( _taskPoolCache_t * const pCache )
\r
1116 _taskPoolJob_t * pJob = NULL;
\r
1117 IotLink_t * pLink = IotListDouble_RemoveHead( &( pCache->freeList ) );
\r
1119 if( pLink != NULL )
\r
1121 pJob = IotLink_Container( _taskPoolJob_t, pLink, link );
\r
1124 /* If there is no available job in the cache, then allocate one. */
\r
1125 if( pJob == NULL )
\r
1127 pJob = ( _taskPoolJob_t * ) IotTaskPool_MallocJob( sizeof( _taskPoolJob_t ) );
\r
1129 if( pJob != NULL )
\r
1131 memset( pJob, 0x00, sizeof( _taskPoolJob_t ) );
\r
1135 /* Log alocation failure for troubleshooting purposes. */
\r
1136 IotLogInfo( "Failed to allocate job." );
\r
1139 /* If there was a job in the cache, then make sure we keep the counters up-to-date. */
\r
1142 IotTaskPool_Assert( pCache->freeCount > 0 );
\r
1144 pCache->freeCount--;
\r
1150 /*-----------------------------------------------------------*/
\r
1152 static void _recycleJob( _taskPoolCache_t * const pCache,
\r
1153 _taskPoolJob_t * const pJob )
\r
1155 /* We should never try and recycling a job that is linked into some queue. */
\r
1156 IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );
\r
1158 /* We will recycle the job if there is space in the cache. */
\r
1159 if( pCache->freeCount < IOT_TASKPOOL_JOBS_RECYCLE_LIMIT )
\r
1161 /* Destroy user data, for added safety&security. */
\r
1162 pJob->userCallback = NULL;
\r
1163 pJob->pUserContext = NULL;
\r
1165 /* Reset the status for added debuggability. */
\r
1166 pJob->status = IOT_TASKPOOL_STATUS_UNDEFINED;
\r
1168 IotListDouble_InsertTail( &pCache->freeList, &pJob->link );
\r
1170 pCache->freeCount++;
\r
1172 IotTaskPool_Assert( pCache->freeCount >= 1 );
\r
1176 _destroyJob( pJob );
\r
1180 /*-----------------------------------------------------------*/
\r
1182 static void _destroyJob( _taskPoolJob_t * const pJob )
\r
1184 /* Destroy user data, for added safety & security. */
\r
1185 pJob->userCallback = NULL;
\r
1186 pJob->pUserContext = NULL;
\r
1188 /* Reset the status for added debuggability. */
\r
1189 pJob->status = IOT_TASKPOOL_STATUS_UNDEFINED;
\r
1191 /* Only dispose of dynamically allocated jobs. */
\r
1192 if( ( pJob->flags & IOT_TASK_POOL_INTERNAL_STATIC ) == 0UL )
\r
1194 IotTaskPool_FreeJob( pJob );
\r
1198 /* ---------------------------------------------------------------------------------------------- */
\r
1200 static bool _IsShutdownStarted( const _taskPool_t * const pTaskPool )
\r
1202 return( pTaskPool->running == false );
\r
1205 /*-----------------------------------------------------------*/
\r
1207 static void _signalShutdown( _taskPool_t * const pTaskPool,
\r
1208 uint32_t threads )
\r
1212 /* Set the exit condition. */
\r
1213 pTaskPool->running = false;
\r
1215 /* Broadcast to all active threads to wake-up. Active threads do check the exit condition right after wakein up. */
\r
1216 for( count = 0; count < threads; ++count )
\r
1218 ( void ) xSemaphoreGive( pTaskPool->dispatchSignal );
\r
1222 /* ---------------------------------------------------------------------------------------------- */
\r
1224 static IotTaskPoolError_t _scheduleInternal( _taskPool_t * const pTaskPool,
\r
1225 _taskPoolJob_t * const pJob,
\r
1228 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
1230 /* Update the job status to 'scheduled'. */
\r
1231 pJob->status = IOT_TASKPOOL_STATUS_SCHEDULED;
\r
1233 BaseType_t higherPriorityTaskWoken;
\r
1235 /* Append the job to the dispatch queue. */
\r
1236 IotDeQueue_EnqueueTail( &pTaskPool->dispatchQueue, &pJob->link );
\r
1238 /* Signal a worker to pick up the job. */
\r
1239 ( void ) xSemaphoreGiveFromISR( pTaskPool->dispatchSignal, &higherPriorityTaskWoken );
\r
1241 portYIELD_FROM_ISR( higherPriorityTaskWoken );
\r
1243 TASKPOOL_NO_FUNCTION_CLEANUP_NOLABEL();
\r
1246 /*-----------------------------------------------------------*/
\r
1248 static bool _matchJobByPointer( const IotLink_t * const pLink,
\r
1251 const _taskPoolJob_t * const pJob = ( _taskPoolJob_t * ) pMatch;
\r
1253 const _taskPoolTimerEvent_t * const pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pLink, link );
\r
1255 if( pJob == pTimerEvent->job )
\r
1263 /*-----------------------------------------------------------*/
\r
1265 static IotTaskPoolError_t _tryCancelInternal( _taskPool_t * const pTaskPool,
\r
1266 _taskPoolJob_t * const pJob,
\r
1267 IotTaskPoolJobStatus_t * const pStatus )
\r
1269 TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
\r
1271 bool cancelable = false;
\r
1273 /* We can only cancel jobs that are either 'ready' (waiting to be scheduled). 'deferred', or 'scheduled'. */
\r
1275 IotTaskPoolJobStatus_t currentStatus = pJob->status;
\r
1277 switch( currentStatus )
\r
1279 case IOT_TASKPOOL_STATUS_READY:
\r
1280 case IOT_TASKPOOL_STATUS_DEFERRED:
\r
1281 case IOT_TASKPOOL_STATUS_SCHEDULED:
\r
1282 case IOT_TASKPOOL_STATUS_CANCELED:
\r
1283 cancelable = true;
\r
1286 case IOT_TASKPOOL_STATUS_COMPLETED:
\r
1287 /* Log mesggesong purposes. */
\r
1288 IotLogWarn( "Attempt to cancel a job that is already executing, or canceled." );
\r
1292 /* Log mesggesong purposes. */
\r
1293 IotLogError( "Attempt to cancel a job with an undefined state." );
\r
1297 /* Update the returned status to the current status of the job. */
\r
1298 if( pStatus != NULL )
\r
1300 *pStatus = currentStatus;
\r
1303 if( cancelable == false )
\r
1305 TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_CANCEL_FAILED );
\r
1309 /* Update the status of the job. */
\r
1310 pJob->status = IOT_TASKPOOL_STATUS_CANCELED;
\r
1312 /* If the job is cancelable and its current status is 'scheduled' then unlink it from the dispatch
\r
1313 * queue and signal any waiting threads. */
\r
1314 if( currentStatus == IOT_TASKPOOL_STATUS_SCHEDULED )
\r
1316 /* A scheduled work items must be in the dispatch queue. */
\r
1317 IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) );
\r
1319 IotDeQueue_Remove( &pJob->link );
\r
1322 /* If the job current status is 'deferred' then the job has to be pending
\r
1323 * in the timeouts queue. */
\r
1324 else if( currentStatus == IOT_TASKPOOL_STATUS_DEFERRED )
\r
1326 /* Find the timer event associated with the current job. There MUST be one, hence assert if not. */
\r
1327 IotLink_t * pTimerEventLink = IotListDouble_FindFirstMatch( &pTaskPool->timerEventsList, NULL, _matchJobByPointer, pJob );
\r
1328 IotTaskPool_Assert( pTimerEventLink != NULL );
\r
1330 if( pTimerEventLink != NULL )
\r
1332 bool shouldReschedule = false;
\r
1334 /* If the job being cancelled was at the head of the timeouts queue, then we need to reschedule the timer
\r
1335 * with the next job timeout */
\r
1336 IotLink_t * pHeadLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
\r
1338 if( pHeadLink == pTimerEventLink )
\r
1340 shouldReschedule = true;
\r
1343 /* Remove the timer event associated with the canceled job and free the associated memory. */
\r
1344 IotListDouble_Remove( pTimerEventLink );
\r
1345 IotTaskPool_FreeTimerEvent( IotLink_Container( _taskPoolTimerEvent_t, pTimerEventLink, link ) );
\r
1347 if( shouldReschedule )
\r
1349 IotLink_t * pNextTimerEventLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
\r
1351 if( pNextTimerEventLink != NULL )
\r
1353 _rescheduleDeferredJobsTimer( pTaskPool->timer, IotLink_Container( _taskPoolTimerEvent_t, pNextTimerEventLink, link ) );
\r
1360 /* A cancelable job status should be either 'scheduled' or 'deferrred'. */
\r
1361 IotTaskPool_Assert( ( currentStatus == IOT_TASKPOOL_STATUS_READY ) || ( currentStatus == IOT_TASKPOOL_STATUS_CANCELED ) );
\r
1365 TASKPOOL_NO_FUNCTION_CLEANUP();
\r
1368 /*-----------------------------------------------------------*/
\r
1370 static int32_t _timerEventCompare( const IotLink_t * const pTimerEventLink1,
\r
1371 const IotLink_t * const pTimerEventLink2 )
\r
1373 const _taskPoolTimerEvent_t * const pTimerEvent1 = IotLink_Container( _taskPoolTimerEvent_t,
\r
1376 const _taskPoolTimerEvent_t * const pTimerEvent2 = IotLink_Container( _taskPoolTimerEvent_t,
\r
1380 if( pTimerEvent1->expirationTime < pTimerEvent2->expirationTime )
\r
1385 if( pTimerEvent1->expirationTime > pTimerEvent2->expirationTime )
\r
1393 /*-----------------------------------------------------------*/
\r
1395 static void _rescheduleDeferredJobsTimer( TimerHandle_t const timer,
\r
1396 _taskPoolTimerEvent_t * const pFirstTimerEvent )
\r
1398 uint64_t delta = 0;
\r
1399 TickType_t now = xTaskGetTickCount();
\r
1401 if( pFirstTimerEvent->expirationTime > now )
\r
1403 delta = pFirstTimerEvent->expirationTime - now;
\r
1406 if( delta < TASKPOOL_JOB_RESCHEDULE_DELAY_MS )
\r
1408 delta = TASKPOOL_JOB_RESCHEDULE_DELAY_MS; /* The job will be late... */
\r
1411 IotTaskPool_Assert( delta > 0 );
\r
1413 if( xTimerChangePeriod( timer, ( uint32_t ) delta, portMAX_DELAY ) == pdFAIL )
\r
1415 IotLogWarn( "Failed to re-arm timer for task pool" );
\r
1419 /*-----------------------------------------------------------*/
\r
1421 static void _timerThread( TimerHandle_t xTimer )
\r
1423 _taskPool_t * pTaskPool = pvTimerGetTimerID( xTimer );
\r
1425 IotTaskPool_Assert( pTaskPool );
\r
1427 _taskPoolTimerEvent_t * pTimerEvent = NULL;
\r
1429 IotLogDebug( "Timer thread started for task pool %p.", pTaskPool );
\r
1431 /* Attempt to lock the timer mutex. Return immediately if the mutex cannot be locked.
\r
1432 * If this mutex cannot be locked it means that another thread is manipulating the
\r
1433 * timeouts list, and will reset the timer to fire again, although it will be late.
\r
1435 TASKPOOL_ENTER_CRITICAL();
\r
1437 /* Check again for shutdown and bail out early in case. */
\r
1438 if( _IsShutdownStarted( pTaskPool ) )
\r
1440 TASKPOOL_EXIT_CRITICAL();
\r
1442 /* Complete the shutdown sequence. */
\r
1443 _destroyTaskPool( pTaskPool );
\r
1448 /* Dispatch all deferred job whose timer expired, then reset the timer for the next
\r
1449 * job down the line. */
\r
1452 /* Peek the first event in the timer event list. */
\r
1453 IotLink_t * pLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
\r
1455 /* Check if the timer misfired for any reason. */
\r
1456 if( pLink != NULL )
\r
1458 /* Record the current time. */
\r
1459 TickType_t now = xTaskGetTickCount();
\r
1461 /* Extract the job from its envelope. */
\r
1462 pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pLink, link );
\r
1464 /* Check if the first event should be processed now. */
\r
1465 if( pTimerEvent->expirationTime <= now )
\r
1467 /* Remove the timer event for immediate processing. */
\r
1468 IotListDouble_Remove( &( pTimerEvent->link ) );
\r
1472 /* The first element in the timer queue shouldn't be processed yet.
\r
1473 * Arm the timer for when it should be processed and leave altogether. */
\r
1474 _rescheduleDeferredJobsTimer( pTaskPool->timer, pTimerEvent );
\r
1479 /* If there are no timer events to process, terminate this thread. */
\r
1482 IotLogDebug( "No further timer events to process. Exiting timer thread." );
\r
1487 IotLogDebug( "Scheduling job from timer event." );
\r
1489 /* Queue the job associated with the received timer event. */
\r
1490 ( void ) _scheduleInternal( pTaskPool, pTimerEvent->job, 0 );
\r
1492 /* Free the timer event. */
\r
1493 IotTaskPool_FreeTimerEvent( pTimerEvent );
\r
1496 TASKPOOL_EXIT_CRITICAL();
\r