Skip to main content

Overview

The Invernaderos API uses Spring Integration MQTT with Eclipse Paho client to receive real-time sensor data from greenhouse hardware. The system supports multi-tenant topic routing, automatic reconnection, and bidirectional message verification.
MQTT Broker: EMQX (open-source MQTT broker)Protocol: MQTT 3.1.1 over WebSocket Secure (WSS)QoS Levels: Configurable per topic (default QoS 0)

Configuration

Connection Settings

The MQTT client is configured via MqttConfig.kt using Spring Integration:
MqttConfig.kt
@Configuration
class MqttConfig(
    @param:Value("\${spring.mqtt.broker.url}")
    private val brokerUrl: String,  // e.g., wss://mqtt.example.com:8084/mqtt

    @param:Value("\${spring.mqtt.username}")
    private val username: String,

    @param:Value("\${spring.mqtt.password}")
    private val password: String,

    @param:Value("\${spring.mqtt.client.id-prefix:greenhouse-api}")
    private val clientIdPrefix: String,

    @param:Value("\${spring.mqtt.client.clean-session:false}")
    private val cleanSession: Boolean,

    @param:Value("\${spring.mqtt.client.connection-timeout:10}")
    private val connectionTimeout: Int,

    @param:Value("\${spring.mqtt.client.keep-alive-interval:60}")
    private val keepAliveInterval: Int,

    @param:Value("\${spring.mqtt.client.automatic-reconnect:true}")
    private val automaticReconnect: Boolean,

    // ... listener dependencies
) {
    /**
     * Factory for creating MQTT clients configured per official documentation
     * https://docs.spring.io/spring-integration/reference/mqtt.html
     */
    @Bean
    fun mqttClientFactory(): MqttPahoClientFactory {
        val factory = DefaultMqttPahoClientFactory()
        val options = MqttConnectOptions()

        // Configure connection options
        options.serverURIs = arrayOf(brokerUrl)
        options.userName = username
        options.password = password.toCharArray()
        options.isCleanSession = cleanSession
        options.connectionTimeout = connectionTimeout
        options.keepAliveInterval = keepAliveInterval
        options.isAutomaticReconnect = automaticReconnect

        factory.connectionOptions = options
        return factory
    }
}

Application Properties

