Skip to main content
NetPOS uses MQTT (Message Queuing Telemetry Transport) over TLS for real-time event streaming, enabling instant notifications for transactions, device events, and system status updates.

Overview

MQTT integration provides:
  • Real-time transaction notifications
  • Device status monitoring
  • Authentication events
  • Battery and power status
  • SMS delivery status
  • Print job status

Connection Configuration

MQTT Broker Details

SERVER_HOST
string
required
MQTT broker hostname from BuildConfig.BASE_URL_NETPOS_MQTT
PORT
number
required
MQTTS port: 8883 (MQTT over TLS)
CLIENT_ID
string
required
Format: {DeviceModel}-{TerminalID}{RandomNumber}Example: Samsung-SM-G950F-2058XU2314523678

SSL/TLS Configuration

MQTT uses mutual TLS authentication:
MqttHelper.kt
import com.hivemq.client.mqtt.MqttClient
import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient
import com.hivemq.client.internal.mqtt.MqttClientSslConfigImplBuilder

private const val SERVER_HOST = BuildConfig.BASE_URL_NETPOS_MQTT
private const val PORT = 8883

private fun getMqttClientSSLConfigImpl(context: Context): MqttClientSslConfigImpl {
    return MqttClientSslConfigImplBuilder.Default()
        .apply {
            hostnameVerifier { _, _ -> true }
            trustManagerFactory(SSLUtil.getTrustManagerFactory(context))
            keyManagerFactory(SSLUtil.getKeyMangerFactory(context))
        }.build()
}

Client Initialization

Basic Setup

import android.content.Context
import android.os.Build
import com.hivemq.client.mqtt.MqttClient
import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient
import com.woleapp.netpos.model.User
import com.woleapp.netpos.util.Singletons
import timber.log.Timber

object MqttHelper {
    private var client: Mqtt3RxClient? = null
    
    fun <T> init(context: Context, event: MqttEvent<T>? = null, topic: MqttTopics? = null) {
        // Check if already connected
        if (client != null && client!!.state.isConnected) {
            checkDatabaseForFailedEvents(context)
            return
        }
        
        val user: User? = Singletons.getCurrentlyLoggedInUser()
        user?.let { u ->
            if (u.terminal_id.isNullOrEmpty()) {
                Timber.e("Terminal ID Null")
                return@let
            }
            
            // Generate unique client ID
            val clientId = "${Build.MODEL}-${u.terminal_id!!}${(10000..999999999).random()}"
            
            // Build MQTT client
            val clientBuilder = MqttClient.builder()
                .identifier(clientId)
                .sslConfig(getMqttClientSSLConfigImpl(context))
                .serverHost(SERVER_HOST)
                .serverPort(PORT)
                .automaticReconnectWithDefaultConfig()
                .addConnectedListener {
                    checkDatabaseForFailedEvents(context)
                    Timber.e("Client $clientId Connected Successfully to $SERVER_HOST")
                }
                .addDisconnectedListener {
                    Timber.e("Disconnected::cause - ${it.cause}")
                }
            
            // Create RxJava-enabled MQTT client
            client = clientBuilder.useMqttVersion3().buildRx().apply {
                connect().subscribe { t1, t2 ->
                    t1?.let {
                        event?.let {
                            sendPayload(topic!!, it)
                        }
                        Timber.e("Connected:")
                    }
                    t2?.let {
                        Timber.e("Connection Failed")
                        Timber.e(it)
                    }
                }.disposeWith(disposables)
            }
        }
    }
}

Automatic Reconnection

The client automatically reconnects using HiveMQ’s default configuration:
.automaticReconnectWithDefaultConfig()
Default reconnect behavior:
  • Initial delay: 1 second
  • Max delay: 120 seconds
  • Exponential backoff with jitter

MQTT Topics

NetPOS publishes events to predefined topics:

Topic Structure

