Separate batching from concurrency during ingestion #252
Merged
gvanrossum merged 8 commits into microsoft:main on Apr 26, 2026
Conversation
Contributor
Author
…ders

- IMessage gains an optional `source_id: str | None` field, mirrored on ConversationMessage and EmailMessage. Lets ingestion pipelines carry the external source identifier (email id, file path, URL) on the message itself instead of via parallel arrays.
- New ChunkFailures table in the SQLite schema, keyed by (msg_id, chunk_ordinal), recording error_class/error_message/failed_at for chunks whose knowledge extraction failed.
- New ChunkFailure dataclass and three IStorageProvider methods: record_chunk_failure, clear_chunk_failure, get_chunk_failures. Implemented for both SQLite and in-memory providers.

Groundwork for a streaming, restartable add_messages API that records per-chunk extraction failures without aborting the run.
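A minimal sketch of what the ChunkFailure dataclass and the three provider methods could look like, based only on the names in the commit message above. The actual IStorageProvider methods are presumably async; this in-memory version is kept synchronous for brevity, and the field types are assumptions:

```python
from dataclasses import dataclass


@dataclass
class ChunkFailure:
    # Keyed by (msg_id, chunk_ordinal), mirroring the ChunkFailures table.
    msg_id: int
    chunk_ordinal: int
    error_class: str
    error_message: str
    failed_at: str  # ISO timestamp; the real column type is an assumption


class InMemoryFailureStore:
    """Hypothetical in-memory analogue of the three provider methods."""

    def __init__(self) -> None:
        self._failures: dict[tuple[int, int], ChunkFailure] = {}

    def record_chunk_failure(self, failure: ChunkFailure) -> None:
        # Upsert semantics, matching the INSERT OR REPLACE in the SQLite path.
        self._failures[(failure.msg_id, failure.chunk_ordinal)] = failure

    def clear_chunk_failure(self, msg_id: int, chunk_ordinal: int) -> None:
        self._failures.pop((msg_id, chunk_ordinal), None)

    def get_chunk_failures(self) -> list[ChunkFailure]:
        return list(self._failures.values())
```

The upsert keying by (msg_id, chunk_ordinal) means retrying a chunk simply overwrites its previous failure record.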
But only if session_ids is None.
New method on ConversationBase that accepts an AsyncIterable of messages and processes them in commit-per-batch transactions. Designed for million-message ingestion where a single all-or-nothing transaction is impractical. Key behaviors:

- Buffers messages into a configurable batch_size (default 100)
- Skips messages whose source_id is already ingested
- Records chunk-level extraction failures via record_chunk_failure instead of raising (processing continues with the remaining chunks)
- Raised exceptions (HTTP/timeout/auth) stop the run; the current batch rolls back, but previously committed batches survive

Also adds _ingest_batch_streaming (single-batch transaction logic) and _add_llm_knowledge_streaming (extraction with failure recording).

Tests: 8 new tests in test_add_messages_streaming.py using a ControlledExtractor that can return Failure or raise on specific calls.
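The buffering and skip-already-ingested behaviors described above can be sketched roughly as follows. This is a simplified model, not the real ConversationBase code: the storage object, dict-shaped messages, and helper names are all placeholders, and failure recording and rollback are omitted:

```python
import asyncio
from typing import Any, AsyncIterable


async def add_messages_streaming(
    messages: AsyncIterable[dict[str, Any]],
    storage: "FakeStorage",
    batch_size: int = 100,
) -> dict[str, int]:
    """Buffer messages into batches and commit each batch separately."""
    result = {"added": 0, "skipped": 0}
    batch: list[dict[str, Any]] = []
    async for msg in messages:
        batch.append(msg)
        if len(batch) >= batch_size:
            await ingest_batch(batch, storage, result)
            batch.clear()
    if batch:  # final partial batch
        await ingest_batch(batch, storage, result)
    return result


async def ingest_batch(batch, storage, result) -> None:
    # Skip messages whose source_id was already ingested,
    # then commit the rest as one transaction.
    to_add = []
    for msg in batch:
        sid = msg.get("source_id")
        if sid is not None and await storage.is_source_ingested(sid):
            result["skipped"] += 1
        else:
            to_add.append(msg)
    await storage.commit(to_add)
    result["added"] += len(to_add)


class FakeStorage:
    """Stand-in for the real storage provider."""

    def __init__(self) -> None:
        self.ingested: set[str] = set()

    async def is_source_ingested(self, sid: str) -> bool:
        return sid in self.ingested

    async def commit(self, msgs) -> None:
        self.ingested.update(
            m["source_id"] for m in msgs if m.get("source_id") is not None
        )


async def gen(items):
    for item in items:
        yield item
```

Because each batch commits independently, a crash mid-run leaves earlier batches durable, and a rerun over the same input skips them via the source_id check.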
- Add chunks_added field to AddMessagesResult (making all its fields default to 0)
- Add on_batch_committed callback parameter to add_messages_streaming
- Use callback in podcast_ingest.py for per-batch progress reporting
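A toy illustration of the callback hook described above. Only chunks_added is named in the commit message; the other field, the function, and the callback signature are assumptions made for the sketch:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class AddMessagesResult:
    # All fields default to 0, per the commit message.
    # messages_added is a guessed field name; chunks_added is from the PR.
    messages_added: int = 0
    chunks_added: int = 0


async def ingest_with_progress(batches, on_batch_committed=None):
    """Invoke the callback once after each committed batch."""
    total = AddMessagesResult()
    for batch in batches:
        # (real code would run the batch transaction here)
        total.messages_added += len(batch)
        if on_batch_committed is not None:
            on_batch_committed(total)  # per-batch progress reporting
    return total
```

This is the pattern podcast_ingest.py would use: pass a callback that prints or logs the running totals after every commit.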
Collaborator
bmerkle
reviewed
Apr 26, 2026
    cursor = self.db.cursor()
    cursor.execute(
        """
        INSERT OR REPLACE INTO ChunkFailures
Collaborator
Existing databases will likely fail with a schema error because they don't have this table.
Either add a migration step (similar to how INGESTED_SOURCES_SCHEMA may have been added) or document that a fresh DB is required.
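One way such a migration step could look, assuming the key and column names from the commit description; the exact DDL and column types are guesses, since only the names appear in the PR. `CREATE TABLE IF NOT EXISTS` makes it safe to run against both fresh and existing databases:

```python
import sqlite3

# Hypothetical DDL; column names come from the PR description,
# types and nullability are assumptions.
CHUNK_FAILURES_SCHEMA = """
CREATE TABLE IF NOT EXISTS ChunkFailures (
    msg_id INTEGER NOT NULL,
    chunk_ordinal INTEGER NOT NULL,
    error_class TEXT NOT NULL,
    error_message TEXT,
    failed_at TEXT NOT NULL,
    PRIMARY KEY (msg_id, chunk_ordinal)
)
"""


def migrate(db: sqlite3.Connection) -> None:
    # Idempotent: IF NOT EXISTS is a no-op when the table already exists,
    # so this can run unconditionally at startup on old and new databases.
    db.execute(CHUNK_FAILURES_SCHEMA)
    db.commit()
```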
    # Filter out already-ingested sources
    filtered: list[TMessage] = []
    for msg in batch:
        if msg.source_id is not None and await storage.is_source_ingested(
Collaborator
is_source_ingested is called outside the transaction.
In a multi-process scenario, two workers could both pass the check and both ingest the same source.
Collaborator
I don't believe such a scenario is valid anyway, so let's not move the check into the transaction (it will just slow down other tasks that want to write to the db).
gvanrossum
approved these changes
Apr 26, 2026
Batch size is now just something the ingestion tools can use to control transaction size/frequency(*). Concurrency controls how many knowledge extraction tasks run at once.
To maximize throughput, if you have paid your provider for a high tokens/minute limit, use a huge batch size and set concurrency just high enough that you don't overload your provider. However, since a single failure throws away the entire current batch, you are advised not to set your batch size too large. Tune based on experiments.
(*) My next plan is to add a new top-level API that you feed with an iterator, where you can separately control concurrency and transaction batch size.
Not sure yet whether I'll make that part of this PR; probably not. Wait for those commits to come soon. @KRRT7
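The standard asyncio idiom for bounding extraction concurrency independently of transaction batch size is a semaphore around each task; the planned API above would presumably do something like this internally. This sketch is purely illustrative, with hypothetical names:

```python
import asyncio
from typing import Awaitable, Callable


async def extract_with_concurrency(
    chunks: list,
    extract: Callable[..., Awaitable],
    concurrency: int = 4,
) -> list:
    """Run extract() over chunks with at most `concurrency` in flight.

    The concurrency bound is independent of any transaction batch size:
    a batch of 1000 chunks can still be extracted 4 at a time.
    """
    sem = asyncio.Semaphore(concurrency)

    async def one(chunk):
        async with sem:  # blocks when `concurrency` tasks are already running
            return await extract(chunk)

    # gather() preserves input order regardless of completion order.
    return await asyncio.gather(*(one(c) for c in chunks))
```

Tuning then becomes two independent knobs: raise concurrency until the provider starts rate-limiting, and size batches by how much work you can afford to lose on a failure.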