Skip to content

feat: DJ sync #355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
# pip-compile --no-annotate dev-requirements.in
#
-e file:.
about-time==4.2.1
aiohttp==3.8.3
aiosignal==1.2.0
alive-progress==3.2.0
anyio==4.9.0
appdirs==1.4.4
astroid==2.12.12
async-timeout==4.0.2
Expand All @@ -19,24 +22,31 @@ cfgv==3.3.1
charset-normalizer==2.0.12
click==8.1.2
codespell==2.1.0
commonmark==0.9.1
coverage[toml]==6.4.3
cython==0.29.32
datajunction==0.0.1a102
dill==0.3.6
distlib==0.3.5
execnet==2.1.1
filelock==3.8.0
freezegun==1.2.2
frozenlist==1.3.1
grapheme==0.6.0
greenlet==1.1.3.post0
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
identify==2.5.3
idna==3.3
iniconfig==1.1.1
isort==5.10.1
jinja2==3.1.2
lazy-object-proxy==1.8.0
markdown-it-py==3.0.0
markupsafe==2.1.1
marshmallow==3.17.0
mccabe==0.7.0
mdurl==0.1.2
multidict==6.0.2
nodeenv==1.7.0
numpy==1.23.1
Expand All @@ -51,28 +61,30 @@ prison==0.2.1
prompt-toolkit==3.0.30
py==1.11.0
pyfakefs==4.6.3
pygments==2.12.0
pygments==2.19.1
pylint==2.15.5
pyparsing==3.0.9
pytest==7.1.2
pytest-cov==3.0.0
pytest-mock==3.8.2
pytest-xdist==3.7.0
python-dateutil==2.8.2
python-graphql-client==0.4.3
pytz==2022.2
pyyaml==6.0
requests==2.27.1
pyyaml==6.0.2
requests==2.32.4
requests-mock==1.9.3
rich==12.5.1
rich==14.0.0
six==1.16.0
sniffio==1.3.1
soupsieve==2.3.2.post1
sqlalchemy==1.4.40
sqlglot==26.23.0
tabulate==0.8.10
toml==0.10.2
tomli==2.0.1
tomlkit==0.11.6
typing-extensions==4.3.0
typing-extensions==4.14.0
urllib3==1.26.9
virtualenv==20.16.3
wcwidth==0.2.5
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ testing =
pre-commit
pip-tools>=6.6.0
pylint==2.15.5
datajunction

[options.entry_points]
# Add here console scripts like:
Expand Down
9 changes: 5 additions & 4 deletions src/preset_cli/api/clients/superset.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ class OwnershipType(TypedDict):


class SupersetClient: # pylint: disable=too-many-public-methods

