diff --git a/src/sempy_labs/_ai.py b/src/sempy_labs/_ai.py
index b569c1c3..a3098427 100644
--- a/src/sempy_labs/_ai.py
+++ b/src/sempy_labs/_ai.py
@@ -428,7 +428,7 @@ def generate_aggs(
 #            viewName = view['viewName']
 #            isTemporary = view['isTemporary']
 #            new_data = {'Workspace Name': workspace, 'Lakehouse Name': lakehouse, 'View Name': viewName}
-#            dfView = pd.concat([dfView, pd.DataFrame(new_data, index=[0])], ignore_index=True)
+#            dfView = pd.concat([dfView, pd.DataFrame(new_data, index=[0])], ignore_index=True)
 #    dfView
 #    lakeT = get_lakehouse_tables(lakehouse, lakehouse_workspace)
 #    if not dfP['Query'].isin(lakeT['Table Name'].values):
diff --git a/src/sempy_labs/_dax.py b/src/sempy_labs/_dax.py
index 34b407f4..038d4861 100644
--- a/src/sempy_labs/_dax.py
+++ b/src/sempy_labs/_dax.py
@@ -62,13 +62,13 @@ def evaluate_dax_impersonation(
         payload=payload,
     )
     data = response.json()["results"][0]["tables"]
-
+
     # Get all possible column names from all rows because null columns aren't returned
     all_columns = set()
    for item in data:
         for row in item["rows"]:
             all_columns.update(row.keys())
-
+
     # Create rows with all columns, filling missing values with None
     rows = []
     for item in data:
@@ -76,7 +76,7 @@ def evaluate_dax_impersonation(
             # Create a new row with all columns, defaulting to None
             new_row = {col: row.get(col) for col in all_columns}
             rows.append(new_row)
-
+
     # Create DataFrame from the processed rows
     df = pd.DataFrame(rows)
diff --git a/src/sempy_labs/_generate_semantic_model.py b/src/sempy_labs/_generate_semantic_model.py
index 7988a7cf..f1afc217 100644
--- a/src/sempy_labs/_generate_semantic_model.py
+++ b/src/sempy_labs/_generate_semantic_model.py
@@ -253,6 +253,7 @@ def deploy_semantic_model(
     refresh_target_dataset: bool = True,
     overwrite: bool = False,
     perspective: Optional[str] = None,
+    filters: Optional[dict] = None,
 ):
     """
     Deploys a semantic model based on an existing semantic model.
@@ -277,6 +278,20 @@
         If set to True, overwrites the existing semantic model in the workspace if it exists.
     perspective : str, default=None
         Set this to the name of a perspective in the model and it will reduce the deployed model down to the tables/columns/measures/hierarchies within that perspective.
+    filters : dict, default=None
+        A dictionary of filters to apply to the tables in the model. This is only supported for Direct Lake models and for tables in Direct Lake mode.
+
+        Supported operators: [=, <>, !=, <, <=, >, >=, IN, NOT IN]
+
+        Format:
+        {
+            "TableName": ["ColumnName", "Operator", "Value"],
+        }
+        Example:
+        filters = {
+            "Geography": ["Country", "=", "Canada"],
+            "Product": ["ModelName", "IN", ["Bikes", "Cars"]]
+        }
     """
 
     (source_workspace_name, source_workspace_id) = resolve_workspace_name_and_id(
@@ -310,15 +325,294 @@ def deploy_semantic_model(
             f"{icons.warning} The '{target_dataset}' semantic model already exists within the '{target_workspace_name}' workspace. The 'overwrite' parameter is set to False so the source semantic model was not deployed to the target destination."
         )
 
-    if perspective is not None:
+    if filters is not None and perspective is None:
+        raise ValueError(
+            f"{icons.red_dot} The 'filters' parameter is only supported when the 'perspective' parameter is also specified. Please update the parameters."
+ ) + if filters is not None: + import sempy + + sempy.fabric._client._utils._init_analysis_services() + import Microsoft.AnalysisServices.Tabular as TOM + from sempy_labs._helper_functions import ( + find_transitive_incoming_relationships, + create_abfss_path, + _read_delta_table, + save_as_delta_table, + ) from sempy_labs.tom import connect_semantic_model + from sempy_labs.directlake._dl_helper import get_direct_lake_source + from functools import reduce + from pyspark.sql.functions import col + from sempy_labs._sql import ConnectLakehouse + + valid_operators = ["=", "!=", "<>", "<", ">", "<=", ">=", "IN", "NOT IN"] + write_mode = "overwrite" + + expanded_data = [] + for table_name, filter_details in filters.items(): + expanded_data.append([table_name] + filter_details) + + columns = ["TableName", "FilterColumn", "FilterOperator", "FilterValue"] + filters_df = pd.DataFrame(expanded_data, columns=columns) with connect_semantic_model( - dataset=source_dataset, workspace=source_workspace, readonly=True + dataset=source_dataset, workspace=source_workspace ) as tom: + if not tom.is_direct_lake(): + raise ValueError( + f"{icons.red_dot} This is only supported for semantic models in Direct Lake mode." + ) + + (artifact_type, artifact_name, lakehouse_id, lakehouse_workspace_id) = ( + get_direct_lake_source(source_dataset, source_workspace) + ) - df_added = tom._reduce_model(perspective_name=perspective) + if not artifact_type == "Lakehouse": + raise ValueError( + f"{icons.red_dot} This is only supported for semantic models in Direct Lake mode. Cannot find lakehouse." + ) + + fabric.refresh_tom_cache(workspace=source_workspace) + dfR = fabric.list_relationships( + dataset=source_dataset, workspace=source_workspace + ) + + filter_conditions = [] + suffix = perspective.replace(" ", "") + # Check object validity + for _, r in filters_df.iterrows(): + table_name = r["TableName"] + column_name = r["FilterColumn"] + operator, value = r["FilterOperator"].upper(), r["FilterValue"] + if not any(t.Name == table_name for t in tom.model.Tables): + raise ValueError(f"The '{table_name}' table does not exist.") + if not any( + c.Parent.Name == table_name and c.Name == column_name + for c in tom.all_columns() + ): + raise ValueError( + f"The '{table_name}'[{column_name}] column does not exist." + ) + if operator not in valid_operators: + raise ValueError(f"'{operator}' is not a valid operator.") + partition_name = next( + p.Name for p in tom.model.Tables[table_name].Partitions + ) + if ( + tom.model.Tables[table_name].Partitions[partition_name].Mode + != TOM.ModeType.DirectLake + ): + raise ValueError( + f"{icons.red_dot} Filtering is only valid for tables in Direct Lake mode." 
+ ) + + for _, r in filters_df.iterrows(): + table_name = r["TableName"] + column_name = r["FilterColumn"] + operator, value = r["FilterOperator"].upper(), r["FilterValue"] + partition_name = next( + p.Name for p in tom.model.Tables[table_name].Partitions + ) + entity_name = ( + tom.model.Tables[table_name] + .Partitions[partition_name] + .Source.EntityName + ) + source_column_name = ( + tom.model.Tables[table_name].Columns[column_name].SourceColumn + ) + + path = create_abfss_path( + lakehouse_id, lakehouse_workspace_id, entity_name + ) + new_lake_table_name = f"{entity_name}_{suffix}" + dfL = _read_delta_table(path) + + # Filter dataframe + if operator == "=": + filter_conditions.append(col(source_column_name) == value) + elif operator in {"!=", "<>"}: + filter_conditions.append(col(source_column_name) != value) + elif operator == "IN": + if isinstance(value, str): + value = [value] + filter_conditions.append(col(source_column_name).isin(value)) + elif operator == "NOT IN": + if isinstance(value, str): + value = [value] + filter_conditions.append(~col(source_column_name).isin(value)) + elif operator == ">": + filter_conditions.append(col(source_column_name) > value) + elif operator == "<": + filter_conditions.append(col(source_column_name) < value) + else: + raise NotImplementedError + + dfL_filt = dfL.filter(reduce(lambda x, y: x & y, filter_conditions)) + # Create delta table for filtered data + save_as_delta_table( + dataframe=dfL_filt, + delta_table_name=new_lake_table_name, + write_mode=write_mode, + lakehouse=lakehouse_id, + workspace=lakehouse_workspace_id, + ) + + transitive_relations = find_transitive_incoming_relationships( + dfR, table_name + ).sort_values(by="Degree") + print(transitive_relations) + + alias_counter = 0 + alias_map = {entity_name: alias_counter} + alias_counter += 1 + + for _, r in transitive_relations.iterrows(): + from_column = r["From Column"] + to_column = r["To Column"] + from_table = r["From Table"] + to_table = r["To Table"] + degree = r["Degree"] + + from_partition = next( + p.Name for p in tom.model.Tables[from_table].Partitions + ) + from_entity = ( + tom.model.Tables[from_table] + .Partitions[from_partition] + .Source.EntityName + ) + from_source_column = ( + tom.model.Tables[from_table].Columns[from_column].SourceColumn + ) + + to_partition = next( + p.Name for p in tom.model.Tables[to_table].Partitions + ) + to_entity = ( + tom.model.Tables[to_table] + .Partitions[to_partition] + .Source.EntityName + ) + to_source_column = ( + tom.model.Tables[to_table].Columns[to_column].SourceColumn + ) + + new_lake_table_name = f"{from_entity}_{suffix}" + + if from_entity not in alias_map: + alias_map[from_entity] = alias_counter + alias_counter += 1 + if to_entity not in alias_map: + alias_map[to_entity] = alias_counter + alias_counter += 1 + + # Generate SQL query dynamically for multi-degree joins + join_conditions = [] + last_entity = from_entity + last_column = from_source_column + + filtered_table = ( + table_name # This is the final table where we apply the filter + ) + + # Build JOINs for each degree + for deg in range(degree, 0, -1): + rel = transitive_relations[ + transitive_relations["Degree"] == deg + ].iloc[0] + + join_from_table = rel["From Table"] + join_to_table = rel["To Table"] + join_from_column = rel["From Column"] + join_to_column = rel["To Column"] + + join_from_entity = ( + tom.model.Tables[join_from_table] + .Partitions[ + next( + p.Name + for p in tom.model.Tables[ + join_from_table + ].Partitions + ) + ] + .Source.EntityName + ) + 
+                        join_to_entity = (
+                            tom.model.Tables[join_to_table]
+                            .Partitions[
+                                next(
+                                    p.Name
+                                    for p in tom.model.Tables[join_to_table].Partitions
+                                )
+                            ]
+                            .Source.EntityName
+                        )
+
+                        join_from_source_column = (
+                            tom.model.Tables[join_from_table]
+                            .Columns[join_from_column]
+                            .SourceColumn
+                        )
+                        join_to_source_column = (
+                            tom.model.Tables[join_to_table]
+                            .Columns[join_to_column]
+                            .SourceColumn
+                        )
+
+                        from_alias = f"{alias_map[join_from_entity]}"
+                        to_alias = f"{alias_map[join_to_entity]}"
+
+                        join_conditions.append(
+                            f"LEFT JOIN {join_to_entity} AS T{to_alias} ON T{from_alias}.{join_from_source_column} = T{to_alias}.{join_to_source_column}\n"
+                        )
+
+                        last_entity = join_to_entity
+                        last_column = join_to_source_column
+
+                    max_value = max(alias_map.values())
+                    # Final query with multiple JOINs
+                    query = f"""SELECT T0.*\nFROM {from_entity} AS T{max_value}\n{''.join(join_conditions)}WHERE T{alias_map.get(entity_name)}.{source_column_name} {operator} '{value}'
+                    """
+                    # print(query)
+                    with ConnectLakehouse(
+                        lakehouse=lakehouse_id, workspace=lakehouse_workspace_id
+                    ) as sql:
+                        df_result = sql.query(query)
+
+                    save_as_delta_table(
+                        dataframe=df_result,
+                        delta_table_name=new_lake_table_name,
+                        write_mode=write_mode,
+                        lakehouse=lakehouse_id,
+                        workspace=lakehouse_workspace_id,
+                    )
+
+    model_reduced = False
+    if perspective is not None:
+        from sempy_labs.tom import connect_semantic_model
+
+        with connect_semantic_model(
+            dataset=source_dataset, workspace=source_workspace, readonly=True
+        ) as tom:
+            perspectives = [p.Name for p in tom.model.Perspectives]
+
+            # If invalid perspective, notify user
+            if filters is None and perspective not in perspectives:
+                raise ValueError(
+                    f"{icons.red_dot} The '{perspective}' is not a valid perspective in the source semantic model."
+                )
+            elif filters is not None and perspective not in perspectives:
+                print(
+                    f"{icons.info} The '{perspective}' is not a valid perspective in the source semantic model."
+                )
+            # Only reduce model if the perspective does not contain all the objects in the model
+            elif not tom._is_perspective_full(perspective_name=perspective):
+                model_reduced = True
+                df_added = tom._reduce_model(perspective_name=perspective)
 
             bim = tom.get_bim()
     else:
@@ -342,7 +636,7 @@
     if refresh_target_dataset:
         refresh_semantic_model(dataset=target_dataset, workspace=target_workspace_id)
 
-    if perspective is not None:
+    if perspective is not None and model_reduced:
 
         return df_added
 
diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py
index 8a0a64be..f4d873f0 100644
--- a/src/sempy_labs/_helper_functions.py
+++ b/src/sempy_labs/_helper_functions.py
@@ -1666,3 +1666,40 @@ def _mount(lakehouse, workspace) -> str:
     )
 
     return local_path
+
+
+def find_transitive_incoming_relationships(
+    df, table_name, degree=1, visited=None, results=None
+):
+    """
+    Given a DataFrame containing relationships and a table name,
+    return all relationships where the table appears in the 'To Table' column,
+    considering transitive relationships (snowflake schema).
+
+    Adds a 'Degree' column to indicate the number of hops from the target table.
+ """ + if visited is None: + visited = set() + if results is None: + results = [] + + # Avoid infinite loops + if table_name in visited: + return pd.DataFrame(results, columns=list(df.columns) + ["Degree"]) + + visited.add(table_name) + + # Find direct incoming relationships + direct_rels = df[df["To Table"] == table_name].copy() + direct_rels["Degree"] = degree # Assign current degree + + # Append direct relationships to results + results.extend(direct_rels.values.tolist()) + + # Recursively find indirect relationships for the 'From Table' columns + for from_table in direct_rels["From Table"].unique(): + find_transitive_incoming_relationships( + df, from_table, degree + 1, visited, results + ) + + return pd.DataFrame(results, columns=list(df.columns) + ["Degree"]) diff --git a/src/sempy_labs/_list_functions.py b/src/sempy_labs/_list_functions.py index 8be7a658..833d7d17 100644 --- a/src/sempy_labs/_list_functions.py +++ b/src/sempy_labs/_list_functions.py @@ -1239,7 +1239,14 @@ def list_shortcuts( uses_pagination=True, ) - sources = ["s3Compatible", "googleCloudStorage", "externalDataShare", "amazonS3", "adlsGen2", "dataverse"] + sources = [ + "s3Compatible", + "googleCloudStorage", + "externalDataShare", + "amazonS3", + "adlsGen2", + "dataverse", + ] sources_locpath = ["s3Compatible", "googleCloudStorage", "amazonS3", "adlsGen2"] for r in responses: @@ -1247,16 +1254,28 @@ def list_shortcuts( tgt = i.get("target", {}) one_lake = tgt.get("oneLake", {}) connection_id = next( - (tgt.get(source, {}).get("connectionId") for source in sources if tgt.get(source)), - None + ( + tgt.get(source, {}).get("connectionId") + for source in sources + if tgt.get(source) + ), + None, ) location = next( - (tgt.get(source, {}).get("location") for source in sources_locpath if tgt.get(source)), - None + ( + tgt.get(source, {}).get("location") + for source in sources_locpath + if tgt.get(source) + ), + None, ) sub_path = next( - (tgt.get(source, {}).get("subpath") for source in sources_locpath if tgt.get(source)), - None + ( + tgt.get(source, {}).get("subpath") + for source in sources_locpath + if tgt.get(source) + ), + None, ) source_workspace_id = one_lake.get("workspaceId") source_item_id = one_lake.get("itemId") diff --git a/src/sempy_labs/_semantic_models.py b/src/sempy_labs/_semantic_models.py index b5a80404..d690ea37 100644 --- a/src/sempy_labs/_semantic_models.py +++ b/src/sempy_labs/_semantic_models.py @@ -69,7 +69,9 @@ def get_semantic_model_refresh_schedule( def enable_semantic_model_scheduled_refresh( - dataset: str | UUID, workspace: Optional[str | UUID] = None, enable: bool = True, + dataset: str | UUID, + workspace: Optional[str | UUID] = None, + enable: bool = True, ): """ Enables the scheduled refresh for the specified dataset from the specified workspace. @@ -91,18 +93,18 @@ def enable_semantic_model_scheduled_refresh( (dataset_name, dataset_id) = resolve_dataset_name_and_id(dataset, workspace) df = get_semantic_model_refresh_schedule(dataset=dataset, workspace=workspace) - status = df['Enabled'].iloc[0] + status = df["Enabled"].iloc[0] if enable and status: - print(f"{icons.info} Scheduled refresh for the '{dataset_name}' within the '{workspace_name}' workspace is already enabled.") + print( + f"{icons.info} Scheduled refresh for the '{dataset_name}' within the '{workspace_name}' workspace is already enabled." 
+ ) elif not enable and not status: - print(f"{icons.info} Scheduled refresh for the '{dataset_name}' within the '{workspace_name}' workspace is already disabled.") + print( + f"{icons.info} Scheduled refresh for the '{dataset_name}' within the '{workspace_name}' workspace is already disabled." + ) else: - payload = { - "value": { - "enabled": enable - } - } + payload = {"value": {"enabled": enable}} _base_api( request=f"/v1.0/myorg/groups/{workspace_id}/datasets/{dataset_id}/refreshSchedule", @@ -110,4 +112,6 @@ def enable_semantic_model_scheduled_refresh( payload=payload, ) - print(f"{icons.green_dot} Scheduled refresh for the '{dataset_name}' within the '{workspace_name}' workspace has been enabled.") + print( + f"{icons.green_dot} Scheduled refresh for the '{dataset_name}' within the '{workspace_name}' workspace has been enabled." + ) diff --git a/src/sempy_labs/_sql.py b/src/sempy_labs/_sql.py index 0d64d7ab..25f7653e 100644 --- a/src/sempy_labs/_sql.py +++ b/src/sempy_labs/_sql.py @@ -36,7 +36,7 @@ def __init__( self, item: str, workspace: Optional[Union[str, UUID]] = None, - timeout: Optional[int] = None, + timeout: Optional[int] = 10, endpoint_type: str = "warehouse", ): from sempy.fabric._token_provider import SynapseTokenProvider diff --git a/src/sempy_labs/directlake/_update_directlake_partition_entity.py b/src/sempy_labs/directlake/_update_directlake_partition_entity.py index a878ad8c..2e571fbd 100644 --- a/src/sempy_labs/directlake/_update_directlake_partition_entity.py +++ b/src/sempy_labs/directlake/_update_directlake_partition_entity.py @@ -79,7 +79,9 @@ def update_direct_lake_partition_entity( tom.model.Tables[tName].Partitions[part_name].Source.EntityName = eName # Update source lineage tag - schema = tom.model.Tables[tName].Partitions[part_name].Source.SchemaName or 'dbo' + schema = ( + tom.model.Tables[tName].Partitions[part_name].Source.SchemaName or "dbo" + ) tom.model.Tables[tName].SourceLineageTag = f"[{schema}].[{eName}]" print( f"{icons.green_dot} The '{tName}' table in the '{dataset_name}' semantic model within the '{workspace_name}' workspace has been updated to point to the '{eName}' table." diff --git a/src/sempy_labs/tom/_model.py b/src/sempy_labs/tom/_model.py index 2479e644..85579222 100644 --- a/src/sempy_labs/tom/_model.py +++ b/src/sempy_labs/tom/_model.py @@ -1667,6 +1667,47 @@ def remove_from_perspective( object.Parent.Name ].PerspectiveHierarchies.Remove(ph) + def _is_perspective_full(self, perspective_name: str) -> bool: + """ + Indicates whether a perspective contains all the objects in the model. + """ + + import Microsoft.AnalysisServices.Tabular as TOM + + result = True + + if not any(p.Name == perspective_name for p in self.model.Perspectives): + raise ValueError( + f"{icons.red_dot} The '{perspective_name}' perspective does not exist." 
+            )
+
+        p = self.model.Perspectives[perspective_name]
+        for pt in p.PerspectiveTables:
+            columns_persp = sum(1 for pc in pt.PerspectiveColumns)
+            columns = sum(
+                1
+                for t in self.model.Tables
+                for c in t.Columns
+                if t.Name == pt.Name and c.Type != TOM.ColumnType.RowNumber
+            )
+
+            measures = sum(1 for m in self.all_measures() if m.Parent.Name == pt.Name)
+            measures_persp = sum(1 for pm in pt.PerspectiveMeasures)
+
+            hierarchies = sum(
+                1 for m in self.all_hierarchies() if m.Parent.Name == pt.Name
+            )
+            hierarchies_persp = sum(1 for pm in pt.PerspectiveHierarchies)
+
+            if not (
+                columns_persp == columns
+                and measures_persp == measures
+                and hierarchies_persp == hierarchies
+            ):
+                result = False
+
+        return result
+
     def set_translation(
         self,
         object: Union[
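The row normalization added to `evaluate_dax_impersonation` above collects the union of column names before building the DataFrame, because the REST results omit columns whose value is null. A minimal pandas sketch of that pattern; the payload below is illustrative only and not a real `executeQueries` response:

```python
import pandas as pd

# Rows omit null-valued columns, so gather every key first, then fill gaps with None.
data = [{"rows": [{"[Sales][Amount]": 100, "[Sales][Qty]": 2}, {"[Sales][Amount]": 50}]}]

all_columns = set()
for item in data:
    for row in item["rows"]:
        all_columns.update(row.keys())

rows = [{c: row.get(c) for c in all_columns} for item in data for row in item["rows"]]
df = pd.DataFrame(rows)
print(df)  # '[Sales][Qty]' is NaN for the second row instead of the column being dropped
```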
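A hedged usage sketch of the new `filters` parameter on `deploy_semantic_model`. The dataset, workspace, and perspective names are placeholders, the keyword names for the source/target arguments are assumed from the function body above, and per the validation added in this diff `filters` must be accompanied by a `perspective`:

```python
from sempy_labs import deploy_semantic_model  # assumes the package-root export

# Deploy a filtered, perspective-reduced copy of a Direct Lake model.
# Each filters entry is "TableName": ["ColumnName", "Operator", "Value"].
deploy_semantic_model(
    source_dataset="Sales Model",            # placeholder source model
    source_workspace="Source Workspace",     # placeholder workspace
    target_dataset="Sales Model (Canada)",   # name for the deployed copy
    target_workspace="Target Workspace",
    perspective="Canada Reporting",          # placeholder perspective; required with filters
    filters={
        "Geography": ["Country", "=", "Canada"],
        "Product": ["ModelName", "IN", ["Bikes", "Cars"]],
    },
    overwrite=True,
)
```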
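The per-table filtering above maps each (column, operator, value) triple to a PySpark `Column` predicate and combines the predicates with `functools.reduce` and `&`. A standalone sketch of that composition pattern, using a local SparkSession and made-up sample data instead of the lakehouse Delta tables:

```python
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("Canada", "Bikes"), ("Canada", "Trucks"), ("France", "Bikes")],
    ["Country", "ModelName"],
)

filters = [("Country", "=", "Canada"), ("ModelName", "IN", ["Bikes", "Cars"])]

conditions = []
for column, operator, value in filters:
    if operator == "=":
        conditions.append(col(column) == value)
    elif operator in {"!=", "<>"}:
        conditions.append(col(column) != value)
    elif operator in {"IN", "NOT IN"}:
        in_cond = col(column).isin(value if isinstance(value, list) else [value])
        conditions.append(in_cond if operator == "IN" else ~in_cond)
    else:
        raise NotImplementedError(operator)

# AND all predicates together, as the deploy path does before writing the filtered table.
df.filter(reduce(lambda x, y: x & y, conditions)).show()
```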
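A small self-contained check of `find_transitive_incoming_relationships` from `_helper_functions.py` above, run against a toy relationship list shaped like `fabric.list_relationships` output (only the columns the helper reads are needed). The table and column names are invented, and the import assumes an environment where `sempy_labs` is installed:

```python
import pandas as pd

from sempy_labs._helper_functions import find_transitive_incoming_relationships

# Toy snowflake chain: Sales -> Customer -> Geography.
dfR = pd.DataFrame(
    {
        "From Table": ["Sales", "Customer"],
        "From Column": ["CustomerKey", "GeographyKey"],
        "To Table": ["Customer", "Geography"],
        "To Column": ["CustomerKey", "GeographyKey"],
    }
)

# Everything that points at 'Geography', directly or transitively,
# with 'Degree' counting the hops away from it.
result = find_transitive_incoming_relationships(dfR, "Geography")
print(result)
# Customer -> Geography arrives at Degree 1; Sales -> Customer at Degree 2.
```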