Skip to content

Commit ee0cc7b

Browse files
authored
Merge pull request #78 from Cumulocity-IoT/feature/better-get_last
Feature/better get last
2 parents c241ab3 + 688b4d8 commit ee0cc7b

File tree

8 files changed

+167
-53
lines changed

8 files changed

+167
-53
lines changed

c8y_api/app/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ def get_item(key: str, dictionary: dict) -> str | None:
9393
info = get_item('Authorization', headers)
9494

9595
if not info:
96-
9796
keys = ", ".join([*headers.keys(), *cookies.keys()]) or "None"
9897
raise KeyError(f"Unable to resolve Authorization information. Found keys: {keys}.")
98+
9999
return info
100100

101101
@staticmethod

c8y_api/model/_util.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ def encode_odata_text_value(value):
5050
encoded_quotes = sub('\'', '\'\'', value)
5151
return encoded_quotes if " " not in encoded_quotes else f"'{encoded_quotes}'"
5252

53+
5354
class _DateUtil(object):
5455

5556
@staticmethod

c8y_api/model/events.py

Lines changed: 89 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,16 @@ def _check_params(fragment, fragment_type, fragment_value):
250250
if fragment_value and not (fragment_type or fragment):
251251
raise ValueError("Fragment value filter also needs 'fragment_type' or 'fragment' filter.")
252252

