feat(taps): Allow REST streams to ignore certain errors and continue on to the next stream/context#3517
feat(taps): Allow REST streams to ignore certain errors and continue on to the next stream/context#3517edgarrmondragon wants to merge 23 commits into
Conversation
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
…on to the next stream/context Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Reviewer's GuideAdds support for ignorable REST stream errors that skip the current request/stream while allowing the overall sync to continue, and introduces tests to validate this behavior and its logging. Sequence diagram for REST stream request handling with ignorable errorssequenceDiagram
actor TapRunner
participant RESTStream
participant Paginator
participant HTTPClient as decorated_request
participant Logger
TapRunner->>RESTStream: request_records(context)
loop pages for context
RESTStream->>Paginator: get_next(prepared_request, context)
Paginator-->>RESTStream: next_page_token
RESTStream->>HTTPClient: decorated_request(prepared_request, context)
HTTPClient-->>RESTStream: IgnorableSyncError
RESTStream->>Logger: warning(Skipping request due to ignorable error)
RESTStream-->>TapRunner: break page loop for this context
end
TapRunner->>TapRunner: continue with next stream/context
Class diagram for REST stream error handling extensionsclassDiagram
class RESTStream {
logger
extra_retry_statuses
validate_response(response)
request_records(context)
parse_response(response)
update_sync_costs(prepared_request, response, context)
}
class IgnorableAPIError {
}
class IgnorableSyncError {
}
RESTStream ..> IgnorableAPIError : may raise
RESTStream ..> IgnorableSyncError : caught in
note for RESTStream "validate_response now documents IgnorableAPIError; request_records catches IgnorableSyncError to skip current request/stream and continue sync"
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
Documentation build overview
17 files changed ·
|
There was a problem hiding this comment.
Hey - I've found 3 issues, and left some high level feedback:
- In
request_records, you catchIgnorableSyncErrorbut the rest of the change (including tests) usesIgnorableAPIError; ensure the exception type is consistent and properly imported so ignorable errors are actually handled as intended.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `request_records`, you catch `IgnorableSyncError` but the rest of the change (including tests) uses `IgnorableAPIError`; ensure the exception type is consistent and properly imported so ignorable errors are actually handled as intended.
## Individual Comments
### Comment 1
<location path="singer_sdk/streams/rest.py" line_range="236-240" />
<code_context>
FatalAPIError: If the request is not retriable.
RetriableAPIError: If the request is retriable.
- """
+ IgnorableAPIError: If the request should be silently skipped.
+ """ # noqa: DOC502
if (
response.status_code in self.extra_retry_statuses
</code_context>
<issue_to_address>
**suggestion:** Clarify whether "silently skipped" aligns with the new warning log on ignorable errors.
The docstring says `IgnorableAPIError` should be "silently skipped", but `request_records` now logs a warning for `IgnorableSyncError`. If these represent the same class of error, consider either lowering the log level (info/debug) or rephrasing the docstring so it no longer implies the skip is silent to operators.
```suggestion
Raises:
FatalAPIError: If the request is not retriable.
RetriableAPIError: If the request is retriable.
IgnorableSyncError: If the request is skipped without raising an error
to the caller (the skip is still logged).
""" # noqa: DOC502
```
</issue_to_address>
### Comment 2
<location path="singer_sdk/streams/rest.py" line_range="479-485" />
<code_context>
next_page_token=paginator.current_value,
)
- resp = decorated_request(prepared_request, context)
+ try:
+ resp = decorated_request(prepared_request, context)
+ except IgnorableSyncError as e:
+ self.logger.warning(
+ "Skipping request due to ignorable error: %s", e
+ )
+ break
request_counter.increment()
self.update_sync_costs(prepared_request, resp, context)
</code_context>
<issue_to_address>
**question (bug_risk):** Re-evaluate using `break` on ignorable errors, which stops pagination for the remaining pages.
This handling stops pagination for the entire stream as soon as an `IgnorableSyncError` occurs. Please confirm whether the intent is to abort the rest of the loop, or whether we should instead skip just this request/page and continue with the next page/token.
</issue_to_address>
### Comment 3
<location path="tests/core/rest/test_failure.py" line_range="262-271" />
<code_context>
+ stream.validate_response(fake_response)
+
+
+def test_request_records_skips_on_ignorable_error(
+ requests_mock: requests_mock_module.Mocker,
+ rest_tap,
+ caplog: pytest.LogCaptureFixture,
+):
+ """request_records yields nothing and logs a warning on IgnorableAPIError.
+
+ No exception should propagate; the stream is silently skipped.
+ """
+ requests_mock.get("https://example.com/dummy", status_code=404, reason="Not Found")
+
+ stream = IgnorableStream(rest_tap)
+
+ with caplog.at_level(logging.WARNING, logger=stream.logger.name):
+ records = list(stream.request_records(None))
+
+ assert records == []
+ assert any(
+ "Skipping request due to ignorable error" in record.message
+ for record in caplog.records
</code_context>
<issue_to_address>
**suggestion (testing):** Consider an additional test for ignorable errors occurring after some records have already been emitted (pagination case)
You’re already covering the case where the first request is ignorable. Since `request_records` is commonly used with paginated streams, please add a test where an initial page returns records and a subsequent page hits an ignorable error (e.g., 404). The test should assert that the records from the successful page are returned, the warning is logged, and pagination stops after the ignorable error.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3517 +/- ##
=======================================
Coverage 93.85% 93.86%
=======================================
Files 73 74 +1
Lines 5907 5965 +58
Branches 725 735 +10
=======================================
+ Hits 5544 5599 +55
- Misses 270 274 +4
+ Partials 93 92 -1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
|
This will be nice to replace our custom |
Do you have handy examples of those implementations? I'd like to make sure this would actually let us remove the customizations. |
|
|
Also, IMO it would be good to be able to run a tap in a generic "skip errors" mode and have it report the errors at the end of the sync. That would ensure it gets all the data for all streams where possible, rather than exiting on the first encountered error. It's pretty common that for a client, when their pipeline fails and we narrow down to an issue in a particular stream, they ask us whether data for other the streams was synced - the answer is usually a "unfortunately not" (unless we already added explicit handling of known-resumable errors). |
I think it might make sense for this mode to actually be the only mode. I'm trying to imagine what problems that would create, but as long as we capture the errors with contextual information (stream, "stage", etc.) and exit with the right code, it feels like it's the correct approach. UPDATE: |
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
86d8ab7 to
ff8d083
Compare
…ror (#3545) Stub ## Summary by Sourcery Track per-stream sync results and allow tap runs to continue syncing remaining streams after individual failures while reporting an aggregate outcome. New Features: - Introduce a SyncResult enum to represent and combine per-stream sync outcomes and map them to process exit codes. - Return an aggregate SyncResult from Tap.sync_all and use it to set the tap process exit code instead of always exiting successfully. Bug Fixes: - Ensure errors in child streams mark the parent stream as partially successful instead of aborting remaining children and records. - Convert unexpected exceptions during stream sync into lifecycle abort exceptions so they are consistently handled and logged. Enhancements: - Log a one-line sync outcome per stream and improve error log messages during sync to include exception details. - Extend parent/child stream typing with Context and Record aliases and apply override annotations for better type safety. - Adjust typing-extensions dependency marker for Python 3.13 compatibility. Tests: - Add unit tests and snapshot tests covering SyncResult behavior and tap behavior when some streams fail while others continue syncing, including incremental and parent/child scenarios. --------- Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com> Signed-off-by: Edgar Ramírez-Mondragón <edgarrm358@gmail.com>
SSIA
Summary by Sourcery
Add support for treating certain REST stream errors as ignorable so affected requests are skipped without failing the sync.
Enhancements:
Tests: