added the filters parameter to deploy_semantic_model #504

Open · wants to merge 2 commits into main
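For context, a minimal usage sketch of the new `filters` parameter, based on the docstring added in this diff. The dataset, workspace, and perspective names are placeholders, and the call assumes `deploy_semantic_model` is exposed at the package level like other sempy_labs functions:

```python
import sempy_labs as labs

# Hypothetical names, for illustration only.
labs.deploy_semantic_model(
    source_dataset="Sales Model",
    source_workspace="Source Workspace",
    target_dataset="Sales Model - Canada",
    target_workspace="Target Workspace",
    perspective="Canada",  # the new code requires a perspective when filters are used
    filters={
        "Geography": ["Country", "=", "Canada"],
        "Product": ["ModelName", "IN", ["Bikes", "Cars"]],
    },
    overwrite=True,
)
```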
src/sempy_labs/_ai.py: 2 changes (1 addition, 1 deletion)
@@ -428,7 +428,7 @@ def generate_aggs(
# viewName = view['viewName']
# isTemporary = view['isTemporary']
# new_data = {'Workspace Name': workspace, 'Lakehouse Name': lakehouse, 'View Name': viewName}
# dfView = pd.concat([dfView, pd.DataFrame(new_data, index=[0])], ignore_index=True)
# dfView = pd.([dfView, pd.DataFrame(new_data, index=[0])], ignore_index=True)
# dfView
# lakeT = get_lakehouse_tables(lakehouse, lakehouse_workspace)
# if not dfP['Query'].isin(lakeT['Table Name'].values):
src/sempy_labs/_dax.py: 6 changes (3 additions, 3 deletions)
@@ -62,21 +62,21 @@ def evaluate_dax_impersonation(
payload=payload,
)
data = response.json()["results"][0]["tables"]

# Get all possible column names from all rows because null columns aren't returned
all_columns = set()
for item in data:
for row in item["rows"]:
all_columns.update(row.keys())

# Create rows with all columns, filling missing values with None
rows = []
for item in data:
for row in item["rows"]:
# Create a new row with all columns, defaulting to None
new_row = {col: row.get(col) for col in all_columns}
rows.append(new_row)

# Create DataFrame from the processed rows
df = pd.DataFrame(rows)

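As an aside, a standalone sketch of the null-column padding pattern used in the change above, with made-up rows:

```python
import pandas as pd

# Made-up API payload: null columns are simply absent from each row.
data = [{"rows": [{"Sales": 100}, {"Sales": 200, "Region": "West"}]}]

# Collect the union of all column names across rows.
all_columns = set()
for item in data:
    for row in item["rows"]:
        all_columns.update(row.keys())

# Rebuild every row with the full column set, filling gaps with None.
rows = [
    {col: row.get(col) for col in all_columns}
    for item in data
    for row in item["rows"]
]
df = pd.DataFrame(rows)  # missing values appear as None/NaN
```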
src/sempy_labs/_generate_semantic_model.py: 302 changes (298 additions, 4 deletions)
@@ -253,6 +253,7 @@ def deploy_semantic_model(
refresh_target_dataset: bool = True,
overwrite: bool = False,
perspective: Optional[str] = None,
filters: Optional[dict] = None,
):
"""
Deploys a semantic model based on an existing semantic model.
@@ -277,6 +278,20 @@
If set to True, overwrites the existing semantic model in the workspace if it exists.
perspective : str, default=None
Set this to the name of a perspective in the model to reduce the deployed model to only the tables/columns/measures/hierarchies within that perspective.
filters : dict, default=None
A dictionary of filters to apply to the tables in the model. This is only supported for Direct Lake models and for tables in Direct Lake mode, and requires the 'perspective' parameter to also be set.

Supported operators: [=, <>, !=, <, <=, >, >=, IN, NOT IN]

Format:
{
"TableName": ["ColumnName", "Operator", "Value"],
}
Example:
filters = {
"Geography": ["Country", "=", "Canada"],
"Product": ["ModelName", "IN", ["Bikes", "Cars"]]
}
"""

(source_workspace_name, source_workspace_id) = resolve_workspace_name_and_id(
@@ -310,15 +325,294 @@ def deploy_semantic_model(
f"{icons.warning} The '{target_dataset}' semantic model already exists within the '{target_workspace_name}' workspace. The 'overwrite' parameter is set to False so the source semantic model was not deployed to the target destination."
)

if perspective is not None:
if filters is not None and perspective is None:
raise ValueError(
f"{icons.red_dot} The 'filters' parameter is only supported when the 'perspective' parameter is set to None. Please update the parameters."
)

if filters is not None:
import sempy

sempy.fabric._client._utils._init_analysis_services()
import Microsoft.AnalysisServices.Tabular as TOM
from sempy_labs._helper_functions import (
find_transitive_incoming_relationships,
create_abfss_path,
_read_delta_table,
save_as_delta_table,
)
from sempy_labs.tom import connect_semantic_model
from sempy_labs.directlake._dl_helper import get_direct_lake_source
from functools import reduce
from pyspark.sql.functions import col
from sempy_labs._sql import ConnectLakehouse

valid_operators = ["=", "!=", "<>", "<", ">", "<=", ">=", "IN", "NOT IN"]
write_mode = "overwrite"

expanded_data = []
for table_name, filter_details in filters.items():
expanded_data.append([table_name] + filter_details)

columns = ["TableName", "FilterColumn", "FilterOperator", "FilterValue"]
filters_df = pd.DataFrame(expanded_data, columns=columns)

with connect_semantic_model(
dataset=source_dataset, workspace=source_workspace, readonly=True
dataset=source_dataset, workspace=source_workspace
) as tom:
if not tom.is_direct_lake():
raise ValueError(
f"{icons.red_dot} This is only supported for semantic models in Direct Lake mode."
)

(artifact_type, artifact_name, lakehouse_id, lakehouse_workspace_id) = (
get_direct_lake_source(source_dataset, source_workspace)
)

df_added = tom._reduce_model(perspective_name=perspective)
if not artifact_type == "Lakehouse":
raise ValueError(
f"{icons.red_dot} This is only supported for semantic models in Direct Lake mode. Cannot find lakehouse."
)

fabric.refresh_tom_cache(workspace=source_workspace)
dfR = fabric.list_relationships(
dataset=source_dataset, workspace=source_workspace
)

filter_conditions = []
suffix = perspective.replace(" ", "")
# Check object validity
for _, r in filters_df.iterrows():
table_name = r["TableName"]
column_name = r["FilterColumn"]
operator, value = r["FilterOperator"].upper(), r["FilterValue"]
if not any(t.Name == table_name for t in tom.model.Tables):
raise ValueError(f"The '{table_name}' table does not exist.")
if not any(
c.Parent.Name == table_name and c.Name == column_name
for c in tom.all_columns()
):
raise ValueError(
f"The '{table_name}'[{column_name}] column does not exist."
)
if operator not in valid_operators:
raise ValueError(f"'{operator}' is not a valid operator.")
partition_name = next(
p.Name for p in tom.model.Tables[table_name].Partitions
)
if (
tom.model.Tables[table_name].Partitions[partition_name].Mode
!= TOM.ModeType.DirectLake
):
raise ValueError(
f"{icons.red_dot} Filtering is only valid for tables in Direct Lake mode."
)

for _, r in filters_df.iterrows():
table_name = r["TableName"]
column_name = r["FilterColumn"]
operator, value = r["FilterOperator"].upper(), r["FilterValue"]
partition_name = next(
p.Name for p in tom.model.Tables[table_name].Partitions
)
entity_name = (
tom.model.Tables[table_name]
.Partitions[partition_name]
.Source.EntityName
)
source_column_name = (
tom.model.Tables[table_name].Columns[column_name].SourceColumn
)

path = create_abfss_path(
lakehouse_id, lakehouse_workspace_id, entity_name
)
new_lake_table_name = f"{entity_name}_{suffix}"
dfL = _read_delta_table(path)

# Filter dataframe
if operator == "=":
filter_conditions.append(col(source_column_name) == value)
elif operator in {"!=", "<>"}:
filter_conditions.append(col(source_column_name) != value)
elif operator == "IN":
if isinstance(value, str):
value = [value]
filter_conditions.append(col(source_column_name).isin(value))
elif operator == "NOT IN":
if isinstance(value, str):
value = [value]
filter_conditions.append(~col(source_column_name).isin(value))
elif operator == ">":
filter_conditions.append(col(source_column_name) > value)
elif operator == "<":
filter_conditions.append(col(source_column_name) < value)
else:
raise NotImplementedError

dfL_filt = dfL.filter(reduce(lambda x, y: x & y, filter_conditions))
# Create delta table for filtered data
save_as_delta_table(
dataframe=dfL_filt,
delta_table_name=new_lake_table_name,
write_mode=write_mode,
lakehouse=lakehouse_id,
workspace=lakehouse_workspace_id,
)

transitive_relations = find_transitive_incoming_relationships(
dfR, table_name
).sort_values(by="Degree")
print(transitive_relations)

alias_counter = 0
alias_map = {entity_name: alias_counter}
alias_counter += 1

for _, r in transitive_relations.iterrows():
from_column = r["From Column"]
to_column = r["To Column"]
from_table = r["From Table"]
to_table = r["To Table"]
degree = r["Degree"]

from_partition = next(
p.Name for p in tom.model.Tables[from_table].Partitions
)
from_entity = (
tom.model.Tables[from_table]
.Partitions[from_partition]
.Source.EntityName
)
from_source_column = (
tom.model.Tables[from_table].Columns[from_column].SourceColumn
)

to_partition = next(
p.Name for p in tom.model.Tables[to_table].Partitions
)
to_entity = (
tom.model.Tables[to_table]
.Partitions[to_partition]
.Source.EntityName
)
to_source_column = (
tom.model.Tables[to_table].Columns[to_column].SourceColumn
)

new_lake_table_name = f"{from_entity}_{suffix}"

if from_entity not in alias_map:
alias_map[from_entity] = alias_counter
alias_counter += 1
if to_entity not in alias_map:
alias_map[to_entity] = alias_counter
alias_counter += 1

# Generate SQL query dynamically for multi-degree joins
join_conditions = []
last_entity = from_entity
last_column = from_source_column

filtered_table = (
table_name # This is the final table where we apply the filter
)

# Build JOINs for each degree
for deg in range(degree, 0, -1):
rel = transitive_relations[
transitive_relations["Degree"] == deg
].iloc[0]

join_from_table = rel["From Table"]
join_to_table = rel["To Table"]
join_from_column = rel["From Column"]
join_to_column = rel["To Column"]

join_from_entity = (
tom.model.Tables[join_from_table]
.Partitions[
next(
p.Name
for p in tom.model.Tables[
join_from_table
].Partitions
)
]
.Source.EntityName
)
join_to_entity = (
tom.model.Tables[join_to_table]
.Partitions[
next(
p.Name
for p in tom.model.Tables[join_to_table].Partitions
)
]
.Source.EntityName
)

join_from_source_column = (
tom.model.Tables[join_from_table]
.Columns[join_from_column]
.SourceColumn
)
join_to_source_column = (
tom.model.Tables[join_to_table]
.Columns[join_to_column]
.SourceColumn
)

from_alias = f"{alias_map[join_from_entity]}"
to_alias = f"{alias_map[join_to_entity]}"

join_conditions.append(
f"LEFT JOIN {join_to_entity} AS T{to_alias} ON T{from_alias}.{join_from_source_column} = T{to_alias}.{join_to_source_column}\n"
)

last_entity = join_to_entity
last_column = join_to_source_column

max_value = max(alias_map.values())
# Final query with multiple JOINs
query = f"""SELECT T0.*\nFROM {from_entity} AS T{max_value}\n{''.join(join_conditions)}WHERE T{alias_map.get(entity_name)}.{source_column_name} {operator} '{value}'
"""
# print(query)
with ConnectLakehouse(
lakehouse=lakehouse_id, workspace=lakehouse_workspace_id
) as sql:
df_result = sql.query(query)

save_as_delta_table(
dataframe=df_result,
delta_table_name=new_lake_table_name,
write_mode=write_mode,
lakehouse=lakehouse_id,
workspace=lakehouse_workspace_id,
)

model_reduced = False
if perspective is not None:
from sempy_labs.tom import connect_semantic_model

with connect_semantic_model(
dataset=source_dataset, workspace=source_workspace, readonly=True
) as tom:
perspectives = [p.Name for p in tom.model.Perspectives]

# If invalid perspective, notify user
if filters is None and perspective not in perspectives:
raise ValueError(
f"{icons.red_dot} The '{perspective}' is not a valid perspective in the source semantic model."
)
elif filters is not None and perspective not in perspectives:
print(
f"{icons.info} The '{perspective}' is not a valid perspective in the source semantic model."
)
# Only reduce model if the perspective does not contain all the objects in the model
elif not tom._is_perspective_full:
model_reduced = True
df_added = tom._reduce_model(perspective_name=perspective)
bim = tom.get_bim()

else:
@@ -342,7 +636,7 @@ def deploy_semantic_model(
if refresh_target_dataset:
refresh_semantic_model(dataset=target_dataset, workspace=target_workspace_id)

if perspective is not None:
if perspective is not None and model_reduced:
return df_added


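Aside: a self-contained sketch of the condition-building and reduce pattern used by the new filtering code (assumes a Spark environment; the table data and column names are illustrative, not from the PR):

```python
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("Canada", "Bikes"), ("France", "Cars"), ("Canada", "Cars")],
    ["Country", "ModelName"],
)

# One PySpark condition per filter, combined with logical AND via reduce,
# mirroring the '=' and 'IN' branches in the diff above.
conditions = [
    col("Country") == "Canada",
    col("ModelName").isin(["Bikes", "Cars"]),
]
df_filt = df.filter(reduce(lambda x, y: x & y, conditions))
df_filt.show()
```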