# Source code for sofia_redux.toolkit.utilities.multiprocessing

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from contextlib import contextmanager
from functools import partial
import gc
import logging
import multiprocessing as mp
import os
import regex
import shutil
import signal
import sys
import tempfile
import time
import threading

from astropy import log
import cloudpickle
import numpy as np
import psutil


__all__ = ['get_core_number', 'relative_cores', 'valid_relative_jobs',
           'multitask', 'pickle_object', 'unpickle_file', 'pickle_list',
           'unpickle_list', 'in_main_thread', 'log_with_multi_handler',
           'log_for_multitask', 'purge_multitask_logs', 'wrapped_with_logger',
           'log_records_to_pickle_file', 'MultitaskHandler', 'wrap_function']


def get_core_number(cores=True):
    """
    Return the maximum number of CPU cores available.

    Parameters
    ----------
    cores : bool or int, optional
        If False, returns 1.  If True, returns the maximum number of
        cores available.  An integer specifies an upper limit on the
        maximum number of cores to return.

    Returns
    -------
    cores : int
        The maximum number of cores to use for parallel processing.
    """
    if not cores:
        return 1
    max_cores = psutil.cpu_count()
    if cores is True:
        return max_cores
    elif isinstance(cores, (int, float)):
        return int(np.clip(cores, 1, max_cores))
    else:
        return max_cores

def relative_cores(jobs):
    """
    Return the actual number of cores to use for a given number of jobs.

    Returns 1 in cases where `jobs` is None or 0.  If `jobs` is less than
    zero, the returned value will be max_available_cores + jobs + 1,
    i.e., -1 will use all available cores.

    Parameters
    ----------
    jobs : int or float or None

    Returns
    -------
    n_cores : int
        The number of cores to use, which will always be in the range
        1 -> max_available_cores.
    """
    if jobs is None:
        return 1
    jobs = float(jobs)
    if jobs == 0:
        return 1
    max_cores = get_core_number()
    if jobs % 1 != 0:
        jobs *= max_cores
    jobs = int(jobs)
    if jobs < 0:
        cores = max_cores + jobs + 1
    else:
        cores = jobs
    return get_core_number(cores=cores)

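# Illustrative sketch (not part of the original module): how `relative_cores`
# maps requested job counts onto the available CPUs.  Negative values count
# back from the maximum, and fractional values are treated as a fraction of
# all cores.
#
#     >>> from sofia_redux.toolkit.utilities.multiprocessing import (
#     ...     get_core_number, relative_cores)
#     >>> relative_cores(None), relative_cores(0)        # both fall back to 1
#     (1, 1)
#     >>> relative_cores(-1) == get_core_number()        # -1 -> all cores
#     True
#     >>> relative_cores(0.5) == max(1, get_core_number() // 2)  # half of them
#     True
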
def valid_relative_jobs(jobs):
    """
    Return a valid number of jobs in the range 1 <= jobs <= max_cores.

    Parameters
    ----------
    jobs : int
        A positive or negative integer.  Negative values are processed
        as max_cores + jobs + 1.

    Returns
    -------
    valid_jobs : int
        The number of jobs available to process.
    """
    return get_core_number(relative_cores(jobs))

def multitask(func, iterable, args, kwargs, jobs=None, skip=None,
              max_nbytes='1M', force_threading=False, force_processes=False,
              logger=None):
    """
    Process a series of tasks in serial, or in parallel using joblib.

    `multitask` is used to run a function multiple times on a series of
    arguments.  Tasks may be run in series (default), or in parallel using
    multiprocessing via the joblib package.  If an error is encountered
    while attempting to process in parallel with joblib, an attempt will
    be made to process the tasks in series.

    The function to process multiple times (`func`) must take one of the
    following forms::

        1. result[i] = func(args, iterable[i])
        2. result[i] = func(args, kwargs, iterable[i])

    Here, the above "args" is the same as the `args` argument, i.e., the
    full argument list.  Setting the argument `kwargs` to `None` implies
    that `func` takes form 1, while anything else sets multitask to assume
    the function is of form 2.

    Since this is a non-standard method of specifying a function, it is
    highly likely the user will have to define their own `func`.  For
    example, to use multitask to add ten to a series of numbers:

    >>> from sofia_redux.toolkit.utilities.multiprocessing import multitask
    >>> numbers = list(range(5))
    >>> numbers
    [0, 1, 2, 3, 4]
    >>> def add_ten(args, i):
    ...     return args[i] + 10
    >>> multitask(add_ten, range(len(numbers)), numbers, None)
    [10, 11, 12, 13, 14]

    In the above example, `iterable` has been set to range(len(numbers)),
    indicating that multitask should supply `i` to `add_ten` multiple
    times (0, 1, 2, 3, 4).  Note that `kwargs` is explicitly set to None,
    indicating that `add_ten` is of form 1.  While multitask may seem like
    overkill in this example, it can be highly adaptable for complex
    procedures.

    The `skip` parameter can be used to skip processing of certain tasks.
    For example:

    >>> skip = [False] * len(numbers)
    >>> skip[2] = True
    >>> multitask(add_ten, range(len(numbers)), numbers, None, skip=skip)
    [10, 11, 13, 14]

    By default, parallelization is managed with the loky backend.  If
    calling code is known to perform better with the threading backend,
    it should be called within the joblib parallel_backend context
    manager:

    >>> from joblib import parallel_backend
    >>> with parallel_backend('threading', n_jobs=2):
    ...     multitask(add_ten, range(len(numbers)), numbers, None)
    [10, 11, 12, 13, 14]

    Parameters
    ----------
    func : function
        The function to repeat multiple times on different sets of
        arguments.  Must be of the form func(args, i) if `kwargs` is
        None, or func(args, kwargs, i) otherwise, where `i` is a member
        of `iterable`.
    iterable : iterable
        A Python object that can be iterated through such that each
        member can be passed to `func` as the final argument.
    args
        Anything that should be passed to `func` as the first argument.
        The intended use is such that the output of `func` is
        result[i] = f(args[iterable[i]]).
    kwargs : None or anything
        If set to None, multitask assumes `func` is of the form
        func(args, i).  Otherwise, multitask assumes `func` is of the
        form func(args, kwargs, i).
    jobs : int, optional
        If set to a positive integer, processes tasks in parallel using
        `jobs` threads.  If negative, sets the number of threads to the
        number of CPUs + jobs + 1, i.e., if jobs = -1, multitask will use
        all available CPUs.
    skip : array_like of bool, optional
        Should be of len(`iterable`), where a value of True signifies
        that processing should be skipped for that task and omitted from
        the final result.
    max_nbytes : int or str or None, optional
        Threshold on the size of arrays passed to the workers that
        triggers automated memory mapping in temp_folder.  Can be an int
        in bytes, or a human-readable string, e.g., '1M' for 1 megabyte.
        Use None to disable memmapping of large arrays.  Only active when
        backend="loky" or "multiprocessing".  The default is currently
        set to '1M' for consistency with the joblib library.  Note that
        memmapping disallows in-place modification of data, so if this
        functionality is required, set `max_nbytes` to `None`.
    force_threading : bool, optional
        If `True`, force joblib to run parallel jobs using threads so
        that shared memory can be used.  Otherwise, threading will only
        occur when parallel processing is spawned from a child process of
        the main thread.
    force_processes : bool, optional
        If `True`, force joblib to run parallel jobs using CPUs rather
        than threads.  This can sometimes lead to unexpected outcomes
        when the multiprocessing is launched from a non-main thread.
        Pickling arguments prior to, and return values during,
        processing is recommended in this case.
    logger : logging.Logger, optional
        If supplied, will attempt to produce sensibly ordered logs for
        the multiprocessing tasks for all handlers.

    Returns
    -------
    result : list
        The final output where result[i] = func(args, iterable[i]).  Will
        be of length len(`iterable`) if `skip` is None, otherwise
        len(iterable) - sum(skip).
    """
    if jobs in [None, 0, 1]:
        return _serial(func, args, kwargs, iterable, skip=skip)

    return _parallel(
        int(jobs), func, args, kwargs, iterable, skip=skip,
        max_nbytes=max_nbytes, force_threading=force_threading,
        force_processes=force_processes, logger=logger)

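# Illustrative sketch (an addition, not in the original source): `multitask`
# with a form-2 function, i.e. one that also receives a `kwargs` payload.
# The names `scale_values`, `values` and `scaled` are hypothetical.
#
#     >>> def scale_values(args, kwargs, i):
#     ...     return args[i] * kwargs['factor']
#     >>> values = [1, 2, 3]
#     >>> scaled = multitask(scale_values, range(len(values)), values,
#     ...                    {'factor': 10})
#     >>> scaled
#     [10, 20, 30]
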
def _wrap_function(func, args, kwargs):
    """
    Wrap given arguments and keyword arguments to a function.

    Removes the requirement of supplying args and kwargs to the given
    function.  :func:`multitask` should be run on a function supplied in
    a very strict format, and actually only takes one single runtime
    argument.  Functions should be designed so that they are of the
    form::

        def some_function(args, kwargs, run_time_argument):
            <code here>

    or::

        def some_function(args, run_time_argument):
            <code here>

    All calls to `some_function` will always contain the same arguments
    and keyword arguments, but a different run time argument will be
    supplied to each call.  An easy way to set this up is to set args and
    kwargs to a list of arguments for each call, and then select which
    set to use via the run time argument.  For example, you could set up
    something like:

    >>> arguments = [(x, x + 2) for x in range(10)]
    >>> keyword_arguments = {'multiply': True}
    >>> def my_func(my_args, my_kwargs, index):
    ...     x, y = my_args[index]
    ...     if my_kwargs['multiply']:
    ...         return x * y
    ...     else:
    ...         return x + y

    :func:`_wrap_function` will remove the requirement to always specify
    args and kwargs for each call, so `my_func(args, kwargs, index)` is
    the same as `_wrap_function(my_func, args, kwargs)(index)`.

    Parameters
    ----------
    func : function
        The function to wrap.
    args : tuple
        The function arguments.
    kwargs : dict or None
        Any function keyword arguments.

    Returns
    -------
    wrapped_function : function
    """
    if kwargs is None:
        return partial(func, args)
    else:
        return partial(func, args, kwargs)


def _wrap_function_with_logger(func, args, kwargs, logger_pickle_file,
                               log_directory):
    """
    Wrap a function and also halt all logging until the process is complete.

    Please see :func:`_wrap_function` for details on how the function
    should be implemented.  Additionally, a logger is unpickled and used
    to emit any log messages once the function is complete.

    Parameters
    ----------
    func : function
        The function to wrap.
    args : tuple
        The function arguments.
    kwargs : dict or None
        Any function keyword arguments.
    logger_pickle_file : str
        A filename containing the logger to unpickle and use.
    log_directory : str
        Pickles the log records to the given directory.

    Returns
    -------
    wrapped_function : function
    """
    wrapped = _wrap_function(func, args, kwargs)
    return partial(wrapped_with_logger, wrapped, logger_pickle_file,
                   log_directory)

def wrapped_with_logger(func, logger_pickle_file, log_directory,
                        run_arg_and_identifier):
    """
    Return the results of the function in multitask and save log records.

    Parameters
    ----------
    func : function
        The function to wrap.
    logger_pickle_file : str
        The file path to the pickled logger (:class:`logging.Logger`).
    log_directory : str
        The directory in which to store the log records for each run.
    run_arg_and_identifier : 2-tuple
        Any run time argument that the wrapped function returned by
        :func:`_wrap_function` requires (object), and an integer
        identifier signifying its position in a list of run arguments.

    Returns
    -------
    results : object
        The results of running `func` on a given run time argument.
    """
    logger, _ = unpickle_file(logger_pickle_file)
    run_arg, identifier = run_arg_and_identifier
    log_basename = f"multitask_log_{identifier}.p"
    record_file = os.path.join(log_directory, log_basename)
    with log_records_to_pickle_file(logger, record_file):
        result = func(run_arg)
    return result

def wrap_function(func, args, kwargs=None, logger=None, log_directory=None):
    """
    Wrap a function for use with :func:`multitask`.

    Parameters
    ----------
    func : function
        The function to wrap.
    args : tuple
        The function arguments.
    kwargs : dict, optional
        Any function keyword arguments.
    logger : logging.Logger or str, optional
        A logger used to output any log messages once complete.  If
        supplied, a valid `log_directory` must also be supplied.  A path
        to a pickle file containing the logger may also be supplied.
    log_directory : str, optional
        If supplied together with a `logger`, will store all log records
        to a pickle file in the given directory.

    Returns
    -------
    wrapped_function, log_pickle_file : function, str
        The wrapped function and the file location of the pickle file for
        any supplied logger.  If no logger was supplied, this value will
        be `None`.
    """
    if logger is None:
        return _wrap_function(func, args, kwargs), None

    if not isinstance(log_directory, str) or not os.path.isdir(
            log_directory):
        raise ValueError(f"Must supply a valid log directory. Received "
                         f"{log_directory}.")

    if isinstance(logger, str) and os.path.isfile(logger):
        pickle_file = logger
    else:
        logger_id = id((logger, args))
        tmp_fh, tmp_fname = tempfile.mkstemp(
            prefix=f'multitask_logger_{logger_id}', suffix='.p')
        os.close(tmp_fh)
        pickle_file = pickle_object(logger, tmp_fname)

    multi_func = _wrap_function_with_logger(
        func, args, kwargs, pickle_file, log_directory)
    return multi_func, pickle_file

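# Illustrative sketch (not in the original module): `wrap_function` reduces a
# form-2 function down to a single run-time argument, which is what both the
# serial and parallel paths ultimately call.  The names `add`, `wrapped` and
# `log_file` are hypothetical.
#
#     >>> def add(args, kwargs, i):
#     ...     return args[i] + kwargs['offset']
#     >>> wrapped, log_file = wrap_function(add, [5, 6, 7],
#     ...                                   kwargs={'offset': 1})
#     >>> wrapped(2), log_file            # no logger supplied -> None
#     (8, None)
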
def _serial(func, args, kwargs, iterable, skip=None):
    """
    Process tasks serially without any multiprocessing.

    Parameters
    ----------
    func : function
        The function to repeat multiple times on different sets of
        arguments.  Must be of the form func(args, i) if `kwargs` is
        None, or func(args, kwargs, i) otherwise, where `i` is a member
        of `iterable`.
    args
        Anything that should be passed to `func` as the first argument.
        The intended use is such that the output of `func` is
        result[i] = f(args[iterable[i]]).
    kwargs : None or anything
        If set to None, multitask assumes `func` is of the form
        func(args, i).  Otherwise, multitask assumes `func` is of the
        form func(args, kwargs, i).
    iterable : iterable
        A Python object that can be iterated through such that each
        member can be passed to `func` as the final argument.
    skip : array_like of bool, optional
        Should be of len(`iterable`), where a value of True signifies
        that processing should be skipped for that task and omitted from
        the final result.

    Returns
    -------
    result : list
        The final output where result[i] = func(args, iterable[i]).  Will
        be of length len(`iterable`) if `skip` is None, otherwise
        len(iterable) - sum(skip).
    """
    multi_func, _ = wrap_function(func, args, kwargs=kwargs)
    result = []
    if skip is None:
        for thing in iterable:
            result.append(multi_func(thing))
    else:
        for (skip_it, thing) in zip(skip, iterable):
            if skip_it:
                continue
            result.append(multi_func(thing))
    return result


def _parallel(jobs, func, args, kwargs, iterable, skip=None,
              force_threading=False, force_processes=False, package=None,
              logger=None, **joblib_kwargs):
    """
    Process a given list of jobs in parallel.

    Joblib and the multiprocessing package are currently used to process
    jobs in parallel.  Generally, the joblib package appears to produce
    results faster than the base Python multiprocessing package, but
    cannot always be used.  The default package will generally be set to
    joblib unless multiprocessing has been specifically requested but
    :func:`_parallel` has been started on a non-main thread.
    Unfortunately, the joblib loky backend will always start new
    processes using a fork followed by an exec(), which is not safe for
    certain operating systems, especially macOS.  For Python versions
    >= 3.8, the multiprocessing module starts new processes using the
    spawn method, which is safe for nearly all platforms but does
    introduce additional overhead.  In cases where multiprocessing is
    required and the Python version is < 3.8, only serial reduction will
    be possible.

    Parameters
    ----------
    jobs : int
        The number of jobs to perform in parallel.
    func : function
        The function to repeat multiple times on different sets of
        arguments.  Must be of the form func(args, i) if `kwargs` is
        None, or func(args, kwargs, i) otherwise, where `i` is a member
        of `iterable`.
    args
        Anything that should be passed to `func` as the first argument.
        The intended use is such that the output of `func` is
        result[i] = f(args[iterable[i]]).
    kwargs : None or anything
        If set to None, multitask assumes `func` is of the form
        func(args, i).  Otherwise, multitask assumes `func` is of the
        form func(args, kwargs, i).
    iterable : iterable
        A Python object that can be iterated through such that each
        member can be passed to `func` as the final argument.
    skip : array_like of bool, optional
        Should be of len(`iterable`), where a value of True signifies
        that processing should be skipped for that task and omitted from
        the final result.
    force_threading : bool, optional
        If `True`, run the jobs in parallel using a thread pool.  This is
        preferable for high I/O, low CPU intensity operations, or when
        access to shared variables is required.  Cannot be used in
        conjunction with `force_processes`.
    force_processes : bool, optional
        If `True`, run the jobs in parallel using multiprocessing.  This
        is preferable for computationally expensive tasks, but creates
        copies of the main process on each CPU, so no ready access to
        shared variables exists and startup costs are greater.
    package : str, optional
        The multiprocessing package to use.  May be one of {'joblib',
        'multiprocessing', None}.  `None` (the default) will estimate the
        best package at runtime.
    logger : Logger, optional
        The logger with which to emit any messages during `func`.
    joblib_kwargs : dict, optional
        Optional keyword arguments to pass into :class:`joblib.Parallel`
        if applicable.  The `require` and `backend` options will be
        overwritten if threading is used.

    Returns
    -------
    result : list
        The final output where result[i] = func(args, iterable[i]).  Will
        be of length len(`iterable`) if `skip` is None, otherwise
        len(iterable) - sum(skip).
    """
    if force_threading and force_processes:
        raise ValueError("Can either force threading or processes, "
                         "not both.")

    # Check if this is just a single job.
    if jobs is None or jobs < 2:
        return _serial(func, args, kwargs, iterable, skip=skip)

    if skip is None:
        run_args = list(iterable)
    else:
        run_args = [x[1] for x in zip(skip, iterable) if not x[0]]

    required_jobs = int(np.clip(jobs, 1, len(run_args)))
    if required_jobs == 1:
        return _serial(func, args, kwargs, iterable, skip=skip)

    # Determine which package to use.
    if not in_windows_os():
        reason = 'not installed'
        try:
            from joblib import delayed, Parallel
            have_joblib = True
        except ImportError:  # pragma: no cover
            have_joblib = False
            delayed = Parallel = None
    else:  # pragma: no cover
        reason = 'not available on Windows'
        have_joblib = False
        delayed = Parallel = None

    requested_package = package
    if package == 'joblib' and not have_joblib:  # pragma: no cover
        raise ValueError(f"Cannot use joblib package: {reason}.")
    elif package is None:
        package = 'joblib' if have_joblib else 'multiprocessing'
    elif package not in ['multiprocessing', 'joblib']:
        raise NotImplementedError(f"The {package} package is not supported.")

    if force_processes:
        use_threads = False
        if not in_main_thread():  # pragma: no cover
            if sys.version_info < (3, 8, 0):
                log.warning("Multiprocessing is not available from a child "
                            "thread for Python versions < 3.8.0: will "
                            "process serially")
                return _serial(func, args, kwargs, iterable, skip=skip)
            if package == 'joblib':
                package = 'multiprocessing'
                if requested_package == 'joblib':
                    log.warning("Cannot use joblib for multiprocessing from "
                                "child thread: will use the multiprocessing "
                                "package.")
    elif force_threading:
        use_threads = True
    else:
        use_threads = not in_main_thread()

    if logger is not None:
        log_directory = tempfile.mkdtemp(prefix='multitask_temp_log_dir_')
        run_args = [(x, i) for (i, x) in enumerate(run_args)]
        initial_log_level = logger.level
    else:
        log_directory = None
        initial_log_level = None

    multi_func, log_pickle_file = wrap_function(
        func, args, kwargs=kwargs, logger=logger,
        log_directory=log_directory)

    if package == 'multiprocessing':
        pool_class = mp.pool.ThreadPool if use_threads else mp.Pool
        with pool_class(processes=required_jobs) as pool:
            result = pool.map(multi_func, run_args)
            pool.close()
            pool.join()
        purge_multitask_logs(log_directory, log_pickle_file,
                             use_logger=logger)
        return result

    # Joblib processing...
    joblib_kwargs['n_jobs'] = required_jobs
    if use_threads:
        joblib_kwargs['require'] = 'sharedmem'
        joblib_kwargs['backend'] = 'threading'
    if 'mmap_mode' not in joblib_kwargs:
        joblib_kwargs['mmap_mode'] = 'r'

    result = [delayed(multi_func)(run_arg) for run_arg in run_args]

    # joblib does not reliably close child processes: store the current
    # child processes so new ones can be terminated once complete.
    current_process = psutil.Process()
    subprocesses_before = set(
        [p.pid for p in current_process.children(recursive=True)])

    executor = Parallel(**joblib_kwargs)
    processed_result = executor(result)

    if logger is not None:
        logger.setLevel(initial_log_level)

    purge_multitask_logs(log_directory, log_pickle_file, use_logger=logger)

    # Terminate new child processes that are still running.
    subprocesses_after = set(
        [p.pid for p in current_process.children(recursive=True)])
    terminate = subprocesses_after - subprocesses_before

    if in_windows_os():  # pragma: no cover
        for subprocess in terminate:
            try:
                os.kill(subprocess, signal.CTRL_BREAK_EVENT)
            except ProcessLookupError:
                pass
    else:
        for subprocess in terminate:
            try:
                os.killpg(subprocess, signal.SIGTERM)
            except ProcessLookupError:
                pass

    del executor
    gc.collect()
    return processed_result

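# Illustrative sketch (not in the original module): requesting thread-based
# parallelism through the public `multitask` wrapper, which routes to
# `_parallel` when jobs > 1.  Threads share memory, so in-place updates to
# `results` (a hypothetical list) are visible to the caller.
#
#     >>> results = [0] * 4
#     >>> def fill(args, i):
#     ...     args[i] = i * i
#     >>> _ = multitask(fill, range(4), results, None, jobs=2,
#     ...               force_threading=True)
#     >>> results
#     [0, 1, 4, 9]
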
def pickle_object(obj, filename):
    """
    Pickle an object and save it to the given filename.

    Parameters
    ----------
    obj : object
        The object to pickle.
    filename : str or None
        If `filename` points to a writeable on-disk location, `obj` will
        be pickled and saved to that location.  If `None`, nothing will
        happen.

    Returns
    -------
    output : str or object
        Either the `filename` if the object was pickled, or `obj` if it
        wasn't.
    """
    if filename is None:
        return obj
    with open(filename, 'wb') as f:
        cloudpickle.dump(obj, f)
    return filename

def unpickle_file(filename):
    """
    Unpickle a string argument if it is a file, and return the result.

    Parameters
    ----------
    filename : object or str
        If the argument is a string and a valid file path, it will be
        unpickled and the result will be returned.

    Returns
    -------
    obj, pickle_file : object, str
        If the argument passed in was not a string or not a valid file,
        the resulting output `obj` will be `filename` and `pickle_file`
        will be `None`.  If `filename` was a valid file path to a pickle
        file, `obj` will be the unpickled result, and `pickle_file` will
        be `filename`.
    """
    if not isinstance(filename, str):
        return filename, None
    if not os.path.isfile(filename):
        log.warning(f"Pickle file not found: {filename}")
        return filename, None
    pickle_file = filename
    with open(pickle_file, 'rb') as f:
        obj = cloudpickle.load(f)
    return obj, pickle_file

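# Illustrative sketch (not in the original module): a pickle_object /
# unpickle_file round trip through a temporary file.
#
#     >>> import os, tempfile
#     >>> fh, tmp = tempfile.mkstemp(suffix='.p')
#     >>> os.close(fh)
#     >>> pickle_object({'a': 1}, tmp) == tmp     # returns the filename
#     True
#     >>> obj, used_file = unpickle_file(tmp)
#     >>> obj, used_file == tmp
#     ({'a': 1}, True)
#     >>> os.remove(tmp)
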
def pickle_list(object_list, prefix=None, naming_attribute=None,
                class_type=None):
    """
    Pickle a list of objects to a temporary directory.

    The list will be updated in-place, with each element being replaced
    by the on-disk file path to the pickle file in which it is saved.

    Parameters
    ----------
    object_list : list (object)
        A list of things to pickle.
    prefix : str, optional
        The prefix for the temporary directory in which to store the
        pickle files.  See :func:`tempfile.mkdtemp` for further
        information.
    naming_attribute : str, optional
        The attribute used to name the pickle file.  If not supplied,
        defaults to id(object).
    class_type : class, optional
        If supplied, only objects of this class type will be pickled.

    Returns
    -------
    temporary_directory : str
        The temporary directory in which the objects are saved as pickle
        files.
    """
    directory = tempfile.mkdtemp(prefix=prefix)
    for i, obj in enumerate(object_list):
        if class_type is not None:
            if not isinstance(obj, class_type):
                continue
        if naming_attribute is not None:
            filename = f'{getattr(obj, naming_attribute, id(obj))}.p'
        else:
            filename = f'{id(obj)}.p'
        pickle_file = os.path.join(directory, filename)
        with open(pickle_file, 'wb') as f:
            cloudpickle.dump(obj, f)
        object_list[i] = pickle_file
    return directory

def unpickle_list(pickle_files, delete=True):
    """
    Restore pickle files to objects in-place.

    Parameters
    ----------
    pickle_files : list (str)
        A list of on-disk pickle files to restore.  The restored objects
        will replace the file path for each element in the list.
    delete : bool, optional
        If `True`, delete each pickle file once it has been restored.

    Returns
    -------
    None
    """
    if pickle_files is None:
        return
    result = pickle_files
    for i, pickle_file in enumerate(pickle_files):
        obj, filename = unpickle_file(pickle_file)
        if filename is None:  # not a valid file
            continue
        result[i] = obj
        if delete:
            os.remove(filename)

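# Illustrative sketch (not in the original module): pickle_list and
# unpickle_list operate in-place on the same list, swapping objects for
# on-disk paths and back again.
#
#     >>> import shutil
#     >>> items = [{'a': 1}, {'b': 2}]
#     >>> tmp_dir = pickle_list(items, prefix='example_')
#     >>> all(isinstance(x, str) for x in items)  # now file paths
#     True
#     >>> unpickle_list(items)                    # restores and deletes files
#     >>> items
#     [{'a': 1}, {'b': 2}]
#     >>> shutil.rmtree(tmp_dir)
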
def in_main_thread():
    """
    Return whether the process is running in the main thread.

    Returns
    -------
    main_thread : bool
        `True` if this process is running in the main thread, and
        `False` if it is running in a child thread.
    """
    return threading.current_thread() == threading.main_thread()

@contextmanager
def log_with_multi_handler(logger):
    """
    Context manager to temporarily log messages for unique processes/threads.

    Temporarily disables all log handlers and outputs the results to a
    dictionary of the form {(process, thread): list(records)} where
    process is returned by :func:`multiprocessing.current_process` and
    thread is returned by :func:`threading.current_thread`.

    Parameters
    ----------
    logger : logging.Logger

    Yields
    ------
    multi_handler : MultitaskHandler
    """
    original_handlers = logger.handlers.copy()
    multi_handler = MultitaskHandler()
    logger.addHandler(multi_handler)
    initial_level = logger.level
    logger.setLevel('DEBUG')  # Need to capture all records

    for handler in original_handlers:
        logger.removeHandler(handler)

    yield multi_handler

    logger.setLevel(initial_level)
    for handler in original_handlers:
        logger.addHandler(handler)
    logger.removeHandler(multi_handler)

@contextmanager
def log_for_multitask(logger):
    """
    Context manager to output log messages during multiprocessing.

    Stores all log messages during multiprocessing, and emits them using
    the given logger once complete.

    Parameters
    ----------
    logger : logging.Logger

    Yields
    ------
    None
    """
    initial_level = logger.level
    with log_with_multi_handler(logger) as multi_handler:
        yield multi_handler

    logger.setLevel(initial_level)
    multi_handler.reorder_records()

    handlers = []
    for handler in logger.handlers:
        if isinstance(handler, MultitaskHandler):
            for record in multi_handler.records:
                if record not in handler.records:
                    handler.records.append(record)
        else:
            handlers.append(handler)

    if len(handlers) == 0:
        return

    for record in multi_handler.records:
        logger.handle(record[-1])

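# Illustrative sketch (not in the original module): capturing records with
# `log_for_multitask` so they are emitted in a sensible order after the block
# exits, rather than interleaved as they arrive.  The logger name
# 'multitask_example' is hypothetical.
#
#     >>> import logging
#     >>> example_logger = logging.getLogger('multitask_example')
#     >>> example_logger.addHandler(logging.StreamHandler())
#     >>> with log_for_multitask(example_logger):
#     ...     example_logger.warning('stored until the context exits')
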
def purge_multitask_logs(log_directory, log_pickle_file, use_logger=None):
    """
    Remove all temporary logging files/directories and handle log records.

    The user must supply a `log_directory` containing pickle files of the
    log records to handle.  The `log_pickle_file` contains the logger
    used to handle these records.  Following completion, both the
    `log_directory` and `log_pickle_file` will be deleted from the file
    system.

    Parameters
    ----------
    log_directory : str
        The directory in which the log records for each run were stored.
        This directory will be removed.
    log_pickle_file : str
        The pickle file containing the logger for multitask.  This will
        be removed.
    use_logger : Logger, optional
        The logger to handle any log records.  If not supplied, defaults
        to that found in the `log_pickle_file`.

    Returns
    -------
    None
    """
    if not isinstance(log_pickle_file, str) or not os.path.isfile(
            log_pickle_file):
        return
    if not isinstance(log_directory, str) or not os.path.isdir(
            log_directory):
        os.remove(log_pickle_file)
        return

    if not isinstance(use_logger, logging.Logger):
        with open(log_pickle_file, 'rb') as f:
            logger = cloudpickle.load(f)
    else:
        logger = use_logger

    pickle_files = os.listdir(log_directory)
    if logger is None or len(pickle_files) == 0:
        os.remove(log_pickle_file)
        shutil.rmtree(log_directory)
        return

    id_search = regex.compile(r'multitask\_log\_(.*)\.p$')
    identifiers = {}
    for pickle_file in pickle_files:
        search = id_search.search(pickle_file)
        if search is None:
            continue
        identifiers[int(search.group(1))] = pickle_file

    job_logs = [os.path.join(log_directory, identifiers[i])
                for i in sorted(identifiers.keys())]

    unpickle_list(job_logs)  # deletes files and converts to log records
    for job_log in job_logs:
        for record in job_log:
            if logger.isEnabledFor(record.levelno):
                logger.handle(record)

    os.remove(log_pickle_file)
    shutil.rmtree(log_directory)

@contextmanager
def log_records_to_pickle_file(logger, pickle_file):
    """
    Store the log records in a pickle file rather than emitting them.

    Parameters
    ----------
    logger : logging.Logger
    pickle_file : str
        The path to the pickle file that will contain the log records.

    Yields
    ------
    None
    """
    initial_level = logger.level
    with log_with_multi_handler(logger) as multi_handler:
        yield

    logger.setLevel(initial_level)
    multi_handler.reorder_records()
    standard_records = [x[-1] for x in multi_handler.records]
    with open(pickle_file, 'wb') as f:
        cloudpickle.dump(standard_records, f)

def in_windows_os():
    """
    Return `True` if running from a Windows OS.

    Returns
    -------
    bool
    """
    return os.name.lower().strip() == 'nt'

class MultitaskHandler(logging.Handler):
    """A log handler for multitask."""

    def __init__(self):
        """
        Initialize a multitask log handler.

        The multitask log handler is designed to separately store log
        messages for each process or thread in order to retrieve those
        messages later for standard logging.  This allows log messages
        to be output in order for each process.
        """
        logging.Handler.__init__(self)
        self.records = []

    def emit(self, record):
        """
        Emit a log record.

        Stores the record in the records list, tagged with the current
        process and thread.  Each message is stored in the received
        order for later retrieval once whatever multiprocessing job is
        complete.

        Parameters
        ----------
        record : logging.LogRecord
            The record to emit.

        Returns
        -------
        None
        """
        process = mp.current_process()
        thread = threading.current_thread()
        emit_time = time.time()
        self.records.append((emit_time, process, thread, record))

    def reorder_records(self):
        """
        Re-order the records in a sensible order.

        The records are sorted by process and then thread in
        chronological order.  I.e., records are grouped by process,
        starting with the first process that appears in the logs, and
        then within that process group a similar grouping is performed
        for each child thread.  Each process-thread grouping will
        contain a list of log records in the order that they were
        emitted.

        Note that each record is a tuple of the form
        (time, process, thread, log_record).

        Returns
        -------
        None
        """
        if self.records is None or len(self.records) == 0:
            return

        # Try to figure out the main and child threads, which can be
        # very hard since there is no record in the thread objects, and
        # GUIs may not always launch processes from the main thread.

        # Need to separate out logs for separate processes
        process_logs = {}
        for index, info in enumerate(self.records):
            t, process, thread, record = info
            if process not in process_logs:
                process_logs[process] = []
            process_log = process_logs[process]
            if info not in process_log:
                process_log.append(info)

        ordered = []
        for process, process_info in process_logs.items():
            ordered_logs = []
            main_thread = None
            thread_logs = {}
            for info in process_info:
                t, _, thread, record = info
                if main_thread is None:
                    main_thread = thread

                # Note that coverage cannot be determined in threads.
                if thread == main_thread:
                    if len(thread_logs) > 0:  # pragma: no cover
                        for thread_info in thread_logs.values():
                            ordered_logs.extend(thread_info)
                        thread_logs = {}
                    ordered_logs.append(info)
                    continue

                if thread not in thread_logs:  # pragma: no cover
                    thread_logs[thread] = []
                thread_log = thread_logs[thread]  # pragma: no cover
                thread_log.append(info)  # pragma: no cover

                if (thread.name == 'MainThread'
                        or not thread.daemon):  # pragma: no cover
                    if len(thread_logs) > 0:
                        for thread_info in thread_logs.values():
                            ordered_logs.extend(thread_info)
                        thread_logs = {}
                    main_thread = thread

            if len(thread_logs) > 0:  # pragma: no cover
                for thread_info in thread_logs.values():
                    ordered_logs.extend(thread_info)

            ordered.extend(ordered_logs)

        self.records = ordered

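# Illustrative sketch (not in the original module): attaching a
# MultitaskHandler directly to a logger, then reordering and replaying the
# stored (time, process, thread, record) tuples.  The logger name
# 'multitask_handler_demo' is hypothetical.
#
#     >>> import logging
#     >>> demo_logger = logging.getLogger('multitask_handler_demo')
#     >>> demo_logger.propagate = False           # keep records local
#     >>> handler = MultitaskHandler()
#     >>> demo_logger.addHandler(handler)
#     >>> demo_logger.warning('captured, not printed')
#     >>> handler.reorder_records()
#     >>> [rec.getMessage() for (_, _, _, rec) in handler.records]
#     ['captured, not printed']
#     >>> demo_logger.removeHandler(handler)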