Communicating with Bluetooth on Raspberry Pi Pico W
The Raspberry Pi Pico W offers powerful capabilities for wireless communication, making it an excellent choice for IoT projects. In this post, we will explore how to use Bluetooth to communicate with other systems, utilizing the Raspberry Pi Pico W and the provided Python code.
We will explore how to harness the capabilities of the Raspberry Pi Pico W for Bluetooth communication to develop a personalized product. Using a simple reminders application as an example, we’ll delve into setting up Bluetooth functionality on the Pico W. This will include tasks such as advertising services, handling connections, and exchanging data wirelessly. By the end, you’ll have a foundational understanding of integrating Bluetooth into your own projects using the Raspberry Pi Pico W, paving the way for creating custom IoT solutions and applications.
Setting Up the Environment
We will use MicroPython with the Thonny IDE for developing and deploying Bluetooth communication on the Raspberry Pi Pico W.
main.py
We are going to leverage several essential packages that are already integrated into our MicroPython environment: uasyncio, aioble, bluetooth, and machine.
- uasyncio: Enables asynchronous programming, crucial for managing concurrent tasks such as Bluetooth communication and device interactions.
- aioble: Provides Bluetooth Low Energy (BLE) functionalities, allowing our device to advertise services, handle connections, and exchange data wirelessly.
- bluetooth: Offers basic Bluetooth functionalities, including UUID handling and connection management for setting up custom GATT services.
- machine: Facilitates hardware control, essential for managing GPIO pins, controlling onboard LEDs, and interfacing with sensors or actuators connected to the Raspberry Pi Pico W.
These packages collectively empower us to create robust IoT applications that can communicate wirelessly, manage tasks, synchronize time, and control physical devices, all using the efficient and resource-constrained environment of MicroPython on the Raspberry Pi Pico W.
Here’s the complete main.py:
import sys
sys.path.append("")
from micropython import const
import uasyncio as asyncio
import aioble
import bluetooth
import machine
import random
import struct
import time
from reminders import Reminder
# Name broadcast while advertising (passed as name= to aioble.advertise).
BLUETOOTH_DEVICE_NAME = "RPi Pico Remind Me"
# UUID of the custom service that groups the three characteristics below.
TASK_SERVICE_UUID = bluetooth.UUID("0492fcec-7194-11eb-9439-0242ac130002")
# Characteristic the Pico writes due reminder ids to (see reminders_task).
TASK_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f72465f")
# Characteristic a central writes "&"-separated task messages to (see peripheral_task).
NEW_TASK_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f7246ff")
# Characteristic a central writes "&"-separated time fields to (see time_task).
TIME_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f724fff")
# Appearance value passed to aioble.advertise.
# NOTE(review): the name says "generic thermometer" although this device is a
# reminder service; presumably carried over from an example — confirm.
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
# NOTE(review): this is passed as the first argument of aioble.advertise, which
# aioble documents as interval_us; 250_000 would then be 250 ms expressed in
# microseconds despite the _MS suffix — confirm before renaming.
_ADV_INTERVAL_MS = 250_000
# In-memory list of Reminder objects; extended or replaced by append_to_tasks.
TASKS = []
# On-board LED, toggled by blink_task.
led = machine.Pin("LED", machine.Pin.OUT)
# Register GATT server: one service with three characteristics. The
# characteristics must be created before register_services() is called.
task_service = aioble.Service(TASK_SERVICE_UUID)
# Readable / notifying characteristic; reminders_task writes its value.
task_characteristic = aioble.Characteristic(
task_service, TASK_CHARACTERISTIC_UUID, read=True, notify=True)
# Writable by the central; peripheral_task waits on written() and then read()s it.
# NOTE(review): capture=True is set, but the tasks read the value back with
# read() rather than using the value returned by written() — confirm intent.
new_task_characteristic = aioble.Characteristic(
task_service, NEW_TASK_CHARACTERISTIC_UUID, read=True, notify=True, write=True, capture=True
)
# Writable by the central; time_task waits on written() and then read()s it.
time_characteristic = aioble.Characteristic(
task_service, TIME_CHARACTERISTIC_UUID, read=True, notify=True, write=True, capture=True
)
aioble.register_services(task_service)
# Most recent connection object; assigned by peripheral_task and polled by the
# other tasks through is_connected(). None until the first central connects.
CONNECTION = None
async def peripheral_task():
    """Advertise the task service and ingest task messages from a central.

    Loops forever: advertises, publishes the accepted connection through the
    module-level CONNECTION, then handles writes to new_task_characteristic
    until the central disconnects, after which advertising starts again.

    Each write is decoded as UTF-8 and split on "&" before being handed to
    append_to_tasks.
    """
    global CONNECTION
    # How long a single wait for a write may block before the connection
    # state is re-checked.
    write_poll_ms = 1000
    while True:
        async with await aioble.advertise(
            _ADV_INTERVAL_MS,
            name=BLUETOOTH_DEVICE_NAME,
            services=[TASK_SERVICE_UUID],
            appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
        ) as connection:
            CONNECTION = connection
            print("Connection from", connection.device)
            while connection.is_connected():
                try:
                    # Bounded wait: an untimed written() stays blocked after the
                    # central disconnects, so is_connected() would never be
                    # re-checked and the device would never advertise again.
                    await new_task_characteristic.written(timeout_ms=write_poll_ms)
                except asyncio.TimeoutError:
                    continue
                try:
                    task = new_task_characteristic.read().decode('utf-8').split("&")
                    append_to_tasks(task)
                except (ValueError, IndexError) as exc:
                    # The payload comes from a remote device; a short or
                    # non-numeric message must not terminate this task.
                    print("Ignoring malformed task payload:", exc)
                await asyncio.sleep_ms(500)
            print("Disconnected")
async def time_task():
    """Apply time values written by the central to the Pico's RTC.

    While a central is connected, waits for a write to time_characteristic,
    decodes it as UTF-8, splits it on "&" and passes the fields to set_time.
    Polls CONNECTION every 500 ms while no central is connected.
    """
    while True:
        while CONNECTION and CONNECTION.is_connected():
            await time_characteristic.written()
            try:
                # Named `fields` (not `time`) so the imported time module is
                # not shadowed inside this function.
                fields = time_characteristic.read().decode('utf-8').split("&")
                set_time(fields)
            except (ValueError, IndexError, OSError) as exc:
                # The payload comes from a remote device; too few fields or
                # non-numeric text must not terminate this task. OSError is
                # presumably what the RTC raises for out-of-range values.
                print("Ignoring malformed time payload:", exc)
            await asyncio.sleep_ms(500)
        await asyncio.sleep_ms(500)
def set_time(_time):
    """Set the on-board RTC from a list of string fields.

    Args:
        _time: sequence of at least five strings:
            [year, month, day, hours, minutes]. The minutes field may be
            fractional; its fractional part is converted to seconds
            (e.g. "30.5" becomes 30 min 30 s).

    Raises:
        IndexError: fewer than five fields were supplied.
        ValueError: a field is not numeric.
    """
    rtc = machine.RTC()
    year = int(_time[0])
    month = int(_time[1])
    date = int(_time[2])
    hours = int(_time[3])
    minutes = float(_time[4])  # parsed once; may carry a fractional part
    mins = int(minutes)
    seconds = int((minutes * 60) % 60)
    # Tuple layout per the MicroPython RTC docs:
    # (year, month, day, weekday, hours, minutes, seconds, subseconds).
    try:
        rtc.datetime((year, month, date, 0, hours, mins, seconds, 1))
    except Exception:
        # Best-effort retry with the seconds zeroed. `Exception` rather than a
        # bare except so KeyboardInterrupt / SystemExit are not swallowed.
        rtc.datetime((year, month, date, 0, hours, mins, 0, 20))
async def blink_task():
    """Toggle the on-board LED forever as a connection indicator.

    The LED changes state every 1000 ms while a central is connected and
    every 250 ms otherwise.
    """
    led_on = True
    while True:
        led.value(led_on)
        led_on = not led_on
        connected = CONNECTION and CONNECTION.is_connected()
        await asyncio.sleep_ms(1000 if connected else 250)
def append_to_tasks(task):
    """Add or remove a reminder according to a parsed task message.

    Args:
        task: list of strings [id, interval_seconds, periodic, flag].
            When flag is 'Y' every stored reminder with this id is removed
            (presumably the client marks the task as done — confirm).
            Otherwise a new Reminder is appended, unless one with the same id
            is already stored.

    Raises:
        IndexError: the message has fewer than four fields.
        ValueError: interval_seconds is not an integer.
    """
    global TASKS
    task_id = task[0]  # not `id`, to avoid shadowing the builtin
    if task[3] == 'Y':
        TASKS = [t for t in TASKS if t.id != task_id]
        return
    # The duplicate check has to run on the add path; previously the flag was
    # only ever set inside the removal branch, so duplicates were appended.
    if any(t.id == task_id for t in TASKS):
        return
    interval_seconds = int(task[1])
    periodic = task[2]
    TASKS.append(Reminder(task_id, interval_seconds, 'N', periodic))
async def reminders_task():
    """Publish due reminder ids on task_characteristic while connected.

    On every pass the ids returned by get_tasks_to_remind are written one at
    a time, 10 ms apart; when nothing is due the sentinel '__$$NoTasks' is
    written instead. Passes are separated by a further 100 ms, and the
    connection state is polled every 100 ms while no central is connected.
    """
    while True:
        while CONNECTION and CONNECTION.is_connected():
            due_ids = get_tasks_to_remind()
            # An empty result is replaced by the single sentinel value so the
            # same write-then-pause loop covers both cases.
            for payload in due_ids or ['__$$NoTasks']:
                task_characteristic.write(payload)
                await asyncio.sleep_ms(10)
            await asyncio.sleep_ms(100)
        await asyncio.sleep_ms(100)
def get_tasks_to_remind():
    """Return the ids of reminders that are due and mark them as fired.

    A reminder is considered when it is periodic ('Y'), or one-shot ('N') and
    not yet completed. For each such reminder whose interval has elapsed,
    execute_reminder() is called and its id is collected.

    Returns:
        list of ids, in TASKS order.
    """
    due_ids = []
    for reminder in TASKS:
        one_shot_pending = reminder.periodic == 'N' and reminder.completed == 'N'
        if not (reminder.periodic == 'Y' or one_shot_pending):
            continue
        if not reminder.is_time_for_reminder():
            continue
        due_ids.append(reminder.id)
        reminder.execute_reminder()
    return due_ids
async def main():
    """Start the four background tasks and wait on them together."""
    # Same creation order as before: reminders, peripheral, time, blink.
    entry_points = (reminders_task, peripheral_task, time_task, blink_task)
    running = [asyncio.create_task(entry()) for entry in entry_points]
    await asyncio.gather(*running)


# Script entry point: run the event loop until main() finishes.
asyncio.run(main())
Bluetooth Device and Service Initialization
We start by defining the Bluetooth device name and service UUIDs. The device will advertise itself under the name “RPi Pico Remind Me” and offer a custom service to manage tasks and time synchronization.
# Name broadcast while advertising (passed as name= to aioble.advertise).
BLUETOOTH_DEVICE_NAME = "RPi Pico Remind Me"
# UUID of the custom service that groups the three characteristics below.
TASK_SERVICE_UUID = bluetooth.UUID("0492fcec-7194-11eb-9439-0242ac130002")
# Characteristic the Pico writes due reminder ids to (see reminders_task).
TASK_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f72465f")
# Characteristic a central writes "&"-separated task messages to (see peripheral_task).
NEW_TASK_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f7246ff")
# Characteristic a central writes "&"-separated time fields to (see time_task).
TIME_CHARACTERISTIC_UUID = bluetooth.UUID("d4fe65e5-42e7-4616-9d13-06f24f724fff")
# Appearance value passed to aioble.advertise.
# NOTE(review): the name says "generic thermometer" although this device is a
# reminder service; presumably carried over from an example — confirm.
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
# NOTE(review): this is passed as the first argument of aioble.advertise, which
# aioble documents as interval_us; 250_000 would then be 250 ms expressed in
# microseconds despite the _MS suffix — confirm before renaming.
_ADV_INTERVAL_MS = 250_000
# In-memory list of Reminder objects; extended or replaced by append_to_tasks.
TASKS = []
# On-board LED, toggled by blink_task.
led = machine.Pin("LED", machine.Pin.OUT)
Next, we register the GATT (Generic Attribute Profile) server with the defined characteristics for tasks and time. This allows other Bluetooth devices to read, write, and receive notifications from these characteristics.
The Generic Attribute Profile (GATT) in Bluetooth defines a hierarchical data structure used to organize services, characteristics, and descriptors. It operates as a standardized way for Bluetooth Low Energy (BLE) devices to communicate data.
GATT organizes data into three main components:
- Service: Represents a collection of related functionalities offered by a device. Services can include characteristics and descriptors.
- Characteristic: Defines a data point, identified by a UUID (Universally Unique Identifier), that represents a specific aspect of a service’s functionality. Characteristics can be read, written to, or set up to notify/indicate data changes.
- Descriptor: Provides additional metadata for a characteristic, such as its format or unit.
Together, these components enable devices to define and expose their capabilities in a structured manner, facilitating efficient and standardized communication between Bluetooth devices. GATT plays a crucial role in IoT and wearable applications, where devices need to interact seamlessly over low-power wireless connections.
# Register GATT server: one service with three characteristics. The
# characteristics must be created before register_services() is called.
task_service = aioble.Service(TASK_SERVICE_UUID)
# Readable / notifying characteristic; reminders_task writes its value.
task_characteristic = aioble.Characteristic(
task_service, TASK_CHARACTERISTIC_UUID, read=True, notify=True
)
# Writable by the central; peripheral_task waits on written() and then read()s it.
# NOTE(review): capture=True is set, but the tasks read the value back with
# read() rather than using the value returned by written() — confirm intent.
new_task_characteristic = aioble.Characteristic(
task_service, NEW_TASK_CHARACTERISTIC_UUID, read=True, notify=True, write=True, capture=True
)
# Writable by the central; time_task waits on written() and then read()s it.
time_characteristic = aioble.Characteristic(
task_service, TIME_CHARACTERISTIC_UUID, read=True, notify=True, write=True, capture=True
)
aioble.register_services(task_service)
# Most recent connection object; assigned by peripheral_task and polled by the
# other tasks through is_connected(). None until the first central connects.
CONNECTION = None
Advertising and Handling Connections
The peripheral_task function advertises the Bluetooth service and handles incoming connections. When a connection is established, it listens for new task data.
async def peripheral_task():
    """Advertise the task service and ingest task messages from a central.

    Loops forever: advertises, publishes the accepted connection through the
    module-level CONNECTION, then handles writes to new_task_characteristic
    until the central disconnects, after which advertising starts again.

    Each write is decoded as UTF-8 and split on "&" before being handed to
    append_to_tasks.
    """
    global CONNECTION
    # How long a single wait for a write may block before the connection
    # state is re-checked.
    write_poll_ms = 1000
    while True:
        async with await aioble.advertise(
            _ADV_INTERVAL_MS,
            name=BLUETOOTH_DEVICE_NAME,
            services=[TASK_SERVICE_UUID],
            appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
        ) as connection:
            CONNECTION = connection
            print("Connection from", connection.device)
            while connection.is_connected():
                try:
                    # Bounded wait: an untimed written() stays blocked after the
                    # central disconnects, so is_connected() would never be
                    # re-checked and the device would never advertise again.
                    await new_task_characteristic.written(timeout_ms=write_poll_ms)
                except asyncio.TimeoutError:
                    continue
                try:
                    task = new_task_characteristic.read().decode('utf-8').split("&")
                    append_to_tasks(task)
                except (ValueError, IndexError) as exc:
                    # The payload comes from a remote device; a short or
                    # non-numeric message must not terminate this task.
                    print("Ignoring malformed task payload:", exc)
                await asyncio.sleep_ms(500)
            print("Disconnected")
Time Synchronization
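The time_task function synchronizes the Pico’s clock with the connected Bluetooth device: whenever the device writes a time value to the time characteristic, set_time applies it to the on-board RTC.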
async def time_task():
    """Apply time values written by the central to the Pico's RTC.

    While a central is connected, waits for a write to time_characteristic,
    decodes it as UTF-8, splits it on "&" and passes the fields to set_time.
    Polls CONNECTION every 500 ms while no central is connected.
    """
    while True:
        while CONNECTION and CONNECTION.is_connected():
            await time_characteristic.written()
            try:
                # Named `fields` (not `time`) so the imported time module is
                # not shadowed inside this function.
                fields = time_characteristic.read().decode('utf-8').split("&")
                set_time(fields)
            except (ValueError, IndexError, OSError) as exc:
                # The payload comes from a remote device; too few fields or
                # non-numeric text must not terminate this task. OSError is
                # presumably what the RTC raises for out-of-range values.
                print("Ignoring malformed time payload:", exc)
            await asyncio.sleep_ms(500)
        await asyncio.sleep_ms(500)
def set_time(_time):
    """Set the on-board RTC from a list of string fields.

    Args:
        _time: sequence of at least five strings:
            [year, month, day, hours, minutes]. The minutes field may be
            fractional; its fractional part is converted to seconds
            (e.g. "30.5" becomes 30 min 30 s).

    Raises:
        IndexError: fewer than five fields were supplied.
        ValueError: a field is not numeric.
    """
    rtc = machine.RTC()
    year = int(_time[0])
    month = int(_time[1])
    date = int(_time[2])
    hours = int(_time[3])
    minutes = float(_time[4])  # parsed once; may carry a fractional part
    mins = int(minutes)
    seconds = int((minutes * 60) % 60)
    # Tuple layout per the MicroPython RTC docs:
    # (year, month, day, weekday, hours, minutes, seconds, subseconds).
    try:
        rtc.datetime((year, month, date, 0, hours, mins, seconds, 1))
    except Exception:
        # Best-effort retry with the seconds zeroed. `Exception` rather than a
        # bare except so KeyboardInterrupt / SystemExit are not swallowed.
        rtc.datetime((year, month, date, 0, hours, mins, 0, 20))
Blinking LED Indicator
The blink_task function blinks the onboard LED to indicate the connection status.
async def blink_task():
    """Toggle the on-board LED forever as a connection indicator.

    The LED changes state every 1000 ms while a central is connected and
    every 250 ms otherwise.
    """
    led_on = True
    while True:
        led.value(led_on)
        led_on = not led_on
        connected = CONNECTION and CONNECTION.is_connected()
        await asyncio.sleep_ms(1000 if connected else 250)
Managing Tasks
Tasks are managed through the append_to_tasks and reminders_task functions. The append_to_tasks function adds new tasks (or removes a task when the fourth field of the incoming message is 'Y'), while reminders_task sends reminders to the connected device.
def append_to_tasks(task):
    """Add or remove a reminder according to a parsed task message.

    Args:
        task: list of strings [id, interval_seconds, periodic, flag].
            When flag is 'Y' every stored reminder with this id is removed
            (presumably the client marks the task as done — confirm).
            Otherwise a new Reminder is appended, unless one with the same id
            is already stored.

    Raises:
        IndexError: the message has fewer than four fields.
        ValueError: interval_seconds is not an integer.
    """
    global TASKS
    task_id = task[0]  # not `id`, to avoid shadowing the builtin
    if task[3] == 'Y':
        TASKS = [t for t in TASKS if t.id != task_id]
        return
    # The duplicate check has to run on the add path; previously the flag was
    # only ever set inside the removal branch, so duplicates were appended.
    if any(t.id == task_id for t in TASKS):
        return
    interval_seconds = int(task[1])
    periodic = task[2]
    TASKS.append(Reminder(task_id, interval_seconds, 'N', periodic))
async def reminders_task():
    """Publish due reminder ids on task_characteristic while connected.

    On every pass the ids returned by get_tasks_to_remind are written one at
    a time, 10 ms apart; when nothing is due the sentinel '__$$NoTasks' is
    written instead. Passes are separated by a further 100 ms, and the
    connection state is polled every 100 ms while no central is connected.
    """
    while True:
        while CONNECTION and CONNECTION.is_connected():
            due_ids = get_tasks_to_remind()
            # An empty result is replaced by the single sentinel value so the
            # same write-then-pause loop covers both cases.
            for payload in due_ids or ['__$$NoTasks']:
                task_characteristic.write(payload)
                await asyncio.sleep_ms(10)
            await asyncio.sleep_ms(100)
        await asyncio.sleep_ms(100)
def get_tasks_to_remind():
    """Return the ids of reminders that are due and mark them as fired.

    A reminder is considered when it is periodic ('Y'), or one-shot ('N') and
    not yet completed. For each such reminder whose interval has elapsed,
    execute_reminder() is called and its id is collected.

    Returns:
        list of ids, in TASKS order.
    """
    due_ids = []
    for reminder in TASKS:
        one_shot_pending = reminder.periodic == 'N' and reminder.completed == 'N'
        if not (reminder.periodic == 'Y' or one_shot_pending):
            continue
        if not reminder.is_time_for_reminder():
            continue
        due_ids.append(reminder.id)
        reminder.execute_reminder()
    return due_ids
Main Function
Finally, the main function coordinates all the tasks and starts the event loop.
async def main():
    """Start the four background tasks and wait on them together."""
    # Same creation order as before: reminders, peripheral, time, blink.
    entry_points = (reminders_task, peripheral_task, time_task, blink_task)
    running = [asyncio.create_task(entry()) for entry in entry_points]
    await asyncio.gather(*running)


# Script entry point: run the event loop until main() finishes.
asyncio.run(main())
reminders.py
To keep the structure of our reminders separate, let’s create a new reminders.py file and define the Reminder class in it.
import time
class Reminder:
    """A single reminder that becomes due once its interval has elapsed.

    Attributes are plain values supplied by the caller:
        id: identifier reported when the reminder fires.
        interval_seconds: seconds that must pass between reminders.
        completed: 'Y' / 'N' flag; set to 'Y' when a one-shot reminder fires.
        periodic: 'Y' for repeating reminders, 'N' for one-shot ones.

    NOTE(review): elapsed time is measured with time.time(); if the system
    clock is changed while reminders exist, the measured interval changes
    with it — confirm this is acceptable.
    """

    def __init__(self, id, interval_seconds, completed, periodic):
        # The interval is counted from the moment of construction.
        self.last_reminder_time = time.time()
        self.id = id
        self.interval_seconds = interval_seconds
        self.completed = completed
        self.periodic = periodic

    def is_time_for_reminder(self):
        """Return True when at least interval_seconds have passed since the last reminder."""
        return time.time() - self.last_reminder_time >= self.interval_seconds

    def execute_reminder(self):
        """Restart the interval if the reminder is due; complete it if one-shot."""
        if not self.is_time_for_reminder():
            return
        self.last_reminder_time = time.time()
        if self.periodic == 'N':
            self.completed = 'Y'
By following these steps, you can set up Bluetooth communication on your Raspberry Pi Pico W, allowing it to manage tasks and synchronize time with other Bluetooth devices. This setup is highly customizable and can be adapted to suit various IoT applications.