
Use streaming in ingest_email.py #265

Merged
bmerkle merged 13 commits into microsoft:main from gvanrossum:streaming-ingest
May 1, 2026

Conversation

@gvanrossum-ms
Contributor

We now count chunks to determine batch size. Reporting is much improved.

@gvanrossum-ms
Contributor Author

@KRRT7 What did I miss?
@bmerkle Approval?

@KRRT7
Contributor

KRRT7 commented May 1, 2026

Here's what I caught (confirmed via coverage.py on all 32 streaming tests — all pass):

  1. Breaking source_id format change — str(email_file) changed to str(email_file.resolve()). Existing databases ingested with relative paths won't recognize re-ingestion attempts with absolute paths, causing all previously ingested emails to be duplicated. This needs prominent documentation and possibly a migration path.

  2. SQLite IN (...) variable limit — are_sources_ingested() passes all source_ids as bind params. With small emails and a large batch_size, this could exceed SQLite's SQLITE_MAX_VARIABLE_NUMBER (999 on older versions). Consider chunking the query.

  3. Lost AuthenticationError handling — The old code had a clean sys.exit() for wrong API keys (confirmed: except openai.AuthenticationError was removed in the diff). Now it surfaces as an ExceptionGroup, which is a UX regression.

  4. Lost failed-source DB tracking — Old code called mark_source_ingested(source_id, exc_name) on failures so re-runs skip broken files (confirmed: removed in the diff). New code only counts failures in memory, meaning broken files get re-attempted every run.

  5. Double dedup check — _email_generator calls is_source_ingested() per email, then _filter_ingested() checks again per batch. The first avoids parsing already-ingested files; the second is transactional dedup — both paths are exercised by coverage. Worth a comment explaining why both exist.

  6. set_on_retry_hooks([_on_retry]) at import time — Global side effect that replaces any previously set hooks. Coverage confirms _on_retry (lines 85-88) is never exercised by any test — entirely untested code path.

  7. Fragile messages_skipped mutation — _drain_commit() overwrites result.messages_skipped with = instead of +=. If _commit_batch_streaming ever sets messages_skipped itself, it gets silently clobbered. Also, coverage confirms the all-skipped-after-pending-commit branch (lines 294-296 of conversation_base.py) is untested.
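Item 2's chunking fix can be sketched roughly like this. This is an illustrative sketch only: the table name, column name, and chunk size are assumptions, not the project's actual schema.

```python
import sqlite3

# Assumed chunk size, kept safely under SQLITE_MAX_VARIABLE_NUMBER
# (999 on older SQLite builds).
CHUNK = 500

def are_sources_ingested(conn: sqlite3.Connection, source_ids: list[str]) -> set[str]:
    """Return the subset of source_ids already present in the DB,
    issuing the IN (...) query in chunks so the number of bind
    parameters never exceeds SQLite's variable limit."""
    found: set[str] = set()
    for i in range(0, len(source_ids), CHUNK):
        chunk = source_ids[i : i + CHUNK]
        placeholders = ",".join("?" * len(chunk))
        rows = conn.execute(
            f"SELECT source_id FROM ingested_sources "
            f"WHERE source_id IN ({placeholders})",
            chunk,
        )
        found.update(r[0] for r in rows)
    return found
```

With this shape, a 1500-id lookup becomes three queries of at most 500 bind parameters each, regardless of batch_size.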

Posted a formal review with `suggestion` blocks for each item.

Contributor

@KRRT7 KRRT7 left a comment

Ran coverage.py against the streaming tests (32/32 pass). Coverage on the key files: conversation_base.py 78%, sqlite/provider.py 57%, model_adapters.py 46%. Below are findings with suggested fixes — most can be applied directly.

Comment thread tools/ingest_email.py
Comment thread src/typeagent/storage/sqlite/provider.py Outdated
Comment thread src/typeagent/aitools/model_adapters.py
Comment thread src/typeagent/knowpro/conversation_base.py Outdated
Comment thread src/typeagent/knowpro/conversation_base.py
gvanrossum and others added 3 commits April 30, 2026 21:04
Co-authored-by: Kevin Turcios <106575910+KRRT7@users.noreply.github.com>
Co-authored-by: Kevin Turcios <106575910+KRRT7@users.noreply.github.com>
@gvanrossum
Collaborator

@KRRT7 -- I've addressed all your feedback except for the backward compatibility (it's too early for that).

@gvanrossum
Collaborator

PS. Please don't use "Request changes". Leaving comments has the same effect and I don't get the nagging message in the PR.

Since stamina installs a default hook that just prints 'stamina.retry_scheduled',
the condition 'if not get_on_retry_hooks():' was always false.
@KRRT7
Contributor

KRRT7 commented May 1, 2026

PS. Please don't use "Request changes". Leaving comments has the same effect and I don't get the nagging message in the PR.

sounds good, do you prefer the flow where I make the exact code suggestions and you can click "Apply suggestions" instead?

@KRRT7
Contributor

KRRT7 commented May 1, 2026

@KRRT7 -- I've addressed all your feedback except for the backward compatibility (it's too early for that).

sounds good to me, just wasn't sure of your policy around that.

Contributor

@KRRT7 KRRT7 left a comment

Re-reviewed the updated commits (551d407..ec4085d). All previous suggestions addressed — chunked IN clause, += for messages_skipped, dedup comment, all-skipped test all look good.

Ran full suite locally: make format (no changes), make check (0 errors/warnings), make test (697 passed). Four small follow-ups below, all click-to-apply.

```python
async def _submit_batch(filtered: list[TMessage], skipped: int) -> None:
    nonlocal pending_commit, pending_skipped
    if not filtered and not skipped:
        return
```
Contributor

Dead guard. All three call sites guarantee batch is non-empty before calling _filter_ingested, so len(filtered) + skipped == len(batch) > 0. This branch can never fire. The if not filtered: check at line 290 already handles the all-skipped case.

Suggested change
```diff
-    return
+if filtered and should_extract:
```

Collaborator

@KRRT7 I am confused by your suggestions. This one, when applied, generates invalid syntax. Maybe you are using a tool to generate them and it's got an off-by-one error or is working from an outdated file version? It's the case for all suggestions in this batch.

Collaborator

You're right. If we ever revisit this code we can delete it. Bernhard merged it before I got a chance to look at your feedback, in part confused by the misaligned suggestions. It's harmless so I see no need to fix it in a separate PR.

Comment thread tools/ingest_email.py
```python
def on_batch_committed(result: AddMessagesResult) -> None:
    nonlocal last_batch_time
    counters["ingested"] += result.messages_added
    counters["batches"] += 1
```
Contributor

Noisy output for all-skipped batches. When _submit_batch handles an all-skipped batch, this prints +0 messages, +0 chunks, +0 semrefs | 0.0s (0.00s/chunk). Early return keeps the batch counter and timing accurate for real batches.

Suggested change
```diff
-counters["batches"] += 1
+counters["ingested"] += result.messages_added
+if not result.messages_added:
+    return
+counters["batches"] += 1
```

Collaborator

Honestly, that behavior sounds reasonable to me, given that this should be extremely rare. I think to trigger it you would have to run two instances of import_email in parallel with the same input files, or pass the same file multiple times but in different batches. Neither of which you should do. And if you do the latter, the batch lookup of source IDs might still bite you if the file occurs twice in the same batch.

Comment thread tools/ingest_email.py
```python
print(f"Failed to import {failed_count} email(s)")
print(f"Successfully ingested {result.messages_added} email(s)")
if counters["skipped"]:
    print(f"Skipped {counters['skipped']} already-ingested email(s)")
```
Contributor

Skip accounting gap. counters["skipped"] counts generator-level pre-parse dedup, but result.messages_skipped (batch-level transactional dedup from _filter_ingested) is never displayed. The two populations are disjoint — a source caught by the generator is never yielded to the batch layer — so summing is safe.

Suggested change
```diff
-print(f"Skipped {counters['skipped']} already-ingested email(s)")
+if counters["skipped"] + result.messages_skipped:
+    print(f"Skipped {counters['skipped'] + result.messages_skipped} already-ingested email(s)")
```

Collaborator

Weirdly the line numbers in the file listing don't match those in the suggestion (506 --> 528).

You may be right about the accounting, it's hard to follow for me TBH.

Comment thread tools/ingest_email.py
```python
print(f"Skipped: {skipped_count} (already ingested)")
if failed_count:
    print(f"Failed: {failed_count}")
if counters["skipped"]:
```
Contributor

Same skip accounting fix for non-verbose output, plus capitalize "Ingested" to match the verbose branch.

Suggested change
```diff
-if counters["skipped"]:
+    f"Ingested {result.messages_added} emails to {database} "
+    f"({semref_count} refs, {elapsed:.1f}s)"
+)
+if counters["skipped"] + result.messages_skipped:
+    print(f"Skipped: {counters['skipped'] + result.messages_skipped} (already ingested)")
```

Contributor

re: the capitalization, I know it's a minor detail but I care about details like that.

Collaborator

I do too and it was already fixed by some last-minute improvement to the output.

@KRRT7
Contributor

KRRT7 commented May 1, 2026

sounds good, do you prefer the flow where I make the exact code suggestions and you can click "Apply suggestions" instead?

demoing it here.

@gvanrossum
Collaborator

@KRRT7 -- have another look (mostly spit and polish)

@KRRT7
Contributor

KRRT7 commented May 1, 2026

Reviewed the 3 new commits (5bc5764, 016e6a3, 875b7ff). Chunks/semrefs counters and ^C handling look good. semrefs_added over semref_coll.size() is the right fix.

A few things still open from the last round:

Dead guard in _submit_batch (conversation_base.py). if not filtered and not skipped: return can never fire — all call sites guarantee len(batch) > 0 before _filter_ingested, so filtered + skipped > 0 always. The if not filtered: block at line ~290 already handles the all-skipped case.

All-skipped batch output. on_batch_committed still prints +0 messages, +0 chunks, +0 semrefs when a batch is entirely deduped. Early return on result.messages_added == 0 would keep the batch counter and timing accurate for real batches only.

Skip accounting gap. counters["skipped"] only counts generator-level pre-parse dedup. result.messages_skipped (batch-level transactional dedup from _filter_ingested) is never surfaced in the final summary. These are disjoint populations — a source caught by the generator never reaches the batch layer. The total skipped is counters["skipped"] + total.messages_skipped. This is also a problem on ^C where result is None and batch-level skips are lost entirely.

next_extraction task leak. In _submit_batch, if an exception occurs after next_extraction = asyncio.create_task(...) but before it's awaited, the task leaks. The except BaseException block only cancels pending_commit, not next_extraction.

@gvanrossum
Collaborator

sounds good, do you prefer the flow where I make the exact code suggestions and you can click "Apply suggestions" instead?

Yes. Though in this case I found some issues with your suggestions -- it's still better, and you get credit.

@bmerkle
Collaborator

bmerkle commented May 1, 2026

Just ping me when you are finished... so I can merge

Contributor

Copilot AI left a comment

Pull request overview

This PR updates the email ingestion pipeline to use the streaming ingestion API and to size commit batches by text chunks (rather than message count), with improved progress reporting and additional streaming/batching test coverage.

Changes:

  • Update tools/ingest_email.py to stream messages into the DB with chunk-based batching, optional concurrency override, and richer batch progress reporting.
  • Extend the storage provider API with batch source-id dedup (are_sources_ingested) and use it from streaming ingestion.
  • Expand streaming ingestion tests to cover multi-chunk messages, chunk-based batching behavior, and skipped/duplicate reporting.

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.

| File | Description |
| --- | --- |
| tools/ingest_email.py | Switches email ingest to streaming, adds CLI controls for concurrency/batching, adds pre-parse dedup and chunk truncation, improves progress + summary output. |
| tests/test_add_messages_streaming.py | Adds extensive tests for multi-chunk extraction and chunk-based batching + skipped reporting. |
| src/typeagent/storage/sqlite/provider.py | Implements are_sources_ingested() for efficient batch dedup lookups in SQLite. |
| src/typeagent/storage/memory/provider.py | Implements are_sources_ingested() for the in-memory provider. |
| src/typeagent/knowpro/interfaces_storage.py | Adds are_sources_ingested() to the storage provider interface. |
| src/typeagent/knowpro/interfaces_core.py | Extends AddMessagesResult with messages_skipped. |
| src/typeagent/knowpro/conversation_base.py | Changes streaming batching semantics to chunk-based; adds skipped accounting and uses the batch dedup API. |
| src/typeagent/aitools/model_adapters.py | Adds a global stamina retry logging hook and adjusts embed retrier defaults. |

Comment thread tools/ingest_email.py
Comment thread tools/ingest_email.py
Comment thread tools/ingest_email.py
Comment thread src/typeagent/knowpro/conversation_base.py
Comment thread src/typeagent/aitools/model_adapters.py
Comment thread tools/ingest_email.py
Collaborator

@bmerkle bmerkle left a comment

approved

@bmerkle bmerkle merged commit b40c125 into microsoft:main May 1, 2026
20 checks passed
@gvanrossum
Collaborator

Hm, I had wanted to respond to some of Copilot's and @KRRT7's review comments. But I'm happy that at least this much is now on main, we can iterate further in follow-up PRs.

@gvanrossum gvanrossum deleted the streaming-ingest branch May 2, 2026 00:13
@gvanrossum
Collaborator

I already answered the first two issues. (Sadly your suggestions demo misfired.)

Skip accounting gap. counters["skipped"] only counts generator-level pre-parse dedup. result.messages_skipped (batch-level transactional dedup from _filter_ingested) is never surfaced in the final summary. These are disjoint populations — a source caught by the generator never reaches the batch layer. The total skipped is counters["skipped"] + total.messages_skipped. This is also a problem on ^C where result is None and batch-level skips are lost entirely.

Can you talk me through this in more detail? I'm lost about what messages_skipped and skipped stand for at this point.

next_extraction task leak. In _submit_batch, if an exception occurs after next_extraction = asyncio.create_task(...) but before it's awaited, the task leaks. The except BaseException block only cancels pending_commit, not next_extraction.

This one's new. I think it just requires the same treatment in the except block?

@gvanrossum
Collaborator

Copilot didn't find anything new.

@KRRT7
Contributor

KRRT7 commented May 3, 2026

Hey Guido — quick note on why those click-to-apply suggestions generated invalid syntax.

The suggestions in my second review were generated against commit ec4085d. You then pushed 3 more commits (5bc5764, 016e6a3, 875b7ff) that added/removed ~45 lines in ingest_email.py, shifting line numbers by ~22 lines. GitHub anchors suggestion blocks to specific diff positions at review submission time — they don't rebase when new commits land. So by the time you clicked "Apply", the anchors were stale and pointed to wrong lines.

The suggestion content itself was correct for ec4085d. This is a known GitHub limitation.

@KRRT7
Contributor

KRRT7 commented May 3, 2026

Can you talk me through this in more detail? I'm lost about what messages_skipped and skipped stand for at this point.

There are two independent dedup layers, each with its own skip counter:

Layer 1 — Generator-level (counters["skipped"]). In _email_generator(), each file's source_id is checked against the DB via storage.is_source_ingested(source_id) before parsing. If already ingested, counters["skipped"] += 1 and the message is never yielded. This catches sources ingested in prior runs (or by earlier batches that already committed in the same run).

Layer 2 — Batch-level (result.messages_skipped). In add_messages_streaming(), when a batch is ready, _filter_ingested() does a second bulk dedup check via storage.are_sources_ingested(). This catches a race: if source_id X passed Layer 1 (wasn't ingested yet), then an earlier batch commits and marks X as ingested before this batch reaches _filter_ingested(), Layer 2 catches it. These skips are tracked as messages_skipped in AddMessagesResult and accumulated in total.messages_skipped.
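The two layers can be sketched with an in-memory toy. The storage class below is a stand-in for the real provider (only the method names is_source_ingested / are_sources_ingested are taken from the discussion); the race it demonstrates is the one Layer 2 exists to catch.

```python
class FakeStorage:
    """Stand-in for the real storage provider."""
    def __init__(self) -> None:
        self.ingested: set[str] = set()

    def is_source_ingested(self, source_id: str) -> bool:        # Layer 1 check
        return source_id in self.ingested

    def are_sources_ingested(self, ids: list[str]) -> set[str]:  # Layer 2 check
        return {s for s in ids if s in self.ingested}

    def mark_ingested(self, ids: list[str]) -> None:
        self.ingested.update(ids)

storage = FakeStorage()
storage.mark_ingested(["a"])          # "a" was ingested in a prior run

counters = {"skipped": 0}
batch: list[str] = []
for source_id in ["a", "b", "c"]:
    if storage.is_source_ingested(source_id):  # Layer 1: skip before parsing
        counters["skipped"] += 1
        continue
    batch.append(source_id)

# The race: an earlier batch commits "b" before this batch is filtered.
storage.mark_ingested(["b"])

already = storage.are_sources_ingested(batch)  # Layer 2: transactional dedup
filtered = [s for s in batch if s not in already]
messages_skipped = len(batch) - len(filtered)  # goes into AddMessagesResult
```

Here "a" is caught by Layer 1 (counters["skipped"] == 1), "b" by Layer 2 (messages_skipped == 1), and only "c" survives to be committed: the two counters really do count disjoint populations.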

The gap: The final summary in ingest_email.py only reported counters["skipped"] (Layer 1). Layer 2 skips were never surfaced:

  • on_batch_committed() tracked messages_added, chunks_added, semrefs_added — but not messages_skipped
  • The summary's "Skipped" line only showed counters['skipped']
  • The true total skipped is counters["skipped"] + result.messages_skipped

On ^C it's worse: result is None (the await email_memory.add_messages_streaming(...) never returns), so result.messages_skipped is completely lost.

The fix adds a batch_skipped counter to on_batch_committed and reports the sum in the summary.
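Roughly, the accounting fix looks like this. The AddMessagesResult shape and the callback wiring are simplified stand-ins for the real code; only the counter names follow the discussion.

```python
from dataclasses import dataclass

@dataclass
class AddMessagesResult:
    """Simplified stand-in for the real result type."""
    messages_added: int = 0
    messages_skipped: int = 0

# Layer 1 skips accumulate in the generator; batch_skipped is the new counter.
counters = {"skipped": 3, "batch_skipped": 0}

def on_batch_committed(result: AddMessagesResult) -> None:
    # Fires once per committed batch, so the tally survives ^C mid-run.
    counters["batch_skipped"] += result.messages_skipped

on_batch_committed(AddMessagesResult(messages_added=5, messages_skipped=2))
on_batch_committed(AddMessagesResult(messages_added=0, messages_skipped=4))

total_skipped = counters["skipped"] + counters["batch_skipped"]
print(f"Skipped: {total_skipped} (already ingested)")  # Skipped: 9 (already ingested)
```

Because the sum is built per committed batch rather than from the final result object, an interrupted run still reports every skip that made it to a commit.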

This one's new. I think it just requires the same treatment in the except block?

Yes. The fix promotes next_extraction to a nonlocal (pending_extraction) and cancels it in the except BaseException block alongside pending_commit. The lifecycle:

  1. pending_extraction is set when _submit_batch creates the extraction task
  2. Cleared after await next_extraction consumes the result
  3. If _drain_commit() raises between steps 1 and 2, pending_extraction is still in-flight → the except block cancels it
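The lifecycle above can be sketched with a toy asyncio example. All names here are stand-ins (the real logic lives in _submit_batch); the point is only that holding a reference to the in-flight task lets the except block cancel it instead of leaking it.

```python
import asyncio

async def main() -> bool:
    pending_extraction: asyncio.Task | None = None

    async def extract() -> str:
        await asyncio.sleep(10)  # stands in for long-running extraction
        return "done"

    async def drain_commit() -> None:
        raise RuntimeError("commit failed")  # simulates step 3 of the lifecycle

    try:
        pending_extraction = asyncio.create_task(extract())  # step 1: set
        await drain_commit()                                 # raises before...
        await pending_extraction
        pending_extraction = None                            # ...step 2: cleared
    except BaseException:
        if pending_extraction is not None:
            pending_extraction.cancel()                      # step 3: no leak
            try:
                await pending_extraction
            except asyncio.CancelledError:
                pass
    return pending_extraction.cancelled()

print(asyncio.run(main()))  # True: the task was cancelled, not leaked
```

Without the cancel in the except block, the extract() task would still be pending when the event loop shuts down, which is exactly the leak described above.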

Both fixes are in #266.

gvanrossum pushed a commit that referenced this pull request May 3, 2026
## Summary

Follow-up to #265. Fixes two issues identified during review:

- **Skip accounting gap.** `ingest_email.py` only reported
generator-level skips (`counters["skipped"]`). Batch-level skips from
`_filter_ingested` (`result.messages_skipped`) were never surfaced in
the final summary. The two populations are disjoint — a source caught by
the generator never reaches the batch layer. The summary now reports
`total_skipped = counters["skipped"] + counters["batch_skipped"]`.
Survives ^C since `on_batch_committed` fires per committed batch.

- **`next_extraction` task leak.** In `_submit_batch`, if
`_drain_commit()` raises after `next_extraction =
asyncio.create_task(...)` but before it's awaited, the task leaked.
Promoted `next_extraction` to a `nonlocal` (`pending_extraction`) so the
`except BaseException` block can cancel it alongside `pending_commit`.

## Changes

- `tools/ingest_email.py`: Add `batch_skipped` counter, track
`result.messages_skipped` in `on_batch_committed`, report combined total
in summary
- `src/typeagent/knowpro/conversation_base.py`: Track
`pending_extraction`, cancel in except block
- `tests/test_add_messages_streaming.py`: 4 new tests covering both
cancellation paths and edge cases

## Test plan

- [x] `make format check test` passes (701 tests)
- [x] Coverage for `conversation_base.py`: 94% → 96%
- [x] New test: `pending_extraction` cancelled when prior commit raises
during `_drain_commit`
- [x] New test: `pending_commit` cancelled when message iterator raises
- [x] New test: empty iterator returns zeros
- [x] New test: messages with empty `text_chunks` skip extraction
entirely
@gvanrossum
Collaborator

Thanks! And merged. I never encountered that GitHub issue before but it makes sense.
