Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotAncestryValidator;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -293,9 +296,37 @@ private void commitOperation(
operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
operation.set(SinkUtil.OPERATOR_ID, operatorId);
operation.toBranch(branch);
// Guard against the duplicate-commit race tracked in Iceberg issue #14425: between the
// initial getMaxCommittedCheckpointId() read and SnapshotProducer.refresh() inside commit(),
// an in-flight commit from a prior run can land in the catalog. Without this check a second
// snapshot would be created for the same checkpoint id. The validator runs after the
// post-refresh ancestry is known and aborts the commit when the same (job-id, operator-id)
// tuple has already advanced max-committed-checkpoint-id.
operation.validateWith(
new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));

long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.
try {
operation.commit(); // abort is automatically called if this fails.
} catch (ValidationException e) {
if (e.getMessage() != null
&& e.getMessage().contains(MaxCommittedCheckpointIdValidator.MARKER)) {
LOG.info(
"Skipping commit of {} for checkpoint {} to table {} branch {}: the table already "
+ "reflects this checkpoint for (jobId={}, operatorId={}). This can happen on "
+ "recovery when an in-flight commit from a prior run landed after the stale "
+ "max-committed-checkpoint-id check (Iceberg issue #14425).",
description,
checkpointId,
table.name(),
branch,
newFlinkJobId,
operatorId,
e);
return;
}
throw e;
}
long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
LOG.info(
"Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
Expand All @@ -309,6 +340,44 @@ private void commitOperation(
}
}

/**
* {@link SnapshotAncestryValidator} that rejects a commit attempt when the post-refresh ancestry
* already contains a snapshot from the same {@code (flink.job-id, flink.operator-id)} tuple with
* {@code flink.max-committed-checkpoint-id} ≥ the staged checkpoint id.
*
* <p>This closes the recovery race described in <a
* href="https://github.com/apache/iceberg/issues/14425">issue #14425</a>. Detection is by the
* unique {@link #MARKER} string surfaced through {@link #errorMessage()}; {@code commitOperation}
* catches the resulting {@link ValidationException} and treats the commit as already-applied.
*/
private static class MaxCommittedCheckpointIdValidator implements SnapshotAncestryValidator {
private static final String MARKER =
"flink.max-committed-checkpoint-id already advanced past staged checkpoint";

private final long stagedCheckpointId;
private final String flinkJobId;
private final String flinkOperatorId;

private MaxCommittedCheckpointIdValidator(
long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
this.stagedCheckpointId = stagedCheckpointId;
this.flinkJobId = flinkJobId;
this.flinkOperatorId = flinkOperatorId;
}

@Override
public boolean validate(Iterable<Snapshot> baseSnapshots) {
long committed =
SinkUtil.getMaxCommittedCheckpointId(baseSnapshots, flinkJobId, flinkOperatorId);
return committed < stagedCheckpointId;
}

@Override
public String errorMessage() {
return MARKER;
}
}

