The AsyncSession class provides the same functionality as Session but with an async API for use in async Python code.

Constructor

AsyncSession(notebook_id=None)
notebook_id
str
Unique identifier for this session. If not provided, a random UUID-based identifier is generated. Multiple AsyncSession objects with the same notebook_id share the same kernel.
session = runtimed.AsyncSession()
print(session.notebook_id)  # "agent-session-a1b2c3d4-..."

Properties

notebook_id
str
Unique identifier for this notebook session (synchronous property).
Unlike Session, the other accessors on AsyncSession are async methods. Calling one returns a coroutine, which must be awaited to get the value:
# These are async methods, not properties
connected = await session.is_connected()     # bool
kernel_running = await session.kernel_started()  # bool
env = await session.env_source()             # str | None

# Only notebook_id is a sync property
notebook_id = session.notebook_id  # str
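Because the three accessors are independent coroutines, they can be awaited together. The following is a minimal sketch, not part of the library: session_status is a hypothetical helper name, and it assumes only the accessors documented in this section.

```python
import asyncio


async def session_status(session):
    """Collect the documented accessors into one dict (hypothetical helper)."""
    # The three accessors are coroutines, so they can be awaited concurrently.
    connected, running, env = await asyncio.gather(
        session.is_connected(),
        session.kernel_started(),
        session.env_source(),
    )
    return {
        "notebook_id": session.notebook_id,  # sync property, no await
        "connected": connected,
        "kernel_running": running,
        "env_source": env,
    }
```

Called as status = await session_status(session), this returns a plain dict that is easy to log.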

is_connected()

Check if connected to the daemon.
connected = await session.is_connected()
return
bool
True if connected to daemon.

kernel_started()

Check if a kernel is running.
running = await session.kernel_started()
return
bool
True if kernel is running.

env_source()

Get the environment source, if a kernel is running.
env = await session.env_source()
return
str | None
Environment source (e.g., "uv:prewarmed") or None if no kernel.

Connection Methods

connect()

Connect to the daemon.
await session.connect()
This is called automatically by start_kernel() if not already connected. Respects the RUNTIMED_SOCKET_PATH environment variable if set.
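To point a session at a specific daemon socket, the environment variable can be set before connecting. This sketch assumes the variable is read at the time connect() is called, which the text above does not state explicitly. connect_via_socket is a hypothetical helper, and the socket path is supplied by the caller.

```python
import os


async def connect_via_socket(session, socket_path):
    """Set RUNTIMED_SOCKET_PATH, connect, and report the connection state."""
    # Assumption: connect() reads the variable when it is called.
    os.environ["RUNTIMED_SOCKET_PATH"] = socket_path
    await session.connect()
    return await session.is_connected()
```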

start_kernel()

Start a kernel for this session.
await session.start_kernel(
    kernel_type="python",
    env_source="auto"
)
kernel_type
str
default:"python"
Type of kernel: "python" or "deno".
env_source
str
default:"auto"
Environment source. Options:
  • "auto": Auto-detect from inline deps or project files
  • "uv:prewarmed": Use prewarmed UV environment
  • "conda:prewarmed": Use prewarmed Conda environment
  • For Deno kernels, this is ignored
await session.start_kernel()  # Defaults: kernel_type="python", env_source="auto"
If a kernel is already running for this session’s notebook_id, this returns immediately without starting a new one.
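Since start_kernel() returns immediately when a kernel already exists, the requested env_source may not be the one in effect. The sketch below records whether a kernel was already running and reports the environment actually in use. start_and_report is a hypothetical helper built only from the methods documented here.

```python
async def start_and_report(session, kernel_type="python", env_source="auto"):
    """Start (or reuse) a kernel and report which environment is active."""
    # Check first, so the caller can tell a fresh start from a reuse.
    already_running = await session.kernel_started()
    await session.start_kernel(kernel_type=kernel_type, env_source=env_source)
    return {
        "reused_existing": already_running,
        "env_source": await session.env_source(),
    }
```

If reused_existing is True, the returned env_source reflects the kernel that was already running, not necessarily the value passed in.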

shutdown_kernel()

Shut down the kernel.
await session.shutdown_kernel()

interrupt()

Interrupt the currently executing cell.
await session.interrupt()
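One way to use interrupt() is to queue a cell without waiting and stop it after a grace period. This is a sketch under assumptions: interrupt_after is a hypothetical helper, and the behavior of interrupt() when the cell has already finished is not described here, so treat that case as undefined.

```python
import asyncio


async def interrupt_after(session, cell_id, delay_secs):
    """Queue a cell, wait delay_secs, then interrupt whatever is executing."""
    await session.queue_cell(cell_id)  # returns without waiting for a result
    await asyncio.sleep(delay_secs)    # give the cell some time to run
    await session.interrupt()
```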

Document Operations

create_cell()

Create a new cell in the automerge document.
cell_id = await session.create_cell(
    source="",
    cell_type="code",
    index=None
)
source
str
default:""
The cell source code.
cell_type
str
default:"code"
Cell type: "code", "markdown", or "raw".
index
int | None
Position to insert the cell. If not provided, appends at end.
return
str
The cell ID (e.g., "cell-a1b2c3d4-...")
cell_id = await session.create_cell("print('hello')")
print(cell_id)  # "cell-a1b2c3d4-..."
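The cell_type and index parameters can be combined, for example to place a markdown title at the top of the document. This sketch assumes index is zero-based, which is not stated above. prepend_title is a hypothetical helper.

```python
async def prepend_title(session, title):
    """Insert a markdown heading as the first cell and return its ID."""
    return await session.create_cell(
        source=f"# {title}",
        cell_type="markdown",
        index=0,  # assumption: positions are zero-based
    )
```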

set_source()

Update a cell’s source in the automerge document.
await session.set_source(cell_id, source)
cell_id
str
required
The cell ID.
source
str
required
The new source code.
await session.set_source(cell_id, "print('updated')")

get_cell()

Get a cell from the automerge document.
cell = await session.get_cell(cell_id)
cell_id
str
required
The cell ID.
return
Cell
Cell object with id, cell_type, source, and execution_count properties.
cell = await session.get_cell(cell_id)
print(cell.source)           # "print('hello')"
print(cell.execution_count)  # 1 or None
Raises: RuntimedError if cell not found.
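Callers that prefer a None result over an exception can wrap the lookup. In this sketch the exception class is passed in by the caller, because the import location of RuntimedError is not shown on this page. get_cell_or_none is a hypothetical helper.

```python
async def get_cell_or_none(session, cell_id, not_found_error):
    """Return the cell, or None if get_cell() raises not_found_error."""
    try:
        return await session.get_cell(cell_id)
    except not_found_error:
        # Caveat: if the same exception type is raised for other failures,
        # those are swallowed here as well.
        return None
```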

get_cells()

Get all cells from the automerge document.
cells = await session.get_cells()
return
List[Cell]
List of Cell objects.
cells = await session.get_cells()
for cell in cells:
    print(f"{cell.id}: {cell.source[:30]}...")
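The Cell properties make it easy to filter the list. As an illustration, the sketch below collects the IDs of code cells whose execution_count is None, on the assumption that None means the cell has not been executed. unexecuted_code_cells is a hypothetical helper.

```python
async def unexecuted_code_cells(session):
    """Return IDs of code cells that have no execution count yet."""
    cells = await session.get_cells()
    return [
        cell.id
        for cell in cells
        if cell.cell_type == "code" and cell.execution_count is None
    ]
```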

delete_cell()

Delete a cell from the automerge document.
await session.delete_cell(cell_id)
cell_id
str
required
The cell ID to delete.
await session.delete_cell(cell_id)

Execution Methods

execute_cell()

Execute a cell by ID.
result = await session.execute_cell(
    cell_id,
    timeout_secs=60.0
)
cell_id
str
required
The cell ID to execute.
timeout_secs
float
Maximum time to wait for execution.
return
ExecutionResult
Result with outputs, success status, and execution count.
cell_id = await session.create_cell("x = 42")
result = await session.execute_cell(cell_id)
print(result.success)         # True
print(result.execution_count) # 1
The daemon reads the cell’s source from the automerge document and executes it, so all clients see the same code being executed. If no kernel is running yet, one is started automatically.
Raises: RuntimedError if not connected, cell not found, or timeout.
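A common pattern is to execute several cells one after another and stop at the first failure. The sketch below relies only on execute_cell() and the success field of the result. execute_in_order is a hypothetical helper.

```python
async def execute_in_order(session, cell_ids, timeout_secs=60.0):
    """Execute cells sequentially, stopping after the first failed cell."""
    results = []
    for cell_id in cell_ids:
        result = await session.execute_cell(cell_id, timeout_secs=timeout_secs)
        results.append(result)
        if not result.success:
            break  # later cells may depend on this one, so do not continue
    return results
```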

run()

Convenience method: create a cell, execute it, and return the result.
result = await session.run(
    code,
    timeout_secs=60.0
)
code
str
required
The code to execute.
timeout_secs
float
Maximum time to wait for execution.
return
ExecutionResult
Result with outputs, success status, and execution count.
result = await session.run("print('hello')")
print(result.stdout)  # "hello\n"
print(result.success) # True
This is a shortcut that combines create_cell() and execute_cell(). The cell is written to the automerge document before execution, so other connected clients will see it.
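Written out by hand, the two steps look roughly like the sketch below. It is an approximation based on the description above, not the library's implementation. run_in_two_steps is a hypothetical helper; unlike run(), it also returns the cell ID so the cell can be edited or re-executed later.

```python
async def run_in_two_steps(session, code, timeout_secs=60.0):
    """Create a cell, execute it, and return both the cell ID and the result."""
    cell_id = await session.create_cell(code)  # written to the document first
    result = await session.execute_cell(cell_id, timeout_secs=timeout_secs)
    return cell_id, result
```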

queue_cell()

Queue a cell for execution without waiting for the result.
await session.queue_cell(cell_id)
cell_id
str
required
The cell ID to execute.
await session.queue_cell(cell_id)
# Execution happens in background
# Poll for results later:
cell = await session.get_cell(cell_id)
Raises: RuntimedError if not connected or cell not found.
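The polling step can be wrapped in a loop with a deadline. This sketch assumes that execution_count stays None until the cell has run, so it only suits cells that have never been executed. wait_until_executed and its parameters are hypothetical.

```python
import asyncio
import time


async def wait_until_executed(session, cell_id, poll_secs=0.2, max_wait_secs=30.0):
    """Queue a cell, then poll get_cell() until it has an execution count."""
    await session.queue_cell(cell_id)
    deadline = time.monotonic() + max_wait_secs
    while time.monotonic() < deadline:
        cell = await session.get_cell(cell_id)
        # Assumption: a non-None execution_count means the cell has run.
        if cell.execution_count is not None:
            return cell
        await asyncio.sleep(poll_secs)
    raise TimeoutError(f"cell {cell_id} did not execute within {max_wait_secs}s")
```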

Async Context Manager

AsyncSession works as an async context manager for automatic cleanup:
async with runtimed.AsyncSession() as session:
    await session.start_kernel()
    cell_id = await session.create_cell("1 + 1")
    result = await session.execute_cell(cell_id)
# Kernel automatically shut down on exit
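Where async with cannot be used, for example when the session is created elsewhere and passed in, similar cleanup can be approximated with try/finally. This sketch assumes the context manager's exit is equivalent to calling shutdown_kernel(), which the text above suggests but does not spell out. run_with_manual_cleanup is a hypothetical helper.

```python
async def run_with_manual_cleanup(session, code):
    """Start a kernel, run code, and always shut the kernel down afterwards."""
    await session.start_kernel()
    try:
        return await session.run(code)
    finally:
        # Runs on success and on error, like leaving an "async with" block.
        await session.shutdown_kernel()
```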

Complete Example

import asyncio
import runtimed

async def main():
    # Create a session with async context manager
    async with runtimed.AsyncSession(notebook_id="my-notebook") as session:
        # Start Python kernel
        await session.start_kernel()
        env = await session.env_source()
        print(f"Environment: {env}")  # e.g. "uv:prewarmed"
        
        # Create and execute cells
        cell1 = await session.create_cell("x = 42")
        result1 = await session.execute_cell(cell1)
        print(f"Success: {result1.success}")  # True
        
        cell2 = await session.create_cell("print(x)")
        result2 = await session.execute_cell(cell2)
        print(f"Output: {result2.stdout}")  # "42\n"
        
        # List all cells
        cells = await session.get_cells()
        print(f"Total cells: {len(cells)}")  # 2
        
        # Update and re-execute
        await session.set_source(cell1, "x = 100")
        await session.execute_cell(cell1)
        result3 = await session.execute_cell(cell2)
        print(f"Updated output: {result3.stdout}")  # "100\n"

asyncio.run(main())

Parallel Execution

Use asyncio.gather() for parallel operations:
import asyncio
import runtimed

async def main():
    async with runtimed.AsyncSession() as session:
        await session.start_kernel()
        
        # Create multiple cells in parallel
        cells = await asyncio.gather(
            session.create_cell("import math"),
            session.create_cell("import pandas as pd"),
            session.create_cell("import numpy as np")
        )
        
        # Execute them in parallel
        results = await asyncio.gather(*[
            session.execute_cell(cell_id) for cell_id in cells
        ])
        
        print(f"Executed {len(results)} cells")

asyncio.run(main())
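With a plain asyncio.gather(), the first exception propagates and the remaining results are lost to the caller. Passing return_exceptions=True keeps one outcome per cell. The sketch below uses only standard asyncio behavior plus execute_cell(). execute_all is a hypothetical helper.

```python
import asyncio


async def execute_all(session, cell_ids):
    """Execute cells concurrently and map each cell ID to its outcome."""
    outcomes = await asyncio.gather(
        *(session.execute_cell(cell_id) for cell_id in cell_ids),
        return_exceptions=True,  # exceptions are returned instead of raised
    )
    # gather() preserves argument order, so zip pairs each ID with its outcome.
    return dict(zip(cell_ids, outcomes))
```

Each value in the returned dict is either an ExecutionResult or the exception raised for that cell, so callers can check isinstance(value, Exception) before reading fields.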

See Also

Session

Synchronous version of AsyncSession

ExecutionResult

Understanding execution results
