Description
When an exception occurs while reading a window, we restart it with `TaskState` pointing to the last correctly read change. After the restart, we read the entire window from the beginning again, but skip the rows up to and including the last correctly read change.
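The resume behavior described above can be stated as a small Java sketch. It is only an illustration of the expected result, independent of how the skipping is implemented. Change ids are simplified to `long` values (the real code compares change ids with `compareTo`), and `ResumeSemantics` and `rowsAfter` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class ResumeSemantics {
    // Rows a restarted reader should emit: every row strictly after lastChangeId,
    // or the whole window when lastChangeId is empty (a fresh, non-resumed read).
    static List<Long> rowsAfter(List<Long> window, Optional<Long> lastChangeId) {
        return window.stream()
                .filter(id -> !lastChangeId.isPresent() || id > lastChangeId.get())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> window = Arrays.asList(10L, 20L, 30L, 40L);
        System.out.println(rowsAfter(window, Optional.of(20L))); // [30, 40]
        System.out.println(rowsAfter(window, Optional.empty()));  // [10, 20, 30, 40]
    }
}
```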
The problem with the current implementation is that it does this skipping recursively (see the last `findNext` call, at line 148):
`scylla-cdc-java/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3WorkerCQL.java`, lines 104 to 151 at commit a2c3c18:
```java
private void findNext(CompletableFuture<Optional<RawChange>> fut) {
    if (rs.getAvailableWithoutFetching() == 0) {
        if (rs.isFullyFetched()) {
            fut.complete(Optional.empty());
        } else {
            Futures.addCallback(rs.fetchMoreResults(), new FutureCallback<ResultSet>() {
                @Override
                public void onSuccess(ResultSet result) {
                    // There's no guarantee what thread will run this
                    rs = result;
                    findNext(fut);
                }

                @Override
                public void onFailure(Throwable t) {
                    fut.completeExceptionally(t);
                }
            });
        }
    } else {
        Row row = rs.one();
        if (schema == null) {
            schema = new Driver3SchemaBuilder()
                    .withClusterMetadata(session.getCluster().getMetadata())
                    .withRow(row).build();
        } else {
            // TODO: the schema might have changed
            // is there some hash/digest that we can use to check that?
            // it wouldn't be nice if we had to update `schema` on each query/page (expensive)
            // See Scylla issue #7824.
        }
        Driver3RawChange newChange = new Driver3RawChange(row, schema);
        // lastChangeId determines the point from which we should
        // start reading within a window. In this implementation
        // we simply read the entire window and skip rows that
        // were before lastChangeId.
        //
        // If lastChangeId is Optional.empty(), then we read
        // the entire window.
        if (!lastChangeId.isPresent() || newChange.getId().compareTo(lastChangeId.get()) > 0) {
            fut.complete(Optional.of(newChange));
        } else {
            findNext(fut);
        }
    }
}
```
I have observed one instance of this causing a `StackOverflowError`. Worse, the task cannot recover by restarting: after the restart it resumes from the same last change, skips the same rows recursively, and fails in the same way.
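The failure mode can be reproduced outside the driver with a minimal Java sketch, assuming a window held in memory as an array of `long` change ids (`RecursiveSkipDemo`, `findNextRecursive` and `overflows` are hypothetical names). As in the current `findNext`, every skipped row costs one stack frame, and HotSpot does not turn tail calls into loops, so skipping enough rows throws `StackOverflowError`. Because the input is identical on every attempt, a retry with the same `lastChangeId` overflows again.

```java
import java.util.Optional;

public class RecursiveSkipDemo {
    // Same shape as the current findNext: each skipped row adds one stack frame.
    static Optional<Long> findNextRecursive(long[] rows, int i, long lastChangeId) {
        if (i == rows.length) {
            return Optional.empty();
        }
        if (rows[i] > lastChangeId) {
            return Optional.of(rows[i]);
        }
        // Tail call, but the JVM still pushes a new frame for it.
        return findNextRecursive(rows, i + 1, lastChangeId);
    }

    static boolean overflows(long[] rows, long lastChangeId) {
        try {
            findNextRecursive(rows, 0, lastChangeId);
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }

    public static void main(String[] args) {
        long[] rows = new long[1_000_000];
        for (int i = 0; i < rows.length; i++) {
            rows[i] = i;
        }
        // Resuming near the end of the window: almost every row is skipped.
        long lastChangeId = rows.length - 2;
        System.out.println(overflows(rows, lastChangeId));
        // A "restart" with the same lastChangeId replays the same recursion.
        System.out.println(overflows(rows, lastChangeId));
    }
}
```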
The long-term fix is smarter resuming: instead of skipping rows on the client side, craft CQL queries that read only the remaining portion of the window.
/cc @haaawk Would you approve a quick fix for this problem, i.e. wrapping the body of `findNext` in a `while` loop so that the skipping is done iteratively rather than recursively? Doing "smarter resuming" will take more time, and there are other priorities right now. I will of course implement "smarter resuming" later.
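The quick fix could look roughly like the following sketch. It is self-contained so it runs without a cluster, and it rests on several assumptions: the `Page` interface is a hypothetical stand-in for the driver 3 `ResultSet` methods used above (`getAvailableWithoutFetching`, `isFullyFetched`, `one`, `fetchMoreResults`), `CompletableFuture.whenComplete` replaces Guava's `Futures.addCallback`, schema handling is omitted, and change ids are simplified to `long`. Skipped rows are consumed inside a `while (true)` loop, so they no longer add stack frames.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class IterativeSkipDemo {
    // Hypothetical stand-in for the paging methods of the driver 3 ResultSet.
    interface Page {
        int availableWithoutFetching();
        boolean isFullyFetched();
        long one(); // next row, reduced to its change id
        CompletableFuture<Page> fetchMore();
    }

    // In-memory pages, for demonstration only.
    static final class ListPage implements Page {
        private final Deque<Long> rows;
        private final Deque<List<Long>> laterPages;

        ListPage(List<Long> rows, Deque<List<Long>> laterPages) {
            this.rows = new ArrayDeque<>(rows);
            this.laterPages = laterPages;
        }

        public int availableWithoutFetching() { return rows.size(); }
        public boolean isFullyFetched() { return laterPages.isEmpty(); }
        public long one() { return rows.poll(); }
        public CompletableFuture<Page> fetchMore() {
            return CompletableFuture.completedFuture(new ListPage(laterPages.poll(), laterPages));
        }
    }

    static final class Reader {
        private Page page;
        private final Optional<Long> lastChangeId;

        Reader(Page page, Optional<Long> lastChangeId) {
            this.page = page;
            this.lastChangeId = lastChangeId;
        }

        void findNext(CompletableFuture<Optional<Long>> fut) {
            while (true) {
                if (page.availableWithoutFetching() == 0) {
                    if (page.isFullyFetched()) {
                        fut.complete(Optional.empty());
                    } else {
                        // Re-entered once per fetched page, not once per row.
                        page.fetchMore().whenComplete((next, t) -> {
                            if (t != null) {
                                fut.completeExceptionally(t);
                            } else {
                                page = next;
                                findNext(fut);
                            }
                        });
                    }
                    return;
                }
                long id = page.one();
                if (!lastChangeId.isPresent() || id > lastChangeId.get()) {
                    fut.complete(Optional.of(id));
                    return;
                }
                // At or before lastChangeId: skip the row and loop
                // instead of recursing, so no stack frame is added.
            }
        }
    }

    static List<Long> range(long from, long to) {
        return LongStream.range(from, to).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two pages of 1,000,000 rows each; resume after change id 1,999,997.
        Deque<List<Long>> later = new ArrayDeque<>();
        later.add(range(1_000_000, 2_000_000));
        Reader reader = new Reader(new ListPage(range(0, 1_000_000), later), Optional.of(1_999_997L));
        CompletableFuture<Optional<Long>> fut = new CompletableFuture<>();
        reader.findNext(fut);
        System.out.println(fut.join()); // Optional[1999998]
    }
}
```

The fetch callback still re-enters `findNext`, so the call depth can grow by at most one call per page rather than one per skipped row. When `fetchMore` returns an already-completed future, `whenComplete` runs the callback on the calling thread, which is the path the two-page run in `main` exercises.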