fix: Emitted ACTIVATE_VERSION messages now honor the stream aliases configured with stream maps #3654
Reviewer's Guide
Fix ACTIVATE_VERSION emission to respect stream aliases defined by stream maps and extend mapper tests/snapshots to assert consistent aliased stream names across SCHEMA, ACTIVATE_VERSION, and RECORD messages.
Sequence diagram for ACTIVATE_VERSION emission with stream aliasing
sequenceDiagram
participant Tap as Tap
participant Stream as MappedStream
participant StreamMap as StreamMap
participant Singer as singer
Tap->>Stream: _write_activate_version_message(full_table_version)
loop for each stream_map in stream_maps
Stream->>StreamMap: [check isinstance RemoveRecordTransform]
alt non RemoveRecordTransform
Stream->>Tap: write_message(ActivateVersionMessage)
Note over Stream,Singer: ActivateVersionMessage(stream=stream_map.stream_alias,
Note over Stream,Singer: version=full_table_version)
else RemoveRecordTransform
Stream-->>Stream: [continue]
end
end
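The flow in the diagram can be sketched in plain Python. This is a minimal, self-contained illustration: `StreamMap`, `RemoveRecordTransform`, and `activate_version_messages` are stand-in names written for this sketch, not the SDK's actual classes. Only the loop shape and the use of `stream_alias` come from the diagram.

```python
from dataclasses import dataclass


@dataclass
class StreamMap:
    """Stand-in for a stream map; only the alias matters here."""

    stream_alias: str


class RemoveRecordTransform(StreamMap):
    """Stand-in for a map that drops the stream entirely."""


def activate_version_messages(stream_maps, full_table_version):
    """Build one ACTIVATE_VERSION payload per map that still emits records."""
    messages = []
    for stream_map in stream_maps:
        if isinstance(stream_map, RemoveRecordTransform):
            continue  # a removed stream emits nothing, so skip it
        messages.append(
            {
                "type": "ACTIVATE_VERSION",
                "stream": stream_map.stream_alias,  # alias, not the raw name
                "version": full_table_version,
            }
        )
    return messages
```

With one aliased map and one removal map, the sketch yields a single message whose `stream` is the alias.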
Hey - I've found 1 issue, and left some high level feedback:
- In `_write_activate_version_message`, the iteration and filtering over `self.stream_maps` duplicates logic used elsewhere for emitting aliased messages; consider refactoring to share a common helper so that SCHEMA/RECORD/ACTIVATE_VERSION stay consistent if stream-map behavior changes.
- The special-case `isinstance(stream_map, RemoveRecordTransform)` check leaks transform-specific knowledge into the core stream logic; moving this decision behind a method/flag on the stream map (e.g., `should_emit_messages`) would keep concerns better separated.
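One way to read both points together is a flag on the map plus a single helper that every message writer iterates. The sketch below is only an assumption about what that could look like: `BaseMap`, `RemoveMap`, and `emitting_aliases` are hypothetical names, and `should_emit_messages` is the flag name proposed in the comment, not an existing SDK attribute.

```python
from typing import Iterable, Iterator


class BaseMap:
    """Hypothetical base class for stream maps."""

    # Hypothetical flag: whether messages should be emitted for this map.
    should_emit_messages = True

    def __init__(self, stream_alias: str) -> None:
        self.stream_alias = stream_alias


class RemoveMap(BaseMap):
    """Hypothetical map that removes the stream from the output."""

    should_emit_messages = False


def emitting_aliases(stream_maps: Iterable[BaseMap]) -> Iterator[str]:
    """Yield the alias of every map that should produce output.

    SCHEMA, RECORD, and ACTIVATE_VERSION writers could all iterate this
    helper so that they agree on which stream names are emitted.
    """
    for stream_map in stream_maps:
        if stream_map.should_emit_messages:
            yield stream_map.stream_alias
```

The core stream logic then no longer needs an `isinstance` check; the decision lives with the map.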
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `_write_activate_version_message`, the iteration and filtering over `self.stream_maps` duplicates logic used elsewhere for emitting aliased messages; consider refactoring to share a common helper so that SCHEMA/RECORD/ACTIVATE_VERSION stay consistent if stream-map behavior changes.
- The special-case `isinstance(stream_map, RemoveRecordTransform)` check leaks transform-specific knowledge into the core stream logic; moving this decision behind a method/flag on the stream map (e.g., `should_emit_messages`) would keep concerns better separated.
## Individual Comments
### Comment 1
<location path="tests/core/test_parent_child.py" line_range="508-509" />
<code_context>
     assert tally["sibling"] == 4, msg
+
+
+@time_machine.travel(DATETIME, tick=False)
+def test_activate_version_uses_stream_alias():
+    """ACTIVATE_VERSION must use the mapped stream alias, not the raw stream name.
+
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a complementary test for non-aliased streams to ensure ACTIVATE_VERSION still uses the original stream name when no `__alias__` is configured.
To fully verify the behavior, please add a second test where `stream_maps` is empty or omits `__alias__` (e.g. `{"raw_name": {}}`) and assert that `SCHEMA`, `RECORD`, and `ACTIVATE_VERSION` all use `"raw_name"`. This will help prevent regressions in the non-aliased path.
Suggested implementation:
```python
    msg = "Sibling records should also be emitted"
    assert tally["sibling"] == 4, msg


def test_activate_version_uses_raw_stream_name_when_no_alias():
    """ACTIVATE_VERSION must use the raw stream name when no alias is configured.

    When stream_maps is empty or does not define __alias__ for a stream, the
    ACTIVATE_VERSION message must carry the original raw stream name so the target
    can still correctly correlate it with the SCHEMA and RECORD messages.
    """
    # Use a stream map that keeps the stream unaliased.
    stream_maps = {"raw_name": {}}

    # This helper should mirror the setup used in `test_activate_version_uses_stream_alias`,
    # but assert that SCHEMA, RECORD, and ACTIVATE_VERSION all use "raw_name".
    _assert_activate_version_stream_name(
        stream_maps=stream_maps,
        expected_stream_name="raw_name",
    )


@time_machine.travel(DATETIME, tick=False)
def test_activate_version_uses_stream_alias():
```
To fully implement this, you should:
1. Introduce a shared helper `_assert_activate_version_stream_name(stream_maps, expected_stream_name)` in this test module (or reuse an existing helper if one already exists in the file) that:
   - Configures/overrides `stream_maps` for the parent/child tap or target.
   - Executes the scenario that produces SCHEMA, RECORD, and ACTIVATE_VERSION messages for the relevant stream.
   - Collects those messages (e.g. via an in-memory target, captured stdout, or existing test utilities).
   - Asserts that:
     * The SCHEMA message's `stream` (or equivalent field) equals `expected_stream_name`.
     * At least one RECORD message for that logical stream uses `expected_stream_name`.
     * The ACTIVATE_VERSION message's `stream` (or equivalent field) equals `expected_stream_name`.
2. Update `test_activate_version_uses_stream_alias` to call `_assert_activate_version_stream_name` with a `stream_maps` that includes `{"raw_name": {"__alias__": "aliased_name"}}` and `expected_stream_name="aliased_name"`, so both tests share the same verification logic.
3. Ensure any fixtures or utilities required by `_assert_activate_version_stream_name` (e.g. tap/target runners, message collectors) are imported or injected into the helper according to the conventions already used elsewhere in `tests/core/test_parent_child.py`.
</issue_to_address>
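The verification half of the helper described in the comment above can be sketched independently of how the tap is run. The function below is a hypothetical illustration (`stream_name_problems` is not an existing test utility) and assumes the captured Singer messages are already available as plain dicts with `type` and `stream` keys. Running the tap and capturing its output is left out.

```python
def stream_name_problems(messages, expected_stream_name):
    """Return a list of human-readable problems; empty means consistent."""
    required = ("SCHEMA", "RECORD", "ACTIVATE_VERSION")
    problems = []
    seen = set()
    for message in messages:
        msg_type = message.get("type")
        if msg_type not in required:
            continue  # e.g. STATE messages are not tied to one stream name
        seen.add(msg_type)
        if message.get("stream") != expected_stream_name:
            problems.append(f"{msg_type} used stream {message.get('stream')!r}")
    # Each required message type must appear at least once.
    for msg_type in required:
        if msg_type not in seen:
            problems.append(f"no {msg_type} message was emitted")
    return problems
```

Both the aliased and the non-aliased test could then reduce to `assert stream_name_problems(messages, expected) == []`, differing only in the `stream_maps` configuration and the expected name.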
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@           Coverage Diff           @@
##            v0.54    #3654   +/-   ##
=======================================
  Coverage   93.81%   93.81%
=======================================
  Files          73       73
  Lines        5948     5951    +3
  Branches      729      731    +2
=======================================
+ Hits         5580     5583    +3
  Misses        273      273
  Partials       95       95
Documentation build overview: 8 files changed
0bb0bc0 to 020cc39
…ed with stream maps Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
020cc39 to bfe5079
@sourcery-ai review
Hey - I've found 1 issue, and left some high level feedback:
- The new implementation skips emitting ACTIVATE_VERSION entirely when `self.stream_maps` is empty, which changes prior behavior; consider falling back to `self.name` in that case so unmapped streams still emit an activation message.
- Looping over all `stream_maps` and emitting an ACTIVATE_VERSION for each non-`RemoveRecordTransform` may produce duplicate activation messages when multiple maps share the same `stream_alias`; consider deduplicating on alias or clearly constraining expected configurations.
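Both concerns can be illustrated with a small pure function. This is a hedged sketch, not the SDK's code: `activation_stream_names` is a hypothetical name, and representing each map as an `(alias, is_removed)` tuple is an assumption made to keep the example self-contained.

```python
def activation_stream_names(stream_name, stream_maps):
    """Return the stream names that should receive ACTIVATE_VERSION.

    `stream_maps` is a list of (alias, is_removed) tuples, a simplified
    stand-in for real stream map objects.
    """
    if not stream_maps:
        # No maps configured: fall back to the raw stream name.
        return [stream_name]
    aliases = [alias for alias, is_removed in stream_maps if not is_removed]
    # dict.fromkeys drops duplicates while preserving first-seen order.
    return list(dict.fromkeys(aliases))
```

Note that the fallback applies only when no maps exist at all; a stream whose only map removes it still produces no activation message.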
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new implementation skips emitting ACTIVATE_VERSION entirely when `self.stream_maps` is empty, which changes prior behavior; consider falling back to `self.name` in that case so unmapped streams still emit an activation message.
- Looping over all `stream_maps` and emitting an ACTIVATE_VERSION for each non-`RemoveRecordTransform` may produce duplicate activation messages when multiple maps share the same `stream_alias`; consider deduplicating on alias or clearly constraining expected configurations.
## Individual Comments
### Comment 1
<location path="singer_sdk/streams/core.py" line_range="858-869" />
<code_context>
+            if isinstance(stream_map, RemoveRecordTransform):
+                continue
+
+            self._tap.write_message(
+                singer.ActivateVersionMessage(
+                    stream=stream_map.stream_alias,
+                    version=full_table_version,
+                )
             )
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Avoid emitting duplicate ACTIVATE_VERSION messages for the same mapped alias.
If `self.stream_maps` can contain multiple entries with the same `stream_alias`, this will emit duplicate `ACTIVATE_VERSION` messages for that alias. Even if the protocol tolerates this, it adds noise and may confuse some targets. Consider de‑duplicating by alias (e.g., keep a `seen_aliases` set and emit once per alias), unless there’s a requirement to send one message per mapping.
```suggestion
    def _write_activate_version_message(self, full_table_version: int) -> None:
        """Write out an ACTIVATE_VERSION message."""
        seen_aliases = set()
        for stream_map in self.stream_maps:
            if isinstance(stream_map, RemoveRecordTransform):
                continue

            # Emit at most one message per alias.
            stream_alias = stream_map.stream_alias
            if stream_alias in seen_aliases:
                continue
            seen_aliases.add(stream_alias)

            self._tap.write_message(
                singer.ActivateVersionMessage(
                    stream=stream_alias,
                    version=full_table_version,
                )
            )
```
</issue_to_address>
SSIA
Summary by Sourcery
Ensure ACTIVATE_VERSION messages emitted by mapped streams respect configured stream aliases instead of always using the original stream name.