+/*\r
+ * Amazon FreeRTOS Common V1.0.0\r
+ * Copyright (C) 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
+ *\r
+ * Permission is hereby granted, free of charge, to any person obtaining a copy of\r
+ * this software and associated documentation files (the "Software"), to deal in\r
+ * the Software without restriction, including without limitation the rights to\r
+ * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\r
+ * the Software, and to permit persons to whom the Software is furnished to do so,\r
+ * subject to the following conditions:\r
+ *\r
+ * The above copyright notice and this permission notice shall be included in all\r
+ * copies or substantial portions of the Software.\r
+ *\r
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\r
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\r
+ * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\r
+ * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\r
+ * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\r
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\r
+ *\r
+ * http://aws.amazon.com/freertos\r
+ * http://www.FreeRTOS.org\r
+ */\r
+/**\r
+ * @file iot_taskpool.c\r
+ * @brief Implements the task pool functions in iot_taskpool.h\r
+ */\r
+\r
+/* The config header is always included first. */\r
+#include "iot_config.h"\r
+\r
+/* Standard includes. */\r
+#include <stdbool.h>\r
+#include <stddef.h>\r
+#include <stdint.h>\r
+#include <string.h>\r
+\r
+/* Platform layer includes. */\r
+#include "platform/iot_threads.h"\r
+#include "platform/iot_clock.h"\r
+\r
+/* Task pool internal include. */\r
+#include "private/iot_taskpool_internal.h"\r
+\r
+/**\r
+ * @brief Enter a critical section by disabling interrupts.\r
+ *\r
+ */\r
+#define TASKPOOL_ENTER_CRITICAL() taskENTER_CRITICAL()\r
+\r
+/**\r
+ * @brief Enter a critical section by disabling interrupts.\r
+ *\r
+ */\r
+#define TASKPOOL_ENTER_CRITICAL_FROM_ISR() taskENTER_CRITICAL_FROM_ISR()\r
+\r
+/**\r
+ * @brief Exit a critical section by re-enabling interrupts.\r
+ *\r
+ */\r
+#define TASKPOOL_EXIT_CRITICAL() taskEXIT_CRITICAL()\r
+\r
+/**\r
+ * @brief Exit a critical section by re-enabling interrupts.\r
+ *\r
+ */\r
+#define TASKPOOL_EXIT_CRITICAL_FROM_ISR( x ) taskEXIT_CRITICAL_FROM_ISR( x )\r
+\r
+/**\r
+ * @brief Maximum semaphore value for wait operations.\r
+ */\r
+#define TASKPOOL_MAX_SEM_VALUE 0xFFFF\r
+\r
+/**\r
+ * @brief Reschedule delay in milliseconds for deferred jobs.\r
+ */\r
+#define TASKPOOL_JOB_RESCHEDULE_DELAY_MS ( 10ULL )\r
+\r
+/* ---------------------------------------------------------------------------------- */\r
+\r
+/**\r
+ * Doxygen should ignore this section.\r
+ *\r
+ * @brief The system task pool handle for all libraries to use.\r
+ * User application can use the system task pool as well knowing that the usage will be shared with\r
+ * the system libraries as well. The system task pool needs to be initialized before any library is used or\r
+ * before any code that posts jobs to the task pool runs.\r
+ */\r
_taskPool_t _IotSystemTaskPool = { .dispatchQueue = IOT_DEQUEUE_INITIALIZER }; /* Statically allocated; fully initialized by IotTaskPool_CreateSystemTaskPool(). */
+\r
+/* -------------- Convenience functions to create/recycle/destroy jobs -------------- */\r
+\r
+/**\r
+ * @brief Initializes one instance of a Task pool cache.\r
+ *\r
+ * @param[in] pCache The pre-allocated instance of the cache to initialize.\r
+ */\r
+static void _initJobsCache( _taskPoolCache_t * const pCache );\r
+\r
+/**\r
+ * @brief Initialize a job.\r
+ *\r
+ * @param[in] pJob The job to initialize.\r
+ * @param[in] userCallback The user callback for the job.\r
 * @param[in] pUserContext The context to be passed to the callback.
 * @param[in] isStatic A flag to indicate whether the job is statically or dynamically allocated.
+ */\r
+static void _initializeJob( _taskPoolJob_t * const pJob,\r
+ IotTaskPoolRoutine_t userCallback,\r
+ void * pUserContext,\r
+ bool isStatic );\r
+\r
+/**\r
 * @brief Extracts and initializes one instance of a job from the cache or, if there is none available, allocates and initializes a new one.
+ *\r
+ * @param[in] pCache The instance of the cache to extract the job from.\r
+ */\r
+static _taskPoolJob_t * _fetchOrAllocateJob( _taskPoolCache_t * const pCache );\r
+\r
+/**\r
+ * Recycles one instance of a job into the cache or, if the cache is full, it destroys it.\r
+ *\r
+ * @param[in] pCache The instance of the cache to recycle the job into.\r
+ * @param[in] pJob The job to recycle.\r
+ *\r
+ */\r
+static void _recycleJob( _taskPoolCache_t * const pCache,\r
+ _taskPoolJob_t * const pJob );\r
+\r
+/**\r
+ * Destroys one instance of a job.\r
+ *\r
+ * @param[in] pJob The job to destroy.\r
+ *\r
+ */\r
+static void _destroyJob( _taskPoolJob_t * const pJob );\r
+\r
+/* -------------- The worker thread procedure for a task pool thread -------------- */\r
+\r
+/**\r
+ * The procedure for a task pool worker thread.\r
+ *\r
+ * @param[in] pUserContext The user context.\r
+ *\r
+ */\r
+static void _taskPoolWorker( void * pUserContext );\r
+\r
+/* -------------- Convenience functions to handle timer events -------------- */\r
+\r
+/**\r
+ * Comparer for the time list.\r
+ *\r
 * @param[in] pTimerEventLink1 The link to the first timer event.
 * @param[in] pTimerEventLink2 The link to the second timer event.
+ */\r
+static int32_t _timerEventCompare( const IotLink_t * const pTimerEventLink1,\r
+ const IotLink_t * const pTimerEventLink2 );\r
+\r
+/**\r
+ * Reschedules the timer for handling deferred jobs to the next timeout.\r
+ *\r
 * @param[in] timer The timer to reschedule.
 * @param[in] pFirstTimerEvent The timer event that carries the timeout and job information.
+ */\r
+static void _rescheduleDeferredJobsTimer( TimerHandle_t const timer,\r
+ _taskPoolTimerEvent_t * const pFirstTimerEvent );\r
+\r
+/**\r
+ * The task pool timer procedure for scheduling deferred jobs.\r
+ *\r
 * @param[in] xTimer The timer to handle.
+ */\r
+static void _timerThread( TimerHandle_t xTimer );\r
+\r
+/* -------------- Convenience functions to create/initialize/destroy the task pool -------------- */\r
+\r
+/**\r
+ * Parameter validation for a task pool initialization.\r
+ *\r
+ * @param[in] pInfo The initialization information for the task pool.\r
+ *\r
+ */\r
+static IotTaskPoolError_t _performTaskPoolParameterValidation( const IotTaskPoolInfo_t * const pInfo );\r
+\r
+/**\r
+ * Initializes a pre-allocated instance of a task pool.\r
+ *\r
+ * @param[in] pInfo The initialization information for the task pool.\r
+ * @param[in] pTaskPool The pre-allocated instance of the task pool to initialize.\r
+ *\r
+ */\r
+static IotTaskPoolError_t _initTaskPoolControlStructures( const IotTaskPoolInfo_t * const pInfo,\r
+ _taskPool_t * const pTaskPool );\r
+\r
+/**\r
+ * Initializes a pre-allocated instance of a task pool.\r
+ *\r
+ * @param[in] pInfo The initialization information for the task pool.\r
+ * @param[out] pTaskPool A pointer to the task pool data structure to initialize.\r
+ *\r
+ */\r
+static IotTaskPoolError_t _createTaskPool( const IotTaskPoolInfo_t * const pInfo,\r
+ _taskPool_t * const pTaskPool );\r
+\r
+/**\r
+ * Destroys one instance of a task pool.\r
+ *\r
+ * @param[in] pTaskPool The task pool to destroy.\r
+ *\r
+ */\r
+static void _destroyTaskPool( _taskPool_t * const pTaskPool );\r
+\r
+/**\r
+ * Check for the exit condition.\r
+ *\r
 * @param[in] pTaskPool The task pool to check for shutdown.
+ *\r
+ */\r
+static bool _IsShutdownStarted( const _taskPool_t * const pTaskPool );\r
+\r
+/**\r
+ * Set the exit condition.\r
+ *\r
 * @param[in] pTaskPool The task pool to signal shutdown for.
+ * @param[in] threads The number of threads active in the task pool at shutdown time.\r
+ *\r
+ */\r
+static void _signalShutdown( _taskPool_t * const pTaskPool,\r
+ uint32_t threads );\r
+\r
+/**\r
+ * Places a job in the dispatch queue.\r
+ *\r
 * @param[in] pTaskPool The task pool to schedule the job with.
+ * @param[in] pJob The job to schedule.\r
+ * @param[in] flags The job flags.\r
+ *\r
+ */\r
+static IotTaskPoolError_t _scheduleInternal( _taskPool_t * const pTaskPool,\r
+ _taskPoolJob_t * const pJob,\r
+ uint32_t flags );\r
+\r
+/**\r
+ * Matches a deferred job in the timer queue with its timer event wrapper.\r
+ *\r
+ * @param[in] pLink A pointer to the timer event link in the timer queue.\r
+ * @param[in] pMatch A pointer to the job to match.\r
+ *\r
+ */\r
+static bool _matchJobByPointer( const IotLink_t * const pLink,\r
+ void * pMatch );\r
+\r
+/**\r
+ * Tries to cancel a job.\r
+ *\r
+ * @param[in] pTaskPool The task pool to cancel an operation against.\r
+ * @param[in] pJob The job to cancel.\r
+ * @param[out] pStatus The status of the job at the time of cancellation.\r
+ *\r
+ */\r
+static IotTaskPoolError_t _tryCancelInternal( _taskPool_t * const pTaskPool,\r
+ _taskPoolJob_t * const pJob,\r
+ IotTaskPoolJobStatus_t * const pStatus );\r
+\r
+/* ---------------------------------------------------------------------------------------------- */\r
+\r
+IotTaskPool_t IotTaskPool_GetSystemTaskPool( void )\r
+{\r
+ return &_IotSystemTaskPool;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
/**
 * @brief Creates the statically allocated system task pool.
 *
 * Validates @p pInfo, then initializes the global _IotSystemTaskPool instance
 * in place; no memory is allocated for the pool structure itself.
 *
 * @param[in] pInfo The initialization information for the task pool.
 *
 * @return #IOT_TASKPOOL_SUCCESS on success; otherwise the error propagated
 * from parameter validation or pool creation.
 */
IotTaskPoolError_t IotTaskPool_CreateSystemTaskPool( const IotTaskPoolInfo_t * const pInfo )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    /* Parameter checking. */
    TASKPOOL_ON_ERROR_GOTO_CLEANUP( _performTaskPoolParameterValidation( pInfo ) );

    /* Create the system task pool. */
    TASKPOOL_SET_AND_GOTO_CLEANUP( _createTaskPool( pInfo, &_IotSystemTaskPool ) );

    TASKPOOL_NO_FUNCTION_CLEANUP();
}
+\r
+/*-----------------------------------------------------------*/\r
+\r
/**
 * @brief Allocates and creates one instance of a task pool.
 *
 * @param[in] pInfo The initialization information for the task pool.
 * @param[out] pTaskPool Receives the handle of the newly created task pool on success;
 * left unmodified on failure.
 *
 * @return #IOT_TASKPOOL_SUCCESS, #IOT_TASKPOOL_BAD_PARAMETER,
 * #IOT_TASKPOOL_NO_MEMORY, or an error propagated from pool creation.
 */
