Skip to content

Commit cecd326

Browse files
JadeCara and Jade Wibbels authored
subtle dsr processing issue (#6469)
Co-authored-by: Jade Wibbels <[email protected]>
1 parent 74ba719 commit cecd326

File tree

5 files changed

+235
-1
lines changed

5 files changed

+235
-1
lines changed

src/fides/api/service/privacy_request/request_service.py

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -191,7 +191,11 @@ def poll_for_exited_privacy_request_tasks(self: DatabaseTask) -> Set[str]:
191191
db.query(PrivacyRequest)
192192
.filter(
193193
PrivacyRequest.status.in_(
194-
[PrivacyRequestStatus.in_processing, PrivacyRequestStatus.approved]
194+
[
195+
PrivacyRequestStatus.in_processing,
196+
PrivacyRequestStatus.approved,
197+
PrivacyRequestStatus.requires_input,
198+
]
195199
)
196200
)
197201
# Only look at Privacy Requests that haven't been deleted

src/fides/api/task/manual/manual_task_graph_task.py

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515
StatusType,
1616
)
1717
from fides.api.models.privacy_request import PrivacyRequest
18+
from fides.api.models.worker_task import ExecutionLogStatus
1819
from fides.api.schemas.policy import ActionType
1920
from fides.api.schemas.privacy_request import PrivacyRequestStatus
2021
from fides.api.task.graph_task import GraphTask, retry
@@ -40,6 +41,17 @@ def access_request(self, *inputs: list[Row]) -> list[Row]:
4041
db = self.resources.session
4142
collection_address = self.execution_node.address
4243

44+
if self.resources.request.policy.get_action_type() == ActionType.erasure:
45+
# We're in an erasure privacy request's access phase - complete access task immediately
46+
# since access is just for data collection to support erasure, not for user data access
47+
self.update_status(
48+
"Access task completed immediately for erasure privacy request (data collection only)",
49+
[],
50+
ActionType.access,
51+
ExecutionLogStatus.complete,
52+
)
53+
return []
54+
4355
# Verify this is a manual task address
4456
if not ManualTaskAddress.is_manual_task_address(collection_address):
4557
raise ValueError(f"Invalid manual task address: {collection_address}")

tests/api/task/manual/test_manual_task_graph_task.py

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,9 @@
77
ManualTaskInstance,
88
ManualTaskSubmission,
99
)
10+
from fides.api.models.worker_task import ExecutionLogStatus
11+
from fides.api.schemas.policy import ActionType
12+
from fides.api.schemas.privacy_request import PrivacyRequestStatus
1013

1114

