diff --git a/devicecloud/streams.py b/devicecloud/streams.py index 83693c1..5eec178 100644 --- a/devicecloud/streams.py +++ b/devicecloud/streams.py @@ -117,7 +117,6 @@ def __init__(self, *args, **kwargs): def _get_streams(self, uri_suffix=None): """Clear and update internal cache of stream objects""" - # TODO: handle paging, perhaps change this to be a generator if uri_suffix is not None and not uri_suffix.startswith('/'): uri_suffix = '/' + uri_suffix elif uri_suffix is None: @@ -130,6 +129,39 @@ def _get_streams(self, uri_suffix=None): streams[stream_id] = stream return streams + def _get_streams_generator(self, uri_suffix=None, page_size=1000): + """ + Handles the paging to return a generator of all streams. + """ + if uri_suffix is not None and not uri_suffix.startswith('/'): + uri_suffix = '/' + uri_suffix + elif uri_suffix is None: + uri_suffix = "" + + query_parameters = {} + + result_size = page_size + while result_size == page_size: + # request the next page of data or first if pageCursor is not set as query param + try: + query = urllib.parse.urlencode(query_parameters) + call = "/ws/DataStream{}?{}".format(uri_suffix, query) + response = self._conn.get_json(call) + except DeviceCloudHttpException as http_exception: + if http_exception.response.status_code == 404: + raise NoSuchStreamException() + raise http_exception + + result_size = int(response["resultSize"]) # how many are actually included here? + query_parameters["pageCursor"] = response.get("pageCursor") # will not be present if result set is empty + + streams = {} + for stream_data in response["items"]: + stream_id = stream_data["streamId"] + stream = DataStream(self._conn, stream_id, stream_data) + streams[stream_id] = stream + yield streams + def create_stream(self, stream_id, data_type, description=None, data_ttl=None, rollup_ttl=None, units=None): """Create a new data stream on Device Cloud @@ -182,9 +214,27 @@ def get_streams(self, stream_prefix=None): :return: iterator over all :class:`.DataStream` instances on Device Cloud """ - # TODO: deal with paging. We now return a generator, so the interface should look the same return iter(self._get_streams(stream_prefix).values()) + def get_all_streams(self, stream_prefix=None, page_size=1000): + """Return the generator over streams preset on device cloud. + + :param stream_prefix: An optional prefix to limit the iterator to; all streams are returned if it is not specified. + + :param int page_size: The number of results that we should attempt to retrieve from the + device cloud in each page. Generally, this can be left at its default value unless + you have a good reason to change the parameter for performance reasons. + + :return: list of all :class:`.DataStream` instances on Device Cloud + + """ + streams = [] + + for page in self._get_streams_generator(stream_prefix, page_size): + streams.extend(page.values()) + + return streams + def get_stream(self, stream_id): """Return a reference to a stream with the given ``stream_id``