multitask
- sofia_redux.toolkit.utilities.multiprocessing.multitask(func, iterable, args, kwargs, jobs=None, skip=None, max_nbytes='1M', force_threading=False, force_processes=False, logger=None)
 Process a series of tasks in serial, or in parallel using joblib.
multitask is used to run a function multiple times on a series of arguments. Tasks may be run in series (default), or in parallel using multi-processing via the joblib package. If an error is encountered while attempting to process in parallel with joblib, an attempt will be made to process the tasks in series.
The function to process multiple times (func) must take one of the following forms:

1. result[i] = func(args, iterable[i])
2. result[i] = func(args, kwargs, iterable[i])
Here, the above "args" is the same as the args argument, i.e., the full argument list. Setting the kwargs argument to None implies that func takes form 1, while anything else sets multitask to assume the function is of form 2. Since this is a non-standard method of specifying a function, it is highly likely the user will have to define their own func. For example, to use multitask to add ten to a series of numbers:

>>> from sofia_redux.toolkit.utilities.multiprocessing import multitask
>>> numbers = list(range(5))
>>> numbers
[0, 1, 2, 3, 4]
>>> def add_ten(args, i):
...     return args[i] + 10
>>> multitask(add_ten, range(len(numbers)), numbers, None)
[10, 11, 12, 13, 14]
In the above example, iterable has been set to range(len(numbers)), indicating that multitask should supply i to add_ten multiple times (0, 1, 2, 3, 4). Note that kwargs is explicitly set to None, indicating that add_ten is of form 1. While multitask may seem like overkill in this example, it can be highly adaptable for complex procedures.
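For a function of form 2, kwargs is passed as the second positional argument. The following is a minimal sketch (add_offset is a hypothetical function, not one of the library's examples), assuming the documented calling convention func(args, kwargs, i):

>>> def add_offset(args, kwargs, i):
...     return args[i] + kwargs['offset']
>>> multitask(add_offset, range(len(numbers)), numbers, {'offset': 100})
[100, 101, 102, 103, 104]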
The skip parameter can be used to skip processing of certain tasks. For example:

>>> skip = [False] * len(numbers)
>>> skip[2] = True
>>> multitask(add_ten, range(len(numbers)), numbers, None, skip=skip)
[10, 11, 13, 14]
By default, parallelization is managed with the loky backend. If calling code is known to perform better with the threading backend, it should be called within the joblib parallel_backend context manager:
>>> from joblib import parallel_backend
>>> with parallel_backend('threading', n_jobs=2):
...     multitask(add_ten, range(len(numbers)), numbers, None)
[10, 11, 12, 13, 14]
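Parallel processing can also be requested directly through the jobs argument described below. The following sketch is illustrative only, assuming the documented jobs behavior, and runs the tasks on two parallel workers:

>>> multitask(add_ten, range(len(numbers)), numbers, None, jobs=2)
[10, 11, 12, 13, 14]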
- Parameters:
 - func : function
 The function to repeat multiple times on different sets of arguments. Must be of the form func(args, i) if kwargs is None, or func(args, kwargs, i) otherwise, where i is a member of iterable.
 - iterable : iterable
 A Python object that can be iterated through such that each member can be passed to func as the final argument.
 - args
 Anything that should be passed to func as the first argument. The intended use is such that the output of func is result[i] = f(args[iterable[i]]).
 - kwargs : None or anything
 If set to None, multitask assumes func is of the form func(args, i). Otherwise, multitask assumes func is of the form func(args, kwargs, i).
 - jobs : int, optional
 If set to a positive integer, processes tasks in parallel using jobs threads. If negative, sets the number of threads to the number of CPUs + 1 + jobs, i.e., if jobs = -1, multitask will use all available CPUs.
 - skip : array_like of bool, optional
 Should be of length len(iterable), where a value of True signifies that processing should be skipped for that task and omitted from the final result.
 - max_nbytes : int or str or None, optional
 Threshold on the size of arrays passed to the workers that triggers automated memory mapping in temp_folder. Can be an int in bytes, or a human-readable string, e.g., '1M' for 1 megabyte. Use None to disable memmapping of large arrays. Only active when backend="loky" or "multiprocessing". The default is currently set to '1M' for consistency with the joblib library. Note that memmapping disallows in-place modification of data, so if this functionality is required, set max_nbytes to None.
 - force_threading : bool, optional
 If True, force joblib to run parallel jobs using threads so that shared memory can be used. Otherwise, threading will only occur when parallel processing is spawned from a child process of the main thread.
 - force_processes : bool, optional
 If True, force joblib to run parallel jobs using CPUs rather than threads. This can sometimes lead to unexpected outcomes when the multiprocessing is launched from a non-main thread. Pickling arguments before processing, and pickling return values during processing, is recommended in this case.
 - logger : logging.Logger, optional
 If supplied, will attempt to produce sensibly ordered logs for the multiprocessing tasks for all handlers.
- Returns:
 - result : list
 The final output where result[i] = func(args, iterable[i]). Will be of length len(iterable) if skip is None, otherwise len(iterable) - sum(skip).
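As a consolidated illustration of the parameters above, the following is a hypothetical sketch based only on the documented behavior (scale_and_shift is not part of the library), combining a form-2 function with skip and jobs:

>>> def scale_and_shift(args, kwargs, i):
...     return args[i] * kwargs['scale'] + kwargs['shift']
>>> values = [1, 2, 3, 4]
>>> skip = [False, True, False, False]
>>> multitask(scale_and_shift, range(len(values)), values,
...           {'scale': 2, 'shift': 1}, jobs=2, skip=skip)
[3, 7, 9]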