Open
Conversation
d788aa3 to
088b52b
Compare
|
this is great - i will test this locally |
wadesherman
reviewed
Aug 26, 2022
| private boolean shouldRecaptureSchemaSnapshot(MySqlPartition partition){ | ||
| try (MySqlSnapshotContext ctx = (MySqlSnapshotContext)prepare(partition)){ | ||
| determineCapturedTables(ctx); | ||
| boolean result = !databaseSchema.tableIds().containsAll(ctx.capturedSchemaTables); |
There was a problem hiding this comment.
do you think it is feasible to just call createSchemaEventsForTables right here?
Owner
Author
There was a problem hiding this comment.
@wadesherman I have tried
private void ensureSchemaForNewTables(MySqlPartition partition) {
try (MySqlSnapshotContext snapshotContext = (MySqlSnapshotContext) prepare(partition)) {
determineCapturedTables(snapshotContext);
boolean newTables = !databaseSchema.tableIds().containsAll(snapshotContext.capturedSchemaTables);
LOGGER.info("new tables detected: " + newTables);
if (newTables) {
ChangeEventSourceContext sourceContext = new ChangeEventSourceContext() {
@Override
public boolean isRunning() {
return true;
}
};
SnapshottingTask task = new SnapshottingTask(true, false);
createSchemaChangeEventsForTables(sourceContext, snapshotContext, task);
}
}
catch (Exception e) {
LOGGER.error("Failed to recapture new tables schemas.", e);
throw new RuntimeException(e);
}
}
@Override
protected SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffsetContext previousOffset) {
boolean snapshotSchema = true;
boolean snapshotData = true;
// found a previous offset and the earlier snapshot has completed
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
snapshotSchema = databaseSchema.isStorageInitializationExecuted();
if (!snapshotSchema && databaseSchema.storeOnlyCapturedTables()) {
ensureSchemaForNewTables(partition);
}
snapshotData = false;
}
...But it doesn't produce schema messages. Looks like that something is not inititialized. I think that solution in this branch is good. The only one thing is memory overhead: it recapture all tables schemas. But it uses standard snapshot procedure with locks and interruption on connector stop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
We will have overhead. If
database.history.store.only.monitored.tables.ddlis set totrueon each connector start it will additionally load tables list from database.p.s.
store.only.captured.tables.ddlis deprecated, should usestore.only.captured.tables.ddlinstead