Parallelism and worker pools
The initial ask is almost always the same: “how do I make a worker
pool?”, i.e. the thing multiprocessing and
concurrent.futures.ProcessPoolExecutor get reached for
once the GIL becomes the enemy.
Here’s the structured concurrency (SC) answer: tractor is built
to handle any SC process tree you can imagine; a “worker pool”
pattern is a trivial special case. So instead of shipping a pool
class with knobs bolted on, you compose one from the same two
ingredients used everywhere else in tractor: an actor nursery
and some IPC.
The stdlib baseline
For a fair comparison, start from the canonical
ProcessPoolExecutor primes example
straight out of the Python docs,
```python
'''
The pure-stdlib `concurrent.futures.ProcessPoolExecutor`
primes demo (from the std docs) verbatim; the baseline twin
of `concurrent_actors_primes.py`.

The `async def main()` + `trio.run()` shim at the bottom only
exists so the docs-example test runner can exercise this
script; the executor code itself is untouched stdlib fare.
'''
import time
import concurrent.futures
import math

import trio


PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def check_primes():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        start = time.time()

        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

        print(f'processing took {time.time() - start} seconds')


async def main() -> None:
    # thin shim: the pool blocks this (sole) trio task
    # which is just fine for a one-shot baseline script.
    check_primes()


if __name__ == '__main__':
    start = time.time()
    trio.run(main)
    print(f'script took {time.time() - start} seconds')
```
Synchronous code, a hidden thread + IPC machine under the hood, and
an API surface (executors, futures, .map()) invented to paper
over the fact that the pool isn’t part of your program’s task tree.
Keep an eye on three things for the rewrite: how work is submitted,
how results come back, and what happens when a worker dies.
The tractor way
Now the same workload as a tractor program,
```python
"""
Demonstration of the prime number detector example from the
``concurrent.futures`` docs:

https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example

This uses no extra threads, fancy semaphores or futures; all we need
is ``tractor``'s channels.
"""
from contextlib import (
    asynccontextmanager as acm,
    aclosing,
)
from typing import (
    AsyncIterator,
    Awaitable,
    Callable,
)
import itertools
import math
import time

import tractor
import trio


PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]


async def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


@acm
async def worker_pool(workers=4):
    """Though it's a trivial special case for ``tractor``, the well
    known "worker pool" seems to be the de facto "but, I want this
    process pattern!" for most parallelism pilgrims.

    Yes, the workers stay alive (and ready for work) until you close
    the context.
    """
    async with tractor.open_nursery() as tn:

        portals = []
        snd_chan, recv_chan = trio.open_memory_channel(len(PRIMES))

        for i in range(workers):

            # this starts a new sub-actor (process + trio runtime) and
            # stores its "portal" for later use to "submit jobs" (ugh).
            portals.append(
                await tn.start_actor(
                    f'worker_{i}',
                    enable_modules=[__name__],
                )
            )

        async def _map(
            worker_func: Callable[[int], Awaitable[bool]],
            sequence: list[int]
        ) -> AsyncIterator[tuple[int, bool]]:

            # define an async (local) task to collect results from workers
            async def send_result(func, value, portal):
                await snd_chan.send((value, await portal.run(func, n=value)))

            async with trio.open_nursery() as n:

                for value, portal in zip(sequence, itertools.cycle(portals)):
                    n.start_soon(
                        send_result,
                        worker_func,
                        value,
                        portal
                    )

                # deliver results as they arrive
                for _ in range(len(sequence)):
                    yield await recv_chan.receive()

        # deliver the parallel "worker mapper" to user code
        yield _map

        # tear down all "workers" on pool close
        await tn.cancel()


async def main():

    async with worker_pool() as actor_map:

        start = time.time()

        async with aclosing(actor_map(is_prime, PRIMES)) as results:
            async for number, prime in results:
                print(f'{number} is prime: {prime}')

        print(f'processing took {time.time() - start} seconds')


if __name__ == '__main__':
    start = time.time()
    trio.run(main)
    print(f'script took {time.time() - start} seconds')
```
What’s different (and what isn’t),
- worker_pool() is ~30 lines of your code: an actor nursery spawning one subactor per requested worker (workers=4 by default), each a full process running its own trio task tree, kept alive and ready for work until the block exits; enable_modules=[__name__] is the capability allowlist letting them run this module’s functions.
- Jobs are “submitted” by just… calling the function: portal.run(is_prime, n=value) runs is_prime() in a worker and hands back its result like any local await.
- Results stream back through a plain trio.open_memory_channel() as they complete: no futures and no polling.
- Teardown is one await tn.cancel() (tractor.ActorNursery.cancel()), and any worker crash triggers the one-cancels-all machinery from Cancellation and error propagation, so a dead worker can never strand the pool.
This uses no extra threads, fancy semaphores or futures; all we
need is tractor’s IPC! The full scorecard,
| stdlib ProcessPoolExecutor | tractor worker_pool() |
|---|---|
| results in input order | results as they complete |
| worker crash -> … | boxed … |
| pool teardown on … | one-cancels-all nursery teardown |
And because the pool is just SC code, every variation — bounded submission, per-worker state, streaming partial results (see Cross-process streaming), nested pools — is a local edit to your pool, not a feature request against an executor class B)
An async pool, though?
Yep: RPC targets must be async functions. The runtime rejects a
plain def, raising TypeError: ... must be an async function!
That’s not zealotry, it’s cancel-responsiveness: each worker is a
full trio runtime whose msg loop is what hears graceful cancel
requests, and a hot loop that never yields can’t be (politely)
interrupted.
Two practical consequences,
- CPU-bound loops should checkpoint once in a while; note how burn_cpu() (see Run a func in a process) sprinkles await trio.sleep() calls so the worker stays responsive while still pegging a core.
- If some sync call blocks a worker anyway, you’re still covered: an unresponsive actor just rides the graceful-then-hard teardown ladder from Cancellation and error propagation instead of acking its cancel. Slower, but never a zombie.
Run a func in a process
Even a pool can be overkill; “run this one async func in a
subprocess and give me the result” is a one-liner via
tractor.ActorNursery.run_in_actor(),
```python
"""
Run with a process monitor from a terminal using::

    $TERM -e watch -n 0.1 "pstree -a $$" \
        & python examples/parallelism/single_func.py \
        && kill $!

"""
import os

import tractor
import trio


async def burn_cpu():

    pid = os.getpid()

    # burn a core @ ~ 50kHz
    for _ in range(50000):
        await trio.sleep(1/50000/50)

    return pid


async def main():
    async with tractor.open_nursery() as n:

        portal = await n.run_in_actor(burn_cpu)

        # burn rubber in the parent too
        await burn_cpu()

        # wait on result from target function
        pid = await portal.wait_for_result()

    # end of nursery block
    print(f"Collected subproc {pid}")


if __name__ == '__main__':
    trio.run(main)
```
run_in_actor() is a convenience wrapper — spawn an actor, run
exactly one task in it, reap on result — not the core spawning
model (that’s tractor.ActorNursery.start_actor() plus
tractor.Portal.open_context(); see The Context: a cross-actor task pair).
But for this fire-and-collect shape it’s exactly the right amount
of typing.
As the module docstring suggests, run it under a process-tree monitor to watch the child appear and get reaped,
```shell
$TERM -e watch -n 0.1 "pstree -a $$" \
    & python examples/parallelism/single_func.py \
    && kill $!
```
You’ll see a core get burned in both parent and child — real parallelism, no GIL sharing, since these are processes (i.e. non-shared-memory threads).
When all you have is sync code
Honesty corner: if your workload is purely synchronous functions
and you’ve zero need for IPC dialogs, streaming, daemons or
supervision trees — i.e. you really do just want
“ProcessPoolExecutor but trio-native” — the smaller,
focused trio-parallel project may serve you better. tractor
happily covers the use case (as above) but brings a whole runtime
along for the ride. (And when blocking I/O — not the GIL — is the
actual problem, plain in-process trio.to_thread.run_sync()
may be all you ever needed.)
And to see that runtime’s process-management story — a per-core
fleet self-destructing with zero zombies left behind — go run
examples/parallelism/we_are_processes.py, walked through in
the Quickstart.
See also
- Higher-level cluster APIs: the one-liner flat-cluster convenience (open_actor_cluster()) for when even a hand-rolled pool is too much typing.
- Cancellation and error propagation: why pool teardown is bulletproof (graceful-then-hard escalation, no zombies).
- The Context: a cross-actor task pair (the core per-task API your pool workers can graduate to).