Skip to content

Commit e66b6fd

Browse files
authored
Merge pull request #5 from Mg30/develop
Develop
2 parents bb97bb6 + 537d8d4 commit e66b6fd

File tree

13 files changed

+143
-107
lines changed

13 files changed

+143
-107
lines changed

.github/workflows/contribute.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ jobs:
1111
runs-on: ubuntu-18.04
1212
strategy:
1313
matrix:
14-
python-version: [3.8, 3.9, 3.10]
14+
python-version: ['3.8', '3.9', '3.10', '3.11']
1515
steps:
1616
- uses: actions/checkout@v3
1717
- name: Set up Python

README.md

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ settings.yml
9999

100100
* `project_name/models`: where you will put your tasks
101101
* `project_name/dags/`: where the corresponding dag PNG file will be
102-
* `project_name/settings.yml`: a configuration file for your project. This file includes the configuration options for your project, such as the path to your data directory.
102+
* `settings.yml`: a configuration file for your project. This file includes the configuration options for your project, such as the path to your data directory.
103103

104104

105105

@@ -111,10 +111,20 @@ will export the current state of your dag in the `project_name/dags/` as PNG fil
111111

112112
## Run your project
113113

114-
`pydwt run`
114+
`pydwt run <module.function_name>`
115115

116-
will run the current state of your DAG. It will process the tasks in the DAG by level and parrelise
117-
it with the `ThreadExecutor`
116+
If no argument is provided, it will run the current state of your DAG. It will process the tasks in the DAG by level and parallelize
117+
it with the `ThreadExecutor`. If a task fails, its child tasks will not be run.
118+
119+
If an argument is provided in the form `module.function_name` (for instance `example.task_one`), it will run all tasks in the DAG leading to this task.
120+
If the parent tasks succeed, the task itself is then run.
121+
122+
123+
## Test your connection setup
124+
125+
`pydwt test-connection`
126+
127+
will test the current setup of your DB connection according to your `settings.yml` file.
118128

119129
## Configuration of your pydwt project
120130

@@ -123,12 +133,11 @@ The `settings.yml` file is a configuration file for your pydwt project. It store
123133
### connection
124134
The connection section contains the configuration details for connecting to the database. The available options are:
125135

126-
* `db`: the name of the database
127-
* `host`: the hostname or IP address of the database server
128-
* `user`: the username for the database connection
129-
* `password`: the password for the database connection
130-
* `port`: the port number to use for the database connection
131-
* `sql_alchemy_driver`: the SQLAlchemy driver to use for the database connection
136+
* `url`: the connection string to your db
137+
138+
You can add other keys that will be forwarded to the underlying `create_engine` function
139+
for instance, you can add `echo: true` and it will call `create_engine(url=url, echo=echo)`
140+
see [here](https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine) for the supported arguments.
132141

133142
### project
134143
The project section contains the project-related settings. The available options are:

poetry.lock

Lines changed: 1 addition & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pydwt/app.py

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1+
import traceback
12
from typing import Dict
23
import os
34
import sys
4-
5+
from typing import Optional
56
import typer
67
import yaml
78
from dependency_injector.wiring import register_loader_containers
8-
99
from pydwt.core.containers import Container
10+
import logging
1011

1112

1213
sys.path.append(os.getcwd())
@@ -21,6 +22,7 @@ def load_config(path: str) -> Dict:
2122
config = yaml.safe_load(f)
2223
return config
2324

25+
2426
# Define command-line interface using Typer
2527
@app.command()
2628
def new(project_name: str):
@@ -30,12 +32,12 @@ def new(project_name: str):
3032

3133

3234
@app.command()
33-
def run():
35+
def run(name: Optional[str] = typer.Argument(None)):
3436
"""Run the workflow DAG for the current project."""
3537
config = load_config(path="settings.yml")
3638
container.config.from_dict(config)
3739
project_handler = container.project_factory()
38-
project_handler.run()
40+
project_handler.run(name)
3941

4042

4143
@app.command()
@@ -47,6 +49,21 @@ def export_dag():
4749
project_handler.export_dag()
4850

