diff --git a/CHANGES.md b/CHANGES.md index 4c2a049..13bd5ab 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,8 @@ # Changelog +* *0.3.0* (2025-03-19) + * The whole queue iteration now is wrapped in a transaction atomic + * *0.2.0* (2025-03-17) * Extend strict mode to prohibit event (!) handlers to talk to the database diff --git a/queuebie/__init__.py b/queuebie/__init__.py index 5328de3..260a87c 100644 --- a/queuebie/__init__.py +++ b/queuebie/__init__.py @@ -1,6 +1,6 @@ """Simple message queue for commands and events (CQRS)""" -__version__ = "0.2.0" +__version__ = "0.3.0" from queuebie.registry import MessageRegistry diff --git a/queuebie/runner.py b/queuebie/runner.py index 746b39d..283ffa6 100644 --- a/queuebie/runner.py +++ b/queuebie/runner.py @@ -21,17 +21,18 @@ def handle_message(messages: Message | list[Message]) -> None: # Run auto-registry message_registry.autodiscover() - while queue: - message = queue.pop(0) - if isinstance(message, Command): - handler_list = message_registry.command_dict.get(message.module_path(), []) - block_db_access = False - else: - handler_list = message_registry.event_dict.get(message.module_path(), []) - block_db_access = True if get_queuebie_strict_mode() else False + with transaction.atomic(): + while queue: + message = queue.pop(0) + if isinstance(message, Command): + handler_list = message_registry.command_dict.get(message.module_path(), []) + block_db_access = False + else: + handler_list = message_registry.event_dict.get(message.module_path(), []) + block_db_access = True if get_queuebie_strict_mode() else False - new_messages = _process_message(handler_list=handler_list, message=message, block_db_access=block_db_access) - queue.extend(new_messages) + new_messages = _process_message(handler_list=handler_list, message=message, block_db_access=block_db_access) + queue.extend(new_messages) def _process_message(*, handler_list: list, message: [Command, Event], block_db_access: bool) -> list[Message]: @@ -41,24 +42,22 @@ def _process_message(*, handler_list: list, message: [Command, Event], block_db_ logger = get_logger() messages = [] - # TODO: should the whole chain be atomic and not just the handler? - with transaction.atomic(): - for handler in handler_list: - try: - logger.debug( - f"Handling command '{message.module_path()}' ({message.uuid}) with handler '{handler['name']}'." - ) - module = importlib.import_module(handler["module"]) - handler_function = getattr(module, handler["name"]) - with BlockDatabaseAccess() if block_db_access else nullcontext(): - handler_messages = handler_function(context=message) or [] - handler_messages = handler_messages if isinstance(handler_messages, list) else [handler_messages] - if len(handler_messages) > 0: - messages.extend(handler_messages) - uuid_list = [f"{m!s}" for m in handler_messages] - logger.debug(f"New messages: {uuid_list!s}") - except Exception as e: - logger.debug(f"Exception handling command {message.module_path()}: {e!s}") - raise e from e + for handler in handler_list: + try: + logger.debug( + f"Handling command '{message.module_path()}' ({message.uuid}) with handler '{handler['name']}'." + ) + module = importlib.import_module(handler["module"]) + handler_function = getattr(module, handler["name"]) + with BlockDatabaseAccess() if block_db_access else nullcontext(): + handler_messages = handler_function(context=message) or [] + handler_messages = handler_messages if isinstance(handler_messages, list) else [handler_messages] + if len(handler_messages) > 0: + messages.extend(handler_messages) + uuid_list = [f"{m!s}" for m in handler_messages] + logger.debug(f"New messages: {uuid_list!s}") + except Exception as e: + logger.debug(f"Exception handling command {message.module_path()}: {e!s}") + raise e from e return messages diff --git a/testapp/handlers/commands/testapp.py b/testapp/handlers/commands/testapp.py index e7ca787..61586e3 100644 --- a/testapp/handlers/commands/testapp.py +++ b/testapp/handlers/commands/testapp.py @@ -5,7 +5,13 @@ from queuebie import message_registry from queuebie.logger import get_logger from queuebie.messages import Event -from testapp.messages.commands.my_commands import CriticalCommand, DoSomething, PersistSomething +from testapp.messages.commands.my_commands import ( + CreateUser, + CriticalCommand, + DoSomething, + PersistSomething, + RaiseRuntimeError, +) from testapp.messages.events.my_events import SomethingHappened, SomethingHappenedThatWantsToBePersisted @@ -29,9 +35,11 @@ def handle_critical_command(*, context: CriticalCommand) -> None: raise RuntimeError("Handler is broken.") # noqa: TRY003 -def create_user(*args, **kwargs): - return User.objects.create_user(username="username") +@message_registry.register_command(command=CreateUser) +def create_user(context: CreateUser): + User.objects.create_user(username=context.username) -def raise_exception(*args, **kwargs): - raise RuntimeError("Something is broken.") # noqa: TRY003 +@message_registry.register_command(command=RaiseRuntimeError) +def raise_exception(context: RaiseRuntimeError): + raise RuntimeError(context.error_msg) diff --git a/testapp/messages/commands/my_commands.py b/testapp/messages/commands/my_commands.py index 32af73c..24eda0b 100644 --- a/testapp/messages/commands/my_commands.py +++ b/testapp/messages/commands/my_commands.py @@ -21,3 +21,13 @@ class SameNameCommand(Command): @dataclass(kw_only=True) class PersistSomething(Command): any_var: str + + +@dataclass(kw_only=True) +class CreateUser(Command): + username: str + + +@dataclass(kw_only=True) +class RaiseRuntimeError(Command): + error_msg: str diff --git a/tests/test_registry.py b/tests/test_registry.py index 1a72a44..7166859 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -124,7 +124,7 @@ def test_message_autodiscover_regular(): message_registry.autodiscover() # Assert one command registered - assert len(message_registry.command_dict) == 3 # noqa: PLR2004 + assert len(message_registry.command_dict) == 5 # noqa: PLR2004 assert DoSomething.module_path() in message_registry.command_dict.keys() assert CriticalCommand.module_path() in message_registry.command_dict.keys() diff --git a/tests/test_runner.py b/tests/test_runner.py index 6b0ad55..3caf55a 100644 --- a/tests/test_runner.py +++ b/tests/test_runner.py @@ -8,8 +8,15 @@ from queuebie import MessageRegistry from queuebie.database_blocker import DatabaseAccessDeniedError from queuebie.exceptions import InvalidMessageTypeError -from queuebie.runner import _process_message, handle_message -from testapp.messages.commands.my_commands import CriticalCommand, DoSomething, PersistSomething, SameNameCommand +from queuebie.runner import handle_message +from testapp.messages.commands.my_commands import ( + CreateUser, + CriticalCommand, + DoSomething, + PersistSomething, + RaiseRuntimeError, + SameNameCommand, +) from testapp.messages.events.my_events import ( SomethingHappened, SomethingHappenedThatWantsToBePersistedViaEvent, @@ -125,15 +132,8 @@ def dummy_func(*args, **kwargs): @pytest.mark.django_db @mock.patch("queuebie.registry.get_queuebie_strict_mode", return_value=False) -def test_process_message_atomic_works(mocked_handle_command, *args): - handler_list = [ - {"module": "testapp.handlers.commands.testapp", "name": "create_user"}, - {"module": "testapp.handlers.commands.testapp", "name": "raise_exception"}, - ] - - message = DoSomething(my_var=1) - +def test_handle_message_atomic_works(*args): with pytest.raises(RuntimeError, match="Something is broken."): - _process_message(handler_list=handler_list, message=message, block_db_access=False) + handle_message([CreateUser(username="username"), RaiseRuntimeError(error_msg="Something is broken.")]) assert User.objects.filter(username="username").exists() is False