NetPOS uses MQTT (Message Queuing Telemetry Transport) over TLS for real-time event streaming, enabling instant notifications for transactions, device events, and system status updates.
Overview
MQTT integration provides:
Real-time transaction notifications
Device status monitoring
Authentication events
Battery and power status
SMS delivery status
Print job status
Connection Configuration
MQTT Broker Details
MQTT broker hostname from BuildConfig.BASE_URL_NETPOS_MQTT
MQTTS port: 8883 (MQTT over TLS)
Format: {DeviceModel}-{TerminalID}{RandomNumber} Example: Samsung-SM-G950F-2058XU2314523678
SSL/TLS Configuration
MQTT uses mutual TLS authentication:
import com.hivemq.client.mqtt.MqttClient
import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient
import com.hivemq.client.internal.mqtt.MqttClientSslConfigImplBuilder
private const val SERVER_HOST = BuildConfig.BASE_URL_NETPOS_MQTT
private const val PORT = 8883
private fun getMqttClientSSLConfigImpl (context: Context ): MqttClientSslConfigImpl {
return MqttClientSslConfigImplBuilder. Default ()
. apply {
hostnameVerifier { _, _ -> true }
trustManagerFactory (SSLUtil. getTrustManagerFactory (context))
keyManagerFactory (SSLUtil. getKeyMangerFactory (context))
}. build ()
}
Client Initialization
Basic Setup
import android.content.Context
import android.os.Build
import com.hivemq.client.mqtt.MqttClient
import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient
import com.woleapp.netpos.model.User
import com.woleapp.netpos.util.Singletons
import timber.log.Timber
object MqttHelper {
private var client: Mqtt3RxClient ? = null
fun < T > init (context: Context , event: MqttEvent < T >? = null , topic: MqttTopics ? = null ) {
// Check if already connected
if (client != null && client !! .state.isConnected) {
checkDatabaseForFailedEvents (context)
return
}
val user: User ? = Singletons. getCurrentlyLoggedInUser ()
user?. let { u ->
if (u.terminal_id. isNullOrEmpty ()) {
Timber. e ( "Terminal ID Null" )
return @let
}
// Generate unique client ID
val clientId = " ${ Build.MODEL } - ${ u.terminal_id !! }${ ( 10000 .. 999999999 ). random () } "
// Build MQTT client
val clientBuilder = MqttClient. builder ()
. identifier (clientId)
. sslConfig ( getMqttClientSSLConfigImpl (context))
. serverHost (SERVER_HOST)
. serverPort (PORT)
. automaticReconnectWithDefaultConfig ()
. addConnectedListener {
checkDatabaseForFailedEvents (context)
Timber. e ( "Client $clientId Connected Successfully to $SERVER_HOST " )
}
. addDisconnectedListener {
Timber. e ( "Disconnected::cause - ${ it.cause } " )
}
// Create RxJava-enabled MQTT client
client = clientBuilder. useMqttVersion3 (). buildRx (). apply {
connect (). subscribe { t1, t2 ->
t1?. let {
event?. let {
sendPayload (topic !! , it)
}
Timber. e ( "Connected:" )
}
t2?. let {
Timber. e ( "Connection Failed" )
Timber. e (it)
}
}. disposeWith (disposables)
}
}
}
}
Automatic Reconnection
The client automatically reconnects using HiveMQ’s default configuration:
. automaticReconnectWithDefaultConfig ()
Default reconnect behavior:
Initial delay: 1 second
Max delay: 120 seconds
Exponential backoff with jitter
MQTT Topics
NetPOS publishes events to predefined topics:
Topic Structure
enum class MqttTopics ( val topic: String ) {
AUTHENTICATION ( "mqtt.pos.authentication.event" ),
TERMINAL_CONFIGURATION ( "mqtt.pos.terminal_config.event" ),
TRANSACTIONS ( "mqtt.pos.transaction.event" ),
PRINTING_RECEIPT ( "mqtt.pos.device.event" ),
NIP_PULL ( "mqtt.pos.bank_transfer.event" ),
NIP_NEW ( "mqtt.pos.generate_session_code.event" ),
NIP_SEARCH ( "mqtt.pos.verify_session_code.event" ),
CARD_READER_EVENTS ( "mqtt.pos.device.event" ),
POWER_EVENTS ( "mqtt.pos.device.event" ),
BATTERY_EVENTS ( "mqtt.pos.device.event" ),
SMS_EVENTS ( "mqtt.pos.sms.event" )
}
Topic Reference
mqtt.pos.authentication.event
Event Type: User authentication and session eventsPayload: {
"storm_id" : "USR123456" ,
"business_name" : "Acme Store" ,
"terminalId" : "2058XU23" ,
"serial_number" : "DEVICE123" ,
"event" : "AUTHENTICATION" ,
"status" : "SUCCESS" ,
"code" : "00" ,
"timestamp" : 1638360000000 ,
"geo" : "lat:6.5244 long:3.3792" ,
"data" : {
"business_name" : "Acme Store" ,
"storm_id" : "USR123456" ,
"serial_number" : "DEVICE123"
}
}
mqtt.pos.transaction.event
Event Type: All payment transactionsPayload: {
"storm_id" : "USR123456" ,
"business_name" : "Acme Store" ,
"terminalId" : "2058XU23" ,
"serial_number" : "DEVICE123" ,
"event" : "TRANSACTION" ,
"status" : "SUCCESS" ,
"code" : "00" ,
"timestamp" : 1638360000000 ,
"geo" : "lat:6.5244 long:3.3792" ,
"transactionType" : "PURCHASE" ,
"data" : {
"amount" : 5000.00 ,
"reference" : "TXN202112011234" ,
"cardPan" : "506066******1234" ,
"cardHolder" : "JOHN DOE" ,
"authCode" : "123456"
}
}
mqtt.pos.bank_transfer.event
Event Type: NIP (Nigeria Instant Payment) transfersPayload: {
"storm_id" : "USR123456" ,
"terminalId" : "2058XU23" ,
"event" : "BANK_TRANSFER" ,
"status" : "SUCCESS" ,
"code" : "00" ,
"timestamp" : 1638360000000 ,
"data" : {
"session_code" : "123456" ,
"code_verified" : true ,
"start_date" : "2021-12-01T10:00:00Z" ,
"end_date" : "2021-12-01T10:05:00Z"
}
}
Event Type: Device-related events (printer, card reader, power, battery)Printer Event: {
"event" : "PRINTING_RECEIPT" ,
"status" : "SUCCESS" ,
"code" : "00" ,
"data" : {
"transactionRef" : "TXN202112011234" ,
"printerCode" : "00"
}
}
Battery Event: {
"event" : "BATTERY_EVENTS" ,
"data" : {
"battery_percentage" : 75 ,
"status" : "CHARGING"
}
}
Card Reader Event: {
"event" : "CARD_READER_EVENTS" ,
"data" : {
"cardExpiry" : "12/25" ,
"cardHolder" : "JOHN DOE" ,
"maskedPan" : "506066******1234" ,
"readerError" : null
}
}
Event Type: SMS delivery statusPayload: {
"event" : "SMS_EVENTS" ,
"status" : "SUCCESS" ,
"code" : "00" ,
"data" : {
"to" : "+2348012345678" ,
"status" : "DELIVERED" ,
"serverResponse" : "Message sent successfully"
}
}
Publishing Events
Send Event to Topic
import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish
import com.woleapp.netpos.util.Singletons.gson
import io.reactivex.Flowable
fun < T > sendPayload (
mqttTopic: MqttTopics ,
event: MqttEvent < T >? = null ,
failedEvent: MqttEventsLocal ? = null
) {
if (event == null && failedEvent == null ) {
Timber. e ( "Nothing to publish" )
return
}
// Add geolocation
event?. apply {
geo = Prefs. getString (PREF_LAST_LOCATION, "lat:6.5244 long:3.3792" )
Timber. e ( "Sending to topic: ${ mqttTopic.topic } " )
}
client?. let { client ->
if ( ! client.state.isConnected) {
Timber. e ( "Not connected, save to local database" )
val local = event?. toLocal (mqttTopic.topic, "client not connected" ) ?: failedEvent
local?. let { savePayloadToLocalDatabase (local) }
return @let
}
// Build MQTT publish message
var flowable: Flowable < Mqtt3Publish >? = null
event?. let {
flowable = Flowable. just (
Mqtt3Publish. builder ()
. topic (mqttTopic.topic)
. qos (MqttQos.AT_LEAST_ONCE) // QoS 1
. payload (gson. toJson (event). toByteArray (Charset. forName ( "UTF-8" )))
. build ()
)
}
// Publish message
client. publish (flowable !! ). subscribe (
{
if (it.error.isPresent) {
Timber. e ( "Error: ${ it.error. get ().localizedMessage } " )
savePayloadToLocalDatabase (
MqttEventsLocal (
it.publish.topic. toString (),
String (it.publish.payloadAsBytes, StandardCharsets.UTF_8),
"error during publishing"
)
)
}
Timber. e ( "Published" )
},
{ error ->
Timber. e ( "Publish failed; saving to local database" )
val local = event?. toLocal (mqttTopic.topic, error.localizedMessage) ?: failedEvent
local?. let { savePayloadToLocalDatabase (local) }
},
{ Timber. e ( "Publish completed" ) }
). disposeWith (disposables)
}
}
Example: Transaction Event
// Create transaction event
val transactionEvent = MqttEvent < TransactionData >(). apply {
event = MqttEvents.TRANSACTIONS.event
status = "SUCCESS"
code = MqttStatus.SUCCESS.code
timestamp = System. currentTimeMillis ()
transactionType = "PURCHASE"
data = TransactionData (
amount = 5000.00 ,
reference = "TXN202112011234" ,
cardPan = "506066******1234"
)
}
// Publish to MQTT
MqttHelper. sendPayload (MqttTopics.TRANSACTIONS, transactionEvent)
Event Models
Base Event Structure
data class MqttEvent < T >(
var storm_id: String ? = null ,
var business_name: String ? = null ,
var terminalId: String ? = null ,
@SerializedName ( "serial_number" ) var deviceSerial: String ? = null ,
@Ignore var data : T ? = null ,
var event: String ? = null ,
var status: String ? = null ,
var code: String ? = null ,
var timestamp: Long ? = null ,
var geo: String ? = null ,
var transactionType: String ? = null
) {
init {
// Auto-populate from current user
val user = Singletons. getCurrentlyLoggedInUser ()
storm_id = user !! .netplus_id !!
business_name = user.business_name !!
terminalId = NetPosTerminalConfig. getTerminalId ()
deviceSerial = NetPosSdk. getDeviceSerial ()
}
}
Status Codes
enum class MqttStatus ( val code: String ) {
SUCCESS ( "00" ),
ERROR ( "01" )
}
Offline Support
NetPOS implements offline queue for failed MQTT events:
Local Event Storage
@Entity (tableName = "mqttEvents" )
data class MqttEventsLocal (
val topic: String ,
val data : String ,
val cause: String ? = null
) {
@PrimaryKey (autoGenerate = true ) var id: Int = 0
}
Retry Failed Events
When connection is restored:
private fun checkDatabaseForFailedEvents (context: Context ) {
if (mqttLocalDao == null ) {
mqttLocalDao = AppDatabase. getDatabaseInstance (context). mqttLocalDao ()
}
mqttLocalDao?. apply {
val subscribe = getLocalEvents (). flatMap {
deleteAllEvents (). toSingleDefault (it)
}. subscribeOn (Schedulers. io ())
. observeOn (AndroidSchedulers. mainThread ())
. subscribe { events, error ->
events?. let {
it. forEach { localEvent ->
sendPayload < Nothing >(
getTopic (localEvent.topic),
failedEvent = localEvent
)
}
}
}
}
}
Disconnection
fun disconnect () {
if (client == null ) {
Timber. e ( "Client is null or not connected" )
return
}
client?. disconnect ()?. subscribeOn (Schedulers. io ())
?. observeOn (AndroidSchedulers. mainThread ())
?. subscribe (
{ Timber. e ( "Disconnected" ) },
{ Timber. e (it) }
)?. disposeWith (disposables)
client = null
disposables. clear ()
}
Best Practices
QoS Level Use QoS 1 (AT_LEAST_ONCE) for transaction events to ensure delivery
Offline Queue Always save failed events to local database for retry
Reconnection Enable automatic reconnection for network resilience
SSL/TLS Use mutual TLS authentication for secure MQTT connections
Next Steps
API Endpoints Explore REST API integration for data queries and management