IotTaskPoolError_t IotTaskPool_Create( const IotTaskPoolInfo_t * const pInfo,
                                       IotTaskPool_t * const pTaskPool )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTempTaskPool = NULL;

    /* Verify that the task pool storage is valid. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );

    /* Parameter checking. */
    TASKPOOL_ON_ERROR_GOTO_CLEANUP( _performTaskPoolParameterValidation( pInfo ) );

    /* Allocate the memory for the task pool */
    pTempTaskPool = ( _taskPool_t * ) IotTaskPool_MallocTaskPool( sizeof( _taskPool_t ) );

    if( pTempTaskPool == NULL )
    {
        TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
    }

    /* Zero the pool so cleanup paths see well-defined (NULL) handles. */
    memset( pTempTaskPool, 0x00, sizeof( _taskPool_t ) );

    TASKPOOL_SET_AND_GOTO_CLEANUP( _createTaskPool( pInfo, pTempTaskPool ) );

    TASKPOOL_FUNCTION_CLEANUP();

    /* On failure, release the partially constructed pool; on success, hand
     * the new pool back to the caller. */
    if( TASKPOOL_FAILED( status ) )
    {
        if( pTempTaskPool != NULL )
        {
            IotTaskPool_FreeTaskPool( pTempTaskPool );
        }
    }
    else
    {
        *pTaskPool = pTempTaskPool;
    }

    TASKPOOL_FUNCTION_CLEANUP_END();
}
+\r
+/*-----------------------------------------------------------*/\r
+\r
/**
 * @brief Destroys a task pool and releases its resources.
 *
 * Under the critical section, the dispatch queue, the timer event list and the
 * jobs cache are drained and their jobs destroyed, then the shutdown condition
 * is signaled. Outside the critical section, the function waits for every
 * worker thread to exit before destroying the signaling objects. If a deferred
 * job's timer has already expired, final destruction is deferred to the timer
 * thread (completeShutdown == false).
 *
 * @param[in] taskPoolHandle The handle of the task pool to destroy.
 *
 * @return #IOT_TASKPOOL_SUCCESS, or #IOT_TASKPOOL_BAD_PARAMETER if the handle is NULL.
 */
