I'm trying to create a `graph_asset` that calls an `op` twice. This op uses a single `ConfigurableResource`, but with different configuration values for each of the two calls. However, I can't get this to work.

Intended use case: the single `graph_asset` represents a table whose data is read from two different servers/databases and joined together.

The (simplified) code below works when the resource configuration is supplied for the op, but that ties the resource to one server/database only. I'd like to reuse the op within the job to pull data from another server/database as well.
```python
import dagster as dg
import dask.dataframe as dd
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine import URL


class ConnectionResource(dg.ConfigurableResource):
    server: str
    database: str

    def sql_engine(self):
        # Build an ODBC connection string for this server/database
        ds = (
            r"DRIVER={ODBC Driver 17 for SQL Server};"
            f"SERVER={self.server};"
            f"DATABASE={self.database};"
            r"Trusted_Connection=yes;"
            r"TrustServerCertificate=yes;"
        )
        connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": ds})
        engine = create_engine(connection_url, fast_executemany=True, future=True)
        return engine


@dg.op
def read_table_from_sql(
    context: dg.OpExecutionContext, sql: ConnectionResource, table_name: str
) -> dd.DataFrame:
    engine = sql.sql_engine()
    context.log.info(f"Reading table '{table_name}' from SQL Server")
    df = pd.read_sql_table(table_name, con=engine)
    df = dd.from_pandas(df, npartitions=1)
    context.log.info(f"Read {len(df)} rows from '{table_name}'.")
    return df


@dg.graph_asset(
    config={
        "read_table_from_sql": {"inputs": {"table_name": "some_table_name"}},
    }
)
def asset_one():
    df = read_table_from_sql()
    # Ideally I'd also do a 2nd read and merge here, but I can't configure a
    # 2nd server/database using the same ConfigurableResource:
    # df2 = read_table_from_sql()  # <- but from a different server/database
    # df = df.merge(df2)
    return df


asset_job = dg.define_asset_job(
    name="asset_job",
    selection=[asset_one],
)

# Define your Definitions with resources
defs = dg.Definitions(
    assets=[asset_one],
    jobs=[asset_job],
    resources={
        "sql": ConnectionResource(
            server="someserver",
            database="somedatabase",
        )
    },
)
```
I have tried supplying `resource_defs` to the `graph_asset`, but this leads to various issues, and even Dagster AI doesn't seem able to suggest something that works.