enum class MqttTopics(val topic: String) {
    AUTHENTICATION("mqtt.pos.authentication.event"),
    TERMINAL_CONFIGURATION("mqtt.pos.terminal_config.event"),
    TRANSACTIONS("mqtt.pos.transaction.event"),
    PRINTING_RECEIPT("mqtt.pos.device.event"),
    NIP_PULL("mqtt.pos.bank_transfer.event"),
    NIP_NEW("mqtt.pos.generate_session_code.event"),
    NIP_SEARCH("mqtt.pos.verify_session_code.event"),
    CARD_READER_EVENTS("mqtt.pos.device.event"),
    POWER_EVENTS("mqtt.pos.device.event"),
    BATTERY_EVENTS("mqtt.pos.device.event"),
    SMS_EVENTS("mqtt.pos.sms.event")
}

Topic Reference

Event Type: User authentication and session eventsPayload:
{
  "storm_id": "USR123456",
  "business_name": "Acme Store",
  "terminalId": "2058XU23",
  "serial_number": "DEVICE123",
  "event": "AUTHENTICATION",
  "status": "SUCCESS",
  "code": "00",
  "timestamp": 1638360000000,
  "geo": "lat:6.5244 long:3.3792",
  "data": {
    "business_name": "Acme Store",
    "storm_id": "USR123456",
    "serial_number": "DEVICE123"
  }
}
Event Type: All payment transactionsPayload:
{
  "storm_id": "USR123456",
  "business_name": "Acme Store",
  "terminalId": "2058XU23",
  "serial_number": "DEVICE123",
  "event": "TRANSACTION",
  "status": "SUCCESS",
  "code": "00",
  "timestamp": 1638360000000,
  "geo": "lat:6.5244 long:3.3792",
  "transactionType": "PURCHASE",
  "data": {
    "amount": 5000.00,
    "reference": "TXN202112011234",
    "cardPan": "506066******1234",
    "cardHolder": "JOHN DOE",
    "authCode": "123456"
  }
}
Event Type: NIP (Nigeria Instant Payment) transfersPayload:
{
  "storm_id": "USR123456",
  "terminalId": "2058XU23",
  "event": "BANK_TRANSFER",
  "status": "SUCCESS",
  "code": "00",
  "timestamp": 1638360000000,
  "data": {
    "session_code": "123456",
    "code_verified": true,
    "start_date": "2021-12-01T10:00:00Z",
    "end_date": "2021-12-01T10:05:00Z"
  }
}
Event Type: Device-related events (printer, card reader, power, battery)Printer Event:
{
  "event": "PRINTING_RECEIPT",
  "status": "SUCCESS",
  "code": "00",
  "data": {
    "transactionRef": "TXN202112011234",
    "printerCode": "00"
  }
}
Battery Event:
{
  "event": "BATTERY_EVENTS",
  "data": {
    "battery_percentage": 75,
    "status": "CHARGING"
  }
}
Card Reader Event:
{
  "event": "CARD_READER_EVENTS",
  "data": {
    "cardExpiry": "12/25",
    "cardHolder": "JOHN DOE",
    "maskedPan": "506066******1234",
    "readerError": null
  }
}
Event Type: SMS delivery statusPayload:
{
  "event": "SMS_EVENTS",
  "status": "SUCCESS",
  "code": "00",
  "data": {
    "to": "+2348012345678",
    "status": "DELIVERED",
    "serverResponse": "Message sent successfully"
  }
}

Publishing Events

Send Event to Topic

import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish
import com.woleapp.netpos.util.Singletons.gson
import io.reactivex.Flowable

fun <T> sendPayload(
    mqttTopic: MqttTopics,
    event: MqttEvent<T>? = null,
    failedEvent: MqttEventsLocal? = null
) {
    if (event == null && failedEvent == null) {
        Timber.e("Nothing to publish")
        return
    }
    
    // Add geolocation
    event?.apply {
        geo = Prefs.getString(PREF_LAST_LOCATION, "lat:6.5244 long:3.3792")
        Timber.e("Sending to topic: ${mqttTopic.topic}")
    }
    
    client?.let { client ->
        if (!client.state.isConnected) {
            Timber.e("Not connected, save to local database")
            val local = event?.toLocal(mqttTopic.topic, "client not connected") ?: failedEvent
            local?.let { savePayloadToLocalDatabase(local) }
            return@let
        }
        
        // Build MQTT publish message
        var flowable: Flowable<Mqtt3Publish>? = null
        event?.let {
            flowable = Flowable.just(
                Mqtt3Publish.builder()
                    .topic(mqttTopic.topic)
                    .qos(MqttQos.AT_LEAST_ONCE)  // QoS 1
                    .payload(gson.toJson(event).toByteArray(Charset.forName("UTF-8")))
                    .build()
            )
        }
        
        // Publish message
        client.publish(flowable!!).subscribe(
            {
                if (it.error.isPresent) {
                    Timber.e("Error: ${it.error.get().localizedMessage}")
                    savePayloadToLocalDatabase(
                        MqttEventsLocal(
                            it.publish.topic.toString(),
                            String(it.publish.payloadAsBytes, StandardCharsets.UTF_8),
                            "error during publishing"
                        )
                    )
                }
                Timber.e("Published")
            },
            { error ->
                Timber.e("Publish failed; saving to local database")
                val local = event?.toLocal(mqttTopic.topic, error.localizedMessage) ?: failedEvent
                local?.let { savePayloadToLocalDatabase(local) }
            },
            { Timber.e("Publish completed") }
        ).disposeWith(disposables)
    }
}