IotTaskPoolError_t IotTaskPool_Destroy( IotTaskPool_t taskPoolHandle )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    uint32_t count;
    bool completeShutdown = true;

    _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    /* Track how many threads the task pool owns. */
    uint32_t activeThreads;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );

    /* Destroying the task pool should be safe, and therefore we will grab the task pool lock.
     * No worker thread or application thread should access any data structure
     * in the task pool while the task pool is being destroyed. */
    TASKPOOL_ENTER_CRITICAL();
    {
        IotLink_t * pItemLink;

        /* Record how many active threads in the task pool. */
        activeThreads = pTaskPool->activeThreads;

        /* Destroying a Task pool happens in six (6) stages: First, (1) we clear the job queue and (2) the timer queue.
         * Then (3) we clear the jobs cache. We will then (4) wait for all worker threads to signal exit,
         * before (5) setting the exit condition and wake up all active worker threads. Finally (6) destroying
         * all task pool data structures and release the associated memory.
         */

        /* (1) Clear the job queue. */
        do
        {
            pItemLink = NULL;

            pItemLink = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );

            if( pItemLink != NULL )
            {
                _taskPoolJob_t * pJob = IotLink_Container( _taskPoolJob_t, pItemLink, link );

                _destroyJob( pJob );
            }
        } while( pItemLink );

        /* (2) Clear the timer queue. */
        {
            _taskPoolTimerEvent_t * pTimerEvent;

            /* A deferred job may have fired already. Since deferred jobs will go through the same mutex
             * the shutdown sequence is holding at this stage, there is no risk for race conditions. Yet, we
             * need to let the deferred job to destroy the task pool. */

            pItemLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );

            if( pItemLink != NULL )
            {
                TickType_t now = xTaskGetTickCount();

                pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pItemLink, link );

                if( pTimerEvent->expirationTime <= now )
                {
                    IotLogDebug( "Shutdown will be deferred to the timer thread" );

                    /* Timer may have fired already! Let the timer thread
                     * complete the taskpool destruction sequence. */
                    completeShutdown = false;
                }

                /* Remove all timers from the timeout list. */
                for( ; ; )
                {
                    pItemLink = IotListDouble_RemoveHead( &pTaskPool->timerEventsList );

                    if( pItemLink == NULL )
                    {
                        break;
                    }

                    pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pItemLink, link );

                    _destroyJob( pTimerEvent->job );

                    IotTaskPool_FreeTimerEvent( pTimerEvent );
                }
            }
        }

        /* (3) Clear the job cache. */
        do
        {
            pItemLink = NULL;

            pItemLink = IotListDouble_RemoveHead( &pTaskPool->jobsCache.freeList );

            if( pItemLink != NULL )
            {
                _taskPoolJob_t * pJob = IotLink_Container( _taskPoolJob_t, pItemLink, link );

                _destroyJob( pJob );
            }
        } while( pItemLink );

        /* (4) Set the exit condition. */
        _signalShutdown( pTaskPool, activeThreads );
    }
    TASKPOOL_EXIT_CRITICAL();

    /* (5) Wait for all active threads to reach the end of their life-span.
     * Each worker gives startStopSignal once when it exits. */
    for( count = 0; count < activeThreads; ++count )
    {
        xSemaphoreTake( pTaskPool->startStopSignal, portMAX_DELAY );
    }

    IotTaskPool_Assert( uxSemaphoreGetCount( pTaskPool->startStopSignal ) == 0 );
    IotTaskPool_Assert( pTaskPool->activeThreads == 0 );

    /* (6) Destroy all signaling objects. */
    if( completeShutdown == true )
    {
        _destroyTaskPool( pTaskPool );

        /* Do not free the system task pool which is statically allocated. */
        if( pTaskPool != &_IotSystemTaskPool )
        {
            IotTaskPool_FreeTaskPool( pTaskPool );
        }
    }

    TASKPOOL_NO_FUNCTION_CLEANUP();
}
+\r
+/*-----------------------------------------------------------*/\r
+\r
/**
 * @brief Validates a request to change the maximum number of worker threads.
 *
 * In this implementation the function only validates its arguments and does
 * not alter the pool: no thread count is stored or changed here.
 *
 * @param[in] taskPoolHandle The handle of the task pool.
 * @param[in] maxThreads The requested maximum number of threads; must be >= 1.
 *
 * @return #IOT_TASKPOOL_SUCCESS, or #IOT_TASKPOOL_BAD_PARAMETER.
 */
IotTaskPoolError_t IotTaskPool_SetMaxThreads( IotTaskPool_t taskPoolHandle,
                                              uint32_t maxThreads )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );
    TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( maxThreads < 1UL );

    TASKPOOL_NO_FUNCTION_CLEANUP();
}
+\r
+/*-----------------------------------------------------------*/\r
+\r
/**
 * @brief Creates a job around caller-provided (static) storage.
 *
 * The job is marked as statically allocated, so it will never be returned to
 * the recyclable-jobs cache or freed by the task pool.
 *
 * @param[in] userCallback The callback to invoke when the job executes.
 * @param[in] pUserContext The context passed to @p userCallback.
 * @param[in] pJobStorage Caller-owned storage that will back the job.
 * @param[out] ppJob Receives the handle of the initialized job.
 *
 * @return #IOT_TASKPOOL_SUCCESS, or #IOT_TASKPOOL_BAD_PARAMETER if any argument is NULL.
 */
IotTaskPoolError_t IotTaskPool_CreateJob( IotTaskPoolRoutine_t userCallback,
                                          void * pUserContext,
                                          IotTaskPoolJobStorage_t * const pJobStorage,
                                          IotTaskPoolJob_t * const ppJob )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( userCallback );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJobStorage );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( ppJob );

    /* Build a job around the user-provided storage. */
    _initializeJob( ( _taskPoolJob_t * ) pJobStorage, userCallback, pUserContext, true );

    *ppJob = ( IotTaskPoolJob_t ) pJobStorage;

    TASKPOOL_NO_FUNCTION_CLEANUP();
}
+\r
+/*-----------------------------------------------------------*/\r
+\r
/**
 * @brief Creates a recyclable job, reusing one from the pool's cache when possible.
 *
 * The job is fetched from (or allocated into) the task pool's jobs cache under
 * the critical section, then initialized as dynamically allocated so it can be
 * recycled with IotTaskPool_RecycleJob.
 *
 * @param[in] taskPoolHandle The handle of the task pool that owns the cache.
 * @param[in] userCallback The callback to invoke when the job executes.
 * @param[in] pUserContext The context passed to @p userCallback.
 * @param[out] ppJob Receives the handle of the new job.
 *
 * @return #IOT_TASKPOOL_SUCCESS, #IOT_TASKPOOL_BAD_PARAMETER, or
 * #IOT_TASKPOOL_NO_MEMORY if no job could be fetched or allocated.
 */
IotTaskPoolError_t IotTaskPool_CreateRecyclableJob( IotTaskPool_t taskPoolHandle,
                                                    IotTaskPoolRoutine_t userCallback,
                                                    void * pUserContext,
                                                    IotTaskPoolJob_t * const ppJob )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;
    _taskPoolJob_t * pTempJob = NULL;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( userCallback );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( ppJob );

    TASKPOOL_ENTER_CRITICAL();
    {
        /* Fetch a job from the cache, or allocate a new one if the cache is empty. */
        pTempJob = _fetchOrAllocateJob( &pTaskPool->jobsCache );
    }
    TASKPOOL_EXIT_CRITICAL();

    if( pTempJob == NULL )
    {
        IotLogInfo( "Failed to allocate a job." );

        TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
    }

    _initializeJob( pTempJob, userCallback, pUserContext, false );

    *ppJob = pTempJob;

    TASKPOOL_NO_FUNCTION_CLEANUP();
}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotTaskPoolError_t IotTaskPool_DestroyRecyclableJob( IotTaskPool_t taskPoolHandle,\r
+ IotTaskPoolJob_t pJobHandle )\r
+{\r
+ TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );\r
+\r
+ _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;\r
+ _taskPoolJob_t * pJob = ( _taskPoolJob_t * ) pJobHandle;\r
+\r
+ /* Parameter checking. */\r
+ TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );\r
+ TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJobHandle );\r
+\r
+ IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );\r
+\r
+ _destroyJob( pJob );\r
+\r
+ TASKPOOL_NO_FUNCTION_CLEANUP();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
/**
 * @brief Returns a recyclable job to the task pool's jobs cache.
 *
 * The job must not be scheduled or queued when this is called (asserted below).
 * Recycling happens under the critical section; if the cache is full the job
 * is destroyed instead (see _recycleJob).
 *
 * @param[in] taskPoolHandle The handle of the task pool that owns the cache.
 * @param[in] pJob The handle of the job to recycle.
 *
 * @return #IOT_TASKPOOL_SUCCESS, or #IOT_TASKPOOL_BAD_PARAMETER if a handle is NULL.
 */
IotTaskPoolError_t IotTaskPool_RecycleJob( IotTaskPool_t taskPoolHandle,
                                           IotTaskPoolJob_t pJob )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );

    TASKPOOL_ENTER_CRITICAL();
    {
        IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );

        _recycleJob( &pTaskPool->jobsCache, pJob );
    }
    TASKPOOL_EXIT_CRITICAL();

    TASKPOOL_NO_FUNCTION_CLEANUP();
}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotTaskPoolError_t IotTaskPool_Schedule( IotTaskPool_t taskPoolHandle,\r
+ IotTaskPoolJob_t pJob,\r
+ uint32_t flags )\r
+{\r
+ TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );\r
+\r
+ _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;\r
+\r
+ /* Parameter checking. */\r
+ TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );\r
+ TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pJob );\r
+ TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( ( flags != 0UL ) && ( flags != IOT_TASKPOOL_JOB_HIGH_PRIORITY ) );\r
+\r
+ TASKPOOL_ENTER_CRITICAL();\r
+ {\r
+ _scheduleInternal( pTaskPool, pJob, flags );\r
+ }\r
+ TASKPOOL_EXIT_CRITICAL();\r
+\r
+ TASKPOOL_NO_FUNCTION_CLEANUP();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
/**
 * @brief Schedules a job for execution after a delay.
 *
 * A timeout of zero falls through to immediate scheduling. Otherwise a timer
 * event is allocated, inserted (sorted by expiration) into the pool's timer
 * event list, and the underlying FreeRTOS timer is rescheduled if the new
 * event became the earliest one.
 *
 * @param[in] taskPoolHandle The handle of the task pool to schedule the job with.
 * @param[in] job The handle of the job to schedule.
 * @param[in] timeMs The delay, in milliseconds, before the job becomes eligible to run.
 *
 * @return #IOT_TASKPOOL_SUCCESS, #IOT_TASKPOOL_BAD_PARAMETER, or
 * #IOT_TASKPOOL_NO_MEMORY if the timer event cannot be allocated.
 */
