Skip to content

DATA-4220: Add enableBackfill boolean to CreateDataPipeline #941

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/viam/app/data_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1883,7 +1883,7 @@ async def list_data_pipelines(self, organization_id: str) -> List[DataPipeline]:
response: ListDataPipelinesResponse = await self._data_pipelines_client.ListDataPipelines(request, metadata=self._metadata)
return [DataClient.DataPipeline.from_proto(pipeline) for pipeline in response.data_pipelines]

async def create_data_pipeline(self, organization_id: str, name: str, mql_binary: List[Dict[str, Any]], schedule: str) -> str:
async def create_data_pipeline(self, organization_id: str, name: str, mql_binary: List[Dict[str, Any]], schedule: str, enable_backfill: bool) -> str:
"""Create a new data pipeline.

::
Expand All @@ -1902,12 +1902,13 @@ async def create_data_pipeline(self, organization_id: str, name: str, mql_binary
mql_binary (List[Dict[str, Any]]): The MQL pipeline to run, as a list of MongoDB aggregation pipeline stages.
schedule (str): A cron expression representing the expected execution schedule in UTC (note this also
defines the input time window; an hourly schedule would process 1 hour of data at a time).
enable_backfill (bool): When true, pipeline runs will be scheduled for the organization's past data.

Returns:
str: The ID of the newly created pipeline.
"""
binary: List[bytes] = [bson.encode(query) for query in mql_binary]
request = CreateDataPipelineRequest(organization_id=organization_id, name=name, mql_binary=binary, schedule=schedule)
request = CreateDataPipelineRequest(organization_id=organization_id, name=name, mql_binary=binary, schedule=schedule, enable_backfill=enable_backfill)
response: CreateDataPipelineResponse = await self._data_pipelines_client.CreateDataPipeline(request, metadata=self._metadata)
return response.id

Expand Down
1 change: 1 addition & 0 deletions tests/mocks/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,7 @@ async def CreateDataPipeline(self, stream: Stream[CreateDataPipelineRequest, Cre
self.mql_binary = request.mql_binary
self.schedule = request.schedule
self.org_id = request.organization_id
self.enable_backfill = request.enable_backfill
await stream.send_message(CreateDataPipelineResponse(id=self.create_response))

async def GetDataPipeline(self, stream: Stream[GetDataPipelineRequest, GetDataPipelineResponse]) -> None:
Expand Down
4 changes: 3 additions & 1 deletion tests/test_data_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
SCHEDULE = "0 0 * * *"
UPDATED_SCHEDULE = "0 1 * * *"
MQL_BINARY = []
ENABLE_BACKFILL = True

TIMESTAMP = datetime.fromtimestamp(0)
TIMESTAMP_PROTO = datetime_to_timestamp(TIMESTAMP)
Expand Down Expand Up @@ -78,12 +79,13 @@ class TestClient:
async def test_create_data_pipeline(self, service: MockDataPipelines):
    """Create a pipeline through the client and verify every field — including
    the new enable_backfill flag — is forwarded to the mock service, and that
    the created pipeline's ID is returned.
    """
    async with ChannelFor([service]) as channel:
        client = DataClient(channel, DATA_SERVICE_METADATA)
        # The updated signature requires enable_backfill as a fifth argument;
        # the stale 4-argument call left over in the diff would raise TypeError.
        # `pipeline_id` (not `id`) avoids shadowing the builtin.
        pipeline_id = await client.create_data_pipeline(ORG_ID, NAME, MQL_BINARY, SCHEDULE, ENABLE_BACKFILL)
        assert pipeline_id == ID
        assert service.name == NAME
        assert service.org_id == ORG_ID
        assert service.schedule == SCHEDULE
        assert service.mql_binary == MQL_BINARY
        assert service.enable_backfill == ENABLE_BACKFILL

async def test_get_data_pipeline(self, service: MockDataPipelines):
async with ChannelFor([service]) as channel:
Expand Down
Loading