multitask
- sofia_redux.toolkit.utilities.multiprocessing.multitask(func, iterable, args, kwargs, jobs=None, skip=None, max_nbytes='1M', force_threading=False, force_processes=False, logger=None)
Process a series of tasks in serial, or in parallel using joblib.
multitask is used to run a function multiple times on a series of arguments. Tasks may be run in series (the default), or in parallel using multiprocessing via the joblib package. If an error is encountered while attempting to process in parallel with joblib, an attempt will be made to process the tasks in series.

The function to process multiple times (func) must take one of the following forms:

1. result[i] = func(args, iterable[i])
2. result[i] = func(args, kwargs, iterable[i])

Here, "args" above is the same as the args argument, i.e., the full argument list. Setting the kwargs argument to None implies that func takes form 1, while anything else tells multitask to assume the function is of form 2. Since this is a non-standard method of specifying a function, it is highly likely the user will have to define their own func. For example, to use multitask to add ten to a series of numbers:

>>> from sofia_redux.toolkit.utilities.multiprocessing import multitask
>>> numbers = list(range(5))
>>> numbers
[0, 1, 2, 3, 4]
>>> def add_ten(args, i):
...     return args[i] + 10
>>> multitask(add_ten, range(len(numbers)), numbers, None)
[10, 11, 12, 13, 14]
In the above example, iterable has been set to range(len(numbers)), indicating that multitask should supply i to add_ten multiple times (0, 1, 2, 3, 4). Note that kwargs is explicitly set to None, indicating that add_ten is of form 1. While multitask may seem like overkill in this example, it can be highly adaptable for complex procedures.
The skip parameter can be used to skip processing of certain tasks. For example:

>>> skip = [False] * len(numbers)
>>> skip[2] = True
>>> multitask(add_ten, range(len(numbers)), numbers, None, skip=skip)
[10, 11, 13, 14]
By default, parallelization is managed with the loky backend. If calling code is known to perform better with the threading backend, it should be called within the joblib parallel_backend context manager:
>>> from joblib import parallel_backend
>>> with parallel_backend('threading', n_jobs=2):
...     multitask(add_ten, range(len(numbers)), numbers, None)
[10, 11, 12, 13, 14]
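Parallel processing may also be requested directly through the jobs argument (see Parameters below). A brief sketch, assuming at least two workers are available:

>>> multitask(add_ten, range(len(numbers)), numbers, None, jobs=2)
[10, 11, 12, 13, 14]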
- Parameters:
  - func : function
    The function to repeat multiple times on different sets of arguments. Must be of the form func(args, i) if kwargs is None, or func(args, kwargs, i) otherwise, where i is a member of iterable.
  - iterable : iterable
    A Python object that can be iterated through such that each member can be passed to func as the final argument.
  - args
    Anything that should be passed to func as the first argument. The intended use is such that the output of func is result[i] = func(args[iterable[i]]).
  - kwargs : None or anything
    If set to None, multitask assumes func is of the form func(args, i). Otherwise, multitask assumes func is of the form func(args, kwargs, i).
  - jobs : int, optional
    If set to a positive integer, processes tasks in parallel using jobs threads. If negative, sets the number of threads to the number of CPUs + jobs + 1, i.e., if jobs = -1, multitask will use all available CPUs.
  - skip : array_like of bool, optional
    Should be of length len(iterable), where a value of True signifies that processing should be skipped for that task and omitted from the final result.
  - max_nbytes : int or str or None, optional
    Threshold on the size of arrays passed to the workers that triggers automated memory mapping in temp_folder. Can be an int in bytes, or a human-readable string, e.g., '1M' for 1 megabyte. Use None to disable memmapping of large arrays. Only active when backend="loky" or "multiprocessing". The default is currently set to '1M' for consistency with the joblib library. Note that memmapping disallows in-place modification of data, so if this functionality is required, set max_nbytes to None.
  - force_threading : bool, optional
    If True, force joblib to run parallel jobs using threads so that shared memory can be used. Otherwise, threading will only occur when parallel processing is spawned from a child process of the main thread.
  - force_processes : bool, optional
    If True, force joblib to run parallel jobs using CPUs rather than threads. This can sometimes lead to unexpected outcomes when the multiprocessing is launched from a non-main thread. Pickling arguments before processing and return values during processing is recommended in this case.
  - logger : logging.Logger, optional
    If supplied, an attempt will be made to produce sensibly ordered logs for the multiprocessing tasks for all handlers.
- Returns:
  - result : list
    The final output where result[i] = func(args, iterable[i]). Will be of length len(iterable) if skip is None, otherwise len(iterable) - sum(skip).
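As a final sketch tying several parameters together, a form-2 function can be combined with skip and a negative jobs value; the scale function and the 'factor' key below are illustrative only, and this assumes the parallel backend can serialize the function (multitask falls back to serial processing on error):

>>> def scale(args, kwargs, i):
...     return args[i] * kwargs['factor']
>>> skip = [False, False, True, False, False]
>>> multitask(scale, range(len(numbers)), numbers, {'factor': 2},
...           jobs=-1, skip=skip)
[0, 2, 6, 8]

Note that the result length here is len(iterable) - sum(skip) = 4.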