1
+ from __future__ import absolute_import
2
+
1
3
import json
2
4
from threading import Thread
3
5
4
- import time
6
+ import backoff
7
+ import requests
5
8
from sseclient import SSEClient
6
-
7
9
from ldclient .interfaces import UpdateProcessor
8
10
from ldclient .util import _stream_headers , log
9
11
@@ -13,44 +15,47 @@ def __init__(self, sdk_key, config, requester, store, ready):
13
15
Thread .__init__ (self )
14
16
self .daemon = True
15
17
self ._sdk_key = sdk_key
18
+ self ._uri = config .stream_uri
16
19
self ._config = config
17
20
self ._requester = requester
18
21
self ._store = store
19
22
self ._running = False
20
23
self ._ready = ready
24
+ self ._headers = _stream_headers (self ._sdk_key )
21
25
22
26
def run (self ):
23
- log .info ("Starting StreamingUpdateProcessor connecting to uri: " + self ._config . stream_uri )
27
+ log .info ("Starting StreamingUpdateProcessor connecting to uri: " + self ._uri )
24
28
self ._running = True
25
- hdrs = _stream_headers (self ._sdk_key )
26
- uri = self ._config .stream_uri
27
29
while self ._running :
28
- try :
29
- messages = SSEClient (uri , verify = self ._config .verify_ssl , headers = hdrs )
30
- for msg in messages :
31
- if not self ._running :
32
- break
33
- if self .process_message (self ._store , self ._requester , msg , self ._ready ) is True :
34
- self ._ready .set ()
35
- except Exception as e :
36
- log .error ("Could not connect to LaunchDarkly stream: " + str (e .message ) +
37
- " waiting 1 second before trying again." )
38
- time .sleep (1 )
30
+ self ._connect ()
31
+
32
+ def _backoff_expo ():
33
+ return backoff .expo (max_value = 30 )
34
+
35
+ @backoff .on_exception (_backoff_expo , requests .exceptions .RequestException , max_tries = None , jitter = backoff .full_jitter )
36
+ def _connect (self ):
37
+ messages = SSEClient (self ._uri , verify = self ._config .verify_ssl , headers = self ._headers )
38
+ for msg in messages :
39
+ if not self ._running :
40
+ break
41
+ message_ok = self .process_message (self ._store , self ._requester , msg , self ._ready )
42
+ if message_ok is True and self ._ready .is_set () is False :
43
+ self ._ready .set ()
39
44
40
45
def stop (self ):
41
46
log .info ("Stopping StreamingUpdateProcessor" )
42
47
self ._running = False
43
48
44
49
def initialized (self ):
45
- return self ._running and self ._ready .is_set () and self ._store .initialized
50
+ return self ._running and self ._ready .is_set () is True and self ._store .initialized is True
46
51
47
52
@staticmethod
48
53
def process_message (store , requester , msg , ready ):
49
54
log .debug ("Received stream event {} with data: {}" .format (msg .event , msg .data ))
50
55
if msg .event == 'put' :
51
56
payload = json .loads (msg .data )
52
57
store .init (payload )
53
- if not ready .is_set () and store .initialized :
58
+ if not ready .is_set () is True and store .initialized is True :
54
59
log .info ("StreamingUpdateProcessor initialized ok" )
55
60
return True
56
61
elif msg .event == 'patch' :
@@ -63,7 +68,7 @@ def process_message(store, requester, msg, ready):
63
68
store .upsert (key , requester .get_one (key ))
64
69
elif msg .event == "indirect/put" :
65
70
store .init (requester .get_all ())
66
- if not ready .is_set () and store .initialized :
71
+ if not ready .is_set () is True and store .initialized is True :
67
72
log .info ("StreamingUpdateProcessor initialized ok" )
68
73
return True
69
74
elif msg .event == 'delete' :
0 commit comments