@@ -176,49 +176,78 @@ def _consume(self):
176
176
TODO: Only wait when another command comes in.
177
177
"""
178
178
while not self .is_closed :
179
- # Wait, until bridge is ready
180
- while not self .is_ready :
179
+ # Use the lock so we are sure is_ready is not changed during execution
180
+ # and the socket is not in use
181
+ with self ._lock :
182
+ # Check if bridge is ready and there are
183
+ if self .is_ready and not self ._command_queue .empty ():
184
+ # Get command from queue.
185
+ (command , reps , wait ) = self ._command_queue .get ()
186
+ # Select group if a different group is currently selected.
187
+ if command .select and self ._selected_number != command .group_number :
188
+ if not self ._send_raw (command .select_command .get_bytes (self )):
189
+ # Stop sending on socket error
190
+ self .is_ready = False
191
+ continue
192
+
193
+ time .sleep (SELECT_WAIT )
194
+ # Repeat command as necessary.
195
+ for _ in range (reps ):
196
+ if not self ._send_raw (command .get_bytes (self )):
197
+ # Stop sending on socket error
198
+ self .is_ready = False
199
+ continue
200
+ time .sleep (wait )
201
+
202
+ self ._selected_number = command .group_number
203
+
204
+ # Wait a little time if queue is empty
205
+ if self ._command_queue .empty ():
206
+ time .sleep (MIN_WAIT )
207
+
208
+ # Wait if bridge is not ready, we're only reading is_ready, no lock needed
209
+ if not self .is_ready :
181
210
if self .is_closed :
182
211
return
183
212
time .sleep (RECONNECT_TIME )
184
213
185
- # Get command from queue.
186
- (command , reps , wait ) = self ._command_queue .get ()
187
- # Select group if a different group is currently selected.
188
- if command .select and self ._selected_number != command .group_number :
189
- self ._send_raw (command .select_command .get_bytes (self ))
190
- time .sleep (SELECT_WAIT )
191
- # Repeat command as necessary.
192
- for _ in range (reps ):
193
- self ._send_raw (command .get_bytes (self ))
194
- time .sleep (wait )
195
- self ._selected_number = command .group_number
196
-
197
214
def _send_raw (self , command ):
198
215
"""
199
216
Sends an raw command directly to the physical bridge.
200
217
:param command: A bytearray.
201
218
"""
202
- self ._socket .send (bytearray (command ))
203
- self ._sn = (self ._sn + 1 ) % 256
219
+ try :
220
+ self ._socket .send (bytearray (command ))
221
+ self ._sn = (self ._sn + 1 ) % 256
222
+ return True
223
+ except (socket .error , socket .timeout ):
224
+ # We can get a socket.error or timeout exception if the bridge is disconnected,
225
+ # but we are still sending data. In that case, return False to indicate that data is not sent.
226
+ return False
204
227
205
228
def _init_connection (self ):
206
229
"""
207
230
Requests the session ids of the bridge.
208
231
:returns: True, if initialization was successful. False, otherwise.
209
232
"""
210
233
try :
234
+ # We are changing self.is_ready: lock it up!
235
+ self ._lock .acquire ()
236
+
211
237
response = bytearray (22 )
212
238
self ._send_raw (BRIDGE_INITIALIZATION_COMMAND )
213
239
self ._socket .recv_into (response )
214
240
self ._wb1 = response [19 ]
215
241
self ._wb2 = response [20 ]
216
242
self .is_ready = True
217
243
except socket .timeout :
244
+ # Connection timed out, bridge is not ready for us
218
245
self .is_ready = False
219
- return False
246
+ finally :
247
+ # Prevent deadlocks: always release the lock
248
+ self ._lock .release ()
220
249
221
- return True
250
+ return self . is_ready
222
251
223
252
def _reconnect (self ):
224
253
"""
@@ -239,33 +268,35 @@ def _keep_alive(self):
239
268
self ._reconnect ()
240
269
continue
241
270
242
- command = KEEP_ALIVE_COMMAND_PREAMBLE + [self .wb1 , self .wb2 ]
243
- self ._send_raw (command )
271
+ # Acquire the lock to make sure we don't change self.is_ready
272
+ # while _consume() is sending commands
273
+ with self ._lock :
274
+ command = KEEP_ALIVE_COMMAND_PREAMBLE + [self .wb1 , self .wb2 ]
275
+ self ._send_raw (command )
244
276
245
- start = datetime .now ()
246
- connection_alive = False
247
- while datetime .now () - start < timedelta (seconds = SOCKET_TIMEOUT ):
248
- response = bytearray (12 )
249
- try :
250
- self ._socket .recv_into (response )
251
- except socket .timeout :
252
- break
277
+ start = datetime .now ()
278
+ connection_alive = False
279
+ while datetime .now () - start < timedelta (seconds = SOCKET_TIMEOUT ):
280
+ response = bytearray (12 )
281
+ try :
282
+ self ._socket .recv_into (response )
283
+ except socket .timeout :
284
+ break
253
285
254
- if response [:5 ] == bytearray (KEEP_ALIVE_RESPONSE_PREAMBLE ):
255
- connection_alive = True
256
- break
286
+ if response [:5 ] == bytearray (KEEP_ALIVE_RESPONSE_PREAMBLE ):
287
+ connection_alive = True
288
+ break
257
289
258
- if not connection_alive :
259
- self .is_ready = False
260
- self ._reconnect ()
261
- continue
290
+ if not connection_alive :
291
+ self .is_ready = False
262
292
263
- time .sleep (KEEP_ALIVE_TIME )
293
+ # Wait for KEEP_ALIVE_TIME seconds before sending next keep-alive message
294
+ if self .is_ready :
295
+ time .sleep (KEEP_ALIVE_TIME )
264
296
265
297
def close (self ):
266
298
"""
267
299
Closes the connection to the bridge.
268
300
"""
269
301
self .is_closed = True
270
302
self .is_ready = False
271
-
0 commit comments