4951

52+
@app.command()
53+
def test_connection():
54+
"""Export the workflow DAG for the current project."""
55+
config = load_config(path="settings.yml")
56+
container.config.from_dict(config)
57+
try:
58+
conn = container.database_client()
59+
engine = conn.get_engine()
60+
dbapi = engine.connect()
61+
dbapi.close()
62+
logging.info("successfully connected to db")
63+
except Exception:
64+
logging.error(f"connection failed {traceback.print_exc()}")
65+
66+
5067
if __name__ == "__main__":
5168
# Run the command-line interface
5269
try:

pydwt/context/connection.py

Lines changed: 4 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,19 @@
11
from dataclasses import dataclass, field
2+
from typing import Dict
23
from sqlalchemy import create_engine, Engine
3-
from urllib.parse import quote_plus
44

55

66
@dataclass
77
class Connection(object):
88
"""Class representing a connection to a database.
99
1010
Attributes:
11-
db (str): Name of the database.
12-
host (int): Hostname or IP address of the server.
13-
name (str): Name of the connection.
14-
password (str): Password for the connection.
15-
user (str): Username for the connection.
16-
sql_alchemy_driver (str): SQL Alchemy driver.
11+
params (Dict): containing the SQLAlchemy DB url and kwargs forwarded to create_engine
1712
engine (sqlalchemy.engine.Engine): Database engine.
1813
"""
1914

20-
db: str
21-
host: int
22-
name: str
23-
password: str
24-
user: str
25-
port: int
26-
sql_alchemy_driver: str
15+
params: Dict
2716
engine: Engine = field(init=False, default=None)
2817

29-
def __post_init__(self):
30-
"""Initialize the connection object and create the database engine."""
31-
# Encode the password to be used in the connection URL
32-
self.password = quote_plus(self.password)
33-
# Create the database engine using SQLAlchemy
34-
3518
def get_engine(self):
36-
return create_engine(
37-
f"{self.sql_alchemy_driver}://{self.user}:{self.password}@{self.host}:{self.port}/{self.db}"
38-
)
19+
return create_engine(**self.params)

pydwt/core/containers.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,7 @@ class Container(containers.DeclarativeContainer):
2424
# Singleton provider that provides the database connection instance
2525
database_client = providers.ThreadSafeSingleton(
2626
Connection,
27-
config.connection.db,
28-
config.connection.host,
29-
config.connection.name,
30-
config.connection.password,
31-
config.connection.user,
32-
config.connection.port,
33-
config.connection.sql_alchemy_driver,
27+
config.connection,
3428
)
3529

3630
# Singleton provider that provides the datasources instance

pydwt/core/dag.py

Lines changed: 33 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,9 @@ def __init__(self, tasks: List) -> None:
1616
self.tasks = tasks
1717
self.graph = nx.DiGraph()
1818
self.source = "s"
19-
self.nodes_by_level = {}
2019
self.node_index = {}
2120
self.node_names = {}
2221

23-
@property
24-
def levels(self) -> Dict[int, List[int]]:
25-
"""Get the level of each task in the dag.
26-
27-
Returns:
28-
Dict[int, List[int]]: Dictionary of levels and their corresponding task indexes.
29-
"""
30-
self.build_level()
31-
return self.nodes_by_level
32-
3322
def build_dag(self) -> None:
3423
"""Build the directed acyclic graph from the tasks and their dependencies."""
3524
edges = []
@@ -59,17 +48,40 @@ def build_dag(self) -> None:
5948

6049
self.graph.add_edges_from(edges)
6150

