Processing Queue Sets
ProcessingQueueSet
-
typedef struct fluxEngine_C_v1_ProcessingQueueSet fluxEngine_C_v1_ProcessingQueueSet
Processing Queue Set.
This opaque data structure wraps a processing queue set. It may be created via the fluxEngine_C_v1_ProcessingQueueSet_create() function and destroyed via the fluxEngine_C_v1_destroy_processing_queue_set() function.
A processing queue set is the basic building block that allows data to be processed. Only one thread may perform data processing with the same processing queue set at the same time; locking is used to ensure that this happens.
The user is free to create as many processing queue sets for a given handle as they wish. Processing via multiple processing queue sets may happen from different threads at the same time.
ProcessingQueueSet_create
-
int fluxEngine_C_v1_ProcessingQueueSet_create(fluxEngine_C_v1_Handle *handle, fluxEngine_C_v1_ProcessingQueueSet **processing_queue_set, fluxEngine_C_v1_Error **error)
Create a new processing queue set.
Creates a new processing queue set for a given fluxEngine handle. This may then be used to create processing contexts.
If the call is successful, a pointer to the newly created processing queue set will be stored in
processing_queue_set
. The is the typical usage pattern of this method:The following specific error codes may be returned by this function:fluxEngine_C_v1_Handle* handle = ...; fluxEngine_C_v1_Error* error = NULL; fluxEngine_C_v1_ProcessingQueueSet* processing_queue_set = NULL; int ret = fluxEngine_C_v1_init(handle, &processing_queue_set, &error); if (ret != 0) { // perform error handling // ... // cleanup error structure fluxEngine_C_v1_Error_free(error); // don't proceed return; } // processing_queue_set is now valid // ... // at the end, when the processing queue set is no longer // needed: fluxEngine_C_v1_destroy_processing_queue_set(processing_queue_set);
fluxEngine_C_v1_ErrorCode_Unknown
fluxEngine_C_v1_ErrorCode_AllocationFailure
fluxEngine_C_v1_ErrorCode_InvalidArgument
- Parameters:
handle – The fluxEngine handle
processing_queue_set – [out] A pointer to the resulting processing queue set, on success
error – [out] The resulting error object, if an error occurs. See the documentation of the fluxEngine_C_v1_Error structure for details on error handling.
- Returns:
0
on success,-1
on failure
ProcessingQueueSet_create_threads
-
int fluxEngine_C_v1_ProcessingQueueSet_create_threads(fluxEngine_C_v1_ProcessingQueueSet *processing_queue_set, int thread_count, fluxEngine_C_v1_Error **error)
Create processing threads for a given processing queue set.
fluxEngine may use parallization to speed up processing. In order to achieve this background threads must be created to run on additional CPU cores.
Calling this function is optional: by default processing will be single-threaded.
This function will start
thread_count - 1
threads when called, as the thread that asks for processing is always considered to be the first thread (with id0
). For example, if4
is supplied tothread_count
this function will start3
threads that run in the background. The thread that the user uses to call fluxEngine_C_v1_ProcessingContext_process_next() will be considered the thread with id0
, making processing use a total of4
threads, which is the value supplied forthread_count
.This function may only be called if there are currently no background threads associated with this processing queue set. Otherwise fluxEngine_C_v1_stop_processing_queue_set_threads() must be called first to change the number of threads.
Any processing context that was created for the same processing queue set before a call to this function was made is marked as invalid and can only be destroyed, but not used anymore.
The following specific error codes may be returned by this function:
fluxEngine_C_v1_ErrorCode_Unknown
fluxEngine_C_v1_ErrorCode_AllocationFailure
fluxEngine_C_v1_ErrorCode_InvalidArgument
fluxEngine_C_v1_ErrorCode_HandleNoLongerValid
fluxEngine_C_v1_ErrorCode_ThreadCreationError
- Parameters:
processing_queue_set – The processing queue set to create the threads for
thread_count – The number of threads to use for parallel processing (one less than this number will be created by this function, see the description for details)
error – [out] The resulting error object, if an error occurs. See the documentation of the fluxEngine_C_v1_Error structure for details on error handling.
- Returns:
0
on success,-1
on failure
ProcessingQueueSet_create_threads_ex
-
int fluxEngine_C_v1_ProcessingQueueSet_create_threads_ex(fluxEngine_C_v1_ProcessingQueueSet *processing_queue_set, int thread_count, fluxEngine_C_v1_ThreadInitFunction init_function, void *init_function_context, fluxEngine_C_v1_Error **error)
Create processing threads for a given processing queue set (extended version)
Please read the documentation of fluxEngine_C_v1_ProcessingQueueSet_create_threads() for general details.
This extended function allows the user to supply a thread initialization function that will be called at the beginning of the newly created background threads. This allows the user to customize the thread properties (such as the thread priority or the CPU affinity) themselves.
This function will only return once all thread initialization functions have run.
The thread initialization functions are only called for the backgronud threads that are started by this function; this means that for a
thread_count
of4
the initialization function will be called in 3 background threads, and it is up to the user to alter the thread in which they call fluxEngine_C_v1_ProcessingContext_process_next() to process data with fluxEngine.The threads will be created sequentially, the next thread being created only after the previous thread’s initialization function has completed. This allows the user to directly modify global data structures in the initialization functions without the need for locking.
Important: any attempt to call a function that accesses this processing queue set inside the initialization functions will create a deadlock.
This function may only be called if there are currently no background threads associated with this processing queue set. Otherwise fluxEngine_C_v1_stop_processing_queue_set_threads() must be called first to change the number of threads.
Any processing context that was created for the same processing queue set before a call to this function was made is marked as invalid and can only be destroyed, but not used anymore.
The following specific error codes may be returned by this function:
fluxEngine_C_v1_ErrorCode_Unknown
fluxEngine_C_v1_ErrorCode_AllocationFailure
fluxEngine_C_v1_ErrorCode_InvalidArgument
fluxEngine_C_v1_ErrorCode_HandleNoLongerValid
fluxEngine_C_v1_ErrorCode_ThreadCreationError
fluxEngine_C_v1_ErrorCode_ThreadInitFunctionError
- Parameters:
processing_queue_set – The processing queue set to create the threads for
thread_count – The number of threads to use for parallel processing (one less than this number will be created by this function, see the description for details)
init_function – The initialization function to call at the start of each newly created background thread
init_function_context – An arbitrary context that will be passed to the initialization function
error – [out] The resulting error object, if an error occurs. See the documentation of the fluxEngine_C_v1_Error structure for details on error handling.
- Returns:
0
on success,-1
on failure
ProcessingQueueSet_stop_threads
-
void fluxEngine_C_v1_ProcessingQueueSet_stop_threads(fluxEngine_C_v1_ProcessingQueueSet *processing_queue_set)
Stop background threads of a processing queue set.
This will stop any background threads that are currently associated with a given processing queue set. If processing is currently active on the handle, it will be aborted, as if fluxEngine_C_v1_ProcessingContext_abort() had been called. In that case this method may take a bit of time, as abort operations are not immediate, and this method will wait until the abort has completed.
Any processing context that was created for the same processing queue set before a call to this function was made is marked as invalid and can only be destroyed, but not used anymore.
This method is always successful: the only errors that could occur when calling this method would be non-recoverable.
If
NULL
is passed to this method, it will do nothing.- Parameters:
processing_queue_set – The processing queue set to stop the threads for
ProcessingQueueSet_free
-
void fluxEngine_C_v1_ProcessingQueueSet_free(fluxEngine_C_v1_ProcessingQueueSet *processing_queue_set)
Destroy a processing queue set.
Destroy a processing queue set, freeing its resources. All background threads will be stopped in the same manner as if fluxEngine_C_v1_ProcessingQueueSet_stop_threads() had been called.
Any processing context associated with this processing queue set will be marked as invalid and may hence not be used anymore. However, some memory associated with remaining processing contexts that have not been freed previous to a call to this method may still be in use until each remaining processing context is freed by the user.
All other processing queue sets of the handle will remain valid.
If
NULL
is passed to this method, it will do nothing.- Parameters:
processing_queue_set – The processing queue set to destroy