Distex documentation

Release 0.6.0.

Pool

class distex.pool.Pool(num_workers=None, hosts=None, qsize=2, initializer=None, initargs=(), localhost=None, localport=None, lazy_create=False, worker_loop=<LoopType.default: 0>, func_pickle=<PickleType.dill: 2>, data_pickle=<PickleType.pickle: 0>, loop=None)

Pool of local and remote workers that can run tasks.

To create a process pool of 4 local workers:

pool = Pool(4)

To create 8 remote workers on host maxi, using SSH (Unix only):

pool = Pool(0, 'ssh://maxi/8')

distex must be installed on all remote hosts, and the distex_proc script must be on the PATH; test this with ssh <host> distex_proc. When using SSH, no distex server needs to be running on the hosts.

When not using SSH, a spawning server must first be started on every host involved:

python3 -m distex.server

Warning

Only use this in a trusted network environment.

With the server running on host mini, a pool of 2 workers on that host is created with:

pool = Pool(0, 'mini/2')

Local, remote SSH and remote non-SSH workers can all be combined in one pool:

pool = Pool(4, ['ssh://maxi/8', 'mini/2'])

To give an SSH username or a non-default port such as 10022, specify the host as 'ssh://username@maxi:10022/8'. Passwords cannot be given; use SSH keys instead: ssh-keygen creates a key and ssh-copy-id copies it to all hosts.
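To make the host specification format concrete, here is a small parser for it. This is a hypothetical helper: parse_host_spec and HostSpec are not part of distex, which parses these strings itself. It only illustrates how the parts of a string such as 'ssh://username@maxi:10022/8' map onto the fields of [ssh://][username@]hostname[:portnumber]/num_workers; an omitted username or port is left as None.

```python
import re
from dataclasses import dataclass
from typing import Optional

# Hypothetical illustration of the documented format
# [ssh://][username@]hostname[:portnumber]/num_workers (not distex's own parser).
_SPEC = re.compile(
    r"^(?P<ssh>ssh://)?"          # optional SSH scheme
    r"(?:(?P<user>[^@/:]+)@)?"    # optional username followed by '@'
    r"(?P<host>[^@/:]+)"          # hostname
    r"(?::(?P<port>\d+))?"        # optional ':port'
    r"/(?P<workers>\d+)$"         # mandatory '/num_workers'
)


@dataclass
class HostSpec:
    use_ssh: bool
    username: Optional[str]
    hostname: str
    port: Optional[int]
    num_workers: int


def parse_host_spec(spec: str) -> HostSpec:
    """Split a host specification string into its parts."""
    m = _SPEC.match(spec)
    if m is None:
        raise ValueError(f"invalid host spec: {spec!r}")
    port = m.group("port")
    return HostSpec(
        use_ssh=m.group("ssh") is not None,
        username=m.group("user"),
        hostname=m.group("host"),
        port=int(port) if port else None,
        num_workers=int(m.group("workers")),
    )


if __name__ == "__main__":
    print(parse_host_spec("ssh://username@maxi:10022/8"))
    print(parse_host_spec("mini/2"))
```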

Parameters:
  • num_workers (Optional[int]) – Number of local process workers. The default of None will use the number of CPUs.
  • hosts – Remote host specification string, or list of such strings, in the format [ssh://][username@]hostname[:portnumber]/num_workers.
  • qsize (int) – Number of pending tasks per worker. To improve the throughput of small tasks this can be increased from the default of 2. If no queueing is desired then it can be set to 1.
  • initializer – Callable to initialize worker processes.
  • initargs (tuple) – Arguments tuple that is unpacked into the initializer.
  • localhost (Optional[str]) – Local TCP server (if any) will listen on this address.
  • localport (Optional[int]) – Local TCP server (if any) will listen on this port (default: random open port).
  • lazy_create (bool) – If True then no workers will be created until the first task is submitted.
  • worker_loop

    LoopType to use for workers:

    1. default (=uvloop when available, proactor on Windows)
    2. asyncio (standard selector event loop)
    3. uvloop (Unix only)
    4. proactor (Windows only)
    5. quamash (PyQt)
  • func_pickle

    PickleType to use for serializing functions:

    1. pickle
    2. cloudpickle
    3. dill
  • data_pickle

    PickleType to use for serializing data:

    1. pickle
    2. cloudpickle
    3. dill
  • loop – The asyncio event loop to run the pool in.
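The separate func_pickle and data_pickle options relate to a limitation of the standard pickle module: it serializes functions by reference (module plus qualified name), so a lambda or a function defined inside another function cannot be pickled, while plain data can. This is presumably why functions default to dill. The check below uses only the standard library; std_picklable and make_adder are illustrative names, not part of distex.

```python
import pickle
from typing import Any, Callable


def std_picklable(obj: Any) -> bool:
    """Return True if the standard-library pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        # Functions are pickled by reference; a lambda or nested function
        # cannot be looked up by name, so pickling fails.
        return False
    return True


def make_adder(n: int) -> Callable[[int], int]:
    def add(x: int) -> int:  # nested function: not importable by name
        return x + n
    return add


data_ok = std_picklable({"values": [1, 2, 3]})
lambda_ok = std_picklable(lambda x: x + 1)
nested_ok = std_picklable(make_adder(5))
print(data_ok, lambda_ok, nested_ok)  # True False False
```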

distex.Pool implements the concurrent.futures.Executor interface and can be used in place of ProcessPoolExecutor.
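Because Pool implements the concurrent.futures.Executor interface, code can be written against that interface alone. The sketch below uses only submit() and map(); it is exercised here with a standard-library ThreadPoolExecutor so that it runs without distex, and a distex Pool would be passed in its place. square and square_all are illustrative names.

```python
from concurrent.futures import Executor, ThreadPoolExecutor, as_completed
from typing import Iterable, List, Tuple


def square(x: int) -> int:
    return x * x


def square_all(executor: Executor, values: Iterable[int]) -> Tuple[List[int], List[int]]:
    """Square values using only the Executor interface.

    Returns the results of map() (input order) and the results of
    submit() collected with as_completed() (completion order).
    """
    items = list(values)
    in_order = list(executor.map(square, items))
    futures = [executor.submit(square, v) for v in items]
    by_completion = [f.result() for f in as_completed(futures)]
    return in_order, by_completion


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=4) as executor:
        in_order, by_completion = square_all(executor, [1, 2, 3, 4])
    print(in_order)               # [1, 4, 9, 16]
    print(sorted(by_completion))  # [1, 4, 9, 16]
```

Passing a Pool instead of the thread pool should send square to worker processes unchanged. The two result lists mirror the ordered=True and ordered=False behaviours of Pool.map.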

exception TimeoutError

The operation exceeded the given deadline.

is_ready()

Return True if the pool is ready to process tasks, False otherwise.

The pool also exposes a public ready event.

Return type: bool

total_workers()

Total number of workers in the pool.

Return type: int

submit(func, *args, **kwargs)

Submit the task to be run in the pool and return a concurrent.futures.Future that will hold the result.

This method is provided for compatibility with concurrent.futures.Executor.

map(func, *iterables, timeout=None, chunksize=1, ordered=True, star=False)

Map the function onto the given iterable(s) and return an iterator that yields the results.

Parameters:
  • func – Function to map. If it returns an awaitable then the result is awaited and returned.
  • iterables – Sync or async iterables (in any combination) that yield the arguments for func. The iterables can be unbounded (i.e. they don’t need to have a length).
  • timeout – Timeout in seconds, measured from the start of the map.
  • chunksize – The input is grouped into chunks of up to this many items. A larger chunksize can greatly improve efficiency for small tasks.
  • ordered
    • True: The order of results preserves the order of the input iterables.
    • False: The results are in order of completion.
  • star
    • True: There can be only one iterable and it must yield sequences (such as tuples). The sequences are unpacked (‘starred’) into func.
    • False: The values that the iterables yield are passed as-is to func, one value from each iterable per call.
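The argument handling of map can be modelled sequentially. The sketch below is not distex's implementation (it has no workers, no timeout and no reordering); chunked and reference_map are hypothetical names. It shows how star selects between zipping several iterables and unpacking one iterable of sequences, and how the input, even an unbounded one, is grouped into chunks of at most chunksize items.

```python
from itertools import islice
from typing import Any, Callable, Iterable, Iterator, List, Tuple


def chunked(it: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Group a possibly unbounded iterable into lists of at most `size` items."""
    iterator = iter(it)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def reference_map(func: Callable[..., Any], *iterables: Iterable[Any],
                  chunksize: int = 1, star: bool = False) -> Iterator[Any]:
    """Sequential model of map's argument handling (illustrative only)."""
    if star:
        if len(iterables) != 1:
            raise ValueError("star=True requires exactly one iterable")
        # Each yielded sequence is unpacked into func's arguments.
        arg_tuples: Iterable[Tuple[Any, ...]] = iterables[0]
    else:
        # One value from each iterable per call, as with the builtin map().
        arg_tuples = zip(*iterables)
    for chunk in chunked(arg_tuples, chunksize):
        # In a real pool each chunk would be one unit of work for a worker.
        for args in chunk:
            yield func(*args)


if __name__ == "__main__":
    print(list(reference_map(pow, [2, 3], [3, 2])))              # [8, 9]
    print(list(reference_map(pow, [(2, 3), (3, 2)], star=True)))  # [8, 9]
    print(list(chunked(range(5), 2)))                             # [[0, 1], [2, 3], [4]]
```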

Tip

The function func is pickled only once and then cached. If it takes arguments that remain constant during the mapping, consider using functools.partial to bind the function to these constant arguments, and then map the bound function over the remaining, fewer arguments. This is especially beneficial when the constant arguments include large datasets.
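A minimal illustration of the tip, with hypothetical names (score, weights, items): the constant argument is bound once with functools.partial, and the bound function is then mapped over the varying argument only. The builtin map is used so the example runs on its own; with a distex pool the mapping line would become pool.map(bound, items).

```python
from functools import partial
from typing import Dict


def score(weights: Dict[str, int], item: str) -> int:
    # weights stands in for a large dataset that is the same for every call
    return weights.get(item, 0)


weights = {"a": 1, "b": 2, "c": 3}  # hypothetical constant data
items = ["a", "c", "x"]             # the argument that varies per task

# Bind the constant argument once; bound(item) calls score(weights, item).
bound = partial(score, weights)

results = list(map(bound, items))
print(results)  # [1, 3, 0]
```

Since weights is now part of the function object, it travels with the once-pickled func instead of with every task.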

run(func, *args, **kwargs)

Run the function with the given arguments in the pool and wait for the result.

run_on_all(func, *args, **kwargs)

Run the task on each worker in the pool. Return a list of all results (in order of completion), or raise an exception if the task fails on one or more workers.

It first waits for any other pending tasks to finish and then schedules the task on all workers at the same time.

This can be used for initializing, cleanup, intermittent polling, etc.

create()

Coroutine to create local processors and servers and start up remote processors.

map_async(func, *iterables, timeout=None, chunksize=1, ordered=True, star=False)

Async version of map. This runs with less overhead than map and can be twice as fast for small tasks.

run_async(func, *args, **kwargs)

Asynchronously run the function with the given arguments in the pool and return the result when it becomes available.
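Outside of distex, the idea of awaiting a pool result without blocking the event loop can be expressed with the standard library: asyncio.wrap_future turns the concurrent.futures.Future returned by Executor.submit() into an awaitable. The sketch below is a rough analogue of run_async for any Executor, not how distex implements it; run_in is a hypothetical helper and a ThreadPoolExecutor stands in for the pool.

```python
import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, Callable


async def run_in(executor: Executor, func: Callable[..., Any],
                 *args: Any, **kwargs: Any) -> Any:
    # submit() returns a concurrent.futures.Future; wrap_future binds it to the
    # running event loop so it can be awaited without blocking the loop.
    return await asyncio.wrap_future(executor.submit(func, *args, **kwargs))


async def main() -> None:
    with ThreadPoolExecutor(max_workers=2) as ex:
        # Both calls are in flight concurrently; gather keeps argument order.
        results = await asyncio.gather(run_in(ex, pow, 2, 10), run_in(ex, pow, 3, 3))
    print(results)  # [1024, 27]


if __name__ == "__main__":
    asyncio.run(main())
```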

run_on_all_async(func, *args, **kwargs)

Async version of run_on_all.

shutdown_async(wait=True)

shutdown(wait=True)

Shut down the pool and clean up resources.

PoolMap

class distex.poolmap.PoolMap(pool, func, timeout=None, chunksize=1, ordered=True, source=None)

Map a function using a distributed pool with eventkit.

Parameters:
  • func – Function to map. If it returns an awaitable then the result will be awaited and returned.
  • timeout – Timeout in seconds since map is started.
  • chunksize – Values emitted by the source are grouped into chunks of up to this size. A larger chunksize can greatly improve efficiency for small tasks.
  • ordered
    • True: The order of results preserves the source order.
    • False: Results are in order of completion.

set_source(source)