Skip to content

Commit 64c6830

Browse files
author
Tristan
committed
Use asnycio.loop reader rather than executor for notifications
1 parent b3b1721 commit 64c6830

File tree

1 file changed

+41
-28
lines changed

1 file changed

+41
-28
lines changed

bleak/backends/bluezdbus/client.py

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def __init__(
103103
self._disconnect_monitor_event: Optional[asyncio.Event] = None
104104
# map of characteristic D-Bus object path to notification callback
105105
self._notification_callbacks: dict[str, NotifyCallback] = {}
106-
self._notification_fd_stop_events: dict[str, asyncio.Event] = {}
106+
self._notification_fds: dict[str, int] = {}
107107

108108
# used to override mtu_size property
109109
self._mtu_size: Optional[int] = None
@@ -207,8 +207,15 @@ def on_value_changed(char_path: str, value: bytes) -> None:
207207

208208
async def disconnect_device() -> None:
209209
# Clean up open notification file descriptors
210-
for stop_event in self._notification_fd_stop_events.items():
211-
stop_event.set()
210+
loop = asyncio.get_running_loop()
211+
for fd in self._notification_fds.items():
212+
try:
213+
loop.remove_reader(fd)
214+
os.close(fd)
215+
except Exception as e:
216+
logger.error(
217+
"Failed to remove file descriptor %d: %d", fd, e
218+
)
212219

213220
# Calling Disconnect cancels any pending connect request. Also,
214221
# if connection was successful but _get_services() raises (e.g.
@@ -909,23 +916,26 @@ async def write_gatt_descriptor(
909916
"Write Descriptor %s | %s: %s", descriptor.handle, descriptor.obj[0], data
910917
)
911918

912-
async def _read_notify_fd(self, fd, callback, close_event):
919+
def _read_notify_fd(self, fd, callback):
913920
loop = asyncio.get_running_loop()
914-
with os.fdopen(fd, "rb", closefd=True) as f:
915-
while True:
921+
os.set_blocking(fd, False)
922+
923+
def on_data():
924+
try:
925+
data = os.read(fd, 1024)
926+
if not data: # EOF, close file descriptor
927+
os.close(fd)
928+
return
929+
callback(bytes(data))
930+
except Exception as e:
931+
logger.error("AcquireNotify: Read error on fd %d: %s", fd, e)
916932
try:
917-
if close_event.is_set():
918-
break
933+
loop.remove_reader(fd)
934+
os.close(fd)
935+
except OSError:
936+
pass
919937

920-
data = await loop.run_in_executor(None, f.read, 1024)
921-
if not data:
922-
continue
923-
callback(bytes(data))
924-
except Exception as e:
925-
logger.error(
926-
"Exception occured while using AcquireNotify fd: %s",
927-
e,
928-
)
938+
loop.add_reader(fd, on_data)
929939

930940
@override
931941
async def start_notify(
@@ -939,6 +949,10 @@ async def start_notify(
939949
"""
940950
assert self._bus is not None
941951

952+
# If using StartNotify and calling a read on the same
953+
# characteristic, BlueZ will return the response as
954+
# both a notification and read, duplicating the message.
955+
# Using NotifyAcquired on supported characteristics avoids this.
942956
if "NotifyAcquired" in characteristic.obj[1]:
943957
reply = await self._bus.call(
944958
Message(
@@ -954,12 +968,8 @@ async def start_notify(
954968
assert_reply(reply)
955969

956970
unix_fd = reply.unix_fds[0]
957-
stop_event = asyncio.Event()
958-
self._notification_fd_stop_events[characteristic.obj[0]] = stop_event
959-
task = asyncio.create_task(
960-
self._read_notify_fd(unix_fd, callback, stop_event)
961-
)
962-
_background_tasks.add(task)
971+
self._notification_fds[characteristic.obj[0]] = unix_fd
972+
self._read_notify_fd(unix_fd, callback)
963973
else:
964974
self._notification_callbacks[characteristic.obj[0]] = callback
965975
reply = await self._bus.call(
@@ -987,11 +997,14 @@ async def stop_notify(self, characteristic: BleakGATTCharacteristic) -> None:
987997
assert self._bus is not None
988998

989999
if "NotifyAcquired" in characteristic.obj[1]:
990-
stop_event = self._notification_fd_stop_events.pop(
991-
characteristic.obj[0], None
992-
)
993-
if stop_event:
994-
stop_event.set()
1000+
fd = self._notification_fds.pop(characteristic.obj[0])
1001+
if fd:
1002+
loop = asyncio.get_running_loop()
1003+
try:
1004+
loop.remove_reader(fd)
1005+
os.close(fd)
1006+
except Exception as e:
1007+
logger.error("Failed to remove file descriptor %d: %d", fd, e)
9951008
else:
9961009
reply = await self._bus.call(
9971010
Message(

0 commit comments

Comments
 (0)