BLE Peripheral

這篇文章以 Pico W 實做藍牙周邊端,用來提供藍牙相關服務。這篇文章並未解釋 BLE 與 GATT 架構,不熟悉這部分的讀者,請先從其他書籍或文件熟悉藍牙低功耗相關背景知識,再來看這篇文章會比較適合。

雖然 MicroPython 現在已經內建 bluetooth 函數庫(請參考文件),但還是需要下載一個好心人函數庫,負責發送廣告封包好讓中央端(例如手機)可以掃描到這部藍牙裝置。只需要下載 ble_advertising.py 這個檔案並存在 Pico W 中就可以了。

除了上面這支用來廣播的程式外,再下載下面這支程式,並存成 ble_peripheral.py。這支程式是我寫好的函數庫,方便主程式呼叫使用。

#### BLE library, created by KoKang Chu, version: 0.1
#### must save as "ble_peripheral.py"

from ble_advertising import advertising_payload
from micropython import const
import struct
import bluetooth

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_GATTS_INDICATE_DONE = const(20)
_IRQ_MTU_EXCHANGED = const(21)

class BGATTService:
    def __init__(self, uuidString):
        self.uuidString = uuidString
        self.uuid = bluetooth.UUID(uuidString)
        self.chars = []
        
    def add(self, char):
        self.chars.append(char)
        
    def combine(self):
        tmp = []
        for char in self.chars:
            tmp.append(char.combine())
            
        return (self.uuid, tuple(tmp))            

class BGATTCharacteristic:
    def __init__(self, uuidString, properties):
        self.uuidString = uuidString
        self.uuid = bluetooth.UUID(uuidString)
        self.props = properties
        
    def combine(self):
        return (self.uuid, self.props)

class BPeripheralManager:
    central_connection = None
    chars = {}

    def __init__(self, services, device_name, read_callback=None, write_callback=None):
        self.read_callback = read_callback
        self.write_callback = write_callback
        self.init_gatt(services, device_name)
        
    def init_gatt(self, services, device_name):                
        self.peripheral = bluetooth.BLE()
        self.peripheral.active(True)
        self.peripheral.irq(self.bt_irq)
        
        all_services = []
        all_services_uuid = []
        for service in services:
            all_services.append(service.combine())
            all_services_uuid.append(service.uuid)
            
        ret = self.peripheral.gatts_register_services(all_services)
        for service_index, handles in enumerate(ret):
            for char_index, handle in enumerate(handles):
                self.chars[services[service_index].chars[char_index].uuidString] = handle            
        payload = advertising_payload(name=device_name, services=all_services_uuid)
        print('======== characteristic list =======')
        print(self.chars)
        print('=====================================')
        self.start_advertising(payload=payload)

    def __get_uuid_string(self, char_handle):
        for index, handle in enumerate(self.chars.values()):
            if handle == char_handle:
                return list(self.chars.keys())[index]

    def start_advertising(self, millisecond=500, payload=None):
        print('start advertising')
        self.peripheral.gap_advertise(millisecond * 1000, adv_data=payload)
        
    def stop_advertising(self):
        print('stop advertising')
        self.peripheral.gap_advertise(None)
        
    def write(self, uuidString, data):
        # write data to central
        self.peripheral.gatts_write(self.chars[uuidString], data)
        
    def notify(self, uuidString):
        # notify data
        if self.connection_ok():
            data = self.read_callback(uuidString)
            self.peripheral.gatts_notify(self.central_connection, self.chars[uuidString], data)

    def indicate(self, uuidString):
        # indicate data
        if self.connection_ok():
            data = self.read_callback(uuidString)
            self.write(uuidString, data)
            self.peripheral.gatts_indicate(self.central_connection, self.chars[uuidString])

    def connection_ok(self):
        return self.central_connection != None
    
    def bt_irq(self, event, data):
        if event == _IRQ_CENTRAL_CONNECT:
            (self.central_connection, addr_type, addr) = data
            print(f'central {self.central_connection} connect, addr type is {addr_type}, addr is {bytes(addr)}')
            self.stop_advertising()

        elif event == _IRQ_CENTRAL_DISCONNECT:
            print('central disconnect')
            self.central_connection = None
            self.start_advertising()
            
        elif event == _IRQ_GATTS_WRITE:
            (_, at) = data
            print(f'"{self.__get_uuid_string(at)}" received data from central')
            body = self.peripheral.gatts_read(at)
            uuidString = self.__get_uuid_string(at)
            self.write_callback(uuidString, body)
                
        elif event == _IRQ_GATTS_READ_REQUEST:
            (_, at) = data
            print(f'received "{self.__get_uuid_string(at)}" read request')
            uuidString = self.__get_uuid_string(at)
            if uuidString != None:
                data = self.read_callback(uuidString)
                self.write(uuidString, data)
            else:
                print(f'uuid not found: {data}')
            
        elif event == _IRQ_GATTS_INDICATE_DONE:
            (_, at, value) = data
            print(f'received "{self.__get_uuid_string(at)}" indicate done, status code is {value}')
    
        elif event == _IRQ_MTU_EXCHANGED:
            (_, mtu) = data
            print(f'MTU: {mtu}')
            
        else:
            print(f'other event number: {event}')

