6
6
from .base_command import BaseCommand
7
7
from ...common import Object
8
8
from ...lib import CursorResponse
9
- from ...exceptions import HTTPError , NotificationsError
9
+ from ...exceptions .transport import HTTPError
10
+ from ...exceptions .notifications import NotificationsError
11
+
12
+
13
+ logger = logging .getLogger ('cterasdk.notifications' )
10
14
11
15
12
16
class Notifications (BaseCommand ):
@@ -30,11 +34,11 @@ async def get(self, cloudfolders=None, cursor=None, max_results=None):
30
34
"""
31
35
param = await self ._create_parameter (cloudfolders , cursor )
32
36
param .max_results = max_results if max_results is not None else 2000
33
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Listing updates.' )
37
+ logger .debug ('Listing updates.' )
34
38
response = await self ._core .v2 .api .post ('/metadata/list' , param )
35
39
if response is not None :
36
40
return CursorResponse (response )
37
- logging . getLogger ( 'cterasdk.metadata.connector' ) .error ('An error occurred while trying to retrieve notifications.' )
41
+ logger .error ('An error occurred while trying to retrieve notifications.' )
38
42
raise NotificationsError (cloudfolders , cursor )
39
43
40
44
async def _create_parameter (self , cloudfolders , cursor ):
@@ -64,7 +68,7 @@ async def changes(self, cursor, cloudfolders=None, timeout=None):
64
68
param = Object ()
65
69
param = await self ._create_parameter (cloudfolders , cursor )
66
70
param .timeout = timeout if timeout else 10000
67
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Checking for updates. %s' , {'timeout' : param .timeout })
71
+ logger .debug ('Checking for updates. %s' , {'timeout' : param .timeout })
68
72
return (await self ._core .v2 .api .post ('/metadata/longpoll' , param )).changes
69
73
70
74
async def ancestors (self , descendant ):
@@ -78,12 +82,11 @@ async def ancestors(self, descendant):
78
82
param = Object ()
79
83
param .folder_id = descendant .folder_id
80
84
param .guid = descendant .guid
81
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Getting ancestors. %s' , {'guid' : param .guid , 'folder_id' : param .folder_id })
85
+ logger .debug ('Getting ancestors. %s' , {'guid' : param .guid , 'folder_id' : param .folder_id })
82
86
try :
83
87
return await self ._core .v2 .api .post ('/metadata/ancestors' , param )
84
88
except HTTPError :
85
- logging .getLogger ('cterasdk.metadata.connector' ).error ('Could not retrieve ancestors. %s' ,
86
- {'folder_id' : param .folder_id , 'guid' : param .guid })
89
+ logger .error ('Could not retrieve ancestors. %s' , {'folder_id' : param .folder_id , 'guid' : param .guid })
87
90
raise
88
91
89
92
@@ -128,7 +131,7 @@ async def retrieve_events(server_queue, core, cloudfolders, cursor):
128
131
:param list[CloudFSFolderFindingHelper] cloudfolders: List of Cloud Drive folders.
129
132
:param str cursor: Cursor
130
133
"""
131
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Event Retrieval Service.' )
134
+ logger .debug ('Event Retrieval Service.' )
132
135
last_response = LastResponse (cursor )
133
136
try :
134
137
while True :
@@ -142,9 +145,9 @@ async def retrieve_events(server_queue, core, cloudfolders, cursor):
142
145
except ConnectionError as error :
143
146
await on_connection_error (error )
144
147
except TimeoutError :
145
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Request timed out. Retrying.' )
148
+ logger .debug ('Request timed out. Retrying.' )
146
149
except asyncio .CancelledError :
147
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Cancelling Event Retrieval.' )
150
+ logger .debug ('Cancelling Event Retrieval.' )
148
151
149
152
150
153
async def forward_events (server_queue , client_queue , save_cursor ):
@@ -155,15 +158,15 @@ async def forward_events(server_queue, client_queue, save_cursor):
155
158
:param asyncio.Queue client_queue: Client queue.
156
159
:param callback save_cursor: Callback function to persist the cursor.
157
160
"""
158
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Event Forwarder Service.' )
161
+ logger .debug ('Event Forwarder Service.' )
159
162
try :
160
163
while True :
161
164
batch = await server_queue .get ()
162
165
await enqueue_events (batch .objects , client_queue )
163
166
await process_events (client_queue )
164
167
await persist_cursor (save_cursor , batch .cursor )
165
168
except asyncio .CancelledError :
166
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Cancelling Event Forwarding.' )
169
+ logger .debug ('Cancelling Event Forwarding.' )
167
170
168
171
169
172
async def enqueue_events (events , queue ):
@@ -174,9 +177,9 @@ async def enqueue_events(events, queue):
174
177
:param cterasdk.asynchronous.core.iterator.CursorAsyncIterator events: Event Iterator.
175
178
"""
176
179
for event in events :
177
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Enqueuing Event.' )
180
+ logger .debug ('Enqueuing Event.' )
178
181
await queue .put (Event .from_server_object (event ))
179
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Enqueued Event.' )
182
+ logger .debug ('Enqueued Event.' )
180
183
181
184
182
185
async def process_events (queue ):
@@ -185,9 +188,9 @@ async def process_events(queue):
185
188
186
189
:param asyncio.Queue queue: Queue.
187
190
"""
188
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Joining Queue.' )
191
+ logger .debug ('Joining Queue.' )
189
192
await queue .join ()
190
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ('Completed Processing.' )
193
+ logger .debug ('Completed Processing.' )
191
194
192
195
193
196
async def persist_cursor (save_cursor , cursor ):
@@ -197,19 +200,18 @@ async def persist_cursor(save_cursor, cursor):
197
200
:param callback save_cursor: Asynchronous callback function to persist the cursor.
198
201
:param str cursor: Cursor
199
202
"""
200
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ("Persisting Cursor. Calling function: '%s'" , save_cursor )
203
+ logger .debug ("Persisting Cursor. Calling function: '%s'" , save_cursor )
201
204
try :
202
205
await save_cursor (cursor )
203
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ("Called Persist Cursor Function." )
206
+ logger .debug ("Called Persist Cursor Function." )
204
207
except Exception : # pylint: disable=broad-exception-caught
205
- logging .getLogger ('cterasdk.metadata.connector' ).error ("An error occurred while trying to persist cursor. Function: '%s'" ,
206
- save_cursor )
208
+ logger .error ("An error occurred while trying to persist cursor. Function: '%s'" , save_cursor )
207
209
208
210
209
211
async def on_connection_error (error ):
210
212
seconds = 5
211
- logging . getLogger ( 'cterasdk.metadata.connector' ) .error ('Connection error. Reason: %s.' , str (error ))
212
- logging . getLogger ( 'cterasdk.metadata.connector' ) .debug ("Retrying in %s seconds." , seconds )
213
+ logger .error ('Connection error. Reason: %s.' , str (error ))
214
+ logger .debug ("Retrying in %s seconds." , seconds )
213
215
await asyncio .sleep (seconds )
214
216
215
217
0 commit comments