
createOrReplace drops concurrent writers' snapshots on commit retry #16942

Description

@amenck

Apache Iceberg version

1.11.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

As shown in the repro below, when two createOrReplace calls run concurrently against the same table, the snapshot from whichever writer commits first is dropped from the table's history.

"""
Minimal reproduction: createOrReplace silently drops concurrent writers' snapshots.

Sequential createOrReplace calls correctly preserve all snapshots in the table's
history. But when two createOrReplace calls race, the retry path in
BaseTransaction.commitReplaceTransaction refreshes `base` (the latest committed
metadata) without rebuilding `current` (the metadata being committed). The retried
commit overwrites the table with stale metadata, dropping any snapshots the other
writer added.

Run:
    pip install pyspark==3.5.5
    python repro_create_or_replace_snapshot_loss.py
"""

import os
import shutil
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from threading import Barrier

ICEBERG_VERSION = "1.11.0"
SPARK_MAJOR = "3.5"
JAR_NAME = f"iceberg-spark-runtime-{SPARK_MAJOR}_2.12-{ICEBERG_VERSION}.jar"
JAR_PATH = os.path.join("/tmp", JAR_NAME)
WAREHOUSE = "/tmp/iceberg-repro-warehouse"
TABLE = "local.db.repro"

if not os.path.exists(JAR_PATH):
    url = (
        f"https://repo1.maven.org/maven2/org/apache/iceberg/"
        f"iceberg-spark-runtime-{SPARK_MAJOR}_2.12/{ICEBERG_VERSION}/{JAR_NAME}"
    )
    print(f"Downloading {JAR_NAME} ...")
    urllib.request.urlretrieve(url, JAR_PATH)

if os.path.exists(WAREHOUSE):
    shutil.rmtree(WAREHOUSE)

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[4]")
    .config("spark.jars", JAR_PATH)
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", WAREHOUSE)
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)

spark.sql("CREATE NAMESPACE IF NOT EXISTS local.db")

# ---------------------------------------------------------------------------
# Part 1: Sequential createOrReplace preserves all snapshots (expected behavior)
# ---------------------------------------------------------------------------
print("=" * 70)
print("Part 1: Sequential createOrReplace — snapshots should be preserved")
print("=" * 70)

df_a = spark.createDataFrame([(1, "a")], schema=["id", "label"])
df_a.writeTo(TABLE).using("iceberg").createOrReplace()

df_b = spark.createDataFrame([(2, "b")], schema=["id", "label"])
df_b.writeTo(TABLE).using("iceberg").createOrReplace()

df_c = spark.createDataFrame([(3, "c")], schema=["id", "label"])
df_c.writeTo(TABLE).using("iceberg").createOrReplace()

sequential_snapshots = spark.sql(f"SELECT snapshot_id FROM {TABLE}.snapshots").count()
print(f"  After 3 sequential createOrReplace calls: {sequential_snapshots} snapshots")
assert sequential_snapshots == 3, f"Expected 3 snapshots, got {sequential_snapshots}"
print("  OK — all 3 snapshots preserved")

# ---------------------------------------------------------------------------
# Part 2: Concurrent createOrReplace drops snapshots (bug)
# ---------------------------------------------------------------------------
print()
print("=" * 70)
print("Part 2: Concurrent createOrReplace — snapshots should be preserved, but are lost")
print("=" * 70)

ITERATIONS = 10
lost_count = 0

for i in range(ITERATIONS):
    pre_count = spark.sql(f"SELECT * FROM {TABLE}.snapshots").count()

    barrier = Barrier(2)

    def write(writer_id: int) -> None:
        barrier.wait()
        df = spark.createDataFrame(
            [(i * 10 + writer_id, f"writer_{writer_id}")],
            schema=["id", "label"],
        )
        df.writeTo(TABLE).using("iceberg").createOrReplace()

    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(write, writer_id=w) for w in (1, 2)]
        for fut in futures:
            fut.result()

    post_count = spark.sql(f"SELECT * FROM {TABLE}.snapshots").count()
    added = post_count - pre_count

    if added < 2:
        lost_count += 1
        print(f"  iteration {i}: LOST — 2 writers committed but only {added} new snapshot(s) appeared")
    else:
        print(f"  iteration {i}: ok")

print()
if lost_count > 0:
    print(f"BUG CONFIRMED: {lost_count}/{ITERATIONS} iterations lost a snapshot.")
    print()
    print("Root cause: BaseTransaction.commitReplaceTransaction refreshes `base`")
    print("on retry but does not rebuild `current`, so the retried commit overwrites")
    print("the table metadata with a stale version that is missing the concurrent")
    print("writer's snapshot.")
else:
    print(f"No snapshot loss detected in {ITERATIONS} iterations.")
    print("The race condition did not trigger — try increasing ITERATIONS.")

spark.stop()
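To make the suspected root cause concrete, here is a small pure-Python toy model of the retry path described above. It is not Iceberg code: `TableMetadata`, `Catalog.commit`, and `stale_replace_commit` are hypothetical stand-ins. The sketch assumes a catalog that accepts a commit only if `base` is still the latest committed metadata, and a replace transaction that builds `current` once from the `base` it first saw and reuses it on retry.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class TableMetadata:
    # Hypothetical stand-in for table metadata: a version counter,
    # the full snapshot history, and the id of the current snapshot.
    version: int
    snapshots: Tuple[str, ...]
    current_snapshot: str


class Catalog:
    """Hypothetical catalog that commits by compare-and-swap on `base.version`."""

    def __init__(self, metadata: TableMetadata) -> None:
        self.metadata = metadata

    def commit(self, base: TableMetadata, new: TableMetadata) -> bool:
        # The conflict check only looks at `base`; whatever is in `new` is written.
        if self.metadata.version != base.version:
            return False  # another writer committed since `base` was read
        self.metadata = replace(new, version=base.version + 1)
        return True


def stale_replace_commit(catalog: Catalog, base: TableMetadata, snapshot_id: str) -> None:
    # Hypothetical model of the suspected bug: `current` is built once,
    # from the `base` this writer first saw.
    current = TableMetadata(base.version, base.snapshots + (snapshot_id,), snapshot_id)
    while not catalog.commit(base, current):
        # On conflict only `base` is refreshed. `current` is reused as-is, so it
        # still lacks any snapshot the other writer just committed.
        base = catalog.metadata


catalog = Catalog(TableMetadata(1, ("s0",), "s0"))
shared_base = catalog.metadata  # both writers read the same base before either commits
stale_replace_commit(catalog, shared_base, "s1")  # writer 1 commits first
stale_replace_commit(catalog, shared_base, "s2")  # writer 2 conflicts, refreshes, retries
print(catalog.metadata.snapshots)  # ('s0', 's2'): "s1" has been dropped
```

In this model the retry succeeds because the conflict check compares only the freshly refreshed `base`, so nothing notices that `current` was derived from an older history. Whether `BaseTransaction.commitReplaceTransaction` behaves exactly like this is the hypothesis of this report; the sketch only illustrates it.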

I would expect createOrReplace to preserve the snapshot history, including snapshots that committed concurrently.

I'm not certain whether this is a bug or intended behavior. In the use case I'm working on, it definitely manifests as a bug: we expect snapshots that commit concurrently to be preserved in the history. I'd argue this expectation is reasonable, since the dropped snapshot was, at least temporarily, the table's current snapshot and could have been read by clients before it was clobbered by the second write.
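The expected behavior can be sketched with a hypothetical toy model: on every retry, the replacement is rebuilt on top of the freshly refreshed `base`, so a snapshot committed concurrently stays in the history even though it is no longer the current snapshot. This is an assumption about what a fix could look like, not Iceberg's implementation; `TableMetadata`, `Catalog.commit`, and `rebuilding_replace_commit` are all hypothetical names.

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class TableMetadata:
    # Hypothetical stand-in: version counter, snapshot history, current snapshot.
    version: int
    snapshots: Tuple[str, ...]
    current_snapshot: str


class Catalog:
    """Hypothetical catalog with compare-and-swap commits keyed on `base.version`."""

    def __init__(self, metadata: TableMetadata) -> None:
        self.metadata = metadata

    def commit(self, base: TableMetadata, new: TableMetadata) -> bool:
        if self.metadata.version != base.version:
            return False  # someone else committed since `base` was read
        self.metadata = replace(new, version=base.version + 1)
        return True


def rebuilding_replace_commit(catalog: Catalog, base: TableMetadata, snapshot_id: str) -> None:
    while True:
        # Rebuild the replacement from the latest `base` on every attempt, so the
        # new metadata always includes every snapshot already committed.
        current = TableMetadata(base.version, base.snapshots + (snapshot_id,), snapshot_id)
        if catalog.commit(base, current):
            return
        base = catalog.metadata  # refresh, then rebuild on the next loop iteration


catalog = Catalog(TableMetadata(1, ("s0",), "s0"))
shared_base = catalog.metadata  # both writers start from the same base
rebuilding_replace_commit(catalog, shared_base, "s1")  # writer 1 commits first
rebuilding_replace_commit(catalog, shared_base, "s2")  # writer 2 conflicts and retries
print(catalog.metadata.snapshots)  # ('s0', 's1', 's2')
print(catalog.metadata.current_snapshot)  # s2
```

The last writer still wins for the current snapshot, which matches replace semantics, but the history keeps the concurrent writer's snapshot. A real implementation would also bound the number of retries; the toy loop omits that for brevity.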

Environment

Iceberg: 1.11.0
Spark: 3.5.5
Catalog: Hadoop

I initially saw this on EMR with an AWS Glue catalog, and was able to reproduce it locally with the minimal repro above.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time

Labels: bug