Overview
The Invernaderos API uses Spring Integration MQTT with the Eclipse Paho client to receive real-time sensor data from greenhouse hardware. The system supports multi-tenant topic routing, automatic reconnection, and bidirectional message verification.
MQTT Broker: EMQX (open-source MQTT broker)
Protocol: MQTT 3.1.1 over WebSocket Secure (WSS)
QoS Levels: Configurable per topic (default QoS 0)
Configuration
Connection Settings
The MQTT client is configured via MqttConfig.kt using Spring Integration:
@Configuration
class MqttConfig(
    @param:Value("\${spring.mqtt.broker.url}")
    private val brokerUrl: String, // e.g., wss://mqtt.example.com:8084/mqtt
    @param:Value("\${spring.mqtt.username}")
    private val username: String,
    @param:Value("\${spring.mqtt.password}")
    private val password: String,
    @param:Value("\${spring.mqtt.client.id-prefix:greenhouse-api}")
    private val clientIdPrefix: String,
    @param:Value("\${spring.mqtt.client.clean-session:false}")
    private val cleanSession: Boolean,
    @param:Value("\${spring.mqtt.client.connection-timeout:10}")
    private val connectionTimeout: Int,
    @param:Value("\${spring.mqtt.client.keep-alive-interval:60}")
    private val keepAliveInterval: Int,
    @param:Value("\${spring.mqtt.client.automatic-reconnect:true}")
    private val automaticReconnect: Boolean,
    // ... listener dependencies
) {
    /**
     * Factory for creating MQTT clients configured per official documentation
     * https://docs.spring.io/spring-integration/reference/mqtt.html
     */
    @Bean
    fun mqttClientFactory(): MqttPahoClientFactory {
        val factory = DefaultMqttPahoClientFactory()
        val options = MqttConnectOptions()
        // Configure connection options
        options.serverURIs = arrayOf(brokerUrl)
        options.userName = username
        options.password = password.toCharArray()
        options.isCleanSession = cleanSession
        options.connectionTimeout = connectionTimeout
        options.keepAliveInterval = keepAliveInterval
        options.isAutomaticReconnect = automaticReconnect
        factory.connectionOptions = options
        return factory
    }
}
Application Properties
Configure MQTT connection in application.yaml:
spring:
  mqtt:
    broker:
      url: ${MQTT_BROKER_URL:wss://mqtt.invernaderos.com:8084/mqtt}
    username: ${MQTT_USERNAME:api-client}
    password: ${MQTT_PASSWORD}
    client:
      id-prefix: greenhouse-api
      clean-session: false # Persist session state
      connection-timeout: 10 # seconds
      keep-alive-interval: 60 # seconds
      automatic-reconnect: true
    topics:
      greenhouse-multi-tenant: GREENHOUSE/+ # Wildcard for all tenants
      sensors-pattern: greenhouse/+/sensors/#
      actuators-pattern: greenhouse/+/actuators/#
      system-events: system/events/#
    qos:
      default: 0 # At most once (fire and forget)
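The ${NAME:default} placeholders in the properties above take the value of the named environment variable when it is set and fall back to the text after the first colon otherwise. The following is a minimal sketch of that rule, not Spring's actual resolver; resolvePlaceholder and the env map are hypothetical names used only for illustration.

```kotlin
// Sketch of how a "${NAME:default}" placeholder resolves.
// This is an illustration, not Spring's real implementation.
fun resolvePlaceholder(placeholder: String, env: Map<String, String>): String? {
    // Strip the surrounding "${" and "}".
    val inner = placeholder.removePrefix("\${").removeSuffix("}")
    // Split on the FIRST colon only: the default may itself contain colons (e.g. a URL).
    val name = inner.substringBefore(':')
    val default = if (':' in inner) inner.substringAfter(':') else null
    // Prefer the environment value, then the default, else null (unresolved).
    return env[name] ?: default
}
```

In this sketch an unresolved placeholder such as ${MQTT_PASSWORD} yields null. In a real Spring application a placeholder with no value and no default fails at startup, so MQTT_PASSWORD has to be provided by the environment.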
Connection Options Explained
brokerUrl: WebSocket Secure (WSS) URL for MQTT broker
cleanSession: false = persist subscriptions across restarts
connectionTimeout: Max seconds to wait for connection
keepAliveInterval: Ping interval to keep connection alive
automaticReconnect: Auto-reconnect on connection loss
id-prefix: Used to generate unique client IDs ({prefix}-inbound-{UUID})
Topic Structure
Multi-Tenant Topics
The system supports dynamic multi-tenant topic routing:
Greenhouse Data
Pattern: GREENHOUSE/{tenantId}
Examples:
- GREENHOUSE/SARA → Vivero Sara greenhouse company
- GREENHOUSE/001 → Generic tenant ID
- GREENHOUSE/NARANJOS → Los Naranjos farm
Payload Format:
{
  "TEMPERATURA INVERNADERO 01": 24.5,
  "HUMEDAD INVERNADERO 01": 62.3,
  "INVERNADERO_01_SECTOR_01": 1,
  "INVERNADERO_01_EXTRACTOR": 0
}
Sensor-Specific
Pattern: greenhouse/{tenantId}/sensors/{sensorId}
Examples:
- greenhouse/001/sensors/TEMP_01
- greenhouse/SARA/sensors/HUM_02
Payload Format:
{
  "value": 24.5,
  "unit": "°C",
  "timestamp": "2025-11-16T10:30:00Z"
}
Actuator Status
Pattern: greenhouse/{tenantId}/actuators/{actuatorId}/status
Examples:
- greenhouse/001/actuators/FAN_01/status
- greenhouse/SARA/actuators/VALVE_02/status
Payload Format:
{
  "state": "ON",
  "power": 75,
  "mode": "AUTOMATIC"
}
Legacy (Deprecated)
Topic: GREENHOUSE (no tenant suffix)
Maps to: tenant_id = "DEFAULT"
Use case: Backward compatibility during migration
This topic is deprecated. All new integrations should use the multi-tenant format GREENHOUSE/{tenantId}.
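The four topic shapes above can be told apart by splitting the topic on "/" and checking the fixed segments. The sketch below shows one way to do that; ParsedTopic and parseTopic are hypothetical names, and the real listeners may model topics differently. It only handles the exact patterns listed above, not deeper sensor topics that the subscription wildcard would also deliver.

```kotlin
// Hypothetical types for illustration only.
sealed interface ParsedTopic {
    data class GreenhouseData(val tenantId: String) : ParsedTopic
    data class Sensor(val tenantId: String, val sensorId: String) : ParsedTopic
    data class ActuatorStatus(val tenantId: String, val actuatorId: String) : ParsedTopic
    object Unknown : ParsedTopic
}

fun parseTopic(topic: String): ParsedTopic {
    val parts = topic.split("/")
    return when {
        // Legacy topic without a tenant suffix maps to the DEFAULT tenant.
        topic == "GREENHOUSE" -> ParsedTopic.GreenhouseData("DEFAULT")
        // GREENHOUSE/{tenantId}
        parts.size == 2 && parts[0] == "GREENHOUSE" && parts[1].isNotEmpty() ->
            ParsedTopic.GreenhouseData(parts[1])
        // greenhouse/{tenantId}/sensors/{sensorId}
        parts.size == 4 && parts[0] == "greenhouse" && parts[2] == "sensors" ->
            ParsedTopic.Sensor(parts[1], parts[3])
        // greenhouse/{tenantId}/actuators/{actuatorId}/status
        parts.size == 5 && parts[0] == "greenhouse" &&
            parts[2] == "actuators" && parts[4] == "status" ->
            ParsedTopic.ActuatorStatus(parts[1], parts[3])
        else -> ParsedTopic.Unknown
    }
}
```

Matching on segment positions rather than substrings keeps the tenant ID and the device ID from being confused when an ID happens to contain a word such as "sensors".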
Topic Subscription
The MQTT adapter subscribes to multiple topics using wildcards:
@Bean
fun mqttInbound(): MessageProducer {
    // Generate a unique client ID for the inbound adapter
    val clientId = "$clientIdPrefix-inbound-${UUID.randomUUID()}"
    // Configure topics to subscribe
    val topics = arrayOf(
        "GREENHOUSE",                 // Legacy topic (backward compatibility)
        greenhouseMultiTenantPattern, // Multi-tenant: GREENHOUSE/+ (wildcard)
        sensorsTopicPattern,          // greenhouse/+/sensors/#
        actuatorsTopicPattern,        // greenhouse/+/actuators/#
        systemEventsTopicPattern      // system/events/#
    )
    // Create the adapter with client ID, factory, and topics
    val adapter = MqttPahoMessageDrivenChannelAdapter(
        clientId,
        mqttClientFactory(),
        *topics
    )
    // Configure the adapter
    adapter.setCompletionTimeout(5000)
    adapter.setConverter(DefaultPahoMessageConverter())
    adapter.setQos(defaultQos) // QoS 0 by default
    adapter.outputChannel = mqttInputChannel()
    return adapter
}
Wildcard Patterns:
+ = Single-level wildcard (e.g., GREENHOUSE/+ matches GREENHOUSE/SARA)
# = Multi-level wildcard (e.g., sensors/# matches sensors/TEMP_01/value)
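To make the two wildcard rules concrete, here is a minimal sketch of topic-filter matching. topicMatches is a hypothetical helper written for this page, not part of Paho or Spring Integration, and it ignores special cases such as topics that start with a dollar sign.

```kotlin
// Sketch of MQTT topic-filter matching for the + and # wildcards.
fun topicMatches(filter: String, topic: String): Boolean {
    val f = filter.split("/")
    val t = topic.split("/")
    for (i in f.indices) {
        when {
            // '#' must be the last filter level; it matches the parent level
            // and any number of child levels.
            f[i] == "#" -> return i == f.lastIndex
            // The topic ran out of levels before the filter did.
            i >= t.size -> return false
            // '+' matches exactly one level; anything else must match literally.
            f[i] != "+" && f[i] != t[i] -> return false
        }
    }
    // Without a trailing '#', both must have the same number of levels.
    return f.size == t.size
}
```

Under these rules GREENHOUSE/+ matches GREENHOUSE/SARA but not the bare GREENHOUSE topic, which is why the legacy topic appears as a separate entry in the subscription list.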
Message Processing
Inbound Message Handler
Messages are routed to appropriate listeners based on topic:
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
fun mqttMessageHandler(): MessageHandler {
    return MessageHandler { message: Message<*> ->
        try {
            // Extract message information FIRST (before processing)
            val topic = message.headers[MqttHeaders.RECEIVED_TOPIC] as? String ?: ""
            val payload = message.payload as String
            val qos = message.headers[MqttHeaders.RECEIVED_QOS] as? Int
            // Logging for debugging
            logger.debug("MQTT message received - Topic: {}, QoS: {}, Payload: {}",
                topic, qos, payload)
            // Route message to appropriate listener
            when {
                // Legacy topic (backward compatibility)
                topic == "GREENHOUSE" -> {
                    logger.debug("Processing legacy GREENHOUSE topic")
                    greenhouseDataListener.handleGreenhouseData(message)
                }
                // Multi-tenant topic: GREENHOUSE/{tenantId}
                topic.startsWith("GREENHOUSE/") && topic.split("/").size == 2 -> {
                    val tenantId = topic.substringAfter("GREENHOUSE/")
                    logger.debug("Processing multi-tenant topic for tenant: {}", tenantId)
                    greenhouseDataListener.handleGreenhouseData(message)
                }
                topic.contains("/sensors/") -> sensorDataListener.handleSensorData(message)
                // Status topics look like greenhouse/{tenantId}/actuators/{actuatorId}/status,
                // so the actuator ID sits between "/actuators/" and "/status"
                topic.contains("/actuators/") && topic.endsWith("/status") ->
                    actuatorStatusListener.handleActuatorStatus(message)
                topic.contains("/alerts/") -> logger.info("Alert received on topic: {}", topic)
                else -> logger.warn("Unhandled topic: {}", topic)
            }
        } catch (e: Exception) {
            logger.error("Error processing MQTT message: {}", e.message, e)
            throw e // Re-throw so error channel handles it
        }
    }
}
Greenhouse messages use a hybrid JSON format with mixed key styles: temperature and humidity keys contain spaces, while sector and extractor keys use underscores (for example, INVERNADERO_01_SECTOR_01 and INVERNADERO_01_EXTRACTOR). A complete message carries 22 fields.
Temperature/Humidity (Spaces in Keys)
{
  "TEMPERATURA INVERNADERO 01": 24.5,
  "HUMEDAD INVERNADERO 01": 62.3,
  "TEMPERATURA INVERNADERO 02": 23.8,
  "HUMEDAD INVERNADERO 02": 65.1
}
Key Format Inconsistency (Intentional)
The mixed key format (spaces vs underscores) matches the actual hardware output from the greenhouse system. Do not "fix" this inconsistency, as doing so would break hardware integration.
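Because the keys cannot be normalized at the source, a consumer has to recognize both styles as they arrive. The sketch below classifies a key with two regular expressions inferred from the example keys on this page; GreenhouseKey and classifyKey are hypothetical names, and the full 22-field message may contain key shapes these patterns do not cover.

```kotlin
// Hypothetical model for illustration; field names are assumptions.
data class GreenhouseKey(val kind: String, val greenhouse: Int, val detail: String? = null)

// "TEMPERATURA INVERNADERO 01" / "HUMEDAD INVERNADERO 01" (spaces in keys)
private val spacedKey = Regex("""(TEMPERATURA|HUMEDAD) INVERNADERO (\d+)""")

// "INVERNADERO_01_SECTOR_01" / "INVERNADERO_01_EXTRACTOR" (underscores in keys)
private val underscoredKey = Regex("""INVERNADERO_(\d+)_([A-Z]+)(?:_(\d+))?""")

fun classifyKey(key: String): GreenhouseKey? {
    spacedKey.matchEntire(key)?.let { m ->
        return GreenhouseKey(m.groupValues[1], m.groupValues[2].toInt())
    }
    underscoredKey.matchEntire(key)?.let { m ->
        // The trailing index (e.g. the sector number) is optional.
        val index = m.groupValues[3].ifEmpty { null }
        return GreenhouseKey(m.groupValues[2], m.groupValues[1].toInt(), index)
    }
    // Unrecognized keys are reported as null rather than guessed at.
    return null
}
```

The keys themselves are matched exactly as the hardware sends them, so the original spelling is never rewritten.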
QoS Levels
MQTT Quality of Service levels determine message delivery guarantees:
QoS | Name          | Delivery Guarantee    | Use Case
0   | At most once  | Fire and forget       | Sensor data (frequent updates)
1   | At least once | Acknowledged delivery | Actuator commands
2   | Exactly once  | Guaranteed delivery   | Critical alerts
Default Configuration
mqtt:
  qos:
    default: 0 # Sensor data (high frequency, loss acceptable)
For critical operations, override QoS in code:
// Publish actuator command with QoS 1 (at-least-once, acknowledged delivery)
messageHandler.setDefaultQos(1)
messagingTemplate.convertAndSend(
    "greenhouse/001/actuators/FAN_01/command",
    "{\"action\": \"ON\", \"speed\": 75}"
)
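The use cases in the QoS table can also be expressed as a small lookup from topic to QoS level. This is a sketch only: Qos and qosFor are hypothetical names, and the topic checks simply mirror the example topics on this page rather than any rule the API is known to enforce.

```kotlin
// Hypothetical helper mapping a topic to the QoS level suggested in the table.
enum class Qos(val level: Int) {
    AT_MOST_ONCE(0),
    AT_LEAST_ONCE(1),
    EXACTLY_ONCE(2)
}

fun qosFor(topic: String): Qos = when {
    topic.endsWith("/command") -> Qos.AT_LEAST_ONCE // actuator commands
    topic.contains("/alerts/") -> Qos.EXACTLY_ONCE  // critical alerts
    else -> Qos.AT_MOST_ONCE                        // sensor data and everything else
}
```

One way to apply the result per message, instead of changing the handler-wide default, would be to set the MqttHeaders.QOS header on the outgoing message alongside MqttHeaders.TOPIC.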
Outbound Publishing
MQTT Echo Feature
The system automatically echoes received messages back to the broker for verification:
GreenhouseDataListener.kt
fun handleGreenhouseData(message: Message<*>) {
    val topic = message.headers[MqttHeaders.RECEIVED_TOPIC] as? String ?: return
    val payload = message.payload as String
    // Extract tenant ID from topic
    val tenantId = when {
        topic.startsWith("GREENHOUSE/") -> topic.substringAfter("GREENHOUSE/").takeWhile { it != '/' }
        topic == "GREENHOUSE" -> "DEFAULT"
        else -> "UNKNOWN"
    }
    // Process the message
    messageProcessor.processGreenhouseData(payload, tenantId)
    // ✅ Echo message back to MQTT broker for verification
    try {
        val messageDto = payload.toRealDataDto(
            timestamp = Instant.now(),
            greenhouseId = tenantId
        )
        val published = mqttPublishService.publishGreenhouseData(messageDto)
        if (published) {
            logger.info("✅ MQTT echo sent successfully - TenantID: {}", tenantId)
        }
    } catch (echoError: Exception) {
        logger.error("❌ Error sending MQTT echo: {}", echoError.message)
    }
}
Outbound Adapter Configuration
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
fun mqttOutbound(): MessageHandler {
    // Generate a unique client ID for outbound adapter
    val clientId = "$clientIdPrefix-outbound-${UUID.randomUUID()}"
    // Create the handler
    val messageHandler = MqttPahoMessageHandler(clientId, mqttClientFactory())
    // Configure the handler
    messageHandler.setAsync(true) // Async sending for better performance
    messageHandler.setDefaultQos(defaultQos)
    // Note: Topic can be specified per message using MqttHeaders.TOPIC header
    // If not specified, can configure default topic:
    // messageHandler.setDefaultTopic("default/topic")
    return messageHandler
}
Connection Management
Automatic Reconnection
The Paho client handles reconnection automatically:
options.isAutomaticReconnect = true // Enabled here; the automatic-reconnect property defaults to true
options.connectionTimeout = 10 // Max 10 seconds for initial connection
options.keepAliveInterval = 60 // Ping every 60 seconds
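The Paho documentation describes automatic reconnect as waiting about one second before the first attempt and doubling the wait after each failure until it levels off at roughly two minutes. The sketch below reproduces that kind of schedule so the timing is easier to reason about. reconnectDelaysMs is a hypothetical helper, and the initial delay and cap are assumptions passed in as parameters, not values read from the client.

```kotlin
// Sketch of a doubling reconnect delay schedule.
// initialMs and maxMs are assumed defaults for illustration.
fun reconnectDelaysMs(
    attempts: Int,
    initialMs: Long = 1_000,
    maxMs: Long = 128_000
): List<Long> {
    val delays = mutableListOf<Long>()
    var delay = initialMs
    repeat(attempts) {
        delays += delay
        // Double after each failed attempt until the cap is reached.
        delay = minOf(delay * 2, maxMs)
    }
    return delays
}
```

With these assumed defaults the eighth attempt already waits at the cap, so a broker outage of a few minutes can leave the client up to about two minutes behind the broker's recovery.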
Session Persistence
options.isCleanSession = false // Persist subscriptions across restarts
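With cleanSession = false:
- The MQTT broker remembers subscriptions after a disconnect
- Queued QoS 1/2 messages are delivered on reconnect
- The client ID must be unique and stay the same across reconnects and restarts, because the broker looks up the stored session by client ID. An ID that gets a fresh random UUID suffix on every start begins a new session each time.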
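The client ID requirement is easiest to see by comparing two ways of building the ID. This is a sketch under stated assumptions: randomClientId follows the {prefix}-{role}-{UUID} shape used on this page, while stableClientId and its instanceName parameter are hypothetical and stand for any deployment-specific value that does not change between restarts.

```kotlin
import java.util.UUID

// Random suffix: unique on every start, so the broker treats each start as a new client
// and cannot match it to a previously stored session.
fun randomClientId(prefix: String, role: String): String =
    "$prefix-$role-${UUID.randomUUID()}"

// Stable suffix: the same ID on every start, so a persistent session can be resumed.
// instanceName is a hypothetical value such as a host or pod name.
fun stableClientId(prefix: String, role: String, instanceName: String): String =
    "$prefix-$role-$instanceName"
```

The stable form still needs to be unique per running instance, since a broker typically disconnects an existing client when a second one connects with the same ID.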
Health Check
Monitor MQTT connection status via Spring Boot Actuator:
curl http://localhost:8080/actuator/health
Response:
{
  "status": "UP",
  "components": {
    "mqtt": {
      "status": "UP",
      "details": {
        "clientId": "greenhouse-api-inbound-550e8400-e29b-41d4-a716-446655440000",
        "connected": true
      }
    }
  }
}
Testing MQTT Integration
Manual Testing with mosquitto_pub
Publish to Multi-Tenant Topic
# Publish to tenant SARA
mosquitto_pub -h mqtt.invernaderos.com -p 8883 \
--cafile ca.crt \
-u api-client -P "password" \
-t "GREENHOUSE/SARA" \
-m '{"TEMPERATURA INVERNADERO 01": 24.5, "HUMEDAD INVERNADERO 01": 62.3}'
Integration Test
@SpringBootTest
class MqttIntegrationTest {
    @Autowired
    lateinit var mqttGateway: MqttOutboundGateway

    // Repository used in the assertions below; the type name is assumed
    @Autowired
    lateinit var sensorReadingRepository: SensorReadingRepository

    @Test
    fun `should process GREENHOUSE message and store in database`() {
        // Given
        val payload = """
            {
              "TEMPERATURA INVERNADERO 01": 24.5,
              "HUMEDAD INVERNADERO 01": 62.3
            }
        """.trimIndent()
        // When
        mqttGateway.sendToMqtt("GREENHOUSE/SARA", payload)
        // Then: poll until the asynchronous pipeline has stored the reading
        await().atMost(Duration.ofSeconds(5)).untilAsserted {
            val readings = sensorReadingRepository.findByGreenhouseIdSince(
                greenhouseId = "SARA",
                since = Instant.now().minusSeconds(10)
            )
            // isNotEmpty() is a method call on the AssertJ assertion, not a Kotlin property
            assertThat(readings).isNotEmpty()
            assertThat(readings.first().value).isEqualTo(24.5)
        }
    }
}
Best Practices
Use QoS 0 for high-frequency sensor data (fire and forget)
Use QoS 1 for actuator commands (at-least-once, acknowledged delivery)
Enable automatic reconnection for production resilience
Set cleanSession = false to persist subscriptions
Use unique client IDs with UUID suffix to avoid conflicts
Log all MQTT operations for debugging and auditing
Monitor connection health via Actuator endpoints