253+
def _prepare_base_query(
254+
self,
255+
fragment: str = None,
256+
fragment_type: str = None,
257+
fragment_value: str = None,
258+
**kwargs) -> str:
259+
Events._check_params(fragment, fragment_type, fragment_value)
260+
base_query = self._prepare_query(**kwargs)
261+
return base_query
262+
253263
def select(self,
254264
expression: str = None,
255265
type: str = None, source: str = None, fragment: str = None, # noqa (type)
@@ -323,22 +333,26 @@ def select(self,
323333
Returns:
324334
Generator for Event objects
325335
"""
326-
Events._check_params(fragment, fragment_type, fragment_value)
327-
328-
base_query = self._prepare_query(
336+
base_query = self._prepare_base_query(
329337
expression=expression,
330-
type=type, source=source, fragment=fragment,
331-
fragment_type=fragment_type, fragment_value=fragment_value,
332-
before=before, after=after,
333-
date_from=date_from, date_to=date_to,
334-
created_before=created_before, created_after=created_after,
335-
created_from=created_from, created_to=created_to,
336-
updated_before=updated_before, updated_after=updated_after,
337-
last_updated_from=last_updated_from, last_updated_to=last_updated_to,
338-
min_age=min_age, max_age=max_age,
338+
type=type,
339+
source=source,
340+
fragment=fragment,
341+
fragment_type=fragment_type,
342+
fragment_value=fragment_value,
343+
before=before,
344+
after=after,
345+
created_before=created_before,
346+
created_after=created_after,
347+
updated_before=updated_before,
348+
updated_after=updated_after,
349+
min_age=min_age,
350+
max_age=max_age,
351+
date_from=date_from,
352+
date_to=date_to,
339353
reverse=reverse,
340-
with_source_devices=with_source_devices, with_source_assets=with_source_assets,
341-
page_size=page_size,
354+
with_source_assets=with_source_assets,
355+
with_source_devices=with_source_devices,
342356
**kwargs)
343357
return super()._iterate(
344358
base_query,
@@ -415,29 +429,73 @@ def get_count(
415429
Returns:
416430
Number of potential results
417431
"""
418-
Events._check_params(
432+
base_query = self._prepare_base_query(
433+
expression=expression,
434+
type=type,
435+
source=source,
419436
fragment=fragment,
420437
fragment_type=fragment_type,
421438
fragment_value=fragment_value,
422-
)
423-
base_query = self._prepare_query(
424-
expression=expression,
425-
type=type, source=source, fragment=fragment,
426-
fragment_type=fragment_type, fragment_value=fragment_value,
427-
before=before, after=after,
428-
date_from=date_from, date_to=date_to,
429-
created_before=created_before, created_after=created_after,
430-
created_from=created_from, created_to=created_to,
431-
updated_before=updated_before, updated_after=updated_after,
432-
last_updated_from=last_updated_from, last_updated_to=last_updated_to,
433-
min_age=min_age, max_age=max_age,
439+
before=before,
440+
after=after,
441+
created_before=created_before,
442+
created_after=created_after,
443+
updated_before=updated_before,
444+
updated_after=updated_after,
445+
min_age=min_age,
446+
max_age=max_age,
447+
date_from=date_from,
448+
date_to=date_to,
434449
reverse=reverse,
435-
with_source_devices=with_source_devices, with_source_assets=with_source_assets,
436-
limit=limit,
437-
**kwargs
438-
)
450+
with_source_assets=with_source_assets,
451+
with_source_devices=with_source_devices,
452+
**kwargs)
439453
return self._get_count(base_query)
440454

455+
def get_last(
456+
self,
457+
expression: str = None,
458+
type: str = None, source: str = None, fragment: str = None, # noqa (type)
459+
fragment_type: str = None, fragment_value: str = None,
460+
before: str | datetime = None, date_to: str | datetime = None,
461+
created_before: str | datetime = None, created_to: str | datetime = None,
462+
updated_before: str | datetime = None, last_updated_to: str | datetime = None,
463+
min_age: timedelta = None,
464+
with_source_assets: bool = None,
465+
with_source_devices: bool = None,
466+
**kwargs
467+
) -> Event | None:
468+
"""Retrieve the most recent event.
469+
"""
470+
after = None
471+
if not before and not date_to and not min_age:
472+
after = '1970-01-01'
473+
base_query = self._prepare_base_query(
474+
expression=expression,
475+
type=type,
476+
source=source,
477+
fragment=fragment,
478+
fragment_type=fragment_type,
479+
fragment_value=fragment_value,
480+
before=before,
481+
date_to=date_to,
482+
after=after, # fallback if no other is defined
483+
created_before=created_before,
484+
created_to=created_to,
485+
updated_before=updated_before,
486+
last_updated_to=last_updated_to,
487+
min_age=min_age,
488+
reverse=False, # newest first (non-standard)
489+
with_source_assets=with_source_assets,
490+
with_source_devices=with_source_devices,
491+
**kwargs)
492+
r = self._get_page(base_query, 1)
493+
if not r:
494+
return None
495+
e = Event.from_json(r[0])
496+
e.c8y = self.c8y # inject c8y connection into instance
497+
return e
498+
441499
def create(self, *events: Event):
442500
"""Create event objects within the database.
443501

c8y_api/model/operations.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,14 +274,14 @@ def get_last(
274274
before: str | datetime = None,
275275
min_age: timedelta = None,
276276
**kwargs
277-
) -> Operation:
277+
) -> Operation|None:
278278
""" Query the database and return the last matching operation.
279279
280280
This function is a special variant of the select function. Only
281281
the last matching result is returned.
282282
283283
Returns:
284-
Last matching Operation object
284+
Last matching Operation object or None
285285
"""
286286
# at least one date qualifier is required for this query to function,
287287
# so we enforce the 'after' filter if nothing else is specified
@@ -296,9 +296,12 @@ def get_last(
296296
reverse=True, page_size=1,
297297
**kwargs
298298
)
299-
m = Operation.from_json(self._get_page(base_query, 1)[0])
300-
m.c8y = self.c8y # inject c8y connection into instance
301-
return m
299+
r = self._get_page(base_query, 1)
300+
if not r:
301+
return None
302+
o = Operation.from_json(r[0])
303+
o.c8y = self.c8y # inject c8y connection into instance
304+
return o
302305

303306
def get_count(
304307
self,

c8y_tk/app/__init__.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ def __init__(
5151
startup_delay (float): A minimum delay before a newly added
5252
microservice is considered to be "added" (the callback
5353
invocation will be delayed by this).
54-
55-
5654
"""
5755
self._n = self._n + 1
5856
self._instance_name = f"{__name__}.{type(self).__name__}[{self._n}]"\
@@ -64,7 +62,7 @@ def __init__(
6462
self.callbacks = [(callback, blocking)] if callback else []
6563
self.callbacks_on_add = []
6664
self.callbacks_on_remove = []
67-
self._log = logging.Logger(self._instance_name)
65+
self._log = logging.getLogger(self._instance_name)
6866
self._executor = None
6967
self._callback_futures = set()
7068
self._listen_thread = None
@@ -121,10 +119,11 @@ def listen(self):
121119
This is blocking.
122120
"""
123121
# safely invoke a callback function blocking or non-blocking
124-
def invoke_callback(callback, is_blocking, arg):
122+
def invoke_callback(callback, is_blocking, _, arg):
125123
def safe_invoke(a):
126124
# pylint: disable=broad-exception-caught
127125
try:
126+
self._log.debug(f"Invoking callback: {callback}")
128127
callback(a)
129128
except Exception as error:
130129
self._log.error(f"Uncaught exception in callback: {error}", exc_info=error)
@@ -154,23 +153,24 @@ def safe_invoke(a):
154153
removed = last_subscribers - current_subscribers
155154
# run 'removed' callbacks
156155
for tenant_id in removed:
157-
self._log.info(f"Tenant subscription removed: ${tenant_id}.")
156+
self._log.info(f"Tenant subscription removed: {tenant_id}.")
158157
for fun, blocking in self.callbacks_on_remove:
159-
invoke_callback(fun, blocking, tenant_id)
158+
invoke_callback(fun, blocking, 'Removed', tenant_id)
160159
# wait remaining time for startup delay
161160
if added and self.startup_delay:
162161
min_startup_delay = self.startup_delay - (time.monotonic() - now)
163162
if min_startup_delay > 0:
164163
time.sleep(min_startup_delay)
165164
# run 'added' callbacks
166165
for tenant_id in added:
167-
self._log.info(f"Tenant subscription added: ${tenant_id}.")
166+
self._log.info(f"Tenant subscription added: {tenant_id}.")
168167
for fun, blocking in self.callbacks_on_add:
169-
invoke_callback(fun, blocking, tenant_id)
168+
invoke_callback(fun, blocking, 'Added', tenant_id)
170169
# run 'any' callbacks
171170
if added or removed:
171+
self._log.info(f"Tenant subscriptions changed: {current_subscribers}.")
172172
for fun, blocking in self.callbacks:
173-
invoke_callback(fun, blocking, current_subscribers)
173+
invoke_callback(fun, blocking, None, current_subscribers)
174174
# set new baseline
175175
last_subscribers = current_subscribers
176176
# schedule next run, skip if already exceeded

integration_tests/test_events.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
# Copyright (c) 2025 Cumulocity GmbH
22

3-
# pylint: disable=redefined-outer-name
3+
# pylint: disable=redefined-outer-name, protected-access
44

55
from __future__ import annotations
66

7+
import datetime as dt
78
import logging
89
import os
910
import tempfile
@@ -14,6 +15,7 @@
1415

1516
from c8y_api import CumulocityApi
1617
from c8y_api.model import Event, Device
18+
from c8y_api.model._util import _DateUtil
1719
from util.testing_util import RandomNameGenerator
1820

1921

@@ -37,9 +39,10 @@ def sample_events(factory, sample_device) -> List[Event]:
3739
be removed after the test function."""
3840
typename = RandomNameGenerator.random_name()
3941
result = []
42+
now = _DateUtil.now()
4043
for i in range(1, 6):
4144
event = Event(type=f'{typename}_{i}', text=f'{typename} text', source=sample_device.id,
42-
time='2020-12-31T11:33:55Z')
45+
time=now + dt.timedelta(minutes=i))
4346
result.append(factory(event))
4447
return result
4548

@@ -151,19 +154,23 @@ def test_filter_by_update_time(live_c8y: CumulocityApi, sample_device, sample_ev
151154
pivot = updated_datetimes[len(updated_datetimes)//2]
152155

153156
before_events = live_c8y.events.get_all(source=event.source, updated_before=pivot)
154-
after_events = live_c8y.events.get_all(source=event.source, updated_after=pivot)
157+
after_events = live_c8y.events.get_all(source=event.source, updated_after=pivot, reverse=True)
158+
last_event_before = live_c8y.events.get_last(source=event.source, updated_before=pivot)
159+
last_event_after = live_c8y.events.get_last(source=event.source, updated_after=pivot)
155160

156161
# -> selected events should match the update times from 'before'
157162
# upper boundary, i.e. before/to timestamp is exclusive -> does not include pivot
158163
before_datetimes = list(filter(lambda x: x < pivot, updated_datetimes))
159164
result_datetimes = [a.updated_datetime for a in before_events]
160165
assert sorted(result_datetimes) == sorted(before_datetimes)
166+
assert last_event_before.updated_datetime == max(before_datetimes)
161167

162168
# -> selected events should match the update times from 'after'
163169
# lower boundary, i.e. after/from timestamp is inclusive -> includes pivot
164170
after_datetimes = list(filter(lambda x: x >= pivot, updated_datetimes))
165171
result_datetimes = [a.updated_datetime for a in after_events]
166172
assert sorted(result_datetimes) == sorted(after_datetimes)
173+
assert last_event_after.updated_datetime == max(after_datetimes)
167174

168175

169176
def test_CRUD_attachments(live_c8y: CumulocityApi, sample_device: Device, sample_events: List[Event]): # noqa (case)

integration_tests/test_operations.py

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
# Copyright (c) 2025 Cumulocity GmbH
22

3-
43
from c8y_api import CumulocityApi
54
from c8y_api.model import Operation
65

@@ -18,12 +17,12 @@ def test_CRUD(live_c8y: CumulocityApi, sample_device):
1817
operation = operation.create()
1918

2019
# -> operation should have been created and in PENDING state
21-
operations = live_c8y.operations.get_all(agent_id=sample_device.id, status=Operation.Status.PENDING)
20+
operations = live_c8y.operations.get_all(device_id=sample_device.id, status=Operation.Status.PENDING)
2221
assert len(operations) == 1
2322
assert operations[0].id == operation.id
2423

2524
# -> same result with get_last
26-
operation2 = live_c8y.operations.get_last(agent_id=sample_device.id, status=Operation.Status.PENDING)
25+
operation2 = live_c8y.operations.get_last(device_id=sample_device.id, status=Operation.Status.PENDING)
2726
assert operation2.id == operation.id
2827

2928
# (2) update operation
@@ -45,3 +44,49 @@ def test_CRUD(live_c8y: CumulocityApi, sample_device):
4544

4645
# -> cannot be found anymore
4746
assert not live_c8y.operations.get_all(device_id=sample_device.id)
47+
48+
49+
def test_get(live_c8y: CumulocityApi, sample_device):
50+
"""Verify that query-like retrieval works as expected."""
51+
# (1) create operations
52+
operations = [
53+
Operation(live_c8y, sample_device.id, description=f'Description {i}',
54+
c8y_Command={'text': 'Command text'}).create()
55+
for i in range(5)
56+
]
57+
58+
# (2) all should have been created an in PENDING state
59+
result = live_c8y.operations.get_all(device_id=sample_device.id, status=Operation.Status.PENDING)
60+
assert len(result) == 5
61+
assert all(o.device_id == sample_device.id for o in result)
62+
63+
# (3) get last
64+
result = live_c8y.operations.get_last(device_id=sample_device.id)
65+
assert isinstance(result, Operation)
66+
assert result.device_id == sample_device.id
67+
68+
# (4) retrieving subsets
69+
operations[0].status = Operation.Status.EXECUTING
70+
operations[1].status = Operation.Status.EXECUTING
71+
operations[0].update()
72+
operations[1].update()
73+
74+
result = live_c8y.operations.get_all(device_id=sample_device.id, status=Operation.Status.PENDING)
75+
assert len(result) == 3
76+
result = live_c8y.operations.get_last(device_id=sample_device.id, status=Operation.Status.PENDING)
77+
assert result.status == Operation.Status.PENDING
78+
assert result.device_id == sample_device.id
79+
80+
result = live_c8y.operations.get_all(device_id=sample_device.id, status=Operation.Status.EXECUTING)
81+
assert len(result) == 2
82+
result = live_c8y.operations.get_last(device_id=sample_device.id, status=Operation.Status.EXECUTING)
83+
assert result.status == Operation.Status.EXECUTING
84+
assert result.device_id == sample_device.id
85+
86+
# (5) deleting subsets
87+
live_c8y.operations.delete_by(device_id=sample_device.id, status=Operation.Status.EXECUTING)
88+
assert live_c8y.operations.get_all(device_id=sample_device.id, status=Operation.Status.EXECUTING) == []
89+
assert len(live_c8y.operations.get_all(device_id=sample_device.id, status=Operation.Status.PENDING)) == 3
90+
91+
# (6) no match with get_last
92+
assert live_c8y.operations.get_last(device_id=sample_device.id, status=Operation.Status.EXECUTING) is None

tasks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def init(c):
4848
def lint(c, scope='all'):
4949
"""Run PyLint."""
5050
if scope == 'all':
51-
scope = 'c8y_api c8y_tk tests integration_tests samples'
51+
scope = 'c8y_api/ c8y_tk tests integration_tests samples'
5252
c.run(f'pylint --rcfile pylintrc --fail-under=9 {scope}')
5353

5454

0 commit comments

Comments
 (0)