@Override
public void close() throws IOException {
tableLoader.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotAncestryValidator;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -388,9 +391,37 @@ private void commitOperation(
operation.set(FLINK_JOB_ID, newFlinkJobId);
operation.set(OPERATOR_ID, operatorId);
operation.toBranch(branch);
// Guard against the duplicate-commit race tracked in Iceberg issue #14425: between the
// initial getMaxCommittedCheckpointId() read on initializeState and SnapshotProducer.refresh()
// inside commit(), an in-flight commit from a prior run can land in the catalog. Without
// this check, a second snapshot would be created for the same checkpoint id. The validator
// runs after the post-refresh ancestry is known and aborts the commit if the same
// (job-id, operator-id) tuple has already advanced the max-committed-checkpoint-id.
operation.validateWith(
new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));

long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.
try {
operation.commit(); // abort is automatically called if this fails.
} catch (ValidationException e) {
if (e.getMessage() != null
&& e.getMessage().contains(MaxCommittedCheckpointIdValidator.MARKER)) {
LOG.info(
"Skipping commit of {} for checkpoint {} to table {} branch {}: the table already "
+ "reflects this checkpoint for (jobId={}, operatorId={}). This can happen on "
+ "recovery when an in-flight commit from a prior run landed after the stale "
+ "max-committed-checkpoint-id check (Iceberg issue #14425).",
description,
checkpointId,
table.name(),
branch,
newFlinkJobId,
operatorId,
e);
return;
}
throw e;
}
long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
LOG.info(
"Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
Expand All @@ -402,6 +433,44 @@ private void commitOperation(
committerMetrics.commitDuration(durationMs);
}

/**
* {@link SnapshotAncestryValidator} that rejects a commit attempt when the post-refresh ancestry
* already contains a snapshot from the same {@code (flink.job-id, flink.operator-id)} tuple with
* {@code flink.max-committed-checkpoint-id} ≥ the staged checkpoint id.
*
* <p>This is the mechanism that closes the recovery race described in <a
* href="https://github.com/apache/iceberg/issues/14425">issue #14425</a>. Detection is by the
* unique {@link #MARKER} string surfaced through {@link #errorMessage()}; {@code commitOperation}
* catches the resulting {@link ValidationException} and treats the commit as already-applied.
*/
private static class MaxCommittedCheckpointIdValidator implements SnapshotAncestryValidator {
private static final String MARKER =
"flink.max-committed-checkpoint-id already advanced past staged checkpoint";

private final long stagedCheckpointId;
private final String flinkJobId;
private final String flinkOperatorId;

private MaxCommittedCheckpointIdValidator(
long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
this.stagedCheckpointId = stagedCheckpointId;
this.flinkJobId = flinkJobId;
this.flinkOperatorId = flinkOperatorId;
}

@Override
public boolean validate(Iterable<Snapshot> baseSnapshots) {
long committed =
SinkUtil.getMaxCommittedCheckpointId(baseSnapshots, flinkJobId, flinkOperatorId);
return committed < stagedCheckpointId;
}

@Override
public String errorMessage() {
return MARKER;
}
}

@Override
public void processElement(StreamRecord<FlinkWriteResult> element) {
FlinkWriteResult flinkWriteResult = element.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.SnapshotUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -83,9 +84,29 @@ static Set<Integer> checkAndGetEqualityFieldIds(Table table, List<String> equali
static long getMaxCommittedCheckpointId(
Table table, String flinkJobId, String operatorId, String branch) {
Snapshot snapshot = table.snapshot(branch);
Iterable<Snapshot> ancestry =
snapshot != null
? SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot)
: List.of();
return getMaxCommittedCheckpointId(ancestry, flinkJobId, operatorId);
}

/**
* Returns the largest {@code flink.max-committed-checkpoint-id} carried by a snapshot in the
* given ancestry whose {@code flink.job-id} matches {@code flinkJobId} and whose {@code
* flink.operator-id} either matches {@code operatorId} or is absent. Returns {@code
* INITIAL_CHECKPOINT_ID} (-1) when no such snapshot is found.
*
* <p>Used both for the static read in {@link IcebergFilesCommitter#initializeState} (via the
* {@link Table}-based overload above) and for the post-refresh validation inside {@link
* IcebergFilesCommitter#commitOperation} that prevents the duplicate-commit race tracked in
* Iceberg issue #14425.
*/
static long getMaxCommittedCheckpointId(
Iterable<Snapshot> ancestry, String flinkJobId, String operatorId) {
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

while (snapshot != null) {
for (Snapshot snapshot : ancestry) {
Map<String, String> summary = snapshot.summary();
String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
String snapshotOperatorId = summary.get(OPERATOR_ID);
Expand All @@ -97,8 +118,6 @@ static long getMaxCommittedCheckpointId(
break;
}
}
Long parentSnapshotId = snapshot.parentId();
snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
}

return lastCommittedCheckpointId;
Expand Down
Loading