Skip to content
4 changes: 3 additions & 1 deletion backend/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import app.modules.matching.models
import app.modules.relationship_management.models
import app.modules.tenant_housing_orgs.models
import app.modules.workflow.models
# import app.modules.workflow.models
import app.projections.dashboard_users_view
import app.core.sa_event_store

from logging.config import fileConfig

Expand Down
46 changes: 46 additions & 0 deletions backend/alembic/versions/281f96d4d453_dashboard_users_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Dashboard users view.

Revision ID: 281f96d4d453
Revises: fead8af85db5
Create Date: 2024-11-04 20:58:09.765444

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '281f96d4d453'
down_revision = 'fead8af85db5'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Rename dashboard_users_view columns from camelCase to snake_case.

    Alembic autogenerate expresses the rename as add-column/drop-column
    pairs, so any values held in the old columns are discarded, not copied.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): on PostgreSQL, adding a NOT NULL column without a
    # server_default fails if the table already contains rows -- presumably
    # this read-model table is empty or rebuilt by its projection; confirm
    # before running against a populated database.
    op.add_column('dashboard_users_view', sa.Column('user_id', sa.String(), nullable=False))
    op.add_column('dashboard_users_view', sa.Column('name', sa.String(), nullable=False))
    op.add_column('dashboard_users_view', sa.Column('status', sa.String(), nullable=False))
    op.add_column('dashboard_users_view', sa.Column('coordinator_name', sa.String(), nullable=False))
    op.add_column('dashboard_users_view', sa.Column('updated', sa.DateTime(), nullable=False))
    # Drop the superseded camelCase columns and the old 'id' column.
    op.drop_column('dashboard_users_view', 'id')
    op.drop_column('dashboard_users_view', 'caseStatus')
    op.drop_column('dashboard_users_view', 'userName')
    op.drop_column('dashboard_users_view', 'lastUpdated')
    op.drop_column('dashboard_users_view', 'coordinatorName')
    # ### end Alembic commands ###


def downgrade() -> None:
    """Restore the camelCase dashboard_users_view columns.

    Mirror image of upgrade(): re-adds the old columns and drops the
    snake_case ones. Column data added since the upgrade is discarded.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): same caveat as upgrade() -- NOT NULL columns with no
    # server_default cannot be added to a table that already holds rows.
    op.add_column('dashboard_users_view', sa.Column('coordinatorName', sa.VARCHAR(), autoincrement=False, nullable=False))
    op.add_column('dashboard_users_view', sa.Column('lastUpdated', postgresql.TIMESTAMP(), autoincrement=False, nullable=False))
    op.add_column('dashboard_users_view', sa.Column('userName', sa.VARCHAR(), autoincrement=False, nullable=False))
    op.add_column('dashboard_users_view', sa.Column('caseStatus', sa.VARCHAR(), autoincrement=False, nullable=False))
    op.add_column('dashboard_users_view', sa.Column('id', sa.VARCHAR(), autoincrement=False, nullable=False))
    op.drop_column('dashboard_users_view', 'updated')
    op.drop_column('dashboard_users_view', 'coordinator_name')
    op.drop_column('dashboard_users_view', 'status')
    op.drop_column('dashboard_users_view', 'name')
    op.drop_column('dashboard_users_view', 'user_id')
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Dashboard users read model.

Revision ID: 6ce898b0e45e
Revises: 281f96d4d453
Create Date: 2024-11-04 21:19:21.415171

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = '6ce898b0e45e'
down_revision = '281f96d4d453'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Drop the dashboard_users_view read-model table.

    All rows are lost; presumably the read model is rebuilt from the event
    store by a projection -- confirm before deploying.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('dashboard_users_view')
    # ### end Alembic commands ###


def downgrade() -> None:
    """Recreate dashboard_users_view in its final (snake_case) shape.

    The recreated table is empty; its contents must be rebuilt separately.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): no PrimaryKeyConstraint is emitted here, unlike the
    # original create in revision fead8af85db5 -- verify this matches the
    # live table's definition at the point this downgrade targets.
    op.create_table('dashboard_users_view',
    sa.Column('email', sa.VARCHAR(), autoincrement=False, nullable=False),
    sa.Column('role', sa.VARCHAR(), autoincrement=False, nullable=False),
    sa.Column('notes', sa.VARCHAR(), autoincrement=False, nullable=False),
    sa.Column('user_id', sa.VARCHAR(), autoincrement=False, nullable=False),
    sa.Column('name', sa.VARCHAR(), autoincrement=False, nullable=False),
    sa.Column('status', sa.VARCHAR(), autoincrement=False, nullable=False),
    sa.Column('coordinator_name', sa.VARCHAR(), autoincrement=False, nullable=False),
    sa.Column('updated', postgresql.TIMESTAMP(), autoincrement=False, nullable=False)
    )
    # ### end Alembic commands ###
