diff --git a/CODEOWNERS b/CODEOWNERS new file mode 100644 index 00000000..bcff3975 --- /dev/null +++ b/CODEOWNERS @@ -0,0 +1 @@ +* @gvanrossum @gvanrossum-ms @umeshma @robgruen diff --git a/src/typeagent/emails/email_import.py b/src/typeagent/emails/email_import.py index 88f13b94..f0ac7a9a 100644 --- a/src/typeagent/emails/email_import.py +++ b/src/typeagent/emails/email_import.py @@ -84,7 +84,6 @@ def import_forwarded_email_string( # Imports an email.message.Message object and returns an EmailMessage object -# If the message is a reply, returns only the latest response. def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage: # Extract metadata from headers. # msg.get() can return a Header object instead of str for encoded headers, @@ -102,20 +101,32 @@ def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage: if timestamp_date is not None: timestamp = parsedate_to_datetime(timestamp_date).isoformat() - # Get email body. - # If the email was a reply, then ensure we only pick up the latest response + # Get email body and parse into chunks with source attribution body = _extract_email_body(msg) if body is None: body = "" - elif is_reply(msg): - body = get_last_response_in_thread(body) + # Prepend subject to body if available if email_meta.subject is not None: body = decode_encoded_words(email_meta.subject) + "\n\n" + body - body_chunks = _text_to_chunks(body, max_chunk_length) + # Parse into chunks with source attribution (handles inline replies) + parsed_chunks = parse_email_chunks(body) + + # Apply max_chunk_length splitting while preserving source attribution + text_chunks: list[str] = [] + chunk_sources: list[str | None] = [] + for text, source in parsed_chunks: + sub_chunks = _text_to_chunks(text, max_chunk_length) + for sub_chunk in sub_chunks: + text_chunks.append(sub_chunk) + chunk_sources.append(source) + email: EmailMessage = EmailMessage( - metadata=email_meta, text_chunks=body_chunks, timestamp=timestamp + metadata=email_meta, + text_chunks=text_chunks, + chunk_sources=chunk_sources, + timestamp=timestamp, ) return email @@ -157,21 +168,177 @@ def get_forwarded_email_parts(email_text: str) -> list[str]: # Precompiled regex for trailing line delimiters (underscores, dashes, equals, spaces) _TRAILING_LINE_DELIMITERS = re.compile(r"[\r\n][_\-= ]+\s*$") +# Pattern to detect "On wrote:" header for inline replies +# Uses alternation to handle different date formats: +# 1. "On Mon, Dec 10, 2020 at 10:30 AM John Doe wrote:" (AM/PM format) +# 2. "On Mon, Dec 10, 2020 Someone wrote:" (year followed by name) +# 3. Fallback: last words before "wrote:" +# Groups 1, 2, or 3 will capture the person's name depending on format +_INLINE_REPLY_HEADER = re.compile( + r"^on\s+(?:.+[AP]M\s+(.+?)|.+,\s*\d{4}\s+(.+?)|.+\s+(.+?))\s+wrote:\s*$", + re.IGNORECASE | re.MULTILINE, +) + +# Pattern to match quoted lines (starting with > possibly with leading whitespace) +_QUOTED_LINE = re.compile(r"^\s*>") + +# Pattern to detect email signature markers +_SIGNATURE_MARKER = re.compile(r"^--\s*$", re.MULTILINE) + + +# Type alias for chunk with source info +ChunkWithSource = tuple[str, str | None] # (text, source: None=original, str=quoted) -# Simple way to get the last response on an email thread in MIME format -def get_last_response_in_thread(email_text: str) -> str: + +def is_inline_reply(email_text: str) -> bool: + """ + Detect if an email contains inline replies (responses interspersed with quotes). + + An inline reply has: + 1. An "On ... wrote:" header + 2. Quoted lines (starting with >) interspersed with non-quoted response lines + """ if not email_text: - return "" + return False - match = _THREAD_DELIMITERS.search(email_text) - if match: - email_text = email_text[: match.start()] + # Must have the "On ... wrote:" header + header_match = _INLINE_REPLY_HEADER.search(email_text) + if not header_match: + return False - email_text = email_text.strip() - # Remove trailing line delimiters (e.g. underscores, dashes, equals) - _TRAILING_LINE_DELIMITER_REGEX = _TRAILING_LINE_DELIMITERS - email_text = _TRAILING_LINE_DELIMITER_REGEX.sub("", email_text) - return email_text + # Check content after the header for mixed quoted/non-quoted lines + content_after_header = email_text[header_match.end() :] + lines = content_after_header.split("\n") + + has_quoted = False + has_non_quoted_after_quoted = False + + for line in lines: + # Check for signature marker + if _SIGNATURE_MARKER.match(line): + break + + stripped = line.strip() + if not stripped: + continue + + if _QUOTED_LINE.match(line): + has_quoted = True + elif has_quoted: + # Non-quoted line after we've seen quoted lines = inline reply + has_non_quoted_after_quoted = True + break + + return has_quoted and has_non_quoted_after_quoted + + +def parse_email_chunks(email_text: str) -> list[ChunkWithSource]: + """ + Parse email text into chunks with source attribution. + + Returns a list of (text, source) tuples where: + - source is None for original (unquoted) content + - source is the quoted person's name for quoted content, or " " if unknown + + This handles inline replies where the sender responds inline to quoted text, + preserving both the quoted and unquoted portions as separate chunks. + """ + if not email_text: + return [] + + # Find the "On ... wrote:" header + header_match = _INLINE_REPLY_HEADER.search(email_text) + if not header_match: + # No inline reply pattern, return as a single original chunk + text = _strip_trailing_delimiters(email_text) + if text: + return [(text, None)] + return [] + + # Extract quoted person from header (first non-None group from groups 1, 2, or 3) + quoted_person = ( + header_match.group(1) or header_match.group(2) or header_match.group(3) or " " + ) + quoted_person = quoted_person.strip() if quoted_person else " " + if not quoted_person: + quoted_person = " " + + # Get preamble (content before the "On ... wrote:" header) + preamble = email_text[: header_match.start()].strip() + + # Process content after header + content_after_header = email_text[header_match.end() :] + lines = content_after_header.split("\n") + + result: list[ChunkWithSource] = [] + if preamble: + result.append((preamble, None)) + + current_reply_lines: list[str] = [] + current_quoted_lines: list[str] = [] + in_signature = False + + def flush_reply() -> None: + nonlocal current_reply_lines + if current_reply_lines: + text = "\n".join(current_reply_lines).strip() + if text: + result.append((text, None)) + current_reply_lines = [] + + def flush_quoted() -> None: + nonlocal current_quoted_lines + if current_quoted_lines: + text = "\n".join(current_quoted_lines).strip() + if text: + result.append((text, quoted_person)) + current_quoted_lines = [] + + for line in lines: + # Check for signature marker + if _SIGNATURE_MARKER.match(line): + in_signature = True + # Flush any pending content + flush_quoted() + flush_reply() + continue + + if in_signature: + # Skip signature content + continue + + if _QUOTED_LINE.match(line): + # This is a quoted line - flush any pending reply first + if current_reply_lines: + flush_reply() + # Strip the leading > and any space after it + unquoted = re.sub(r"^\s*>\s?", "", line) + current_quoted_lines.append(unquoted) + else: + # Non-quoted line - flush any pending quoted first + if current_quoted_lines: + flush_quoted() + # Only accumulate non-empty lines or preserve blank lines within a block + stripped = line.strip() + if stripped or current_reply_lines: + current_reply_lines.append(line.rstrip()) + + # Flush any remaining content + flush_quoted() + flush_reply() + + # Strip trailing delimiters from the last chunk + if result: + last_text, last_source = result[-1] + result[-1] = (_strip_trailing_delimiters(last_text), last_source) + + return result + + +def _strip_trailing_delimiters(text: str) -> str: + """Remove trailing line delimiters (underscores, dashes, equals, spaces).""" + text = text.strip() + return _TRAILING_LINE_DELIMITERS.sub("", text) # Extracts the plain text body from an email.message.Message object. diff --git a/src/typeagent/emails/email_message.py b/src/typeagent/emails/email_message.py index 47abdbec..92469ea1 100644 --- a/src/typeagent/emails/email_message.py +++ b/src/typeagent/emails/email_message.py @@ -153,6 +153,12 @@ def __init__(self, **data: Any) -> None: super().__init__(**data) text_chunks: list[str] = CamelCaseField("The text chunks of the email message") + # For each chunk: None means original content, str means quoted. + # If quoted, the string is the name of the person being quoted, or " " if unknown. + chunk_sources: list[str | None] = CamelCaseField( + "Source attribution for each chunk: None=original, str=quoted person or ' '", + default_factory=list, + ) metadata: EmailMessageMeta = CamelCaseField( "Metadata associated with the email message" ) diff --git a/src/typeagent/knowpro/conversation_base.py b/src/typeagent/knowpro/conversation_base.py index bda1a797..337aa2e2 100644 --- a/src/typeagent/knowpro/conversation_base.py +++ b/src/typeagent/knowpro/conversation_base.py @@ -4,8 +4,8 @@ """Base class for conversations with incremental indexing support.""" import asyncio -import contextlib from collections.abc import AsyncIterable, Callable, Sequence +import contextlib from dataclasses import dataclass from datetime import datetime, timezone from typing import Generic, Self, TypeVar @@ -282,9 +282,7 @@ async def _submit_batch(filtered: list[TMessage]) -> None: await _drain_commit() # Await extraction result for this batch - extraction = ( - await next_extraction if next_extraction is not None else None - ) + extraction = await next_extraction if next_extraction is not None else None # Start commit (DB transaction) — runs concurrently with the # *next* batch's LLM extraction once we yield back to the loop. diff --git a/src/typeagent/storage/memory/semrefindex.py b/src/typeagent/storage/memory/semrefindex.py index 84892171..76feb2d1 100644 --- a/src/typeagent/storage/memory/semrefindex.py +++ b/src/typeagent/storage/memory/semrefindex.py @@ -76,9 +76,7 @@ async def add_batch_to_semantic_ref_index[ (tl.message_ordinal, tl.chunk_ordinal, knowledge_result.value) ) if bulk_items: - await add_knowledge_batch_to_semantic_ref_index( - conversation, bulk_items - ) + await add_knowledge_batch_to_semantic_ref_index(conversation, bulk_items) async def add_batch_to_semantic_ref_index_from_list[ @@ -103,9 +101,7 @@ async def add_batch_to_semantic_ref_index_from_list[ f"Message ordinal {tl.message_ordinal} out of range " f"for list starting at {start_ordinal}" ) - text_batch.append( - messages[list_index].text_chunks[tl.chunk_ordinal].strip() - ) + text_batch.append(messages[list_index].text_chunks[tl.chunk_ordinal].strip()) knowledge_results = await extract_knowledge_from_text_batch( knowledge_extractor, @@ -123,9 +119,7 @@ async def add_batch_to_semantic_ref_index_from_list[ (tl.message_ordinal, tl.chunk_ordinal, knowledge_result.value) ) if bulk_items: - await add_knowledge_batch_to_semantic_ref_index( - conversation, bulk_items - ) + await add_knowledge_batch_to_semantic_ref_index(conversation, bulk_items) async def add_term_to_index( @@ -360,11 +354,13 @@ def _collect_knowledge_refs_and_terms( for entity in knowledge.entities: if not validate_entity(entity): continue - refs.append(SemanticRef( - semantic_ref_ordinal=ordinal, - range=text_range, - knowledge=entity, - )) + refs.append( + SemanticRef( + semantic_ref_ordinal=ordinal, + range=text_range, + knowledge=entity, + ) + ) terms.append((entity.name, ordinal)) for type_name in entity.type: terms.append((type_name, ordinal)) @@ -377,11 +373,13 @@ def _collect_knowledge_refs_and_terms( ordinal += 1 for action in list(knowledge.actions) + list(knowledge.inverse_actions): - refs.append(SemanticRef( - semantic_ref_ordinal=ordinal, - range=text_range, - knowledge=action, - )) + refs.append( + SemanticRef( + semantic_ref_ordinal=ordinal, + range=text_range, + knowledge=action, + ) + ) terms.append((" ".join(action.verbs), ordinal)) if action.subject_entity_name != "none": terms.append((action.subject_entity_name, ordinal)) @@ -404,11 +402,13 @@ def _collect_knowledge_refs_and_terms( ordinal += 1 for topic_text in knowledge.topics: - refs.append(SemanticRef( - semantic_ref_ordinal=ordinal, - range=text_range, - knowledge=Topic(text=topic_text), - )) + refs.append( + SemanticRef( + semantic_ref_ordinal=ordinal, + range=text_range, + knowledge=Topic(text=topic_text), + ) + ) terms.append((topic_text, ordinal)) ordinal += 1 @@ -431,7 +431,10 @@ async def add_knowledge_to_semantic_ref_index( base_ordinal = await semantic_refs.size() refs, terms = _collect_knowledge_refs_and_terms( - base_ordinal, message_ordinal, chunk_ordinal, knowledge, + base_ordinal, + message_ordinal, + chunk_ordinal, + knowledge, ) if refs: @@ -460,7 +463,10 @@ async def add_knowledge_batch_to_semantic_ref_index( for msg_ord, chunk_ord, knowledge in items: refs, terms = _collect_knowledge_refs_and_terms( - base_ordinal + len(all_refs), msg_ord, chunk_ord, knowledge, + base_ordinal + len(all_refs), + msg_ord, + chunk_ord, + knowledge, ) all_refs.extend(refs) all_terms.extend(terms) diff --git a/tests/test_add_messages_streaming.py b/tests/test_add_messages_streaming.py index dc3f55b4..5707f71b 100644 --- a/tests/test_add_messages_streaming.py +++ b/tests/test_add_messages_streaming.py @@ -384,9 +384,7 @@ async def test_streaming_exception_in_later_batch_preserves_earlier() -> None: msgs = [_make_message(f"msg-{i}", source_id=f"s-{i}") for i in range(6)] with pytest.raises(ExceptionGroup) as exc_info: - await transcript.add_messages_streaming( - _async_iter(msgs), batch_size=3 - ) + await transcript.add_messages_streaming(_async_iter(msgs), batch_size=3) assert any( isinstance(e, RuntimeError) and "Systemic failure" in str(e) diff --git a/tests/test_email_import.py b/tests/test_email_import.py index 371136bc..6b0af98f 100644 --- a/tests/test_email_import.py +++ b/tests/test_email_import.py @@ -100,3 +100,187 @@ def test_no_leading_separator_in_any_chunk(self) -> None: assert not chunk.startswith( "\n\n" ), f"chunk {chunk!r} has leading separator" + + +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from typeagent.emails.email_import import ( + is_inline_reply, + parse_email_chunks, +) + + +class TestIsInlineReply: + def test_empty_text(self) -> None: + assert is_inline_reply("") is False + + def test_no_header(self) -> None: + text = "Just a regular email with no quoted content." + assert is_inline_reply(text) is False + + def test_top_posted_reply(self) -> None: + # This has "On ... wrote:" but all quotes are at the bottom, no interleaving + text = """\ +Thanks for the info! + +On Mon, Dec 10, 2020 at 10:30 AM Someone wrote: + +> Here is some quoted text. +> More quoted text. +> Even more. +""" + assert is_inline_reply(text) is False + + def test_inline_reply(self) -> None: + text = """\ +I've given my replies in line with the quoted text. + +On Mon, Dec 10, 2020 at 10:30 AM Someone wrote: +> Quoted blah. + +That is clearly BS. + +> Quoted blah blah. + +Here I must agree. + +> More quoted text. + +-- +Guido van Rossum +""" + assert is_inline_reply(text) is True + + def test_inline_reply_no_preamble(self) -> None: + text = """\ +On Mon, Dec 10, 2020 at 10:30 AM Someone wrote: +> First quote. + +My first response. + +> Second quote. + +My second response. +""" + assert is_inline_reply(text) is True + + +class TestParseEmailChunks: + def test_empty_text(self) -> None: + assert parse_email_chunks("") == [] + + def test_no_inline_pattern(self) -> None: + text = "Just a regular email." + result = parse_email_chunks(text) + assert len(result) == 1 + assert result[0] == ("Just a regular email.", None) + + def test_basic_inline_reply(self) -> None: + text = """\ +I've given my replies in line with the quoted text. + +On Mon, Dec 10, 2020 at 10:30 AM Someone wrote: +> Quoted blah. + +That is clearly BS. + +> Quoted blah blah. + +Here I must agree. + +> More quoted text. + +-- +Guido van Rossum +""" + result = parse_email_chunks(text) + # Should have: preamble (original), quoted, reply, quoted, reply, quoted + texts = [chunk[0] for chunk in result] + # sources = [chunk[1] for chunk in result] + + # Check we have all the content + assert any("I've given my replies" in t for t in texts) + assert any("That is clearly BS" in t for t in texts) + assert any("Here I must agree" in t for t in texts) + assert any("Quoted blah" in t for t in texts) + + # Original content should have None source + for text, source in result: + if "I've given my replies" in text or "That is clearly BS" in text: + assert source is None + + # Quoted content should have the person's name + for text, source in result: + if "Quoted blah" in text: + assert source == "Someone" + + # Signature should NOT be included + assert not any("Guido van Rossum" in t for t in texts) + + def test_extracts_quoted_person_name(self) -> None: + text = """\ +On Mon, Dec 10, 2020 at 10:30 AM John Doe wrote: +> Is Python good? + +Yes, absolutely! + +> What about JavaScript? + +It has its uses. +""" + result = parse_email_chunks(text) + + # Find quoted chunks - they should have "John Doe" as source + quoted_chunks = [(t, s) for t, s in result if s is not None] + assert len(quoted_chunks) == 2 + for text, source in quoted_chunks: + assert source == "John Doe" + + def test_preserves_preamble(self) -> None: + text = """\ +Here's my preamble before the inline replies. + +On Mon, Dec 10, 2020 at 10:30 AM Someone wrote: +> Question? + +Answer! +""" + result = parse_email_chunks(text) + texts = [chunk[0] for chunk in result] + + assert any("preamble" in t for t in texts) + assert any("Answer" in t for t in texts) + + def test_strips_trailing_delimiters(self) -> None: + text = """\ +On Mon, Dec 10, 2020 at 10:30 AM Someone wrote: +> Question? + +Answer! +_______________ +""" + result = parse_email_chunks(text) + # Last non-quoted chunk should not end with underscores + original_chunks = [t for t, s in result if s is None] + assert len(original_chunks) > 0 + assert not original_chunks[-1].endswith("_") + + def test_quoted_content_is_unabbreviated(self) -> None: + text = """\ +On Mon, Dec 10, 2020 at 10:30 AM Someone wrote: +> This is a very long quoted line that should be preserved in full. +> And this is another line that continues the quote. +> Even more content here. + +My response. +""" + result = parse_email_chunks(text) + + # Find the quoted chunk + quoted = [t for t, s in result if s is not None] + assert len(quoted) == 1 + # Full content should be preserved + assert "very long quoted line" in quoted[0] + assert "another line" in quoted[0] + assert "Even more content" in quoted[0] diff --git a/tools/benchmark_semref_writes.py b/tools/benchmark_semref_writes.py index d799cba1..3162f495 100644 --- a/tools/benchmark_semref_writes.py +++ b/tools/benchmark_semref_writes.py @@ -43,14 +43,16 @@ TranscriptMessageMeta, ) - # --------------------------------------------------------------------------- # Inlined pre-optimization write path (one append + add_term per item) # --------------------------------------------------------------------------- async def _individual_add_knowledge( - conversation, message_ordinal, chunk_ordinal, knowledge, + conversation, + message_ordinal, + chunk_ordinal, + knowledge, ): """Reproduces the pre-optimization per-item write logic.""" verify_has_semantic_ref_index(conversation) @@ -95,7 +97,9 @@ async def _individual_add_knowledge( if action.object_entity_name != "none": await semantic_ref_index.add_term(action.object_entity_name, ordinal) if action.indirect_object_entity_name != "none": - await semantic_ref_index.add_term(action.indirect_object_entity_name, ordinal) + await semantic_ref_index.add_term( + action.indirect_object_entity_name, ordinal + ) if action.params: for param in action.params: if isinstance(param, str): @@ -135,8 +139,7 @@ def synthetic_knowledge(chunk_index: int) -> kplib.KnowledgeResponse: name=f"entity_{chunk_index}_{j}", type=[f"type_{j}", f"category_{chunk_index % 5}"], facets=[ - kplib.Facet(name=f"facet_{j}", value=f"value_{j}") - for j in range(2) + kplib.Facet(name=f"facet_{j}", value=f"value_{j}") for j in range(2) ], ) for j in range(3) @@ -237,15 +240,21 @@ async def main() -> None: description="Benchmark semref index write strategies.", ) parser.add_argument( - "--chunks", type=int, default=50, + "--chunks", + type=int, + default=50, help="Number of knowledge chunks to write per run (default: 50).", ) parser.add_argument( - "--rounds", type=int, default=10, + "--rounds", + type=int, + default=10, help="Number of timed rounds (default: 10).", ) parser.add_argument( - "--warmup", type=int, default=2, + "--warmup", + type=int, + default=2, help="Number of untimed warmup rounds (default: 2).", ) args = parser.parse_args() @@ -262,21 +271,31 @@ async def main() -> None: print(f"Total semrefs per run: ~{refs_per_chunk * args.chunks}") individual = await run_benchmark( - "Individual writes", bench_individual, - args.chunks, args.rounds, args.warmup, + "Individual writes", + bench_individual, + args.chunks, + args.rounds, + args.warmup, ) print_report( "Individual writes (per-entity append + add_term)", - individual, args.rounds, args.warmup, + individual, + args.rounds, + args.warmup, ) batched = await run_benchmark( - "Batched writes", bench_batched, - args.chunks, args.rounds, args.warmup, + "Batched writes", + bench_batched, + args.chunks, + args.rounds, + args.warmup, ) print_report( "Batched writes (bulk extend + add_terms_batch)", - batched, args.rounds, args.warmup, + batched, + args.rounds, + args.warmup, ) speedup = statistics.fmean(individual) / statistics.fmean(batched)