Skip to main content
IronRDP provides first-class support for async I/O through the ironrdp-async crate, with runtime-specific implementations for Tokio (ironrdp-tokio) and for runtimes built on the futures I/O traits (ironrdp-futures). This guide covers async patterns, performance optimization, and best practices.

Async vs Blocking

When to Use Async

✅ Use async I/O when:
  • Handling multiple concurrent connections
  • Building servers that need to scale
  • Integrating with existing async codebases
  • Requiring non-blocking I/O for responsiveness

When to Use Blocking

✅ Use blocking I/O when:
  • Building simple single-connection clients
  • Prototyping or testing
  • Working in environments without async runtime
  • Simplicity is more important than concurrency

Architecture

IronRDP’s async architecture uses these crates:
  • ironrdp-async - Runtime-agnostic async abstractions
  • ironrdp-tokio - Tokio-specific implementations
  • ironrdp-futures - Implementations for runtimes built on the futures I/O traits (e.g. async-std)

Async Client Example

Here’s a complete async client using Tokio:
1
Set up dependencies
2
[dependencies]
ironrdp = "0.1"
ironrdp-async = "0.1"
ironrdp-tokio = "0.1"
tokio = { version = "1", features = ["full"] }
tokio-rustls = "0.26"
anyhow = "1.0"
tracing = "0.1"
3
Create async connector
4
use ironrdp::connector::{self, Credentials, ClientConnector};
use ironrdp_async::{connect_begin, mark_as_upgraded, connect_finalize};
use ironrdp_tokio::TokioFramed;
use tokio::net::TcpStream;

/// Entry point: opens a TCP connection to the server and runs the first
/// phase of the RDP connection sequence on the Tokio runtime.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Connector configuration; the remaining fields are elided in this snippet.
    let config = connector::Config {
        credentials: Credentials::UsernamePassword {
            username: "user".to_owned(),
            password: "pass".to_owned(),
        },
        domain: None,
        enable_tls: false,
        enable_credssp: true,
        // ... other config fields
    };

    // The socket's local address is passed to the connector as the client address.
    let tcp_stream = TcpStream::connect(("server.example.com", 3389)).await?;
    let client_addr = tcp_stream.local_addr()?;

    // Wrap the raw stream in the Tokio framing layer used by the connect helpers.
    let mut framed = TokioFramed::new(tcp_stream);
    let mut connector = ClientConnector::new(config, client_addr);

    // Begin connection (async)
    // The returned value is handed to `mark_as_upgraded` after the TLS upgrade.
    let should_upgrade = connect_begin(&mut framed, &mut connector).await?;

    // TLS upgrade (see next step)
    // ...

    Ok(())
}
5
Async TLS upgrade
6
use tokio_rustls::{TlsConnector, rustls};
use std::sync::Arc;

/// Performs the TLS handshake over an already-connected TCP stream.
///
/// `server_name` is parsed into a rustls `ServerName`; an invalid name or a
/// failed handshake is returned as an error. TLS session resumption is
/// disabled on the client configuration before connecting.
async fn tls_upgrade(
    stream: TcpStream,
    server_name: &str,
) -> anyhow::Result<tokio_rustls::client::TlsStream<TcpStream>> {
    let mut tls_config = rustls::ClientConfig::builder()
        .dangerous()
        .with_custom_certificate_verifier(/* ... */)
        .with_no_client_auth();
    tls_config.resumption = rustls::client::Resumption::disabled();

    // Owned copy of the parsed name so it no longer borrows `server_name`.
    let sni = rustls::pki_types::ServerName::try_from(server_name)?.to_owned();
    let handshake = TlsConnector::from(Arc::new(tls_config)).connect(sni, stream);

    Ok(handshake.await?)
}

// Usage:
// Take the TCP stream back out of the framing layer so TLS can wrap it.
let initial_stream = framed.into_inner_no_leftover();
let tls_stream = tls_upgrade(initial_stream, "server.example.com").await?;

// Record the upgrade on the connector, then frame the encrypted stream.
let upgraded = mark_as_upgraded(should_upgrade, &mut connector);
let mut upgraded_framed = TokioFramed::new(tls_stream);
7
Finalize connection
8
use sspi::network_client::reqwest_network_client::ReqwestNetworkClient;

