Skip to content

Commit dedb348

Browse files
authored
Removing Caching from the orchestrator (#938)
* Removing Caching from the orchestrator * Remove more files
1 parent 9848fef commit dedb348

File tree

18 files changed

+38
-453
lines changed

18 files changed

+38
-453
lines changed

docs/migration-guide/4.0.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
---
2+
hide:
3+
- navigation
4+
---
5+
# 4.0 Migration Guide
6+
7+
In this document we describe the steps that should be taken to migrate from `orchestrator-core` v3 to v4.
8+
9+
## About 4.0
10+
11+
In this release we have removed the caching of domain models. Domain models will always be loaded from the database.
12+
13+
## Steps
14+
15+
To use 4.0 all workflows must have run to completion. The `cache_domain_models` step no longer is part of the codebase
16+
therfore `in flight` workflows will fail.

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ nav:
192192
- Migration guides:
193193
- 2.x: migration-guide/2.0.md
194194
- 3.x: migration-guide/3.0.md
195+
- 4.x: migration-guide/4.0.md
195196

196197
- Workshops:
197198
# - Beginner:

orchestrator/api/api_v1/endpoints/subscription_customer_descriptions.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
from orchestrator.schemas import SubscriptionDescriptionBaseSchema, SubscriptionDescriptionSchema
2929
from orchestrator.schemas.subscription_descriptions import UpdateSubscriptionDescriptionSchema
3030
from orchestrator.utils.errors import StaleDataError
31-
from orchestrator.utils.redis import delete_from_redis
3231

3332
router = APIRouter()
3433

@@ -55,7 +54,6 @@ def delete_subscription_customer_descriptions(_id: UUID) -> None:
5554
description = db.session.get(SubscriptionCustomerDescriptionTable, _id)
5655
if description:
5756
delete(SubscriptionCustomerDescriptionTable, _id)
58-
delete_from_redis(description.subscription_id)
5957

6058

6159
@router.get("/{_id}", response_model=SubscriptionDescriptionSchema)

orchestrator/domain/base.py

Lines changed: 4 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,27 +1294,13 @@ def from_other_lifecycle(
12941294
# Some common functions shared by from_other_product and from_subscription
12951295
@classmethod
12961296
def _get_subscription(cls: type[S], subscription_id: UUID | UUIDstr) -> SubscriptionTable | None:
1297-
from orchestrator.settings import app_settings
12981297

12991298
if not isinstance(subscription_id, UUID | UUIDstr):
13001299
raise TypeError(f"subscription_id is of type {type(subscription_id)} instead of UUID | UUIDstr")
13011300

1302-
if app_settings.ENABLE_SUBSCRIPTION_MODEL_OPTIMIZATIONS:
1303-
# TODO #900 remove toggle and make this path the default
1304-
loaders = [
1305-
joinedload(SubscriptionTable.product).selectinload(ProductTable.fixed_inputs),
1306-
]
1307-
1308-
else:
1309-
loaders = [
1310-
selectinload(SubscriptionTable.instances)
1311-
.joinedload(SubscriptionInstanceTable.product_block)
1312-
.selectinload(ProductBlockTable.resource_types),
1313-
selectinload(SubscriptionTable.instances).selectinload(
1314-
SubscriptionInstanceTable.in_use_by_block_relations
1315-
),
1316-
selectinload(SubscriptionTable.instances).selectinload(SubscriptionInstanceTable.values),
1317-
]
1301+
loaders = [
1302+
joinedload(SubscriptionTable.product).selectinload(ProductTable.fixed_inputs),
1303+
]
13181304

13191305
return db.session.get(SubscriptionTable, subscription_id, options=loaders)
13201306

@@ -1394,7 +1380,6 @@ def from_other_product(
13941380
def from_subscription(cls: type[S], subscription_id: UUID | UUIDstr) -> S:
13951381
"""Use a subscription_id to return required fields of an existing subscription."""
13961382
from orchestrator.domain.context_cache import get_from_cache, store_in_cache
1397-
from orchestrator.settings import app_settings
13981383

13991384
if cached_model := get_from_cache(subscription_id):
14001385
return cast(S, cached_model)
@@ -1421,12 +1406,7 @@ def from_subscription(cls: type[S], subscription_id: UUID | UUIDstr) -> S:
14211406

14221407
fixed_inputs = {fi.name: fi.value for fi in subscription.product.fixed_inputs}
14231408

1424-
instances: dict[str, Any]
1425-
if app_settings.ENABLE_SUBSCRIPTION_MODEL_OPTIMIZATIONS:
1426-
# TODO #900 remove toggle and make this path the default
1427-
instances = cls._load_root_instances(subscription_id)
1428-
else:
1429-
instances = cls._load_instances(subscription.instances, status, match_domain_attr=False)
1409+
instances = cls._load_root_instances(subscription_id)
14301410

14311411
try:
14321412
model = cls(

orchestrator/domain/customer_description.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
from orchestrator.api.models import delete
2222
from orchestrator.db import SubscriptionCustomerDescriptionTable, db
2323
from orchestrator.utils.errors import StaleDataError
24-
from orchestrator.utils.redis import delete_subscription_from_redis
2524
from orchestrator.utils.validate_data_version import validate_data_version
2625
from orchestrator.websocket import invalidate_subscription_cache
2726

@@ -38,7 +37,6 @@ def get_customer_description_by_customer_subscription(
3837
return db.session.scalars(stmt).one_or_none()
3938

4039

41-
@delete_subscription_from_redis()
4240
async def create_subscription_customer_description(
4341
customer_id: str, subscription_id: UUID, description: str
4442
) -> SubscriptionCustomerDescriptionTable:
@@ -53,7 +51,6 @@ async def create_subscription_customer_description(
5351
return customer_description
5452

5553

56-
@delete_subscription_from_redis()
5754
async def update_subscription_customer_description(
5855
customer_description: SubscriptionCustomerDescriptionTable,
5956
description: str,
@@ -70,7 +67,6 @@ async def update_subscription_customer_description(
7067
return customer_description
7168

7269

73-
@delete_subscription_from_redis()
7470
async def delete_subscription_customer_description_by_customer_subscription(
7571
customer_id: str, subscription_id: UUID
7672
) -> SubscriptionCustomerDescriptionTable | None:

orchestrator/graphql/mutations/customer_description.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ async def resolve_remove_customer_description(
6060
)
6161
if not description:
6262
return NotFoundError(message="Customer description not found")
63-
return CustomerDescription.from_pydantic(description)
63+
return CustomerDescription.from_pydantic(description) # type: ignore
6464

6565

6666
@strawberry.type(description="Customer subscription description mutations")

orchestrator/services/processes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ def create_process(
441441
}
442442

443443
try:
444+
444445
state = post_form(workflow.initial_input_form, initial_state, user_inputs)
445446
except FormValidationError:
446447
logger.exception("Validation errors", user_inputs=user_inputs)

orchestrator/services/subscriptions.py

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,6 @@ def build_domain_model(subscription_model: SubscriptionModel) -> dict:
605605

606606
def build_extended_domain_model(subscription_model: SubscriptionModel) -> dict:
607607
"""Create a subscription dict from the SubscriptionModel with additional keys."""
608-
from orchestrator.settings import app_settings
609608

610609
stmt = select(SubscriptionCustomerDescriptionTable).where(
611610
SubscriptionCustomerDescriptionTable.subscription_id == subscription_model.subscription_id
@@ -615,27 +614,12 @@ def build_extended_domain_model(subscription_model: SubscriptionModel) -> dict:
615614
with cache_subscription_models():
616615
subscription = subscription_model.model_dump()
617616

618-
def inject_in_use_by_ids(path_to_block: str) -> None:
619-
if not (in_use_by_subs := getattr_in(subscription_model, f"{path_to_block}.in_use_by")):
620-
return
621-
622-
in_use_by_ids = [obj.in_use_by_id for obj in in_use_by_subs.col]
623-
in_use_by_relations = [convert_to_in_use_by_relation(instance) for instance in in_use_by_subs]
624-
update_in(subscription, f"{path_to_block}.in_use_by_ids", in_use_by_ids)
625-
update_in(subscription, f"{path_to_block}.in_use_by_relations", in_use_by_relations)
626-
627-
if app_settings.ENABLE_SUBSCRIPTION_MODEL_OPTIMIZATIONS:
628-
# TODO #900 remove toggle and make this path the default
629-
# query all subscription instances and inject the in_use_by_ids/in_use_by_relations into the subscription dict.
630-
instance_to_in_use_by = {
631-
instance.subscription_instance_id: instance.in_use_by
632-
for instance in eagerload_all_subscription_instances_only_inuseby(subscription_model.subscription_id)
633-
}
634-
inject_in_use_by_ids_v2(subscription, instance_to_in_use_by)
635-
else:
636-
# find all product blocks, check if they have in_use_by and inject the in_use_by_ids into the subscription dict.
637-
for path in product_block_paths(subscription):
638-
inject_in_use_by_ids(path)
617+
# query all subscription instances and inject the in_use_by_ids/in_use_by_relations into the subscription dict.
618+
instance_to_in_use_by = {
619+
instance.subscription_instance_id: instance.in_use_by
620+
for instance in eagerload_all_subscription_instances_only_inuseby(subscription_model.subscription_id)
621+
}
622+
inject_in_use_by_ids_v2(subscription, instance_to_in_use_by)
639623

640624
subscription["customer_descriptions"] = customer_descriptions
641625

orchestrator/settings.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ class AppSettings(BaseSettings):
5555
MAIL_PORT: int = 25
5656
MAIL_STARTTLS: bool = False
5757
CACHE_URI: RedisDsn = "redis://localhost:6379/0" # type: ignore
58-
CACHE_DOMAIN_MODELS: bool = False
5958
CACHE_HMAC_SECRET: str | None = None # HMAC signing key, used when pickling results in the cache
6059
REDIS_RETRY_COUNT: NonNegativeInt = Field(
6160
2, description="Number of retries for redis connection errors/timeouts, 0 to disable"
@@ -87,9 +86,6 @@ class AppSettings(BaseSettings):
8786
ENABLE_GRAPHQL_STATS_EXTENSION: bool = False
8887
VALIDATE_OUT_OF_SYNC_SUBSCRIPTIONS: bool = False
8988
FILTER_BY_MODE: Literal["partial", "exact"] = "exact"
90-
ENABLE_SUBSCRIPTION_MODEL_OPTIMIZATIONS: bool = (
91-
True # True=ignore cache + optimized DB queries; False=use cache + unoptimized DB queries. Remove in #900
92-
)
9389

9490

9591
app_settings = AppSettings()

orchestrator/utils/get_subscription_dict.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,11 @@
22

33
from orchestrator.domain.base import SubscriptionModel
44
from orchestrator.services.subscriptions import _generate_etag, build_domain_model, build_extended_domain_model
5-
from orchestrator.utils.redis import from_redis
65

76

87
async def get_subscription_dict(subscription_id: UUID, inject_inuseby: bool = True) -> tuple[dict, str]:
98
"""Helper function to get subscription dict by uuid from db or cache."""
109

11-
if cached_model := from_redis(subscription_id):
12-
return cached_model # type: ignore
13-
1410
subscription_model = SubscriptionModel.from_subscription(subscription_id)
1511

1612
if not inject_inuseby:

orchestrator/utils/redis.py

Lines changed: 1 addition & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,17 @@
1010
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
13-
import functools
1413
from collections.abc import AsyncGenerator
1514
from contextlib import asynccontextmanager
16-
from os import getenv
17-
from typing import Any, Callable
15+
from typing import Any
1816
from uuid import UUID
1917

2018
from anyio import CancelScope, get_cancelled_exc_class
2119
from redis.asyncio import Redis as AIORedis
2220
from redis.asyncio.client import Pipeline, PubSub
2321
from structlog import get_logger
2422

25-
from orchestrator.services.subscriptions import _generate_etag
2623
from orchestrator.settings import app_settings
27-
from orchestrator.utils.json import PY_JSON_TYPES, json_dumps, json_loads
2824
from orchestrator.utils.redis_client import (
2925
create_redis_asyncio_client,
3026
create_redis_client,
@@ -37,52 +33,6 @@
3733
ONE_WEEK = 3600 * 24 * 7
3834

3935

40-
def caching_models_enabled() -> bool:
41-
return getenv("AIOCACHE_DISABLE", "0") == "0" and app_settings.CACHE_DOMAIN_MODELS
42-
43-
44-
def to_redis(subscription: dict[str, Any]) -> str | None:
45-
if caching_models_enabled():
46-
logger.info("Setting cache for subscription", subscription=subscription["subscription_id"])
47-
etag = _generate_etag(subscription)
48-
cache.set(f"orchestrator:domain:{subscription['subscription_id']}", json_dumps(subscription), ex=ONE_WEEK)
49-
cache.set(f"orchestrator:domain:etag:{subscription['subscription_id']}", etag, ex=ONE_WEEK)
50-
return etag
51-
52-
logger.warning("Caching disabled, not caching subscription", subscription=subscription["subscription_id"])
53-
return None
54-
55-
56-
def from_redis(subscription_id: UUID) -> tuple[PY_JSON_TYPES, str] | None:
57-
log = logger.bind(subscription_id=subscription_id)
58-
59-
if app_settings.ENABLE_SUBSCRIPTION_MODEL_OPTIMIZATIONS:
60-
# TODO #900 remove toggle and remove usage of this function in get_subscription_dict
61-
log.info("Using SubscriptionModel optimization, not loading subscription from redis cache")
62-
return None
63-
64-
if caching_models_enabled():
65-
log.debug("Try to retrieve subscription from cache")
66-
obj = cache.get(f"orchestrator:domain:{subscription_id}")
67-
etag = cache.get(f"orchestrator:domain:etag:{subscription_id}")
68-
if obj and etag:
69-
log.info("Retrieved subscription from cache")
70-
return json_loads(obj), etag.decode("utf-8")
71-
log.info("Subscription not found in cache")
72-
return None
73-
log.warning("Caching disabled, not loading subscription")
74-
return None
75-
76-
77-
def delete_from_redis(subscription_id: UUID) -> None:
78-
if caching_models_enabled():
79-
logger.info("Deleting subscription object from cache", subscription_id=subscription_id)
80-
cache.delete(f"orchestrator:domain:{subscription_id}")
81-
cache.delete(f"orchestrator:domain:etag:{subscription_id}")
82-
else:
83-
logger.warning("Caching disabled, not deleting subscription", subscription=subscription_id)
84-
85-
8636
def default_get_subscription_id(data: Any) -> UUID:
8737
if hasattr(data, "subscription_id"):
8838
return data.subscription_id
@@ -91,22 +41,6 @@ def default_get_subscription_id(data: Any) -> UUID:
9141
return data
9242

9343

94-
def delete_subscription_from_redis(
95-
extract_fn: Callable[..., UUID] = default_get_subscription_id,
96-
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
97-
def _delete_subscription_from_redis(func: Callable[..., Any]) -> Callable[..., Any]:
98-
@functools.wraps(func)
99-
async def wrapper(*args: tuple, **kwargs: dict[str, Any]) -> Any:
100-
data = await func(*args, **kwargs)
101-
key = extract_fn(data)
102-
delete_from_redis(key)
103-
return data
104-
105-
return wrapper
106-
107-
return _delete_subscription_from_redis
108-
109-
11044
async def delete_keys_matching_pattern(_cache: AIORedis, pattern: str, chunksize: int = 5000) -> int:
11145
"""Delete all keys matching the given pattern.
11246

orchestrator/workflows/modify_note.py

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,10 @@
1313
from orchestrator.db import db
1414
from orchestrator.forms import SubmitFormPage
1515
from orchestrator.services import subscriptions
16-
from orchestrator.settings import app_settings
1716
from orchestrator.targets import Target
1817
from orchestrator.utils.json import to_serializable
19-
from orchestrator.workflow import StepList, conditional, done, init, step, workflow
20-
from orchestrator.workflows.steps import cache_domain_models, store_process_subscription
18+
from orchestrator.workflow import StepList, done, init, step, workflow
19+
from orchestrator.workflows.steps import store_process_subscription
2120
from orchestrator.workflows.utils import wrap_modify_initial_input_form
2221
from pydantic_forms.types import FormGenerator, State, UUIDstr
2322
from pydantic_forms.validators import LongText
@@ -54,11 +53,4 @@ def store_subscription_note(subscription_id: UUIDstr, note: str) -> State:
5453

5554
@workflow("Modify Note", initial_input_form=wrap_modify_initial_input_form(initial_input_form), target=Target.MODIFY)
5655
def modify_note() -> StepList:
57-
push_subscriptions = conditional(lambda _: app_settings.CACHE_DOMAIN_MODELS)
58-
return (
59-
init
60-
>> store_process_subscription(Target.MODIFY)
61-
>> store_subscription_note
62-
>> push_subscriptions(cache_domain_models)
63-
>> done
64-
)
56+
return init >> store_process_subscription(Target.MODIFY) >> store_subscription_note >> done

0 commit comments

Comments
 (0)