1215
class TestManualTaskDataAggregation:
@@ -322,3 +325,25 @@ def test_aggregate_submission_data_attachment_field_no_attachments(
322325
# Should return None for attachment field with no attachments
323326
assert "user_email" in result
324327
assert result["user_email"] is None
328+
329+
def test_access_request_early_return_for_erasure_policy(
330+
self, build_erasure_graph_task, db
331+
):
332+
"""Test that access_request returns early and completes immediately for erasure policy"""
333+
manual_task, graph_task = build_erasure_graph_task
334+
privacy_request = graph_task.resources.request
335+
336+
# Set privacy request to requires_input status
337+
privacy_request.status = PrivacyRequestStatus.requires_input
338+
privacy_request.save(db)
339+
340+
# Call access_request - should return early due to erasure policy
341+
result = graph_task.access_request([])
342+
343+
# Should return empty list (early return for erasure policy)
344+
assert result == []
345+
346+
# Privacy request status should remain requires_input since the early return path
347+
# does not call _return_to_in_processing()
348+
db.refresh(privacy_request)
349+
assert privacy_request.status == PrivacyRequestStatus.requires_input

tests/api/task/manual/test_manual_task_integration.py

Lines changed: 115 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -974,3 +974,118 @@ def test_manual_task_traversal_integration_with_nested_groups(
974974

975975
# Verify traversal is valid
976976
assert traversal is not None
977+
978+
979+
@pytest.mark.integration
980+
class TestManualTaskIntegrationStatusUpdates:
981+
"""Test that manual task status updates correctly"""
982+
983+
@pytest.mark.usefixtures("erasure_privacy_request")
984+
def test_erasure_request_updates_privacy_request_status_when_manual_task_completed(
985+
self, build_erasure_graph_task, db
986+
):
987+
"""Test that erasure_request properly updates privacy request status when manual task is completed"""
988+
manual_task, graph_task = build_erasure_graph_task
989+
privacy_request = graph_task.resources.request
990+
991+
# Set privacy request to requires_input status to simulate the scenario
992+
privacy_request.status = PrivacyRequestStatus.requires_input
993+
privacy_request.save(db)
994+
995+
# Create a manual task instance for this privacy request
996+
instance = ManualTaskInstance.create(
997+
db=db,
998+
data={
999+
"task_id": manual_task.id,
1000+
"config_id": manual_task.configs[0].id, # Use the first config
1001+
"entity_id": privacy_request.id,
1002+
"entity_type": ManualTaskEntityType.privacy_request.value,
1003+
"status": StatusType.pending.value,
1004+
},
1005+
)
1006+
1007+
# Create a submission to complete the manual task
1008+
field = manual_task.configs[0].field_definitions[0] # Use the first field
1009+
submission = ManualTaskSubmission.create(
1010+
db=db,
1011+
data={
1012+
"task_id": manual_task.id,
1013+
"config_id": manual_task.configs[0].id,
1014+
"field_id": field.id,
1015+
"instance_id": instance.id,
1016+
"submitted_by": None,
1017+
"data": {
1018+
"field_type": ManualTaskFieldType.text.value,
1019+
"value": "test_value",
1020+
},
1021+
},
1022+
)
1023+
1024+
# Mark the instance as completed
1025+
instance.status = StatusType.completed.value
1026+
instance.save(db)
1027+
1028+
# Call erasure_request - should update status and return 0
1029+
result = graph_task.erasure_request([])
1030+
1031+
# Should return 0 (manual tasks don't mask data directly)
1032+
assert result == 0
1033+
1034+
# Privacy request status should still be requires_input after erasure_request returns
1035+
db.refresh(privacy_request)
1036+
assert privacy_request.status == PrivacyRequestStatus.requires_input
1037+
1038+
def test_access_request_updates_privacy_request_status_when_manual_task_completed(
1039+
self, build_graph_task, db
1040+
):
1041+
"""Test that access_request properly updates privacy request status when manual task is completed"""
1042+
manual_task, graph_task = build_graph_task
1043+
privacy_request = graph_task.resources.request
1044+
1045+
# Set privacy request to requires_input status to simulate the scenario
1046+
privacy_request.status = PrivacyRequestStatus.requires_input
1047+
privacy_request.save(db)
1048+
1049+
# Create a manual task instance for this privacy request
1050+
instance = ManualTaskInstance.create(
1051+
db=db,
1052+
data={
1053+
"task_id": manual_task.id,
1054+
"config_id": manual_task.configs[0].id, # Use the first config
1055+
"entity_id": privacy_request.id,
1056+
"entity_type": ManualTaskEntityType.privacy_request.value,
1057+
"status": StatusType.pending.value,
1058+
},
1059+
)
1060+
1061+
# Create a submission to complete the manual task
1062+
field = manual_task.configs[0].field_definitions[0] # Use the first field
1063+
submission = ManualTaskSubmission.create(
1064+
db=db,
1065+
data={
1066+
"task_id": manual_task.id,
1067+
"config_id": manual_task.configs[0].id,
1068+
"field_id": field.id,
1069+
"instance_id": instance.id,
1070+
"submitted_by": None,
1071+
"data": {
1072+
"field_type": ManualTaskFieldType.text.value,
1073+
"value": "test_value",
1074+
},
1075+
},
1076+
)
1077+
1078+
# Mark the instance as completed
1079+
instance.status = StatusType.completed.value
1080+
instance.save(db)
1081+
1082+
# Call access_request - should update status and return data
1083+
result = graph_task.access_request([])
1084+
1085+
# Should return the data from the manual task
1086+
assert len(result) > 0
1087+
assert "user_email" in result[0] # The field key from the fixture
1088+
1089+
# Privacy request status should still be requires_input after access_request returns
1090+
db.refresh(privacy_request)
1091+
assert privacy_request.status == PrivacyRequestStatus.requires_input

tests/ops/service/privacy_request/test_request_service.py

Lines changed: 78 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -187,6 +187,84 @@ def test_approved_privacy_request_task_with_errored_tasks(
187187
db.refresh(privacy_request)
188188
assert privacy_request.status == PrivacyRequestStatus.error
189189

190+
def test_requires_input_privacy_request_task_with_errored_tasks(
191+
self, db, privacy_request_requires_input
192+
):
193+
"""Privacy requests in requires_input status should be monitored for task errors
194+
and marked as errored if tasks fail.
195+
196+
The "poll_for_exited_privacy_request_tasks" task looks for Privacy Requests in
197+
"approved", "in_processing", and "requires_input" states.
198+
"""
199+
200+
# Create the necessary tasks for this privacy request (similar to request_task fixture)
201+
root_task = RequestTask.create(
202+
db,
203+
data={
204+
"action_type": ActionType.access,
205+
"status": "complete",
206+
"privacy_request_id": privacy_request_requires_input.id,
207+
"collection_address": "__ROOT__:__ROOT__",
208+
"dataset_name": "__ROOT__",
209+
"collection_name": "__ROOT__",
210+
"upstream_tasks": [],
211+
"downstream_tasks": ["test_dataset:test_collection"],
212+
"all_descendant_tasks": [
213+
"test_dataset:test_collection",
214+
"__TERMINATE__:__TERMINATE__",
215+
],
216+
},
217+
)
218+
219+
request_task = RequestTask.create(
220+
db,
221+
data={
222+
"action_type": ActionType.access,
223+
"status": "pending",
224+
"privacy_request_id": privacy_request_requires_input.id,
225+
"collection_address": "test_dataset:test_collection",
226+
"dataset_name": "test_dataset",
227+
"collection_name": "test_collection",
228+
"upstream_tasks": ["__ROOT__:__ROOT__"],
229+
"downstream_tasks": ["__TERMINATE__:__TERMINATE__"],
230+
"all_descendant_tasks": ["__TERMINATE__:__TERMINATE__"],
231+
},
232+
)
233+
234+
terminator_task = RequestTask.create(
235+
db,
236+
data={
237+
"action_type": ActionType.access,
238+
"status": "pending",
239+
"privacy_request_id": privacy_request_requires_input.id,
240+
"collection_address": "__TERMINATE__:__TERMINATE__",
241+
"dataset_name": "__TERMINATE__",
242+
"collection_name": "__TERMINATE__",
243+
"upstream_tasks": ["test_dataset:test_collection"],
244+
"downstream_tasks": [],
245+
"all_descendant_tasks": [],
246+
},
247+
)
248+
249+
# Put all tasks in an exited state - completed, errored, or skipped
250+
assert root_task.status == ExecutionLogStatus.complete
251+
request_task.update_status(db, ExecutionLogStatus.error)
252+
terminator_task.update_status(db, ExecutionLogStatus.error)
253+
254+
errored_prs = poll_for_exited_privacy_request_tasks.delay().get()
255+
assert errored_prs == {privacy_request_requires_input.id}
256+
257+
db.refresh(privacy_request_requires_input)
258+
assert privacy_request_requires_input.status == PrivacyRequestStatus.error
259+
260+
# Clean up created tasks
261+
try:
262+
root_task.delete(db)
263+
request_task.delete(db)
264+
terminator_task.delete(db)
265+
except Exception:
266+
pass
267+
190268
def test_request_tasks_all_exited_none_errored(
191269
self, db, privacy_request, request_task
192270
):

0 commit comments

Comments
 (0)