Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 89 additions & 16 deletions backend/app/routes/migration_routes.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import json
from collections import defaultdict
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException
from supabase._async.client import AsyncClient

from app.core.dependencies import get_current_admin
from app.schemas.classification_schemas import Classification
from app.core.supabase import get_async_supabase
from app.schemas.classification_schemas import Classification, ExtractedFile
from app.schemas.migration_schemas import Migration, MigrationCreate
from app.schemas.relationship_schemas import Relationship
from app.services.classification_service import (
Expand All @@ -18,7 +22,8 @@
RelationshipService,
get_relationship_service,
)
from app.utils.migrations import create_migrations
from app.utils.migrations import _table_name_for_classification, create_migrations
from app.utils.tenant_connection import get_schema_name

router = APIRouter(prefix="/migrations", tags=["Migrations"])

Expand Down Expand Up @@ -56,7 +61,6 @@ async def generate_migrations(
Then insert the new migrations into the `migrations` table and return them.
"""
try:
# 1) Load current state from DB
classifications: list[
Classification
] = await classification_service.get_classifications(tenant_id)
Expand All @@ -72,19 +76,15 @@ async def generate_migrations(
status_code=404, detail="No classifications found for tenant"
)

# 2) Compute *new* migrations (pure function)
# IMPORTANT: this should return list[MigrationCreate]
new_migration_creates: list[MigrationCreate] = create_migrations(
classifications=classifications,
relationships=relationships,
initial_migrations=existing_migrations,
)

if not new_migration_creates:
# Nothing new to add
return []

# 3) Insert into DB and return the created migrations
created: list[Migration] = []
for m in new_migration_creates:
new_id = await migration_service.create_migration(m)
Expand Down Expand Up @@ -122,6 +122,88 @@ async def execute_migrations(
raise HTTPException(status_code=500, detail=str(e)) from e


def _dollar_quote_tag(payload: str, base: str = "gr_json") -> str:
    """Return a PostgreSQL dollar-quote tag guaranteed not to occur in *payload*.

    A fixed, predictable tag (e.g. ``$json0$``) is injectable: a payload
    containing that literal would terminate the quote early and smuggle raw
    SQL into the statement. Extending the tag until ``$tag$`` no longer
    appears in the payload makes early termination impossible.
    """
    tag = base
    while f"${tag}$" in payload:
        tag += "x"
    return tag


@router.post("/load_data/{tenant_id}")
async def load_data_for_tenant(
    tenant_id: UUID,
    classification_service: ClassificationService = Depends(get_classification_service),
    supabase: AsyncClient = Depends(get_async_supabase),
    admin=Depends(get_current_admin),
) -> dict:
    """
    Full data sync for a tenant:

    - Fetch all extracted files + their classifications
    - Group by classification
    - For each classification:
        * derive table name (same as migrations) using helper function
        * DELETE existing rows for that tenant in tenant-specific schema
        * INSERT rows for each file in that classification in tenant-specific schema

    Returns:
        dict with ``status``, ``tables_updated`` (table names that were
        rewritten), and a human-readable ``message``.

    Raises:
        HTTPException: 404 never (empty input returns an "ok" payload);
            500 wrapping any underlying service/SQL error.
    """
    try:
        extracted_files: list[
            ExtractedFile
        ] = await classification_service.get_extracted_files(tenant_id)

        if not extracted_files:
            return {
                "status": "ok",
                "tables_updated": [],
                "message": "No extracted files found",
            }

        # Group files by classification id; files without a classification
        # have no target table and are skipped.
        files_by_class_id: dict[UUID, list[ExtractedFile]] = defaultdict(list)
        for ef in extracted_files:
            if ef.classification is None:
                continue
            files_by_class_id[ef.classification.classification_id].append(ef)

        # Get tenant-specific schema name
        schema_name = get_schema_name(tenant_id)
        updated_tables: list[str] = []
        # str(UUID) has a fixed safe alphabet (hex + dashes), so inlining it
        # in the SQL text cannot break out of the quotes.
        tenant_id_str = str(tenant_id)

        for class_files in files_by_class_id.values():
            # Non-empty by construction (defaultdict populated via append),
            # so class_files[0] is always present.
            classification = class_files[0].classification
            table_name = _table_name_for_classification(classification)
            qualified_table_name = f'"{schema_name}"."{table_name}"'

            # Replace-all semantics: wipe this tenant's rows before reinserting.
            delete_sql = (
                f"DELETE FROM {qualified_table_name} "
                f"WHERE tenant_id = '{tenant_id_str}';"
            )
            await supabase.rpc("execute_sql", {"query": delete_sql}).execute()

            # Build one multi-row INSERT. Each JSONB payload is embedded with
            # a dollar-quote tag verified absent from the payload itself, so
            # untrusted extracted_data cannot terminate the quote and inject SQL.
            values = []
            for f in class_files:
                data_json = json.dumps(f.extracted_data)
                tag = _dollar_quote_tag(data_json)
                values.append(
                    f"('{f.extracted_file_id}', '{tenant_id_str}', "
                    f"${tag}${data_json}${tag}$::jsonb)"
                )

            insert_sql = (
                f"INSERT INTO {qualified_table_name} (id, tenant_id, data) "
                f"VALUES {', '.join(values)};"
            )
            await supabase.rpc("execute_sql", {"query": insert_sql}).execute()

            updated_tables.append(table_name)

        return {
            "status": "ok",
            "tables_updated": updated_tables,
            "message": "Data synced from extracted_files into generated tables",
        }
    except HTTPException:
        # Preserve deliberate HTTP errors instead of re-wrapping them as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.get("/connection-url/{tenant_id}")
async def get_tenant_connection_url(
tenant_id: UUID,
Expand All @@ -130,15 +212,6 @@ async def get_tenant_connection_url(
) -> dict:
"""
Get a PostgreSQL connection URL for a specific tenant.

This URL is scoped to only show the tenant's generated tables.

Query params:
include_public: If true, also include public schema (for shared tables)

Example:
GET /migrations/connection-url/{tenant_id}
GET /migrations/connection-url/{tenant_id}?include_public=true
"""
from app.utils.tenant_connection import get_schema_name, get_tenant_connection_url

Expand Down
17 changes: 13 additions & 4 deletions backend/app/services/pattern_recognition_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,20 @@ async def analyze_and_store_relationships(
) -> list[RelationshipCreate]:
"""
Main workflow:
1. Fetch classifications and extracted files
2. Run pattern recognition analysis
3. Store relationships in database
4. Return created relationships
1. Delete existing relationships for this tenant
2. Fetch classifications and extracted files
3. Run pattern recognition analysis
4. Store relationships in database
5. Return created relationships
"""
# DELETE existing relationships first to avoid duplicates
await (
self.supabase.table("relationships")
.delete()
.eq("tenant_id", str(tenant_id))
.execute()
)

# Fetch data
classifications = await self.get_classifications(tenant_id)
extracted_files = await self.get_extracted_files(tenant_id)
Expand Down
Loading
Loading