feat(sync-service): Send individual operations to ShapeLogCollector #3429
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Part of #3414, this PR sends individual operations from the ReplicationClient to ShapeLogCollector. This should reduce the working memory used by the ReplicationClient, but the ultimate aim is to send individual operations to storage and so not buffer at any point (unless subqueries are involved). This PR just moves the transaction consolidation one step, from ReplicationClient to ShapeLogCollector to keep the PR small. Other PRs will move the consolidation further, from ShapeLogCollector to Consumer, then finally from to Consumer to storage (except with subqueries are involved).
Currently the replication stream take two forms:
Replication.Messagessuch asMessages.Begin,Messages.Commit,Messages.Insertetc.Changes.Transaction(orChanges.Relation)And
Collectorconverts one to the other.This PR introduces an intermediate form, Operations, as Messages are coupled to the postgres replication protocol and are not optimised for use with Shapes (for example they do not include the relation name) and Transactions are whole transactions. So with this PR we have:
Replication.Messagessuch asMessages.Begin,Messages.Commit,Messages.Insertetc.Changes.operation()) such asChanges.Begin.Changes.Commit,Changes.NewRecordetc.Changes.Transaction(orChanges.Relation) which I'm calling database actions (spec:Changes.action())Collectorhas been replaced byMessageConverterthat converts 1 to 2 andTransactionBuilderthat coverts 2 to 3.