Trio patterns: tractor.trionics#
Sugary structured concurrency (SC) patterns for plain trio
code — no actor runtime required. These helpers grew out of
real distributed-system needs in tractor apps but every one of
them works in a single-process program too; import via
from tractor import trionics.
Context-manager helpers#
- tractor.trionics.gather_contexts(mngrs, tn=None)[source]#
Concurrently enter a sequence of async context managers (acms), each scheduled in a separate trio.Task and deliver their unwrapped yield-ed values in the same order once all @acms in every task have entered.
On exit, all acms are subsequently and concurrently exited with no order guarantees.
This function is somewhat similar to a batch of non-blocking calls to contextlib.AsyncExitStack.enter_async_context() (inside a loop) in combo with a asyncio.gather() to get the .__aenter__()-ed values, except the managers are both concurrently entered and exited and cancellation-just-works™.
- Parameters:
mngrs (Sequence[AsyncContextManager[T, bool | None]])
tn (Nursery | None)
- Return type:
AsyncGenerator[tuple[T | None, …], None]
- tractor.trionics.maybe_open_context(acm_func, kwargs={}, key=None, tn=None)[source]#
Maybe open an async-context-manager (acm) if there is not already a _Cached version for the provided (input) key for this actor.
Return the _Cached instance on a _Cache hit.
- tractor.trionics.maybe_open_nursery(nursery=None, shield=False, lib=<module 'trio' from '/home/runner/work/tractor/tractor/.venv/lib/python3.14/site-packages/trio/__init__.py'>, **kwargs)[source]#
Create a new nursery if None provided.
Blocks on exit as expected if no input nursery is provided.
- Parameters:
nursery (trio.Nursery | ActorNursery | None)
shield (bool)
lib (ModuleType)
- Return type:
AsyncGenerator[trio.Nursery, Any]
Note
gather_contexts() is “a nursery for async context
managers”: it enters N acms concurrently and yields their
values in input order. maybe_open_context() is the
actor-wide cache/multiplex layer on top — the first task pays
the acm setup cost, later callers get (cache_hit=True, ...)
and share the same value until all users exit.
Broadcast fan-out#
- tractor.trionics.broadcast_receiver(recv_chan, max_buffer_size, receive_afunc=None, raise_on_lag=True)[source]#
- class tractor.trionics.BroadcastReceiver(rx_chan, state, receive_afunc=None, raise_on_lag=True)[source]#
A memory receive channel broadcaster which is non-lossy for the fastest consumer.
Additional consumer tasks can receive all produced values by registering with
.subscribe()and receiving from the new instance it delivers.- Parameters:
rx_chan (AsyncReceiver)
state (BroadcastState)
receive_afunc (Callable[[], Awaitable[Any]] | None)
raise_on_lag (bool)
- async receive()[source]#
Attempt to receive an incoming object, blocking if necessary.
- Returns:
object: Whatever object was received.
- Raises:
- trio.EndOfChannel: if the sender has been closed cleanly, and no
more objects are coming. This is not an error condition.
- trio.ClosedResourceError: if you previously closed this
ReceiveChannelobject.- trio.BrokenResourceError: if something has gone wrong, and the
channel is broken.
- trio.BusyResourceError: some channels allow multiple tasks to call
receive at the same time, but others don’t. If you try to call receive simultaneously from multiple tasks on a channel that doesn’t support it, then you can get ~trio.BusyResourceError.
- Return type:
ReceiveType
- exception tractor.trionics.Lagged[source]#
Bases:
TooSlowErrorSubscribed consumer task was too slow and was overrun by the fastest consumer-producer pair.
A single-producer, many-consumer broadcast layer over any
trio-style receive channel: non-lossy for the fastest
consumer while slower consumers raise Lagged (a
trio.TooSlowError subtype) once they fall behind the
internal ring. This is exactly the machinery behind
tractor.MsgStream.subscribe() — see
examples/streaming_broadcast_fanout.py.
ExceptionGroup helpers#
- tractor.trionics.collapse_eg(hide_tb=True, ignore={}, add_notes=True, bp=False)[source]#
If BaseExceptionGroup raised in the body scope is “collapse-able” (in the same way that trio.open_nursery(strict_exception_groups=False) works) then only raise the lone emedded non-eg in in place.
- tractor.trionics.maybe_raise_from_masking_exc(unmask_from=(<class 'trio.Cancelled'>, ), raise_unmasked=True, extra_note='This can occurr when, \n\n - a `trio.Nursery/CancelScope` embeds a `finally/except: `-block which execs an un-shielded checkpoint!', always_warn_on=(<class 'trio.Cancelled'>, ), never_warn_on={<class 'KeyboardInterrupt'>: <class 'trio.Cancelled'>, <class 'trio.Cancelled'>: <class 'trio.Cancelled'>})[source]#
Maybe un-mask and re-raise exception(s) suppressed by a known error-used-as-signal type (cough namely trio.Cancelled).
Though this unmasker targets cancelleds, it can be used more generally to capture and unwrap masked excs detected as .__context__ values which were suppressed by any error type passed in unmask_from.
STILL-TODO ??#
- -[ ] support for egs which have multiple masked entries in
maybe_eg.exceptions, in which case we should unmask the individual sub-excs but maintain the eg-parent’s form right?
- Parameters:
unmask_from (BaseException | tuple[BaseException])
raise_unmasked (bool)
extra_note (str)
always_warn_on (tuple[Type[BaseException]])
never_warn_on (dict[Type[BaseException], Type[BaseException]])
- Return type:
BoxedMaybeException
Note
collapse_eg() “un-nests” single-exception
ExceptionGroup wrappers from strict-eg trio
nurseries so your except clauses match the original error;
maybe_raise_from_masking_exc() surfaces real errors that
would otherwise be masked by trio.Cancelled during
teardown.
See also
Contexts and streaming for the IPC-stream consumer of
BroadcastReceiver, Cross-process streaming for
fan-out in a worked pipeline, and the trio docs for the
underlying channel and nursery semantics these helpers
compose.