Transactions are received as part of Event messages. Use engine.recv_message() to process incoming messages:
use tashi_vertex::Message;use std::str::from_utf8;while let Some(message) = engine.recv_message().await? { match message { Message::Event(event) => { // Check if event contains transactions if event.transaction_count() > 0 { println!("Received {} transactions", event.transaction_count()); // Process each transaction for tx in event.transactions() { // tx is a &[u8] slice process_transaction(tx)?; } } } Message::SyncPoint(_) => { // Handle synchronization point println!("Sync point reached"); } }}
Each event provides metadata about the transactions it contains:
Message::Event(event) => { // Event creator (node that created the event) println!("Creator: {}", event.creator()); // When the event was created (timestamp) println!("Created at: {}", event.created_at()); // When consensus was reached (timestamp) println!("Consensus at: {}", event.consensus_at()); // Number of transactions in this event println!("Transaction count: {}", event.transaction_count()); // Consensus order guarantees println!("All nodes see this event at the same position");}
The consensus timestamp (consensus_at()) is guaranteed to be the same across all nodes for a given event, ensuring deterministic ordering.
Here’s a complete example that sends and receives transactions:
use std::str::from_utf8;use tashi_vertex::{ Context, Engine, KeySecret, Message, Options, Peers, Socket, Transaction,};#[tokio::main]async fn main() -> anyhow::Result<()> { // Setup (abbreviated - see pingback example for full details) let context = Context::new()?; let socket = Socket::bind(&context, "127.0.0.1:8001").await?; let key = KeySecret::generate(); let peers = Peers::with_capacity(1)?; let options = Options::default(); let engine = Engine::start(&context, socket, options, &key, peers)?; // Send transactions send_message(&engine, "Hello")?; send_message(&engine, "World")?; // Receive and process transactions while let Some(message) = engine.recv_message().await? { if let Message::Event(event) = message { for tx in event.transactions() { let text = from_utf8(tx)?; println!("Received: {}", text); } } } Ok(())}fn send_message(engine: &Engine, message: &str) -> tashi_vertex::Result<()> { let mut transaction = Transaction::allocate(message.len()); transaction.copy_from_slice(message.as_bytes()); engine.send_transaction(transaction)}