Contexts and streaming#

The modern core of tractor: a Context links a task in one actor to a task in another as a single structured concurrency (SC) scope stretched across the IPC boundary — errors, results and cancellation flow between the pair exactly like trio tasks under a common nursery. Open one with Portal.open_context() (see tractor._context.open_context_from_portal() in Runtime and spawning), then optionally bridge a bidirectional MsgStream between the two tasks.

Figure: the open_context() <-> ctx.started() handshake sequence between the parent and child actors.

For the guided, example-driven tour see The Context: a cross-actor task pair; this page is the precise API surface.

The @context decorator#

tractor.context(func=None, *, pld_spec=typing.Any, dec_hook=None, enc_hook=None)[source]#

Mark an async function as an SC-supervised, inter-Actor, RPC-scheduled, child-side Task: an IPC endpoint known more colloquially as an (RPC) “context”.

Functions annotated this way are the fundamental IPC endpoint type offered by tractor.

Return type:

Callable

Note

The decorated function must declare a parameter annotated tractor.Context (any param name works); the runtime injects the context instance there on each remote invocation. Pass pld_spec to type-restrict (and validate) the payloads this endpoint may shuttle — violations raise MsgTypeError. See examples/typed_payloads.py.

Context#

class tractor.Context(chan, cid, _actor, _rx_chan, _send_chan, _pld_rx, _nsf, _remote_func_type=None, _portal=None, _scope=None, _task=None, _outcome_msg=<class 'tractor._context.Unresolved'>, _result=<class 'tractor._context.Unresolved'>, _local_error=None, _remote_error=None, _cancel_called=False, _canceller=None, _cancel_msg=None, _enter_debugger_on_cancel=True, _started_called=False, _started_msg=None, _started_pld=None, _stream_opened=False, _stream=None, _caller_info=None, _overflow_q=<factory>, _scope_nursery=None, _in_overrun=False, _allow_overruns=False, _strict_started=False, _cancel_on_msgerr=True)[source]#

An inter-actor, SC transitive, trio.Task (pair) communication context.

(We’ve also considered other names and ideas:

  • “communicating tasks scope”: cts

  • “distributed task scope”: dts

  • “communicating tasks context”: ctc

Got a better idea for naming? Make an issue dawg! )

NB: This class should never be instantiated directly; it is allocated by the runtime in 2 ways:

  • by entering Portal.open_context() which is the primary public API for any “parent” task or,

  • by the RPC machinery’s ._rpc._invoke() as a ctx arg to a remotely scheduled “child” function.

AND is always constructed using the below mk_context().

Allows maintaining task- or protocol-specific state between 2 cancel-scope-linked, communicating and parallel-executing Tasks. Contexts are allocated on each side of any task RPC-linked msg dialog, i.e. for every request to a remote actor from a Portal. On the “child” side a context is always allocated inside ._rpc._invoke().

TODO: more detailed writeup on cancellation, error and streaming semantics..

A context can be cancelled (and possibly eventually restarted) from either side of the underlying IPC channel; it can also open task-oriented message streams, and acts more or less as an IPC-aware inter-actor-task trio.CancelScope.

property cancel_called: bool#

Records whether cancellation has been requested for this context by a call to .cancel() either due to,

  • an explicit call by some local task,

  • or an implicit call due to an error caught inside the Portal.open_context() block.

property canceller: tuple[str, str] | None#

Actor.aid.uid: tuple[str, str] of the (remote) actor-process whose task was cancelled, thus causing this (side of the) context to also be cancelled.

property cancel_acked: bool#

Records whether the task on the remote side of this IPC context acknowledged a cancel request via a relayed ContextCancelled with the .canceller attr set to the Actor.aid.uid of the local actor whose task entered Portal.open_context().

This will only be True when .cancel() is called and the ctxc response contains a .canceller: tuple field equal to the uid of the calling task’s actor.

property cancelled_caught: bool#

Exactly the value of self._scope.cancelled_caught (delegation) and should only be (able to be read as) True for a .side == “parent” ctx wherein the Portal.open_context() block was exited due to a call to ._scope.cancel() - which should only occur in 2 cases:

  • a parent side calls .cancel(), the far side cancels and delivers back a ContextCancelled (making .cancel_acked == True) and ._scope.cancel() is called by ._maybe_cancel_and_set_remote_error() which in turn cancels all .open_context() started tasks (including any overrun queuing ones). => ._scope.cancelled_caught == True by normal trio cs semantics.

  • a parent side is delivered a ._remote_error: RemoteActorError via ._deliver_msg() and a transitive call to _maybe_cancel_and_set_remote_error() calls ._scope.cancel() and that cancellation eventually results in trio.Cancelled(s) caught in the .open_context() handling around the @acm’s yield.

Only as an FYI, in the “child” side case it can also be set but is never readable by any task outside the RPC machinery in ._invoke() since:

  • when a child side calls .cancel(), ._scope.cancel() is called immediately and handled specially inside ._invoke() to raise a ContextCancelled which is then sent to the parent side.

    However, ._scope.cancelled_caught can NEVER be accessed/read as True by any RPC invoked task since it will have terminated before the cs block exit.

property side: str#

Return a string indicating which task this instance is wrapping.

async cancel(timeout=0.616)[source]#

Cancel this inter-actor IPC context by requesting that the remote side’s cancel-scope-linked Task cancel itself by calling ._scope.cancel() and deliver a ContextCancelled ack msg in response.

Behaviour:

  • after the far end cancels, the .cancel() calling side should receive a ContextCancelled with the .canceller: tuple uid set to the current Actor.aid.uid.

  • timeout (quickly) on failure to rx this ACK error-msg in an attempt to sidestep 2-generals when the transport layer fails.
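The ack-with-timeout behaviour in the bullets above can be modelled without tractor at all. The sketch below is a stdlib-only analogy (asyncio is used purely so that it runs anywhere; tractor itself is built on trio), and request_cancel, send_cancel and wait_for_ack are hypothetical names, not tractor APIs. It shows only the shape of the logic: send the request, then bound the wait for the acknowledgement.

```python
import asyncio


async def request_cancel(send_cancel, wait_for_ack, timeout=0.616):
    """Model of a bounded wait for a cancel acknowledgement.

    Hypothetical helper, not part of tractor: returns True if the
    ack arrived in time and False if the wait timed out.
    """
    await send_cancel()
    try:
        await asyncio.wait_for(wait_for_ack(), timeout)
    except asyncio.TimeoutError:
        # No ack within the window: give up instead of blocking forever.
        return False
    return True


async def demo():
    acked = asyncio.Event()
    never = asyncio.Event()

    async def send_and_ack():
        acked.set()  # a responsive peer acks right away

    async def send_only():
        pass  # an unresponsive peer never acks

    ok = await request_cancel(send_and_ack, acked.wait)
    timed_out = await request_cancel(send_only, never.wait, timeout=0.01)
    return ok, timed_out
```

Running asyncio.run(demo()) exercises both paths: the first request is acknowledged, the second gives up after its short timeout.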

Note that calling this method DOES NOT also necessarily result in Context._scope.cancel() being called locally!

=> That is, an IPC Context (this) does not have the same semantics as a trio.CancelScope.

If the parent (who entered the Portal.open_context()) desires that the internal block’s cancel-scope be cancelled it should open its own trio.CancelScope and manage it as needed.

Parameters:

timeout (float)

Return type:

None

async wait_for_result(hide_tb=True)[source]#

From some (parent) side task, wait for and return the final result from the remote (child) side’s task.

This provides a mechanism for one task running in some actor to wait on another task at the other side, in some other actor, to terminate.

If the remote task is still in a streaming state (it is delivering values from inside a Context.open_stream() block), then those msgs are drained but discarded since it is presumed this side of the context has already finished with its own streaming logic.
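As a rough illustration of "drained but discarded", the sketch below walks a recorded msg dialog until it finds the final result. It is a plain-Python model, not tractor code: drain_to_result and the "yield" / "return" tags are hypothetical stand-ins chosen for this example.

```python
def drain_to_result(msgs):
    """Return (result, discarded_count) from an iterable of (kind, payload).

    The "yield" / "return" tags are hypothetical stand-ins for streamed
    values and the final result; they are not tractor's wire format.
    """
    discarded = 0
    for kind, payload in msgs:
        if kind == "return":
            return payload, discarded
        if kind == "yield":
            discarded += 1  # late stream value: drained, never delivered
            continue
        raise ValueError(f"unexpected msg kind: {kind!r}")
    raise RuntimeError("dialog ended without a final result")


dialog = [("yield", 1), ("yield", 2), ("return", "done")]
result, discarded = drain_to_result(dialog)
```

Here the two late stream values are counted and dropped, and only the final payload reaches the caller.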

If the remote context (or its containing actor runtime) was cancelled by a local task calling either Context.cancel() or Portal.cancel_actor(), we ignore the received ContextCancelled exception if the context or underlying IPC channel is marked as having been “cancel called”. This is similar behavior to using trio.Nursery.cancel_scope.cancel(), wherein tasks which raise trio.Cancelled are silently reaped; the main difference in this API is that in the “cancel called” case, instead of just not raising, we also return the exception as the result since client code may be interested in the details of the remote cancellation.

Parameters:

hide_tb (bool)

Return type:

Any | Exception

property maybe_error: BaseException | None#

Return the (remote) error as outcome or None.

Remote errors take precedence over local ones.

property outcome: Any | RemoteActorError | ContextCancelled#

Return the “final outcome” (state) of the far end peer task without blocking. If the remote task has not completed then this field always resolves to the module-defined Unresolved handle.

—— - —— TODO->( this is doc-driven-dev content not yet actual ;P )

The final “outcome” from an IPC context which can be any of:

  • some outcome.Value which boxes the returned output from the peer task’s @context-decorated remote task-as-func, or

  • an outcome.Error wrapping an exception raised by that same RPC task after a fault or cancellation, or

  • an unresolved outcome.Outcome when the peer task is still executing and has not yet completed.

async started(value=None, validate_pld_spec=True, strict_pld_parity=False, hide_tb=True)[source]#

Indicate to the calling actor’s task that this linked context has started and send value to the other side via IPC.

On the calling side value is the second item delivered in the tuple returned by Portal.open_context().

Parameters:
  • value (PayloadT | None)

  • validate_pld_spec (bool)

  • strict_pld_parity (bool)

  • hide_tb (bool)

Return type:

None
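To make the started() handshake concrete, here is a single-process model of it. This is an analogy only: it uses asyncio from the stdlib so that it runs anywhere, whereas tractor is built on trio and runs the two tasks in separate actors. ModelContext and this open_context() are hypothetical stand-ins written for the example, not tractor's classes. The point is the ordering: the parent blocks until the child calls started(), and the value passed there arrives as the second item of the yielded pair.

```python
import asyncio
from contextlib import asynccontextmanager


class ModelContext:
    """Hypothetical stand-in for a context; NOT tractor.Context."""

    def __init__(self):
        self._started = asyncio.get_running_loop().create_future()
        self._task = None

    async def started(self, value=None):
        # Child side: release the waiting parent and pass it `value`.
        self._started.set_result(value)

    async def wait_for_result(self):
        # Parent side: the child function's return value.
        return await self._task


@asynccontextmanager
async def open_context(child_fn, **kwargs):
    ctx = ModelContext()
    ctx._task = asyncio.ensure_future(child_fn(ctx, **kwargs))
    try:
        # Block until the child calls started() or exits early.
        await asyncio.wait(
            {ctx._started, ctx._task},
            return_when=asyncio.FIRST_COMPLETED,
        )
        if not ctx._started.done():
            ctx._task.result()  # re-raises a child error, if any
            raise RuntimeError("child exited without calling started()")
        yield ctx, ctx._started.result()
        await ctx._task  # leaving the block waits for the child
    finally:
        ctx._task.cancel()  # no-op when the child already finished


async def child(ctx, x):
    await ctx.started(x * 2)  # first value handed to the parent
    return x + 1              # final result


async def main():
    async with open_context(child, x=5) as (ctx, first):
        result = await ctx.wait_for_result()
    return first, result
```

In this model asyncio.run(main()) hands back the started value and the final result as a pair; no IPC, serialization or cross-process cancellation is modelled.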

Deprecated since version 0.1.0a6: Context.result() warns; use Context.wait_for_result().

Note

A Context is not a trio.CancelScope: Context.cancel() requests cancellation of the remote peer task and does not cancel the local scope. If you requested the cancel, the resulting ContextCancelled is absorbed at open_context() exit; a cancel originating anywhere else (the peer, or a third-party actor recorded in ContextCancelled.canceller) is raised locally. This self-vs-cross-cancel rule is the key to writing correct inter-actor teardown logic — see The Context: a cross-actor task pair.
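The self-vs-cross-cancel rule in the note above reduces to a small decision, sketched here in plain Python. Everything in the block is a hypothetical model (ModelCtx, ModelContextCancelled, settle), not tractor's implementation; it assumes only what the text states: a cancel counts as acknowledged when this side requested it and the relayed canceller uid matches the local actor's uid.

```python
from dataclasses import dataclass
from typing import Optional, Tuple

Uid = Tuple[str, str]


class ModelContextCancelled(Exception):
    """Stand-in for a relayed cancel ack; NOT tractor's exception."""

    def __init__(self, canceller: Optional[Uid]):
        super().__init__(f"cancelled by {canceller}")
        self.canceller = canceller


@dataclass
class ModelCtx:
    local_uid: Uid               # uid of the actor running this side
    cancel_called: bool = False  # did this side call .cancel()?

    def cancel_acked(self, ctxc: ModelContextCancelled) -> bool:
        # Acked only if we asked AND the peer names us as canceller.
        return self.cancel_called and ctxc.canceller == self.local_uid

    def settle(self, ctxc: ModelContextCancelled):
        if self.cancel_acked(ctxc):
            return ctxc  # self-cancel: absorbed, available as a value
        raise ctxc       # cross-cancel: surfaces in the local task


me, other = ("parent", "uuid-1"), ("rogue", "uuid-2")
requested = ModelCtx(local_uid=me, cancel_called=True)
absorbed = requested.settle(ModelContextCancelled(canceller=me))
```

A cancel naming any other uid, or one arriving when this side never called cancel, takes the raise branch instead.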

Bidirectional streaming#

tractor._streaming.open_stream_from_ctx(ctx, allow_overruns=False, msg_buffer_size=None)[source]#

Open a MsgStream, a bi-directional msg transport dialog connected to the cross-actor peer task for an IPC Context.

This context manager must be entered in both the “parent” (task which entered Portal.open_context()) and “child” (RPC task which is decorated by @context) tasks for the stream to logically be considered “open”; if one side begins sending to an un-opened peer, depending on policy config, msgs will either be queued until the other side opens and/or a StreamOverrun will (eventually) be raised.
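The queue-or-overrun policy described above can be pictured with a bounded inbox. The block below is a plain-Python model under stated assumptions, not tractor's machinery: ModelInbox and ModelStreamOverrun are hypothetical names, and the real runtime's buffering and timing will differ. It shows the two outcomes only: with overruns disallowed a full buffer raises, with overruns allowed extra msgs wait in an overflow queue and are still delivered in order.

```python
from collections import deque


class ModelStreamOverrun(Exception):
    """Local stand-in; NOT tractor's StreamOverrun."""


class ModelInbox:
    """Bounded receive buffer with an optional overflow queue."""

    def __init__(self, size, allow_overruns=False):
        self.size = size
        self.allow_overruns = allow_overruns
        self.buffer = deque()
        self.overflow = deque()

    def deliver(self, msg):
        if len(self.buffer) < self.size:
            self.buffer.append(msg)
        elif self.allow_overruns:
            self.overflow.append(msg)  # parked until the reader catches up
        else:
            raise ModelStreamOverrun(f"buffer of {self.size} is full")

    def receive(self):
        msg = self.buffer.popleft()
        if self.overflow:
            # Refill the freed slot so ordering is preserved.
            self.buffer.append(self.overflow.popleft())
        return msg


lenient = ModelInbox(size=2, allow_overruns=True)
for n in (1, 2, 3):
    lenient.deliver(n)
received = [lenient.receive() for _ in range(3)]
```

With the strict default, the third deliver() in the same scenario would raise ModelStreamOverrun instead of queuing.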

—— - ——

Runtime semantics design:

A MsgStream session adheres to “one-shot use” semantics, meaning if you close the scope it can not be “re-opened”.

Instead you must re-establish a new surrounding RPC Context (RTC: remote task context?) using Portal.open_context().

In the future this design choice may need to be changed but currently there seems to be no obvious reason to support such semantics..

  • “pausing a stream” can be supported with a message implemented by the tractor application dev.

  • any remote error will normally require a restart of the entire trio.Task’s scope due to the nature of trio’s cancellation (CancelScope) system and semantics (level triggered).

Parameters:
  • ctx (Context)

  • allow_overruns (bool | None)

  • msg_buffer_size (int | None)

Return type:

AsyncGenerator[MsgStream, None]

Note

open_stream_from_ctx() is bound as the method-alias Context.open_stream() — call it as async with ctx.open_stream() as stream:. Both sides of the context must enter it for the dialog to be open.

class tractor.MsgStream(ctx, rx_chan, _broadcaster=None)[source]#

A bidirectional message stream for receiving logically sequenced values over an inter-actor IPC Channel.

Termination rules:

  • on cancellation the stream is not implicitly closed and the surrounding Context is expected to handle how that cancel is relayed to any task on the remote side.

  • if the remote task signals the end of a stream the ReceiveChannel semantics dictate that a StopAsyncIteration is raised to terminate the local async for.

property ctx: Context#

A read-only ref to this stream’s inter-actor-task Context.

async receive(hide_tb=False)[source]#

Receive a single msg from the IPC transport, the next in sequence sent by the far end task (possibly in order as determined by the underlying protocol).

Parameters:

hide_tb (bool)

async aclose()[source]#

Cancel associated remote actor task and local memory channel on close.

Return type:

list[Exception | dict]

subscribe()[source]#

Allocate and return a BroadcastReceiver which delegates to this message stream.

This allows multiple local tasks to each receive their own copy of this message stream.

This operation is idempotent and mutates this stream’s receive machinery to copy and window-length-store each received value from the far end via the internally created broadcast receiver wrapper.

Return type:

AsyncIterator[BroadcastReceiver]
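The fan-out idea behind subscribe() can be sketched in a few lines of plain Python. ModelBroadcast below is a hypothetical, synchronous stand-in, not tractor.trionics.BroadcastReceiver: it only demonstrates that every subscriber gets its own copy of each value and that each copy is kept in a window of bounded length. How the real receiver treats a consumer that falls behind its window is not modelled here.

```python
from collections import deque


class ModelBroadcast:
    """Copy each published value into every subscriber's window."""

    def __init__(self, window=8):
        self.window = window
        self.subscribers = []

    def subscribe(self):
        # Each subscriber owns a bounded deque; once it is full the
        # oldest value is dropped (a simplification for this model).
        q = deque(maxlen=self.window)
        self.subscribers.append(q)
        return q

    def publish(self, value):
        for q in self.subscribers:
            q.append(value)


bcast = ModelBroadcast(window=2)
fast, slow = bcast.subscribe(), bcast.subscribe()

bcast.publish("a")
first_seen = fast.popleft()  # the fast consumer keeps up
bcast.publish("b")
bcast.publish("c")
```

After these calls the fast consumer has taken "a" and still holds "b" and "c", while the slow one, limited to a window of 2, holds only "b" and "c".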

async send(data, hide_tb=True)[source]#

Send a message over this stream to the far end.

Return type:

None

Note

A MsgStream is one-shot use: once closed it can never be “re-opened” — open a fresh Context instead. Remote end-of-stream surfaces as StopAsyncIteration from async for; un-consumed sends overrun the receiver and raise tractor._exceptions.StreamOverrun unless the context was opened with allow_overruns=True.

MsgStream.subscribe() fans a single IPC stream out to multiple local tasks via a tractor.trionics.BroadcastReceiver (see Trio patterns: tractor.trionics); the underlying allocation is idempotent and non-reversible for the stream’s lifetime. See examples/streaming_broadcast_fanout.py for the pattern in action.

Legacy one-way streaming#

tractor.stream(func)[source]#

Mark an async function as a streaming routine with @stream.

Parameters:

func (Callable)

Return type:

Callable

Warning

@tractor.stream and Portal.open_stream_from() are the legacy one-way streaming API kept for backward compat: a plain async-generator function streamed parent-ward with no child-side receive leg. New code should use @tractor.context + ctx.open_stream() (bidirectional, SC-linked, typed). Note ctx is now a reserved param name for @context endpoints — @stream functions must use stream instead, and ctx.send_yield() is deprecated in favor of MsgStream.send().

See also

Errors and cancellation types for ContextCancelled / MsgTypeError semantics, Typed messaging: tractor.msg for payload typing via pld_spec and codecs, Trio patterns: tractor.trionics for the broadcast fan-out machinery, and the guided tours in Cross-process streaming + Cancellation and error propagation.