// Network client handed to `connect_finalize` below.
let mut network_client = ReqwestNetworkClient;
// NOTE(review): `server_public_key` is not defined in any snippet on this
// page — presumably it is taken from the server certificate obtained during
// the TLS upgrade; confirm and show how it is extracted.
let connection_result = connect_finalize(
    upgraded,
    connector,
    &mut upgraded_framed,
    &mut network_client,
    "server.example.com".into(),
    server_public_key,
    None,
).await?;

// The negotiated desktop size is available on the connection result.
println!("Connected! Desktop size: {}x{}",
    connection_result.desktop_size.width,
    connection_result.desktop_size.height);
9
Process session asynchronously
10
use ironrdp::session::{ActiveStage, ActiveStageOutput};
use ironrdp::session::image::DecodedImage;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

// Framebuffer that the active stage decodes graphics updates into.
let mut image = DecodedImage::new(
    ironrdp_graphics::image_processing::PixelFormat::RgbA32,
    connection_result.desktop_size.width,
    connection_result.desktop_size.height,
);

let mut active_stage = ActiveStage::new(connection_result);

// Read one PDU at a time, feed it to the active stage, and act on every
// output it produces until the session terminates.
loop {
    let (action, payload) = upgraded_framed.read_pdu().await?;

    let outputs = active_stage.process(&mut image, action, &payload)?;

    for output in outputs {
        match output {
            ActiveStageOutput::ResponseFrame(frame) => {
                // Frames produced by the stage must be written back to the server.
                upgraded_framed.write_all(&frame).await?;
            }
            ActiveStageOutput::GraphicsUpdate(_region) => {
                // Redraw in place. `image` is mutably borrowed again on the
                // next loop iteration, so it cannot be moved into a
                // `tokio::spawn`ed task here; to render on another task,
                // send a copy of the pixel data over a channel instead.
                render_image(&image).await;
            }
            ActiveStageOutput::Terminate(_) => return Ok(()),
            _ => {}
        }
    }
}

Async Server Example

The ironrdp-server crate is built on async/await:
use ironrdp::server::{RdpServer, RdpServerInputHandler, RdpServerDisplay};
use tokio::net::TcpListener;

/// Entry point: builds an `RdpServer` bound to `0.0.0.0:3389` and runs it
/// until `run` returns.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // `MyHandler` is the application's own type; the same handler serves as
    // both input handler and display handler, hence the clone below.
    let handler = MyHandler::new();

    let mut server = RdpServer::builder()
        .with_addr("0.0.0.0:3389".parse()?)
        // NOTE(review): no transport security — presumably for local testing only.
        .with_no_security()
        .with_input_handler(handler.clone())
        .with_display_handler(handler)
        .build();

    server.run().await
}
The server automatically handles multiple clients concurrently.

Concurrency Patterns

Multiple Concurrent Connections

use tokio::task::JoinSet;

/// Launches ten connection attempts as independent tasks and reports the
/// outcome of each one as it finishes.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut tasks = JoinSet::new();

    // One task per configuration; each task owns its own config.
    for index in 0..10 {
        let task_config = create_config(index);
        tasks.spawn(async move { connect_to_server(task_config).await });
    }

    // Drain the set in completion order until no task is left.
    loop {
        let Some(joined) = tasks.join_next().await else {
            break;
        };

        match joined {
            Err(join_error) => eprintln!("Task panicked: {}", join_error),
            Ok(Err(error)) => eprintln!("Connection failed: {}", error),
            Ok(Ok(_)) => println!("Connection succeeded"),
        }
    }

    Ok(())
}

Async Channel Communication

use tokio::sync::mpsc;

/// Commands that other tasks can send to the client task over the channel.
enum ClientCommand {
    /// Forward an input event to the server.
    SendInput(InputEvent),
    /// Stop the client task.
    Disconnect,
}

/// Drives one client session: waits concurrently for PDUs from the server
/// and for commands from other tasks, returning `Ok(())` on
/// `ClientCommand::Disconnect` and an error if reading a PDU fails.
async fn client_task(
    mut rx: mpsc::Receiver<ClientCommand>,
    mut framed: TokioFramed<TlsStream<TcpStream>>,
) -> anyhow::Result<()> {
    loop {
        // Whichever branch becomes ready first runs; the loop then waits again.
        tokio::select! {
            // Handle incoming PDUs
            result = framed.read_pdu() => {
                let (action, payload) = result?;
                // Process...
            }

            // Handle commands from other tasks
            // NOTE(review): the `Some(..)` pattern does not match once every
            // sender is dropped, so from then on only the PDU branch can
            // fire — confirm this is the intended behaviour.
            Some(cmd) = rx.recv() => {
                match cmd {
                    ClientCommand::SendInput(event) => {
                        // Send input to server
                    }
                    ClientCommand::Disconnect => {
                        return Ok(());
                    }
                }
            }
        }
    }
}

