Source code for whimsy.runner.parallel

'''
Implements a base :class:`~.WorkerPool` which will execute a serial or parallel
implementation of imap_unordered depending on the number of threads requested
on subclass `__init__`. A single instance of imap_unordered may be active at
one time. Additional executions will require that the previous imap have
finished.
'''
import abc
import multiprocessing
from multiprocessing.managers import SyncManager
import Queue
from itertools import imap

from .. import config_module
from ..logger import log

class WorkerPool(object):
    '''
    Base class that executes either a serial or a parallel implementation of
    ``imap_unordered`` depending on the number of threads requested on
    subclass ``__init__``.

    A single instance of ``imap_unordered`` may be active at one time.
    Additional executions require that the previous imap has finished.

    Subclasses must provide the :attr:`pool` property; the object it returns
    must expose ``imap_unordered(function, iterable)``.
    '''
    __metaclass__ = abc.ABCMeta

    def __init__(self, threads):
        '''
        :param threads: Number of workers requested. Any value that is falsy
            or not greater than one selects the serial implementation.
        '''
        self.threads = threads
        # Store a real bool; the bare ``and`` expression would otherwise leak
        # ``None`` or ``0`` into the attribute.
        self.parallel = bool(threads and threads > 1)

    @abc.abstractproperty
    def pool(self):
        '''The backing pool object used by the parallel implementation.'''
        pass

    def imap_unordered(self, map_function, args):
        '''
        :param map_function: A module level function to supply jobs to.
            (Note: Must be exposed globally by a module.)

        :param args: An iterable containing arguments provided to members
            of the pool which the function will take.

        :returns: An iterator over the results. Effectively this function
            performs:

            >>> return (map_function(arg) for arg in args)
        '''
        if self.parallel:
            return self._imap_parallel(map_function, args)
        return self._imap_serial(map_function, args)

    @staticmethod
    def _imap_serial(map_function, args):
        '''
        Lazily apply ``map_function`` to every item of ``args`` in this
        process.

        Written as an explicit staticmethod so the call does not depend on a
        builtin stored as a class attribute never being bound as a method.
        '''
        return (map_function(arg) for arg in args)

    def _imap_parallel(self, map_function, args):
        '''Delegate the mapping to :attr:`pool`.'''
        return self.pool.imap_unordered(map_function, args)
class MulticoreWorkerPool(WorkerPool):
    '''
    Worker pool backed by a local :class:`multiprocessing.Pool`.

    The process pool is only created when more than one thread is requested;
    otherwise :attr:`pool` is ``None`` and the serial implementation of the
    base class is used.
    '''
    # Maximum time (seconds) a single wait for the next result may block
    # before we loop and wait again.
    _POLL_TIMEOUT = 60

    def __init__(self, threads=None):
        '''
        :param threads: Number of worker processes to create. Values that do
            not enable parallel mode create no process pool.
        '''
        super(MulticoreWorkerPool, self).__init__(threads)
        self._process_pool = None
        if self.parallel:
            self._process_pool = multiprocessing.Pool(threads)

    @property
    def pool(self):
        '''The :class:`multiprocessing.Pool`, or ``None`` in serial mode.'''
        return getattr(self, '_process_pool', None)

    def _imap_parallel(self, map_function, args):
        '''
        Yield ``map_function(arg)`` for each ``arg`` as the process pool
        completes them.

        Every job is routed through ``subprocess_exception_wrapper`` so that
        a failure in a child surfaces with the child's traceback text.

        :raises KeyboardInterrupt: re-raised after the process pool has been
            terminated and joined.
        '''
        jobs = ((map_function, arg) for arg in args)
        try:
            gen = super(MulticoreWorkerPool, self)._imap_parallel(
                subprocess_exception_wrapper, jobs
            )
            # We need to use polling since termination is broken in python2.
            # (Blocking waits do not internally poll for us.)
            #
            # NOTE: The cpython library will automatically poll faster than
            # the time we give it.
            while True:
                try:
                    res = gen.next(self._POLL_TIMEOUT)
                except multiprocessing.TimeoutError:
                    # Nothing finished within the window; keep waiting.
                    continue
                except StopIteration:
                    break
                yield res
        except KeyboardInterrupt:
            self._process_pool.terminate()
            self._process_pool.join()
            raise
