Trio patterns: tractor.trionics#

Sugary structured concurrency (SC) patterns for plain trio code — no actor runtime required. These helpers grew out of real distributed-system needs in tractor apps but every one of them works in a single-process program too; import via from tractor import trionics.

Context-manager helpers#

tractor.trionics.gather_contexts(mngrs, tn=None)[source]#

Concurrently enter a sequence of async context managers (acms), each scheduled in a separate trio.Task and deliver their unwrapped yield-ed values in the same order once all @acms in every task have entered.

On exit, all acms are subsequently and concurrently exited with no order guarantees.

This function is somewhat similar to a batch of non-blocking calls to contextlib.AsyncExitStack.enter_async_context() (inside a loop) in combo with a asyncio.gather() to get the .__aenter__()-ed values, except the managers are both concurrently entered and exited and cancellation-just-works™.

Parameters:
Return type:

AsyncGenerator[tuple[T | None, …], None]

tractor.trionics.maybe_open_context(acm_func, kwargs={}, key=None, tn=None)[source]#

Maybe open an async-context-manager (acm) if there is not already a _Cached version for the provided (input) key for this actor.

Return the _Cached instance on a _Cache hit.

Parameters:
Return type:

AsyncIterator[tuple[bool, T]]

tractor.trionics.maybe_open_nursery(nursery=None, shield=False, lib=<module 'trio' from '/home/runner/work/tractor/tractor/.venv/lib/python3.14/site-packages/trio/__init__.py'>, **kwargs)[source]#

Create a new nursery if None provided.

Blocks on exit as expected if no input nursery is provided.

Parameters:
Return type:

AsyncGenerator[trio.Nursery, Any]

Note

gather_contexts() is “a nursery for async context managers”: it enters N acms concurrently and yields their values in input order. maybe_open_context() is the actor-wide cache/multiplex layer on top — the first task pays the acm setup cost, later callers get (cache_hit=True, ...) and share the same value until all users exit.

Broadcast fan-out#

tractor.trionics.broadcast_receiver(recv_chan, max_buffer_size, receive_afunc=None, raise_on_lag=True)[source]#
Parameters:
Return type:

BroadcastReceiver

class tractor.trionics.BroadcastReceiver(rx_chan, state, receive_afunc=None, raise_on_lag=True)[source]#

A memory receive channel broadcaster which is non-lossy for the fastest consumer.

Additional consumer tasks can receive all produced values by registering with .subscribe() and receiving from the new instance it delivers.

Parameters:
  • rx_chan (AsyncReceiver)

  • state (BroadcastState)

  • receive_afunc (Callable[[], Awaitable[Any]] | None)

  • raise_on_lag (bool)

async receive()[source]#

Attempt to receive an incoming object, blocking if necessary.

Returns:

object: Whatever object was received.

Raises:
trio.EndOfChannel: if the sender has been closed cleanly, and no

more objects are coming. This is not an error condition.

trio.ClosedResourceError: if you previously closed this

ReceiveChannel object.

trio.BrokenResourceError: if something has gone wrong, and the

channel is broken.

trio.BusyResourceError: some channels allow multiple tasks to call

receive at the same time, but others don’t. If you try to call receive simultaneously from multiple tasks on a channel that doesn’t support it, then you can get ~trio.BusyResourceError.

Return type:

ReceiveType

subscribe(raise_on_lag=True)[source]#

Subscribe for values from this broadcast receiver.

Returns a new BroadCastReceiver which is registered for and pulls data from a clone of the original trio.abc.ReceiveChannel provided at creation.

Parameters:

raise_on_lag (bool)

Return type:

AsyncIterator[BroadcastReceiver]

async aclose()[source]#

Close this receiver without affecting other consumers.

Return type:

None

exception tractor.trionics.Lagged[source]#

Bases: TooSlowError

Subscribed consumer task was too slow and was overrun by the fastest consumer-producer pair.

A single-producer, many-consumer broadcast layer over any trio-style receive channel: non-lossy for the fastest consumer while slower consumers raise Lagged (a trio.TooSlowError subtype) once they fall behind the internal ring. This is exactly the machinery behind tractor.MsgStream.subscribe() — see examples/streaming_broadcast_fanout.py.

ExceptionGroup helpers#

tractor.trionics.collapse_eg(hide_tb=True, ignore={}, add_notes=True, bp=False)[source]#

If BaseExceptionGroup raised in the body scope is “collapse-able” (in the same way that trio.open_nursery(strict_exception_groups=False) works) then only raise the lone emedded non-eg in in place.

Parameters:
tractor.trionics.maybe_raise_from_masking_exc(unmask_from=(<class 'trio.Cancelled'>, ), raise_unmasked=True, extra_note='This can occurr when, \n\n - a `trio.Nursery/CancelScope` embeds a `finally/except: `-block which execs an un-shielded checkpoint!', always_warn_on=(<class 'trio.Cancelled'>, ), never_warn_on={<class 'KeyboardInterrupt'>: <class 'trio.Cancelled'>, <class 'trio.Cancelled'>: <class 'trio.Cancelled'>})[source]#

Maybe un-mask and re-raise exception(s) suppressed by a known error-used-as-signal type (cough namely trio.Cancelled).

Though this unmasker targets cancelleds, it can be used more generally to capture and unwrap masked excs detected as .__context__ values which were suppressed by any error type passed in unmask_from.

STILL-TODO ??#

-[ ] support for egs which have multiple masked entries in

maybe_eg.exceptions, in which case we should unmask the individual sub-excs but maintain the eg-parent’s form right?

Parameters:
Return type:

BoxedMaybeException

Note

collapse_eg() “un-nests” single-exception ExceptionGroup wrappers from strict-eg trio nurseries so your except clauses match the original error; maybe_raise_from_masking_exc() surfaces real errors that would otherwise be masked by trio.Cancelled during teardown.

See also

Contexts and streaming for the IPC-stream consumer of BroadcastReceiver, Cross-process streaming for fan-out in a worked pipeline, and the trio docs for the underlying channel and nursery semantics these helpers compose.