mqtt
MQTT 3.1.1 client. Connect to brokers, publish messages, subscribe to topics, receive messages. Hand-written binary protocol implementation with no external dependencies.
| Version | v0.1 |
| Platform | PC, ESP32 |
| Type | Native (C) |
star.mod
Usage
Functions
Open / Close
| Function | Signature | Description |
|---|---|---|
mqtt.open(host, port, clientId) |
(string, i32, string) -> i32 |
Start async connection to broker, returns handle (-1 on error) |
mqtt.openAuth(host, port, clientId, user, pass) |
(string, i32, string, string, string) -> i32 |
Start async connection with username/password, returns handle (-1 on error) |
mqtt.disconnect(conn) |
(i32) -> void |
Send DISCONNECT and close |
var conn: i32 = mqtt.open("192.168.1.10", 1883, "sensor-01")
# ... use connection ...
mqtt.disconnect(conn)
mqtt.open() and mqtt.openAuth() are non-blocking — the call returns the handle immediately and connection proceeds asynchronously. The connected event fires when the broker connection is established. Register events with mqtt.on() before or after calling open().
Open only after the network is up. On ESP32 the IP stack does not exist until WiFi has an address, so calling open()/openAuth() before then returns -1 rather than connecting (it used to crash the device). The idiomatic pattern is to open from the WiFi gotip handler — see wifi. Enable auto-reconnect and the connection is also rebuilt on its own after a WiFi outage.
Publish / Subscribe
| Function | Signature | Description |
|---|---|---|
mqtt.publish(conn, topic, payload, qos) |
(i32, string, string, i32) -> i32 |
Publish message (0=queued, -1=outbound buffer full) |
mqtt.subscribe(conn, topic, qos) |
(i32, string, i32) -> i32 |
Subscribe to topic filter (0=queued, -1=outbound buffer full) |
For receiving messages, use mqtt.on() with the event system — see Lifecycle Events below.
mqtt.publish() never blocks and never writes a partial packet. The packet is appended to a per-connection outbound buffer and flushed to the socket as it drains; if the buffer is full (the network cannot keep up with the publish rate), the call returns -1 and the packet is not queued — this is backpressure, not a fatal error, so retry it on a later event-loop turn. The connection stays open. A return 0 means the packet was queued whole, not that the broker has received it yet.
mqtt.subscribe() is fire-and-forget — the SUBACK is handled by the watcher and emits a Subscribed event.
Lifecycle Events
| Function | Signature | Description |
|---|---|---|
mqtt.on(conn, event, EventType) |
(i32, string, i32) -> i32 |
Bind lifecycle/data event to declared event type (0=success, -1=error) |
Supported events: "connected", "disconnected", "subscribed", "message".
event MqttConnected {
conn: i32
}
event MqttDisconnected {
conn: i32
}
event MqttSubscribed {
conn: i32
topic: string
}
event MqttMessage {
topic: string
payload: string
}
mqtt.on(conn, "connected", MqttConnected)
mqtt.on(conn, "disconnected", MqttDisconnected)
mqtt.on(conn, "subscribed", MqttSubscribed)
mqtt.on(conn, "message", MqttMessage)
on MqttConnected fn(e: MqttConnected): void {
mqtt.subscribe(e.conn, "sensors/#", 0)
}
on MqttMessage fn(e: MqttMessage): void {
console.log(e.topic)
console.log(e.payload)
}
runtime.keepAlive()
mqtt.on registers lifecycle and data events for the connection. The native layer emits the appropriate event struct when the lifecycle transition or incoming data occurs. See Events for full details.
Auto-Reconnect
| Function | Signature | Description |
|---|---|---|
mqtt.setAutoReconnect(conn, enabled) |
(i32, bool) -> void |
Enable/disable automatic reconnection on unexpected disconnect |
mqtt.setReconnectDelays(conn, delays) |
(i32, array) -> void |
Set backoff delay schedule in seconds (max 8 entries) |
var conn: i32 = mqtt.open("192.168.1.10", 1883, "sensor-01")
mqtt.setAutoReconnect(conn, true)
mqtt.setReconnectDelays(conn, [1, 2, 5, 10, 30])
When auto-reconnect is enabled and the connection drops unexpectedly:
MqttDisconnectedevent fires- After the first delay (1s), the native layer attempts to reconnect
- If reconnect fails, the next delay in the array is used (2s, 5s, 10s...)
- The last delay value repeats indefinitely until connection succeeds
- On successful reconnect,
MqttConnectedfires again - The retry index resets to 0 on success
The MqttConnected event fires on every connect, including reconnects, so subscribe from its handler — that single handler covers both the initial connect and every reconnect. The library does not replay subscriptions itself, which keeps one owner of (re)subscription and avoids a duplicate SUBSCRIBE on reconnect.
If no delays are configured, a default of 1 second is used. Calling mqtt.disconnect() explicitly disables auto-reconnect — only unexpected disconnects trigger it.
Status
| Function | Signature | Description |
|---|---|---|
mqtt.state(conn) |
(i32) -> string |
Connection state: "connecting", "connected", or "disconnected" |
mqtt.ping(conn) |
(i32) -> void |
Send PINGREQ |
mqtt.setTimeout(conn, ms) |
(i32, i32) -> void |
No-op, kept for API compatibility — the socket is permanently non-blocking |
QoS Levels
| Level | Description |
|---|---|
| 0 | At most once — fire and forget |
| 1 | At least once — PUBACK acknowledged |
QoS 2 (exactly once) is not supported in v0.1.
Pattern: Sensor Telemetry
package main
import "mqtt"
import "json"
import "time"
event MqttConnected {
conn: i32
}
fn main():void {
var conn: i32 = mqtt.open("192.168.1.10", 1883, "temp-sensor")
assert(conn >= 0, "mqtt open failed")
mqtt.on(conn, "connected", MqttConnected)
on MqttConnected fn(e: MqttConnected): void {
for (var i: i32 = 0; i < 10; i = i + 1) {
var payload: string = json.stringify({
"temp": 22.5,
"ts": time.now()
})
mqtt.publish(e.conn, "sensors/living-room/temp", payload, 0)
}
mqtt.disconnect(e.conn)
}
runtime.keepAlive()
}
Pattern: Event-driven Command Listener
package main
import "mqtt"
import "json"
event MqttConnected {
conn: i32
}
event MqttMessage {
topic: string
payload: string
}
fn main():void {
var conn: i32 = mqtt.open("192.168.1.10", 1883, "actuator-01")
assert(conn >= 0, "mqtt open failed")
mqtt.on(conn, "connected", MqttConnected)
mqtt.on(conn, "message", MqttMessage)
on MqttConnected fn(e: MqttConnected): void {
mqtt.subscribe(e.conn, "commands/actuator-01", 1)
}
on MqttMessage fn(e: MqttMessage): void {
var cmd: dict = json.parse(e.payload)
console.log(cmd["action"])
}
runtime.keepAlive()
}
Pattern: Resilient IoT Connection
package main
import "mqtt"
import "config"
event MqttConnected {
conn: i32
}
event MqttDisconnected {
conn: i32
}
event MqttMessage {
topic: string
payload: string
}
fn main():void {
var host: string = config.getString("mqtt.broker", "192.168.1.10")
var port: i32 = config.getNumber("mqtt.port", 1883)
var conn: i32 = mqtt.open(host, port, "sensor-01")
mqtt.setAutoReconnect(conn, true)
mqtt.setReconnectDelays(conn, [1, 2, 5, 10, 30])
mqtt.on(conn, "connected", MqttConnected)
mqtt.on(conn, "disconnected", MqttDisconnected)
mqtt.on(conn, "message", MqttMessage)
on MqttConnected fn(e: MqttConnected): void {
console.log("connected")
mqtt.subscribe(e.conn, "sensors/#", 0)
}
on MqttDisconnected fn(e: MqttDisconnected): void {
console.log("disconnected — will reconnect")
}
on MqttMessage fn(e: MqttMessage): void {
console.log(e.payload)
}
runtime.keepAlive()
}
After broker restart or network glitch, the connection is automatically restored and the connected handler re-subscribes.
Notes
- MQTT 3.1.1 binary protocol — no external library dependency
- Clean session flag is always set
- 60 second keepalive — the library sends PINGREQ automatically at half the interval, so a receive-only client is never dropped;
mqtt.ping()is also available for manual pings - Up to 16 concurrent connections on host, 2 on ESP32 (smaller receive buffers on-device)
- Non-blocking socket from open through close: connect, handshake, and packet reads are driven from a state machine on the event loop, so the VM never blocks on the network
- Outbound packets are queued per connection and flushed as the socket drains; a
packet is queued whole or rejected whole (
mqtt.publishreturns -1 under backpressure), so a slow network never tears a packet on the wire - Auto-reconnect with configurable backoff delays (max 8 entries, last value repeats)
- Subscribe from the
connectedhandler — it fires on every (re)connect, so subscriptions are re-established without the library replaying them - Explicit
mqtt.disconnect()disables auto-reconnect - QoS 0 and 1 supported; QoS 2 planned
- No TLS yet — planned for future version
- On ESP32, uses
lwIPsocket layer - Event field order is verified at runtime via signature hash — see Events