Contexts and streaming#
The modern core of tractor: a Context links a task in
one actor to a task in another as a single structured concurrency
(SC) scope stretched across the IPC boundary — errors, results and
cancellation flow between the pair exactly like trio tasks under
a common nursery. Open one with Portal.open_context() (see
tractor._context.open_context_from_portal() in
Runtime and spawning), then optionally bridge a bidirectional
MsgStream between the two tasks.
The open_context() <-> ctx.started() handshake.#
For the guided, example-driven tour see The Context: a cross-actor task pair; this page is the precise API surface.
The @context decorator#
- tractor.context(func=None, *, pld_spec=typing.Any, dec_hook=None, enc_hook=None)[source]#
Mark an async function as an SC-supervised, inter-Actor, RPC scheduled child-side Task, IPC endpoint otherwise known more colloquially as a (RPC) “context”.
Functions annotated the fundamental IPC endpoint type offered by tractor.
Note
The decorated function must declare a parameter annotated
tractor.Context (any param name works); the runtime injects
the context instance there on each remote invocation. Pass
pld_spec to type-restrict (and validate) the payloads this
endpoint may shuttle — violations raise
MsgTypeError. See examples/typed_payloads.py.
Context#
- class tractor.Context(chan, cid, _actor, _rx_chan, _send_chan, _pld_rx, _nsf, _remote_func_type=None, _portal=None, _scope=None, _task=None, _outcome_msg=<class 'tractor._context.Unresolved'>, _result=<class 'tractor._context.Unresolved'>, _local_error=None, _remote_error=None, _cancel_called=False, _canceller=None, _cancel_msg=None, _enter_debugger_on_cancel=True, _started_called=False, _started_msg=None, _started_pld=None, _stream_opened=False, _stream=None, _caller_info=None, _overflow_q=<factory>, _scope_nursery=None, _in_overrun=False, _allow_overruns=False, _strict_started=False, _cancel_on_msgerr=True)[source]#
An inter-actor, SC transitive, trio.Task (pair) communication context.
(We’ve also considered other names and ideas:
“communicating tasks scope”: cts
“distributed task scope”: dts
“communicating tasks context”: ctc
Got a better idea for naming? Make an issue dawg! )
NB: This class should never be instatiated directly, it is allocated by the runtime in 2 ways:
by entering Portal.open_context() which is the primary public API for any “parent” task or,
by the RPC machinery’s ._rpc._invoke() as a ctx arg to a remotely scheduled “child” function.
AND is always constructed using the below mk_context().
Allows maintaining task or protocol specific state between 2 cancel-scope-linked, communicating and parallel executing Task`s. Contexts are allocated on each side of any task RPC-linked msg dialog, i.e. for every request to a remote actor from a `Portal. On the “child” side a context is always allocated inside ._rpc._invoke().
TODO: more detailed writeup on cancellation, error and streaming semantics..
A context can be cancelled and (possibly eventually restarted) from either side of the underlying IPC channel, it can also open task oriented message streams, and acts more or less as an IPC aware inter-actor-task
trio.CancelScope.- Parameters:
chan (Channel)
cid (str)
_actor (Actor)
_rx_chan (trio.MemoryReceiveChannel)
_send_chan (trio.MemorySendChannel)
_pld_rx (msgops.PldRx)
_nsf (NamespacePath)
_remote_func_type (str | None)
_portal (Portal | None)
_scope (trio.CancelScope | None)
_task (Task | None)
_outcome_msg (Return | Error | ContextCancelled)
_result (PayloadT | Unresolved)
_local_error (BaseException | None)
_remote_error (BaseException | None)
_cancel_called (bool)
_enter_debugger_on_cancel (bool)
_started_called (bool)
_started_msg (MsgType | None)
_started_pld (Any)
_stream_opened (bool)
_stream (MsgStream | None)
_caller_info (CallerInfo | None)
_overflow_q (deque[dict])
_scope_nursery (trio.Nursery | None)
_in_overrun (bool)
_allow_overruns (bool)
_strict_started (bool)
_cancel_on_msgerr (bool)
- property cancel_called: bool#
Records whether cancellation has been requested for this context by a call to .cancel() either due to,
an explicit call by some local task,
or an implicit call due to an error caught inside the Portal.open_context() block.
- property canceller: tuple[str, str] | None#
Actor.aid.uid: tuple[str, str] of the (remote) actor-process who’s task was cancelled thus causing this (side of the) context to also be cancelled.
- property cancel_acked: bool#
Records whether the task on the remote side of this IPC context acknowledged a cancel request via a relayed ContextCancelled with the .canceller attr set to the Actor.aid.uid of the local actor who’s task entered Portal.open_context().
This will only be True when .cancel() is called and the ctxc response contains a .canceller: tuple field equal to the uid of the calling task’s actor.
- property cancelled_caught: bool#
Exactly the value of self._scope.cancelled_caught (delegation) and should only be (able to be read as) True for a .side == “parent” ctx wherein the Portal.open_context() block was exited due to a call to ._scope.cancel() - which should only ocurr in 2 cases:
a parent side calls .cancel(), the far side cancels and delivers back a ContextCancelled (making .cancel_acked == True) and ._scope.cancel() is called by ._maybe_cancel_and_set_remote_error() which in turn cancels all .open_context() started tasks (including any overrun queuing ones). => ._scope.cancelled_caught == True by normal trio cs semantics.
a parent side is delivered a ._remote_error: RemoteActorError via ._deliver_msg() and a transitive call to _maybe_cancel_and_set_remote_error() calls ._scope.cancel() and that cancellation eventually results in trio.Cancelled`(s) caught in the `.open_context() handling around the @acm’s yield.
Only as an FYI, in the “child” side case it can also be set but never is readable by any task outside the RPC machinery in ._invoke() since,:
when a child side calls .cancel(), ._scope.cancel() is called immediately and handled specially inside ._invoke() to raise a ContextCancelled which is then sent to the parent side.
However, ._scope.cancelled_caught can NEVER be accessed/read as True by any RPC invoked task since it will have terminated before the cs block exit.
- async cancel(timeout=0.616)[source]#
Cancel this inter-actor IPC context by requestng the remote side’s cancel-scope-linked Task by calling ._scope.cancel() and delivering an ContextCancelled ack msg in reponse.
Behaviour:
after the far end cancels, the .cancel() calling side should receive a ContextCancelled with the .canceller: tuple uid set to the current Actor.aid.uid.
timeout (quickly) on failure to rx this ACK error-msg in an attempt to sidestep 2-generals when the transport layer fails.
Note, that calling this method DOES NOT also necessarily result in Context._scope.cancel() being called locally!
- => That is, an IPC Context (this) does not
have the same semantics as a trio.CancelScope.
If the parent (who entered the Portal.open_context()) desires that the internal block’s cancel-scope be cancelled it should open its own trio.CancelScope and manage it as needed.
- Parameters:
timeout (float)
- Return type:
None
- async wait_for_result(hide_tb=True)[source]#
From some (parent) side task, wait for and return the final result from the remote (child) side’s task.
This provides a mechanism for one task running in some actor to wait on another task at the other side, in some other actor, to terminate.
If the remote task is still in a streaming state (it is delivering values from inside a
Context.open_stream():block, then those msgs are drained but discarded since it is presumed this side of the context has already finished with its own streaming logic.If the remote context (or its containing actor runtime) was canceled, either by a local task calling one of
Context.cancel()or Portal.cancel_actor()`, we ignore the receivedContextCancelledexception if the context or underlying IPC channel is marked as having been “cancel called”. This is similar behavior to usingtrio.Nursery.cancel()wherein tasks which raisetrio.Cancelare silently reaped; the main different in this API is in the “cancel called” case, instead of just not raising, we also return the exception as the result since client code may be interested in the details of the remote cancellation.
- property maybe_error: BaseException | None#
Return the (remote) error as outcome or None.
Remote errors take precedence over local ones.
- property outcome: Any | RemoteActorError | ContextCancelled#
Return the “final outcome” (state) of the far end peer task non-blocking. If the remote task has not completed then this field always resolves to the module defined Unresolved handle.
—— - —— TODO->( this is doc-driven-dev content not yet actual ;P )
The final “outcome” from an IPC context which can be any of:
some outcome.Value which boxes the returned output from the peer task’s @context-decorated remote task-as-func, or
an outcome.Error wrapping an exception raised that same RPC task after a fault or cancellation, or
an unresolved outcome.Outcome when the peer task is still executing and has not yet completed.
- async started(value=None, validate_pld_spec=True, strict_pld_parity=False, hide_tb=True)[source]#
Indicate to calling actor’s task that this linked context has started and send
valueto the other side via IPC.On the calling side
valueis the second item delivered in the tuple returned byPortal.open_context().
Deprecated since version 0.1.0a6: Context.result() warns; use Context.wait_for_result().
Note
A Context is not a trio.CancelScope:
Context.cancel() requests cancellation of the remote
peer task and does not cancel the local scope. If you
requested the cancel, the resulting ContextCancelled
is absorbed at open_context() exit; a cancel originating
anywhere else (the peer, or a third-party actor recorded in
ContextCancelled.canceller) is raised locally. This
self-vs-cross-cancel rule is the key to writing correct
inter-actor teardown logic — see The Context: a cross-actor task pair.
Bidirectional streaming#
- tractor._streaming.open_stream_from_ctx(ctx, allow_overruns=False, msg_buffer_size=None)[source]#
Open a MsgStream, a bi-directional msg transport dialog connected to the cross-actor peer task for an IPC Context.
This context manager must be entered in both the “parent” (task which entered Portal.open_context()) and “child” (RPC task which is decorated by @context) tasks for the stream to logically be considered “open”; if one side begins sending to an un-opened peer, depending on policy config, msgs will either be queued until the other side opens and/or a StreamOverrun will (eventually) be raised.
—— - ——
Runtime semantics design:
A MsgStream session adheres to “one-shot use” semantics, meaning if you close the scope it can not be “re-opened”.
Instead you must re-establish a new surrounding RPC Context (RTC: remote task context?) using Portal.open_context().
In the future this design choice may need to be changed but currently there seems to be no obvious reason to support such semantics..
“pausing a stream” can be supported with a message implemented by the tractor application dev.
any remote error will normally require a restart of the entire trio.Task’s scope due to the nature of trio’s cancellation (CancelScope) system and semantics (level triggered).
Note
open_stream_from_ctx() is bound as
the method-alias Context.open_stream() — call it as
async with ctx.open_stream() as stream:. Both sides of the
context must enter it for the dialog to be open.
- class tractor.MsgStream(ctx, rx_chan, _broadcaster=None)[source]#
A bidirectional message stream for receiving logically sequenced values over an inter-actor IPC Channel.
Termination rules:
on cancellation the stream is not implicitly closed and the surrounding
Contextis expected to handle how that cancel is relayed to any task on the remote side.if the remote task signals the end of a stream the
ReceiveChannelsemantics dictate that aStopAsyncIterationto terminate the localasync for.
- Parameters:
ctx (Context)
rx_chan (trio.MemoryReceiveChannel)
_broadcaster (BroadcastReceiver | None)
- async receive(hide_tb=False)[source]#
Receive a single msg from the IPC transport, the next in sequence sent by the far end task (possibly in order as determined by the underlying protocol).
- Parameters:
hide_tb (bool)
- async aclose()[source]#
Cancel associated remote actor task and local memory channel on close.
- Notes:
REMEMBER that this is also called by .__aexit__() so careful consideration must be made to handle whatever internal stsate is mutated, particuarly in terms of draining IPC msgs!
more or less we try to maintain adherance to trio’s .aclose() semantics: https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
- subscribe()[source]#
Allocate and return a
BroadcastReceiverwhich delegates to this message stream.This allows multiple local tasks to receive each their own copy of this message stream.
This operation is indempotent and and mutates this stream’s receive machinery to copy and window-length-store each received value from the far end via the internally created broudcast receiver wrapper.
- Return type:
Note
A MsgStream is one-shot use: once closed it can never
be “re-opened” — open a fresh Context instead. Remote
end-of-stream surfaces as StopAsyncIteration from
async for; un-consumed sends overrun the receiver and raise
tractor._exceptions.StreamOverrun unless the context
was opened with allow_overruns=True.
MsgStream.subscribe() fans a single IPC stream out to
multiple local tasks via a
tractor.trionics.BroadcastReceiver (see
Trio patterns: tractor.trionics); the underlying allocation is idempotent and
non-reversible for the stream’s lifetime. See
examples/streaming_broadcast_fanout.py for the pattern in
action.
Legacy one-way streaming#
Warning
@tractor.stream and Portal.open_stream_from() are the
legacy one-way streaming API kept for backward compat: a
plain async-generator function streamed parent-ward with no
child-side receive leg. New code should use
@tractor.context + ctx.open_stream() (bidirectional,
SC-linked, typed). Note ctx is now a reserved param name
for @context endpoints — @stream functions must use
stream instead, and ctx.send_yield() is deprecated in
favor of MsgStream.send().
See also
Errors and cancellation types for ContextCancelled /
MsgTypeError semantics, Typed messaging: tractor.msg for payload
typing via pld_spec and codecs, Trio patterns: tractor.trionics for
the broadcast fan-out machinery, and the guided tours in
Cross-process streaming + Cancellation and error propagation.