Cross-process streaming
Spawning processes is the boring half of tractor: the real
cool stuff is the native support for cross-process streaming.
Yes, you saw it here first — 2-way msg streams with reliable,
transitive setup/teardown semantics, wired straight into the
runtime’s structured concurrency (SC) supervision machinery so
that how a stream ends is part of the protocol.
No broker, no topic exchange, no IDL compiler. The IPC layer is a
deliberately “cheap or nasty” (un)protocol: a tiny set of
msgspec-typed msgs over a transport (TCP or UDS today) with
payload typing opt-in per dialog — handshake msgs get the nasty
treatment (strict validation) while high-rate stream payloads
stay cheap (receiver-side checks only). See
The Context: a cross-actor task pair for the typed pld_spec contract bits.
Two ways to stream
- Bidirectional, context-based: open a tractor.Context to a peer task, then enter ctx.open_stream() for a full-duplex tractor.MsgStream. This is the modern core API, taught end-to-end in The Context: a cross-actor task pair; we won't re-teach it here.
- One-way, portal-based: point tractor.Portal.open_stream_from() at a plain async generator fn in the peer actor. Legacy, but perfectly fine for simple produce/consume pipelines, and it powers the classic examples below.
Rule of thumb: if the consumer ever needs to talk back — acks, control msgs, a final result — use a context. If it’s a pure pipeline stage, either works and the one-way form is less typing.
One-way streaming from an async generator
The OG API. Write an async generator in the target actor's module; iterate its yields from the spawning side:
from typing import AsyncIterator
from itertools import repeat
import trio
import tractor


async def stream_forever() -> AsyncIterator[str]:
    for i in repeat("I can see these little future bubble things"):
        # each yielded value is sent over the ``Channel`` to the parent actor
        yield i
        await trio.sleep(0.01)


async def main():
    async with tractor.open_nursery() as n:
        portal = await n.start_actor(
            'donny',
            enable_modules=[__name__],
        )

        # this async for loop streams values from the above
        # async generator running in a separate process
        async with portal.open_stream_from(stream_forever) as stream:
            count = 0
            async for letter in stream:
                print(letter)
                count += 1
                if count > 50:
                    break

        print('stream terminated')
        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
Each yield crosses the process boundary as one msg and feeds
the parent’s async for. When the consumer breaks out
and exits the open_stream_from() block the far-end generator
task is cancelled for you: the producer’s lifetime is coupled to
the consumer’s scope so a one-way stream can never leak a remote
task.
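To make that coupling concrete without spawning anything, here is a single-process sketch using stdlib asyncio as an assumed stand-in for trio and tractor. The hypothetical `open_stream_sketch()` context manager runs the producer generator in its own task, feeds values through a tiny bounded queue, and cancels the producer when the consumer leaves the block, which is the same lifetime rule described above, not tractor's actual implementation.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Callable

_DONE = object()  # marks a producer that ran to completion


@asynccontextmanager
async def open_stream_sketch(gen_fn: Callable[[], AsyncIterator], state: dict):
    '''
    Hypothetical stand-in for `Portal.open_stream_from()`: run the
    producer gen in its own task and cancel it on scope exit.
    '''
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)

    async def producer() -> None:
        try:
            async for value in gen_fn():
                await queue.put(value)  # waits while the consumer is busy
            await queue.put(_DONE)
        except asyncio.CancelledError:
            state['producer_cancelled'] = True
            raise

    async def receive_all():
        while (value := await queue.get()) is not _DONE:
            yield value

    task = asyncio.create_task(producer())
    try:
        yield receive_all()
    finally:
        # the producer never outlives the consumer's scope
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass


async def stream_forever():
    n = 0
    while True:
        yield n
        n += 1
        await asyncio.sleep(0)


async def main() -> tuple[list[int], dict]:
    state: dict = {}
    got: list[int] = []
    async with open_stream_sketch(stream_forever, state) as stream:
        async for value in stream:
            got.append(value)
            if len(got) > 50:
                break  # leaving the block cancels the producer task
    return got, state


got, state = asyncio.run(main())
```

Because cleanup lives in the context manager's `finally`, the producer is reaped whether the consumer breaks, raises, or exhausts the stream.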
Any extra kwargs (as in open_stream_from(stream_data, seed=100)) are forwarded
to the remote generator's call, and a target that isn't an async
generator function is rejected up front with a TypeError.
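A minimal local sketch of that up-front check plus kwarg forwarding, assuming the check behaves like `inspect.isasyncgenfunction()`. The `start_stream_call()` helper is hypothetical and only illustrates the contract; it is not tractor's internal code.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable


def start_stream_call(fn: Callable[..., Any], **kwargs: Any) -> AsyncIterator:
    '''
    Hypothetical helper: reject non-async-gen targets before any
    (imagined) IPC happens, then forward extra kwargs to the call.
    '''
    if not inspect.isasyncgenfunction(fn):
        raise TypeError(f'{fn!r} must be an async generator function!')
    return fn(**kwargs)


async def stream_data(seed: int):
    for i in range(seed):
        yield i


async def not_a_gen(seed: int) -> int:
    return seed


async def collect(agen: AsyncIterator) -> list:
    return [v async for v in agen]


# `seed=3` is forwarded to `stream_data(seed=3)`
values = asyncio.run(collect(start_stream_call(stream_data, seed=3)))

try:
    start_stream_call(not_a_gen, seed=3)  # a plain coroutine fn
except TypeError as err:
    rejected = str(err)
```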
Note
No decorator required — any plain async-gen fn works. You may
still meet @tractor.stream in the wild; it’s the legacy
marker for one-way endpoints and sticks around only for
compat (heads up: the param name ctx is reserved for
@context endpoints nowadays, so legacy fns should call
theirs stream). New code wanting anything fancier than a
one-way pipe should use tractor.context() +
ctx.open_stream().
Warning
One-way means one way: there’s no sending to the generator side and no graceful consumer-to-producer stop msg — the teardown above is cancel-based. Needing upstream control flow is the sign you’ve outgrown this API.
A full-fledged streaming service
Now let’s get fancy: compose one-way streams through a nested actor tree and you’ve got yourself a fan-in pipeline.
Four actors, three streams, one deduped feed.
import time
import trio
import tractor
from tractor import (
    ActorNursery,
    MsgStream,
    Portal,
)


# these are the first 2 actors, streamer_1 and streamer_2
async def stream_data(seed):
    for i in range(seed):
        yield i
        await trio.sleep(0.0001)  # trigger scheduler


# this is the third actor; the aggregator
async def aggregate(seed):
    '''
    Ensure that the two streams we receive match but only stream
    a single set of values to the parent.

    '''
    an: ActorNursery
    async with tractor.open_nursery() as an:
        portals: list[Portal] = []
        for i in range(1, 3):
            # fork/spawn call
            portal = await an.start_actor(
                name=f'streamer_{i}',
                enable_modules=[__name__],
            )
            portals.append(portal)

        send_chan, recv_chan = trio.open_memory_channel(500)

        async def push_to_chan(portal, send_chan):
            # TODO: https://github.com/goodboy/tractor/issues/207
            async with send_chan:
                async with portal.open_stream_from(stream_data, seed=seed) as stream:
                    async for value in stream:
                        # leverage trio's built-in backpressure
                        await send_chan.send(value)

            print(f"FINISHED ITERATING {portal.channel.uid}")

        # spawn 2 trio tasks to collect streams and push to a local queue
        async with trio.open_nursery() as n:
            for portal in portals:
                n.start_soon(
                    push_to_chan,
                    portal,
                    send_chan.clone(),
                )

            # close this local task's reference to send side
            await send_chan.aclose()

            unique_vals = set()
            async with recv_chan:
                async for value in recv_chan:
                    if value not in unique_vals:
                        unique_vals.add(value)
                        # yield upwards to the spawning parent actor
                        yield value

                        assert value in unique_vals

            print("FINISHED ITERATING in aggregator")

        await an.cancel()
        print("WAITING on `ActorNursery` to finish")
    print("AGGREGATOR COMPLETE!")


async def main() -> list[int]:
    '''
    This is the "root" actor's main task's entrypoint.

    By default (and if not otherwise specified) that root process
    also acts as a "registry actor" / "registrar" on the localhost
    for the purposes of multi-actor "service discovery".

    '''
    # yes, a nursery which spawns `trio`-"actors" B)
    an: ActorNursery
    async with tractor.open_nursery(
        loglevel='error',
        # debug_mode=True,
    ) as an:
        seed = int(1e3)
        pre_start = time.time()

        portal: Portal = await an.start_actor(
            name='aggregator',
            enable_modules=[__name__],
        )

        stream: MsgStream
        async with portal.open_stream_from(
            aggregate,
            seed=seed,
        ) as stream:
            start = time.time()
            # the portal call returns exactly what you'd expect
            # as if the remote "aggregate" function was called locally
            result_stream: list[int] = []
            async for value in stream:
                result_stream.append(value)

        cancelled: bool = await portal.cancel_actor()
        assert cancelled

        print(
            f"STREAM TIME = {time.time() - start}\n"
            f"STREAM + SPAWN TIME = {time.time() - pre_start}\n"
        )
        assert result_stream == list(range(seed))
        return result_stream


if __name__ == '__main__':
    final_stream = trio.run(main)
What’s going on?
- The root actor spawns 'aggregator', which opens its own actor nursery and spawns 'streamer_1' + 'streamer_2': 4 processes total, with supervision nested two levels deep and zero special casing.
- aggregate() opens a one-way stream from each streamer and fans both into a single trio.open_memory_channel() via one local trio task per portal, each holding its own send_chan.clone(): in-actor fan-in riding trio's built-in backpressure end-to-end.
- Duplicates get dropped via a set and the deduped sequence is re-yielded upward: aggregate() is itself an async gen being consumed over IPC by the root. Streams compose.
- When the seed runs out, the streamer gens finish and every send_chan clone is closed, so the memory channel ends once drained. The aggregator's await an.cancel() then reaps its streamer subtree and its gen returns; the root's async for ends and portal.cancel_actor() tears down the aggregator. Every exit is awaited: if you can produce a zombie process from this, it is a bug.
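The fan-in/dedup logic itself can be exercised in one process. This sketch swaps in stdlib asyncio for trio and tractor (an assumption for illustration): a bounded `asyncio.Queue` stands in for `trio.open_memory_channel(500)`, and since asyncio queues have no `clone()`/close semantics, each producer pushes a hypothetical `_DONE` marker that the aggregator counts to know when every stream has ended.

```python
import asyncio

_DONE = object()  # hypothetical per-producer "my stream ended" marker


async def stream_data(seed: int):
    # stand-in for each streamer actor's one-way stream
    for i in range(seed):
        yield i
        await asyncio.sleep(0)


async def aggregate(seed: int, n_streams: int = 2):
    '''
    In-process sketch of the aggregator: fan N streams into one
    bounded queue, drop dups, re-yield the unique values upward.
    '''
    queue: asyncio.Queue = asyncio.Queue(maxsize=500)  # bounded => backpressure

    async def push_to_queue() -> None:
        async for value in stream_data(seed):
            await queue.put(value)  # blocks while the queue is full
        await queue.put(_DONE)

    tasks = [asyncio.create_task(push_to_queue()) for _ in range(n_streams)]
    unique_vals: set[int] = set()
    finished = 0
    try:
        while finished < n_streams:
            value = await queue.get()
            if value is _DONE:
                finished += 1
                continue
            if value not in unique_vals:
                unique_vals.add(value)
                yield value  # this gen is itself a stream for its consumer
    finally:
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def main(seed: int = 1000) -> list[int]:
    return [v async for v in aggregate(seed)]


result = asyncio.run(main())
```

Since each producer's values stay in FIFO order on the queue, the first arrival of `k + 1` is always preceded by some arrival of `k`, so the deduped output comes out as `range(seed)` in order, matching the original example's final assertion.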
Watch the tree breathe while it runs, using the README’s signature process-monitor incantation:
$TERM -e watch -n 0.1 "pstree -a $$" \
& python examples/full_fledged_streaming_service.py \
&& kill $!
No extra threads, no fancy semaphores, no futures; all we need is
tractor’s IPC.
Two streams, one portal
Every open_stream_from() call starts its own remote task —
even through the same portal — so two local consumer tasks can
independently stream the same generator fn concurrently, both
dialogs multiplexed over the single underlying IPC channel:
import trio
import tractor


log = tractor.log.get_logger('multiportal')


async def stream_data(seed=10):
    log.info("Starting stream task")
    for i in range(seed):
        yield i
        await trio.sleep(0)  # trigger scheduler


async def stream_from_portal(p, consumed):
    async with p.open_stream_from(stream_data) as stream:
        async for item in stream:
            if item in consumed:
                consumed.remove(item)
            else:
                consumed.append(item)


async def main():
    async with tractor.open_nursery(loglevel='info') as an:
        p = await an.start_actor('stream_boi', enable_modules=[__name__])

        consumed = []
        async with trio.open_nursery() as n:
            for i in range(2):
                n.start_soon(stream_from_portal, p, consumed)

        # both streaming consumer tasks have completed and so we should
        # have nothing in our list thanks to single threadedness
        assert not consumed

        await an.cancel()


if __name__ == '__main__':
    trio.run(main)
The add-else-remove trick on the shared consumed list is the
proof: each value arrives in both streams, getting appended by
whichever task sees it first and removed by the other, so the
list always ends up empty. Two streams, same data, zero
interference.
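Why the list must end up empty can be checked in plain Python: every value is toggled exactly twice, and within one trio thread a toggle has no `await` inside it, so it can't be interleaved mid-update. This sketch (hypothetical helper names) replays random interleavings of the two identical streams through the same remove-if-seen-else-append logic.

```python
import random


def toggle(consumed: list[int], item: int) -> None:
    # the same remove-if-seen-else-append logic as `stream_from_portal()`
    if item in consumed:
        consumed.remove(item)
    else:
        consumed.append(item)


def run_interleaving(seed: int, rng: random.Random) -> list[int]:
    '''
    Feed two identical streams of `range(seed)` through `toggle()`
    in a random interleaving, like two concurrent consumer tasks.
    '''
    streams = [iter(range(seed)), iter(range(seed))]
    remaining = [seed, seed]
    consumed: list[int] = []
    while any(remaining):
        # pick whichever stream "delivers" next
        i = rng.choice([j for j in (0, 1) if remaining[j]])
        toggle(consumed, next(streams[i]))
        remaining[i] -= 1
    return consumed


rng = random.Random(0)
results = [run_interleaving(10, rng) for _ in range(100)]
# every interleaving leaves the shared list empty
```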
This works because every dialog is keyed by its own context id
(Context.cid) — any number of concurrent streams, contexts
and one-shot RPCs share a single underlying
tractor.Channel per peer pair.
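Conceptually, that multiplexing is a demux step keyed on the context id. The sketch below uses asyncio and a deliberately simplified, hypothetical msg shape (`{'cid': ..., 'pld': ...}` dicts, with `None` meaning "channel closed"); it is not tractor's real wire format or msg-loop, just the routing idea.

```python
import asyncio


async def demux(
    channel: asyncio.Queue,
    dialogs: dict[str, asyncio.Queue],
) -> None:
    '''
    Hypothetical demultiplexer: route every msg read off ONE shared
    channel to the per-dialog queue keyed by its `cid`.
    '''
    while True:
        msg = await channel.get()
        if msg is None:  # channel closed: tell every dialog
            for q in dialogs.values():
                await q.put(None)
            return
        await dialogs[msg['cid']].put(msg['pld'])


async def main() -> dict[str, list[int]]:
    channel: asyncio.Queue = asyncio.Queue()
    dialogs = {'cid-a': asyncio.Queue(), 'cid-b': asyncio.Queue()}

    # two concurrent "streams" interleaved on the same wire
    for i in range(3):
        await channel.put({'cid': 'cid-a', 'pld': i})
        await channel.put({'cid': 'cid-b', 'pld': i * 10})
    await channel.put(None)
    await demux(channel, dialogs)

    out: dict[str, list[int]] = {}
    for cid, q in dialogs.items():
        out[cid] = []
        while (pld := await q.get()) is not None:
            out[cid].append(pld)
    return out


result = asyncio.run(main())
```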
Fan-out inside an actor: MsgStream.subscribe()
The inverse pattern: one IPC stream feeding many local tasks.
Instead of paying for N redundant cross-process streams, call
tractor.MsgStream.subscribe() to get a
BroadcastReceiver — a tokio-style broadcast channel from
tractor.trionics — which copies every received value to each
subscribed task:
'''
Demonstrate fanning out ONE inter-actor `MsgStream` to N
local (parent-side) trio tasks using `MsgStream.subscribe()`:
each subscriber gets its own `BroadcastReceiver` copy of
every msg from the single underlying IPC stream.

The child waits for a 'go' msg so that all subscribers are
guaranteed-attached before the first tick is sent; when the
child's stream closes each subscriber's `async for` ends
cleanly.

'''
import trio
import tractor


@tractor.context
async def tick_stream(
    ctx: tractor.Context,
    count: int,
) -> None:
    '''
    Send `count` "ticks" once the parent says go.

    '''
    await ctx.started(count)
    async with ctx.open_stream() as stream:
        # wait for the go-signal ensuring every parent-side
        # subscriber is attached before any tick is sent.
        assert await stream.receive() == 'go'
        for i in range(count):
            await stream.send(i)

    # falling out gracefully closes our stream side;
    # all parent-side subscribers see end-of-channel.


async def consume(
    name: str,
    stream: tractor.MsgStream,
    task_status: trio.TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None:
    '''
    Consume a private broadcast-copy of the IPC stream.

    '''
    async with stream.subscribe() as bcaster:
        task_status.started()
        ticks: list[int] = []
        async for tick in bcaster:
            print(f'{name}: rx {tick}')
            ticks.append(tick)

    # EVERY subscriber gets its own full copy B)
    print(f'{name}: stream ended, got {ticks}')


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'ticker',
            enable_modules=[__name__],
        )
        async with (
            portal.open_context(
                tick_stream,
                count=5,
            ) as (ctx, first),
            ctx.open_stream() as stream,
        ):
            assert first == 5
            async with trio.open_nursery() as tn:
                # use `.start()` so each consumer is known
                # to be subscribed before the ticks flow.
                for i in range(3):
                    await tn.start(
                        consume,
                        f'sub_{i}',
                        stream,
                    )
                await stream.send('go')

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
Each task entering stream.subscribe() receives its own copy
of everything sent from that point on. The underlying stream
keeps pace with the fastest subscriber; a task falling more
than the buffered window behind has its next receive raise
tractor.trionics.Lagged to say it lost data.
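The lag rule is easier to see in a toy, synchronous model: one ring buffer holding the last `window` msgs plus a read cursor per subscriber. Everything here is hypothetical (local `Lagged`/`WouldBlock` stand-ins, `push()` playing the role of the underlying stream being read at the fastest subscriber's pace) and is not tractor's actual `BroadcastReceiver` implementation.

```python
from collections import deque


class Lagged(Exception):
    '''Local stand-in for `tractor.trionics.Lagged`.'''


class WouldBlock(Exception):
    '''Local stand-in for `trio.WouldBlock`: nothing new yet.'''


class ToyBroadcast:
    '''
    Hypothetical, synchronous broadcast model: a ring buffer of the
    last `window` msgs plus a per-subscriber read cursor.
    '''
    def __init__(self, window: int) -> None:
        self.buf: deque = deque(maxlen=window)
        self.next_seq = 0  # seq number the next pushed msg will get

    def push(self, msg) -> None:
        self.buf.append(msg)  # oldest msg falls off when full
        self.next_seq += 1

    def subscribe(self) -> 'ToySub':
        # a new subscriber only sees msgs pushed from now on
        return ToySub(self, start=self.next_seq)


class ToySub:
    def __init__(self, bcast: ToyBroadcast, start: int) -> None:
        self.bcast = bcast
        self.seq = start  # seq number of the next msg to read

    def receive_nowait(self):
        b = self.bcast
        oldest = b.next_seq - len(b.buf)
        if self.seq < oldest:
            missed = oldest - self.seq
            self.seq = oldest  # skip ahead so the next read works
            raise Lagged(f'missed {missed} msgs')
        if self.seq >= b.next_seq:
            raise WouldBlock
        msg = b.buf[self.seq - oldest]
        self.seq += 1
        return msg


bcast = ToyBroadcast(window=3)
fast, slow = bcast.subscribe(), bcast.subscribe()
got_fast = []
for i in range(5):
    bcast.push(i)
    got_fast.append(fast.receive_nowait())  # keeps pace: gets every msg

# `slow` never read, so msgs 0 and 1 fell out of the 3-msg window
try:
    slow.receive_nowait()
except Lagged as err:
    lag_msg = str(err)
got_slow = [slow.receive_nowait() for _ in range(3)]

late = bcast.subscribe()
try:
    late.receive_nowait()
    late_blocked = False
except WouldBlock:
    late_blocked = True
```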
The broadcast handle stays duplex btw: it proxies send()
through to the underlying stream, so each subscriber task can
keep talking upstream while consuming its fan-out copy.
Warning
.subscribe() is idempotent and non-reversible: the
first call permanently swaps the stream’s receive machinery
over to the internally allocated broadcaster. There’s no
un-subscribing back to the raw stream, so make sure you’re ok
with the (theoretical) overhead before opting in.
Consuming: async for and friends
async for msg in stream: is just sugar over repeated
await stream.receive(). The receive-side surface:
- receive(): the next msg, or raises trio.EndOfChannel on a graceful far-end close (async for translates that into a clean loop exit for you).
- receive_nowait(): opportunistic, non-blocking drain.
- closed: property flagging an already-ended stream.
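The "sugar" relationship can be sketched with a hypothetical `ToyStream` (stdlib only, with a local `EndOfChannel` standing in for `trio.EndOfChannel`): `__anext__` just calls `receive()` and maps the end-of-channel signal to `StopAsyncIteration`, so the `async for` form and a hand-written receive loop behave the same.

```python
import asyncio


class EndOfChannel(Exception):
    '''Local stand-in for `trio.EndOfChannel`.'''


class ToyStream:
    '''
    Hypothetical minimal stream: `async for` sits on top of
    `receive()`, mapping EndOfChannel to a clean loop exit.
    '''
    def __init__(self, msgs: list) -> None:
        self._msgs = list(msgs)
        self.closed = False

    async def receive(self):
        if not self._msgs:
            self.closed = True
            raise EndOfChannel
        return self._msgs.pop(0)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.receive()
        except EndOfChannel:
            raise StopAsyncIteration from None


async def manual(stream: ToyStream) -> list:
    # what `async for` expands to, written out by hand
    got = []
    while True:
        try:
            got.append(await stream.receive())
        except EndOfChannel:
            break
    return got


async def main():
    stream = ToyStream(['a', 'b', 'c'])
    got = [msg async for msg in stream]
    manual_got = await manual(ToyStream(['a', 'b', 'c']))
    return got, stream.closed, manual_got


got, closed, manual_got = asyncio.run(main())
```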
Send-side it’s just await stream.send(data) — one Yield
msg per call carrying any msgspec-encodable payload (or
whatever your pld_spec permits, see The Context: a cross-actor task pair).
End-of-stream: close vs. cancel
How a stream ends is part of the protocol; the runtime keeps the polite case and the violent case distinct:
- Graceful close: the far side exits its stream block, its async gen returns, or it calls await stream.aclose(). A Stop msg is sent so your async for simply ends (StopAsyncIteration, via trio.EndOfChannel under the hood). This is a normal, non-error ending: the dialog's result phase proceeds as usual.
- Cancel or error: no Stop is sent. Instead the cancel/error itself is relayed, so the far end knows the dialog did not end on purpose and raises accordingly: a tractor.ContextCancelled, a boxed tractor.RemoteActorError, etc. See the cancellation section of The Context: a cross-actor task pair for exactly who raises what.
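A receive-side sketch of keeping the two endings distinct, using asyncio and a hypothetical dict msg shape (`'kind'` of `'yield'`, `'stop'` or `'error'`) rather than tractor's real msg types. A stop msg becomes a local `EndOfChannel` while an error msg is re-raised as a stand-in `RemoteError`; either ending is remembered, so later receives fail the same way without touching the wire (the one-shot property).

```python
import asyncio


class EndOfChannel(Exception):
    '''Local stand-in for `trio.EndOfChannel` (graceful end).'''


class RemoteError(Exception):
    '''Hypothetical stand-in for a relayed/boxed far-end error.'''


class ToyRxStream:
    def __init__(self, wire: asyncio.Queue) -> None:
        self._wire = wire
        self._ended = None  # the final exception, once the stream ends

    async def receive(self):
        if self._ended is not None:
            raise self._ended  # one-shot: the ending is final
        msg = await self._wire.get()
        kind = msg['kind']
        if kind == 'yield':
            return msg['pld']
        if kind == 'stop':
            self._ended = EndOfChannel()
        elif kind == 'error':
            self._ended = RemoteError(msg['reason'])
        else:
            raise ValueError(f'unknown msg kind {kind!r}')
        raise self._ended


async def drain(msgs: list[dict]) -> tuple[list, str]:
    wire: asyncio.Queue = asyncio.Queue()
    for m in msgs:
        wire.put_nowait(m)
    stream = ToyRxStream(wire)
    got = []
    try:
        while True:
            got.append(await stream.receive())
    except EndOfChannel:
        return got, 'graceful'
    except RemoteError as err:
        return got, f'error: {err}'


async def one_shot() -> bool:
    wire: asyncio.Queue = asyncio.Queue()
    wire.put_nowait({'kind': 'stop'})
    stream = ToyRxStream(wire)
    for _ in range(2):  # 2nd receive must not block on the empty wire
        try:
            await stream.receive()
        except EndOfChannel:
            continue
        return False
    return True


graceful = asyncio.run(drain([
    {'kind': 'yield', 'pld': 1},
    {'kind': 'yield', 'pld': 2},
    {'kind': 'stop'},
]))
errored = asyncio.run(drain([
    {'kind': 'yield', 'pld': 1},
    {'kind': 'error', 'reason': 'far end cancelled'},
]))
still_ended = asyncio.run(one_shot())
```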
Tying it together: every MsgStream is one-shot use. Both
endings are final — once closed a stream can’t be re-opened and
the supported “retry” is opening a fresh tractor.Context
(they’re cheap).
See also
- The Context: a cross-actor task pair, covering the full Context lifecycle: the handshake, results, cancellation semantics and the overrun/backpressure knobs.
- tractor.MsgStream and tractor.Portal.open_stream_from() API docs.
- The zguide chapters our wire philosophy is named after: "cheap or nasty" and (un)protocols.