Raspberry Pi Pico Bluetooth Communication
Posted by Suyash Singh
on March 15, 2024

Communicating with Bluetooth on Raspberry Pi Pico W

The Raspberry Pi Pico W supports wireless communication, which makes it a good fit for IoT projects. In this post, we use Bluetooth Low Energy (BLE) to communicate with other devices from MicroPython code running on the Pico W.

We build a simple reminders application as a worked example of Bluetooth on the Pico W. Along the way we set up a BLE service, advertise it, handle connections, and exchange data wirelessly. By the end, you should have a foundation for integrating Bluetooth into your own Pico W projects and custom IoT applications.

Setting Up the Environment

We will use MicroPython with Thonny IDE for developing and deploying Bluetooth communication on the Raspberry Pi Pico W.

main.py

We are going to leverage several essential packages that are already integrated into our MicroPython environment: uasyncio, aioble, bluetooth, and machine.

  • uasyncio: Enables asynchronous programming, crucial for managing concurrent tasks such as Bluetooth communication and device interactions.

  • aioble: Provides Bluetooth Low Energy (BLE) functionalities, allowing our device to advertise services, handle connections, and exchange data wirelessly.

  • bluetooth: Offers basic Bluetooth functionalities, including UUID handling and connection management for setting up custom GATT services.

  • machine: Facilitates hardware control, essential for managing GPIO pins, controlling onboard LEDs, and interfacing with sensors or actuators connected to the Raspberry Pi Pico W.

Together, these modules let the Pico W advertise a BLE service, run several tasks concurrently, keep its clock in sync, and drive the onboard LED, all from MicroPython.
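As a rough illustration of how uasyncio lets several loops share a single core, the sketch below runs two cooperative tasks side by side. The task names and delays are made up for this example, and the import fallback is only there so the same file also runs under desktop Python.

```python
try:
    import uasyncio as asyncio  # MicroPython
except ImportError:
    import asyncio  # desktop Python fallback

events = []

async def worker(name, delay_s, count):
    # Each await hands control back to the scheduler so the other task can run.
    for i in range(count):
        await asyncio.sleep(delay_s)
        events.append((name, i))

async def demo():
    # gather() runs both workers concurrently and waits for both to finish.
    await asyncio.gather(
        worker("fast", 0.01, 4),
        worker("slow", 0.03, 2),
    )

asyncio.run(demo())
print(events)
```

The main.py below follows the same pattern, with one coroutine per concern (advertising, time sync, reminders, LED) instead of the toy workers.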

Here’s the complete main.py:

main.py
 
import sys
 
sys.path.append("")
 
from micropython import const
 
import uasyncio as asyncio
import aioble
import bluetooth
import machine
import random
import struct
import time
 
from reminders import Reminder
 
BLUETOOTH_DEVICE_NAME = "RPi Pico Remind Me"
TASK_SERVICE_UUID = bluetooth.UUID("0492fcec-7194-11eb-9439-0242ac130002")
TASK_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f72465f")
NEW_TASK_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f7246ff")
TIME_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f724fff")
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
_ADV_INTERVAL_MS = 250_000  # aioble.advertise() takes microseconds, so this is 250 ms despite the name
TASKS = []
 
led = machine.Pin("LED", machine.Pin.OUT)
# Register GATT server.
task_service = aioble.Service(TASK_SERVICE_UUID)
task_characteristic = aioble.Characteristic(
    task_service, TASK_CHARACTERISTIC_UUID, read=True, notify=True)
new_task_characteristic = aioble.Characteristic(
    task_service, NEW_TASK_CHARACTERISTIC_UUID, read=True, notify=True, write=True, capture=True
)
time_characteristic = aioble.Characteristic(
    task_service, TIME_CHARACTERISTIC_UUID, read=True, notify=True, write=True, capture=True
)
aioble.register_services(task_service)
CONNECTION = None
 
async def peripheral_task():
    global CONNECTION
    while True:
        async with await aioble.advertise(
            _ADV_INTERVAL_MS,
            name=BLUETOOTH_DEVICE_NAME,
            services=[TASK_SERVICE_UUID],
            appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
        ) as connection:
            CONNECTION = connection
            print("Connection from", connection.device)
            
            #await connection.disconnected()
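            while connection.is_connected():
                try:
                    # Time out periodically so a disconnect is noticed and advertising can restart.
                    await new_task_characteristic.written(timeout_ms=1000)
                except asyncio.TimeoutError:
                    continue
                task = new_task_characteristic.read().decode('utf-8').split("&")
                append_to_tasks(task)
                await asyncio.sleep_ms(500)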
            
            print("Disconnected")  
 
async def time_task():
    global CONNECTION
    while True:
        while CONNECTION and CONNECTION.is_connected():
            await time_characteristic.written()
            time = time_characteristic.read().decode('utf-8').split("&")
            set_time(time)
            await asyncio.sleep_ms(500)
        
        await asyncio.sleep_ms(500)
 
def set_time(_time):
    rtc = machine.RTC()
    year = int(_time[0])
    month = int(_time[1])
    date = int(_time[2])
    hours = int(_time[3])
    mins = int(float(_time[4])/1)
    seconds = int((float(_time[4])* 60) % 60)
    try:
        rtc.datetime((year, month, date, 0, hours, mins, seconds, 1))
    except:
        rtc.datetime((year, month, date, 0, hours, mins, 0, 20))
 
async def blink_task():
    """ Task to blink LED """
    
    global CONNECTION
    toggle = True
    while True:
        led.value(toggle)
        toggle = not toggle
        # Slow blink while connected, fast blink while waiting for a connection.
        blink = 1000 if CONNECTION and CONNECTION.is_connected() else 250
        await asyncio.sleep_ms(blink)
        
def append_to_tasks(task):
    global TASKS
    id = task[0]
    if task[3] == 'Y':
        # A fourth field of 'Y' removes the task with this id.
        TASKS = [t for t in TASKS if t.id != id]
        return
    
    # Ignore a task whose id is already registered.
    for t in TASKS:
        if t.id == id:
            return
    
    interval_seconds = int(task[1])
    completed = 'N'
    periodic = task[2]
    new_task = Reminder(id, interval_seconds, completed, periodic)
    TASKS.append(new_task)
 
async def reminders_task():
    global CONNECTION
    while True:
        while CONNECTION and CONNECTION.is_connected():
            tasks_to_remind = get_tasks_to_remind()
            if not len(tasks_to_remind):
                task_characteristic.write('__$$NoTasks')
                await asyncio.sleep_ms(10)
            else:
                for v in tasks_to_remind:
                    task_characteristic.write(v)
                    await asyncio.sleep_ms(10)
            await asyncio.sleep_ms(100)
            tasks_to_remind = []
        await asyncio.sleep_ms(100)
 
def get_tasks_to_remind():
    global TASKS
    tasks_to_remind = []
    for T in TASKS:
        if (T.periodic == 'Y' or (T.periodic == 'N' and T.completed == 'N')) and T.is_time_for_reminder():
            tasks_to_remind.append(T.id)
            T.execute_reminder()
    return tasks_to_remind
 
async def main():
    t1 = asyncio.create_task(reminders_task())
    t2 = asyncio.create_task(peripheral_task())
    t3 = asyncio.create_task(time_task())
    t4 = asyncio.create_task(blink_task())
    await asyncio.gather(t1, t2, t3, t4)
 
asyncio.run(main())

Bluetooth Device and Service Initialization

We start by defining the Bluetooth device name and service UUIDs. The device will advertise itself under the name “RPi Pico Remind Me” and offer a custom service to manage tasks and time synchronization.

BLUETOOTH_DEVICE_NAME = "RPi Pico Remind Me"
TASK_SERVICE_UUID = bluetooth.UUID("0492fcec-7194-11eb-9439-0242ac130002")
TASK_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f72465f")
NEW_TASK_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f7246ff")
TIME_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f724fff")
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
_ADV_INTERVAL_MS = 250_000  # aioble.advertise() takes microseconds, so this is 250 ms despite the name
TASKS = []
 
led = machine.Pin("LED", machine.Pin.OUT)

Next, we register the GATT (Generic Attribute Profile) server with the defined characteristics for tasks and time. This allows other Bluetooth devices to read, write, and receive notifications from these characteristics.

The Generic Attribute Profile (GATT) in Bluetooth defines a hierarchical data structure used to organize services, characteristics, and descriptors. It operates as a standardized way for Bluetooth Low Energy (BLE) devices to communicate data.

GATT organizes data into three main components:

  • Service: A collection of related characteristics that together expose one piece of a device's functionality.

  • Characteristic: A single data value within a service, identified by a UUID (Universally Unique Identifier). Depending on its properties, a characteristic can be read, written, or used to notify or indicate value changes.

  • Descriptor: Provides additional metadata for a characteristic, such as its format or unit.

Together, these components enable devices to define and expose their capabilities in a structured manner, facilitating efficient and standardized communication between Bluetooth devices. GATT plays a crucial role in IoT and wearable applications, where devices need to interact seamlessly over low-power wireless connections.

# Register GATT server.
task_service = aioble.Service(TASK_SERVICE_UUID)
task_characteristic = aioble.Characteristic(
    task_service, TASK_CHARACTERISTIC_UUID, read=True, notify=True
)
new_task_characteristic = aioble.Characteristic(
    task_service, NEW_TASK_CHARACTERISTIC_UUID, read=True, notify=True, write=True, capture=True
)
time_characteristic = aioble.Characteristic(
    task_service, TIME_CHARACTERISTIC_UUID, read=True, notify=True, write=True, capture=True
)
aioble.register_services(task_service)
CONNECTION = None
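The layout registered above can be summarized as plain data, which is sometimes handy when writing the client side. This is only an illustrative sketch: the GATT_LAYOUT dictionary and the writable helper are hypothetical names, and aioble does not consume this structure. The UUIDs and flags are copied from the registration code.

```python
# Plain-data summary of the service registered above (illustrative only).
GATT_LAYOUT = {
    "service": "0492fcec-7194-11eb-9439-0242ac130002",
    "characteristics": {
        "task": {
            "uuid": "d4fe65e5-42e7-4616-9d13-06f24f72465f",
            "flags": {"read", "notify"},
        },
        "new_task": {
            "uuid": "d4fe65e5-42e7-4616-9d13-06f24f7246ff",
            "flags": {"read", "notify", "write"},
        },
        "time": {
            "uuid": "d4fe65e5-42e7-4616-9d13-06f24f724fff",
            "flags": {"read", "notify", "write"},
        },
    },
}

def writable(layout):
    """Return the names of characteristics a connected device may write to."""
    return sorted(
        name
        for name, char in layout["characteristics"].items()
        if "write" in char["flags"]
    )

print(writable(GATT_LAYOUT))  # ['new_task', 'time']
```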

Advertising and Handling Connections

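The peripheral_task function advertises the Bluetooth service and handles incoming connections. Once a connection is established, it waits for writes to the new-task characteristic. Each written value is decoded as UTF-8, split on "&" into fields, and passed to append_to_tasks.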

async def peripheral_task():
    global CONNECTION
    while True:
        async with await aioble.advertise(
            _ADV_INTERVAL_MS,
            name=BLUETOOTH_DEVICE_NAME,
            services=[TASK_SERVICE_UUID],
            appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
        ) as connection:
            CONNECTION = connection
            print("Connection from", connection.device)
            
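            while connection.is_connected():
                try:
                    # Time out periodically so a disconnect is noticed and advertising can restart.
                    await new_task_characteristic.written(timeout_ms=1000)
                except asyncio.TimeoutError:
                    continue
                task = new_task_characteristic.read().decode('utf-8').split("&")
                append_to_tasks(task)
                await asyncio.sleep_ms(500)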
            
            print("Disconnected")
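To make the payload format concrete, here is a minimal sketch of how a client might build a new-task message and how the Pico side could validate it before use. The field order (id, interval in seconds, periodic flag, removal flag) is inferred from how append_to_tasks indexes the list; encode_task, decode_task, and the example task id are hypothetical names, not part of the original code.

```python
def encode_task(task_id, interval_seconds, periodic, remove=False):
    """Build the bytes a client would write to the new-task characteristic."""
    fields = [
        task_id,
        str(interval_seconds),
        "Y" if periodic else "N",
        "Y" if remove else "N",
    ]
    return "&".join(fields).encode("utf-8")

def decode_task(raw):
    """Split a payload into its four fields, rejecting malformed input."""
    fields = raw.decode("utf-8").split("&")
    if len(fields) != 4:
        raise ValueError("expected 4 '&'-separated fields, got %d" % len(fields))
    int(fields[1])  # raises ValueError if the interval is not an integer
    return fields

payload = encode_task("water-plants", 3600, periodic=True)
print(payload)
print(decode_task(payload))
```

Validating the field count up front avoids an IndexError inside append_to_tasks when a client sends a short or garbled message.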

Time Synchronization

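The time_task function sets the Pico's real-time clock (RTC) from a timestamp written by the connected Bluetooth device. The payload is an "&"-separated string of year, month, day, hour, and minutes. The set_time function treats the minutes field as a float and derives the seconds from its fractional part.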

async def time_task():
    global CONNECTION
    while True:
        while CONNECTION and CONNECTION.is_connected():
            await time_characteristic.written()
            time = time_characteristic.read().decode('utf-8').split("&")
            set_time(time)
            await asyncio.sleep_ms(500)
        
        await asyncio.sleep_ms(500)
 
def set_time(_time):
    rtc = machine.RTC()
    year = int(_time[0])
    month = int(_time[1])
    date = int(_time[2])
    hours = int(_time[3])
    mins = int(float(_time[4])/1)
    seconds = int((float(_time[4]) * 60) % 60)
    try:
        rtc.datetime((year, month, date, 0, hours, mins, seconds, 1))
    except:
        rtc.datetime((year, month, date, 0, hours, mins, 0, 20))
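The arithmetic in set_time can be tried out without a board. The sketch below mirrors the same conversion in a hypothetical helper, to_rtc_tuple, which returns the tuple instead of writing it to the RTC. The sample timestamp is made up, and the subseconds field is simply set to 0 here.

```python
def to_rtc_tuple(fields):
    """Convert ['year', 'month', 'day', 'hour', 'minutes'] strings to an RTC tuple."""
    year, month, day, hours = (int(f) for f in fields[:4])
    minutes_float = float(fields[4])
    mins = int(minutes_float)                 # whole minutes
    seconds = int((minutes_float * 60) % 60)  # fractional minutes as seconds
    # machine.RTC().datetime() expects:
    # (year, month, day, weekday, hours, minutes, seconds, subseconds)
    return (year, month, day, 0, hours, mins, seconds, 0)

fields = "2024&3&15&9&30.5".split("&")
print(to_rtc_tuple(fields))  # (2024, 3, 15, 0, 9, 30, 30, 0)
```

So a minutes field of 30.5 becomes 30 minutes and 30 seconds.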

Blinking LED Indicator

The blink_task function blinks the onboard LED to indicate the connection status: it toggles every 1000 ms while a device is connected and every 250 ms otherwise.

async def blink_task():
    """ Task to blink LED """
    
    global CONNECTION
    toggle = True
    while True:
        led.value(toggle)
        toggle = not toggle
        # Slow blink while connected, fast blink while waiting for a connection.
        blink = 1000 if CONNECTION and CONNECTION.is_connected() else 250
        await asyncio.sleep_ms(blink)

Managing Tasks

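Tasks are managed through the append_to_tasks and reminders_task functions. The append_to_tasks function adds or removes tasks, while reminders_task writes the IDs of due reminders to the task characteristic (or the placeholder __$$NoTasks when nothing is due) for the connected device to read.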

def append_to_tasks(task):
    global TASKS
    id = task[0]
    if task[3] == 'Y':
        # A fourth field of 'Y' removes the task with this id.
        TASKS = [t for t in TASKS if t.id != id]
        return
    
    # Ignore a task whose id is already registered.
    for t in TASKS:
        if t.id == id:
            return
    
    interval_seconds = int(task[1])
    completed = 'N'
    periodic = task[2]
    new_task = Reminder(id, interval_seconds, completed, periodic)
    TASKS.append(new_task)
 
async def reminders_task():
    global CONNECTION
    while True:
        while CONNECTION and CONNECTION.is_connected():
            tasks_to_remind = get_tasks_to_remind()
            if not len(tasks_to_remind):
                task_characteristic.write('__$$NoTasks')
                await asyncio.sleep_ms(10)
            else:
                for v in tasks_to_remind:
                    task_characteristic.write(v)
                    await asyncio.sleep_ms(10)
            await asyncio.sleep_ms(100)
            tasks_to_remind = []
        await asyncio.sleep_ms(100)
 
def get_tasks_to_remind():
    global TASKS
    tasks_to_remind = []
    for T in TASKS:
        if (T.periodic == 'Y' or (T.periodic == 'N' and T.completed == 'N')) and T.is_time_for_reminder():
            tasks_to_remind.append(T.id)
            T.execute_reminder()
    return tasks_to_remind
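The selection condition in get_tasks_to_remind is easier to check when pulled out on its own. The sketch below copies that boolean expression into a hypothetical helper, should_remind, and prints which flag combinations qualify when a reminder is due.

```python
def should_remind(periodic, completed, due):
    """Same condition as in get_tasks_to_remind, with 'due' passed in directly."""
    return (periodic == 'Y' or (periodic == 'N' and completed == 'N')) and due

# Enumerate every periodic/completed combination for a reminder that is due.
for periodic in ('Y', 'N'):
    for completed in ('N', 'Y'):
        print(periodic, completed, should_remind(periodic, completed, True))
```

Periodic tasks qualify every time they are due, while a one-shot task stops qualifying once execute_reminder has marked it completed.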

Main Function

Finally, the main function coordinates all the tasks and starts the event loop.

async def main():
    t1 = asyncio.create_task(reminders_task())
    t2 = asyncio.create_task(peripheral_task())
    t3 = asyncio.create_task(time_task())
    t4 = asyncio.create_task(blink_task())
    await asyncio.gather(t1, t2, t3, t4)
 
asyncio.run(main())

reminders.py

To keep the reminder logic separate, let's create a new reminders.py file and define the Reminder class in it.

reminders.py
import time
 
class Reminder:
    def __init__(self, id, interval_seconds, completed, periodic):
        self.id = id
        self.interval_seconds = interval_seconds
        self.periodic = periodic
        self.completed = completed
        self.last_reminder_time = time.time()
 
    def is_time_for_reminder(self):
        current_time = time.time()
        elapsed_time = current_time - self.last_reminder_time
        return elapsed_time >= self.interval_seconds
 
    def execute_reminder(self):
        if self.is_time_for_reminder():
            self.last_reminder_time = time.time()
            if self.periodic == 'N':
                self.completed = 'Y'
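For experimenting with this logic off the device, one option is to make the time source injectable so that tests do not have to wait for real seconds to pass. The variant below is a hypothetical sketch, not part of the original reminders.py: the ClockedReminder class, its clock parameter, and the FakeClock helper are assumptions added for illustration.

```python
import time

class FakeClock:
    """Hypothetical test helper: a clock that only moves when told to."""
    def __init__(self, now=0):
        self.now = now

    def __call__(self):
        return self.now

    def advance(self, seconds):
        self.now += seconds

class ClockedReminder:
    """Same logic as Reminder, with the time source passed in."""
    def __init__(self, id, interval_seconds, completed, periodic, clock=time.time):
        self.id = id
        self.interval_seconds = interval_seconds
        self.periodic = periodic
        self.completed = completed
        self.clock = clock
        self.last_reminder_time = clock()

    def is_time_for_reminder(self):
        return self.clock() - self.last_reminder_time >= self.interval_seconds

    def execute_reminder(self):
        if self.is_time_for_reminder():
            self.last_reminder_time = self.clock()
            if self.periodic == 'N':
                self.completed = 'Y'

clock = FakeClock()
one_shot = ClockedReminder("call-mom", 60, 'N', 'N', clock=clock)
print(one_shot.is_time_for_reminder())  # False: no time has passed yet
clock.advance(60)
print(one_shot.is_time_for_reminder())  # True: the 60 s interval has elapsed
one_shot.execute_reminder()
print(one_shot.completed)               # Y: one-shot reminders complete after firing
```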
  

By following these steps, you can set up Bluetooth communication on your Raspberry Pi Pico W, allowing it to manage tasks and synchronize time with other Bluetooth devices. This setup is highly customizable and can be adapted to suit various IoT applications.