refactor: Insert then delete #3453
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Reviewer's Guide
Refactors SQL hard delete handling to use an upsert-then-delete workflow for LOG_BASED replication, simplifying deletion logic to rely on the _sdc_deleted_at column and adding a generic bulk_upsert_records implementation using SQLite-style INSERT OR REPLACE.
Sequence diagram for hard_delete upsert-then-delete workflow in SQLSink.process_batch
sequenceDiagram
actor Tap
participant SQLSink
participant SQLConnector
participant Database
Tap->>SQLSink: process_batch(context)
SQLSink->>SQLSink: config.hard_delete == True
SQLSink->>SQLSink: bulk_upsert_records(full_table_name, schema, records)
SQLSink->>SQLSink: conform_schema(schema)
SQLSink->>SQLSink: conform_record(record) * for each record
SQLSink->>SQLConnector: _connect()
SQLConnector-->>SQLSink: Connection
SQLSink->>Database: INSERT OR REPLACE INTO full_table_name VALUES(records)
Database-->>SQLSink: rowcount
SQLSink->>SQLSink: hard_delete_records()
SQLSink->>SQLConnector: delete_where_sdc_deleted_at_is_not_null(full_table_name, sdc_deleted_at_column)
SQLConnector->>SQLConnector: _connect()
SQLConnector->>Database: DELETE FROM full_table_name WHERE sdc_deleted_at_column IS NOT NULL
Database-->>SQLConnector: rowcount
SQLConnector-->>SQLSink: rows_deleted
SQLSink-->>Tap: batch processed
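The two database calls in the sequence above can be sketched with the standard-library sqlite3 module. This is a minimal illustration, not the SDK's implementation: the `users` table, its columns, and the helper name `upsert_then_hard_delete` are assumptions made for the example. Only the `INSERT OR REPLACE` and `DELETE ... WHERE _sdc_deleted_at IS NOT NULL` statements come from the diagram.

```python
import sqlite3


def upsert_then_hard_delete(conn, rows):
    """Upsert a batch, then delete every row flagged via _sdc_deleted_at.

    Hypothetical helper; returns the number of rows deleted.
    """
    # Step 1: upsert. Rows whose primary key already exists are replaced.
    conn.executemany(
        "INSERT OR REPLACE INTO users (id, name, _sdc_deleted_at) VALUES (?, ?, ?)",
        rows,
    )
    # Step 2: hard delete. Any row carrying a deletion timestamp is removed.
    cursor = conn.execute("DELETE FROM users WHERE _sdc_deleted_at IS NOT NULL")
    conn.commit()
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, _sdc_deleted_at TEXT)"
)
conn.execute("INSERT INTO users VALUES (1, 'alice', NULL), (2, 'bob', NULL)")

# A LOG_BASED batch: row 2 arrives flagged as deleted, row 3 is new.
batch = [
    (2, "bob", "2024-01-01T00:00:00Z"),
    (3, "carol", None),
]
deleted = upsert_then_hard_delete(conn, batch)
remaining = [row[0] for row in conn.execute("SELECT id FROM users ORDER BY id")]
```

Because the flagged row is written before the delete runs, the delete step does not need the batch's keys: it only inspects the `_sdc_deleted_at` column.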
Class diagram for updated SQLSink and SQLConnector hard delete and upsert behavior
classDiagram
class SQLSink {
+connector_class: type~_C~
+connector: SQLConnector
+full_table_name: str | FullyQualifiedName
+schema: dict
+soft_delete_column_name: str
+key_properties: list~str~
+process_batch(context: dict) void
+bulk_insert_records(full_table_name: str | FullyQualifiedName, schema: dict, records: Iterable~dict~) int
+bulk_upsert_records(full_table_name: str | FullyQualifiedName, schema: dict, records: Iterable~dict~) int
+hard_delete_records() int
+conform_schema(schema: dict) dict
+conform_record(record: dict) dict
+conform_name(name: str, kind: str) str
}
class SQLConnector {
+delete_where_sdc_deleted_at_is_not_null(full_table_name: str | FullyQualifiedName, sdc_deleted_at_column: str) int
+_connect() Connection
}
SQLSink --> SQLConnector: uses connector
note for SQLSink "When config.hard_delete is true, process_batch calls bulk_upsert_records then hard_delete_records"
note for SQLConnector "hard_delete_records in SQLSink delegates to delete_where_sdc_deleted_at_is_not_null, which deletes rows where _sdc_deleted_at is not null"
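The delegation described in the two notes can be sketched with toy classes. `ToySink` and `ToyConnector` are hypothetical stand-ins that reuse the method names from the diagram. They are backed by sqlite3 and do not reproduce the SDK's real classes, schema conforming, or FullyQualifiedName handling. Building the column list from `schema["properties"]` is likewise an assumption about how a generic upsert might be written.

```python
import sqlite3
from typing import Iterable


class ToyConnector:
    """Hypothetical stand-in for SQLConnector, backed by sqlite3."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self.conn = conn

    def delete_where_sdc_deleted_at_is_not_null(
        self, full_table_name: str, sdc_deleted_at_column: str
    ) -> int:
        cursor = self.conn.execute(
            f'DELETE FROM "{full_table_name}" '
            f'WHERE "{sdc_deleted_at_column}" IS NOT NULL'
        )
        return cursor.rowcount


class ToySink:
    """Hypothetical stand-in for SQLSink; only the hard-delete path is shown."""

    soft_delete_column_name = "_sdc_deleted_at"

    def __init__(self, connector, full_table_name, schema, hard_delete):
        self.connector = connector
        self.full_table_name = full_table_name
        self.schema = schema
        self.hard_delete = hard_delete

    def bulk_upsert_records(
        self, full_table_name: str, schema: dict, records: Iterable[dict]
    ) -> int:
        # Derive the column list from the schema (an assumption of this sketch).
        columns = list(schema["properties"])
        column_list = ", ".join(f'"{c}"' for c in columns)
        placeholders = ", ".join("?" for _ in columns)
        sql = (
            f'INSERT OR REPLACE INTO "{full_table_name}" '
            f"({column_list}) VALUES ({placeholders})"
        )
        rows = [tuple(record.get(c) for c in columns) for record in records]
        self.connector.conn.executemany(sql, rows)
        return len(rows)

    def hard_delete_records(self) -> int:
        # Delegate to the connector, as in the class diagram note.
        return self.connector.delete_where_sdc_deleted_at_is_not_null(
            self.full_table_name, self.soft_delete_column_name
        )

    def process_batch(self, context: dict) -> None:
        if self.hard_delete:
            self.bulk_upsert_records(
                self.full_table_name, self.schema, context["records"]
            )
            self.hard_delete_records()
        # Other modes are omitted from this sketch.


conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE "orders" '
    '("id" INTEGER PRIMARY KEY, "status" TEXT, "_sdc_deleted_at" TEXT)'
)
schema = {"properties": {"id": {}, "status": {}, "_sdc_deleted_at": {}}}
sink = ToySink(ToyConnector(conn), "orders", schema, hard_delete=True)

sink.process_batch({"records": [
    {"id": 1, "status": "open", "_sdc_deleted_at": None},
    {"id": 2, "status": "void", "_sdc_deleted_at": "2024-01-01T00:00:00Z"},
]})
sink.process_batch({"records": [
    {"id": 1, "status": "closed", "_sdc_deleted_at": None},
]})
rows = conn.execute('SELECT "id", "status" FROM "orders"').fetchall()
```

Keeping the DELETE statement on the connector means the sink never writes deletion SQL itself; a connector for another dialect could override that one method.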
File-Level Changes
IS NOT NULL statement.
Codecov Report
❌ Your patch check has failed because the patch coverage (91.30%) is below the target coverage (100.00%). You can increase the patch coverage or adjust the target coverage.
Additional details and impacted files
@@                      Coverage Diff                      @@
##           sdk-log-based-hard-delete    #3453      +/-   ##
=============================================================
- Coverage                      94.17%   94.13%   -0.05%
=============================================================
  Files                             70       70
  Lines                           5820     5811       -9
  Branches                         724      719       -5
=============================================================
- Hits                            5481     5470      -11
- Misses                           236      237       +1
- Partials                         103      104       +1
_sdc_deleted_at to flag records for deletion in hard-delete mode #3450
Summary by Sourcery
Refactor SQL hard-delete handling to upsert then delete based on the soft delete column for LOG_BASED replication.
New Features:
Bug Fixes:
Enhancements: