# bt_csc.py -- BLE central for a Cycling Speed and Cadence (CSC) sensor.
# This module is derived from the official uPython bluetooth example:
# https://github.com/micropython/micropython/blob/master/examples/bluetooth/ble_temperature_central.py
# According to the Bluetooth GATT CSC profile, instead of reading a value, we enable notifications for the
# CSC data.
import bluetooth
import random
import struct
import time
import micropython
import sys
import machine
from ble_advertising import decode_services, decode_name
from micropython import const
# Event codes delivered to the handler registered with BLE.irq(); the handler
# in this file (BLECSCCentral._irq) compares its `event` argument to these.
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
# Advertising types reported in scan results; only _ADV_IND and
# _ADV_DIRECT_IND are accepted as candidate sensors by the scan handler.
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)
# 16-bit UUIDs matched during discovery:
#   _CSC_SENSE_UUID - the service looked for in advertisements and services
#   _CSC_UUID       - the characteristic whose value/notifications are decoded
#   _CONF_UUID      - the descriptor written to switch notifications on
# NOTE(review): per the Bluetooth assigned numbers these should be the
# Cycling Speed and Cadence service, the CSC Measurement characteristic and
# the Client Characteristic Configuration descriptor -- confirm.
_CSC_SENSE_UUID = bluetooth.UUID(0x1816)
_CSC_UUID = bluetooth.UUID(0x2a5b)
_CONF_UUID = bluetooth.UUID(0x2902)
# _TEMP_CHAR = (
# _CSC_UUID,
# bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
# )
# _ENV_SENSE_SERVICE = (
# _CSC_SENSE_UUID,
# (_TEMP_CHAR,),
# )
class BLECSCCentral:
    """BLE central-role helper for a Cycling Speed and Cadence (CSC) sensor.

    Scans for a peripheral advertising the CSC service, connects to it,
    discovers the CSC characteristic and its configuration descriptor, and
    passes decoded measurement notifications to a user callback.

    Args:
        ble: BLE radio object (e.g. ``bluetooth.BLE()``). It is activated and
            its IRQ handler is set to this instance's ``_irq``.
        msg_callback: Optional callable accepting one or more strings, used
            to display progress/status text. ``None`` disables messages.
    """

    def __init__(self, ble, msg_callback=None):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        self._show_msg = msg_callback
        self._reset()

    def _reset(self):
        """Forget the cached device, handles, callbacks and last value."""
        # Cached name and address from a successful scan.
        self._name = None
        self._addr_type = None
        self._addr = None
        # Cached value (if we have one).
        self._value = None
        # Callbacks for completion of various operations.
        # These reset back to None after being invoked.
        self._scan_callback = None
        self._conn_callback = None
        self._read_callback = None
        # Persistent callback for when new data is notified from the device.
        self._notify_callback = None
        # Connected device.
        self._conn_handle = None
        self._start_handle = None
        self._end_handle = None
        self._value_handle = None
        self._dsc_handle = None
        self._config_handle = None
        # Timestamp of the last reset / connect attempt.
        self._connection_timer = time.time()

    def _message(self, *parts):
        """Forward status text to the message callback, if one was given."""
        if self._show_msg:
            self._show_msg(*parts)

    def _fail(self, what):
        """Report that a required GATT item was not found, then reboot."""
        print("Failed to find " + what)
        # Guarded via _message: the callback is optional, so calling it
        # directly would raise TypeError when no callback was supplied.
        self._message("Failed to find ", what)
        machine.reset()

    def _irq(self, event, data):
        """Handle one BLE stack event; dispatches on the ``_IRQ_*`` code."""
        if event == _IRQ_SCAN_RESULT:
            self._message("Scanning ...")
            addr_type, addr, adv_type, rssi, adv_data = data
            if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and _CSC_SENSE_UUID in decode_services(adv_data):
                # Found a potential device, remember it and stop scanning.
                self._addr_type = addr_type
                # Note: addr buffer is owned by caller so need to copy it.
                self._addr = bytes(addr)
                self._name = decode_name(adv_data) or "?"
                self._ble.gap_scan(None)

        elif event == _IRQ_SCAN_DONE:
            if self._scan_callback:
                if self._addr:
                    # Found a device during the scan (and the scan was
                    # explicitly stopped).
                    self._scan_callback(self._addr_type, self._addr, self._name)
                    self._scan_callback = None
                else:
                    # Scan timed out.
                    self._scan_callback(None, None, None)

        elif event == _IRQ_PERIPHERAL_CONNECT:
            # Connect successful.
            conn_handle, addr_type, addr = data
            if addr_type == self._addr_type and addr == self._addr:
                self._conn_handle = conn_handle
                self._ble.gattc_discover_services(self._conn_handle)

        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            # Disconnect (either initiated by us or the remote end).
            conn_handle, _, _ = data
            if conn_handle == self._conn_handle:
                # If it was initiated by us, it'll already be reset.
                self._reset()

        elif event == _IRQ_GATTC_SERVICE_RESULT:
            # Connected device returned a service.
            self._message("Setting up sensor ...")
            conn_handle, start_handle, end_handle, uuid = data
            if conn_handle == self._conn_handle and uuid == _CSC_SENSE_UUID:
                self._start_handle, self._end_handle = start_handle, end_handle

        elif event == _IRQ_GATTC_SERVICE_DONE:
            # Service query complete.
            if self._start_handle and self._end_handle:
                self._ble.gattc_discover_characteristics(
                    self._conn_handle, self._start_handle, self._end_handle)
            else:
                self._fail("CSC sensing service.")

        elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
            # Connected device returned a characteristic.
            self._message("Setting up sensor ...")
            conn_handle, def_handle, value_handle, properties, uuid = data
            if conn_handle == self._conn_handle and uuid == _CSC_UUID:
                self._value_handle = value_handle

        elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
            # Characteristic query complete.
            if self._value_handle:
                self._ble.gattc_discover_descriptors(
                    self._conn_handle, self._start_handle, self._end_handle)
                # The characteristic is known, so fire the connect callback
                # (descriptor discovery is still in progress at this point).
                if self._conn_callback:
                    self._conn_callback()
            else:
                self._fail("CSC characteristic.")

        elif event == _IRQ_GATTC_DESCRIPTOR_RESULT:
            self._message("Setting up sensor ...")
            conn_handle, dsc_handle, uuid = data
            if conn_handle == self._conn_handle and uuid == _CONF_UUID:
                # Descriptors are discovered over the whole service range, so
                # configuration descriptors of other characteristics can show
                # up too. Keep the lowest matching handle located after the
                # CSC value handle, i.e. the one belonging to that
                # characteristic, instead of whichever arrives last.
                follows_value = (self._value_handle is not None
                                 and dsc_handle > self._value_handle)
                if follows_value and (self._dsc_handle is None
                                      or dsc_handle < self._dsc_handle):
                    self._dsc_handle = dsc_handle

        elif event == _IRQ_GATTC_DESCRIPTOR_DONE:
            # Without the configuration descriptor notifications cannot be
            # enabled, so reboot.
            if not self._dsc_handle:
                machine.reset()

        elif event == _IRQ_GATTC_READ_RESULT:
            # A read completed successfully.
            conn_handle, value_handle, char_data = data
            if conn_handle == self._conn_handle and value_handle == self._value_handle:
                self._update_value(char_data)
                if self._read_callback:
                    self._read_callback(self._value)
                    self._read_callback = None

        elif event == _IRQ_GATTC_READ_DONE:
            # Read completed (no-op).
            pass

        elif event == _IRQ_GATTC_NOTIFY:
            # The sensor pushed a new measurement.
            conn_handle, value_handle, notify_data = data
            if conn_handle == self._conn_handle and value_handle == self._value_handle:
                self._update_value(notify_data)
                if self._notify_callback:
                    self._notify_callback(self._value)

    def is_connected(self):
        """Return True once connected and the CSC characteristic is found.

        This can become True before descriptor discovery has finished.
        """
        return self._conn_handle is not None and self._value_handle is not None

    def scan(self, callback=None):
        """Start scanning for a device advertising the CSC service.

        ``callback(addr_type, addr, name)`` is invoked when the scan ends;
        all three arguments are ``None`` if nothing was found.
        """
        self._addr_type = None
        self._addr = None
        self._scan_callback = callback
        self._ble.gap_scan(15000, 30000, 30000)

    def connect(self, addr_type=None, addr=None, callback=None):
        """Connect to the given device, or to the one cached by a scan.

        Returns:
            False if no address is available, True if a connection attempt
            was started.
        """
        self._connection_timer = time.time()
        # Compare against None rather than truthiness: address type 0 is a
        # valid value and must not be replaced by the cached one.
        if addr_type is not None:
            self._addr_type = addr_type
        if addr is not None:
            self._addr = addr
        self._conn_callback = callback
        if self._addr_type is None or self._addr is None:
            return False
        self._ble.gap_connect(self._addr_type, self._addr)
        return True

    def disconnect(self):
        """Disconnect from the current device and clear all cached state."""
        # Connection handle 0 is valid, so test for None explicitly.
        if self._conn_handle is None:
            return
        self._ble.gap_disconnect(self._conn_handle)
        self._reset()

    def read(self, callback):
        """Issue an asynchronous read; ``callback`` receives the value."""
        if not self.is_connected():
            return
        self._read_callback = callback
        self._ble.gattc_read(self._conn_handle, self._value_handle)

    def on_notify(self, callback):
        """Set the callback invoked with each notified measurement."""
        self._notify_callback = callback

    def _update_value(self, data):
        """Decode a measurement payload and cache it.

        The first 7 bytes are read little-endian as (flags: uint8,
        wheel revolutions: uint32, last wheel event time: uint16). Any bytes
        after those (e.g. crank fields sent by combined sensors) are ignored
        rather than rejected. A payload shorter than 7 bytes still raises.

        NOTE(review): this assumes the wheel fields are present (flags bit 0
        set); a crank-only payload is too short and raises -- confirm the
        target sensor always reports wheel data.

        Returns:
            The decoded ``(flags, wheel_revolutions, last_event_time)`` tuple.
        """
        self._value = struct.unpack_from("<BIH", data)
        return self._value

    def value(self):
        """Return the most recently decoded measurement tuple, or None."""
        return self._value

    def enable_notification(self):
        """Write 0x0001 to the discovered configuration descriptor.

        Must be called after descriptor discovery has completed; the handles
        are passed to the BLE stack as-is.
        """
        # Explicit little-endian so the bytes on the wire do not depend on
        # the host's native byte order.
        self._ble.gattc_write(self._conn_handle, self._dsc_handle, struct.pack('<H', 0x0001))
def on_receive_csc_print(csc_data):
    """Print elements 1 and 2 of a decoded measurement to the console.

    Args:
        csc_data: Indexable measurement; element 1 is printed as the wheel
            revolution count and element 2 as the "Since last" value.

    NOTE(review): the output labels element 2 as "ms"; the CSC specification
    expresses the wheel event time in 1/1024 s ticks -- confirm the unit.
    """
    # (label, index into csc_data, trailing words), printed in this order.
    rows = (
        ("Wheel Rev:", 1, ()),
        ("Since last:", 2, ("ms",)),
    )
    for label, index, trailing in rows:
        print(label, csc_data[index], *trailing)
def demo():
    """Console demo: scan, connect, then print every notified measurement.

    Returns early if the scan finishes without finding a sensor; otherwise
    blocks until the sensor disconnects.
    """
    central = BLECSCCentral(bluetooth.BLE())
    scan_failed = False

    def on_scan(addr_type, addr, name):
        # Scan-complete handler: connect on success, flag failure otherwise.
        nonlocal scan_failed
        if addr_type is None:
            scan_failed = True
            print("No sensor found.")
            return
        print("Found sensor:", addr_type, addr, name)
        central.connect()

    central.scan(callback=on_scan)

    # Poll until connected, giving up as soon as the scan reports failure.
    connected = central.is_connected()
    while not connected:
        time.sleep_ms(100)
        if scan_failed:
            return
        connected = central.is_connected()

    print("Connected")

    # Presumably gives descriptor discovery time to finish before the
    # notification descriptor is written -- TODO confirm.
    time.sleep(2)
    central.on_notify(on_receive_csc_print)
    central.enable_notification()

    # Measurements arrive through the notify callback; just idle here until
    # the connection drops.
    while central.is_connected():
        time.sleep_ms(2000)

    print("Disconnected")
