Cross-process streaming

Spawning processes is the boring half of tractor: the real cool stuff is the native support for cross-process streaming. Yes, you saw it here first — 2-way msg streams with reliable, transitive setup/teardown semantics, wired straight into the runtime’s structured concurrency (SC) supervision machinery so that how a stream ends is part of the protocol.

No broker, no topic exchange, no IDL compiler. The IPC layer is a deliberately “cheap or nasty” (un)protocol: a tiny set of msgspec-typed msgs over a transport (TCP or UDS today) with payload typing opt-in per dialog. Handshake msgs get the nasty treatment (strict validation) while high-rate stream payloads stay cheap (receiver-side checks only). See The Context: a cross-actor task pair for the typed pld_spec contract bits.

Two ways to stream

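The two forms are a one-way stream pulled from a remote async generator with portal.open_stream_from(), and a bidirectional stream opened inside a context with ctx.open_stream().

Rule of thumb: if the consumer ever needs to talk back (acks, control msgs, a final result), use a context. If it’s a pure pipeline stage, either works and the one-way form is less typing.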

One-way streaming from an async generator

The OG api. Write an async generator in the target actor’s module; iterate its yields from the spawning side:

examples/asynchronous_generators.py
from typing import AsyncIterator
from itertools import repeat

import trio
import tractor


async def stream_forever() -> AsyncIterator[str]:

    for i in repeat("I can see these little future bubble things"):
        # each yielded value is sent over the ``Channel`` to the parent actor
        yield i
        await trio.sleep(0.01)


async def main():

    async with tractor.open_nursery() as n:

        portal = await n.start_actor(
            'donny',
            enable_modules=[__name__],
        )

        # this async for loop streams values from the above
        # async generator running in a separate process
        async with portal.open_stream_from(stream_forever) as stream:
            count = 0
            async for letter in stream:
                print(letter)
                count += 1

                if count > 50:
                    break

        print('stream terminated')

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)

Each yield crosses the process boundary as one msg and feeds the parent’s async for. When the consumer breaks out and exits the open_stream_from() block the far-end generator task is cancelled for you: the producer’s lifetime is coupled to the consumer’s scope so a one-way stream can never leak a remote task.

Any extra keyword arguments, as in portal.open_stream_from(stream_data, seed=100), are forwarded to the remote generator’s call, and a non-async-gen target is rejected up front with a TypeError.
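A rough local sketch of that up-front check and kwarg forwarding, assuming the validation amounts to an "is this an async generator function" test. `open_stream_sketch()` is a hypothetical helper written for illustration, not part of tractor.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable


def open_stream_sketch(
    fn: Callable[..., AsyncIterator[Any]],
    **kwargs: Any,
) -> AsyncIterator[Any]:
    # Hypothetical helper: reject non-async-gen targets before any work starts.
    if not inspect.isasyncgenfunction(fn):
        raise TypeError(f'{fn!r} must be an async generator function')
    # Extra keyword arguments go straight to the generator's call.
    return fn(**kwargs)


async def stream_data(seed: int):
    for i in range(seed):
        yield i


async def not_a_generator(seed: int) -> int:
    return seed


async def main() -> list[int]:
    return [v async for v in open_stream_sketch(stream_data, seed=4)]


values = asyncio.run(main())

try:
    open_stream_sketch(not_a_generator, seed=4)
    rejected = False
except TypeError:
    rejected = True
```

Failing before the stream is opened means a mistyped target never gets as far as starting a task.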

Note

No decorator required — any plain async-gen fn works. You may still meet @tractor.stream in the wild; it’s the legacy marker for one-way endpoints and sticks around only for compat (heads up: the param name ctx is reserved for @context endpoints nowadays, so legacy fns should call theirs stream). New code wanting anything fancier than a one-way pipe should use tractor.context() + ctx.open_stream().

Warning

One-way means one way: there’s no sending to the generator side and no graceful consumer-to-producer stop msg — the teardown above is cancel-based. Needing upstream control flow is the sign you’ve outgrown this API.

A full-fledged streaming service

Now let’s get fancy: compose one-way streams through a nested actor tree and you’ve got yourself a fan-in pipeline.

[Figure: two streamer actors fan in to an aggregator, then to the root. Four actors, three streams, one deduped feed.]

examples/full_fledged_streaming_service.py
import time
import trio
import tractor
from tractor import (
    ActorNursery,
    MsgStream,
    Portal,
)


# this is the first 2 actors, streamer_1 and streamer_2
async def stream_data(seed):
    for i in range(seed):
        yield i
        await trio.sleep(0.0001)  # trigger scheduler


# this is the third actor; the aggregator
async def aggregate(seed):
    '''
    Ensure that the two streams we receive match but only stream
    a single set of values to the parent.

    '''
    an: ActorNursery
    async with tractor.open_nursery() as an:
        portals: list[Portal] = []
        for i in range(1, 3):

            # fork/spawn call
            portal = await an.start_actor(
                name=f'streamer_{i}',
                enable_modules=[__name__],
            )

            portals.append(portal)

        send_chan, recv_chan = trio.open_memory_channel(500)

        async def push_to_chan(portal, send_chan):

            # TODO: https://github.com/goodboy/tractor/issues/207
            async with send_chan:
                async with portal.open_stream_from(stream_data, seed=seed) as stream:
                    async for value in stream:
                        # leverage trio's built-in backpressure
                        await send_chan.send(value)

            print(f"FINISHED ITERATING {portal.channel.uid}")

        # spawn 2 trio tasks to collect streams and push to a local queue
        async with trio.open_nursery() as n:

            for portal in portals:
                n.start_soon(
                    push_to_chan,
                    portal,
                    send_chan.clone(),
                )

            # close this local task's reference to send side
            await send_chan.aclose()

            unique_vals = set()
            async with recv_chan:
                async for value in recv_chan:
                    if value not in unique_vals:
                        unique_vals.add(value)
                        # yield upwards to the spawning parent actor
                        yield value

                assert value in unique_vals

            print("FINISHED ITERATING in aggregator")

        await an.cancel()
        print("WAITING on `ActorNursery` to finish")
    print("AGGREGATOR COMPLETE!")


async def main() -> list[int]:
    '''
    This is the "root" actor's main task's entrypoint.

    By default (and if not otherwise specified) that root process
    also acts as a "registry actor" / "registrar" on the localhost
    for the purposes of multi-actor "service discovery".

    '''
    # yes, a nursery which spawns `trio`-"actors" B)
    an: ActorNursery
    async with tractor.open_nursery(
        loglevel='error',
        # debug_mode=True,
    ) as an:

        seed = int(1e3)
        pre_start = time.time()

        portal: Portal = await an.start_actor(
            name='aggregator',
            enable_modules=[__name__],
        )

        stream: MsgStream
        async with portal.open_stream_from(
            aggregate,
            seed=seed,
        ) as stream:

            start = time.time()
            # the portal call returns exactly what you'd expect
            # as if the remote "aggregate" function was called locally
            result_stream: list[int] = []
            async for value in stream:
                result_stream.append(value)

        cancelled: bool = await portal.cancel_actor()
        assert cancelled

        print(
            f"STREAM TIME = {time.time() - start}\n"
            f"STREAM + SPAWN TIME = {time.time() - pre_start}\n"
        )
        assert result_stream == list(range(seed))
        return result_stream


if __name__ == '__main__':
    final_stream = trio.run(main)

What’s going on?

  • the root actor spawns 'aggregator' which opens its own actor nursery and spawns 'streamer_1' + 'streamer_2': 4 processes total, supervision nested two levels deep with zero special casing.

  • aggregate() opens a one-way stream from each streamer and fans both into a single trio.open_memory_channel() via one local trio task per portal — in-actor fan-in riding trio’s built-in backpressure end-to-end.

  • duplicates get dropped via a set and the deduped sequence is re-yielded upward: aggregate() is itself an async gen being consumed over IPC by the root. Streams compose.

  • when the seed runs out the streamer gens finish, the memory channel drains closed, the aggregator’s gen returns and the root’s async for ends; await an.cancel() then reaps the subtree. Every exit is awaited — if you can produce a zombie process from this, it is a bug.
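The fan-in and dedupe steps above can be modelled inside one process. This is a simplified sketch, not the example's actual mechanics: a bounded `asyncio.Queue` stands in for the trio memory channel, a `DONE` sentinel (an assumption of this sketch) replaces channel closing, and local generators replace the streamer actors.

```python
import asyncio

DONE = object()  # sentinel marking one producer as finished


async def stream_data(seed: int):
    for i in range(seed):
        yield i
        await asyncio.sleep(0)


async def push_to_queue(seed: int, queue: asyncio.Queue) -> None:
    async for value in stream_data(seed):
        # A bounded queue blocks the producer when the consumer lags.
        await queue.put(value)
    await queue.put(DONE)


async def aggregate(seed: int, n_streams: int = 2):
    queue: asyncio.Queue = asyncio.Queue(maxsize=8)
    tasks = [
        asyncio.create_task(push_to_queue(seed, queue))
        for _ in range(n_streams)
    ]
    seen: set[int] = set()
    finished = 0
    while finished < n_streams:
        value = await queue.get()
        if value is DONE:
            finished += 1
        elif value not in seen:
            seen.add(value)
            yield value  # re-yield only the first copy of each value
    await asyncio.gather(*tasks)


async def main() -> list[int]:
    return [v async for v in aggregate(seed=20)]


deduped = asyncio.run(main())
```

Because each producer emits its values in ascending order, the first copy of every value reaches the queue in ascending order too, so the deduped feed comes out sorted whatever the interleaving.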

Watch the tree breathe while it runs, using the README’s signature process-monitor incantation:

$TERM -e watch -n 0.1  "pstree -a $$" \
    & python examples/full_fledged_streaming_service.py \
    && kill $!

No extra threads, no fancy semaphores, no futures; all we need is tractor’s IPC.

Two streams, one portal

Every open_stream_from() call starts its own remote task — even through the same portal — so two local consumer tasks can independently stream the same generator fn concurrently, both dialogs multiplexed over the single underlying IPC channel:

examples/multiple_streams_one_portal.py
import trio
import tractor


log = tractor.log.get_logger('multiportal')


async def stream_data(seed=10):
    log.info("Starting stream task")

    for i in range(seed):
        yield i
        await trio.sleep(0)  # trigger scheduler


async def stream_from_portal(p, consumed):

    async with p.open_stream_from(stream_data) as stream:
        async for item in stream:
            if item in consumed:
                consumed.remove(item)
            else:
                consumed.append(item)


async def main():

    async with tractor.open_nursery(loglevel='info') as an:

        p = await an.start_actor('stream_boi', enable_modules=[__name__])

        consumed = []

        async with trio.open_nursery() as n:
            for i in range(2):
                n.start_soon(stream_from_portal, p, consumed)

        # both streaming consumer tasks have completed and so we should
        # have nothing in our list thanks to single threadedness
        assert not consumed

        await an.cancel()


if __name__ == '__main__':
    trio.run(main)

The add-else-remove trick on the shared consumed list is the proof: each value arrives in both streams, getting appended by whichever task sees it first and removed by the other, so the list always ends up empty. Two streams, same data, zero interference.
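To see why the toggle always nets out to an empty list, here is a small stdlib-only check over many random interleavings of two identical sequences. The `toggle()` and `interleave()` helpers are hypothetical names for this sketch; no actors are involved.

```python
import random


def toggle(consumed: list[int], item: int) -> None:
    # Same logic as the example: remove if present, else append.
    if item in consumed:
        consumed.remove(item)
    else:
        consumed.append(item)


def interleave(a, b, rng: random.Random) -> list[int]:
    # Merge two sequences in random order, preserving each one's own order.
    a, b = list(a), list(b)
    merged = []
    while a or b:
        source = rng.choice([s for s in (a, b) if s])
        merged.append(source.pop(0))
    return merged


rng = random.Random(0)
leftovers = []
for _ in range(100):
    consumed: list[int] = []
    for item in interleave(range(10), range(10), rng):
        toggle(consumed, item)
    leftovers.append(consumed)
```

Every value is toggled exactly twice, once per stream, so arrival order does not matter.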

This works because every dialog is keyed by its own context id (Context.cid) — any number of concurrent streams, contexts and one-shot RPCs share a single underlying tractor.Channel per peer pair.
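A toy model of id-keyed multiplexing, assuming nothing about tractor's internals: msgs on one shared "wire" carry a dialog id and a router delivers each to the matching inbox. The `wire` contents, the `None` end marker and the `demux()` helper are all invented for illustration.

```python
import asyncio

# One shared "wire": every msg is tagged with the id of its dialog.
wire: list[tuple[str, int]] = [
    ('cid-a', 0), ('cid-b', 0), ('cid-a', 1), ('cid-b', 1), ('cid-a', 2),
]


async def demux(inboxes: dict[str, asyncio.Queue]) -> None:
    # Route each msg to the inbox of the dialog it belongs to.
    for cid, payload in wire:
        await inboxes[cid].put(payload)
    for inbox in inboxes.values():
        await inbox.put(None)  # end-of-dialog marker (sketch-only convention)


async def consume(inbox: asyncio.Queue) -> list[int]:
    got = []
    while (item := await inbox.get()) is not None:
        got.append(item)
    return got


async def main() -> dict[str, list[int]]:
    inboxes = {'cid-a': asyncio.Queue(), 'cid-b': asyncio.Queue()}
    router = asyncio.create_task(demux(inboxes))
    a, b = await asyncio.gather(
        consume(inboxes['cid-a']),
        consume(inboxes['cid-b']),
    )
    await router
    return {'cid-a': a, 'cid-b': b}


per_dialog = asyncio.run(main())
```

Each consumer sees only its own dialog's payloads, in order, even though they were interleaved on the shared wire.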

Fan-out inside an actor: MsgStream.subscribe()

The inverse pattern: one IPC stream feeding many local tasks. Instead of paying for N redundant cross-process streams, call tractor.MsgStream.subscribe() to get a BroadcastReceiver — a tokio-style broadcast channel from tractor.trionics — which copies every received value to each subscribed task:

examples/streaming_broadcast_fanout.py
'''
Demonstrate fanning out ONE inter-actor `MsgStream` to N
local (parent-side) trio tasks using `MsgStream.subscribe()`:
each subscriber gets its own `BroadcastReceiver` copy of
every msg from the single underlying IPC stream.

The child waits for a 'go' msg so that all subscribers are
guaranteed-attached before the first tick is sent; when the
child's stream closes each subscriber's `async for` ends
cleanly.

'''
import trio
import tractor


@tractor.context
async def tick_stream(
    ctx: tractor.Context,
    count: int,
) -> None:
    '''
    Send `count` "ticks" once the parent says go.

    '''
    await ctx.started(count)
    async with ctx.open_stream() as stream:
        # wait for the go-signal ensuring every parent-side
        # subscriber is attached before any tick is sent.
        assert await stream.receive() == 'go'
        for i in range(count):
            await stream.send(i)
        # falling out gracefully closes our stream side;
        # all parent-side subscribers see end-of-channel.


async def consume(
    name: str,
    stream: tractor.MsgStream,
    task_status: trio.TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None:
    '''
    Consume a private broadcast-copy of the IPC stream.

    '''
    async with stream.subscribe() as bcaster:
        task_status.started()
        ticks: list[int] = []
        async for tick in bcaster:
            print(f'{name}: rx {tick}')
            ticks.append(tick)
        # EVERY subscriber gets its own full copy B)
        print(f'{name}: stream ended, got {ticks}')


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'ticker',
            enable_modules=[__name__],
        )
        async with (
            portal.open_context(
                tick_stream,
                count=5,
            ) as (ctx, first),
            ctx.open_stream() as stream,
        ):
            assert first == 5
            async with trio.open_nursery() as tn:
                # use `.start()` so each consumer is known
                # to be subscribed before the ticks flow.
                for i in range(3):
                    await tn.start(
                        consume,
                        f'sub_{i}',
                        stream,
                    )
                await stream.send('go')
        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)

Each task entering stream.subscribe() receives its own copy of everything sent from that point on. The underlying stream keeps pace with the fastest subscriber; a task falling more than the buffered window behind has its next receive raise tractor.trionics.Lagged to say it lost data.

The broadcast handle stays duplex btw: it proxies send() through to the underlying stream, so each subscriber task can keep talking upstream while consuming its fan-out copy.
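The lagging behaviour can be pictured with a toy, synchronous broadcast buffer. This is a conceptual model only, not the BroadcastReceiver implementation: the `Broadcaster` class, its `window` parameter and the local `Lagged` exception are hypothetical stand-ins.

```python
from collections import deque


class Lagged(Exception):
    """Hypothetical stand-in: a subscriber fell behind and lost data."""


class Broadcaster:
    def __init__(self, window: int) -> None:
        self.buffer: deque = deque(maxlen=window)  # most recent values only
        self.sent = 0  # total values ever published
        self.cursors: dict[str, int] = {}  # next sequence number per subscriber

    def subscribe(self, name: str) -> None:
        # New subscribers only see values sent from this point on.
        self.cursors[name] = self.sent

    def publish(self, value) -> None:
        self.buffer.append(value)
        self.sent += 1

    def receive(self, name: str):
        oldest = self.sent - len(self.buffer)
        cursor = self.cursors[name]
        if cursor < oldest:
            # Skip ahead to the oldest retained value and report the loss.
            self.cursors[name] = oldest
            raise Lagged(oldest - cursor)
        if cursor == self.sent:
            raise LookupError('nothing new yet')
        value = self.buffer[cursor - oldest]
        self.cursors[name] = cursor + 1
        return value


bc = Broadcaster(window=2)
bc.subscribe('fast')
bc.subscribe('slow')

fast = []
for i in range(4):
    bc.publish(i)
    fast.append(bc.receive('fast'))  # keeps up, never lags

try:
    bc.receive('slow')
    lagged_by = 0
except Lagged as err:
    lagged_by = err.args[0]  # number of values the slow subscriber missed

slow = [bc.receive('slow'), bc.receive('slow')]
```

In this model the slow subscriber is told it lost data and then resumes from the oldest value still buffered, rather than silently skipping ahead.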

Warning

.subscribe() is idempotent and non-reversible: the first call permanently swaps the stream’s receive machinery over to the internally allocated broadcaster. There’s no un-subscribing back to the raw stream, so make sure you’re ok with the (theoretical) overhead before opting in.

Consuming: async for and friends

async for msg in stream: is just sugar over repeated await stream.receive(). The receive-side surface:

  • receive() — next msg, or raises trio.EndOfChannel on a graceful far-end close (async for translates that into a clean loop exit for you).

  • receive_nowait() — opportunistic, non-blocking drain.

  • closed — property flagging an already-ended stream.

Send-side it’s just await stream.send(data) — one Yield msg per call carrying any msgspec-encodable payload (or whatever your pld_spec permits, see The Context: a cross-actor task pair).
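The "sugar over receive()" relationship can be illustrated with a toy stream class. It assumes only the behaviour described above; `ToyStream` is hypothetical and the local `EndOfChannel` merely stands in for `trio.EndOfChannel` so the sketch runs on the stdlib alone.

```python
import asyncio


class EndOfChannel(Exception):
    """Stand-in for `trio.EndOfChannel` so the sketch needs only the stdlib."""


class ToyStream:
    def __init__(self, msgs) -> None:
        self._msgs = list(msgs)

    async def receive(self):
        if not self._msgs:
            raise EndOfChannel
        return self._msgs.pop(0)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Translate a graceful end into a clean loop exit.
        try:
            return await self.receive()
        except EndOfChannel:
            raise StopAsyncIteration from None


async def main():
    via_loop = [msg async for msg in ToyStream([1, 2, 3])]

    # The same consumption written out by hand.
    via_receive = []
    stream = ToyStream([1, 2, 3])
    while True:
        try:
            via_receive.append(await stream.receive())
        except EndOfChannel:
            break
    return via_loop, via_receive


via_loop, via_receive = asyncio.run(main())
```

Both forms collect the same msgs; the loop form just hides the end-of-channel handling.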

End-of-stream: close vs. cancel

How a stream ends is part of the protocol; the runtime keeps the polite case and the violent case distinct:

  • graceful close: the far side exits its stream block, its async gen returns, or it calls await stream.aclose(). A Stop msg is sent so your async for simply ends (StopAsyncIteration, via trio.EndOfChannel under the hood). A normal, non-error ending — the dialog’s result phase proceeds as usual.

  • cancel or error: no Stop is sent. Instead the cancel/error itself is relayed so the far end knows the dialog did not end on purpose and raises accordingly — a tractor.ContextCancelled, a boxed tractor.RemoteActorError, etc. See the cancellation section of The Context: a cross-actor task pair for exactly who raises what.
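A minimal model of the two endings, seen from the consumer side. The tuple-based msg shapes, the `'error'` kind and the `RemoteError` class are assumptions of this sketch rather than tractor's wire format; only the idea of a stop msg versus a relayed failure comes from the text above.

```python
class RemoteError(Exception):
    """Hypothetical stand-in for an error relayed from the far end."""


def drain(msgs):
    """Collect payloads until the dialog ends, one way or the other."""
    got = []
    for kind, payload in msgs:
        if kind == 'yield':
            got.append(payload)
        elif kind == 'stop':
            # Graceful close: a normal, non-error ending.
            return got
        elif kind == 'error':
            # No stop msg was sent: surface the failure to the consumer.
            raise RemoteError(payload)
    raise RuntimeError('transport ended without a stop or error msg')


polite = drain([('yield', 1), ('yield', 2), ('stop', None)])

try:
    drain([('yield', 1), ('error', 'producer crashed')])
    violent = None
except RemoteError as err:
    violent = str(err)
```

The consumer never has to guess which ending it got: one path returns normally, the other raises.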

Tying it together: every MsgStream is single-use. Both endings are final: once closed, a stream can’t be re-opened, and the supported “retry” is opening a fresh tractor.Context (they’re cheap).

See also