接下來撰寫主程式。這裡將 Pico W 內建的溫度感測器資料,透過 BLE 中的 notify 與 read 方式送到 Central 端,並且接受 Central 端送來的 0 與 1 資料,點亮或關閉 Pico W 上內建的 LED。

from machine import *
from ble_peripheral import *
from time import *

led = Pin("LED", Pin.OUT)
sensor_temp = ADC(4)

SERV_UUID = 'F6706ACE-53AB-4F56-889B-1FF969DDB957'
CHAR_LED = 'B13BAAC6-D487-419E-9325-E475B14AE6B6'
CHAR_TEMP = 'DBE4BCD8-AF4E-43C0-8EF5-3426BD2B03D0'

## 取得溫度資料
def get_temperature():
    conversion_factor = 3.3 / 65535
    reading = sensor_temp.read_u16() * conversion_factor
    temperature = 27 - (reading - 0.706) / 0.001721
    return temperature

## 收到Central端的read request
def cb_read(uuidString):
    value = get_temperature()
    print('>> temperature is %0.2f' % value)
    data = struct.pack('>h', int(value * 100))
    return data

## 收到Central端的write request
def cb_write(uuidString, data):
    print((uuidString, data))
    if data == b'\x00':
        led.off()
    if data == b'\x01':
        led.on()
    
def main():
    service = BGATTService(SERV_UUID)

    service.add( BGATTCharacteristic( CHAR_LED,
        bluetooth.FLAG_WRITE_NO_RESPONSE
    ))
    service.add( BGATTCharacteristic( CHAR_TEMP,
        bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY | bluetooth.FLAG_INDICATE
    ))

    pm = BPeripheralManager((service,), 'mypico', cb_read, cb_write)
    while True:
        pm.notify(CHAR_TEMP)
        #pm.indicate(CHAR_TEMP)
        sleep(5)

main()

測試

請在手機上安裝 LightBlue App。這時 Pico W 上撰寫的程式先執行,然後開啟 LightBlue App,應該在藍牙清單中可以看到 mypico 這個裝置,點選 mypico 進行連線後可以看到兩個 Characteristic,接下來就可以試著讀取溫度資料與發送 0 或 1 來控制 LED 亮滅了。

後續

目前 MicroPython 在 Pico W 上的 bluetooth 函數庫雖然可以使用但還不完整,像是配對、加密或是 Direct Adv. 與收到訂閱或取消訂閱 notify 的通知都還欠缺,當然拿來進行專題應用或雛形系統應該綽綽有餘,但實際系統開發就還有一段不小的路要走了。

發表迴響