Configure MQTT connection in application.yaml:
application.yaml
spring:
  mqtt:
    broker:
      url: ${MQTT_BROKER_URL:wss://mqtt.invernaderos.com:8084/mqtt}
    username: ${MQTT_USERNAME:api-client}
    password: ${MQTT_PASSWORD}
    client:
      id-prefix: greenhouse-api
      clean-session: false  # Persist session state
      connection-timeout: 10  # seconds
      keep-alive-interval: 60  # seconds
      automatic-reconnect: true
    topics:
      greenhouse-multi-tenant: GREENHOUSE/+  # Wildcard for all tenants
      sensors-pattern: greenhouse/+/sensors/#
      actuators-pattern: greenhouse/+/actuators/#
      system-events: system/events/#
    qos:
      default: 0  # At most once (fire and forget)
  • brokerUrl: WebSocket Secure (WSS) URL for MQTT broker
  • cleanSession: false = persist subscriptions across restarts
  • connectionTimeout: Max seconds to wait for connection
  • keepAliveInterval: Ping interval to keep connection alive
  • automaticReconnect: Auto-reconnect on connection loss
  • id-prefix: Used to generate unique client IDs ({prefix}-inbound-{UUID})

Topic Structure

Multi-Tenant Topics

The system supports dynamic multi-tenant topic routing:
Pattern: GREENHOUSE/{tenantId}

Examples:
- GREENHOUSE/SARA     → Vivero Sara greenhouse company
- GREENHOUSE/001      → Generic tenant ID
- GREENHOUSE/NARANJOS → Los Naranjos farm

Payload Format:
{
  "TEMPERATURA INVERNADERO 01": 24.5,
  "HUMEDAD INVERNADERO 01": 62.3,
  "INVERNADERO_01_SECTOR_01": 1,
  "INVERNADERO_01_EXTRACTOR": 0
}

Topic Subscription

The MQTT adapter subscribes to multiple topics using wildcards:
MqttConfig.kt
@Bean
fun mqttInbound(): MessageProducer {
    // Generate a unique client ID for the inbound adapter
    val clientId = "$clientIdPrefix-inbound-${UUID.randomUUID()}"

    // Configure topics to subscribe
    val topics = arrayOf(
        "GREENHOUSE",                    // Legacy topic (backward compatibility)
        greenhouseMultiTenantPattern,    // Multi-tenant: GREENHOUSE/+ (wildcard)
        sensorsTopicPattern,             // greenhouse/+/sensors/#
        actuatorsTopicPattern,           // greenhouse/+/actuators/#
        systemEventsTopicPattern         // system/events/#
    )

    // Create the adapter with client ID, factory, and topics
    val adapter = MqttPahoMessageDrivenChannelAdapter(
        clientId,
        mqttClientFactory(),
        *topics
    )

    // Configure the adapter
    adapter.setCompletionTimeout(5000)
    adapter.setConverter(DefaultPahoMessageConverter())
    adapter.setQos(defaultQos)  // QoS 0 by default
    adapter.outputChannel = mqttInputChannel()

    return adapter
}
Wildcard Patterns:
  • + = Single-level wildcard (e.g., GREENHOUSE/+ matches GREENHOUSE/SARA)
  • # = Multi-level wildcard (e.g., sensors/# matches sensors/TEMP_01/value)

Message Processing

Inbound Message Handler

Messages are routed to appropriate listeners based on topic:
MqttConfig.kt
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
fun mqttMessageHandler(): MessageHandler {
    return MessageHandler { message: Message<*> ->
        try {
            // Extract message information FIRST (before processing)
            val topic = message.headers[MqttHeaders.RECEIVED_TOPIC] as? String ?: ""
            val payload = message.payload as String
            val qos = message.headers[MqttHeaders.RECEIVED_QOS] as? Int

            // Logging for debugging
            logger.debug("MQTT message received - Topic: {}, QoS: {}, Payload: {}",
                topic, qos, payload)

            // Route message to appropriate listener
            when {
                // Legacy topic (backward compatibility)
                topic == "GREENHOUSE" -> {
                    logger.debug("Processing legacy GREENHOUSE topic")
                    greenhouseDataListener.handleGreenhouseData(message)
                }

                // Multi-tenant topic: GREENHOUSE/empresaID
                topic.startsWith("GREENHOUSE/") && topic.split("/").size == 2 -> {
                    val tenantId = topic.substringAfter("GREENHOUSE/")
                    logger.debug("Processing multi-tenant topic for tenant: {}", tenantId)
                    greenhouseDataListener.handleGreenhouseData(message)
                }

                topic.contains("/sensors/") -> sensorDataListener.handleSensorData(message)
                topic.contains("/actuators/status") -> actuatorStatusListener.handleActuatorStatus(message)
                topic.contains("/alerts/") -> logger.info("Alert received on topic: {}", topic)
                else -> logger.warn("Unhandled topic: {}", topic)
            }

        } catch (e: Exception) {
            logger.error("Error processing MQTT message: {}", e.message, e)
            throw e  // Re-throw so error channel handles it
        }
    }
}

JSON Payload Format

Greenhouse messages use a hybrid JSON format with mixed key styles:
{
  "TEMPERATURA INVERNADERO 01": 24.5,
  "HUMEDAD INVERNADERO 01": 62.3,
  "TEMPERATURA INVERNADERO 02": 23.8,
  "HUMEDAD INVERNADERO 02": 65.1
}
Key Format Inconsistency (Intentional)The mixed key format (spaces vs underscores) matches the actual hardware output from the greenhouse system. Do not “fix” this inconsistency as it would break hardware integration.

QoS Levels

MQTT Quality of Service levels determine message delivery guarantees:
QoSNameDelivery GuaranteeUse Case
0At most onceFire and forgetSensor data (frequent updates)
1At least onceAcknowledged deliveryActuator commands
2Exactly onceGuaranteed deliveryCritical alerts

Default Configuration

application.yaml
mqtt:
  qos:
    default: 0  # Sensor data (high frequency, loss acceptable)
For critical operations, override QoS in code:
// Publish actuator command with QoS 1 (guaranteed delivery)
messageHandler.setDefaultQos(1)
messagingTemplate.convertAndSend(
    "greenhouse/001/actuators/FAN_01/command",
    "{\"action\": \"ON\", \"speed\": 75}"
)

Outbound Publishing

MQTT Echo Feature

The system automatically echoes received messages back to the broker for verification:
GreenhouseDataListener.kt
fun handleGreenhouseData(message: Message<*>) {
    val topic = message.headers[MqttHeaders.RECEIVED_TOPIC] as? String ?: return
    val payload = message.payload as String
    
    // Extract tenant ID from topic
    val tenantId = when {
        topic.startsWith("GREENHOUSE/") -> topic.substringAfter("GREENHOUSE/").takeWhile { it != '/' }
        topic == "GREENHOUSE" -> "DEFAULT"
        else -> "UNKNOWN"
    }
    
    // Process the message
    messageProcessor.processGreenhouseData(payload, tenantId)
    
    // ✅ Echo message back to MQTT broker for verification
    try {
        val messageDto = payload.toRealDataDto(
            timestamp = Instant.now(),
            greenhouseId = tenantId
        )
        
        val published = mqttPublishService.publishGreenhouseData(messageDto)
        
        if (published) {
            logger.info("✅ MQTT echo sent successfully - TenantID: {}", tenantId)
        }
    } catch (echoError: Exception) {
        logger.error("❌ Error sending MQTT echo: {}", echoError.message)
    }
}

Outbound Adapter Configuration

MqttConfig.kt
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
fun mqttOutbound(): MessageHandler {
    // Generate a unique client ID for outbound adapter
    val clientId = "$clientIdPrefix-outbound-${UUID.randomUUID()}"

    // Create the handler
    val messageHandler = MqttPahoMessageHandler(clientId, mqttClientFactory())

    // Configure the handler
    messageHandler.setAsync(true)  // Async sending for better performance
    messageHandler.setDefaultQos(defaultQos)

    // Note: Topic can be specified per message using MqttHeaders.TOPIC header
    // If not specified, can configure default topic:
    // messageHandler.setDefaultTopic("default/topic")

    return messageHandler
}

Connection Management

Automatic Reconnection

The Paho client handles reconnection automatically:
options.isAutomaticReconnect = true  // Enabled by default
options.connectionTimeout = 10       // Max 10 seconds for initial connection
options.keepAliveInterval = 60       // Ping every 60 seconds

Session Persistence

options.isCleanSession = false  // Persist subscriptions across restarts
Clean Session = false means:
  • MQTT broker remembers subscriptions after disconnect
  • Queued messages (QoS 1/2) are delivered on reconnect
  • Client ID must be unique and consistent

Health Check

Monitor MQTT connection status via Spring Boot Actuator:
curl http://localhost:8080/actuator/health
Response:
{
  "status": "UP",
  "components": {
    "mqtt": {
      "status": "UP",
      "details": {
        "clientId": "greenhouse-api-inbound-550e8400-e29b-41d4-a716-446655440000",
        "connected": true
      }
    }
  }
}

Testing MQTT Integration

Manual Testing with mosquitto_pub

# Publish to tenant SARA
mosquitto_pub -h mqtt.invernaderos.com -p 8883 \
  --cafile ca.crt \
  -u api-client -P "password" \
  -t "GREENHOUSE/SARA" \
  -m '{"TEMPERATURA INVERNADERO 01": 24.5, "HUMEDAD INVERNADERO 01": 62.3}'

Integration Test

@SpringBootTest
class MqttIntegrationTest {

    @Autowired
    lateinit var mqttGateway: MqttOutboundGateway

    @Test
    fun `should process GREENHOUSE message and store in database`() {
        // Given
        val payload = """
            {
              "TEMPERATURA INVERNADERO 01": 24.5,
              "HUMEDAD INVERNADERO 01": 62.3
            }
        """.trimIndent()

        // When
        mqttGateway.sendToMqtt("GREENHOUSE/SARA", payload)

        // Then
        await().atMost(Duration.ofSeconds(5)).untilAsserted {
            val readings = sensorReadingRepository.findByGreenhouseIdSince(
                greenhouseId = "SARA",
                since = Instant.now().minusSeconds(10)
            )
            assertThat(readings).isNotEmpty
            assertThat(readings.first().value).isEqualTo(24.5)
        }
    }
}

Best Practices

  1. Use QoS 0 for high-frequency sensor data (fire and forget)
  2. Use QoS 1 for actuator commands (guaranteed delivery)
  3. Enable automatic reconnection for production resilience
  4. Set cleanSession = false to persist subscriptions
  5. Use unique client IDs with UUID suffix to avoid conflicts
  6. Log all MQTT operations for debugging and auditing
  7. Monitor connection health via Actuator endpoints
For multi-tenant data isolation, see Multi-Tenant Architecture.For real-time data broadcasting, see WebSocket Real-Time Updates.

Build docs developers (and LLMs) love