IotTaskPoolError_t IotTaskPool_ScheduleDeferred( IotTaskPool_t taskPoolHandle,
                                                 IotTaskPoolJob_t job,
                                                 uint32_t timeMs )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( job );

    /* A zero delay is an immediate schedule. */
    if( timeMs == 0UL )
    {
        TASKPOOL_SET_AND_GOTO_CLEANUP( IotTaskPool_Schedule( pTaskPool, job, 0 ) );
    }

    TASKPOOL_ENTER_CRITICAL();
    {
        _taskPoolTimerEvent_t * pTimerEvent = IotTaskPool_MallocTimerEvent( sizeof( _taskPoolTimerEvent_t ) );

        if( pTimerEvent == NULL )
        {
            /* Must leave the critical section before jumping to cleanup. */
            TASKPOOL_EXIT_CRITICAL();

            TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );
        }

        IotLink_t * pTimerEventLink;

        TickType_t now = xTaskGetTickCount();

        pTimerEvent->link.pNext = NULL;
        pTimerEvent->link.pPrevious = NULL;
        pTimerEvent->expirationTime = now + pdMS_TO_TICKS( timeMs );
        pTimerEvent->job = job;

        /* Append the timer event to the timer list, keeping it sorted by expiration. */
        IotListDouble_InsertSorted( &pTaskPool->timerEventsList, &pTimerEvent->link, _timerEventCompare );

        /* Update the job status to 'deferred'. */
        job->status = IOT_TASKPOOL_STATUS_DEFERRED;

        /* Peek the first event in the timer event list. There must be at least one,
         * since we just inserted it. */
        pTimerEventLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
        IotTaskPool_Assert( pTimerEventLink != NULL );

        /* If the event we inserted is at the front of the queue, then
         * we need to reschedule the underlying timer. */
        if( pTimerEventLink == &pTimerEvent->link )
        {
            pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pTimerEventLink, link );

            _rescheduleDeferredJobsTimer( pTaskPool->timer, pTimerEvent );
        }
    }
    TASKPOOL_EXIT_CRITICAL();

    TASKPOOL_NO_FUNCTION_CLEANUP();
}
+\r
+/*-----------------------------------------------------------*/\r
+\r
/**
 * @brief Retrieves the current status of a job.
 *
 * The status is read under the critical section; it may change again as soon
 * as the section is exited.
 *
 * @param[in] taskPoolHandle The handle of the task pool; only validated for non-NULL.
 * @param[in] job The handle of the job to query.
 * @param[out] pStatus Receives the job status; set to
 * #IOT_TASKPOOL_STATUS_UNDEFINED before the read.
 *
 * @return #IOT_TASKPOOL_SUCCESS, or #IOT_TASKPOOL_BAD_PARAMETER if any argument is NULL.
 */
IotTaskPoolError_t IotTaskPool_GetStatus( IotTaskPool_t taskPoolHandle,
                                          IotTaskPoolJob_t job,
                                          IotTaskPoolJobStatus_t * const pStatus )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( job );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pStatus );
    *pStatus = IOT_TASKPOOL_STATUS_UNDEFINED;

    TASKPOOL_ENTER_CRITICAL();
    {
        *pStatus = job->status;
    }
    TASKPOOL_EXIT_CRITICAL();

    TASKPOOL_NO_FUNCTION_CLEANUP();
}
+\r
+/*-----------------------------------------------------------*/\r
+\r
/**
 * @brief Attempts to cancel a scheduled or deferred job.
 *
 * Cancellation is attempted under the critical section; the internal result is
 * returned to the caller.
 *
 * @param[in] taskPoolHandle The handle of the task pool the job was scheduled with.
 * @param[in] job The handle of the job to cancel.
 * @param[out] pStatus Optional; if non-NULL, receives the job status observed
 * at cancellation time (initialized to #IOT_TASKPOOL_STATUS_UNDEFINED).
 *
 * @return #IOT_TASKPOOL_SUCCESS, #IOT_TASKPOOL_BAD_PARAMETER, or the error
 * returned by the internal cancellation (e.g. #IOT_TASKPOOL_CANCEL_FAILED).
 */
IotTaskPoolError_t IotTaskPool_TryCancel( IotTaskPool_t taskPoolHandle,
                                          IotTaskPoolJob_t job,
                                          IotTaskPoolJobStatus_t * const pStatus )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    _taskPool_t * pTaskPool = ( _taskPool_t * ) taskPoolHandle;

    /* Parameter checking. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( taskPoolHandle );
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( job );

    if( pStatus != NULL )
    {
        *pStatus = IOT_TASKPOOL_STATUS_UNDEFINED;
    }

    TASKPOOL_ENTER_CRITICAL();
    {
        status = _tryCancelInternal( pTaskPool, job, pStatus );
    }
    TASKPOOL_EXIT_CRITICAL();

    TASKPOOL_NO_FUNCTION_CLEANUP();
}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+IotTaskPoolJobStorage_t * IotTaskPool_GetJobStorageFromHandle( IotTaskPoolJob_t pJob )\r
+{\r
+ return ( IotTaskPoolJobStorage_t * ) pJob;\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
+const char * IotTaskPool_strerror( IotTaskPoolError_t status )\r
+{\r
+ const char * pMessage = NULL;\r
+\r
+ switch( status )\r
+ {\r
+ case IOT_TASKPOOL_SUCCESS:\r
+ pMessage = "SUCCESS";\r
+ break;\r
+\r
+ case IOT_TASKPOOL_BAD_PARAMETER:\r
+ pMessage = "BAD PARAMETER";\r
+ break;\r
+\r
+ case IOT_TASKPOOL_ILLEGAL_OPERATION:\r
+ pMessage = "ILLEGAL OPERATION";\r
+ break;\r
+\r
+ case IOT_TASKPOOL_NO_MEMORY:\r
+ pMessage = "NO MEMORY";\r
+ break;\r
+\r
+ case IOT_TASKPOOL_SHUTDOWN_IN_PROGRESS:\r
+ pMessage = "SHUTDOWN IN PROGRESS";\r
+ break;\r
+\r
+ case IOT_TASKPOOL_CANCEL_FAILED:\r
+ pMessage = "CANCEL FAILED";\r
+ break;\r
+\r
+ default:\r
+ pMessage = "INVALID STATUS";\r
+ break;\r
+ }\r
+\r
+ return pMessage;\r
+}\r
+\r
+/* ---------------------------------------------------------------------------------------------- */\r
+/* ---------------------------------------------------------------------------------------------- */\r
+/* ---------------------------------------------------------------------------------------------- */\r
+\r
/**
 * @brief Validates the task pool initialization parameters.
 *
 * @param[in] pInfo The initialization information for the task pool; must be
 * non-NULL with 1 <= minThreads <= maxThreads.
 *
 * @return #IOT_TASKPOOL_SUCCESS, or #IOT_TASKPOOL_BAD_PARAMETER.
 */
