Apicentric’s digital twin feature transforms it into a multi-protocol IoT simulation engine. Create virtual devices that behave like real sensors, controllers, and industrial equipment using actor-based simulation with realistic physics.

What are digital twins?

A digital twin is a virtual representation of a physical device. In Apicentric, each twin runs as an independent actor with:
  • State management - Variables that track device conditions (temperature, pressure, status)
  • Physics simulation - Realistic behavior using sine waves, noise, or custom scripts
  • Protocol support - MQTT, Modbus TCP, and HTTP communication
  • Actor model - Lightweight concurrent processes for multiple devices

Supported protocols

  • MQTT - Publish telemetry to any MQTT broker
  • Modbus TCP - Act as a Modbus server exposing holding registers

Why use digital twins?

IoT development

Test IoT applications without physical hardware. Simulate sensors, actuators, and controllers.

Industrial automation

Mock PLCs, SCADA systems, and industrial protocols like Modbus.

Testing at scale

Simulate thousands of devices to test system performance and scalability.

Edge computing

Develop edge applications with simulated sensor data before hardware arrives.

Getting started

1. Define a device

Create a YAML file describing your virtual device:
sensor_device.yaml
twin:
  name: "Temperature_Sensor_01"
  
  physics:
    - variable: "temperature"
      strategy: "sine"
      params:
        min: 20.0
        max: 30.0
        frequency: 0.1
    
    - variable: "humidity"
      strategy: "sine"
      params:
        min: 40.0
        max: 60.0
        frequency: 0.05
  
  transports:
    - type: "mqtt"
      broker_url: "localhost"
      port: 1883
      topic_prefix: "sensors/temp"
      client_id: "sensor_01"
      publish_interval_ms: 1000
2. Start an MQTT broker (if needed)

If you don’t have an MQTT broker running:
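# Using Docker. Mosquitto 2.x only accepts connections from localhost unless a
# listener is configured, so load the no-auth config bundled with the image
# (suitable for local testing only).
docker run -d --name mosquitto -p 1883:1883 eclipse-mosquitto \
  mosquitto -c /mosquitto-no-auth.conf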

# Or install locally
brew install mosquitto
mosquitto -p 1883
3. Run the digital twin

Start your simulated device:
apicentric twin run --device sensor_device.yaml
You’ll see output like:
Starting Digital Twin simulation...
Loaded twin definition: Temperature_Sensor_01
MQTT adapter initialized: localhost:1883
Starting simulation loop...
Published to sensors/temp/data: {"temperature":25.3,"humidity":50.1}
Published to sensors/temp/data: {"temperature":25.8,"humidity":50.5}
4. Subscribe to device data

Monitor device telemetry:
# Using mosquitto_sub
mosquitto_sub -h localhost -t "sensors/temp/data"

# Or with MQTT Explorer GUI

Physics strategies

Define how device variables change over time.

Sine wave

Smooth oscillating values, perfect for temperature, pressure, or cyclic patterns:
physics:
  - variable: "pressure"
    strategy: "sine"
    params:
      min: 100.0      # Minimum value
      max: 120.0      # Maximum value
      frequency: 0.1  # Oscillations per second
Generates smooth waves between min and max values.
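
The exact formula Apicentric uses is not shown on this page. As a minimal sketch, assuming the wave is centered between min and max and starts at the midpoint with no phase offset, the value at a given time could be computed like this (the function name `sine_value` is illustrative, not part of Apicentric):

```python
import math

def sine_value(t: float, min_value: float, max_value: float, frequency: float) -> float:
    """Value at time t (seconds) of a sine wave oscillating between min_value and max_value.

    Assumption: zero phase offset, so the wave starts at the midpoint.
    """
    midpoint = (min_value + max_value) / 2.0
    amplitude = (max_value - min_value) / 2.0
    return midpoint + amplitude * math.sin(2.0 * math.pi * frequency * t)

# With frequency 0.1, one full oscillation takes 10 seconds.
for t in (0.0, 2.5, 5.0, 7.5):
    print(f"t={t:>4}s  pressure={sine_value(t, 100.0, 120.0, 0.1):.1f}")
```

Under these assumptions the pressure example reaches its maximum a quarter period (2.5 seconds) after start and its minimum at 7.5 seconds.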

Scripted behavior

Custom logic using the Rhai scripting language:
physics:
  - variable: "temperature"
    strategy: "script"
    params:
      code: |
        if value > 30.0 {
          value - 0.5  // Cool down
        } else if value < 20.0 {
          value + 0.5  // Heat up
        } else {
          value + (random() - 0.5) * 2.0  // Random walk
        }
Scripts have access to:
  • value - Current variable value
  • random() - Random number between 0 and 1
  • time - Elapsed time in seconds
  • Full Rhai language features (if statements, loops, functions)
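
To preview how a script like the one above behaves before running a twin, it can help to port the logic to plain Python. This is only a sketch: the starting value of 25.0 and the use of Python's `random` module in place of Rhai's `random()` are assumptions.

```python
import random

def next_temperature(value: float, rand: float) -> float:
    """Python port of the Rhai script above; rand stands in for random()."""
    if value > 30.0:
        return value - 0.5  # Cool down
    elif value < 20.0:
        return value + 0.5  # Heat up
    return value + (rand - 0.5) * 2.0  # Random walk of at most 1.0 per tick

rng = random.Random(42)  # fixed seed so the preview is repeatable
value = 25.0             # assumed starting value
history = []
for _ in range(1000):
    value = next_temperature(value, rng.random())
    history.append(value)

print(f"min={min(history):.2f} max={max(history):.2f}")
```

Because the random walk moves by at most 1.0 per tick and the correction branches pull the value back by 0.5, the simulated temperature stays within roughly 19 to 31.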

Replay from file

Replay recorded telemetry:
physics:
  - variable: "sensor_reading"
    strategy: "replay"
    params:
      file: "recorded_data.csv"
      loop: true
CSV format:
timestamp,value
0,25.3
1,25.8
2,26.1
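
A replay file in this shape can be produced with a few lines of Python. The sketch below assumes the timestamp column counts seconds from the start of the recording, as the sample rows suggest; the generated values are a synthetic placeholder, not real measurements.

```python
import csv
import io
import math

def build_replay_csv(samples: int, interval_s: int = 1) -> str:
    """Return CSV text with a timestamp,value header and one row per sample."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["timestamp", "value"])
    for i in range(samples):
        t = i * interval_s
        # Placeholder signal; substitute real recorded readings here.
        value = round(25.0 + 2.0 * math.sin(t / 10.0), 1)
        writer.writerow([t, value])
    return buffer.getvalue()

csv_text = build_replay_csv(samples=5)
print(csv_text)

# To use it with the replay strategy, write it to the configured file:
# with open("recorded_data.csv", "w", newline="") as f:
#     f.write(csv_text)
```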

Protocol adapters

MQTT adapter

Publish device data to MQTT topics:
transports:
  - type: "mqtt"
    broker_url: "mqtt.example.com"
    port: 1883
    topic_prefix: "devices/sensor_01"
    client_id: "sensor_01"
    publish_interval_ms: 1000
    username: "user"          # Optional
    password: "${MQTT_PASSWORD}"  # Use env var
    qos: 1                     # QoS level (0, 1, or 2)
    
    # Optional: Subscribe to command topics
    subscriptions:
      - "devices/sensor_01/commands"
Published message format:
{
  "timestamp": "2026-03-01T10:15:23Z",
  "device_id": "sensor_01",
  "variables": {
    "temperature": 25.3,
    "humidity": 50.1
  }
}
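
A consumer can validate this payload before using it. The following sketch relies only on the three top-level fields shown above; the function name `parse_telemetry` and the returned dictionary shape are illustrative choices.

```python
import json
from datetime import datetime

def parse_telemetry(payload: str) -> dict:
    """Parse a telemetry message and check that the expected fields are present."""
    data = json.loads(payload)
    for key in ("timestamp", "device_id", "variables"):
        if key not in data:
            raise ValueError(f"missing field: {key}")
    # fromisoformat() accepts a trailing "Z" only on Python 3.11+, so normalize it.
    timestamp = datetime.fromisoformat(data["timestamp"].replace("Z", "+00:00"))
    return {
        "device_id": data["device_id"],
        "timestamp": timestamp,
        "variables": dict(data["variables"]),
    }

sample = (
    '{"timestamp": "2026-03-01T10:15:23Z", "device_id": "sensor_01", '
    '"variables": {"temperature": 25.3, "humidity": 50.1}}'
)
reading = parse_telemetry(sample)
print(reading["device_id"], reading["timestamp"].isoformat(), reading["variables"])
```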

Modbus TCP adapter

Expose device variables as Modbus holding registers:
transports:
  - type: "modbus"
    bind_address: "0.0.0.0"
    port: 502
    unit_id: 1
    
    # Map variables to register addresses
    register_mapping:
      temperature: 0    # Address 0
      humidity: 1       # Address 1
      pressure: 2       # Address 2
Read values with any Modbus client:
# Using modbus-cli
modbus read 127.0.0.1:502 1 0 3  # Read 3 registers starting at 0
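# Using pymodbus 3.x
from pymodbus.client import ModbusTcpClient

client = ModbusTcpClient('127.0.0.1', port=502)
client.connect()
# The unit ID keyword depends on the pymodbus version:
# unit= in 2.x, slave= in most 3.x releases, device_id= in the newest releases
result = client.read_holding_registers(0, count=3, slave=1)
print(result.registers)  # [253, 501, 1050] (scaled values)
client.close()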
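
The scaled register values have to be converted back on the client side. The sketch below assumes a scale factor of 10 and unsigned 16-bit registers, which matches the sample values above but is not confirmed as Apicentric's configuration; both helper names are illustrative.

```python
def to_register(value: float, scale: int = 10) -> int:
    """Scale a float and check that it fits in an unsigned 16-bit register."""
    raw = round(value * scale)
    if not 0 <= raw <= 0xFFFF:
        raise ValueError(f"{value} does not fit in 16 bits at scale {scale}")
    return raw

def from_register(raw: int, scale: int = 10) -> float:
    """Convert a raw register value back to the original reading."""
    return raw / scale

registers = [to_register(v) for v in (25.3, 50.1, 105.0)]
print(registers)                              # [253, 501, 1050]
print([from_register(r) for r in registers])  # [25.3, 50.1, 105.0]
```

A larger scale factor keeps more decimal places but lowers the maximum representable value: at scale 100 the largest reading that fits is 655.35.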

Complete examples

Industrial pressure sensor

pressure_sensor.yaml
twin:
  name: "Pressure_Sensor_Industrial_01"
  
  physics:
    - variable: "pressure"
      strategy: "script"
      params:
        code: |
          // Simulate realistic pressure fluctuations
          let base = 100.0;
          let noise = (random() - 0.5) * 2.0;
          let trend = sin(time * 0.01) * 5.0;
          base + trend + noise
    
    - variable: "status"
      strategy: "script"
      params:
        code: |
          // Status code based on pressure
          if pressure > 115.0 {
            2  // High pressure warning
          } else if pressure < 95.0 {
            1  // Low pressure warning
          } else {
            0  // Normal
          }
  
  transports:
    - type: "mqtt"
      broker_url: "broker.hivemq.com"
      port: 1883
      topic_prefix: "industrial/pressure_sensor_01"
      client_id: "pressure_01"
      publish_interval_ms: 500
    
    - type: "modbus"
      bind_address: "0.0.0.0"
      port: 5020
      unit_id: 1
      register_mapping:
        pressure: 0
        status: 1
Run the sensor:
apicentric twin run --device pressure_sensor.yaml

Smart thermostat

thermostat.yaml
twin:
  name: "Thermostat_Living_Room"
  
  physics:
    - variable: "current_temp"
      strategy: "script"
      params:
        code: |
          // Simulate heating/cooling toward target
          let target = target_temp ?? 22.0;
          let diff = target - value;
          value + (diff * 0.1) + (random() - 0.5) * 0.5
    
    - variable: "target_temp"
      strategy: "script"
      params:
        code: |
          // Keep target stable unless commanded
          value ?? 22.0
    
    - variable: "mode"
      strategy: "script"
      params:
        code: |
          // 0=off, 1=heat, 2=cool, 3=auto
          value ?? 3
  
  transports:
    - type: "mqtt"
      broker_url: "localhost"
      port: 1883
      topic_prefix: "home/thermostat/living_room"
      client_id: "thermostat_lr"
      publish_interval_ms: 2000
      subscriptions:
        - "home/thermostat/living_room/set"
Control the thermostat:
# Set target temperature
mosquitto_pub -h localhost -t "home/thermostat/living_room/set" \
  -m '{"target_temp": 24.0}'

# Change mode
mosquitto_pub -h localhost -t "home/thermostat/living_room/set" \
  -m '{"mode": 1}'
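
After a new target is set, the `current_temp` script closes 10% of the remaining gap on every tick. The sketch below reproduces that update rule in Python with the random noise term left out, so the convergence is easy to see; how long a tick lasts in Apicentric is not specified here.

```python
def simulate_thermostat(start: float, target: float, ticks: int, gain: float = 0.1) -> list[float]:
    """Apply value + (target - value) * gain for a number of ticks (noise omitted)."""
    values = [start]
    for _ in range(ticks):
        current = values[-1]
        values.append(current + (target - current) * gain)
    return values

# Target raised from 22.0 to 24.0, as in the first command above.
trace = simulate_thermostat(start=22.0, target=24.0, ticks=30)
for tick in (0, 5, 10, 20, 30):
    print(f"tick {tick:>2}: {trace[tick]:.2f}")
```

The remaining gap after n ticks is the initial gap multiplied by 0.9 to the power n, so a 2-degree change is within 0.2 degrees of the target after 22 ticks.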

Multi-device fleet

Simulate multiple devices with a shell script:
simulate_fleet.sh
#!/bin/bash

# Start 10 temperature sensors
for i in {1..10}; do
  # Generate unique device config
  cat > sensor_$i.yaml <<EOF
twin:
  name: "Sensor_$i"
  physics:
    - variable: "temperature"
      strategy: "sine"
      params:
        min: $((15 + RANDOM % 10))
        max: $((25 + RANDOM % 10))
        frequency: 0.1
  transports:
    - type: "mqtt"
      broker_url: "localhost"
      port: 1883
      topic_prefix: "fleet/sensor_$i"
      client_id: "sensor_$i"
      publish_interval_ms: 1000
EOF

  # Start in background
  apicentric twin run --device sensor_$i.yaml &
done

echo "Started 10 sensor twins"
wait

Advanced features

State persistence

Preserve device state between runs:
twin:
  name: "Stateful_Device"
  state_file: "./device_state.json"  # Save state here
  
  physics:
    - variable: "counter"
      strategy: "script"
      params:
        code: "value + 1"  # Increment each tick
State is saved to device_state.json and restored on restart.

Dynamic variable updates

Modify variables via MQTT commands:
transports:
  - type: "mqtt"
    # ... other config
    subscriptions:
      - "devices/sensor_01/set"
Send commands:
mosquitto_pub -t "devices/sensor_01/set" -m '{"temperature": 28.5}'

Custom update rates

Different variables can update at different rates:
physics:
  - variable: "fast_sensor"
    strategy: "sine"
    params:
      min: 0
      max: 100
    update_interval_ms: 100  # Update every 100ms
  
  - variable: "slow_sensor"
    strategy: "sine"
    params:
      min: 0
      max: 100
    update_interval_ms: 5000  # Update every 5 seconds
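
To get a feel for how mixed update rates interact, the sketch below counts how often each variable would refresh over a given period. It assumes a variable updates whenever the elapsed time is a multiple of its interval; Apicentric's actual scheduler may work differently.

```python
def count_updates(intervals_ms: dict[str, int], duration_ms: int, tick_ms: int = 100) -> dict[str, int]:
    """Count how often each variable is refreshed during duration_ms.

    Assumption: a variable updates when elapsed time is a multiple of its interval.
    """
    counts = {name: 0 for name in intervals_ms}
    for elapsed in range(tick_ms, duration_ms + 1, tick_ms):
        for name, interval in intervals_ms.items():
            if elapsed % interval == 0:
                counts[name] += 1
    return counts

print(count_updates({"fast_sensor": 100, "slow_sensor": 5000}, duration_ms=10_000))
# {'fast_sensor': 100, 'slow_sensor': 2}
```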

Commands reference

Run a digital twin

apicentric twin run --device <DEVICE.yaml>
Start a simulated device based on its configuration. Options:
  • --device <FILE> - Path to device definition YAML (required)
  • --duration <SECONDS> - Run for specified duration then stop (optional)
  • --log-level <LEVEL> - Set logging verbosity: error, warn, info, debug (default: info)

Integration examples

Node-RED flow

Connect digital twins to Node-RED:
  1. Add MQTT input node
  2. Configure broker: localhost:1883
  3. Subscribe to: sensors/+/data
  4. Add function node to process data
  5. Connect to dashboard or database
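
The `+` in `sensors/+/data` matches exactly one topic level, so it is worth checking which twins a subscription will actually receive. The simplified matcher below illustrates the standard MQTT wildcard rules (it ignores special cases such as `$SYS` topics). The example topics assume each twin publishes to `<topic_prefix>/data`, as the getting-started output suggests.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription pattern with + and # wildcards."""
    pattern_parts = pattern.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(pattern_parts):
        if part == "#":
            return True  # multi-level wildcard matches the rest of the topic
        if i >= len(topic_parts):
            return False
        if part != "+" and part != topic_parts[i]:
            return False
    return len(pattern_parts) == len(topic_parts)

for topic in ("sensors/temp/data", "fleet/sensor_1/data", "sensors/temp/room1/data"):
    print(f"{topic}: {topic_matches('sensors/+/data', topic)}")
```

Under that assumption, twins configured with a prefix such as `fleet/sensor_1` would need their own subscription, for example `fleet/+/data`.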

Python data processing

process_telemetry.py
import paho.mqtt.client as mqtt
import json

def on_message(client, userdata, message):
    data = json.loads(message.payload)
    temp = data['variables']['temperature']
    
    if temp > 28:
        print(f"Alert: High temperature {temp}°C")

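# paho-mqtt 2.x expects an explicit callback API version; on 1.x use mqtt.Client()
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)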
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("sensors/+/data")
client.loop_forever()

InfluxDB logging

log_to_influx.py
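import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import json

influx = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
# Write synchronously so each point is sent immediately instead of being batched
write_api = influx.write_api(write_options=SYNCHRONOUS)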

def on_message(client, userdata, message):
    data = json.loads(message.payload)
    
    point = Point("sensor_data") \
        .tag("device", data['device_id']) \
        .field("temperature", data['variables']['temperature']) \
        .field("humidity", data['variables']['humidity'])
    
    write_api.write(bucket="iot", record=point)

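# paho-mqtt 2.x expects an explicit callback API version; on 1.x use mqtt.Client()
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)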
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("sensors/+/data")
client.loop_forever()

Tips and best practices

  • Start with simple sine wave physics to verify connectivity, then add more complex scripted behavior.
  • Use environment variables for sensitive data such as MQTT passwords, for example password: "${MQTT_PASSWORD}".
  • Modbus registers store values as 16-bit integers, so floating-point values are scaled (multiplied by 10 or 100) before storing. For example, 25.3 is stored as 253 with a scale factor of 10, and clients divide by the same factor to recover the reading.
  • Running many digital twins simultaneously can be resource-intensive. Monitor CPU and memory usage when simulating large fleets.

Next steps

  • Use the API simulator to create HTTP interfaces for device management
  • Monitor twins with the TUI for real-time visibility
  • Integrate MCP to let AI create device definitions
  • Export device data schemas using code generation