"""
A client for running queries against Superset.
"""
Expand Down Expand Up @@ -360,9 +359,11 @@ def get_data( # pylint: disable=too-many-locals, too-many-arguments

# and order bys
processed_orderbys = [
(orderby, not order_desc)
if orderby in metric_names
else (convert_to_adhoc_metric(orderby), not order_desc)
(
(orderby, not order_desc)
if orderby in metric_names
else (convert_to_adhoc_metric(orderby), not order_desc)
)
for orderby in (order_by or [])
]

Expand Down
Empty file.
90 changes: 90 additions & 0 deletions src/preset_cli/cli/superset/sync/dj/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
"""
A command to sync DJ cubes into a Superset instance.
"""

from __future__ import annotations

import logging
from uuid import UUID

import click
from datajunction import DJClient
from yarl import URL

from preset_cli.api.clients.superset import SupersetClient
from preset_cli.cli.superset.sync.dj.lib import sync_cube
from preset_cli.lib import split_comma

_logger = logging.getLogger(__name__)


@click.command()
@click.option(
"--database-uuid",
required=True,
help="Database UUID",
)
@click.option(
"--schema",
required=True,
help="Schema where virtual dataset will be created",
)
@click.option(
"--cubes",
callback=split_comma,
help="Comma-separated list of cubes to sync",
)
@click.option(
"dj_url",
"--dj-url",
required=True,
help="DJ URL",
default="http://localhost:8000",
)
@click.option(
"dj_username",
"--dj-username",
required=True,
help="DJ username",
default="dj",
)
@click.option(
"dj_password",
"--dj-password",
required=True,
help="DJ password",
default="dj",
)
@click.option("--external-url-prefix", default="", help="Base URL for resources")
@click.pass_context
def dj( # pylint: disable=invalid-name,too-many-arguments
ctx: click.core.Context,
database_uuid: str,
schema: str,
cubes: list[str],
dj_url: str,
dj_username: str,
dj_password: str,
external_url_prefix: str = "",
) -> None:
"""
Sync DJ cubes to Superset.
"""
superset_auth = ctx.obj["AUTH"]
superset_url = URL(ctx.obj["INSTANCE"])
superset_client = SupersetClient(superset_url, superset_auth)

dj_client = DJClient(dj_url)
dj_client.basic_login(dj_username, dj_password)

base_url = URL(external_url_prefix) if external_url_prefix else None

for cube in cubes:
sync_cube(
UUID(database_uuid),
schema,
dj_client,
superset_client,
cube,
base_url,
)
154 changes: 154 additions & 0 deletions src/preset_cli/cli/superset/sync/dj/lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
"""
Helper functions for DJ sync.
"""

import json
from typing import Any, Optional
from uuid import UUID

from datajunction import DJClient
from yarl import URL

from preset_cli.api.clients.superset import SupersetClient
from preset_cli.api.operators import OneToMany


def sync_cube( # pylint: disable=too-many-arguments
database_uuid: UUID,
schema: str,
dj_client: DJClient,
superset_client: SupersetClient,
cube: str,
base_url: Optional[URL],
) -> None:
"""
Sync a DJ cube to a Superset virtual dataset.
"""
response = dj_client._session.post( # pylint: disable=protected-access
"/graphql",
json={
"query": """
query FindCubes($names:[String!], $tags: [String!]) {
findNodes(names: $names, tags: $tags, nodeTypes: [CUBE]) {
name
current {
description
displayName
cubeMetrics {
name
description
extractedMeasures {
derivedExpression
}
}
cubeDimensions {
name
}
}
}
}
""",
"variables": {"names": [cube]},
},
)
payload = response.json()
description = payload["data"]["findNodes"][0]["current"]["description"]
columns = [
dimension["name"]
for dimension in payload["data"]["findNodes"][0]["current"]["cubeDimensions"]
]
metrics = [
{
"metric_name": metric["name"],
"expression": metric["extractedMeasures"]["derivedExpression"],
"description": metric["description"],
}
for metric in payload["data"]["findNodes"][0]["current"]["cubeMetrics"]
]

response = dj_client._session.post( # pylint: disable=protected-access
"/graphql",
json={
"query": """
query MeasuresSql($metrics: [String!]!, $dimensions: [String!]!) {
measuresSql(
cube: {metrics: $metrics, dimensions: $dimensions, filters: []}
preaggregate: true
) {
sql
}
}
""",
"variables": {
"metrics": [metric["metric_name"] for metric in metrics],
"dimensions": columns,
},
},
)
payload = response.json()
sql = payload["data"]["measuresSql"][0]["sql"]

database = get_database(superset_client, database_uuid)
dataset = get_or_create_dataset(superset_client, database, schema, cube, sql)

superset_client.update_dataset(
dataset["id"],
override_columns=True,
metrics=[],
)

superset_client.update_dataset(
dataset["id"],
override_columns=False,
metrics=metrics,
description=description,
is_managed_externally=True,
external_url=base_url / "nodes" / cube if base_url else None,
extra=json.dumps(
{
"certification": {
"certified_by": "DJ",
"details": "This table is created by DJ.",
},
},
),
sql=sql,
)


def get_database(superset_client: SupersetClient, uuid: UUID) -> dict[str, Any]:
"""
Get database info given its UUID.
"""
databases = superset_client.get_databases(uuid=str(uuid))
if not databases:
raise ValueError(f"Database with UUID {uuid} not found in Superset.")

return databases[0]


def get_or_create_dataset(
superset_client: SupersetClient,
database: dict[str, Any],
schema: str,
cube: str,
sql: str,
) -> dict[str, Any]:
"""
Get or create a dataset in Superset.
"""
if existing := superset_client.get_datasets(
database=OneToMany(database["id"]), # type: ignore
schema=schema,
table_name=cube,
):
dataset = existing[0]
return superset_client.get_dataset(dataset["id"])

return superset_client.create_dataset(
database=database["id"],
catalog=None,
schema=schema,
table_name=cube,
sql=sql,
)
2 changes: 2 additions & 0 deletions src/preset_cli/cli/superset/sync/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import click

from preset_cli.cli.superset.sync.dbt.command import dbt_cloud, dbt_core
from preset_cli.cli.superset.sync.dj.command import dj
from preset_cli.cli.superset.sync.native.command import native


Expand All @@ -16,6 +17,7 @@ def sync() -> None:


sync.add_command(native)
sync.add_command(dj)
sync.add_command(dbt_cloud, name="dbt-cloud")
sync.add_command(dbt_core, name="dbt-core")
# for backwards compatibility
Expand Down
Empty file.
Loading
Loading