# notify data: [status, wheelRev, lastUpdate]
def run_CSC(data_callback, msg_callback):
    """Scan for a sensor, connect, and stream measurements to a callback.

    Args:
        data_callback: Called with each decoded measurement tuple notified
            by the sensor.
        msg_callback: Called with one or more strings describing progress;
            also handed to ``BLECSCCentral`` for its own status messages.

    The board is reset if no sensor is found. Otherwise this blocks until
    the sensor disconnects.
    """
    central = BLECSCCentral(bluetooth.BLE(), msg_callback)
    scan_failed = False

    def announce(text):
        # Emit the same status line on the console and via msg_callback.
        print(text)
        msg_callback(text)

    def on_scan(addr_type, addr, name):
        # Scan-complete handler: connect on success, reboot on failure.
        nonlocal scan_failed
        if addr_type is None:
            scan_failed = True
            announce("No sensor found.")
            machine.reset()
            return
        print("Found sensor:", addr_type, addr, name)
        msg_callback("Found sensor:", str(addr), name)
        central.connect()

    central.scan(callback=on_scan)

    # Poll until connected, giving up as soon as the scan reports failure.
    connected = central.is_connected()
    while not connected:
        time.sleep_ms(100)
        if scan_failed:
            return
        connected = central.is_connected()

    announce("Connected")

    # Presumably gives descriptor discovery time to finish before the
    # notification descriptor is written -- TODO confirm.
    time.sleep(2)
    central.on_notify(data_callback)
    central.enable_notification()

    # Measurements arrive through data_callback; idle until disconnected.
    while central.is_connected():
        time.sleep_ms(2000)

    announce("Disconnected")
# Run the console demo when this file is executed as a script.
if __name__ == "__main__":
    demo()