diff --git a/cdisc_rules_engine/constants/permissibility.py b/cdisc_rules_engine/constants/permissibility.py index 029705f01..5ca672c5d 100644 --- a/cdisc_rules_engine/constants/permissibility.py +++ b/cdisc_rules_engine/constants/permissibility.py @@ -1,6 +1,4 @@ REQUIRED = "Req" EXPECTED = "Exp" PERMISSIBLE = "Perm" -REQUIRED_MODEL_VARIABLES = {"STUDYID", "USUBJID", "DOMAIN"} -SEQ_VARIABLE = "--SEQ" PERMISSIBILITY_KEY = "core" diff --git a/cdisc_rules_engine/operations/base_operation.py b/cdisc_rules_engine/operations/base_operation.py index 2c9b4efbc..ed66cdb96 100644 --- a/cdisc_rules_engine/operations/base_operation.py +++ b/cdisc_rules_engine/operations/base_operation.py @@ -1,9 +1,6 @@ from cdisc_rules_engine.models.operation_params import OperationParams from cdisc_rules_engine.constants.permissibility import ( - REQUIRED, PERMISSIBLE, - REQUIRED_MODEL_VARIABLES, - SEQ_VARIABLE, PERMISSIBILITY_KEY, ) from abc import abstractmethod @@ -256,16 +253,8 @@ def get_allowed_variable_permissibility(self, variable_metadata: dict): """ Returns the permissibility value of a variable allowed in the current domain """ - variable_name = variable_metadata.get("name") if PERMISSIBILITY_KEY in variable_metadata: return variable_metadata[PERMISSIBILITY_KEY] - elif variable_name in REQUIRED_MODEL_VARIABLES: - return REQUIRED - elif variable_name.replace("--", self.params.domain) == SEQ_VARIABLE.replace( - "--", self.params.domain - ): - return REQUIRED - return PERMISSIBLE def _get_variable_names_list(self, domain, dataframe): diff --git a/cdisc_rules_engine/utilities/sdtm_utilities.py b/cdisc_rules_engine/utilities/sdtm_utilities.py index 0a574c4a6..97168e72f 100644 --- a/cdisc_rules_engine/utilities/sdtm_utilities.py +++ b/cdisc_rules_engine/utilities/sdtm_utilities.py @@ -225,7 +225,7 @@ def group_class_variables_by_role( return identifier_vars, timing_vars -def get_variables_metadata_from_standard_model( +def get_variables_metadata_from_standard_model( # noqa domain: str, dataframe, datasets: Iterable[SDTMDatasetMetadata], @@ -234,84 +234,63 @@ def get_variables_metadata_from_standard_model( library_metadata: LibraryMetadataContainer, ) -> List[dict]: """ - Gets variables metadata for the given class and domain from cache. - The cache stores CDISC Library metadata. - Retrieves variables metadata from IG, - unless the dataset class is a GENERAL OBSERVATIONS domain. - In this case variables metadata is pulled from the model. - - Args: - standard: Standard to validate against - standard_version: Version of the standard to validate against - domain: The domain being validated - dataframe: The dataset being a evaluated. - datasets: List of all datasets in the study - dataset_path: File path of the target dataset - cache: Cache service for retrieving previously cached library data - data_service: Data service instance - Returns: - [ - { - "label":"Study Identifier", - "name":"STUDYID", - "ordinal":"1", - "role":"Identifier", - ... - }, - { - "label":"Domain Abbreviation", - "name":"DOMAIN", - "ordinal":"2", - "role":"Identifier" - }, - ... - ] + gets class via the IG then uses the class to get the variables via the model + classes outside of general observation, we check the model for their definition + if they are not there, differ to the standard definition of the domain """ - # get model details from cache - model_details = library_metadata.model_metadata if ( domain and (domain.upper().startswith("SUPP") or domain.upper().startswith("SQ")) and len(domain) > 2 ): domain = "SUPPQUAL" - domain_details = get_model_domain_metadata(model_details, domain) - variables_metadata = [] - - if domain_details: - # Domain found in the model - class_name = convert_library_class_name_to_ct_class( - domain_details["_links"]["parentClass"]["title"] - ) - class_details = get_class_metadata(model_details, class_name) - variables_metadata = domain_details.get("datasetVariables", []) - if variables_metadata: - variables_metadata.sort(key=lambda item: int(item["ordinal"])) - else: - # Domain not found in the model. Detect class name from data - domain_details = search_in_list_of_dicts( - datasets, - lambda item: domain == (item.domain or item.name), - ) - class_name = data_service.get_dataset_class( - dataframe, dataset_path, datasets, domain_details - ) - class_name = convert_library_class_name_to_ct_class(class_name) - class_details = get_class_metadata(model_details, class_name) + standard_details = library_metadata.standard_metadata + model_details = library_metadata.model_metadata + IG_class_details, IG_domain_details = get_class_and_domain_metadata( + standard_details, domain + ) + class_name = convert_library_class_name_to_ct_class(IG_class_details.get("name")) if class_name in DETECTABLE_CLASSES: + model_class_details = get_class_metadata(model_details, class_name) ( identifiers_metadata, - variables_metadata, + class_variables_metadata, timing_metadata, - ) = get_allowed_class_variables(model_details, class_details) - # Identifiers are added to the beginning and Timing to the end + ) = get_allowed_class_variables(model_details, model_class_details) + variables_metadata = [] if identifiers_metadata: - variables_metadata = identifiers_metadata + variables_metadata + variables_metadata = identifiers_metadata + variables_metadata = variables_metadata + class_variables_metadata if timing_metadata: variables_metadata = variables_metadata + timing_metadata - - return variables_metadata + return variables_metadata + else: + # First, try to get class metadata and check for classVariables i.e. AP class + class_details = get_class_metadata(model_details, class_name) + class_variables = class_details.get("classVariables", []) + if class_variables: + class_variables.sort(key=lambda item: int(item["ordinal"])) + return class_variables + else: + # Second, check if domain exists in model datasets + domain_details = get_model_domain_metadata(model_details, domain) + if domain_details: + dataset_variables = domain_details.get("datasetVariables", []) + if dataset_variables: + dataset_variables.sort(key=lambda item: int(item["ordinal"])) + return dataset_variables + # Third, fall back to standard datasets + for cls in standard_details.get("classes", []): + for dataset in cls.get("datasets", []): + if dataset.get("name") == domain: + dataset_variables = dataset.get("datasetVariables", []) + if dataset_variables: + dataset_variables.sort( + key=lambda item: int(item["ordinal"]) + ) + return dataset_variables + return None def get_model_domain_metadata(model_details: dict, domain_name: str) -> dict: diff --git a/tests/unit/test_operations/test_permissible_variables.py b/tests/unit/test_operations/test_permissible_variables.py index 2e9b29d73..18da9e0b9 100644 --- a/tests/unit/test_operations/test_permissible_variables.py +++ b/tests/unit/test_operations/test_permissible_variables.py @@ -339,6 +339,13 @@ def mock_cached_method(*args, **kwargs): side_effect=mock_cached_method, ): result: pd.DataFrame = operation.execute() - variables: List[str] = ["AETERM", "AEPERM", "TIMING_VAR"] + variables: List[str] = [ + "STUDYID", + "DOMAIN", + "AETERM", + "AESEQ", + "AEPERM", + "TIMING_VAR", + ] for result_array in result[operation_params.operation_id]: assert sorted(result_array) == sorted(variables) diff --git a/tests/unit/test_operations/test_required_variables.py b/tests/unit/test_operations/test_required_variables.py index c267e3ca4..16c33f741 100644 --- a/tests/unit/test_operations/test_required_variables.py +++ b/tests/unit/test_operations/test_required_variables.py @@ -206,6 +206,6 @@ def mock_cached_method(*args, **kwargs): side_effect=mock_cached_method, ): result: pd.DataFrame = operation.execute() - variables: List[str] = sorted(["STUDYID", "DOMAIN", "AESEQ", "AETEST"]) + variables: List[str] = sorted(["AETEST"]) for result_array in result[operation_params.operation_id]: assert sorted(result_array) == variables