Timeout Handling

use tokio::time::{timeout, Duration};

// Timeout on connection
// The first `?` handles the timeout elapsing, the second the connection
// error. `connect_begin` yields the upgrade token, not a finished connection.
let should_upgrade = timeout(
    Duration::from_secs(30),
    connect_begin(&mut framed, &mut connector)
).await??;

// Timeout on read
let read_result = timeout(
    Duration::from_secs(5),
    framed.read_pdu()
).await;

// Outer `Result` reports the timeout, inner `Result` the read itself.
match read_result {
    Ok(Ok((action, payload))) => {
        // Process PDU
    }
    Ok(Err(e)) => {
        eprintln!("Read error: {}", e);
    }
    Err(_) => {
        eprintln!("Read timeout");
    }
}

Graceful Shutdown

use tokio::signal;
use tokio::sync::broadcast;

/// Runs the client until it finishes or Ctrl+C is pressed, whichever comes
/// first.
///
/// # Errors
/// Propagates any error returned by `run_client`.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Keep the receiver returned by `channel` instead of dropping it and
    // calling `subscribe()` after the signal task is spawned: a broadcast
    // receiver only sees messages sent after it was created, so a signal
    // sent before a late `subscribe()` would be silently lost.
    let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(1);

    let shutdown_tx_clone = shutdown_tx.clone();
    tokio::spawn(async move {
        signal::ctrl_c().await.expect("failed to listen for Ctrl+C");
        println!("Shutting down...");
        // A send error only means no receiver is left to notify.
        let _ = shutdown_tx_clone.send(());
    });

    // Race the client against the shutdown signal; the losing future is dropped.
    tokio::select! {
        result = run_client() => {
            result?;
        }
        _ = shutdown_rx.recv() => {
            println!("Received shutdown signal");
        }
    }

    Ok(())
}

Performance Optimization

Buffering Strategy

use tokio::io::BufReader;

// Wrap stream with buffer for better performance
// NOTE(review): `BufReader` buffers reads only; confirm that `TokioFramed`
// does not already buffer reads internally before relying on this.
let buffered_stream = BufReader::new(tcp_stream);
let framed = TokioFramed::new(buffered_stream);

Batch Processing

use tokio::time::{interval, Duration};

// PDUs received since the last tick.
let mut batch = Vec::new();
// One tick every 16 ms (roughly 60 batches per second).
let mut ticker = interval(Duration::from_millis(16));

loop {
    tokio::select! {
        // `read_pdu` yields a `Result`, so bind it whole and propagate the
        // error with `?` instead of destructuring it in the branch pattern.
        pdu = framed.read_pdu() => {
            batch.push(pdu?);
        }

        // On each tick, process everything collected so far in one call.
        _ = ticker.tick() => {
            if !batch.is_empty() {
                process_batch(&mut active_stage, &mut image, &batch)?;
                batch.clear();
            }
        }
    }
}

Connection Pooling

use std::collections::HashMap;
use tokio::sync::RwLock;
use std::sync::Arc;

/// Cache of established connections, keyed by the server string passed to
/// `get_or_connect`.
struct ConnectionPool {
    // Shared map behind an async read/write lock: lookups take `read()`,
    // inserts take `write()`.
    connections: Arc<RwLock<HashMap<String, Connection>>>,
}

impl ConnectionPool {
    /// Returns the pooled connection for `server`, connecting first if the
    /// pool has none.
    ///
    /// No lock is held while connecting, so several tasks may race to
    /// connect to the same server. The first connection stored wins: a task
    /// that loses the race drops its own connection and returns the stored
    /// one, so every caller shares a single pooled connection per server.
    ///
    /// # Errors
    /// Propagates any error from `connect_to_server`.
    async fn get_or_connect(&self, server: &str) -> anyhow::Result<Connection> {
        // Fast path: look up under the read lock, released at the end of
        // this scope so it is never held across the connect below.
        {
            let connections = self.connections.read().await;
            if let Some(conn) = connections.get(server) {
                return Ok(conn.clone());
            }
        }

        // Slow path: connect without holding any lock.
        let conn = connect_to_server(server).await?;

        // Insert only if nobody stored a connection in the meantime, and
        // hand back whatever the pool now holds for this server.
        let mut connections = self.connections.write().await;
        let pooled = connections.entry(server.to_owned()).or_insert(conn);
        Ok(pooled.clone())
    }
}

