diff --git a/src/viam/app/data_client.py b/src/viam/app/data_client.py
index 3b78aff54..f2d3feab5 100644
--- a/src/viam/app/data_client.py
+++ b/src/viam/app/data_client.py
@@ -1883,7 +1883,7 @@ async def list_data_pipelines(self, organization_id: str) -> List[DataPipeline]:
         response: ListDataPipelinesResponse = await self._data_pipelines_client.ListDataPipelines(request, metadata=self._metadata)
         return [DataClient.DataPipeline.from_proto(pipeline) for pipeline in response.data_pipelines]
 
-    async def create_data_pipeline(self, organization_id: str, name: str, mql_binary: List[Dict[str, Any]], schedule: str) -> str:
+    async def create_data_pipeline(self, organization_id: str, name: str, mql_binary: List[Dict[str, Any]], schedule: str, enable_backfill: bool) -> str:
         """Create a new data pipeline.
 
         ::
@@ -1902,12 +1902,13 @@ async def create_data_pipeline(self, organization_id: str, name: str, mql_binary
             mql_binary (List[Dict[str, Any]]): The MQL pipeline to run, as a list of MongoDB aggregation pipeline stages.
             schedule (str): A cron expression representing the expected execution schedule in UTC (note this also
                 defines the input time window; an hourly schedule would process 1 hour of data at a time).
+            enable_backfill (bool): When true, pipeline runs will be scheduled for the organization's past data.
 
         Returns:
             str: The ID of the newly created pipeline.
         """
         binary: List[bytes] = [bson.encode(query) for query in mql_binary]
-        request = CreateDataPipelineRequest(organization_id=organization_id, name=name, mql_binary=binary, schedule=schedule)
+        request = CreateDataPipelineRequest(organization_id=organization_id, name=name, mql_binary=binary, schedule=schedule, enable_backfill=enable_backfill)
         response: CreateDataPipelineResponse = await self._data_pipelines_client.CreateDataPipeline(request, metadata=self._metadata)
         return response.id
diff --git a/tests/mocks/services.py b/tests/mocks/services.py
index bf39ecb35..936ae250d 100644
--- a/tests/mocks/services.py
+++ b/tests/mocks/services.py
@@ -1147,6 +1147,7 @@ async def CreateDataPipeline(self, stream: Stream[CreateDataPipelineRequest, Cre
         self.mql_binary = request.mql_binary
         self.schedule = request.schedule
         self.org_id = request.organization_id
+        self.enable_backfill = request.enable_backfill
         await stream.send_message(CreateDataPipelineResponse(id=self.create_response))
 
     async def GetDataPipeline(self, stream: Stream[GetDataPipelineRequest, GetDataPipelineResponse]) -> None:
diff --git a/tests/test_data_pipelines.py b/tests/test_data_pipelines.py
index a7430a6d7..b53113f91 100644
--- a/tests/test_data_pipelines.py
+++ b/tests/test_data_pipelines.py
@@ -15,6 +15,7 @@
 SCHEDULE = "0 0 * * *"
 UPDATED_SCHEDULE = "0 1 * * *"
 MQL_BINARY = []
+ENABLE_BACKFILL = True
 
 TIMESTAMP = datetime.fromtimestamp(0)
 TIMESTAMP_PROTO = datetime_to_timestamp(TIMESTAMP)
@@ -78,12 +79,13 @@ class TestClient:
     async def test_create_data_pipeline(self, service: MockDataPipelines):
         async with ChannelFor([service]) as channel:
             client = DataClient(channel, DATA_SERVICE_METADATA)
-            id = await client.create_data_pipeline(ORG_ID, NAME, MQL_BINARY, SCHEDULE)
+            id = await client.create_data_pipeline(ORG_ID, NAME, MQL_BINARY, SCHEDULE, ENABLE_BACKFILL)
             assert id == ID
             assert service.name == NAME
             assert service.org_id == ORG_ID
             assert service.schedule == SCHEDULE
             assert service.mql_binary == MQL_BINARY
+            assert service.enable_backfill == ENABLE_BACKFILL
 
     async def test_get_data_pipeline(self, service: MockDataPipelines):
         async with ChannelFor([service]) as channel:
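For reviewers, a minimal caller-side sketch of the updated signature. The organization ID, pipeline name, and MQL stages below are hypothetical placeholders, and `data_client` is assumed to be an already-authenticated DataClient; only the parameter list itself comes from this diff:

    # Sketch of calling the updated create_data_pipeline. Values marked
    # "hypothetical" are illustrative placeholders, not part of this change.
    async def create_hourly_pipeline(data_client):
        pipeline_id = await data_client.create_data_pipeline(
            organization_id="<YOUR-ORG-ID>",  # hypothetical placeholder
            name="hourly-readings-summary",   # hypothetical name
            mql_binary=[
                # MongoDB aggregation stages; the client BSON-encodes each one
                {"$match": {"component_name": "sensor-1"}},  # hypothetical stage
                {"$group": {"_id": "$part_id", "count": {"$sum": 1}}},
            ],
            schedule="0 * * * *",  # hourly; each run processes 1 hour of data
            enable_backfill=True,  # new: also schedule runs over past data
        )
        return pipeline_id

Note that `enable_backfill` is a required positional parameter here, so existing four-argument callers of `create_data_pipeline` will need updating.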