System Architecture

The Invernaderos API is built with a modern, event-driven architecture optimized for high-throughput IoT sensor data ingestion, storage, and real-time distribution.

High-Level Architecture

Core Components

1. MQTT Message Broker (EMQX)

Role: Central hub for IoT device communication

Specifications

  • Image: emqx/emqx:latest
  • Ports: 1883 (MQTT), 8083 (WebSocket), 18083 (Dashboard)
  • Protocol: MQTT v3.1.1 / v5.0
  • QoS Levels: 0 (sensors), 1 (actuators), 2 (alerts)

Features

  • High-throughput message routing
  • WebSocket MQTT support
  • Authentication & ACL
  • Management dashboard
Topic Structure:
GREENHOUSE/{tenantId}           # Multi-tenant sensor data
GREENHOUSE                      # Legacy (maps to DEFAULT tenant)
GREENHOUSE/RESPONSE             # Echo responses for verification
greenhouse/+/actuators/#        # Actuator commands
greenhouse/+/actuators/status   # Actuator status updates
greenhouse/+/alerts/#           # Alert notifications
system/events/#                 # System events
Multi-Tenant Routing: Topic format GREENHOUSE/{tenantId} enables automatic tenant isolation. The API extracts tenantId from the topic path and associates all data with that tenant.
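As an illustration of this convention, a device could publish a reading like the following (broker host, port, and payload values are assumptions for the example; SARA is the sample tenant used elsewhere in this page, and QoS 0 follows the sensor convention above):

```shell
# Illustrative publish to the multi-tenant topic; assumes an EMQX broker
# reachable on localhost:1883 and a tenant named SARA.
mosquitto_pub -h localhost -p 1883 -q 0 \
  -t 'GREENHOUSE/SARA' \
  -m '{"TEMPERATURA INVERNADERO 01": 24.5, "HUMEDAD INVERNADERO 01": 65.3}'
```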

2. Spring Boot API (Core Application)

Technology Stack:

Framework

Spring Boot 3.5.7
  • Spring Framework 6.x
  • Jakarta EE 10+
  • Java 21 LTS support

Language

Kotlin 2.2.21
  • K2 compiler
  • Null-safety
  • Coroutines-ready

Runtime

Java 21 LTS
  • Virtual threads
  • Pattern matching
  • Records
Key Libraries:
build.gradle.kts
dependencies {
    // Core Spring Boot
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-data-jpa")
    implementation("org.springframework.boot:spring-boot-starter-security")
    implementation("org.springframework.boot:spring-boot-starter-actuator")
    
    // MQTT Integration
    implementation("org.springframework.integration:spring-integration-mqtt:6.5.3")
    implementation("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5")
    
    // WebSocket
    implementation("org.springframework.boot:spring-boot-starter-websocket")
    implementation("org.springframework.integration:spring-integration-stomp")
    
    // Redis Cache
    implementation("org.springframework.boot:spring-boot-starter-data-redis")
    implementation("org.apache.commons:commons-pool2")
    
    // Database
    implementation("org.postgresql:postgresql")
    implementation("org.flywaydb:flyway-core")
    implementation("org.flywaydb:flyway-database-postgresql")
    
    // JWT Authentication
    implementation("io.jsonwebtoken:jjwt-api:0.12.5")
    runtimeOnly("io.jsonwebtoken:jjwt-impl:0.12.5")
    
    // OpenAPI Documentation
    implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.8.14")
}

3. Dual Database Strategy

The system uses two specialized databases for different data types:
TimescaleDB (Time-Series Data)

Purpose: Store sensor readings with time-series optimizations

Specifications:
  • Image: timescale/timescaledb:latest-pg16
  • Port: 5432
  • Schema: iot
  • Hypertable: sensor_readings
  • Chunk Interval: 7 days
  • Compression: Enabled after 7 days (saves ~90% storage)
  • Retention: 2 years automatic cleanup
Schema:
CREATE TABLE iot.sensor_readings (
    time TIMESTAMPTZ NOT NULL,
    sensor_id VARCHAR(50) NOT NULL,
    greenhouse_id BIGINT NOT NULL,
    tenant_id BIGINT,
    sensor_type VARCHAR(30) NOT NULL,
    value DOUBLE PRECISION NOT NULL,
    unit VARCHAR(20),
    PRIMARY KEY (time, sensor_id)
);

-- Convert to hypertable
SELECT create_hypertable('iot.sensor_readings', 'time', 
    chunk_time_interval => INTERVAL '7 days');

-- Configure compression
ALTER TABLE iot.sensor_readings SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'sensor_id, greenhouse_id',
  timescaledb.compress_orderby = 'time DESC'
);

-- Add retention policy
SELECT add_retention_policy('iot.sensor_readings', 
    INTERVAL '2 years');
Indexes:
CREATE INDEX idx_sensor_readings_greenhouse_id 
    ON iot.sensor_readings(greenhouse_id, time DESC);

CREATE INDEX idx_sensor_readings_tenant_time 
    ON iot.sensor_readings(tenant_id, time DESC) 
    WHERE tenant_id IS NOT NULL;

CREATE INDEX idx_sensor_readings_greenhouse_sensor_time 
    ON iot.sensor_readings(greenhouse_id, sensor_id, time DESC);
Continuous Aggregates:
-- Hourly aggregates (refreshed every 1 hour)
CREATE MATERIALIZED VIEW iot.sensor_readings_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    greenhouse_id,
    tenant_id,
    AVG(value) AS avg_value,
    MIN(value) AS min_value,
    MAX(value) AS max_value,
    STDDEV(value) AS stddev_value,
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY value) AS median,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) AS p95,
    COUNT(*) AS count
FROM iot.sensor_readings
GROUP BY bucket, sensor_id, greenhouse_id, tenant_id;

-- Daily tenant-level aggregates
CREATE MATERIALIZED VIEW iot.sensor_readings_daily_by_tenant
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 day', time) AS day,
    tenant_id,
    COUNT(*) AS total_readings,
    COUNT(DISTINCT sensor_id) AS unique_sensors,
    COUNT(DISTINCT greenhouse_id) AS unique_greenhouses
FROM iot.sensor_readings
WHERE tenant_id IS NOT NULL
GROUP BY day, tenant_id;
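Continuous aggregates are materialized on a schedule. A refresh policy matching the "refreshed every 1 hour" note above might look like this (the offset values here are illustrative assumptions, not taken from the repository):

```sql
-- Illustrative refresh policy for the hourly aggregate.
-- start_offset / end_offset values are assumptions, not from the source.
SELECT add_continuous_aggregate_policy('iot.sensor_readings_hourly',
    start_offset      => INTERVAL '3 hours',
    end_offset        => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');
```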
Performance:
  • Queries on compressed data: 10-100x faster
  • Storage reduction: ~90% with compression
  • Continuous aggregates: 60x faster than raw queries

4. Redis Cache (High-Speed Access)

Purpose: Cache last 1000 messages for instant retrieval

Configuration

  • Image: redis:7-alpine
  • Port: 6379
  • Client: Lettuce (async, reactive)
  • Pool: max-active=100, max-idle=50, min-idle=10

Data Structure

  • Type: Sorted Set (ZSET)
  • Key: greenhouse:messages:{tenantId}
  • Score: timestamp (milliseconds since epoch)
  • Max Size: 1000 messages (auto-trimmed)
  • TTL: 24 hours
Operations (with time complexity):
// Cache a message - O(log N)
fun cacheMessage(data: RealDataDto) {
    val key = "greenhouse:messages:${data.tenantId ?: "DEFAULT"}"
    val score = data.timestamp.toEpochMilli().toDouble()
    val json = objectMapper.writeValueAsString(data)
    
    redisTemplate.opsForZSet().add(key, json, score)
    redisTemplate.opsForZSet().removeRange(key, 0, -1001)  // Keep last 1000
    redisTemplate.expire(key, 24, TimeUnit.HOURS)
}

// Get recent messages - O(log N + M) where M = limit
fun getRecentMessages(tenantId: String?, limit: Int): List<RealDataDto> {
    val key = "greenhouse:messages:${tenantId ?: "DEFAULT"}"
    val json = redisTemplate.opsForZSet()
        .reverseRange(key, 0, limit - 1L)
    return json?.mapNotNull { parseJson(it) } ?: emptyList()
}

// Get messages by time range - O(log N + M)
fun getMessagesByTimeRange(tenantId: String?, start: Instant, end: Instant): List<RealDataDto> {
    val key = "greenhouse:messages:${tenantId ?: "DEFAULT"}"
    val startScore = start.toEpochMilli().toDouble()
    val endScore = end.toEpochMilli().toDouble()
    
    val json = redisTemplate.opsForZSet()
        .reverseRangeByScore(key, startScore, endScore)
    return json?.mapNotNull { parseJson(it) } ?: emptyList()
}

// Get latest message - O(log N)
fun getLatestMessage(tenantId: String?): RealDataDto? {
    val key = "greenhouse:messages:${tenantId ?: "DEFAULT"}"
    val json = redisTemplate.opsForZSet()
        .reverseRange(key, 0, 0)?.firstOrNull()
    return json?.let { parseJson(it) }
}
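In raw Redis terms, the `cacheMessage` flow above issues the equivalent of the following commands (key and values illustrative):

```
ZADD greenhouse:messages:SARA 1741037400000 '{"TEMPERATURA INVERNADERO 01":24.5}'
ZREMRANGEBYRANK greenhouse:messages:SARA 0 -1001   # keep only the newest 1000 members
EXPIRE greenhouse:messages:SARA 86400              # refresh the 24-hour TTL
```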
Performance Characteristics:
  • Response time: 1-5ms for cached queries
  • Memory-efficient: Compressed JSON strings
  • Automatic eviction: Oldest messages removed when > 1000
  • Self-renewing TTL: 24-hour expiration refreshed on each write

Data Flow Pipeline

Message Processing Flow

Step-by-Step Processing

Step 1: MQTT Message Reception

Component: GreenhouseDataListener.kt
@ServiceActivator(inputChannel = "mqttInputChannel")
fun handleGreenhouseData(message: Message<*>) {
    val topic = message.headers[MqttHeaders.RECEIVED_TOPIC] as String
    val payload = String(message.payload as ByteArray)
    
    // Extract tenantId from topic path
    val tenantId = when {
        topic.startsWith("GREENHOUSE/") -> 
            topic.substringAfter("GREENHOUSE/").takeWhile { it != '/' }
        topic == "GREENHOUSE" -> "DEFAULT"  // Legacy compatibility
        else -> "UNKNOWN"
    }
    
    mqttMessageProcessor.processGreenhouseData(payload, tenantId)
}
Step 2: JSON Parsing

Component: MqttMessageProcessor.kt
val messageDto = payload.toRealDataDto(
    timestamp = Instant.now(), 
    tenantId = tenantId
)
Parses JSON with 22 numeric fields:
  • 6 fields: Temperature and humidity for 3 greenhouses
  • 12 fields: Sectors (4 per greenhouse)
  • 3 fields: Extractors (1 per greenhouse)
  • 1 field: Reserve
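For example, an abridged payload (field names as used in this API; a real message carries all 22 numeric fields, with the extractor and reserve fields following the same naming pattern):

```json
{
  "TEMPERATURA INVERNADERO 01": 24.5,
  "HUMEDAD INVERNADERO 01": 65.3,
  "TEMPERATURA INVERNADERO 02": 23.8,
  "HUMEDAD INVERNADERO 02": 68.2,
  "INVERNADERO_01_SECTOR_01": 1.0,
  "INVERNADERO_01_SECTOR_02": 0.0
}
```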
Step 3: Redis Caching

cacheService.cacheMessage(messageDto)
Stores in Sorted Set with timestamp as score. Automatically trims to last 1000 messages.
Step 4: Data Transformation

val readings = mutableListOf<SensorReading>()
val jsonMap = objectMapper.readValue<Map<String, Any?>>(payload)

jsonMap.forEach { (key, value) ->
    if (value is Number) {
        readings.add(SensorReading(
            time = messageDto.timestamp,
            sensorId = key,
            greenhouseId = greenhouseId,
            tenantId = tenantId,
            sensorType = determineSensorType(key),
            value = value.toDouble(),
            unit = determineUnit(key)
        ))
    }
}
Result: 1 JSON payload → 22 SensorReading entities
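The `determineSensorType` / `determineUnit` helpers are not shown on this page; a minimal sketch, assuming classification is driven by the field-name conventions above (the actual helpers in `MqttMessageProcessor.kt` may differ), could be:

```kotlin
// Hypothetical sketch: classify a sensor by its JSON field name.
fun determineSensorType(key: String): String = when {
    key.startsWith("TEMPERATURA") -> "TEMPERATURE"
    key.startsWith("HUMEDAD") -> "HUMIDITY"
    key.contains("SECTOR") -> "SECTOR"
    key.contains("EXTRACTOR") -> "EXTRACTOR"
    else -> "UNKNOWN"
}

fun determineUnit(key: String): String? = when {
    key.startsWith("TEMPERATURA") -> "°C"
    key.startsWith("HUMEDAD") -> "%"
    else -> null  // binary sector/extractor states carry no unit
}
```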
Step 5: TimescaleDB Persistence

@Transactional("timescaleTransactionManager")
repository.saveAll(readings)  // Batch INSERT (1 query)
Uses batch insert for performance: 22 rows inserted in 1 query instead of 22 separate queries.
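Batching of this kind typically requires explicit JPA configuration. A sketch of the relevant settings (assumed for illustration, not copied from the repository) might be:

```yaml
# Illustrative Hibernate batching settings; the project's actual
# application.yaml may differ.
spring:
  jpa:
    properties:
      hibernate:
        jdbc:
          batch_size: 50        # group INSERTs into batches
        order_inserts: true     # sort by entity type so batches stay contiguous
```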
Step 6: Event Publishing

publisher.publishEvent(
    GreenhouseMessageEvent(source = this, message = messageDto)
)
Spring ApplicationEvent decouples MQTT processing from WebSocket broadcasting.
Step 7: WebSocket Broadcasting

Component: GreenhouseWebSocketHandler.kt
@EventListener
fun handleGreenhouseMessage(event: GreenhouseMessageEvent) {
    messagingTemplate.convertAndSend(
        "/topic/greenhouse/messages",
        event.message  // RealDataDto (22 fields)
    )
}
Broadcasts to all WebSocket clients subscribed to /topic/greenhouse/messages.
Step 8: MQTT Echo (Optional)

mqttPublishService.publishToResponseTopic(messageDto.toJson())
Echoes message to GREENHOUSE/RESPONSE topic for hardware verification.

API Endpoints

Authentication Endpoints

curl -X POST http://localhost:8080/api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{
    "username": "[email protected]",
    "password": "your_password"
  }'

# Response
{
  "token": "eyJhbGciOiJIUzI1NiIs...",
  "type": "Bearer",
  "expiresIn": 86400
}

Greenhouse Data Endpoints

# Get last 100 messages for tenant SARA
curl -H "Authorization: Bearer <token>" \
  "http://localhost:8080/api/v1/greenhouse/messages/recent?tenantId=SARA&limit=100"

# Response
[
  {
    "timestamp": "2025-03-03T21:30:00Z",
    "TEMPERATURA INVERNADERO 01": 24.5,
    "HUMEDAD INVERNADERO 01": 65.3,
    "TEMPERATURA INVERNADERO 02": 23.8,
    "HUMEDAD INVERNADERO 02": 68.2,
    "TEMPERATURA INVERNADERO 03": 25.1,
    "HUMEDAD INVERNADERO 03": 64.7,
    "INVERNADERO_01_SECTOR_01": 1.0,
    "INVERNADERO_01_SECTOR_02": 0.0,
    // ... 14 more fields
    "greenhouseId": null,
    "tenantId": "SARA"
  }
]

Sensor Reading Endpoints

# Get latest sensor readings
curl -H "Authorization: Bearer <token>" \
  "http://localhost:8080/api/v1/sensors/latest?greenhouseId=001&limit=10"

WebSocket Real-Time Streaming

Connection Setup

<script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/@stomp/stompjs@7/bundles/stomp.umd.min.js"></script>

<script>
// Create WebSocket connection
const socket = new SockJS('http://localhost:8080/ws/greenhouse');
const stompClient = Stomp.over(socket);

// Connect to the WebSocket
stompClient.connect({}, function(frame) {
  console.log('Connected: ' + frame);
  
  // Subscribe to greenhouse messages
  stompClient.subscribe('/topic/greenhouse/messages', function(message) {
    const data = JSON.parse(message.body);
    console.log('New sensor reading:', data);
    
    // Update your UI here
    updateDashboard(data);
  });
});

function updateDashboard(data) {
  document.getElementById('temp01').innerText = data['TEMPERATURA INVERNADERO 01'] + '°C';
  document.getElementById('humidity01').innerText = data['HUMEDAD INVERNADERO 01'] + '%';
  // ... update other fields
}
</script>

WebSocket Topics

  • /topic/greenhouse/messages: Real-time sensor readings (RealDataDto, 22 fields)
  • /topic/greenhouse/statistics: Aggregated statistics (GreenhouseSummaryDto)
  • /topic/alerts: System alerts (AlertDto)
Broadcasting Frequency: Messages are broadcast as they arrive via MQTT (typically every 5-10 seconds per greenhouse).

Security Architecture

JWT Authentication

Step 1: User Login

Client sends username + password to /api/v1/auth/login
Step 2: Token Generation

Server validates credentials and generates JWT token:
val token = Jwts.builder()
    .subject(user.username)
    .claim("tenantId", user.tenantId)
    .claim("role", user.role)
    .issuedAt(Date())
    .expiration(Date(System.currentTimeMillis() + 86_400_000)) // 24h
    .signWith(secretKey)  // jjwt 0.12 infers HS256 from the key type
    .compact()
Step 3: Client Stores Token

Client stores token in localStorage or secure storage
Step 4: Authenticated Requests

Client includes token in Authorization header:
Authorization: Bearer eyJhbGciOiJIUzI1NiIs...
Step 5: Token Validation

JwtAuthenticationFilter intercepts requests and validates token:
val claims = Jwts.parser()
    .verifyWith(secretKey)
    .build()
    .parseSignedClaims(token)
    .payload

val username = claims.subject
val tenantId = claims["tenantId"] as String

Multi-Tenant Data Isolation

All queries automatically filter by tenant:
// Retrieve the authenticated user's tenantId from the security context
// (assumes a principal type that exposes tenantId; the actual class name may differ)
val principal = SecurityContextHolder.getContext()
    .authentication.principal as TenantAwareUserDetails
val tenantId = principal.tenantId

// Query with tenant filter
val greenhouses = greenhouseRepository
    .findByTenantId(tenantId)
Important: Existing sensor data has NULL tenant_id and requires manual migration. See Migration Guide for steps to populate tenant associations.
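Such a backfill would resemble the following (the target tenant id is a placeholder assumption; see the Migration Guide for the real procedure):

```sql
-- Hypothetical backfill: attach legacy rows to the DEFAULT tenant.
-- Replace 1 with the actual tenant id from your tenants table.
UPDATE iot.sensor_readings
SET tenant_id = 1
WHERE tenant_id IS NULL;
```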

Performance Optimization

Query Performance

  • Redis Cache: 1-5ms response time for recent data (last 1000 messages)
  • TimescaleDB: 10-50ms response time for historical queries with indexes
  • Continuous Aggregates: 60x faster than raw queries (pre-computed hourly/daily stats)
  • Batch Inserts: 22x faster than individual inserts (single query for 22 rows)

Scaling Strategies

1. Horizontal API Scaling

Deploy multiple API instances behind load balancer:
deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: invernaderos-api
spec:
  replicas: 3  # Run 3 instances
  selector:
    matchLabels:
      app: invernaderos-api
  template:
    # ... pod spec
2. Redis Cluster (Future)

Migrate from single Redis instance to Redis Cluster for high availability
3. TimescaleDB Replication

Configure TimescaleDB streaming replication for read replicas
4. MQTT Broker Clustering

EMQX Enterprise supports clustering for high availability

Monitoring & Observability

Health Checks

# Health check
curl http://localhost:8080/actuator/health

# Response
{
  "status": "UP",
  "components": {
    "db": {"status": "UP"},
    "redis": {"status": "UP"},
    "diskSpace": {"status": "UP"}
  }
}

Logging

application.yaml
logging:
  level:
    root: INFO
    com.apptolast.invernaderos: DEBUG
    org.springframework.integration.mqtt: DEBUG
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} %5p [%15.15t] %-40.40logger : %m%n"
View logs:
docker-compose logs -f api

Next Steps

  • API Reference: Explore all REST endpoints
  • WebSocket Guide: Connect to the real-time stream
  • Deployment: Deploy to production

Architecture designed for high throughput • low latency • horizontal scaling • multi-tenancy • real-time streaming