static IotTaskPoolError_t _performTaskPoolParameterValidation( const IotTaskPoolInfo_t * const pInfo )
{
    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );

    /* Check input values for consistency. */
    TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pInfo );
    TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->minThreads > pInfo->maxThreads );
    TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->minThreads < 1UL );
    TASKPOOL_ON_ARG_ERROR_GOTO_CLEANUP( pInfo->maxThreads < 1UL );

    TASKPOOL_NO_FUNCTION_CLEANUP();
}
+\r
+static IotTaskPoolError_t _initTaskPoolControlStructures( const IotTaskPoolInfo_t * const pInfo,\r
+ _taskPool_t * const pTaskPool )\r
+{\r
+ TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );\r
+\r
+ bool semStartStopInit = false;\r
+ bool semDispatchInit = false;\r
+\r
+ /* Initialize a job data structures that require no de-initialization.\r
+ * All other data structures carry a value of 'NULL' before initailization.\r
+ */\r
+ IotDeQueue_Create( &pTaskPool->dispatchQueue );\r
+ IotListDouble_Create( &pTaskPool->timerEventsList );\r
+\r
+ _initJobsCache( &pTaskPool->jobsCache );\r
+\r
+ /* Initialize the semaphore to ensure all threads have started. */\r
+ pTaskPool->startStopSignal = xSemaphoreCreateCountingStatic( pInfo->minThreads, 0, &pTaskPool->startStopSignalBuffer );\r
+\r
+ if( pTaskPool->startStopSignal != NULL )\r
+ {\r
+ semStartStopInit = true;\r
+\r
+ /* Initialize the semaphore for waiting for incoming work. */\r
+ pTaskPool->dispatchSignal = xSemaphoreCreateCountingStatic( TASKPOOL_MAX_SEM_VALUE, 0, &pTaskPool->dispatchSignalBuffer );\r
+\r
+ if( pTaskPool->dispatchSignal != NULL )\r
+ {\r
+ semDispatchInit = true;\r
+ }\r
+ else\r
+ {\r
+ TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );\r
+ }\r
+ }\r
+ else\r
+ {\r
+ TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );\r
+ }\r
+\r
+ TASKPOOL_FUNCTION_CLEANUP();\r
+\r
+ if( TASKPOOL_FAILED( status ) )\r
+ {\r
+ if( semStartStopInit )\r
+ {\r
+ vSemaphoreDelete( &pTaskPool->startStopSignal );\r
+ }\r
+\r
+ if( semDispatchInit )\r
+ {\r
+ vSemaphoreDelete( &pTaskPool->dispatchSignal );\r
+ }\r
+ }\r
+\r
+ TASKPOOL_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+static IotTaskPoolError_t _createTaskPool( const IotTaskPoolInfo_t * const pInfo,\r
+ _taskPool_t * const pTaskPool )\r
+{\r
+ TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );\r
+\r
+ uint32_t count;\r
+ uint32_t threadsCreated = 0;\r
+\r
+ /* Check input values for consistency. */\r
+ TASKPOOL_ON_NULL_ARG_GOTO_CLEANUP( pTaskPool );\r
+\r
+ /* Zero out all data structures. */\r
+ memset( ( void * ) pTaskPool, 0x00, sizeof( _taskPool_t ) );\r
+\r
+ /* Initialize all internal data structure prior to creating all threads. */\r
+ TASKPOOL_ON_ERROR_GOTO_CLEANUP( _initTaskPoolControlStructures( pInfo, pTaskPool ) );\r
+\r
+ /* Create the timer mutex for a new connection. */\r
+ pTaskPool->timer = xTimerCreate( NULL, portMAX_DELAY, pdFALSE, ( void * ) pTaskPool, _timerThread );\r
+\r
+ if( pTaskPool->timer == NULL )\r
+ {\r
+ IotLogError( "Failed to create timer for task pool." );\r
+\r
+ TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );\r
+ }\r
+\r
+ /* The task pool will initialize the minimum number of threads reqeusted by the user upon start. */\r
+ /* When a thread is created, it will signal a semaphore to signify that it is about to wait on incoming */\r
+ /* jobs. A thread can be woken up for exit or for new jobs only at that point in time. */\r
+ /* The exit condition is setting the maximum number of threads to 0. */\r
+\r
+ /* Create the minimum number of threads specified by the user, and if one fails shutdown and return error. */\r
+ for( ; threadsCreated < pInfo->minThreads; )\r
+ {\r
+ TaskHandle_t task = NULL;\r
+\r
+ BaseType_t res = xTaskCreate( _taskPoolWorker,\r
+ NULL,\r
+ pInfo->stackSize,\r
+ pTaskPool,\r
+ pInfo->priority,\r
+ &task );\r
+\r
+ /* Create one thread. */\r
+ if( res == pdFALSE )\r
+ {\r
+ IotLogError( "Could not create worker thread! Exiting..." );\r
+\r
+ /* If creating one thread fails, set error condition and exit the loop. */\r
+ TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_NO_MEMORY );\r
+ }\r
+\r
+ /* Upon successful thread creation, increase the number of active threads. */\r
+ pTaskPool->activeThreads++;\r
+ IotTaskPool_Assert( task != NULL );\r
+\r
+ ++threadsCreated;\r
+ }\r
+\r
+ TASKPOOL_FUNCTION_CLEANUP();\r
+\r
+ /* Wait for threads to be ready to wait on the condition, so that threads are actually able to receive messages. */\r
+ for( count = 0; count < threadsCreated; ++count )\r
+ {\r
+ xSemaphoreTake( pTaskPool->startStopSignal, portMAX_DELAY );\r
+ }\r
+\r
+ /* In case of failure, wait on the created threads to exit. */\r
+ if( TASKPOOL_FAILED( status ) )\r
+ {\r
+ /* Set the exit condition for the newly created threads. */\r
+ _signalShutdown( pTaskPool, threadsCreated );\r
+\r
+ /* Signal all threads to exit. */\r
+ for( count = 0; count < threadsCreated; ++count )\r
+ {\r
+ xSemaphoreTake( pTaskPool->startStopSignal, portMAX_DELAY );\r
+ }\r
+\r
+ _destroyTaskPool( pTaskPool );\r
+ }\r
+\r
+ pTaskPool->running = true;\r
+\r
+ TASKPOOL_FUNCTION_CLEANUP_END();\r
+}\r
+\r
+/*-----------------------------------------------------------*/\r
+\r
/**
 * @brief Destroys the signaling objects owned by a task pool.
 *
 * Deletes the deferred-job timer and both semaphores, each only if it was
 * created. Does not free the task pool structure itself (the caller decides
 * whether the pool memory is static or heap-allocated).
 *
 * @param[in] pTaskPool The task pool whose resources are released.
 */
