3
3
import json
4
4
from threading import Thread
5
5
6
- import time
7
-
8
- from requests import HTTPError
6
+ import backoff
7
+ import requests
9
8
from sseclient import SSEClient
10
9
from ldclient .interfaces import UpdateProcessor
11
10
from ldclient .util import _stream_headers , log
@@ -16,38 +15,32 @@ def __init__(self, sdk_key, config, requester, store, ready):
16
15
Thread .__init__ (self )
17
16
self .daemon = True
18
17
self ._sdk_key = sdk_key
18
+ self ._uri = config .stream_uri
19
19
self ._config = config
20
20
self ._requester = requester
21
21
self ._store = store
22
22
self ._running = False
23
23
self ._ready = ready
24
+ self ._headers = _stream_headers (self ._sdk_key )
24
25
25
26
def run (self ):
26
- log .info ("Starting StreamingUpdateProcessor connecting to uri: " + self ._config . stream_uri )
27
+ log .info ("Starting StreamingUpdateProcessor connecting to uri: " + self ._uri )
27
28
self ._running = True
28
- hdrs = _stream_headers (self ._sdk_key )
29
- uri = self ._config .stream_uri
30
29
while self ._running :
31
- try :
32
- messages = SSEClient (uri , verify = self ._config .verify_ssl , headers = hdrs )
33
- for msg in messages :
34
- if not self ._running :
35
- break
36
- message_ok = self .process_message (self ._store , self ._requester , msg , self ._ready )
37
- if message_ok is True and self ._ready .is_set () is False :
38
- self ._ready .set ()
39
- except HTTPError as e :
40
- if e .response is not None and e .response .status_code is not None :
41
- if 400 <= e .response .status_code < 500 :
42
- log .error ("StreamingUpdateProcessor response: " + str (e ) + ". Retries will not be attempted." )
43
- if self ._ready .is_set () is False :
44
- self ._ready .set ()
45
- self ._running = False
46
- return
47
- except Exception as e :
48
- log .error ("Could not connect to LaunchDarkly stream: " + str (e .message ) +
49
- " waiting 1 second before trying again." )
50
- time .sleep (1 )
30
+ self ._connect ()
31
+
32
+ def _backoff_expo ():
33
+ return backoff .expo (max_value = 30 )
34
+
35
+ @backoff .on_exception (_backoff_expo , requests .exceptions .RequestException , max_tries = None , jitter = backoff .full_jitter )
36
+ def _connect (self ):
37
+ messages = SSEClient (self ._uri , verify = self ._config .verify_ssl , headers = self ._headers )
38
+ for msg in messages :
39
+ if not self ._running :
40
+ break
41
+ message_ok = self .process_message (self ._store , self ._requester , msg , self ._ready )
42
+ if message_ok is True and self ._ready .is_set () is False :
43
+ self ._ready .set ()
51
44
52
45
def stop (self ):
53
46
log .info ("Stopping StreamingUpdateProcessor" )
0 commit comments