
Updated data ingestion to use spanner API #16

Merged
dwnoble merged 12 commits into datacommonsorg:main from dwnoble:ingest-data
Mar 7, 2026

Conversation


@dwnoble dwnoble commented Mar 6, 2026

Refactors graph node ingestion to use native Spanner batch mutations for performance, handles Spanner's index key limits with a bytes storage pattern, and adds tests for GraphService.

  • Spanner Batch Inserts: Uses native spanner.Batch instead of SQLAlchemy for bulk insertions.
  • Byte Limit Handling: Truncates object_value to 4096 bytes to stay within Spanner's 8192-byte index limit. The full payload is preserved in a new object_bytes column.
  • Model Updates: Added object_bytes column, extracted table name constants, and removed FetchedValue().
  • API Routing: Added trailing slash support (/nodes/) to GET/POST endpoints.
  • Bugfixes & Cleanup: Stripped CURIE namespaces before DB insertion. Suppressed noisy telemetry logs and removed old sqlparse hacks. Added google-cloud-spanner to dependencies.
  • Testing: Added test suite for GraphService with mocked SQLAlchemy and Spanner clients.
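As a rough illustration of the batching pattern described above (the helper name, batch size, and the table and column names in the comment are hypothetical, not taken from the PR), bulk rows are split into chunks and each chunk is committed as one Spanner batch:

```python
from itertools import islice


def batched(rows, batch_size=500):
    """Yield successive lists of at most batch_size rows.

    Spanner commits have per-transaction mutation limits, so bulk
    ingestion typically splits rows into batches and commits each one
    separately via the google-cloud-spanner Batch context manager.
    """
    it = iter(rows)
    while chunk := list(islice(it, batch_size)):
        yield chunk


# Each chunk would then be committed roughly like this (sketch only):
#
#   with database.batch() as batch:
#       batch.insert_or_update(
#           table="Node",
#           columns=("subject_id", "name"),
#           values=chunk,
#       )
```

Committing per-chunk through the native client avoids the per-row overhead of ORM inserts, which is the performance win the PR is after.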

@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the data ingestion pipeline by integrating directly with the Google Cloud Spanner API, moving away from SQLAlchemy ORM for graph data writes. This strategic shift is designed to boost the efficiency and scalability of handling large volumes of graph data. Additionally, it introduces improved data modeling for edge values, better handling of identifiers, and a new command-line tool for database table management, streamlining development and operational workflows.

Highlights

  • Spanner API Integration for Data Ingestion: The data ingestion logic within the GraphService has been refactored to directly utilize the Google Cloud Spanner API for inserting nodes and edges. This change moves away from SQLAlchemy ORM for bulk inserts, aiming to improve performance and scalability for graph data operations.
  • New CLI Command for Table Management: A new drop_tables command has been introduced to the api_cli.py utility, allowing users to easily delete the Node and Edge tables from the graph database via the command line, which is useful for development and testing environments.
  • CURIE Namespace Handling: The system now automatically strips CURIE (Compact URI) namespaces from node IDs, predicates, and object IDs before storing them in the database. This ensures cleaner data representation and consistency within the graph database.
  • Enhanced Object Value Storage: The EdgeModel has been updated to support both object_value (string) and object_bytes (binary) fields, with specific handling for Spanner index key length limits. A new helper function, get_edge_val, manages truncation and base64 encoding for values exceeding these limits.
  • API Endpoint Flexibility: The /nodes GET and POST endpoints have been made more flexible to accept requests with or without a trailing slash. The trailing slash version is now hidden from the OpenAPI schema to maintain a cleaner API documentation.
Changelog
  • packages/datacommons-api/datacommons_api/api_cli.py
    • Added imports for configuration and graph service components.
    • Implemented a new drop_tables CLI command to facilitate the deletion of Node and Edge tables from the Spanner database.
  • packages/datacommons-api/datacommons_api/core/config.py
    • Updated an error message to refer to 'Config variable' instead of 'Environment variable' for clarity.
  • packages/datacommons-api/datacommons_api/endpoints/routers/node_router.py
    • Modified the /nodes GET and POST endpoints to support both trailing and non-trailing slash URLs, with the trailing slash version excluded from the OpenAPI schema.
  • packages/datacommons-api/datacommons_api/services/graph_service.py
    • Added necessary imports for Spanner client, base64 encoding, and traceback handling.
    • Adjusted logging levels for OpenTelemetry to suppress verbose output.
    • Introduced strip_namespace utility function to remove CURIE prefixes from identifiers.
    • Updated create_node_model and create_edge_model to apply strip_namespace to relevant fields.
    • Modified create_edge_model to handle object_bytes and object_value more distinctly.
    • Enhanced node_model_to_graph_node to decode object_bytes when present.
    • Added get_edge_val to manage Spanner index key length limits for edge object values, including truncation and base64 encoding.
    • Implemented get_node_models and get_node_model_batches for converting JSON-LD into batched NodeModel instances.
    • Created insert_node_models_batch to perform batched inserts and updates using the Spanner API.
    • Refactored GraphService.__init__ to initialize the Spanner client and database, and silence its logger.
    • Rewrote insert_graph_nodes to leverage the new Spanner API batch insertion logic, including deleting existing edges before inserting new ones.
    • Added a drop_tables method to GraphService for programmatic deletion of graph tables and an associated index.
  • packages/datacommons-api/datacommons_api/services/graph_service_test.py
    • Added a new test file with fixtures and tests for the GraphService's Spanner API integration and drop_tables functionality.
  • packages/datacommons-db/datacommons_db/models/edge.py
    • Defined EDGE_TABLE_NAME and OBJECT_VALUE_MAX_LENGTH constants.
    • Updated __tablename__ to use the EDGE_TABLE_NAME constant.
    • Changed the object_value column type from Text() to String(OBJECT_VALUE_MAX_LENGTH).
    • Added a new object_bytes column of type LargeBinary().
  • packages/datacommons-db/datacommons_db/models/node.py
    • Defined NODE_TABLE_NAME constant.
    • Updated __tablename__ to use the NODE_TABLE_NAME constant.
  • packages/datacommons-db/datacommons_db/models/observation.py
    • Defined OBSERVATION_TABLE_NAME constant.
    • Updated __tablename__ to use the OBSERVATION_TABLE_NAME constant.
  • packages/datacommons-db/pyproject.toml
    • Added google-cloud-spanner to the project dependencies.
  • uv.lock
    • Updated the dependency lock file to include google-cloud-spanner.
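The CURIE stripping mentioned in the changelog might look roughly like this hypothetical reimplementation (the PR's actual strip_namespace may handle more edge cases, e.g. full URLs):

```python
def strip_namespace(identifier: str) -> str:
    """Drop a CURIE prefix such as 'dcid:' from an identifier.

    Everything up to the last ':' is treated as the namespace; an
    identifier without a colon is returned unchanged.
    """
    _, _, local = identifier.rpartition(":")
    return local
```

For example, `strip_namespace("dcid:Count_Person")` yields `"Count_Person"`, giving the database a consistent, prefix-free identifier.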

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request refactors the data ingestion process to use the native Google Cloud Spanner API instead of SQLAlchemy, improving performance and batching, and introduces a new CLI command to drop database tables. However, the new implementation raises several security and reliability concerns: the ingestion process is vulnerable to denial of service due to improper handling of non-string property values and incorrect data types passed to the Spanner SDK, and a flaw in the batching logic can silently lose data for nodes with a large number of edges. Additionally, an interactive prompt in the service layer could block execution, and there are medium-severity issues around a data-processing bug, logging practices, and code cleanliness. Addressing these issues is crucial for the robustness and availability of the ingestion pipeline.

```python
if len(val_bytes) > OBJECT_VALUE_MAX_LENGTH:
    import base64

    return base64.b64encode(val_bytes).decode("utf-8")
```
Contributor:

security-medium

The get_edge_val function returns a base64-encoded string (str) for the object_bytes column when the original value exceeds the maximum index key length. However, the object_bytes column is defined as a LargeBinary (Spanner BYTES) in the EdgeModel. The Google Cloud Spanner Python client library expects bytes for BYTES columns. Passing a str will likely cause a TypeError during the batch insertion, leading to a crash and failure of the ingestion process for any data containing long strings.

Suggested change:

```diff
-    return base64.b64encode(val_bytes).decode("utf-8")
+    return base64.b64encode(val_bytes)
```

Contributor:

+1, I'm not sure why we'd decode the object_bytes value.

Contributor (author):

I'll look into this as a follow-up, but it appears to be writing the value properly to Spanner 🤷

dwnoble and others added 8 commits March 5, 2026 23:51
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@dwnoble dwnoble marked this pull request as ready for review March 6, 2026 07:59
@dwnoble dwnoble requested a review from clincoln8 March 6, 2026 07:59
@clincoln8 clincoln8 left a comment

Thanks!!


```python
# Remove all CURIE namespaces before storing the node id
subject_id = strip_namespace(graph_node.id)
types = [strip_namespace(t) for t in types]
return NodeModel(
```
Contributor:

long term when we revisit the code: it might be helpful to be more explicit for what this NodeModel is, maybe DbNodeModel? to differentiate from all of the other ways "Node" is used in datacommons.

Contributor (author):

Agreed

```python
val_truncated = val_bytes[:OBJECT_VALUE_MAX_LENGTH].decode(
    "utf-8", errors="ignore"
)
return val_truncated
```
Contributor:

do we need to worry about hashing at all for this value? if we're just truncating, is there possibility for index key collision?

Contributor (author):

Good call, added a todo to look into this. Maybe we can truncate the value and append a hash of the entire byte array.
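The hash-suffix idea discussed in this thread could look roughly like the sketch below. Everything here is an assumption for illustration: the function name, the separator, and the constant (4096 matches the truncation length the PR description mentions) are not from the PR, which only records this as a todo.

```python
import hashlib

OBJECT_VALUE_MAX_LENGTH = 4096  # assumed, per the PR description


def truncate_with_hash(value: str, max_len: int = OBJECT_VALUE_MAX_LENGTH) -> str:
    """Truncate long values but append a SHA-256 digest of the full
    payload, so two long values that share the same prefix still
    produce distinct index keys instead of colliding."""
    raw = value.encode("utf-8")
    if len(raw) <= max_len:
        return value
    digest = hashlib.sha256(raw).hexdigest()  # 64 hex characters
    keep = max_len - len(digest) - 1          # leave room for digest + separator
    prefix = raw[:keep].decode("utf-8", errors="ignore")
    return f"{prefix}|{digest}"
```

The digest pins the truncated string to the full payload, so the truncated column stays unique per value while the complete bytes live in object_bytes.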

```diff
-object_value = sa.Column(Text(), nullable=True)
+object_value = sa.Column(String(OBJECT_VALUE_MAX_LENGTH), nullable=True)
+object_bytes = sa.Column(sa.LargeBinary(), nullable=True)
 object_hash = sa.Column(String(64), primary_key=True, nullable=True)
```
Contributor:

what is the object_hash used for again?

Contributor (author):

I am not sure- @n-h-diaz do you know what this field is for in the spanner db?

```python
# Insert nodes and edges in batches
success_count = 0
try:
    for node_model_batch in node_model_batches:
```
Contributor:

What happens if nodeA points to nodeB, which is in a later batch and not in the graph yet? I think this might cause an error when writing nodeA in the first batch, because the edge pointing to nodeB would fail since nodeB isn't in the node table yet.

If that seems plausible, we can mark as a todo to handle this case. Or ignore if that's not actually an issue.

Contributor (author):

Good call, marking this as a todo for now.

This could apply both to nodes that are in later batches and to nodes that live in remote knowledge graphs and are not defined locally.

@dwnoble dwnoble added this pull request to the merge queue Mar 7, 2026
Merged via the queue into datacommonsorg:main with commit 7b974bd Mar 7, 2026
3 checks passed