Skip to content

Created _get_streams_generator(), get_all_streams(). Uses paging. #26

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions devicecloud/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def __init__(self, *args, **kwargs):

def _get_streams(self, uri_suffix=None):
"""Clear and update internal cache of stream objects"""
# TODO: handle paging, perhaps change this to be a generator
if uri_suffix is not None and not uri_suffix.startswith('/'):
uri_suffix = '/' + uri_suffix
elif uri_suffix is None:
Expand All @@ -130,6 +129,39 @@ def _get_streams(self, uri_suffix=None):
streams[stream_id] = stream
return streams

def _get_streams_generator(self, uri_suffix=None, page_size=1000):
"""
Handles the paging to return a generator of all streams.
"""
if uri_suffix is not None and not uri_suffix.startswith('/'):
uri_suffix = '/' + uri_suffix
elif uri_suffix is None:
uri_suffix = ""

query_parameters = {}

result_size = page_size
while result_size == page_size:
# request the next page of data or first if pageCursor is not set as query param
try:
query = urllib.parse.urlencode(query_parameters)
call = "/ws/DataStream{}?{}".format(uri_suffix, query)
response = self._conn.get_json(call)
except DeviceCloudHttpException as http_exception:
if http_exception.response.status_code == 404:
raise NoSuchStreamException()
raise http_exception

result_size = int(response["resultSize"]) # how many are actually included here?
query_parameters["pageCursor"] = response.get("pageCursor") # will not be present if result set is empty

streams = {}
for stream_data in response["items"]:
stream_id = stream_data["streamId"]
stream = DataStream(self._conn, stream_id, stream_data)
streams[stream_id] = stream
yield streams

def create_stream(self, stream_id, data_type, description=None, data_ttl=None,
rollup_ttl=None, units=None):
"""Create a new data stream on Device Cloud
Expand Down Expand Up @@ -182,9 +214,27 @@ def get_streams(self, stream_prefix=None):
:return: iterator over all :class:`.DataStream` instances on Device Cloud

"""
# TODO: deal with paging. We now return a generator, so the interface should look the same
return iter(self._get_streams(stream_prefix).values())

def get_all_streams(self, stream_prefix=None, page_size=1000):
"""Return the generator over streams preset on device cloud.

:param stream_prefix: An optional prefix to limit the iterator to; all streams are returned if it is not specified.

:param int page_size: The number of results that we should attempt to retrieve from the
device cloud in each page. Generally, this can be left at its default value unless
you have a good reason to change the parameter for performance reasons.

:return: list of all :class:`.DataStream` instances on Device Cloud

"""
streams = []

for page in self._get_streams_generator(stream_prefix, page_size):
streams.extend(page.values())

return streams

def get_stream(self, stream_id):
"""Return a reference to a stream with the given ``stream_id``

Expand Down