MQTT

★本單元必須先對 MQTT 協定有基本認識才有辦法搞定。

首先需要在 Pico W 中安裝 MQTT 相關的函數庫。MicroPython 提供了兩個函數庫 simplerobust,差異在於如果與 MQTT server 斷線後,robust 模組會自動嘗試重新連線,而 simple 必須自行處理斷線後的重新連線。安裝方式為在 Pico W 中建立 /lib/umqtt 資料夾,然後從 GitHub 上將 simple 或 robust 的 umqtt 資料夾中的 .py 檔儲存到 /lib/umqtt/ 資料夾中。

使用 MQTT 發佈或訂閱資料前,網路必須先連線完成,下面的程式碼均不含網路連線程式,這部分請自行參考前面的 Wi-Fi 單元。

發佈

下面這個範例會送出一個訊息到 mytopic 這個主題。

import umqtt.simple as umqtt

CLIENT_ID = 'my_client_name'

client = umqtt.MQTTClient(CLIENT_ID, 'server_ip_address')
client.connect()
client.publish('mytopic', 'Hello, World!')
client.disconnect()

要特別注意的是,同一個 MQTT server,所有連線的 client 端 ID 都必須唯一,若重複會導致連線出問題,所以 client 端 ID 最好來自於晶片 ID,這樣就保證不重複了,程式碼如下,建議使用。

import ubinascii
import machine

CLIENT_ID = ubinascii.hexlify(machine.unique_id())

若以 mosquitto 指令為例,在終端機執行下列指令即可收到 Hello, World! 字串

$ mosquitto_sub -h [server_ip_address] -t mytopic

訂閱

訂閱時,最重要的是自行撰寫一個 callback 函數,例如 sub_cb(),固定接受兩個參數,第一個參數存放收到的 topic,第二個參數存放收到的 message,然後將這個 callback 函數透過 set_callback() 完成註冊。之後只要發佈者將訊息發佈到 sub_topic 這個主題時,sub_cb() 就會被呼叫了。

import ubinascii
import machine
import umqtt.simple as umqtt

CLIENT_ID = ubinascii.hexlify(machine.unique_id())

def sub_cb(topic, msg):
    print((topic, msg))
    
client = umqtt.MQTTClient(CLIENT_ID, 'server_ip_address')
client.set_callback(sub_cb)
client.connect()
client.subscribe('sub_topic')

while True:
    client.wait_msg()

除了 wait_msg() 外,另外還有一個函數 check_msg(),兩者差異在於 wait_msg() 為 blocking 模式,所以只要沒有訊息進來,程式碼會持續等在這一行,如果不希望等待,則可使用 check_msg(),這個函數為 non-blocking 模式。

若以 mosquitto 指令為例,在終端機執行下列指令即可發出訊息給 Pico W 晶片。

$ mosquitto_pub -h [server_ip_address] -t sub_topic -m "hello"

同時訂閱與發佈

下面這個範例會每隔 5 秒鐘將內建的溫度感測器資料傳出去(溫度計算公式為樹莓派官方提供,照抄即可),並且同時接收 LED on/off 的指令。這裡使用 umqtt 的 robust 模組,並且特別留意程式碼中不包含 Wi-Fi 連線有關的程式碼。

import ubinascii
import umqtt.simple as umqtt
import utime
from machine import *

led = Pin("LED", Pin.OUT)
sensor_temp = ADC(4)
conversion_factor = 3.3 / 65535

def get_temperature():
    reading = sensor_temp.read_u16() * conversion_factor
    temperature = 27 - (reading - 0.706) / 0.001721
    return str(temperature)
    
def send_message(client):
    temp = get_temperature()
    client.publish('temp', temp)
    print(temp)
    
def sub_cb(topic, msg):
    print((topic, msg))
    if topic == b'led':
        if msg == b'on':
            led.on()
        if msg == b'off':
            led.off()

def mqtt_init():
    CLIENT_ID = ubinascii.hexlify(unique_id())
    client = umqtt.MQTTClient(CLIENT_ID, 'server_ip_address')
    client.set_callback(sub_cb)
    client.connect(clean_session=False)
    client.subscribe('led')
    return client

def main():
    client = mqtt_init()
    begin_time = utime.ticks_ms()
    while True:
        client.check_msg()    
        if (utime.ticks_ms() - begin_time) > 5000:        
            send_message(client)
            begin_time = utime.ticks_ms()
        utime.sleep(0.5)    

main()

後續補充

過去在 ESP8266 上的經驗,若同時撰寫發佈與訂閱程式碼,程式運作會變的很不穩定,有時會讓整個晶片當掉或重置。也許現在 Pico 的 MicroPython 韌體穩定性會比 ESP8266 來的好,並且 Pico 為雙核心 CPU,若將發佈與訂閱分別透過多執行緒放到兩個不同的核心去執行,或許程式穩定性與效能會來的更高,但到目前為止,我還沒試過,有興趣的朋友可以試試看。

★本系列收錄於「第一次就上手」選單

發表迴響