Infected asyncio
tractor is “just trio”, but the Python world is packed with
libraries that only speak asyncio: websocket stacks, vendor
SDKs, that one exchange client you can’t route around. Rather than
make you rewrite them, tractor lets you quarantine them inside
a dedicated subactor which runs both event loops at once, with full
structured concurrency (SC) guarantees maintained across the
loop boundary and the process tree.
In the project’s own words:
“Yes, we spawn a python process, run asyncio, start trio on the asyncio loop, then send commands to the trio scheduled tasks to tell asyncio tasks what to do XD”
We call this “infected asyncio” mode: the subactor’s stdlib
loop runs as the host with trio embedded on top in guest
mode, and your trio tasks drive asyncio tasks through
a linked, SC-supervised, in-memory channel.
One process, two schedulers: trio rides the
asyncio loop as a guest while the parent speaks plain
tractor IPC, none the wiser.
Note
Infected asyncio mode is experimental: it works (we
beat on it plenty) but parts of the API surface and some
edge-case semantics are still settling. Got opinions on the
interop design? Feel free to sling them in #273!
How the infection takes hold
A normal subactor boots by running the tractor runtime’s task
tree directly under trio.run(). Pass infect_asyncio=True
at spawn time and the child’s entrypoint changes shape entirely:
1. the process starts the stdlib loop via asyncio.run(),
2. the first asyncio task calls trio.lowlevel.start_guest_run(), embedding the trio scheduler inside the already running asyncio loop (the upstream guest-mode feature),
3. the regular tractor runtime then boots on the guest trio side and connects back to its parent like any other subactor.
Both schedulers interleave in a single thread, no GIL gymnastics
required. From the rest of the actor tree the infected child is
indistinguishable from any other actor: same IPC protocol, same
supervision and cancellation semantics, same zombie-safety
guarantees. The difference is purely internal: trio tasks in
that process can start and drive asyncio tasks through the
tractor.to_asyncio API.
Spawning an infected subactor
Just flip the flag on tractor.ActorNursery.start_actor():
async with tractor.open_nursery() as an:
    portal = await an.start_actor(
        'aio_side',
        enable_modules=[__name__],
        infect_asyncio=True,
    )
The one-shot convenience ActorNursery.run_in_actor() accepts
the same flag. The to_asyncio APIs may only be called from
tasks inside an infected actor; calling them anywhere else raises
a loud RuntimeError. You can introspect at runtime with
tractor.current_actor().is_infected_aio().
Linking tasks with open_channel_from()
The core primitive is tractor.to_asyncio.open_channel_from(),
an async context manager which starts your asyncio function as
a real asyncio.Task and yields a two-way channel linking it to
the calling trio task:
from tractor import to_asyncio

async with to_asyncio.open_channel_from(
    aio_main,  # async def aio_main(chan, **kwargs)
    period=0.5,  # extra kwargs are passed through
) as (chan, first):
    await chan.send('tick')
The semantics deliberately mirror the inter-actor Context
handshake from The Context: a cross-actor task pair:
- the target fn must declare a parameter literally named chan; the runtime injects the shared LinkedTaskChannel by keyword.
- the trio side blocks at entry until the asyncio task calls chan.started_nowait(value); that value is delivered as first, exactly like the (ctx, first) pair you get from Portal.open_context() after the child calls ctx.started().
- a first value must be sent from the asyncio side or the trio side will never unblock.
- on block exit the pair is torn down together; neither task can outlive the other (more on this below).
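To make the handshake rules above concrete, here is a single-loop, stdlib-only toy model of them. It is not tractor’s implementation and does not cross any loop boundary: ToyChannel, toy_open_channel_from(), echo_child() and handshake_demo() are hypothetical names, both “sides” are plain asyncio tasks, and only three behaviours are modelled: keyword injection of chan, blocking at entry until the first value arrives, and joint teardown on block exit.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyChannel:
    """Hypothetical stand-in for a linked two-way task channel."""

    def __init__(self) -> None:
        self.first = asyncio.get_running_loop().create_future()
        self._to_child: asyncio.Queue = asyncio.Queue()
        self._to_parent: asyncio.Queue = asyncio.Queue()

    # child-side (queue-flavored, mostly sync) methods
    def started_nowait(self, value) -> None:
        self.first.set_result(value)

    async def get(self):
        return await self._to_child.get()

    def send_nowait(self, value) -> None:
        self._to_parent.put_nowait(value)

    # parent-side methods
    async def send(self, value) -> None:
        await self._to_child.put(value)

    async def receive(self):
        return await self._to_parent.get()


@asynccontextmanager
async def toy_open_channel_from(target, **kwargs):
    chan = ToyChannel()
    # The channel is injected by keyword, so the target must name it `chan`.
    task = asyncio.create_task(target(chan=chan, **kwargs))
    try:
        # Block at entry until the child delivers a first value (or dies).
        done, _ = await asyncio.wait(
            {chan.first, task}, return_when=asyncio.FIRST_COMPLETED
        )
        if chan.first not in done:
            task.result()  # re-raise the child's own error, if any
            raise RuntimeError("target exited before started_nowait()")
        yield chan, chan.first.result()
    finally:
        # Joint teardown: the child never outlives this block.
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)


async def echo_child(chan: ToyChannel, greeting: str) -> None:
    chan.started_nowait(greeting)
    while True:
        chan.send_nowait(await chan.get())


async def handshake_demo():
    async with toy_open_channel_from(echo_child, greeting="start") as (chan, first):
        await chan.send("tick")
        echoed = await chan.receive()
    return first, echoed


if __name__ == "__main__":
    print(asyncio.run(handshake_demo()))
```

The toy deliberately leaves out error relay from the child during the dialog; it only shows why a target that never calls started_nowait() leaves the opener stuck at entry.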
A full example: the echo server
Here’s the canonical demo, a round-trip echo service where the
asyncio task is told what to do by a trio task which is in
turn driven over IPC by the root actor:
'''
An SC compliant infected ``asyncio`` echo server.

'''
import asyncio
from statistics import mean
import time

import trio
import tractor


async def aio_echo_server(
    chan: tractor.to_asyncio.LinkedTaskChannel,
) -> None:

    # a first message must be sent **from** this ``asyncio``
    # task or the ``trio`` side will never unblock from
    # ``tractor.to_asyncio.open_channel_from():``
    chan.started_nowait('start')

    while True:
        # echo the msg back
        chan.send_nowait(await chan.get())
        await asyncio.sleep(0)


@tractor.context
async def trio_to_aio_echo_server(
    ctx: tractor.Context,
):
    # this will block until the ``asyncio`` task sends a "first"
    # message.
    async with tractor.to_asyncio.open_channel_from(
        aio_echo_server,
    ) as (chan, first):

        assert first == 'start'
        await ctx.started(first)

        async with ctx.open_stream() as stream:

            async for msg in stream:
                await chan.send(msg)

                out = await chan.receive()
                # echo back to parent actor-task
                await stream.send(out)


async def main():

    async with tractor.open_nursery() as n:
        p = await n.start_actor(
            'aio_server',
            enable_modules=[__name__],
            infect_asyncio=True,
        )
        async with p.open_context(
            trio_to_aio_echo_server,
        ) as (ctx, first):

            assert first == 'start'

            count = 0
            async with ctx.open_stream() as stream:

                delays = []
                send = time.time()

                await stream.send(count)
                async for msg in stream:
                    recv = time.time()
                    delays.append(recv - send)
                    assert msg == count
                    count += 1
                    send = time.time()
                    await stream.send(count)

                    if count >= 1e3:
                        break

        print(f'mean round trip rate (Hz): {1/mean(delays)}')
        await p.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
What’s going on?
- there are three task layers: the root actor’s pure trio task, the infected child’s trio-side @tractor.context endpoint (trio_to_aio_echo_server()), and the child’s asyncio task (aio_echo_server()).
- two started-style handshakes compose: the aio task’s chan.started_nowait('start') unblocks the child’s open_channel_from() entry, then the child relays that same value up via await ctx.started(first) which unblocks the root’s open_context() entry. Synchronization all the way down, er, up.
- each round trip flows: root stream.send() -> IPC -> child async for msg in stream -> chan.send(msg) -> aio await chan.get() -> chan.send_nowait() -> child chan.receive() -> stream.send(out) -> IPC -> root.
- when the root breaks out of its stream loop and exits the context block, the child’s stream ends, its channel block exits, and the asyncio task is reaped along with it; the final portal.cancel_actor() then tears down the whole process. No orphaned asyncio tasks, no zombie procs; if you manage to create either it is a bug.
LinkedTaskChannel: one channel, two sides
The same channel object is shared by both tasks; which methods you
call depends on which loop schedules your task. The trio side
gets a standard trio.abc.Channel interface while the
asyncio side gets queue-flavored, mostly-sync methods:
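| side | call | what it does |
|---|---|---|
| trio | await chan.send() | ship a value to the asyncio task |
| trio | await chan.receive() | wait for the next value from the asyncio task |
| trio | … | block until the … |
| trio | chan.subscribe() | acm yielding a broadcast receiver |
| … | … | explicitly request cancellation of the linked … |
| asyncio | chan.started_nowait() | deliver the “first” value; unblocks the trio side |
| asyncio | await chan.get() | wait for the next value sent from the trio side |
| asyncio | chan.send_nowait() | push a value to the trio side |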
Fan-out with .subscribe()
Just like tractor.MsgStream.subscribe() does for IPC
streams, chan.subscribe() lets multiple local trio tasks
each receive every value sent from the single asyncio task:
async with chan.subscribe() as bcast:
    async for msg in bcast:
        ...
The underlying broadcast machinery is lazily allocated on first use and is not reversible for the channel’s remaining lifetime, so only reach for it when you actually want the fan-out.
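The observable behaviour of fan-out (every subscriber sees every value, in order) can be shown with a stdlib-only toy. Note this swaps in a different, simpler technique than whatever tractor uses internally: one private asyncio.Queue per subscriber. ToyBroadcaster and fan_out_demo() are hypothetical names used only for this sketch.

```python
import asyncio


class ToyBroadcaster:
    """Copy every published value into each subscriber's private queue."""

    def __init__(self) -> None:
        self._queues = []

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.append(queue)
        return queue

    def publish(self, value) -> None:
        for queue in self._queues:
            queue.put_nowait(value)


async def fan_out_demo(n_subscribers: int = 3):
    bcast = ToyBroadcaster()
    values = ["a", "b", "c"]

    async def consumer(queue: asyncio.Queue):
        # Each consumer drains its own copy of the stream.
        return [await queue.get() for _ in values]

    # Subscribe first, then publish: late subscribers would miss values.
    consumers = [
        asyncio.create_task(consumer(bcast.subscribe()))
        for _ in range(n_subscribers)
    ]
    for value in values:
        bcast.publish(value)

    return await asyncio.gather(*consumers)


if __name__ == "__main__":
    print(asyncio.run(fan_out_demo()))
```

Without a broadcast layer, several tasks reading from one channel would instead split the values between them, which is why fan-out needs dedicated machinery.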
One-shot calls with run_task()
When you just want a single asyncio result and no streaming
dialog, skip the channel ceremony and use
tractor.to_asyncio.run_task():
import asyncio
from tractor import to_asyncio

async def aio_fetch(url: str) -> str:
    await asyncio.sleep(0.3)  # pretend-IO, aio style
    return f'<html>sup {url}</html>'

# from any trio task inside the infected actor:
page = await to_asyncio.run_task(aio_fetch, url='https://x.io')
It schedules the fn as an asyncio.Task, waits for completion
and hands the return value back to trio; think of it as the
cross-loop sibling of ActorNursery.run_in_actor(). Errors and
cancellation are translated exactly as for channels.
Cross-loop errors and cancellation
The paired tasks are SC linked: exception and cancel handling
tears down both sides on any unexpected error or cancellation,
in either loop. There is no fire-and-forget mode; a
LinkedTaskChannel is a supervision scope just like a
Context is across processes.
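The “linked pair” idea can be illustrated in a single loop with plain asyncio. This is only an analogy for the supervision rule, assuming nothing about tractor’s internals: run_linked() and linked_demo() are hypothetical helpers, and no exception translation between loops is modelled.

```python
import asyncio


async def run_linked(*coro_fns) -> None:
    """Run the given coroutine functions as one unit.

    As soon as any task exits or raises, all the others are cancelled
    and awaited, then the first error (if any) is re-raised.
    """
    tasks = [asyncio.create_task(fn()) for fn in coro_fns]
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Also runs if the caller itself is cancelled while waiting.
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished
        await asyncio.gather(*tasks, return_exceptions=True)

    for task in done:
        if not task.cancelled() and task.exception() is not None:
            raise task.exception()


async def linked_demo():
    log = []

    async def worker():
        try:
            await asyncio.sleep(3600)
        except asyncio.CancelledError:
            log.append("worker cancelled")
            raise

    async def crasher():
        await asyncio.sleep(0.01)
        raise ValueError("boom")

    try:
        await run_linked(worker, crasher)
    except ValueError as err:
        log.append(f"caller saw: {err}")
    return log


if __name__ == "__main__":
    print(asyncio.run(linked_demo()))
```

The point of the sketch is the ordering: the surviving task is cancelled and fully reaped before the error reaches the caller, so nothing is left running in the background.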
Because each loop has its own (incompatible) cancellation and exit
machinery, boundary crossings are translated into dedicated
exception types, all importable from tractor.to_asyncio:
| exception | raised in | meaning |
|---|---|---|
| … | the … | the linked … |
| … | the … | the … |
| … | the … | the … |
| … | the … | the … |
By default (suppress_graceful_exits=True), open_channel_from()
absorbs the two *TaskExited signals so happy-path teardown
stays silent; pass suppress_graceful_exits=False when your app
wants to handle early peer-exit explicitly.
Past the task pair, everything composes with the normal actor
story: an unhandled asyncio error is translated into the
trio side, propagates out of your @tractor.context
endpoint, and arrives at the parent boxed as
a tractor.RemoteActorError. One SC discipline,
end-to-end, across loops and processes.
Breakpoints in asyncio tasks
Yes, the multi-actor REPL works here too. With
debug_mode=True enabled on your tree the trio side of an
infected actor can await tractor.pause() as usual, and with
greenback enabled (maybe_enable_greenback=True) even the
builtin breakpoint() works from inside asyncio tasks;
see examples/debugging/asyncio_bp.py for the full tour. The
root-TTY locking dance behind all this is covered in
“Native” multi-process debugging.
Where to next?
See also
The Context: a cross-actor task pair for the inter-actor handshake and streaming APIs which this whole interop layer mirrors.
Typed messaging for typing the payloads you shuttle between actors (and loops).
“Native” multi-process debugging for the multi-process REPL that keeps working even when your loop has “the trios”.