import asyncio

from eventkit import Event, Op

[docs]class PoolMap(Op): """ Map a function using a distributed pool with `eventkit <>`_. Args: func: Function to map. If it returns an awaitable then the result will be awaited and returned. timeout: Timeout in seconds since map is started. chunksize: Source emits are chunked up to this size. A larger chunksize can greatly improve efficiency for small tasks. ordered: * ``True``: The order of results preserves the source order. * ``False``: Results are in order of completion. """ __slots__ = ('_pool', '_func', '_task', '_kwargs') def __init__( self, pool, func, timeout=None, chunksize=1, ordered=True, source=None): self._pool = pool self._func = func self._kwargs = dict( timeout=timeout, chunksize=chunksize, ordered=ordered, star=True) Op.__init__(self, source) def set_source(self, source): async def looper(): try: async for result in self._pool.map_async( self._func, source.aiter(tuples=True), **self._kwargs): self.emit(result) except Exception as error: self.error_event.emit(self, error) self._task = None self.set_done() self._task = asyncio.ensure_future(looper()) def __del__(self): if self._task: self._task.cancel()
def poolmap( self, pool, func, timeout=None, chunksize=1, ordered=True): return PoolMap(pool, func, timeout, chunksize, ordered, self) poolmap.__doc__ = PoolMap.__doc__ Event.poolmap = poolmap