Skip to content

Commit ad72888

Browse files
authored
Rework SubscriptionModel.from_subscription and in_use_by_id injection to remove dependency on Redis (#880)
* v1: ~45ms query (incomplete)
  - pgsql:
    - blocks: return all as list, optionals not included
    - values: return only scalars (incomplete), optionals not included
  - base.py:
    - blocks: rewrite single block relations from list to dict, add default list for missing optionals
    - values: add default list/none for missing optionals
* v2: ~100ms query
  - pgsql:
    - blocks: return all as list, optionals not included
    - values: return as scalars or list depending on mapping, add default array/null for missing optionals
  - base.py:
    - blocks: rewrite single block relations from list to dict, add default list for missing optionals
    - values: no rewrite needed
* v3: ~48ms query
  - pgsql:
    - blocks: return all as list, optionals not included
    - values: return all as list, optionals not included
  - base.py:
    - blocks: rewrite single block relations from list to dict, add default list for missing optionals
    - values: rewrite single values from list to dict, add default list/None for missing optionals
* Refactor get_query_loaders function to be reusable
* v3.1
  - Further improve SubscriptionModel.from_subscription
  - UNION ALL slightly better than UNION when no duplicates are possible
  - Temporary toggle ENABLE_SUBSCRIPTION_MODEL_OPTIMIZATIONS
* v3.2
  - Speed up in_use_by injection in build_extended_domain_model
* Update tests
* Rename function and actually use the sqlalchemy definition
* Sort value/instance lists ascending to be consistent with sqlalchemy
* Also use ENABLE_SUBSCRIPTION_MODEL_OPTIMIZATIONS to toggle subscription model caching
* Add logic to SubscriptionModel._load_root_instances and document it, refactor to support list of root product blocks
* Lots of fixes + code reorganization
* Add headers
* Fix flaky test
* Add comment and fix type ignore
* Raise clear TypeError when passing an invalid subscription_id
* Add test test_subscription_detail_domain_model_optimizations for code coverage
* Address review comment
* Move serializable property tests to separate file
* Add testcase for retrieving SubscriptionModel in serializable property
* Add todo's for removing ENABLE_SUBSCRIPTION_MODEL_OPTIMIZATIONS
* Add context_cache.cache_subscription_models
* Document __eq__ override
* Ensure sqlalchemy model loaders are always initialized
* Bump version to 3.2.0rc1
1 parent 6729ef2 commit ad72888

34 files changed

+1480
-665
lines changed

.bumpversion.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[bumpversion]
2-
current_version = 3.1.2
2+
current_version = 3.2.0rc1
33
commit = False
44
tag = False
55
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(rc(?P<build>\d+))?

orchestrator/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
"""This is the orchestrator workflow engine."""
1515

16-
__version__ = "3.1.2"
16+
__version__ = "3.2.0rc1"
1717

1818
from orchestrator.app import OrchestratorCore
1919
from orchestrator.settings import app_settings

orchestrator/db/loaders.py

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from functools import reduce
2-
from typing import Any, Callable, Iterator, NamedTuple, cast
2+
from typing import Any, Callable, Iterable, Iterator, NamedTuple, cast
33

44
import structlog
55
from sqlalchemy import inspect
@@ -131,12 +131,15 @@ def init_model_loaders() -> None:
131131
_MODEL_LOADERS[model] = dict(_inspect_model(model))
132132

133133

134-
def lookup_attr_loaders(model: type[DbBaseModel], attr: str) -> list[AttrLoader]:
134+
def _lookup_attr_loaders(model: type[DbBaseModel], attr: str) -> list[AttrLoader]:
    """Return the loader(s) registered for ``attr`` on ``model``.

    An empty list is returned when either the model or the attribute has no
    registered loaders.
    """
    # The registry is filled on first use so callers never see an uninitialized state
    if not _MODEL_LOADERS:
        init_model_loaders()

    loaders_by_attr = _MODEL_LOADERS.get(model, {})
    return loaders_by_attr.get(attr, [])
137140

138141

139-
def join_attr_loaders(loaders: list[AttrLoader]) -> Load | None:
142+
def _join_attr_loaders(loaders: list[AttrLoader]) -> Load | None:
140143
"""Given 1 or more attribute loaders, instantiate and chain them together."""
141144
if not loaders:
142145
return None
@@ -151,3 +154,48 @@ def chain_loader_func(final_loader: Load, next: AttrLoader) -> Load:
151154
return getattr(final_loader, next.loader_fn.__name__)(next.attr)
152155

153156
return reduce(chain_loader_func, other_loaders, loader_fn)
157+
158+
159+
def _split_path(query_path: str) -> Iterable[str]:
    """Yield the dot-separated field names of ``query_path``, in order.

    An empty string yields a single empty field, mirroring ``str.split``.
    """
    # str.split already returns the fields; no wrapping generator expression is needed
    yield from query_path.split(".")
161+
162+
163+
def get_query_loaders_for_model_paths(root_model: type[DbBaseModel], model_paths: list[str]) -> list[Load]:
    """Get sqlalchemy query loaders to use for the model based on the paths.

    Each entry in ``model_paths`` is a dot-separated chain of attribute names
    that is resolved field by field starting at ``root_model``. Resolution of a
    path stops at the first field without registered loaders; the part that did
    match is still used.

    Args:
        root_model: The model on which each path starts.
        model_paths: Dot-separated attribute paths. The list is not modified.

    Returns:
        One chained loader per distinct matched path. Paths that are a prefix
        of a longer matched path are dropped.
    """
    # Sort by depth to find the longest match first.
    # sorted() is used instead of list.sort() so the caller's list is left untouched.
    sorted_paths = sorted(model_paths, key=lambda x: x.count("."), reverse=True)

    def get_loader_for_path(model_path: str) -> tuple[str, Load | None]:
        """Resolve one path to (matched part of the path, chained loader or None)."""
        next_model = root_model

        matched_fields: list[str] = []
        path_loaders: list[AttrLoader] = []

        for field in _split_path(model_path):
            if not (attr_loaders := _lookup_attr_loaders(next_model, field)):
                # Unknown field: keep whatever matched so far
                break

            matched_fields.append(field)
            path_loaders.extend(attr_loaders)
            # Continue the lookup on the model that the last loader points to
            next_model = attr_loaders[-1].next_model

        return ".".join(matched_fields), _join_attr_loaders(path_loaders)

    query_loaders: dict[str, Load] = {}

    for path in sorted_paths:
        matched_path, loader = get_loader_for_path(path)
        if not matched_path or not loader or matched_path in query_loaders:
            continue
        # Skip paths that are a prefix of an already registered longer path
        if any(known_path.startswith(f"{matched_path}.") for known_path in query_loaders):
            continue
        query_loaders[matched_path] = loader

    loaders = list(query_loaders.values())
    logger.debug(
        "Generated query loaders for paths",
        root_model=root_model,
        model_paths=sorted_paths,
        query_loaders=[str(i.path) for i in loaders],
    )
    return loaders

orchestrator/db/models.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import enum
1717
from datetime import datetime, timezone
18+
from uuid import UUID
1819

1920
import sqlalchemy
2021
import structlog
@@ -43,6 +44,7 @@
4344
from sqlalchemy.ext.associationproxy import association_proxy
4445
from sqlalchemy.ext.orderinglist import ordering_list
4546
from sqlalchemy.orm import Mapped, deferred, mapped_column, object_session, relationship, undefer
47+
from sqlalchemy.sql.functions import GenericFunction
4648
from sqlalchemy_utils import TSVectorType, UUIDType
4749

4850
from orchestrator.config.assignee import Assignee
@@ -667,3 +669,14 @@ class EngineSettingsTable(BaseModel):
667669
global_lock = mapped_column(Boolean(), default=False, nullable=False, primary_key=True)
668670
running_processes = mapped_column(Integer(), default=0, nullable=False)
669671
__table_args__: tuple = (CheckConstraint(running_processes >= 0, name="check_running_processes_positive"), {})
672+
673+
674+
class SubscriptionInstanceAsJsonFunction(GenericFunction):
    """SQLAlchemy function construct for the ``subscription_instance_as_json`` database function.

    The function takes a subscription instance id and its result is typed as JSONB.
    """

    # Added in migration 42b3d076a85b
    name = "subscription_instance_as_json"

    # Return type of the function
    type = pg.JSONB()
    # NOTE(review): presumably opts this construct in to SQLAlchemy's statement caching - confirm
    inherit_cache = True

    def __init__(self, sub_inst_id: UUID):
        # Narrow the generic constructor to the single id argument
        super().__init__(sub_inst_id)

orchestrator/db/queries/__init__.py

Whitespace-only changes.
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# Copyright 2019-2025 SURF.
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
from uuid import UUID
15+
16+
from sqlalchemy import UUID as SA_UUID
17+
from sqlalchemy import cast as sa_cast
18+
from sqlalchemy import select
19+
from sqlalchemy.orm import raiseload
20+
21+
from orchestrator.db import SubscriptionInstanceRelationTable, SubscriptionInstanceTable, db
22+
23+
24+
def _eagerload_subscription_instances(
    subscription_id: UUID | str, instance_attributes: list[str]
) -> list[SubscriptionInstanceTable]:
    """Recursively query all depends_on subscription instances of a subscription.

    The attributes listed in ``instance_attributes`` are eagerloaded on every
    returned instance.

    Note: accessing instance attributes on the result that were not explicitly loaded will
    trigger a sqlalchemy error.
    """
    from orchestrator.db.loaders import get_query_loaders_for_model_paths

    # Anchor of the recursive CTE: the instances that belong to the subscription itself
    anchor_query = select(
        sa_cast(None, SA_UUID(as_uuid=True)).label("in_use_by_id"),
        SubscriptionInstanceTable.subscription_instance_id.label("depends_on_id"),
    ).where(SubscriptionInstanceTable.subscription_id == subscription_id)
    recursive_cte = anchor_query.cte(name="recursive_instance_ids", recursive=True)

    previous_level = recursive_cte.alias()
    relations = select(SubscriptionInstanceRelationTable).alias()

    # Recursive step: follow relations whose in_use_by side was found in the previous level
    next_level = select(relations.c.in_use_by_id, relations.c.depends_on_id).where(
        relations.c.in_use_by_id == previous_level.c.depends_on_id
    )
    all_levels = recursive_cte.union(next_level)

    depends_on_ids = select(all_levels.c.depends_on_id).subquery()

    # Eagerload the requested attributes and raise on lazyloading of all other attributes
    loader_options = get_query_loaders_for_model_paths(SubscriptionInstanceTable, instance_attributes)
    loader_options += [raiseload("*")]  # type: ignore[list-item] # todo fix this type

    instance_filter = SubscriptionInstanceTable.subscription_instance_id.in_(select(depends_on_ids))
    stmt = select(SubscriptionInstanceTable).where(instance_filter).options(*loader_options)

    return db.session.scalars(stmt).all()  # type: ignore[return-value] # todo fix this type
66+
67+
68+
def eagerload_all_subscription_instances(subscription_id: UUID | str) -> list[SubscriptionInstanceTable]:
    """Recursively load the subscription's depends_on instances with the relations needed by SubscriptionModel.from_subscription_id()."""
    return _eagerload_subscription_instances(
        subscription_id,
        [
            "subscription.product",
            "product_block",
            "values.resource_type",
            "depends_on",
            "in_use_by",
        ],
    )
78+
79+
80+
def eagerload_all_subscription_instances_only_inuseby(subscription_id: UUID | str) -> list[SubscriptionInstanceTable]:
    """Recursively load the subscription's depends_on instances with only their in_use_by relations resolved."""
    return _eagerload_subscription_instances(subscription_id, ["in_use_by"])
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Copyright 2019-2025 SURF.
2+
# Licensed under the Apache License, Version 2.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
#
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
#
8+
# Unless required by applicable law or agreed to in writing, software
9+
# distributed under the License is distributed on an "AS IS" BASIS,
10+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
# See the License for the specific language governing permissions and
12+
# limitations under the License.
13+
14+
from uuid import UUID
15+
16+
from sqlalchemy import select
17+
18+
from orchestrator.db import db
19+
from orchestrator.db.models import SubscriptionInstanceAsJsonFunction
20+
21+
22+
def get_subscription_instance_dict(subscription_instance_id: UUID) -> dict:
    """Fetch the subscription instance as aggregated JSONB and return it as a dict.

    Note: all values are returned as lists and have to be transformed by the caller.
    It was attempted to do this in the DB query but this gave worse performance.
    """
    json_function = SubscriptionInstanceAsJsonFunction(subscription_instance_id)
    result = db.session.execute(select(json_function))
    return result.scalar_one()

0 commit comments

Comments
 (0)