Error Handling

Retry Logic

use tokio::time::{sleep, Duration};

/// Connects to `server`, retrying failed attempts with exponential backoff.
///
/// After the n-th failure the task sleeps `2^n` seconds, capped at
/// `MAX_BACKOFF_SECS`, before trying again. At most `max_retries` retries are
/// made after the initial attempt.
///
/// # Errors
/// Returns the error of the last attempt once the retries are exhausted.
async fn connect_with_retry(
    config: connector::Config,
    server: &str,
    max_retries: u32,
) -> anyhow::Result<ConnectionResult> {
    /// Upper bound, in seconds, on the delay between two attempts.
    const MAX_BACKOFF_SECS: u64 = 60;

    let mut attempts = 0;

    loop {
        match connect_to_server(config.clone(), server).await {
            Ok(result) => return Ok(result),
            Err(e) if attempts < max_retries => {
                attempts += 1;
                eprintln!("Connection attempt {} failed: {}", attempts, e);
                // `saturating_pow` cannot overflow for large attempt counts
                // (plain `pow` does from 2^64 on); the cap keeps the wait bounded.
                let backoff_secs = 2_u64.saturating_pow(attempts).min(MAX_BACKOFF_SECS);
                sleep(Duration::from_secs(backoff_secs)).await;
            }
            Err(e) => return Err(e),
        }
    }
}

Async Error Propagation

use anyhow::Context;

/// Connects, performs the handshake and processes the session, attaching a
/// context message to the error of whichever stage fails.
async fn connect_and_process() -> anyhow::Result<()> {
    // Each `.context(..)` wraps the underlying error, so the final error
    // names the failing stage.
    let tcp_stream = TcpStream::connect("server:3389")
        .await
        .context("failed to connect to server")?;

    let connection_result = perform_handshake(tcp_stream)
        .await
        .context("handshake failed")?;

    process_session(connection_result)
        .await
        .context("session processing failed")?;

    Ok(())
}

Runtime Selection

Using Tokio

[dependencies]
ironrdp-tokio = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "net", "io-util"] }
use ironrdp_tokio::TokioFramed;

/// Placeholder entry point running on the Tokio runtime.
#[tokio::main]
async fn main() {
    // Use TokioFramed
}

Using async-std (via Futures)

[dependencies]
ironrdp-futures = "0.1"
async-std = { version = "1", features = ["attributes"] }
use ironrdp_futures::FuturesFramed;

/// Placeholder entry point running on the async-std runtime.
#[async_std::main]
async fn main() {
    // Use FuturesFramed
}

Testing Async Code

/// Async unit tests; `#[tokio::test]` runs each test body on a Tokio runtime.
#[cfg(test)]
mod tests {
    // `timeout`, `Duration` and the connect helpers are expected to be in
    // scope in the parent module.
    use super::*;

    /// A connection made with the test configuration is expected to succeed.
    #[tokio::test]
    async fn test_connection() {
        let config = create_test_config();
        let result = connect_to_server(config).await;
        assert!(result.is_ok());
    }

    /// A connection that takes longer than 100 ms is expected to be cut off
    /// by the timeout.
    #[tokio::test]
    async fn test_timeout() {
        let config = create_test_config();
        let result = timeout(
            Duration::from_millis(100),
            connect_to_slow_server(config)
        ).await;
        assert!(result.is_err()); // Should timeout
    }
}

Best Practices

  1. Use tokio::spawn for independent tasks - Don’t block the async runtime
  2. Avoid blocking operations - Use async alternatives or spawn_blocking
  3. Handle backpressure - Use bounded channels and flow control
  4. Set timeouts - Always apply a timeout to long-running operations
  5. Graceful shutdown - Clean up resources properly
  6. Monitor task health - Use supervision strategies for critical tasks

Next Steps

Build docs developers (and LLMs) love