2 changes: 1 addition & 1 deletion backend/alembic/versions/a1a53aaf81d3_initial_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def upgrade() -> None:
sa.Column('description', sa.String(), nullable=False),
sa.Column('created_at',
sa.DateTime(timezone=True),
server_default=sa.sql.func.utcnow(),
server_default=sa.func.timezone('UTC', sa.func.now()),
nullable=False), sa.PrimaryKeyConstraint('form_id'))
op.create_table('housing_orgs',
sa.Column('housing_org_id', sa.Integer(), nullable=False),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""Refactor users and roles tables. Add event store.

Revision ID: fead8af85db5
Revises: a1a53aaf81d3
Create Date: 2024-11-04 19:44:58.247093

"""
from alembic import op
import sqlalchemy as sa
import app


# revision identifiers, used by Alembic.
revision = 'fead8af85db5'
down_revision = 'a1a53aaf81d3'
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Replace the legacy user/role schema with the new one and add the event store.

    Creates: dashboard_users_view (read model), event_streams (event store),
    roles, users. Drops: role, user, unmatched_guest_case,
    unmatched_guest_case_status. Rewires responses.user_id to the new users
    table and seeds the four role rows.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # Read-model table for the dashboard; keyed by (id, email).
    op.create_table('dashboard_users_view',
    sa.Column('id', sa.String(), nullable=False),
    sa.Column('email', sa.String(), nullable=False),
    sa.Column('userName', sa.String(), nullable=False),
    sa.Column('role', sa.String(), nullable=False),
    sa.Column('caseStatus', sa.String(), nullable=False),
    sa.Column('coordinatorName', sa.String(), nullable=False),
    sa.Column('lastUpdated', sa.DateTime(), nullable=False),
    sa.Column('notes', sa.String(), nullable=False),
    sa.PrimaryKeyConstraint('id', 'email')
    )
    # Append-only event store: one row per event, keyed by stream + version.
    op.create_table('event_streams',
    sa.Column('stream_id', sa.String(length=36), nullable=False),
    sa.Column('stream_version', sa.Integer(), nullable=False),
    sa.Column('event_data', sa.JSON(), nullable=False),
    sa.Column('meta_data', sa.JSON(), nullable=True),
    sa.Column('stored_at', sa.DateTime(timezone=True), nullable=False),
    sa.PrimaryKeyConstraint('stream_id', 'stream_version')
    )
    # Roles are now identified by name (natural key) instead of integer id.
    op.create_table('roles',
    sa.Column('role', sa.String(), nullable=False),
    sa.PrimaryKeyConstraint('role')
    )
    # NOTE(review): this module only does `import app` -- the attribute chain
    # app.modules.access.models works only if app/__init__.py (or alembic's
    # env.py) imports those submodules; verify at migration runtime.
    op.create_table('users',
    sa.Column('user_id', app.modules.access.models.UserIdType(), nullable=False),
    sa.Column('email', app.modules.access.models.EmailAddressType(), nullable=False),
    sa.Column('first_name', sa.String(), nullable=False),
    sa.Column('middle_name', sa.String(), nullable=True),
    sa.Column('last_name', sa.String(), nullable=False),
    sa.Column('role', sa.String(), nullable=False),
    sa.ForeignKeyConstraint(['role'], ['roles.role'], ),
    sa.PrimaryKeyConstraint('user_id')
    )
    op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
    # Drop inbound foreign keys first so the legacy tables can be removed.
    op.drop_constraint('user_roleId_fkey', 'user', 'foreignkey')
    op.drop_constraint('responses_user_id_fkey', 'responses', 'foreignkey')
    op.drop_constraint('unmatched_guest_case_status_id_fkey', 'unmatched_guest_case', 'foreignkey')
    op.drop_constraint('unmatched_guest_case_guest_id_fkey', 'unmatched_guest_case', 'foreignkey')
    op.drop_constraint('unmatched_guest_case_coordinator_id_fkey', 'unmatched_guest_case', 'foreignkey')
    op.drop_index('ix_role_id', table_name='role')
    op.drop_table('role')
    op.drop_table('unmatched_guest_case')
    op.drop_index('ix_user_id', table_name='user')
    op.drop_table('user')
    op.drop_table('unmatched_guest_case')
    # Swap responses.user_id to the new UserIdType. The old integer ids are
    # parked in old_user_id and then dropped -- the original values are NOT
    # migrated to the new column.
    # NOTE(review): adding the new user_id as NOT NULL with no default will
    # fail if responses has rows; confirm the table is empty at upgrade time.
    op.alter_column('responses', 'user_id', new_column_name='old_user_id')
    op.add_column('responses',
    sa.Column('user_id', app.modules.access.models.UserIdType(), nullable=False))
    op.drop_column('responses', 'old_user_id')
    op.create_foreign_key(None, 'responses', 'users', ['user_id'], ['user_id'])

    # Seed the closed set of roles; idempotent thanks to ON CONFLICT.
    op.execute(
    "INSERT INTO roles (role) VALUES ('admin') ON CONFLICT DO NOTHING")
    op.execute(
    "INSERT INTO roles (role) VALUES ('guest') ON CONFLICT DO NOTHING")
    op.execute(
    "INSERT INTO roles (role) VALUES ('coordinator') ON CONFLICT DO NOTHING"
    )
    op.execute(
    "INSERT INTO roles (role) VALUES ('host') ON CONFLICT DO NOTHING")
    # ### end Alembic commands ###


def downgrade() -> None:
    """Recreate the legacy user/role schema and drop the new tables.

    Rebuilds user, role, unmatched_guest_case, unmatched_guest_case_status
    (empty -- data dropped by upgrade() is not restored), reverts
    responses.user_id to INTEGER, and drops users/roles/event_streams/
    dashboard_users_view.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): the upgrade created this FK with name=None (auto-named);
    # alembic's drop_constraint requires a real name and raises on None --
    # this line presumably needs the generated constraint name filled in.
    op.drop_constraint(None, 'responses', type_='foreignkey')
    op.create_foreign_key('responses_user_id_fkey', 'responses', 'user', ['user_id'], ['id'])
    # NOTE(review): casting UserIdType values back to INTEGER will fail for
    # non-numeric ids; confirm the intended downgrade path for existing rows.
    op.alter_column('responses', 'user_id',
    existing_type=app.modules.access.models.UserIdType(),
    type_=sa.INTEGER(),
    existing_nullable=False)
    op.create_table('unmatched_guest_case',
    sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
    sa.Column('guest_id', sa.INTEGER(), autoincrement=False, nullable=False),
    sa.Column('coordinator_id', sa.INTEGER(), autoincrement=False, nullable=False),
    sa.Column('status_id', sa.INTEGER(), autoincrement=False, nullable=False),
    sa.ForeignKeyConstraint(['coordinator_id'], ['user.id'], name='unmatched_guest_case_coordinator_id_fkey'),
    sa.ForeignKeyConstraint(['guest_id'], ['user.id'], name='unmatched_guest_case_guest_id_fkey'),
    sa.ForeignKeyConstraint(['status_id'], ['unmatched_guest_case_status.id'], name='unmatched_guest_case_status_id_fkey'),
    sa.PrimaryKeyConstraint('id', name='unmatched_guest_case_pkey')
    )
    op.create_table('user',
    sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
    sa.Column('email', sa.VARCHAR(), autoincrement=False, nullable=False),
    sa.Column('firstName', sa.VARCHAR(length=255), autoincrement=False, nullable=False),
    sa.Column('middleName', sa.VARCHAR(length=255), autoincrement=False, nullable=True),
    sa.Column('lastName', sa.VARCHAR(length=255), autoincrement=False, nullable=True),
    sa.Column('roleId', sa.INTEGER(), autoincrement=False, nullable=False),
    sa.ForeignKeyConstraint(['roleId'], ['role.id'], name='user_roleId_fkey'),
    sa.PrimaryKeyConstraint('id', name='user_pkey'),
    sa.UniqueConstraint('email', name='user_email_key')
    )
    op.create_index('ix_user_id', 'user', ['id'], unique=False)
    op.create_table('unmatched_guest_case_status',
    sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
    sa.Column('status_text', sa.VARCHAR(), autoincrement=False, nullable=False),
    sa.PrimaryKeyConstraint('id', name='unmatched_guest_case_status_pkey'),
    sa.UniqueConstraint('status_text', name='unmatched_guest_case_status_status_text_key')
    )
    op.create_table('role',
    sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
    sa.Column('type', sa.VARCHAR(), autoincrement=False, nullable=False),
    sa.PrimaryKeyConstraint('id', name='role_pkey'),
    sa.UniqueConstraint('type', name='role_type_key')
    )
    op.create_index('ix_role_id', 'role', ['id'], unique=False)
    # Finally remove everything upgrade() created.
    op.drop_index(op.f('ix_users_email'), table_name='users')
    op.drop_table('users')
    op.drop_table('roles')
    op.drop_table('event_streams')
    op.drop_table('dashboard_users_view')
    # ### end Alembic commands ###