class ComplexMulticorePool(WorkerPool):
    '''
    Class implements the server container for a multi-client remote and local
    multiprocessing pool.

    Exposes an :meth:`imap_unordered` method which will either execute
    serially if only initialized with a single thread, or will execute in
    parallel if more are given. Additionally, if more than one thread is given
    it will allow remote clients to connect and join the pool.
    '''
    def __init__(self, threads=None):
        '''
        :param threads: Total number of local workers wanted. In parallel mode
            ``threads - 1`` worker processes are spawned here; the remaining
            one is started by the work server itself.
        '''
        super(ComplexMulticorePool, self).__init__(threads)
        # Always define these so that `pool` is usable (as None) in serial
        # mode instead of raising AttributeError.
        self.server = None
        self._additional_workers = []

        if self.parallel:
            credentials = config_module.config.credentials
            self.server = WorkServer(*credentials)
            # The work server starts its own worker, so we only make n-1
            # additional workers.
            # NOTE(review): these workers are started here while the server
            # is only started in `_imap_parallel`; confirm they are able to
            # connect in that order.
            for _ in range(1, threads):
                new_worker = WorkClient(*credentials, as_client=False)
                new_worker.daemon = True
                # NOTE: When this pool is deleted and the server closes down
                # this process will be killed.
                new_worker.start()
                self._additional_workers.append(new_worker)

    @property
    def pool(self):
        '''The :class:`WorkServer`, or ``None`` in serial mode.'''
        return self.server

    def _imap_parallel(self, function, args):
        '''
        Start the work server, yield each result of mapping ``function`` over
        ``args`` and shut the server down again.

        The shutdown runs in a ``finally`` clause so the server is also closed
        when the consumer abandons the generator early or an error is raised
        while iterating.
        '''
        self.server.start()
        try:
            for i in self.server.imap_unordered(function, args):
                yield i
        finally:
            self.server.shutdown()
class WorkQueueServer(SyncManager):
    '''
    Manager that serves a work queue, a result queue and the server's config
    setup to the clients which connect to it.
    '''
    def __init__(self, hostname, port, passkey):
        '''
        :param hostname: Host part of the address handed to the manager.
        :param port: Port part of the address handed to the manager.
        :param passkey: Authentication key handed to the manager.
        '''
        self.work_queue = Queue.Queue()
        self.result_queue = Queue.Queue()

        def shared_config():
            # NOTE: We use a tuple with dictionaries because the SyncManager
            # will not automatically pass 'deepcopy's of objects. So the
            # config manually builds out of dictionaries rather than copying
            # a parent __dict__.
            return (config_module.config._config,
                    config_module.config._defaults)

        accessors = (
            ('get_work_queue', lambda: self.work_queue),
            ('get_result_queue', lambda: self.result_queue),
            ('get_shared_config', shared_config),
        )
        for typeid, accessor in accessors:
            self.register(typeid, accessor)

        address = (hostname, port)
        super(WorkQueueServer, self).__init__(address, passkey)
class WorkQueueClient(SyncManager):
    '''
    Client side manager which registers the names of the accessors it expects
    the server to provide: the work queue, the result queue and the shared
    config.
    '''
    def __init__(self, hostname, port, passkey):
        '''
        :param hostname: Host part of the address handed to the manager.
        :param port: Port part of the address handed to the manager.
        :param passkey: Authentication key handed to the manager.
        '''
        remote_accessors = (
            'get_work_queue',
            'get_result_queue',
            'get_shared_config',
        )
        for typeid in remote_accessors:
            self.register(typeid)

        address = (hostname, port)
        super(WorkQueueClient, self).__init__(address, passkey)
class WorkServer(object):
    '''
    Implements a server object which creates work queues and result queues
    which clients can connect to in order to assist in work.

    Additionally, in order to implement an imap_unordered function that does
    not block, a separate client is spawned with the server.
    '''
    def __init__(self, hostname, port, passkey):
        '''
        :param hostname: Host the queue server is addressed by.
        :param port: Port the queue server is addressed by.
        :param passkey: Authentication key shared with the clients.
        '''
        self.queue_server = WorkQueueServer(hostname, port, passkey)
        self.dest = (hostname, port, passkey)
        # The helper worker process; created by `start`.
        self.work_client = None
        # Indicates that an imap function is already in progress.
        self.in_progress = False
        # Indicates that `start` has run and `shutdown` has not run since.
        self.started = False

    def start(self):
        '''Start the server and the helping work_client.'''
        # Start the work queue manager.
        self.queue_server.start()

        # Spawn a subprocess to also participate in work in case we are a
        # server with no workers.
        # NOTE: We use a subprocess rather than a thread so we are not
        # required to wait for the sockets to cleanup.
        self.work_client = WorkClient(*self.dest, as_client=False)
        self.work_client.daemon = True
        self.work_client.start()
        self.started = True

    def shutdown(self):
        '''
        Shutdown the server, the client will close asynchronously since it
        will block in its own process waiting for the socket to close.
        '''
        self.queue_server.shutdown()
        self.started = False
        # NOTE: It will take a decent amount of time for the work_client to
        # close its sockets so we don't bother terminating or joining it.
        # Just let the process cleanup on its own.

    def imap_unordered(self, function, args):
        '''
        Provides functional equivalence of:

        >>> return (function(arg) for arg in args)

        Every ``(function, arg)`` job is placed on the work queue first, then
        one result is yielded from the result queue per job submitted.

        .. note:: This will not block since we also spawn a `work_client` to
            assist this process.
        '''
        work_queue = self.queue_server.get_work_queue()
        result_queue = self.queue_server.get_result_queue()

        self.in_progress = True
        try:
            length = 0
            for arg in args:
                length += 1
                work_queue.put((function, arg))

            for _ in range(length):
                yield result_queue.get()
        finally:
            # Also reached when the consumer abandons the generator early.
            self.in_progress = False
