Processing Queue Sets

ProcessingQueueSet

class fluxEngine.ProcessingQueueSet(handle)

fluxEngine Processing Queue Set

This class represents a processing queue set. A processing queue set is used for processing queues.

Each processing queue set is associated with a number of processing threads that the user can manage. A processing queue set can only be used to process a single model at a time, but multiple processing queue sets may be created.

Parameters:

handle (Handle) – The handle to create the processing queue set for

createProcessingThreads(count, initFunction=None)

Create processing threads

fluxEngine may use parallelization to speed up processing. In order to achieve this, background threads must be created to run on additional CPU cores.

Calling this method is optional: by default processing will be single-threaded.

This method will start count - 1 threads when called, as the thread that asks for processing is always considered to be the first thread (with id 0). For example, if 4 is supplied for count, this method will start 3 threads that run in the background. The thread that the user uses to call ProcessingContext.processNext() will be considered the thread with id 0, so that processing uses a total of 4 threads, which is the value supplied for count.

This method may only be called if there are currently no background threads associated with this processing queue set. Otherwise stopProcessingThreads() must be called first to change the number of threads.

Any processing context that was created for this processing queue set before a call to this method was made is marked as invalid: it can no longer be used and can only be destroyed.
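Changing the thread count therefore amounts to a stop followed by a create. The following is a minimal sketch of that sequence, not an official part of the API: set_thread_count is a hypothetical helper name, and the sketch assumes that calling stopProcessingThreads() is harmless when no background threads exist, which the description of that method suggests but does not state outright.

```python
def set_thread_count(queue_set, count, init_function=None):
    """Hypothetical helper: restart the background threads with a new count.

    queue_set is expected to be a fluxEngine.ProcessingQueueSet; only the
    two methods documented in this section are called on it.
    """
    # Existing background threads must be stopped before new ones are created.
    queue_set.stopProcessingThreads()
    # Starts count - 1 background threads; the calling thread acts as thread 0.
    queue_set.createProcessingThreads(count, init_function)
    # Processing contexts created before this point are now invalid and have
    # to be recreated by the caller.
```

A call such as set_thread_count(processingQueueSet, 4) would then leave the queue set with 3 background threads, regardless of how many it had before.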

If initFunction is not None it will be called at the beginning of every newly created background thread. This allows the user to customize the thread properties (such as the CPU affinity) themselves.
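As an illustration of customizing thread properties, the sketch below builds an initialization function that pins each background thread to one CPU. It is only one possible approach and rests on several assumptions: make_affinity_init is a hypothetical helper name, the initialization function is assumed to receive a thread id and the total thread count (as in the example call further down), and os.sched_setaffinity is only available on some platforms such as Linux, where a pid of 0 refers to the calling thread.

```python
import os

def make_affinity_init(cpus):
    """Return an init function that pins each background thread to one CPU.

    cpus is a list of CPU numbers chosen by the caller (an assumption of
    this sketch); thread ids are mapped onto it round-robin.
    """
    cpus = list(cpus)

    def init_thread(threadId, threadCount):
        # Do nothing if there is no CPU list or the platform lacks the call.
        if not cpus or not hasattr(os, "sched_setaffinity"):
            return
        cpu = cpus[threadId % len(cpus)]
        # On Linux, pid 0 refers to the calling thread.
        os.sched_setaffinity(0, {cpu})

    return init_thread
```

It could be passed as processingQueueSet.createProcessingThreads(4, make_affinity_init([0, 1, 2, 3])). The thread with id 0 is the caller's own thread and is not touched by the initialization function, so it would have to be pinned separately if desired.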

This method will only return once all threads have been created and their initialization functions (if specified) have run.

The thread initialization functions are only called for the background threads that are started by this method. For a count of 4, the initialization function will therefore be called in three background threads, and it is up to the user to adjust the properties of the thread in which they call ProcessingContext.processNext() to process data with fluxEngine.

The threads will be created sequentially: the next thread is created only after the previous thread’s initialization function has completed. This allows the user to directly modify global data structures in the initialization functions without the need for locking.
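To see why sequential start-up removes the need for locking, the sketch below models that behaviour with Python's threading module. The function start_threads_sequentially is a hypothetical stand-in written for this illustration and is not fluxEngine's implementation; unlike real processing threads, its threads end as soon as the initialization function returns. That the background threads receive the ids 1 to count - 1 is an assumption based on the calling thread having id 0.

```python
import threading

def start_threads_sequentially(count, init_function):
    """Stand-in: run init_function in count - 1 threads, one after another."""
    for thread_id in range(1, count):
        worker = threading.Thread(target=init_function, args=(thread_id, count))
        worker.start()
        # The next thread is started only after this init call has finished.
        worker.join()

# Plain list shared by all init calls; no lock is needed because the
# init functions never run concurrently.
init_order = []

def init_thread(threadId, threadCount):
    init_order.append(threadId)

start_threads_sequentially(4, init_thread)
print(init_order)  # prints [1, 2, 3]
```

Because each initialization call completes before the next thread is started, the entries always appear in creation order.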

An example call to this method could be something like this:

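def init_thread(threadId, threadCount):
    # Runs at the start of each newly created background thread
    print("Creating thread {0} of {1}".format(threadId, threadCount))

# Use 4 threads in total: 3 background threads plus the calling thread
processingQueueSet.createProcessingThreads(4, init_thread)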

If the user detects an error condition during thread initialization and wants to abort, they should raise an exception in the initialization function they supplied, which will be propagated out of the call to this method.
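A sketch of how aborting from the initialization function might look on the calling side is given below. The names MAX_THREADS and try_create_threads are hypothetical and exist only for this illustration, and since the type of the propagated exception is not specified here, the sketch catches Exception in general.

```python
MAX_THREADS = 8  # hypothetical limit used only in this illustration

def init_thread(threadId, threadCount):
    # Abort thread creation if more threads were requested than allowed.
    if threadCount > MAX_THREADS:
        raise RuntimeError(
            "refusing to start {0} threads (limit {1})".format(threadCount, MAX_THREADS))

def try_create_threads(queue_set, count, init_function):
    """Return True if thread creation succeeded, False if it was aborted."""
    try:
        queue_set.createProcessingThreads(count, init_function)
    except Exception as error:
        # The exception raised in the init function arrives here.
        print("Thread creation failed: {0}".format(error))
        return False
    return True
```

Here queue_set stands for a ProcessingQueueSet instance; the sketch makes no claim about the state of the queue set after a failed call.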

Important: any attempt to call a method that accesses this handle inside the initialization functions will create a deadlock.

If an error occurs, an exception will be raised.

Parameters:
  • count (int) – The number of threads to use for parallel processing (one less than this number will be created by this method, see the description for details)

  • initFunction (callable) – The thread initialization function, or None if no special thread initialization is to be used

stopProcessingThreads()

Stop all existing processing threads.

This will stop any background threads that are currently associated with this processing queue set. If processing is currently active on the processing queue set, it will be aborted, as if ProcessingContext.abort() had been called. In that case this method may take some time to return, as abort operations are not immediate, and this method will wait until the abort has completed.

Any processing context that was created for this processing queue set before a call to this method was made is marked as invalid: it can no longer be used and can only be destroyed.

This method is always successful: the only errors that could occur when calling this method would be non-recoverable.