From 0afe16870117f91b92a169603f8971328f0db821 Mon Sep 17 00:00:00 2001 From: XxSURYANSHxX Date: Fri, 5 Jun 2026 14:48:29 +0530 Subject: [PATCH 1/2] Add Presidio text anonymization scaffold --- python_backend/main.py | 33 +- python_backend/services/ingestion.py | 57 ++- python_backend/services/text_anonymization.py | 451 ++++++++++++++++++ python_backend/tests/test_ingestion.py | 66 ++- .../tests/test_text_anonymization.py | 155 ++++++ 5 files changed, 748 insertions(+), 14 deletions(-) create mode 100644 python_backend/services/text_anonymization.py create mode 100644 python_backend/tests/test_text_anonymization.py diff --git a/python_backend/main.py b/python_backend/main.py index 279e4f3..ccb88e5 100644 --- a/python_backend/main.py +++ b/python_backend/main.py @@ -27,7 +27,13 @@ # Preview factory imports from services.preview.factory import PreviewFactory from services.audit_logger import AuditLogger -from services.ingestion import HEADER_READ_LIMIT, IngestionError, route_for_ingestion +from services.ingestion import ( + HEADER_READ_LIMIT, + TEXT_READ_LIMIT_BYTES, + IngestionError, + detect_modality, + route_for_ingestion, +) # PDF extraction imports try: @@ -300,16 +306,34 @@ async def ingest_file( profile: str = Form("strict"), ): """ - Privacy-safe Week 1 upload entry point. + Privacy-safe upload entry point. - This endpoint only detects modality and selects a placeholder handler. - It does not store raw uploads, index metadata, or trigger downstream flows. + Text uploads are anonymized. Other modalities still route to placeholder + handlers until their later milestones. """ try: header = await file.read(HEADER_READ_LIMIT) if not header: raise HTTPException(status_code=400, detail="Uploaded file is empty") + text_content = None + try: + modality = detect_modality(file.filename, file.content_type, header) + except IngestionError: + modality = None + + if modality == "text": + await file.seek(0) + text_content = await file.read(TEXT_READ_LIMIT_BYTES + 1) + if len(text_content) > TEXT_READ_LIMIT_BYTES: + raise HTTPException( + status_code=413, + detail=( + f"Text uploads must be {TEXT_READ_LIMIT_BYTES} " + "bytes or smaller" + ), + ) + await file.seek(0) return route_for_ingestion( @@ -317,6 +341,7 @@ async def ingest_file( content_type=file.content_type, header=header, profile=profile, + text_content=text_content, ) except IngestionError as e: raise HTTPException(status_code=e.status_code, detail=e.detail) diff --git a/python_backend/services/ingestion.py b/python_backend/services/ingestion.py index 89a52b8..5acd812 100644 --- a/python_backend/services/ingestion.py +++ b/python_backend/services/ingestion.py @@ -1,9 +1,14 @@ from typing import Any, Callable, Dict, Optional +from services.text_anonymization import ( + TextAnonymizationError, + anonymize_clinical_text, +) SUPPORTED_PROFILES = {"strict", "research"} SUPPORTED_MODALITIES = {"csv", "text", "dicom", "nifti", "wsi"} HEADER_READ_LIMIT = 4096 +TEXT_READ_LIMIT_BYTES = 256 * 1024 class IngestionError(ValueError): @@ -93,8 +98,33 @@ def anonymize_csv() -> Dict[str, str]: return _placeholder_result("anonymize_csv") -def anonymize_text() -> Dict[str, str]: - return _placeholder_result("anonymize_text") +def anonymize_text( + text_content: bytes, + profile: str, + study_salt: Optional[str] = None, +) -> Dict[str, Any]: + try: + text = text_content.decode("utf-8") + except UnicodeDecodeError as exc: + raise IngestionError("Text uploads must be UTF-8 encoded") from exc + + try: + result = anonymize_clinical_text( + text=text, + profile=profile, + study_salt=study_salt, + ) + except TextAnonymizationError as exc: + raise IngestionError(exc.detail, status_code=exc.status_code) from exc + + return { + "handler": "anonymize_text", + "routing_status": "handler_selected", + "anonymization_status": result["anonymization_status"], + "message": "Text anonymization completed.", + "anonymized_text": result["anonymized_text"], + "detected_entities": result["detected_entities"], + } def anonymize_dicom() -> Dict[str, str]: @@ -109,7 +139,7 @@ def anonymize_wsi() -> Dict[str, str]: return _placeholder_result("anonymize_wsi") -HANDLER_REGISTRY: Dict[str, Callable[[], Dict[str, str]]] = { +HANDLER_REGISTRY: Dict[str, Callable[..., Dict[str, Any]]] = { "csv": anonymize_csv, "text": anonymize_text, "dicom": anonymize_dicom, @@ -123,6 +153,8 @@ def route_for_ingestion( content_type: Optional[str], header: bytes, profile: str, + text_content: Optional[bytes] = None, + study_salt: Optional[str] = None, ) -> Dict[str, Any]: safe_name = _safe_filename(filename) privacy_profile = validate_privacy_profile(profile) @@ -135,8 +167,17 @@ def route_for_ingestion( status_code=500, ) - handler_result = handler() - return { + if modality == "text": + if text_content is None: + raise IngestionError( + "Text content was not provided for anonymization", + status_code=500, + ) + handler_result = handler(text_content, privacy_profile, study_salt) + else: + handler_result = handler() + + response = { "status": "success", "filename": safe_name, "detected_modality": modality, @@ -152,3 +193,9 @@ def route_for_ingestion( "blockchain_transaction": "pending", }, } + if "anonymized_text" in handler_result: + response["anonymized_text"] = handler_result["anonymized_text"] + if "detected_entities" in handler_result: + response["detected_entities"] = handler_result["detected_entities"] + + return response diff --git a/python_backend/services/text_anonymization.py b/python_backend/services/text_anonymization.py new file mode 100644 index 0000000..f2c12e2 --- /dev/null +++ b/python_backend/services/text_anonymization.py @@ -0,0 +1,451 @@ +import hashlib +import re +from functools import lru_cache +from typing import Any, Dict, Iterable, Iterator, List, Tuple + +SUPPORTED_PROFILES = {"strict", "research"} +DEFAULT_STUDY_SALT = "bio-block-week2-development-salt" +HASH_LENGTH = 8 + +_ID_ENTITY_TYPES = { + "MEDICAL_RECORD_NUMBER", + "PATIENT_ID", + "HEALTH_PLAN_ID", + "INSURANCE_ID", + "ACCESSION_NUMBER", + "DEVICE_ID", +} +_IDENTIFIER_AT_END = re.compile(r"([A-Z0-9][A-Z0-9-]{3,30})\b", re.IGNORECASE) + + +class TextAnonymizationError(ValueError): + def __init__(self, detail: str, status_code: int = 400): + super().__init__(detail) + self.detail = detail + self.status_code = status_code + + +def _blank_english_nlp_engine(): + """ + Minimal Presidio NLP engine backed by a blank spaCy tokenizer. + + Presidio's default engine may try to download a spaCy model. This keeps + Week 2 text analysis deterministic and offline for pattern recognizers. + """ + try: + import spacy + from presidio_analyzer.nlp_engine import NlpArtifacts, NlpEngine + except ImportError as exc: + raise TextAnonymizationError( + "Text anonymization NLP dependency is not available.", + status_code=503, + ) from exc + + class BlankEnglishNlpEngine(NlpEngine): + def __init__(self) -> None: + self._nlp = spacy.blank("en") + + def load(self) -> None: + return None + + def is_loaded(self) -> bool: + return True + + def get_supported_entities(self) -> List[str]: + return [] + + def get_supported_languages(self) -> List[str]: + return ["en"] + + def process_text(self, text: str, language: str): + doc = self._nlp(text) + return NlpArtifacts( + entities=[], + tokens=doc, + tokens_indices=[token.idx for token in doc], + lemmas=[token.text.lower() for token in doc], + nlp_engine=self, + language=language, + ) + + def process_batch( + self, + texts: Iterable[str], + language: str, + batch_size: int = 1, + n_process: int = 1, + **kwargs: Any, + ) -> Iterator[Tuple[str, Any]]: + for text in texts: + yield text, self.process_text(text, language) + + def is_stopword(self, word: str, language: str) -> bool: + return False + + def is_punct(self, word: str, language: str) -> bool: + return bool(word) and all(not char.isalnum() for char in word) + + return BlankEnglishNlpEngine() + + +def stable_hash(value: str, salt: str, length: int = HASH_LENGTH) -> str: + digest = hashlib.sha256(f"{salt}:{value.strip().lower()}".encode("utf-8")) + return digest.hexdigest().upper()[:length] + + +def pseudonymize_person(value: str, salt: str) -> str: + return f"Patient_{stable_hash(value, salt)}" + + +def pseudonymize_mrn(value: str, salt: str) -> str: + return f"MRN_{stable_hash(value, salt)}" + + +def pseudonymize_patient_id(value: str, salt: str) -> str: + return f"PATIENT_ID_{stable_hash(value, salt)}" + + +def pseudonymize_health_plan(value: str, salt: str) -> str: + return f"HEALTH_PLAN_{stable_hash(value, salt)}" + + +def pseudonymize_accession(value: str, salt: str) -> str: + return f"ACCESSION_{stable_hash(value, salt)}" + + +def pseudonymize_device(value: str, salt: str) -> str: + return f"DEVICE_{stable_hash(value, salt)}" + + +def _pattern(name: str, regex: str, score: float): + from presidio_analyzer import Pattern + + return Pattern(name=name, regex=regex, score=score) + + +def _clinical_recognizers() -> List[Any]: + from presidio_analyzer import PatternRecognizer + + return [ + PatternRecognizer( + supported_entity="MEDICAL_RECORD_NUMBER", + name="Clinical MRN Recognizer", + patterns=[ + _pattern( + "mrn_with_context", + ( + r"\b(?:MRN|medical\s+record(?:\s+number)?|" + r"hospital\s+number|chart\s+number)\s*[:#-]?\s*" + r"[A-Z0-9][A-Z0-9-]{4,20}\b" + ), + 0.85, + ) + ], + context=[ + "mrn", + "medical record", + "medical record number", + "hospital number", + "chart number", + ], + ), + PatternRecognizer( + supported_entity="PATIENT_ID", + name="Clinical Patient ID Recognizer", + patterns=[ + _pattern( + "patient_id_with_context", + ( + r"\b(?:patient\s+(?:id|identifier|number)|pt\s*id)" + r"\s*[:#-]?\s*[A-Z0-9][A-Z0-9-]{3,24}\b" + ), + 0.82, + ) + ], + context=[ + "patient id", + "patient number", + "patient identifier", + "pt id", + ], + ), + PatternRecognizer( + supported_entity="HEALTH_PLAN_ID", + name="Clinical Health Plan ID Recognizer", + patterns=[ + _pattern( + "health_plan_id_with_context", + ( + r"\b(?:health\s+plan(?:\s+(?:beneficiary\s+)?" + r"(?:id|number))?|beneficiary\s+id|insurance\s+" + r"(?:id|number)|policy\s+(?:id|number)|" + r"member\s+id|subscriber\s+id)\s*[:#-]?\s*" + r"[A-Z0-9][A-Z0-9-]{5,30}\b" + ), + 0.82, + ) + ], + context=[ + "health plan", + "beneficiary", + "insurance", + "policy", + "member id", + "subscriber id", + ], + ), + PatternRecognizer( + supported_entity="ACCESSION_NUMBER", + name="Clinical Accession Number Recognizer", + patterns=[ + _pattern( + "accession_with_context", + ( + r"\b(?:accession(?:\s+(?:number|no))?|acc\s*no)" + r"\s*[:#-]?\s*[A-Z0-9][A-Z0-9-]{4,30}\b" + ), + 0.8, + ) + ], + context=["accession", "accession number", "acc no", "accession no"], + ), + PatternRecognizer( + supported_entity="DEVICE_ID", + name="Clinical Device ID Recognizer", + patterns=[ + _pattern( + "device_id_with_context", + ( + r"\b(?:device(?:\s+id)?|serial(?:\s+number)?|" + r"implant|equipment)\s*[:#-]?\s*" + r"[A-Z0-9][A-Z0-9-]{5,30}\b" + ), + 0.78, + ) + ], + context=[ + "device", + "serial", + "serial number", + "device id", + "implant", + "equipment", + ], + ), + ] + + +def _common_phi_recognizers() -> List[Any]: + from presidio_analyzer import PatternRecognizer + + return [ + PatternRecognizer( + supported_entity="EMAIL_ADDRESS", + name="Email Address Recognizer", + patterns=[ + _pattern( + "email_address", + r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", + 0.95, + ) + ], + ), + PatternRecognizer( + supported_entity="PHONE_NUMBER", + name="Phone Number Recognizer", + patterns=[ + _pattern( + "us_phone_number", + ( + r"(? str: + normalized = (profile or "").strip().lower() + if normalized not in SUPPORTED_PROFILES: + raise TextAnonymizationError( + "Invalid privacy profile. Supported profiles: strict, research" + ) + return normalized + + +def _select_non_overlapping(results: Iterable[Any]) -> List[Any]: + ordered = sorted( + results, + key=lambda result: ( + -float(result.score or 0), + -(result.end - result.start), + result.start, + ), + ) + + selected = [] + occupied: List[Tuple[int, int]] = [] + for result in ordered: + overlaps = any( + result.start < end and start < result.end for start, end in occupied + ) + if overlaps: + continue + selected.append(result) + occupied.append((result.start, result.end)) + + return sorted(selected, key=lambda result: result.start) + + +def _hash_value(entity_type: str, value: str) -> str: + if entity_type not in _ID_ENTITY_TYPES: + return value + + match = _IDENTIFIER_AT_END.search(value.strip()) + if match: + return match.group(1) + return value + + +def _replacement_for(entity_type: str, value: str, salt: str) -> str: + hash_value = _hash_value(entity_type, value) + if entity_type == "PERSON": + return pseudonymize_person(hash_value, salt) + if entity_type == "MEDICAL_RECORD_NUMBER": + return pseudonymize_mrn(hash_value, salt) + if entity_type == "PATIENT_ID": + return pseudonymize_patient_id(hash_value, salt) + if entity_type in {"HEALTH_PLAN_ID", "INSURANCE_ID"}: + return pseudonymize_health_plan(hash_value, salt) + if entity_type == "ACCESSION_NUMBER": + return pseudonymize_accession(hash_value, salt) + if entity_type == "DEVICE_ID": + return pseudonymize_device(hash_value, salt) + if entity_type == "EMAIL_ADDRESS": + return "" + if entity_type == "PHONE_NUMBER": + return "" + if entity_type in {"US_SSN", "SSN"}: + return "" + if entity_type == "DATE_TIME": + return "" + return f"" + + +def _replace_entities(text: str, results: List[Any], salt: str) -> str: + anonymized_parts = [] + cursor = 0 + + for result in results: + anonymized_parts.append(text[cursor : result.start]) + original_value = text[result.start : result.end] + anonymized_parts.append( + _replacement_for(result.entity_type, original_value, salt) + ) + cursor = result.end + + anonymized_parts.append(text[cursor:]) + return "".join(anonymized_parts) + + +def _entity_summary(results: Iterable[Any]) -> Dict[str, int]: + summary: Dict[str, int] = {} + for result in results: + summary[result.entity_type] = summary.get(result.entity_type, 0) + 1 + return summary + + +def anonymize_clinical_text( + text: str, + profile: str = "strict", + study_salt: str | None = None, +) -> Dict[str, Any]: + if not isinstance(text, str): + raise TextAnonymizationError("Text input must be a string") + if not text.strip(): + raise TextAnonymizationError("Text input is empty") + + _normalize_profile(profile) + salt = study_salt or DEFAULT_STUDY_SALT + + analyzer = _get_analyzer() + try: + results = analyzer.analyze( + text=text, + language="en", + entities=[ + "PERSON", + "MEDICAL_RECORD_NUMBER", + "PATIENT_ID", + "HEALTH_PLAN_ID", + "INSURANCE_ID", + "ACCESSION_NUMBER", + "DEVICE_ID", + "EMAIL_ADDRESS", + "PHONE_NUMBER", + "US_SSN", + "SSN", + "DATE_TIME", + ], + ) + except Exception as exc: + raise TextAnonymizationError( + "Text anonymization failed", status_code=500 + ) from exc + + selected_results = _select_non_overlapping(results) + return { + "anonymization_status": "completed", + "anonymized_text": _replace_entities(text, selected_results, salt), + "detected_entities": _entity_summary(selected_results), + } diff --git a/python_backend/tests/test_ingestion.py b/python_backend/tests/test_ingestion.py index 8ff5633..a289d24 100644 --- a/python_backend/tests/test_ingestion.py +++ b/python_backend/tests/test_ingestion.py @@ -6,11 +6,10 @@ from fastapi.testclient import TestClient - sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from main import app, ingest_file # noqa: E402 - +from services.ingestion import TEXT_READ_LIMIT_BYTES # noqa: E402 client = TestClient(app) @@ -37,7 +36,9 @@ async def seek(self, offset): self.position = offset -def upload_file(filename, content, content_type="application/octet-stream", profile=None): +def upload_file( + filename, content, content_type="application/octet-stream", profile=None +): data = {} if profile is not None: data["profile"] = profile @@ -68,8 +69,44 @@ def test_csv_routes_to_csv_handler(self): self.assert_routes_to(response, "csv", "anonymize_csv") def test_text_routes_to_text_handler(self): - response = upload_file("note.txt", b"clinical note scaffold\n", "text/plain") - self.assert_routes_to(response, "text", "anonymize_text") + response = upload_file( + "note.txt", + (b"Patient has MRN: 123456 and diabetes. " b"Email john.doe@example.com."), + "text/plain", + ) + + self.assertEqual(response.status_code, 200) + body = response.json() + self.assertEqual(body["status"], "success") + self.assertEqual(body["detected_modality"], "text") + self.assertEqual(body["handler"], "anonymize_text") + self.assertEqual(body["routing_status"], "handler_selected") + self.assertEqual(body["anonymization_status"], "completed") + self.assertNotIn("123456", body["anonymized_text"]) + self.assertNotIn("john.doe@example.com", body["anonymized_text"]) + self.assertIn("MRN_", body["anonymized_text"]) + self.assertIn("", body["anonymized_text"]) + self.assertIn("diabetes", body["anonymized_text"]) + self.assertEqual(body["detected_entities"]["MEDICAL_RECORD_NUMBER"], 1) + self.assertEqual(body["detected_entities"]["EMAIL_ADDRESS"], 1) + self.assertEqual(body["downstream"]["ipfs_chunking"], "pending") + self.assertEqual(body["downstream"]["cid_encryption"], "pending") + self.assertEqual(body["downstream"]["metadata_indexing"], "pending") + self.assertEqual(body["downstream"]["blockchain_transaction"], "pending") + + def test_text_routes_by_extension_despite_mime_mismatch(self): + response = upload_file( + "note.txt", + b"Patient ID PT-1001 was admitted.", + "application/octet-stream", + ) + + self.assertEqual(response.status_code, 200) + body = response.json() + self.assertEqual(body["detected_modality"], "text") + self.assertEqual(body["anonymization_status"], "completed") + self.assertNotIn("PT-1001", body["anonymized_text"]) + self.assertIn("PATIENT_ID_", body["anonymized_text"]) def test_dicom_extension_routes_to_dicom_handler(self): response = upload_file("scan.dcm", b"not-real-dicom-routing-only") @@ -110,6 +147,25 @@ def test_invalid_profile_is_rejected(self): self.assertEqual(response.status_code, 400) self.assertIn("Invalid privacy profile", response.json()["detail"]) + def test_text_upload_with_unsupported_encoding_is_rejected(self): + response = upload_file("note.txt", b"\xff\xfe\x00", "text/plain") + + self.assertEqual(response.status_code, 400) + self.assertEqual( + response.json()["detail"], + "Text uploads must be UTF-8 encoded", + ) + + def test_large_text_upload_is_rejected(self): + response = upload_file( + "large-note.txt", + b"a" * (TEXT_READ_LIMIT_BYTES + 1), + "text/plain", + ) + + self.assertEqual(response.status_code, 413) + self.assertIn("Text uploads must be", response.json()["detail"]) + def test_dicom_preamble_detection_routes_to_dicom_handler(self): dicom_header = b"\x00" * 128 + b"DICM" + b"routing scaffold" response = upload_file("scan.bin", dicom_header, "application/octet-stream") diff --git a/python_backend/tests/test_text_anonymization.py b/python_backend/tests/test_text_anonymization.py new file mode 100644 index 0000000..682a7b8 --- /dev/null +++ b/python_backend/tests/test_text_anonymization.py @@ -0,0 +1,155 @@ +import os +import sys + +import pytest + +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from services.text_anonymization import ( # noqa: E402 + TextAnonymizationError, + anonymize_clinical_text, +) + + +def test_mrn_with_context_is_detected_and_replaced(): + result = anonymize_clinical_text( + "Patient has MRN: 123456.", + study_salt="study-a", + ) + + assert result["anonymization_status"] == "completed" + assert "123456" not in result["anonymized_text"] + assert "MRN_" in result["anonymized_text"] + assert result["detected_entities"]["MEDICAL_RECORD_NUMBER"] == 1 + + +def test_random_number_without_context_is_not_mrn(): + text = "The room number 123456 was cleaned." + result = anonymize_clinical_text(text, study_salt="study-a") + + assert result["anonymized_text"] == text + assert "MEDICAL_RECORD_NUMBER" not in result["detected_entities"] + + +def test_mrn_context_is_case_insensitive_and_accepts_separators(): + result = anonymize_clinical_text( + "mrn: 123456. Medical Record Number - 654321.", + study_salt="study-a", + ) + + assert "123456" not in result["anonymized_text"] + assert "654321" not in result["anonymized_text"] + assert result["detected_entities"]["MEDICAL_RECORD_NUMBER"] == 2 + + +def test_patient_id_is_detected_and_replaced(): + result = anonymize_clinical_text( + "Patient ID PT-1001 was admitted.", + study_salt="study-a", + ) + + assert "PT-1001" not in result["anonymized_text"] + assert "PATIENT_ID_" in result["anonymized_text"] + assert result["detected_entities"]["PATIENT_ID"] == 1 + + +def test_health_plan_id_is_detected_and_replaced(): + result = anonymize_clinical_text( + "Insurance ID ABC123456789 was verified.", + study_salt="study-a", + ) + + assert "ABC123456789" not in result["anonymized_text"] + assert "HEALTH_PLAN_" in result["anonymized_text"] + assert result["detected_entities"]["HEALTH_PLAN_ID"] == 1 + + +def test_deterministic_surrogate_consistency_with_same_salt(): + text = "Patient has MRN: 123456." + + first = anonymize_clinical_text(text, study_salt="study-a") + second = anonymize_clinical_text(text, study_salt="study-a") + + assert first["anonymized_text"] == second["anonymized_text"] + + +def test_different_salt_changes_surrogate(): + text = "Patient has MRN: 123456." + + first = anonymize_clinical_text(text, study_salt="study-a") + second = anonymize_clinical_text(text, study_salt="study-b") + + assert first["anonymized_text"] != second["anonymized_text"] + assert "123456" not in first["anonymized_text"] + assert "123456" not in second["anonymized_text"] + + +def test_email_is_redacted(): + result = anonymize_clinical_text( + "Contact john.doe@example.com after review.", + study_salt="study-a", + ) + + assert "john.doe@example.com" not in result["anonymized_text"] + assert "" in result["anonymized_text"] + assert result["detected_entities"]["EMAIL_ADDRESS"] == 1 + + +def test_phone_is_redacted(): + result = anonymize_clinical_text( + "Call 555-123-4567 for scheduling.", + study_salt="study-a", + ) + + assert "555-123-4567" not in result["anonymized_text"] + assert "" in result["anonymized_text"] + assert result["detected_entities"]["PHONE_NUMBER"] == 1 + + +def test_medical_terms_are_preserved(): + text = "History includes myocardial infarction, diabetes, aspirin, and CT scan." + result = anonymize_clinical_text(text, study_salt="study-a") + + assert "myocardial infarction" in result["anonymized_text"] + assert "diabetes" in result["anonymized_text"] + assert "aspirin" in result["anonymized_text"] + assert "CT scan" in result["anonymized_text"] + + +def test_no_phi_text_succeeds_and_is_preserved(): + text = "Patient was diagnosed with diabetes and prescribed metformin." + result = anonymize_clinical_text(text, study_salt="study-a") + + assert result["anonymization_status"] == "completed" + assert result["anonymized_text"] == text + assert result["detected_entities"] == {} + + +def test_empty_text_is_rejected(): + with pytest.raises(TextAnonymizationError) as exc: + anonymize_clinical_text(" ", study_salt="study-a") + + assert exc.value.status_code == 400 + assert exc.value.detail == "Text input is empty" + + +def test_entity_summary_does_not_include_raw_values(): + result = anonymize_clinical_text( + "Patient has MRN: 123456 and email john.doe@example.com.", + study_salt="study-a", + ) + + assert "123456" not in result["detected_entities"] + assert "john.doe@example.com" not in result["detected_entities"] + assert all(isinstance(key, str) for key in result["detected_entities"]) + assert all(isinstance(value, int) for value in result["detected_entities"].values()) + + +def test_overlapping_email_text_does_not_leave_email_exposed(): + result = anonymize_clinical_text( + "John Doe was notified.", + study_salt="study-a", + ) + + assert "john.doe@example.com" not in result["anonymized_text"] + assert "" in result["anonymized_text"] From 98da5a7c19e160533917cc9e73ff894343e79974 Mon Sep 17 00:00:00 2001 From: XxSURYANSHxX Date: Tue, 9 Jun 2026 14:57:39 +0530 Subject: [PATCH 2/2] Move text anonymization salt to environment config --- python_backend/.env.example | 5 +++ python_backend/services/text_anonymization.py | 37 ++++++++++++++++++- python_backend/tests/test_ingestion.py | 1 + .../tests/test_text_anonymization.py | 12 ++++++ 4 files changed, 53 insertions(+), 2 deletions(-) diff --git a/python_backend/.env.example b/python_backend/.env.example index 4d3c214..7379bdd 100644 --- a/python_backend/.env.example +++ b/python_backend/.env.example @@ -15,3 +15,8 @@ PORT=3002 # CORS Allowed Origin (default: * — restrict in production) # ALLOWED_ORIGIN=https://your-frontend-domain.com + +# Text anonymization surrogate salt. +# Keep the real value in your local .env or deployment secrets. +# Do not commit the real salt. +# BIOBLOCK_STUDY_SALT=replace-with-a-long-random-secret diff --git a/python_backend/services/text_anonymization.py b/python_backend/services/text_anonymization.py index f2c12e2..d34d710 100644 --- a/python_backend/services/text_anonymization.py +++ b/python_backend/services/text_anonymization.py @@ -1,10 +1,12 @@ import hashlib +import os import re from functools import lru_cache +from pathlib import Path from typing import Any, Dict, Iterable, Iterator, List, Tuple SUPPORTED_PROFILES = {"strict", "research"} -DEFAULT_STUDY_SALT = "bio-block-week2-development-salt" +STUDY_SALT_ENV_VAR = "BIOBLOCK_STUDY_SALT" HASH_LENGTH = 8 _ID_ENTITY_TYPES = { @@ -405,6 +407,37 @@ def _entity_summary(results: Iterable[Any]) -> Dict[str, int]: return summary +def _read_local_env_salt() -> str | None: + env_path = Path(__file__).resolve().parent.parent / ".env" + if not env_path.exists(): + return None + + for line in env_path.read_text(encoding="utf-8").splitlines(): + cleaned = line.strip() + if not cleaned or cleaned.startswith("#") or "=" not in cleaned: + continue + key, value = cleaned.split("=", 1) + if key.strip() == STUDY_SALT_ENV_VAR: + return value.strip().strip("\"'") + + return None + + +def _resolve_study_salt(study_salt: str | None) -> str: + configured_salt = ( + study_salt + or os.getenv(STUDY_SALT_ENV_VAR) + or _read_local_env_salt() + ) + if configured_salt and configured_salt.strip(): + return configured_salt.strip() + + raise TextAnonymizationError( + f"Text anonymization salt is not configured. Set {STUDY_SALT_ENV_VAR}.", + status_code=500, + ) + + def anonymize_clinical_text( text: str, profile: str = "strict", @@ -416,7 +449,7 @@ def anonymize_clinical_text( raise TextAnonymizationError("Text input is empty") _normalize_profile(profile) - salt = study_salt or DEFAULT_STUDY_SALT + salt = _resolve_study_salt(study_salt) analyzer = _get_analyzer() try: diff --git a/python_backend/tests/test_ingestion.py b/python_backend/tests/test_ingestion.py index a289d24..6eafff7 100644 --- a/python_backend/tests/test_ingestion.py +++ b/python_backend/tests/test_ingestion.py @@ -7,6 +7,7 @@ from fastapi.testclient import TestClient sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +os.environ.setdefault("BIOBLOCK_STUDY_SALT", "week2-test-salt") from main import app, ingest_file # noqa: E402 from services.ingestion import TEXT_READ_LIMIT_BYTES # noqa: E402 diff --git a/python_backend/tests/test_text_anonymization.py b/python_backend/tests/test_text_anonymization.py index 682a7b8..0b636c4 100644 --- a/python_backend/tests/test_text_anonymization.py +++ b/python_backend/tests/test_text_anonymization.py @@ -5,6 +5,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from services import text_anonymization # noqa: E402 from services.text_anonymization import ( # noqa: E402 TextAnonymizationError, anonymize_clinical_text, @@ -133,6 +134,17 @@ def test_empty_text_is_rejected(): assert exc.value.detail == "Text input is empty" +def test_missing_salt_is_rejected(monkeypatch): + monkeypatch.delenv(text_anonymization.STUDY_SALT_ENV_VAR, raising=False) + monkeypatch.setattr(text_anonymization, "_read_local_env_salt", lambda: None) + + with pytest.raises(TextAnonymizationError) as exc: + anonymize_clinical_text("Patient has MRN: 123456.") + + assert exc.value.status_code == 500 + assert text_anonymization.STUDY_SALT_ENV_VAR in exc.value.detail + + def test_entity_summary_does_not_include_raw_values(): result = anonymize_clinical_text( "Patient has MRN: 123456 and email john.doe@example.com.",