Parallelism and worker pools

The initial ask is almost always the same: “how do I make a worker pool?”, i.e. the thing multiprocessing and concurrent.futures.ProcessPoolExecutor get reached for once the GIL becomes the enemy.

Here’s the structured concurrency (SC) answer: tractor is built to handle any SC process tree you can imagine; a “worker pool” pattern is a trivial special case. So instead of shipping a pool class with knobs bolted on, you compose one from the same two ingredients used everywhere else in tractor: an actor nursery and some IPC.

The stdlib baseline

For a fair comparison, start from the canonical ProcessPoolExecutor primes example straight out of the Python docs,

examples/parallelism/concurrent_futures_primes.py
'''
The pure-stdlib `concurrent.futures.ProcessPoolExecutor`
primes demo (from the std docs) verbatim; the baseline twin
of `concurrent_actors_primes.py`.

The `async def main()` + `trio.run()` shim at the bottom only
exists so the docs-example test runner can exercise this
script; the executor code itself is untouched stdlib fare.

'''
import time
import concurrent.futures
import math

import trio

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def check_primes():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        start = time.time()

        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

        print(f'processing took {time.time() - start} seconds')


async def main() -> None:
    # thin shim: the pool blocks this (sole) trio task
    # which is just fine for a one-shot baseline script.
    check_primes()


if __name__ == '__main__':

    start = time.time()
    trio.run(main)
    print(f'script took {time.time() - start} seconds')

That’s synchronous code with a hidden thread + IPC machine under the hood, plus an API surface (executors, futures, .map()) invented to paper over the fact that the pool isn’t part of your program’s task tree. Keep an eye on three things for the rewrite: how work is submitted, how results come back, and what happens when a worker dies.
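To make “how results come back” concrete: Executor.map() always yields in input order, so one slow head-of-line job holds back results that already finished, while concurrent.futures.as_completed() is the stdlib’s completion-order alternative. A minimal sketch; it swaps in a ThreadPoolExecutor (an assumption, made only so it runs without any pickling concerns) since both calls belong to the shared Executor API, and slow_echo() is a hypothetical stand-in job,

```python
import concurrent.futures
import time

DELAYS = [0.3, 0.1, 0.2]


def slow_echo(delay: float) -> float:
    # hypothetical stand-in job: sleep `delay` seconds, then return it
    time.sleep(delay)
    return delay


def in_input_order() -> list[float]:
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
        # .map() yields in submission order, waiting on the slow first job
        return list(ex.map(slow_echo, DELAYS))


def in_completion_order() -> list[float]:
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex:
        futs = [ex.submit(slow_echo, d) for d in DELAYS]
        # as_completed() hands back each future as soon as it finishes
        return [f.result() for f in concurrent.futures.as_completed(futs)]


if __name__ == '__main__':
    print(in_input_order())  # [0.3, 0.1, 0.2]
    print(in_completion_order())
```

The tractor pool below makes completion order its only mode, which is why its results arrive as (value, result) pairs rather than bare results.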

The tractor way

Now the same workload as a tractor program,

examples/parallelism/concurrent_actors_primes.py
"""
Demonstration of the prime number detector example from the
``concurrent.futures`` docs:

https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example

This uses no extra threads, fancy semaphores or futures; all we need
is ``tractor``'s channels.

"""
from contextlib import (
    asynccontextmanager as acm,
    aclosing,
)
from typing import Callable
import itertools
import math
import time

import tractor
import trio


PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]


async def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


@acm
async def worker_pool(workers=4):
    """Though it's a trivial special case for ``tractor``, the well
    known "worker pool" seems to be the de facto "but, I want this
    process pattern!" for most parallelism pilgrims.

    Yes, the workers stay alive (and ready for work) until you close
    the context.
    """
    async with tractor.open_nursery() as tn:

        portals = []

        for i in range(workers):

            # this starts a new sub-actor (process + trio runtime) and
            # stores its "portal" for later use to "submit jobs" (ugh).
            portals.append(
                await tn.start_actor(
                    f'worker_{i}',
                    enable_modules=[__name__],
                )
            )

        async def _map(
            worker_func: Callable,  # an async func like `is_prime()`
            sequence: list[int],
        ):  # an async generator of `(value, result)` pairs

            # a fresh result channel per call, buffered to hold every
            # result so no `send_result()` task ever blocks
            snd_chan, recv_chan = trio.open_memory_channel(len(sequence))

            # define an async (local) task to collect results from workers
            async def send_result(func, value, portal):
                await snd_chan.send((value, await portal.run(func, n=value)))

            async with trio.open_nursery() as n:

                for value, portal in zip(sequence, itertools.cycle(portals)):
                    n.start_soon(
                        send_result,
                        worker_func,
                        value,
                        portal
                    )

                # deliver results as they arrive
                for _ in range(len(sequence)):
                    yield await recv_chan.receive()

        # deliver the parallel "worker mapper" to user code
        yield _map

        # tear down all "workers" on pool close
        await tn.cancel()


async def main():

    async with worker_pool() as actor_map:

        start = time.time()

        async with aclosing(actor_map(is_prime, PRIMES)) as results:
            async for number, prime in results:

                print(f'{number} is prime: {prime}')

        print(f'processing took {time.time() - start} seconds')


if __name__ == '__main__':
    start = time.time()
    trio.run(main)
    print(f'script took {time.time() - start} seconds')

What’s different (and what isn’t),

  • worker_pool() is ~30 lines of your code: an actor nursery spawning one subactor per worker (workers=4 by default), each a full process running its own trio task tree, all kept alive and ready for work until the block exits; enable_modules=[__name__] is the capability allowlist letting them run this module’s functions,

  • jobs are “submitted” by just… calling the function: portal.run(is_prime, n=value) runs is_prime() in a worker and hands back its result like any local await,

  • results stream back through a plain trio.open_memory_channel() as they complete — no futures and no polling,

  • teardown is one await tn.cancel() (tractor.ActorNursery.cancel()), and any worker crash triggers the one-cancels-all machinery from Cancellation and error propagation — a dead worker can never strand the pool.

This uses no extra threads, fancy semaphores or futures; all we need is tractor’s IPC! The full scorecard,

concurrent.futures                    tractor
------------------------------------  -------------------------------------
ProcessPoolExecutor()                 worker_pool(), yours, ~30 lines
executor.map(is_prime, PRIMES)        actor_map(is_prime, PRIMES) async-gen
Future + internal result queue        trio.open_memory_channel()
results in input order                results as they complete
worker crash -> BrokenProcessPool     boxed tractor.RemoteActorError
pool teardown on with-block exit      one-cancels-all nursery teardown

And because the pool is just SC code, every variation — bounded submission, per-worker state, streaming partial results (see Cross-process streaming), nested pools — is a local edit to your pool, not a feature request against an executor class B)

An async pool, though?

Yep: RPC targets must be async functions, and the runtime rejects a plain def with a TypeError (“... must be an async function!”). That’s not zealotry, it’s cancel-responsiveness: each worker is a full trio runtime whose msg loop is what hears graceful cancel requests, and since trio schedules tasks cooperatively, a hot loop that never yields starves that msg loop and so can’t be (politely) interrupted.

Two practical consequences,

  • CPU-bound loops should checkpoint once in a while; note how burn_cpu() in the next example sprinkles await trio.sleep() calls so the worker stays responsive while still pegging a core,

  • if some sync call blocks a worker anyway you’re still covered: an unresponsive actor just rides the graceful-then-hard teardown ladder from Cancellation and error propagation instead of acking its cancel — slower, but never a zombie.

Run a func in a process

Even a pool can be overkill; “run this one async func in a subprocess and give me the result” is a one-liner via tractor.ActorNursery.run_in_actor(),

examples/parallelism/single_func.py
"""
Run with a process monitor from a terminal using::

    $TERM -e watch -n 0.1  "pstree -a $$" \
        & python examples/parallelism/single_func.py \
        && kill $!

"""
import os

import tractor
import trio


async def burn_cpu():

    pid = os.getpid()

    # burn a core @ ~ 50kHz
    for _ in range(50000):
        await trio.sleep(1/50000/50)

    return pid


async def main():

    async with tractor.open_nursery() as n:

        portal = await n.run_in_actor(burn_cpu)

        # burn rubber in the parent too
        await burn_cpu()

        # wait on result from target function
        pid = await portal.wait_for_result()

    # end of nursery block
    print(f"Collected subproc {pid}")


if __name__ == '__main__':
    trio.run(main)

run_in_actor() is a convenience wrapper — spawn an actor, run exactly one task in it, reap on result — not the core spawning model (that’s tractor.ActorNursery.start_actor() plus tractor.Portal.open_context(); see The Context: a cross-actor task pair). But for this fire-and-collect shape it’s exactly the right amount of typing.

As the module docstring suggests, run it under a process-tree monitor to watch the child appear and get reaped,

$TERM -e watch -n 0.1  "pstree -a $$" \
    & python examples/parallelism/single_func.py \
    && kill $!

You’ll see a core get burned in both parent and child — real parallelism, no GIL sharing, since these are processes (i.e. non-shared-memory threads).

When all you have is sync code

Honesty corner: if your workload is purely synchronous functions and you’ve zero need for IPC dialogs, streaming, daemons or supervision trees — i.e. you really do just want “ProcessPoolExecutor but trio-native” — the smaller, focused trio-parallel project may serve you better. tractor happily covers the use case (as above) but brings a whole runtime along for the ride. (And when blocking I/O — not the GIL — is the actual problem, plain in-process trio.to_thread.run_sync() may be all you ever needed.)

And to see that runtime’s process-management story — a per-core fleet self-destructing with zero zombies left behind — go run examples/parallelism/we_are_processes.py, walked through in the Quickstart.

See also