static void _destroyTaskPool( _taskPool_t * const pTaskPool )
{
    if( pTaskPool->timer != NULL )
    {
        xTimerDelete( pTaskPool->timer, 0 );
    }

    if( pTaskPool->dispatchSignal != NULL )
    {
        vSemaphoreDelete( pTaskPool->dispatchSignal );
    }

    if( pTaskPool->startStopSignal != NULL )
    {
        vSemaphoreDelete( pTaskPool->startStopSignal );
    }
}
+\r
+/* ---------------------------------------------------------------------------------------------- */\r
+\r
+/**
+ * @brief Task pool worker thread entry point (one instance runs per worker task).
+ *
+ * Waits on the pool's dispatch semaphore, then drains the dispatch queue,
+ * invoking each job's user callback outside of the critical section. When the
+ * pool's shutdown flag is observed, the worker signals the start/stop semaphore
+ * and deletes its own task.
+ *
+ * @param[in] pUserContext The owning _taskPool_t, passed as an opaque pointer.
+ */
+static void _taskPoolWorker( void * pUserContext )
+{
+ IotTaskPool_Assert( pUserContext != NULL );
+
+ IotTaskPoolRoutine_t userCallback = NULL;
+ bool running = true;
+
+ /* Extract pTaskPool pointer from context. */
+ _taskPool_t * pTaskPool = ( _taskPool_t * ) pUserContext;
+
+ /* Signal that this worker completed initialization and it is ready to receive notifications. */
+ ( void ) xSemaphoreGive( pTaskPool->startStopSignal );
+
+ /* OUTER LOOP: controls the lifetime of the worker thread. The worker keeps
+ * waiting for dispatch notifications until the pool's shutdown flag is set.
+ * NOTE(review): 'running' is never cleared anywhere in this function, so the
+ * only exit path visible here is the shutdown 'break' below -- confirm whether
+ * a thread-quota mechanism elsewhere is expected to clear it.
+ */
+ do
+ {
+ IotLink_t * pFirst = NULL;
+ _taskPoolJob_t * pJob = NULL;
+
+ /* Wait on incoming notifications... */
+ xSemaphoreTake( pTaskPool->dispatchSignal, portMAX_DELAY );
+
+ /* Acquire the lock to check the exit condition, and release the lock if the exit condition is verified,
+ * or before waiting for incoming notifications.
+ */
+ TASKPOOL_ENTER_CRITICAL();
+ {
+ /* If the exit condition is verified, update the number of active threads and exit the loop. */
+ if( _IsShutdownStarted( pTaskPool ) )
+ {
+ IotLogDebug( "Worker thread exiting because exit condition was set." );
+
+ /* Decrease the number of active threads. */
+ pTaskPool->activeThreads--;
+
+ /* The critical section must be exited before signaling/deleting. */
+ TASKPOOL_EXIT_CRITICAL();
+
+ /* Signal that this worker is exiting. */
+ xSemaphoreGive( pTaskPool->startStopSignal );
+
+ /* On shutdown, abandon the OUTER LOOP immediately. */
+ break;
+ }
+
+ /* Dequeue the first job in FIFO order. */
+ pFirst = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );
+
+ /* If there is indeed a job, then update status under lock, and release the lock before processing the job. */
+ if( pFirst != NULL )
+ {
+ /* Extract the job from its link. */
+ pJob = IotLink_Container( _taskPoolJob_t, pFirst, link );
+
+ /* Mark the job as taken. NOTE(review): the status is set to COMPLETED
+ * before the callback runs, presumably because this port defines no
+ * separate 'executing' state -- confirm against the status enum. */
+ pJob->status = IOT_TASKPOOL_STATUS_COMPLETED;
+ userCallback = pJob->userCallback;
+ }
+ }
+ TASKPOOL_EXIT_CRITICAL();
+
+ /* INNER LOOP: it controls the execution of jobs: the exit condition is the lack of a job to execute. */
+ while( pJob != NULL )
+ {
+ /* Process the job by invoking the associated callback with the user context.
+ * This task pool thread will not be available until the user callback returns.
+ */
+ {
+ IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );
+ IotTaskPool_Assert( userCallback != NULL );
+
+ userCallback( pTaskPool, pJob, pJob->pUserContext );
+
+ /* This job is finished, clear its pointer. */
+ pJob = NULL;
+ userCallback = NULL;
+
+ /* If this thread exceeded the quota, then let it terminate. */
+ if( running == false )
+ {
+ /* Abandon the INNER LOOP. Execution will transfer back to the OUTER LOOP condition. */
+ break;
+ }
+ }
+
+ /* Acquire the lock before updating the job status. */
+ TASKPOOL_ENTER_CRITICAL();
+ {
+ /* Try and dequeue the next job in the dispatch queue. */
+ IotLink_t * pItem = NULL;
+
+ /* Dequeue the next job from the dispatch queue. */
+ pItem = IotDeQueue_DequeueHead( &pTaskPool->dispatchQueue );
+
+ /* If there is no job left in the dispatch queue, update the worker status and leave. */
+ if( pItem == NULL )
+ {
+ TASKPOOL_EXIT_CRITICAL();
+
+ /* Abandon the INNER LOOP. Execution will transfer back to the OUTER LOOP condition. */
+ break;
+ }
+ else
+ {
+ pJob = IotLink_Container( _taskPoolJob_t, pItem, link );
+
+ userCallback = pJob->userCallback;
+ }
+
+ /* See NOTE(review) above: COMPLETED is set before the callback runs. */
+ pJob->status = IOT_TASKPOOL_STATUS_COMPLETED;
+ }
+ TASKPOOL_EXIT_CRITICAL();
+ }
+ } while( running == true );
+
+ /* A FreeRTOS task may not simply return; delete the calling task (NULL = self). */
+ vTaskDelete( NULL );
+}
+\r
+/* ---------------------------------------------------------------------------------------------- */\r
+\r
+/**
+ * @brief Set up an empty job cache: a fresh free list holding zero entries.
+ *
+ * @param[in] pCache The cache to initialize.
+ */
+static void _initJobsCache( _taskPoolCache_t * const pCache )
+{
+    /* Start with no recycled jobs available. */
+    pCache->freeCount = 0;
+
+    /* The free list starts empty; jobs are added to it as they are recycled. */
+    IotDeQueue_Create( &( pCache->freeList ) );
+}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**
+ * @brief Prepare a job for scheduling: detach it, record the user callback
+ * and context, and set its status to 'ready'.
+ *
+ * @param[in] pJob The job to initialize.
+ * @param[in] userCallback Routine to run when the job is dispatched.
+ * @param[in] pUserContext Opaque context handed back to the callback.
+ * @param[in] isStatic True when the job storage is caller-owned (not heap).
+ */
+static void _initializeJob( _taskPoolJob_t * const pJob,
+                            IotTaskPoolRoutine_t userCallback,
+                            void * pUserContext,
+                            bool isStatic )
+{
+    /* Detach the job from any list by clearing its link. */
+    pJob->link.pNext = NULL;
+    pJob->link.pPrevious = NULL;
+
+    /* Record the user's callback and its context. */
+    pJob->userCallback = userCallback;
+    pJob->pUserContext = pUserContext;
+
+    /* Caller-owned jobs are tagged so they are never freed to the heap.
+     * NOTE(review): for non-static jobs the flags field is left untouched
+     * here; presumably it was zeroed at allocation time -- confirm against
+     * _fetchOrAllocateJob. */
+    if( isStatic )
+    {
+        pJob->flags = IOT_TASK_POOL_INTERNAL_STATIC;
+    }
+
+    /* Either way, a freshly initialized job is ready to be scheduled. */
+    pJob->status = IOT_TASKPOOL_STATUS_READY;
+}
+\r
+/**
+ * @brief Obtain a job structure, preferring the recycle cache over the heap.
+ *
+ * @param[in] pCache The job cache to draw from.
+ *
+ * @return A job taken from the cache free list, a freshly zeroed heap
+ * allocation, or NULL if allocation failed.
+ */
+static _taskPoolJob_t * _fetchOrAllocateJob( _taskPoolCache_t * const pCache )
+{
+ _taskPoolJob_t * pJob = NULL;
+ IotLink_t * pLink = IotListDouble_RemoveHead( &( pCache->freeList ) );
+
+ /* A non-NULL link means the cache had a recycled job available. */
+ if( pLink != NULL )
+ {
+ pJob = IotLink_Container( _taskPoolJob_t, pLink, link );
+ }
+
+ /* If there is no available job in the cache, then allocate one. */
+ if( pJob == NULL )
+ {
+ pJob = ( _taskPoolJob_t * ) IotTaskPool_MallocJob( sizeof( _taskPoolJob_t ) );
+
+ if( pJob != NULL )
+ {
+ /* Zero the new job so its link, flags and status start in a known state. */
+ memset( pJob, 0x00, sizeof( _taskPoolJob_t ) );
+ }
+ else
+ {
+ /* Log allocation failure for troubleshooting purposes. */
+ IotLogInfo( "Failed to allocate job." );
+ }
+ }
+ /* If there was a job in the cache, then make sure we keep the counters up-to-date. */
+ else
+ {
+ IotTaskPool_Assert( pCache->freeCount > 0 );
+
+ pCache->freeCount--;
+ }
+
+ return pJob;
+}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**
+ * @brief Return a job to the cache free list, or destroy it when the cache
+ * is already at its recycle limit.
+ *
+ * @param[in] pCache The cache to return the job to.
+ * @param[in] pJob The job to recycle; must not be linked into any queue.
+ */
+static void _recycleJob( _taskPoolCache_t * const pCache,
+                         _taskPoolJob_t * const pJob )
+{
+    /* A job still linked into a queue must never be recycled. */
+    IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) == false );
+
+    if( pCache->freeCount >= IOT_TASKPOOL_JOBS_RECYCLE_LIMIT )
+    {
+        /* The cache is full: dispose of the job instead of caching it. */
+        _destroyJob( pJob );
+    }
+    else
+    {
+        /* Scrub user data so stale pointers cannot leak out of the cache. */
+        pJob->pUserContext = NULL;
+        pJob->userCallback = NULL;
+
+        /* An 'undefined' status marks the job as cached, for debuggability. */
+        pJob->status = IOT_TASKPOOL_STATUS_UNDEFINED;
+
+        IotListDouble_InsertTail( &pCache->freeList, &pJob->link );
+        pCache->freeCount++;
+
+        IotTaskPool_Assert( pCache->freeCount >= 1 );
+    }
+}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**
+ * @brief Invalidate a job and release its storage if it was heap-allocated.
+ *
+ * @param[in] pJob The job to destroy. Caller-owned (static) jobs are scrubbed
+ * but not freed.
+ */
+static void _destroyJob( _taskPoolJob_t * const pJob )
+{
+    /* Scrub user data, for added safety & security. */
+    pJob->pUserContext = NULL;
+    pJob->userCallback = NULL;
+
+    /* Mark the job as no longer usable, for added debuggability. */
+    pJob->status = IOT_TASKPOOL_STATUS_UNDEFINED;
+
+    /* Statically allocated jobs are owned by the caller; free only heap jobs. */
+    if( ( pJob->flags & IOT_TASK_POOL_INTERNAL_STATIC ) != 0UL )
+    {
+        return;
+    }
+
+    IotTaskPool_FreeJob( pJob );
+}
+\r
+/* ---------------------------------------------------------------------------------------------- */\r
+\r
+/**
+ * @brief Report whether shutdown has begun for the given task pool.
+ *
+ * @param[in] pTaskPool The pool to query.
+ *
+ * @return true once the pool's running flag has been cleared.
+ */
+static bool _IsShutdownStarted( const _taskPool_t * const pTaskPool )
+{
+    bool isRunning = pTaskPool->running;
+
+    return !isRunning;
+}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**
+ * @brief Begin pool shutdown: clear the running flag, then wake every worker
+ * so each one observes the exit condition.
+ *
+ * @param[in] pTaskPool The pool being shut down.
+ * @param[in] threads Number of workers to wake (one semaphore give each).
+ */
+static void _signalShutdown( _taskPool_t * const pTaskPool,
+                             uint32_t threads )
+{
+    uint32_t ii;
+
+    /* Clear the running flag: workers check it each time they wake up. */
+    pTaskPool->running = false;
+
+    /* Give the dispatch semaphore once per worker so all of them wake up. */
+    for( ii = 0; ii < threads; ii++ )
+    {
+        ( void ) xSemaphoreGive( pTaskPool->dispatchSignal );
+    }
+}
+\r
+/* ---------------------------------------------------------------------------------------------- */\r
+\r
+/**
+ * @brief Place a job on the dispatch queue and wake one worker to run it.
+ *
+ * Callers invoke this from within a critical section (interrupts disabled),
+ * which is why the "FromISR" semaphore API is used here.
+ *
+ * @param[in] pTaskPool The task pool that owns the dispatch queue.
+ * @param[in] pJob The job to schedule; its status is set to 'scheduled'.
+ * @param[in] flags Currently unused.
+ *
+ * @return IOT_TASKPOOL_SUCCESS (this implementation has no failure path).
+ */
+static IotTaskPoolError_t _scheduleInternal( _taskPool_t * const pTaskPool,
+                                             _taskPoolJob_t * const pJob,
+                                             uint32_t flags )
+{
+    TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
+
+    /* xSemaphoreGiveFromISR() only ever SETS this to pdTRUE; it never clears
+     * it, so it MUST be initialized to pdFALSE before use. Reading it
+     * uninitialized is undefined behavior and can cause spurious yields. */
+    BaseType_t higherPriorityTaskWoken = pdFALSE;
+
+    ( void ) flags;
+
+    /* Update the job status to 'scheduled'. */
+    pJob->status = IOT_TASKPOOL_STATUS_SCHEDULED;
+
+    /* Append the job to the dispatch queue. */
+    IotDeQueue_EnqueueTail( &pTaskPool->dispatchQueue, &pJob->link );
+
+    /* Signal a worker to pick up the job. */
+    ( void ) xSemaphoreGiveFromISR( pTaskPool->dispatchSignal, &higherPriorityTaskWoken );
+
+    /* Request a context switch if giving the semaphore unblocked a
+     * higher-priority task. */
+    portYIELD_FROM_ISR( higherPriorityTaskWoken );
+
+    TASKPOOL_NO_FUNCTION_CLEANUP_NOLABEL();
+}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**
+ * @brief List-search predicate: matches a timer event whose job pointer
+ * equals the job passed in pMatch.
+ *
+ * @param[in] pLink Link belonging to a _taskPoolTimerEvent_t under test.
+ * @param[in] pMatch The _taskPoolJob_t pointer being searched for.
+ *
+ * @return true when the timer event refers to the searched-for job.
+ */
+static bool _matchJobByPointer( const IotLink_t * const pLink,
+                                void * pMatch )
+{
+    /* The match key is the job pointer itself. */
+    const _taskPoolJob_t * const pJob = ( _taskPoolJob_t * ) pMatch;
+
+    /* Recover the timer event that owns this link. */
+    const _taskPoolTimerEvent_t * const pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pLink, link );
+
+    return( pTimerEvent->job == pJob );
+}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**
+ * @brief Attempt to cancel a job, unlinking it from the dispatch queue or
+ * the timer events list as appropriate for its current status.
+ *
+ * Callers are expected to hold the pool lock: this routine inspects and
+ * mutates the dispatch queue and the timer events list without taking it.
+ *
+ * @param[in] pTaskPool The pool that owns the job.
+ * @param[in] pJob The job to cancel.
+ * @param[out] pStatus Optional; receives the job's status as observed at
+ * the time of the cancellation attempt.
+ *
+ * @return IOT_TASKPOOL_SUCCESS when canceled, IOT_TASKPOOL_CANCEL_FAILED
+ * when the job was not in a cancelable state.
+ */
+static IotTaskPoolError_t _tryCancelInternal( _taskPool_t * const pTaskPool,
+ _taskPoolJob_t * const pJob,
+ IotTaskPoolJobStatus_t * const pStatus )
+{
+ TASKPOOL_FUNCTION_ENTRY( IOT_TASKPOOL_SUCCESS );
+
+ bool cancelable = false;
+
+ /* We can only cancel jobs that are either 'ready' (waiting to be scheduled), 'deferred', 'scheduled', or already 'canceled'. */
+
+ IotTaskPoolJobStatus_t currentStatus = pJob->status;
+
+ switch( currentStatus )
+ {
+ case IOT_TASKPOOL_STATUS_READY:
+ case IOT_TASKPOOL_STATUS_DEFERRED:
+ case IOT_TASKPOOL_STATUS_SCHEDULED:
+ case IOT_TASKPOOL_STATUS_CANCELED:
+ cancelable = true;
+ break;
+
+ case IOT_TASKPOOL_STATUS_COMPLETED:
+ /* Log a message for debugging purposes. */
+ IotLogWarn( "Attempt to cancel a job that is already executing, or canceled." );
+ break;
+
+ default:
+ /* Log a message for debugging purposes. */
+ IotLogError( "Attempt to cancel a job with an undefined state." );
+ break;
+ }
+
+ /* Update the returned status to the current status of the job. */
+ if( pStatus != NULL )
+ {
+ *pStatus = currentStatus;
+ }
+
+ if( cancelable == false )
+ {
+ TASKPOOL_SET_AND_GOTO_CLEANUP( IOT_TASKPOOL_CANCEL_FAILED );
+ }
+ else
+ {
+ /* Update the status of the job. */
+ pJob->status = IOT_TASKPOOL_STATUS_CANCELED;
+
+ /* If the job is cancelable and its current status is 'scheduled' then unlink it from the dispatch
+ * queue and signal any waiting threads. */
+ if( currentStatus == IOT_TASKPOOL_STATUS_SCHEDULED )
+ {
+ /* A scheduled work items must be in the dispatch queue. */
+ IotTaskPool_Assert( IotLink_IsLinked( &pJob->link ) );
+
+ IotDeQueue_Remove( &pJob->link );
+ }
+
+ /* If the job current status is 'deferred' then the job has to be pending
+ * in the timeouts queue. */
+ else if( currentStatus == IOT_TASKPOOL_STATUS_DEFERRED )
+ {
+ /* Find the timer event associated with the current job. There MUST be one, hence assert if not. */
+ IotLink_t * pTimerEventLink = IotListDouble_FindFirstMatch( &pTaskPool->timerEventsList, NULL, _matchJobByPointer, pJob );
+ IotTaskPool_Assert( pTimerEventLink != NULL );
+
+ if( pTimerEventLink != NULL )
+ {
+ bool shouldReschedule = false;
+
+ /* If the job being cancelled was at the head of the timeouts queue, then we need to reschedule the timer
+ * with the next job timeout */
+ IotLink_t * pHeadLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
+
+ if( pHeadLink == pTimerEventLink )
+ {
+ shouldReschedule = true;
+ }
+
+ /* Remove the timer event associated with the canceled job and free the associated memory. */
+ IotListDouble_Remove( pTimerEventLink );
+ IotTaskPool_FreeTimerEvent( IotLink_Container( _taskPoolTimerEvent_t, pTimerEventLink, link ) );
+
+ if( shouldReschedule )
+ {
+ IotLink_t * pNextTimerEventLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
+
+ if( pNextTimerEventLink != NULL )
+ {
+ _rescheduleDeferredJobsTimer( pTaskPool->timer, IotLink_Container( _taskPoolTimerEvent_t, pNextTimerEventLink, link ) );
+ }
+ }
+ }
+ }
+ else
+ {
+ /* Jobs that are 'ready' or already 'canceled' are not linked into any
+ * queue, so no unlinking is needed; just assert the invariant. */
+ IotTaskPool_Assert( ( currentStatus == IOT_TASKPOOL_STATUS_READY ) || ( currentStatus == IOT_TASKPOOL_STATUS_CANCELED ) );
+ }
+ }
+
+ TASKPOOL_NO_FUNCTION_CLEANUP();
+}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**
+ * @brief Ordering function for timer events: earlier expiration sorts first.
+ *
+ * @param[in] pTimerEventLink1 Link of the first timer event.
+ * @param[in] pTimerEventLink2 Link of the second timer event.
+ *
+ * @return -1, 0, or 1 as the first event expires before, with, or after
+ * the second.
+ */
+static int32_t _timerEventCompare( const IotLink_t * const pTimerEventLink1,
+                                   const IotLink_t * const pTimerEventLink2 )
+{
+    int32_t result = 0;
+
+    const _taskPoolTimerEvent_t * const pEventA = IotLink_Container( _taskPoolTimerEvent_t,
+                                                                     pTimerEventLink1,
+                                                                     link );
+    const _taskPoolTimerEvent_t * const pEventB = IotLink_Container( _taskPoolTimerEvent_t,
+                                                                     pTimerEventLink2,
+                                                                     link );
+
+    if( pEventA->expirationTime < pEventB->expirationTime )
+    {
+        result = -1;
+    }
+    else if( pEventA->expirationTime > pEventB->expirationTime )
+    {
+        result = 1;
+    }
+
+    return result;
+}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**
+ * @brief Re-arm the deferred-jobs timer to fire at the first pending event's
+ * deadline, with a minimum delay floor.
+ *
+ * @param[in] timer The FreeRTOS software timer backing deferred jobs.
+ * @param[in] pFirstTimerEvent The earliest pending timer event.
+ *
+ * NOTE(review): 'delta' is computed from tick counts but clamped against the
+ * TASKPOOL_JOB_RESCHEDULE_DELAY_MS constant and passed to xTimerChangePeriod()
+ * (which expects ticks) without pdMS_TO_TICKS() -- verify the units agree on
+ * this port. Also, xTimerChangePeriod() is called with a portMAX_DELAY block
+ * time while callers hold a critical section; confirm the timer command queue
+ * cannot be full here, as blocking inside a critical section would be unsafe.
+ */
+static void _rescheduleDeferredJobsTimer( TimerHandle_t const timer,
+ _taskPoolTimerEvent_t * const pFirstTimerEvent )
+{
+ uint64_t delta = 0;
+ TickType_t now = xTaskGetTickCount();
+
+ /* Time remaining until the first event expires; stays zero if overdue. */
+ if( pFirstTimerEvent->expirationTime > now )
+ {
+ delta = pFirstTimerEvent->expirationTime - now;
+ }
+
+ /* Enforce a minimum period: a timer period of zero is not allowed. */
+ if( delta < TASKPOOL_JOB_RESCHEDULE_DELAY_MS )
+ {
+ delta = TASKPOOL_JOB_RESCHEDULE_DELAY_MS; /* The job will be late... */
+ }
+
+ IotTaskPool_Assert( delta > 0 );
+
+ if( xTimerChangePeriod( timer, ( uint32_t ) delta, portMAX_DELAY ) == pdFAIL )
+ {
+ IotLogWarn( "Failed to re-arm timer for task pool" );
+ }
+}
+\r
+/*-----------------------------------------------------------*/\r
+\r
+/**
+ * @brief Software timer callback: dispatch every deferred job whose deadline
+ * has passed, then re-arm the timer for the next pending event.
+ *
+ * On shutdown this callback also completes the pool teardown by calling
+ * _destroyTaskPool().
+ *
+ * @param[in] xTimer The expired timer; its ID holds the owning _taskPool_t.
+ */
+static void _timerThread( TimerHandle_t xTimer )
+{
+ _taskPool_t * pTaskPool = pvTimerGetTimerID( xTimer );
+
+ IotTaskPool_Assert( pTaskPool );
+
+ _taskPoolTimerEvent_t * pTimerEvent = NULL;
+
+ IotLogDebug( "Timer thread started for task pool %p.", pTaskPool );
+
+ /* The timer events list is manipulated from several contexts, so all work
+ * below is done inside a critical section. Jobs that expire while another
+ * context holds the list are picked up on the next timer firing, late.
+ */
+ TASKPOOL_ENTER_CRITICAL();
+ {
+ /* Check again for shutdown and bail out early in case. */
+ if( _IsShutdownStarted( pTaskPool ) )
+ {
+ TASKPOOL_EXIT_CRITICAL();
+
+ /* Complete the shutdown sequence. */
+ _destroyTaskPool( pTaskPool );
+
+ return;
+ }
+
+ /* Dispatch all deferred jobs whose timer expired, then reset the timer for the next
+ * job down the line. */
+ for( ; ; )
+ {
+ /* Peek the first event in the timer event list. */
+ IotLink_t * pLink = IotListDouble_PeekHead( &pTaskPool->timerEventsList );
+
+ /* Check if the timer misfired for any reason. */
+ if( pLink != NULL )
+ {
+ /* Record the current time. */
+ TickType_t now = xTaskGetTickCount();
+
+ /* Extract the job from its envelope. */
+ pTimerEvent = IotLink_Container( _taskPoolTimerEvent_t, pLink, link );
+
+ /* Check if the first event should be processed now. */
+ if( pTimerEvent->expirationTime <= now )
+ {
+ /* Remove the timer event for immediate processing. */
+ IotListDouble_Remove( &( pTimerEvent->link ) );
+ }
+ else
+ {
+ /* The first element in the timer queue shouldn't be processed yet.
+ * Arm the timer for when it should be processed and leave altogether. */
+ _rescheduleDeferredJobsTimer( pTaskPool->timer, pTimerEvent );
+
+ break;
+ }
+ }
+ /* If there are no timer events to process, this callback is done. */
+ else
+ {
+ IotLogDebug( "No further timer events to process. Exiting timer thread." );
+
+ break;
+ }
+
+ IotLogDebug( "Scheduling job from timer event." );
+
+ /* Queue the job associated with the received timer event.
+ * _scheduleInternal() expects to run inside a critical section. */
+ ( void ) _scheduleInternal( pTaskPool, pTimerEvent->job, 0 );
+
+ /* Free the timer event. */
+ IotTaskPool_FreeTimerEvent( pTimerEvent );
+ }
+ }
+ TASKPOOL_EXIT_CRITICAL();
+}