class WorkClient(multiprocessing.Process):
    '''
    Worker process which connects to a work server, takes ``(function, arg)``
    jobs off the server's work queue and puts ``function(arg)`` on its result
    queue.
    '''
    def __init__(self, hostname, port, passkey, as_client=True):
        '''
        :param hostname: Host of the work server to connect to.
        :param port: Port of the work server to connect to.
        :param passkey: Authentication key shared with the server.
        :param as_client: Set the config.command of the client process to
            'client'.
        '''
        self.dest = (hostname, port, passkey)
        self.queue_client = WorkQueueClient(hostname, port, passkey)
        self.as_client = as_client
        super(WorkClient, self).__init__()

    def run(self):
        '''
        Connects to the server and gathers its work and result queues. Also
        the majority of the config of the server process. (Allowing flags like
        `fail_fast` to be passed to us.)

        .. note:: Executed on the client process.
        '''
        try:
            self.queue_client.connect()
        except Exception:
            # Any ordinary failure to connect ends this worker; interpreter
            # exit requests (KeyboardInterrupt/SystemExit) are not swallowed.
            log.bold('WorkClient failed to connect to WorkServer.')
            return
        log_if_client(log.bold, 'Connected to test server.')

        disconnected_msg = ('WorkClient disconnected from WorkServer before it'
                            ' could start.')
        try:
            self._copy_config()
            work_queue = self.queue_client.get_work_queue()
            result_queue = self.queue_client.get_result_queue()
        except (IOError, EOFError):
            log.bold(disconnected_msg)
        else:
            self.imap_task(work_queue, result_queue)
            log_if_client(log.bold, 'Work completed for test server, closing.')

    def _copy_config(self):
        '''
        Copies the config of the server, modifying it slightly to fit the
        requirements for a work client.
        '''
        newconfig = self.queue_client.get_shared_config()
        config_module.config._init_with_dicts(*newconfig._getvalue())

        # In the copied config, don't spawn additional threads.
        config_module.config._set('threads', 1)
        if self.as_client:
            config_module.config._set('command', 'client')

    @staticmethod
    def imap_task(wq, rq):
        '''
        Serve jobs until the work queue signals end of file.

        :param wq: Queue yielding ``(function, arg)`` tuples from ``get()``.
        :param rq: Queue receiving each ``function(arg)`` through ``put()``.

        Only :class:`EOFError` ends the loop quietly; any other exception,
        including one raised by ``function`` itself, propagates to the caller.
        '''
        try:
            while True:
                (function, arg) = wq.get()
                rq.put(function(arg))
        except EOFError:
            return
def log_if_client(callback, *args, **kwargs):
    '''
    Invoke ``callback(*args, **kwargs)`` only when the configured command is
    'client'; otherwise do nothing. The callback's result is discarded.
    '''
    running_as_client = config_module.config.command == 'client'
    if not running_as_client:
        return
    callback(*args, **kwargs)
class SubprocessException(Exception):
    '''
    Stands in for an exception that occurred inside a child python process.
    '''
def subprocess_exception_wrapper(args):
    '''
    Run one job for a child python process so that its traceback can be
    printed from the parent.

    :param args: A ``(function, argument)`` pair.
    :returns: ``function(argument)``.
    :raises SubprocessException: carrying the formatted traceback text of
        whatever was raised while calling the function.
    '''
    import sys
    import traceback

    function, argument = args
    try:
        result = function(argument)
    except:
        # Deliberately catches everything raised by the job.
        trace_lines = traceback.format_exception(*sys.exc_info())
        raise SubprocessException("".join(trace_lines))
    return result