Architecture Overview

Nexify FlowForge: Software Architecture Overview

System Purpose

Nexify FlowForge is a workflow orchestration engine designed for biological data harmonization pipelines. It solves the problem of mapping heterogeneous biological identifiers (proteins, metabolites, chemistry data) across multiple databases and ontologies through configurable, multi-stage matching strategies. Its primary users are bioinformaticians and data scientists who need to build reproducible ETL pipelines without writing orchestration code.

High-Level Architecture

FlowForge is a YAML-driven, monolithic orchestration engine built around a self-registering action pattern. The architecture prioritizes in-memory data flow over distributed execution, trading scalability for simplicity and rapid iteration. This design choice reflects the target use case: research-oriented batch processing of biological datasets (1K-100K identifiers) where workflows evolve frequently and require low-barrier configuration changes. The system avoids microservices complexity in favor of a single-process execution model, with extensibility achieved through a plugin-style action registry.

Architecture Diagram

```mermaid
graph TB
    subgraph "Workflow Layer"
        YAML[YAML Workflow Definition]
        PARAMS[Parameter Resolution]
    end

    subgraph "Orchestration Core"
        MSS[MinimalStrategyService<br/>Strategy Executor]
        CTX[ExecutionContext<br/>dict + Pydantic dual contexts]
        SYNC[Context Synchronizer]
    end

    subgraph "Action Registry"
        REG[ACTION_REGISTRY<br/>Dict str → Type]
        BASE[TypedStrategyAction<br/>Base class]
        ACTIONS[50+ Domain Actions<br/>Proteins, Metabolites, Chemistry]
    end

    subgraph "Standards Layer"
        UCTX[UniversalContext<br/>dict/object wrapper]
        VALID[Pydantic Validators<br/>ActionParamsBase]
        TRACE[Debug Tracer<br/>Identifier tracking]
    end

    subgraph "Data Storage"
        MEM["In-Memory Context<br/>context['datasets'] = {<br/>  'stage1': list[dict],<br/>  'stage2': list[dict]<br/>}"]
        FILES[File I/O<br/>CSV/TSV/Parquet]
    end

    YAML --> PARAMS
    PARAMS --> MSS
    MSS --> CTX
    CTX <--> SYNC
    SYNC <--> UCTX
    MSS --> REG
    REG --> BASE
    BASE --> ACTIONS
    ACTIONS --> UCTX
    UCTX --> MEM
    ACTIONS --> FILES
    ACTIONS --> VALID
    MSS --> TRACE

    style MSS fill:#4a90e2,stroke:#2e5c8a,stroke-width:3px,color:#000
    style REG fill:#f5a623,stroke:#c77d1a,stroke-width:3px,color:#000
    style UCTX fill:#7ed321,stroke:#5a9b19,stroke-width:3px,color:#000
    style MEM fill:#bd10e0,stroke:#8b0ca8,stroke-width:3px,color:#fff
```

Core Components

  • MinimalStrategyService: Orchestration engine that loads YAML workflows, resolves parameters, creates execution context, and executes actions sequentially with conditional branching support.

  • ACTION_REGISTRY: Central dictionary (Dict[str, Type]) mapping action names to implementation classes, populated automatically via @register_action decorator on import.

  • UniversalContext: Critical wrapper that provides uniform get/set interface for both dict-based (legacy) and Pydantic-based (modern) execution contexts, enabling backward compatibility during migration.

  • TypedStrategyAction: Generic base class (TypedStrategyAction[TParams, TResult]) providing Pydantic-validated parameter models and dual-context execution with automatic type conversion (see the sketch after this list).

  • ExecutionContext: Dual representation (dict + Pydantic) of pipeline state containing datasets (primary data storage), statistics, provenance, output_files, and debug tracers, synchronized after each action.
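
The pattern these components combine into can be approximated as follows. This is a hedged sketch: the real TypedStrategyAction base class and its method names may differ, and FilterRowsAction/FilterRowsParams are hypothetical examples rather than shipped FlowForge actions (the @register_action mechanism is sketched separately under Important Design Decisions).

```python
from typing import Generic, TypeVar

from pydantic import BaseModel

TParams = TypeVar("TParams", bound=BaseModel)
TResult = TypeVar("TResult", bound=BaseModel)


class TypedStrategyActionSketch(Generic[TParams, TResult]):
    """Stand-in for the TypedStrategyAction base class described above."""

    params_model: type[BaseModel]

    def execute(self, raw_params: dict, context: dict):
        params = self.params_model(**raw_params)  # Pydantic-validate raw YAML step params
        return self.execute_typed(params, context)

    def execute_typed(self, params, context: dict):
        raise NotImplementedError


class FilterRowsParams(BaseModel):
    input_key: str    # dataset name to read from context["datasets"]
    output_key: str   # dataset name to write back
    column: str
    keep_value: str


class FilterRowsResult(BaseModel):
    rows_kept: int


class FilterRowsAction(TypedStrategyActionSketch[FilterRowsParams, FilterRowsResult]):
    params_model = FilterRowsParams

    def execute_typed(self, params: FilterRowsParams, context: dict) -> FilterRowsResult:
        rows = context.setdefault("datasets", {}).get(params.input_key, [])
        kept = [r for r in rows if r.get(params.column) == params.keep_value]
        context["datasets"][params.output_key] = kept
        return FilterRowsResult(rows_kept=len(kept))
```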

Data Flow

Data flows through FlowForge as in-memory object references with per-action serialization. A workflow begins with parameter resolution (${parameters.key} substitution), then creates an execution context dict. For each step, the orchestrator retrieves the action class from ACTION_REGISTRY, instantiates it, and calls execute() with the shared context. Actions read DataFrames from context["datasets"] as list[dict], convert to pandas DataFrame, process, then store results back as df.to_dict('records'). The orchestrator synchronizes dict and Pydantic contexts after each step, accumulating all datasets in memory until workflow completion. No serialization occurs between steps at the orchestration level—only within actions for DataFrame storage format conversion.
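
The per-step round trip inside an action can be illustrated with plain pandas. The context layout (context["datasets"] holding list[dict] rows) comes from this page; the dataset names and the filtering transform are placeholders.

```python
# Minimal sketch of the list[dict] / DataFrame round trip an action performs.
import pandas as pd

context = {"datasets": {"stage1": [{"id": "P12345", "score": 0.9},
                                   {"id": "Q67890", "score": 0.4}]}}

# 1. Rehydrate the upstream dataset from its serialized list[dict] form.
df = pd.DataFrame(context["datasets"]["stage1"])

# 2. Process with ordinary pandas operations (placeholder transform).
processed = df[df["score"] >= 0.5]

# 3. Serialize back so both dict and Pydantic contexts can hold the result.
context["datasets"]["stage2"] = processed.to_dict("records")
```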

Data Flow Sequence

```mermaid
sequenceDiagram
    participant YAML as YAML Workflow
    participant MSS as MinimalStrategyService
    participant CTX as ExecutionContext (dict)
    participant ACT as Action (e.g., LOAD_DATASET)
    participant UCTX as UniversalContext
    participant MEM as In-Memory Datasets

    YAML->>MSS: Load workflow + parameters
    MSS->>MSS: Resolve ${parameters.key}
    MSS->>CTX: Create execution_context = {"datasets": {}}

    loop For each workflow step
        MSS->>ACT: Instantiate ACTION_REGISTRY[type]
        MSS->>ACT: execute(context=CTX)
        ACT->>UCTX: wrap(context)
        UCTX->>MEM: get("datasets", {})
        MEM-->>UCTX: {"stage1": [...]}
        UCTX-->>ACT: datasets dict

        ACT->>ACT: df = pd.DataFrame(datasets["stage1"])
        ACT->>ACT: processed = transform(df)
        ACT->>ACT: result = processed.to_dict('records')

        ACT->>UCTX: set("datasets", {"stage2": result})
        UCTX->>MEM: Update context["datasets"]
        ACT-->>MSS: Return ActionResult
        MSS->>MSS: Sync dict ↔ Pydantic contexts
    end

    MSS-->>YAML: Return final context
```

Memory Management and Chunking Strategies

FlowForge provides optional chunking utilities for processing large datasets that exceed comfortable memory limits, though chunking is not automatically enforced at the workflow level. Three distinct chunking patterns exist:

1. In-Memory Chunking (EfficientMatcher.chunked_processing()): Processes already-loaded datasets in batches (default 10K items per chunk) by slicing the list and iterating through chunks. Use when data fits in memory but processing benefits from smaller batch sizes for cache locality or intermediate result accumulation.

2. Streaming File Loading (BiologicalFileLoader.load_chunked()): Reads large files via the pandas chunksize parameter, yielding DataFrame chunks through a generator pattern without loading the entire file into memory. Use when source files exceed available RAM (>1GB) and you need to process row by row or filter during load (see the sketch after this list).

3. Action-Specific Chunking: Individual actions implement chunking internally—for example, the LOINC Qdrant collection action reads 50K-row chunks, filters each chunk for relevant terms, then concatenates only the filtered results. Google Drive sync actions use 10MB upload chunks for resumable transfers.
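
A sketch of the streaming-load pattern (pattern 2) is below, assuming a plain pandas chunksize read over a TSV with an identifier column; BiologicalFileLoader.load_chunked()'s real signature and column names may differ.

```python
from typing import Iterator

import pandas as pd


def load_chunked(path: str, chunk_size: int = 50_000) -> Iterator[pd.DataFrame]:
    """Yield DataFrame chunks without loading the whole file into memory."""
    for chunk in pd.read_csv(path, sep="\t", chunksize=chunk_size):
        yield chunk


def filter_large_file(path: str, keep_ids: set[str]) -> pd.DataFrame:
    """Filter each chunk during load and concatenate only the surviving rows."""
    kept = [chunk[chunk["identifier"].isin(keep_ids)] for chunk in load_chunked(path)]
    return pd.concat(kept, ignore_index=True) if kept else pd.DataFrame()
```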

Current Limitation: Chunking is action-specific and opt-in. The orchestration layer does not automatically detect large datasets or enforce chunking strategies. Actions must explicitly implement chunking logic, and most standard actions (LOAD_DATASET_IDENTIFIERS, MERGE_DATASETS, etc.) load entire datasets into memory.

Key Technologies

  • Language: Python 3.11+
  • Validation: Pydantic 2.x (type-safe parameter models)
  • Data Processing: pandas, numpy (DataFrame operations)
  • Biological Libraries: rdkit, libchebipy, biopython
  • ML/Embeddings: sentence-transformers, chromadb, faiss (semantic matching)
  • Configuration: PyYAML (workflow definitions)
  • Testing: pytest, pytest-asyncio (3-level test strategy: unit/integration/production-like)
  • Code Quality: ruff (linting/formatting), mypy (type checking)
  • Optional API: FastAPI, uvicorn (REST endpoints for strategy execution)

Important Design Decisions

1. YAML-Driven Configuration Over Code

  • Decision: Workflows are defined in YAML with parameter substitution (${parameters.key}) rather than Python code or a DSL (see the resolution sketch after this list).
  • Rationale: Enables bioinformaticians to modify pipelines without developer involvement; easy version control of workflow evolution; low learning curve for domain experts.
  • Trade-off: Less flexibility than code-defined graphs (e.g., LangGraph); no IDE autocomplete for workflow editing.
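
The substitution itself can be approximated in a few lines of Python. This is an illustrative sketch only; the real resolver in MinimalStrategyService may support nesting, defaults, and non-string values.

```python
import re


def resolve_parameters(value: str, parameters: dict[str, str]) -> str:
    """Replace ${parameters.key} placeholders with values from the parameters block."""
    pattern = re.compile(r"\$\{parameters\.([A-Za-z0-9_]+)\}")
    return pattern.sub(lambda m: str(parameters[m.group(1)]), value)


# Example: "${parameters.data_dir}/uniprot.tsv" -> "/data/reference/uniprot.tsv"
step_path = resolve_parameters("${parameters.data_dir}/uniprot.tsv",
                               {"data_dir": "/data/reference"})
```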

2. In-Memory DataFrame Serialization (list[dict] Format)

  • Decision: Store DataFrames as df.to_dict('records') in context rather than native DataFrame objects or file references.
  • Rationale: Balances memory efficiency with compatibility—serialized format works with both dict and Pydantic contexts; avoids file I/O overhead; enables straightforward JSON export for debugging.
  • Trade-off: Per-action serialization/deserialization cost (~10 conversions per 10-step workflow); increased memory from liberal .copy() usage to prevent mutations; all datasets accumulate in RAM.

3. Dual Context System (dict + Pydantic)

  • Decision: Maintain synchronized dict-based and Pydantic-based representations of execution context.
  • Rationale: Backward compatibility during migration from legacy dict-based actions to modern type-safe Pydantic actions; allows gradual refactoring without breaking existing workflows (a wrapper sketch follows this list).
  • Trade-off: Synchronization overhead after each step; increased complexity in orchestration logic; technical debt to eventually consolidate on single context type.
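
In spirit, the bridging wrapper looks like the sketch below: a uniform get/set over either a plain dict or an attribute-based (Pydantic-style) context. The real UniversalContext likely handles more cases (nested models, defaults, tracers), so treat this as an illustration of the idea rather than its implementation.

```python
from typing import Any


class ContextWrapper:
    """Uniform get/set over either a plain dict or a Pydantic-style object."""

    def __init__(self, ctx: Any) -> None:
        self._ctx = ctx

    def get(self, key: str, default: Any = None) -> Any:
        if isinstance(self._ctx, dict):
            return self._ctx.get(key, default)
        return getattr(self._ctx, key, default)

    def set(self, key: str, value: Any) -> None:
        if isinstance(self._ctx, dict):
            self._ctx[key] = value
        else:
            setattr(self._ctx, key, value)
```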

4. Self-Registering Actions via Decorator

  • Decision: Actions auto-register in ACTION_REGISTRY via the @register_action("NAME") decorator on import (a minimal decorator sketch follows this list).
  • Rationale: Zero-configuration extensibility—adding new actions requires no central registry modification; discovery-based architecture scales to 50+ actions; clear naming convention enforces standards.
  • Trade-off: Actions must be imported to register (solved via __init__.py imports); potential namespace collisions if action names overlap; harder to lazy-load actions.
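
A minimal version of the decorator, including a guard for the namespace-collision trade-off, might look like this. The duplicate-name check and the placeholder class body are illustrative; only the decorator-populates-a-dict mechanism is taken from this page.

```python
ACTION_REGISTRY: dict[str, type] = {}


def register_action(name: str):
    """Register a class under `name` as a side effect of importing its module."""
    def decorator(cls: type) -> type:
        if name in ACTION_REGISTRY:  # guard against accidental namespace collisions
            raise ValueError(f"Action name already registered: {name}")
        ACTION_REGISTRY[name] = cls
        return cls
    return decorator


@register_action("LOAD_DATASET_IDENTIFIERS")
class LoadDatasetIdentifiers:
    ...  # placeholder body; registration happens at import time
```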

Integration Points

  • File Systems: Local filesystem for input/output (CSV, TSV, Parquet, JSON); Google Drive API optional via google-api-python-client for cloud storage sync.
  • Biological Databases: UniProt, Ensembl, HMDB, KEGG, PubChem (typically via downloaded reference files or occasional API calls within actions).
  • LLM Providers: OpenAI, Anthropic (for semantic metabolite matching actions); LangFuse for LLM observability.
  • Vector Databases: ChromaDB, Qdrant, FAISS (for embedding-based semantic matching of biological entities).
  • Graph Databases: ArangoDB (optional, via pyarango/python-arango for knowledge graph queries).
  • Optional API Layer: FastAPI-based REST API (/api/v2/strategies/execute) for remote strategy execution from clients (see the example call after this list).
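
A hypothetical client call to the REST layer is shown below. The endpoint path comes from this page, but the request and response payload shapes are assumptions and would need to be checked against the actual FastAPI schema.

```python
import requests

resp = requests.post(
    "http://localhost:8000/api/v2/strategies/execute",
    json={
        "strategy": "protein_harmonization",            # assumed field name and value
        "parameters": {"data_dir": "/data/reference"},  # assumed field name
    },
    timeout=600,
)
resp.raise_for_status()
print(resp.json())
```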

Known Limitations and Technical Debt

1. Memory Accumulation Without Garbage Collection

  • Issue: All intermediate datasets persist in context["datasets"] for entire workflow execution—no automatic cleanup of datasets no longer needed by downstream steps.
  • Impact: Memory footprint grows linearly with workflow length; tested up to ~200MB across 20 datasets, but the pattern constrains workflows that push >100K-row DataFrames through 10+ stages.
  • Mitigation: Manual file-based passing for large datasets; chunking utilities exist (see the Memory Management and Chunking Strategies section) but are optional and per-action, not applied automatically based on dataset size; future work: add dataset lifecycle hints to YAML (retain: false).

2. Dual Context Technical Debt

  • Issue: Synchronization between dict and Pydantic contexts adds ~10-15% orchestration overhead; code complexity in _sync_contexts() with 70+ lines of edge case handling.
  • Impact: Harder to reason about context state; potential for sync bugs if new context fields added without updating sync logic; onboarding friction for new developers.
  • Mitigation: MVP actions explicitly marked as dict-compatible; gradual migration to Pydantic-only context; future work: deprecate dict context after all actions migrated.

3. No Checkpoint/Resume for Long-Running Workflows

  • Issue: Workflow failure at step N requires full restart from step 1—no mid-execution state persistence or recovery mechanism.
  • Impact: Expensive LLM-based semantic matching steps must re-execute on retry; multi-hour workflows vulnerable to transient failures (network, API rate limits).
  • Mitigation: Workflows are designed as idempotent stages that can be run independently; manual checkpointing via intermediate file outputs (sketched below); future work: add an optional checkpoint system, inspired by LangGraph, for expensive actions only.
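
The manual-checkpointing mitigation can be as simple as the sketch below: persist each stage's output to disk and skip recomputation on retry when the file already exists. Stage names, paths, and the Parquet choice are illustrative.

```python
from pathlib import Path

import pandas as pd


def run_stage(name: str, compute, checkpoint_dir: Path) -> pd.DataFrame:
    """Run an expensive stage once; on retry, reuse its persisted output."""
    checkpoint = checkpoint_dir / f"{name}.parquet"
    if checkpoint.exists():
        return pd.read_parquet(checkpoint)      # resume path: skip recomputation
    result = compute()                          # expensive step (e.g., LLM-based matching)
    checkpoint_dir.mkdir(parents=True, exist_ok=True)
    result.to_parquet(checkpoint, index=False)  # persist before moving on
    return result
```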

Document Version: 1.0
Last Updated: 2025-11-19
Target Audience: Computer scientists, software architects, technical leads
Recommended Pairing: Read CLAUDE.md for implementation details and development workflows.