87 changes: 87 additions & 0 deletions backend/app/core/event_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from abc import abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
import importlib
import json
from typing import Any, Protocol

from .interfaces import Identity, DomainEvent


class AppendOnlyStoreConcurrencyException(Exception):
    """Raised when an append's expected stream version does not match the
    stream's actual version, i.e. an optimistic-concurrency conflict."""


@dataclass
class DomainEventStream:
    """A loaded event stream: its events plus the version they end at."""

    # Version of the stream after the last event; 0 for an empty stream.
    version: int
    # Events in stream order (oldest first).
    events: list[DomainEvent]


class EventStore(Protocol):
    """Abstraction for something that stores domain events."""

    @abstractmethod
    def fetch(self, stream_id: Identity) -> DomainEventStream:
        """Return the full stream for ``stream_id`` (empty stream if unknown)."""
        raise NotImplementedError

    @abstractmethod
    def append(self, stream_id: Identity, new_events: list[DomainEvent],
               expected_version: int) -> None:
        """Append ``new_events`` iff the stream is at ``expected_version``.

        Implementations are expected to raise
        AppendOnlyStoreConcurrencyException on a version mismatch.
        """
        raise NotImplementedError


class InMemoryEventStore:
    """EventStore implementation backed by a plain in-process dict.

    Events are stored JSON-serialized, so fetch() exercises the same
    serialize/deserialize round-trip a persistent store would. State lives
    only as long as the instance -- intended for tests and local use.
    """

    @dataclass
    class EventStreamEntry:
        """One stored event, mirroring a row of the event_streams table."""
        stream_id: str
        stream_version: int
        # JSON text; expected to decode to {"type": <fqcn>, "data": {...}}
        # as consumed by _deserialize_event.
        event_data: str
        meta_data: dict[str, Any]
        stored_at: datetime

    def __init__(self):
        # Keyed by stream identity (the annotation previously said `int`,
        # but fetch()/append() key this dict by the Identity stream_id).
        self.events: dict[Identity, list["InMemoryEventStore.EventStreamEntry"]] = {}

    def fetch(self, stream_id: Identity) -> DomainEventStream:
        """Return the deserialized stream for ``stream_id``.

        An unknown stream yields an empty DomainEventStream with version 0.
        """
        stream = DomainEventStream(version=0, events=[])

        for stream_entry in self.events.get(stream_id, []):
            # The last entry's version wins, so stream.version ends up at
            # the stream's current version.
            stream.version = stream_entry.stream_version
            stream.events.append(
                self._deserialize_event(json.loads(stream_entry.event_data)))

        return stream

    def append(self, stream_id: Identity, new_events: list[DomainEvent],
               expected_version: int) -> None:
        """Append ``new_events`` iff the stream is at ``expected_version``.

        Raises:
            AppendOnlyStoreConcurrencyException: the stream's actual version
                differs from ``expected_version`` (someone appended first).
        """
        stream_entries = self.events.get(stream_id, [])

        version = len(stream_entries)
        if version != expected_version:
            raise AppendOnlyStoreConcurrencyException(
                f"version={version}, expected={expected_version}")

        stream_entries.extend([
            self.EventStreamEntry(
                stream_id=str(stream_id),
                stream_version=version + inc,
                # default=str silently stringifies non-JSON payload values
                # (datetimes, UUIDs, ...); from_dict must accept that.
                event_data=json.dumps(e.to_dict(), default=str),
                meta_data={},
                stored_at=datetime.now(tz=timezone.utc),
            ) for inc, e in enumerate(new_events, start=1)
        ])

        self.events[stream_id] = stream_entries

    def _deserialize_event(self, event_data):
        """Convert a stored dict back to its concrete event class.

        ``event_data["type"]`` holds the fully qualified class name written
        by the event's to_dict(); that class must expose a from_dict()
        factory taking ``event_data["data"]``.
        """
        fully_qualified_type = event_data["type"]
        module_name, class_name = fully_qualified_type.rsplit(".", 1)

        # Dynamically import the module and get the class
        module = importlib.import_module(module_name)
        event_class = getattr(module, class_name)

        return event_class.from_dict(event_data["data"])
Loading