62-
def build_level(self) -> None:
63-
"""Assign levels to nodes based on the breadth-first search."""
51+
def build_level(self, target: str = None) -> Dict:
52+
"""Assign levels to nodes in the dag using the breadth-first search.
53+
54+
Args:
55+
target (str): Optional target node. If provided, the search will be performed up to this node.
56+
57+
Returns:
58+
Dict: Dictionary of levels and their corresponding node indexes.
59+
60+
Raises:
61+
KeyError: If target node is not found in the graph.
62+
"""
63+
nodes_by_level = {}
64+
# If a target node is provided, get its index in the node_index dictionary
65+
node_index = self.node_index.get(target, None)
6466
# Perform breadth-first search on the graph
65-
self.nodes_by_level = {}
6667
bfs_tree = nx.bfs_tree(self.graph, self.source)
68+
69+
level = None
70+
71+
if node_index is not None:
72+
level = {}
73+
path = nx.shortest_path(self.graph, self.source, node_index)
74+
for i,node in enumerate(path):
75+
level[node] = i
76+
77+
else:
6778
# Assign levels to nodes based on their distance from the root node
68-
level = nx.shortest_path_length(bfs_tree, self.source)
79+
level = nx.shortest_path_length(bfs_tree, self.source)
6980
for node, node_level in level.items():
70-
if node_level not in self.nodes_by_level:
71-
self.nodes_by_level[node_level] = []
72-
self.nodes_by_level[node_level].append(node)
81+
if node_level not in nodes_by_level:
82+
nodes_by_level[node_level] = []
83+
nodes_by_level[node_level].append(node)
84+
return nodes_by_level
7385

7486
def check_parents_status(self, task):
7587
"""Check if all parent tasks have the attribute status set to success.
@@ -82,7 +94,9 @@ def check_parents_status(self, task):
8294
"""
8395
node_index = self.node_index[task.name]
8496
for parent_index in self.graph.predecessors(node_index):
97+
if parent_index == "s":
98+
continue
8599
parent = self.tasks[parent_index]
86-
if parent.status == Status.ERROR:
100+
if parent.status == Status.ERROR or parent.status == Status.PENDING:
87101
return False
88102
return True

pydwt/core/project.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,12 @@ def import_all_models(self) -> None:
6262
for model in models:
6363
importlib.import_module(f"{self.name}.models.{model}")
6464

65-
def run(self) -> None:
65+
def run(self, task_name: str = None) -> None:
6666
"""Run the DAG-based workflow."""
67+
task_full_name = f"{self.name}.models.{task_name}" if task_name else None
6768
self.import_all_models()
6869
self.workflow.dag.build_dag()
69-
self.workflow.run()
70+
self.workflow.run(task_full_name)
7071

7172
def export_dag(self) -> None:
7273
"""Export the DAG to a PNG image file."""
@@ -115,12 +116,8 @@ def _create_settings(self, project_name: str) -> None:
115116
"tasks": {"task_one": {"materialize": "view"}},
116117
"sources": {"one": {"table": "table_name", "schema": "some_schema"}},
117118
"connection": {
118-
"db": "",
119-
"host": "",
120-
"port": 0,
121-
"password": "",
122-
"user": "",
123-
"sql_alchemy_driver": "",
119+
"url": '<connection-string>',
120+
"echo": True
124121
},
125122
}
126123
if not os.path.exists(settings_projects):

pydwt/core/workflow.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ def __post_init__(self) -> None:
2929
# check if cache strategy is provided and use_cache flag is enabled
3030
self.dag = Dag(self.tasks)
3131

32-
def run(self) -> None:
32+
def run(self, task_name: str = None) -> None:
3333
"""Run the tasks in the DAG."""
3434
# Get a dictionary of all the task levels in the DAG
35-
levels = self.dag.levels
35+
levels = self.dag.build_level(task_name)
3636
start_time_workflow = time.time()
3737
# Iterate through each level in the DAG
3838
for level, task_indexes in levels.items():

pyproject.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "pydwt"
3-
version = "0.1.1a1"
3+
version = "0.1.1"
44
description = ""
55
authors = ["gonza <[email protected]>"]
66
readme = "README.md"
@@ -10,7 +10,6 @@ python = "^3.8"
1010
networkx = "^3.0"
1111
matplotlib = "^3.6.3"
1212
sqlalchemy = "^2.0.1"
13-
dill = "^0.3.6"
1413
typer = "^0.7.0"
1514
pyyaml = "^6.0"
1615
dependency-injector = "^4.41.0"

0 commit comments

Comments
 (0)