Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

* *0.3.0* (2025-03-19)
* The whole queue iteration now is wrapped in a transaction atomic

* *0.2.0* (2025-03-17)
* Extend strict mode to prohibit event (!) handlers to talk to the database

Expand Down
2 changes: 1 addition & 1 deletion queuebie/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Simple message queue for commands and events (CQRS)"""

__version__ = "0.2.0"
__version__ = "0.3.0"

from queuebie.registry import MessageRegistry

Expand Down
57 changes: 28 additions & 29 deletions queuebie/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ def handle_message(messages: Message | list[Message]) -> None:
# Run auto-registry
message_registry.autodiscover()

while queue:
message = queue.pop(0)
if isinstance(message, Command):
handler_list = message_registry.command_dict.get(message.module_path(), [])
block_db_access = False
else:
handler_list = message_registry.event_dict.get(message.module_path(), [])
block_db_access = True if get_queuebie_strict_mode() else False
with transaction.atomic():
while queue:
message = queue.pop(0)
if isinstance(message, Command):
handler_list = message_registry.command_dict.get(message.module_path(), [])
block_db_access = False
else:
handler_list = message_registry.event_dict.get(message.module_path(), [])
block_db_access = True if get_queuebie_strict_mode() else False

new_messages = _process_message(handler_list=handler_list, message=message, block_db_access=block_db_access)
queue.extend(new_messages)
new_messages = _process_message(handler_list=handler_list, message=message, block_db_access=block_db_access)
queue.extend(new_messages)


def _process_message(*, handler_list: list, message: [Command, Event], block_db_access: bool) -> list[Message]:
Expand All @@ -41,24 +42,22 @@ def _process_message(*, handler_list: list, message: [Command, Event], block_db_
logger = get_logger()
messages = []

# TODO: should the whole chain be atomic and not just the handler?
with transaction.atomic():
for handler in handler_list:
try:
logger.debug(
f"Handling command '{message.module_path()}' ({message.uuid}) with handler '{handler['name']}'."
)
module = importlib.import_module(handler["module"])
handler_function = getattr(module, handler["name"])
with BlockDatabaseAccess() if block_db_access else nullcontext():
handler_messages = handler_function(context=message) or []
handler_messages = handler_messages if isinstance(handler_messages, list) else [handler_messages]
if len(handler_messages) > 0:
messages.extend(handler_messages)
uuid_list = [f"{m!s}" for m in handler_messages]
logger.debug(f"New messages: {uuid_list!s}")
except Exception as e:
logger.debug(f"Exception handling command {message.module_path()}: {e!s}")
raise e from e
for handler in handler_list:
try:
logger.debug(
f"Handling command '{message.module_path()}' ({message.uuid}) with handler '{handler['name']}'."
)
module = importlib.import_module(handler["module"])
handler_function = getattr(module, handler["name"])
with BlockDatabaseAccess() if block_db_access else nullcontext():
handler_messages = handler_function(context=message) or []
handler_messages = handler_messages if isinstance(handler_messages, list) else [handler_messages]
if len(handler_messages) > 0:
messages.extend(handler_messages)
uuid_list = [f"{m!s}" for m in handler_messages]
logger.debug(f"New messages: {uuid_list!s}")
except Exception as e:
logger.debug(f"Exception handling command {message.module_path()}: {e!s}")
raise e from e

return messages
18 changes: 13 additions & 5 deletions testapp/handlers/commands/testapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
from queuebie import message_registry
from queuebie.logger import get_logger
from queuebie.messages import Event
from testapp.messages.commands.my_commands import CriticalCommand, DoSomething, PersistSomething
from testapp.messages.commands.my_commands import (
CreateUser,
CriticalCommand,
DoSomething,
PersistSomething,
RaiseRuntimeError,
)
from testapp.messages.events.my_events import SomethingHappened, SomethingHappenedThatWantsToBePersisted


Expand All @@ -29,9 +35,11 @@ def handle_critical_command(*, context: CriticalCommand) -> None:
raise RuntimeError("Handler is broken.") # noqa: TRY003


def create_user(*args, **kwargs):
return User.objects.create_user(username="username")
@message_registry.register_command(command=CreateUser)
def create_user(context: CreateUser):
User.objects.create_user(username=context.username)


def raise_exception(*args, **kwargs):
raise RuntimeError("Something is broken.") # noqa: TRY003
@message_registry.register_command(command=RaiseRuntimeError)
def raise_exception(context: RaiseRuntimeError):
raise RuntimeError(context.error_msg)
10 changes: 10 additions & 0 deletions testapp/messages/commands/my_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,13 @@ class SameNameCommand(Command):
@dataclass(kw_only=True)
class PersistSomething(Command):
any_var: str


@dataclass(kw_only=True)
class CreateUser(Command):
username: str


@dataclass(kw_only=True)
class RaiseRuntimeError(Command):
error_msg: str
2 changes: 1 addition & 1 deletion tests/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def test_message_autodiscover_regular():
message_registry.autodiscover()

# Assert one command registered
assert len(message_registry.command_dict) == 3 # noqa: PLR2004
assert len(message_registry.command_dict) == 5 # noqa: PLR2004
assert DoSomething.module_path() in message_registry.command_dict.keys()
assert CriticalCommand.module_path() in message_registry.command_dict.keys()

Expand Down
22 changes: 11 additions & 11 deletions tests/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,15 @@
from queuebie import MessageRegistry
from queuebie.database_blocker import DatabaseAccessDeniedError
from queuebie.exceptions import InvalidMessageTypeError
from queuebie.runner import _process_message, handle_message
from testapp.messages.commands.my_commands import CriticalCommand, DoSomething, PersistSomething, SameNameCommand
from queuebie.runner import handle_message
from testapp.messages.commands.my_commands import (
CreateUser,
CriticalCommand,
DoSomething,
PersistSomething,
RaiseRuntimeError,
SameNameCommand,
)
from testapp.messages.events.my_events import (
SomethingHappened,
SomethingHappenedThatWantsToBePersistedViaEvent,
Expand Down Expand Up @@ -125,15 +132,8 @@ def dummy_func(*args, **kwargs):

@pytest.mark.django_db
@mock.patch("queuebie.registry.get_queuebie_strict_mode", return_value=False)
def test_process_message_atomic_works(mocked_handle_command, *args):
handler_list = [
{"module": "testapp.handlers.commands.testapp", "name": "create_user"},
{"module": "testapp.handlers.commands.testapp", "name": "raise_exception"},
]

message = DoSomething(my_var=1)

def test_handle_message_atomic_works(*args):
with pytest.raises(RuntimeError, match="Something is broken."):
_process_message(handler_list=handler_list, message=message, block_db_access=False)
handle_message([CreateUser(username="username"), RaiseRuntimeError(error_msg="Something is broken.")])

assert User.objects.filter(username="username").exists() is False