Example: Transaction Event

// Create transaction event
val transactionEvent = MqttEvent<TransactionData>().apply {
    event = MqttEvents.TRANSACTIONS.event
    status = "SUCCESS"
    code = MqttStatus.SUCCESS.code
    timestamp = System.currentTimeMillis()
    transactionType = "PURCHASE"
    data = TransactionData(
        amount = 5000.00,
        reference = "TXN202112011234",
        cardPan = "506066******1234"
    )
}

// Publish to MQTT
MqttHelper.sendPayload(MqttTopics.TRANSACTIONS, transactionEvent)

Event Models

Base Event Structure

data class MqttEvent<T>(
    var storm_id: String? = null,
    var business_name: String? = null,
    var terminalId: String? = null,
    @SerializedName("serial_number") var deviceSerial: String? = null,
    @Ignore var data: T? = null,
    var event: String? = null,
    var status: String? = null,
    var code: String? = null,
    var timestamp: Long? = null,
    var geo: String? = null,
    var transactionType: String? = null
) {
    init {
        // Auto-populate from current user
        val user = Singletons.getCurrentlyLoggedInUser()
        storm_id = user!!.netplus_id!!
        business_name = user.business_name!!
        terminalId = NetPosTerminalConfig.getTerminalId()
        deviceSerial = NetPosSdk.getDeviceSerial()
    }
}

Status Codes

enum class MqttStatus(val code: String) {
    SUCCESS("00"),
    ERROR("01")
}

Offline Support

NetPOS implements offline queue for failed MQTT events:

Local Event Storage

@Entity(tableName = "mqttEvents")
data class MqttEventsLocal(
    val topic: String,
    val data: String,
    val cause: String? = null
) {
    @PrimaryKey(autoGenerate = true) var id: Int = 0
}

Retry Failed Events

When connection is restored:
private fun checkDatabaseForFailedEvents(context: Context) {
    if (mqttLocalDao == null) {
        mqttLocalDao = AppDatabase.getDatabaseInstance(context).mqttLocalDao()
    }
    
    mqttLocalDao?.apply {
        val subscribe = getLocalEvents().flatMap {
            deleteAllEvents().toSingleDefault(it)
        }.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { events, error ->
                events?.let {
                    it.forEach { localEvent ->
                        sendPayload<Nothing>(
                            getTopic(localEvent.topic),
                            failedEvent = localEvent
                        )
                    }
                }
            }
    }
}

Disconnection

fun disconnect() {
    if (client == null) {
        Timber.e("Client is null or not connected")
        return
    }
    
    client?.disconnect()?.subscribeOn(Schedulers.io())
        ?.observeOn(AndroidSchedulers.mainThread())
        ?.subscribe(
            { Timber.e("Disconnected") },
            { Timber.e(it) }
        )?.disposeWith(disposables)
    
    client = null
    disposables.clear()
}

Best Practices

QoS Level

Use QoS 1 (AT_LEAST_ONCE) for transaction events to ensure delivery

Offline Queue

Always save failed events to local database for retry

Reconnection

Enable automatic reconnection for network resilience

SSL/TLS

Use mutual TLS authentication for secure MQTT connections

Next Steps

API Endpoints

Explore REST API integration for data queries and management

Build docs developers (and LLMs) love