
mqtt

MQTT 3.1.1 client. Connect to brokers, publish messages, subscribe to topics, receive messages. Hand-written binary protocol implementation with no external dependencies.

Version v0.1
Platform PC, ESP32
Type Native (C)

star.mod

require dev-libs/mqtt v0.1

Usage

import "mqtt"

Functions

Open / Close

| Function | Signature | Description |
| --- | --- | --- |
| mqtt.open(host, port, clientId) | (string, i32, string) -> i32 | Start async connection to broker, returns handle (-1 on error) |
| mqtt.openAuth(host, port, clientId, user, pass) | (string, i32, string, string, string) -> i32 | Start async connection with username/password, returns handle (-1 on error) |
| mqtt.disconnect(conn) | (i32) -> void | Send DISCONNECT and close |

var conn: i32 = mqtt.open("192.168.1.10", 1883, "sensor-01")
# ... use connection ...
mqtt.disconnect(conn)

mqtt.open() and mqtt.openAuth() are non-blocking: the call returns the handle immediately and the connection proceeds asynchronously. The connected event fires when the broker connection is established. Because mqtt.on() takes the handle, register events right after open() returns.

Open only after the network is up. On ESP32 the IP stack does not exist until WiFi has an address, so calling open()/openAuth() before then returns -1 rather than connecting (it used to crash the device). The idiomatic pattern is to open from the WiFi gotip handler — see wifi. Enable auto-reconnect and the connection is also rebuilt on its own after a WiFi outage.

Publish / Subscribe

| Function | Signature | Description |
| --- | --- | --- |
| mqtt.publish(conn, topic, payload, qos) | (i32, string, string, i32) -> i32 | Publish message (0=queued, -1=outbound buffer full) |
| mqtt.subscribe(conn, topic, qos) | (i32, string, i32) -> i32 | Subscribe to topic filter (0=queued, -1=outbound buffer full) |

mqtt.subscribe(conn, "sensors/temp", 0)
mqtt.publish(conn, "sensors/temp", "23.5", 0)

For receiving messages, use mqtt.on() with the event system — see Lifecycle Events below.

mqtt.publish() never blocks and never writes a partial packet. The packet is appended to a per-connection outbound buffer and flushed to the socket as it drains. If the buffer is full (the network cannot keep up with the publish rate), the call returns -1 and the packet is not queued. This is backpressure, not a fatal error: the connection stays open, so retry on a later event-loop turn. A return value of 0 means the packet was queued whole, not that the broker has received it.
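The queue-whole-or-reject rule can be modelled in a few lines. The following is a Python sketch of the behaviour described above, not the library's C implementation; the class name, method names, and the 16-byte capacity are hypothetical and chosen only to keep the example small.

```python
class OutboundBuffer:
    """Bounded byte queue that accepts a packet whole or rejects it whole."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity    # hypothetical size; the real limit is not documented
        self.pending = bytearray()  # bytes queued but not yet written to the socket

    def enqueue(self, packet: bytes) -> int:
        """Return 0 if the whole packet was queued, -1 if it does not fit."""
        if len(self.pending) + len(packet) > self.capacity:
            return -1               # backpressure: nothing is queued, caller may retry
        self.pending += packet
        return 0

    def drain(self, writable: int) -> bytes:
        """Simulate the socket accepting up to `writable` bytes."""
        sent = bytes(self.pending[:writable])
        del self.pending[:writable]
        return sent


buf = OutboundBuffer(capacity=16)
first = buf.enqueue(b"A" * 10)   # fits: 10 of 16 bytes used
second = buf.enqueue(b"B" * 10)  # would overflow, so it is rejected whole
buf.drain(8)                     # the socket accepts 8 bytes, 2 remain pending
third = buf.enqueue(b"B" * 10)   # the retry fits now: 2 + 10 <= 16
print(first, second, third)
```

The rejected packet leaves no bytes behind in the buffer, which is why a retry after the socket drains can never interleave two half-written packets.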

mqtt.subscribe() is fire-and-forget: the watcher handles the SUBACK and emits the "subscribed" event.
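The topic argument is an MQTT topic filter, so it may contain the standard MQTT 3.1.1 wildcards: + matches exactly one level and # matches all remaining levels. Matching is done by the broker, not by this library. The Python sketch below only illustrates which topics a filter selects; topic_matches is a hypothetical helper, and the special handling of topics that start with $ is left out.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` is selected by the MQTT filter `topic_filter`."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True   # multi-level wildcard: matches the parent and everything below
        if i >= len(topic_levels):
            return False  # the filter is longer than the topic
        if level != "+" and level != topic_levels[i]:
            return False  # a literal level differs
    # No '#' was seen, so both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("sensors/#", "sensors/living-room/temp"))
print(topic_matches("sensors/+/temp", "sensors/living-room/temp"))
print(topic_matches("sensors/temp", "sensors/temp/raw"))
```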

Lifecycle Events

| Function | Signature | Description |
| --- | --- | --- |
| mqtt.on(conn, event, EventType) | (i32, string, i32) -> i32 | Bind lifecycle/data event to declared event type (0=success, -1=error) |

Supported events: "connected", "disconnected", "subscribed", "message".

event MqttConnected {
    conn: i32
}

event MqttDisconnected {
    conn: i32
}

event MqttSubscribed {
    conn: i32
    topic: string
}

event MqttMessage {
    topic: string
    payload: string
}

mqtt.on(conn, "connected", MqttConnected)
mqtt.on(conn, "disconnected", MqttDisconnected)
mqtt.on(conn, "subscribed", MqttSubscribed)
mqtt.on(conn, "message", MqttMessage)

on MqttConnected fn(e: MqttConnected): void {
    mqtt.subscribe(e.conn, "sensors/#", 0)
}

on MqttMessage fn(e: MqttMessage): void {
    console.log(e.topic)
    console.log(e.payload)
}

runtime.keepAlive()

mqtt.on registers lifecycle and data events for the connection. The native layer emits the appropriate event struct when the lifecycle transition or incoming data occurs. See Events for full details.

Auto-Reconnect

| Function | Signature | Description |
| --- | --- | --- |
| mqtt.setAutoReconnect(conn, enabled) | (i32, bool) -> void | Enable/disable automatic reconnection on unexpected disconnect |
| mqtt.setReconnectDelays(conn, delays) | (i32, array) -> void | Set backoff delay schedule in seconds (max 8 entries) |

var conn: i32 = mqtt.open("192.168.1.10", 1883, "sensor-01")
mqtt.setAutoReconnect(conn, true)
mqtt.setReconnectDelays(conn, [1, 2, 5, 10, 30])

When auto-reconnect is enabled and the connection drops unexpectedly:

  1. MqttDisconnected event fires
  2. After the first delay (1s), the native layer attempts to reconnect
  3. If reconnect fails, the next delay in the array is used (2s, 5s, 10s...)
  4. The last delay value repeats indefinitely until connection succeeds
  5. On successful reconnect, MqttConnected fires again
  6. The retry index resets to 0 on success

The MqttConnected event fires on every connect, including reconnects, so subscribe from its handler — that single handler covers both the initial connect and every reconnect. The library does not replay subscriptions itself, which keeps one owner of (re)subscription and avoids a duplicate SUBSCRIBE on reconnect.

If no delays are configured, a default of 1 second is used. Calling mqtt.disconnect() explicitly disables auto-reconnect — only unexpected disconnects trigger it.
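The delay selection described above can be sketched as follows. This is a Python model of the documented rules (last value repeats, index resets on success, 1 second default), not the native code. The name next_delay is hypothetical, and truncating a schedule longer than 8 entries is an assumption, since the behaviour for over-long arrays is not documented.

```python
DEFAULT_DELAYS = [1]  # documented default when no schedule is configured
MAX_ENTRIES = 8       # documented limit on the schedule length


def next_delay(delays: list, attempt: int) -> int:
    """Seconds to wait before reconnect attempt number `attempt` (0-based)."""
    # Assumption: entries beyond the 8th are ignored.
    schedule = list(delays[:MAX_ENTRIES]) or DEFAULT_DELAYS
    # Past the end of the schedule, the last value repeats.
    return schedule[min(attempt, len(schedule) - 1)]


configured = [1, 2, 5, 10, 30]
waits = [next_delay(configured, attempt) for attempt in range(7)]
print(waits)

# After a successful reconnect the retry index goes back to 0,
# so the next outage starts again from the first delay.
after_success = next_delay(configured, 0)
```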

Status

| Function | Signature | Description |
| --- | --- | --- |
| mqtt.state(conn) | (i32) -> string | Connection state: "connecting", "connected", or "disconnected" |
| mqtt.ping(conn) | (i32) -> void | Send PINGREQ |
| mqtt.setTimeout(conn, ms) | (i32, i32) -> void | No-op, kept for API compatibility; the socket is permanently non-blocking |

if (mqtt.state(conn) == "connected") {
    mqtt.ping(conn)
}

QoS Levels

| Level | Description |
| --- | --- |
| 0 | At most once (fire and forget) |
| 1 | At least once (acknowledged by PUBACK) |

QoS 2 (exactly once) is not supported in v0.1.
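On the wire, the two supported levels differ only in the QoS bits of the first byte and in the 2-byte packet identifier that QoS 1 adds so the broker's PUBACK can be matched. The Python sketch below builds a PUBLISH packet following the MQTT 3.1.1 specification; it illustrates the packet layout and is not taken from the library's C source. The function names are hypothetical, and the DUP and RETAIN flags are left at 0.

```python
import struct


def encode_remaining_length(n: int) -> bytes:
    """MQTT variable-length encoding: 7 bits per byte, high bit means 'more'."""
    if not 0 <= n <= 268_435_455:
        raise ValueError("remaining length out of range")
    out = bytearray()
    while True:
        byte, n = n % 128, n // 128
        if n > 0:
            byte |= 0x80  # continuation bit
        out.append(byte)
        if n == 0:
            return bytes(out)


def encode_publish(topic: str, payload: bytes, qos: int, packet_id: int = 0) -> bytes:
    """Build a PUBLISH packet for QoS 0 or 1."""
    if qos not in (0, 1):
        raise ValueError("only QoS 0 and 1 are modelled here")
    if qos == 1 and not 1 <= packet_id <= 0xFFFF:
        raise ValueError("QoS 1 needs a non-zero 16-bit packet identifier")
    topic_bytes = topic.encode("utf-8")
    body = struct.pack("!H", len(topic_bytes)) + topic_bytes
    if qos == 1:
        body += struct.pack("!H", packet_id)  # echoed back in the PUBACK
    body += payload
    first_byte = 0x30 | (qos << 1)            # packet type 3, QoS in bits 2..1
    return bytes([first_byte]) + encode_remaining_length(len(body)) + body


qos0 = encode_publish("a/b", b"hi", 0)
qos1 = encode_publish("a/b", b"hi", 1, packet_id=1)
print(qos0.hex(), qos1.hex())
```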

Pattern: Sensor Telemetry

package main

import "mqtt"
import "json"
import "time"

event MqttConnected {
    conn: i32
}

fn main():void {
    var conn: i32 = mqtt.open("192.168.1.10", 1883, "temp-sensor")
    assert(conn >= 0, "mqtt open failed")

    mqtt.on(conn, "connected", MqttConnected)

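    on MqttConnected fn(e: MqttConnected): void {
        # Publish ten readings; each call only queues the packet.
        for (var i: i32 = 0; i < 10; i = i + 1) {
            var payload: string = json.stringify({
                "temp": 22.5,
                "ts": time.now()
            })
            # -1 means the outbound buffer is full and the packet was not queued.
            if (mqtt.publish(e.conn, "sensors/living-room/temp", payload, 0) == -1) {
                console.log("publish dropped: outbound buffer full")
            }
        }
        mqtt.disconnect(e.conn)
    }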

    runtime.keepAlive()
}

Pattern: Event-driven Command Listener

package main

import "mqtt"
import "json"

event MqttConnected {
    conn: i32
}

event MqttMessage {
    topic: string
    payload: string
}

fn main():void {
    var conn: i32 = mqtt.open("192.168.1.10", 1883, "actuator-01")
    assert(conn >= 0, "mqtt open failed")

    mqtt.on(conn, "connected", MqttConnected)
    mqtt.on(conn, "message", MqttMessage)

    on MqttConnected fn(e: MqttConnected): void {
        mqtt.subscribe(e.conn, "commands/actuator-01", 1)
    }

    on MqttMessage fn(e: MqttMessage): void {
        var cmd: dict = json.parse(e.payload)
        console.log(cmd["action"])
    }

    runtime.keepAlive()
}

Pattern: Resilient IoT Connection

package main

import "mqtt"
import "config"

event MqttConnected {
    conn: i32
}

event MqttDisconnected {
    conn: i32
}

event MqttMessage {
    topic: string
    payload: string
}

fn main():void {
    var host: string = config.getString("mqtt.broker", "192.168.1.10")
    var port: i32 = config.getNumber("mqtt.port", 1883)

    var conn: i32 = mqtt.open(host, port, "sensor-01")
    assert(conn >= 0, "mqtt open failed")
    mqtt.setAutoReconnect(conn, true)
    mqtt.setReconnectDelays(conn, [1, 2, 5, 10, 30])

    mqtt.on(conn, "connected", MqttConnected)
    mqtt.on(conn, "disconnected", MqttDisconnected)
    mqtt.on(conn, "message", MqttMessage)

    on MqttConnected fn(e: MqttConnected): void {
        console.log("connected")
        mqtt.subscribe(e.conn, "sensors/#", 0)
    }

    on MqttDisconnected fn(e: MqttDisconnected): void {
        console.log("disconnected — will reconnect")
    }

    on MqttMessage fn(e: MqttMessage): void {
        console.log(e.payload)
    }

    runtime.keepAlive()
}

After a broker restart or network glitch, the connection is restored automatically and the connected handler re-subscribes.

Notes

  • MQTT 3.1.1 binary protocol — no external library dependency
  • Clean session flag is always set
  • 60 second keepalive — the library sends PINGREQ automatically at half the interval, so a receive-only client is never dropped; mqtt.ping() is also available for manual pings
  • Up to 16 concurrent connections on host, 2 on ESP32 (smaller receive buffers on-device)
  • Non-blocking socket from open through close: connect, handshake, and packet reads are driven from a state machine on the event loop, so the VM never blocks on the network
  • Outbound packets are queued per connection and flushed as the socket drains; a packet is queued whole or rejected whole (mqtt.publish returns -1 under backpressure), so a slow network never tears a packet on the wire
  • Auto-reconnect with configurable backoff delays (max 8 entries, last value repeats)
  • Subscribe from the connected handler — it fires on every (re)connect, so subscriptions are re-established without the library replaying them
  • Explicit mqtt.disconnect() disables auto-reconnect
  • QoS 0 and 1 supported; QoS 2 planned
  • No TLS yet; planned for a future version
  • On ESP32, uses lwIP socket layer
  • Event field order is verified at runtime via signature hash — see Events
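
Several of these notes (clean session always set, 60 second keepalive, optional username/password) map directly onto fields of the MQTT 3.1.1 CONNECT packet. The Python sketch below shows that layout as defined by the specification; it is an illustration, not the library's C code. The names encode_connect and PINGREQ are hypothetical, and the sketch only handles packets short enough for a one-byte remaining length.

```python
import struct

PINGREQ = bytes([0xC0, 0x00])  # fixed two-byte keepalive ping packet


def _utf8(text: str) -> bytes:
    """MQTT string: 2-byte big-endian length followed by UTF-8 bytes."""
    data = text.encode("utf-8")
    return struct.pack("!H", len(data)) + data


def encode_connect(client_id, username=None, password=None, keepalive=60) -> bytes:
    """Build a CONNECT packet with the clean session flag set."""
    if password is not None and username is None:
        raise ValueError("MQTT 3.1.1 does not allow a password without a username")
    flags = 0x02                       # clean session
    payload = _utf8(client_id)
    if username is not None:
        flags |= 0x80                  # username present
        payload += _utf8(username)
    if password is not None:
        flags |= 0x40                  # password present
        payload += _utf8(password)
    # Protocol name "MQTT", protocol level 4 (= 3.1.1), flags, keepalive seconds.
    header = _utf8("MQTT") + bytes([0x04, flags]) + struct.pack("!H", keepalive)
    body = header + payload
    if len(body) > 127:
        raise ValueError("sketch only handles a one-byte remaining length")
    return bytes([0x10, len(body)]) + body


plain = encode_connect("sensor-01")
auth = encode_connect("sensor-01", "u", "p")
ping_interval = 60 // 2                # PINGREQ is sent at half the keepalive
print(plain.hex(), auth.hex(), ping_interval)
```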