diff --git a/docs/src/operations/ddl/create-table.md b/docs/src/operations/ddl/create-table.md index 5468fabc..0842fa06 100644 --- a/docs/src/operations/ddl/create-table.md +++ b/docs/src/operations/ddl/create-table.md @@ -423,26 +423,71 @@ To create a table with large string columns, use the table property pattern `.lance.` TBLPROPERTIES to `lance-encoding:` Arrow field metadata, which the -Lance Rust encoder reads at write time. +Lance supports per-column compression and encoding tuning via table properties, mapped onto +`lance-encoding:` Arrow field metadata, which the Lance Rust encoder reads at write time. +Two key formats are accepted: + +| Format | Shape | Targets | +|---|---|---| +| Legacy | `.lance.` | top-level columns only | +| New | `lance..column.....` | top-level **and** nested fields | + +When both formats target the same field, the new format wins (an invalid stale legacy value is +not validated when the new format covers the same path). The legacy format remains supported +indefinitely. + +If one literal property key is both a valid legacy key for an existing top-level column and a +valid new-format key for a different nested path, the legacy interpretation is used to preserve +backward compatibility for top-level column names that begin with `lance..column.`. +More generally, keys ending in a supported legacy suffix such as `.lance.compression` are +reserved for legacy parsing and are not interpreted as new-format nested paths. ### Supported keys (Spark connector) -These five `.lance.*` properties are mapped to Arrow field metadata by the connector. Other -`lance.*` keys are ignored until implemented. +These five encoding keys are mapped to Arrow field metadata by the connector. Other `lance.*` keys +are ignored until implemented. 
-| TBLPROPERTIES key | Arrow field metadata key | Valid values | -|---|---|---| -| `.lance.compression` | `lance-encoding:compression` | `zstd`, `lz4`, `fsst`, `none` | -| `.lance.compression-level` | `lance-encoding:compression-level` | integer string (codec-specific) | -| `.lance.structural-encoding` | `lance-encoding:structural-encoding` | `miniblock`, `fullzip` | -| `.lance.rle-threshold` | `lance-encoding:rle-threshold` | float string in `(0.0, 1.0]` | -| `.lance.bss` | `lance-encoding:bss` | `off`, `on`, `auto` | +| Encoding key | Arrow field metadata key | Valid values | Notes | +|---|---|---|---| +| `compression` | `lance-encoding:compression` | `zstd`, `lz4`, `fsst`, `none` | `zstd` general-purpose high ratio; `lz4` general-purpose fast; `fsst` strings/binary; `none` disables compression | +| `compression-level` | `lance-encoding:compression-level` | integer string (codec-specific upper bound) | honored by `zstd` (other codecs may silently ignore) | +| `structural-encoding` | `lance-encoding:structural-encoding` | `miniblock`, `fullzip` | controls Lance's structural layout | +| `rle-threshold` | `lance-encoding:rle-threshold` | float string in `(0.0, 1.0]` | run-length encoding cutoff | +| `bss` | `lance-encoding:bss` | `off`, `on`, `auto` | byte-stream-split for floats | + +Invalid values produce a clear `IllegalArgumentException` at table-creation time. -Invalid values produce a clear `IllegalArgumentException` at table creation time. +### Nested field addressing (new format only) -**Scope**: only top-level columns are supported. Nested field addressing is deferred. 
+The dotted path after `.column.` navigates through Spark types: + +| Spark node | Path segment | Example key | +|---|---|---| +| Struct child | child name | `lance.compression.column.events.payload` | +| Array element | literal `element` | `lance.compression.column.tags.element` | +| FixedSizeList element | literal `element` | `lance.compression.column.embeddings.element` | +| Map key | literal `key` | `lance.compression.column.props.key` | +| Map value | literal `value` | `lance.compression.column.props.value` | + +Roles compose for chained array/map nesting: + +```text +lance.compression.column.items.element.value = zstd # ARRAY> — value of map inside array +lance.compression.column.m.value.element = lz4 # MAP<.., ARRAY<..>> — element of array inside map value +``` + +Path resolution is type-guided: the current Spark node decides how the next segment is interpreted, +so a struct child literally named `element` / `key` / `value` is unambiguous (one extra segment to +reach it). Path depth is bounded at 16 segments (matches `LanceEncodingUtils.MAX_PATH_DEPTH`). +Top-level columns whose names contain a literal dot remain reachable via the legacy format only; +nested fields whose names contain a dot are unreachable in this revision (follow-up). + +Unrecognised keys, paths that don't resolve to any field, and unknown role segments are silently +ignored — consistent with the other connector property passes (`addVectorMetadata`, etc.). +Malformed new-format keys with an empty path are rejected with `IllegalArgumentException` at +table-creation time. Empty-segment or over-depth keys are also rejected unless the same literal key +is a legacy-shaped `.lance.` property. Those keys keep legacy behavior: they are +applied to the top-level column when it exists, or ignored when that legacy column does not exist. ### Deferred keys (not wired in Spark yet) @@ -535,3 +580,55 @@ of previously written data. 
Compression properties are applied only when the tab .tableProperty("ts.lance.compression", "none") .createOrReplace(); ``` + +### Nested example + +=== "SQL" + ```sql + CREATE TABLE orders ( + customer STRUCT>, + items ARRAY>, + tags ARRAY, + metadata MAP + ) USING lance + TBLPROPERTIES ( + 'lance.compression.column.customer.address.street' = 'zstd', + 'lance.compression-level.column.customer.address.street' = '3', + 'lance.compression.column.items.element.sku' = 'fsst', + 'lance.compression.column.tags.element' = 'lz4', + 'lance.compression.column.metadata.value' = 'zstd' + ); + ``` + +=== "Python" + ```python + df.writeTo("orders") \ + .tableProperty("lance.compression.column.customer.address.street", "zstd") \ + .tableProperty("lance.compression-level.column.customer.address.street", "3") \ + .tableProperty("lance.compression.column.items.element.sku", "fsst") \ + .tableProperty("lance.compression.column.tags.element", "lz4") \ + .tableProperty("lance.compression.column.metadata.value", "zstd") \ + .createOrReplace() + ``` + +=== "Scala" + ```scala + df.writeTo("orders") + .tableProperty("lance.compression.column.customer.address.street", "zstd") + .tableProperty("lance.compression-level.column.customer.address.street", "3") + .tableProperty("lance.compression.column.items.element.sku", "fsst") + .tableProperty("lance.compression.column.tags.element", "lz4") + .tableProperty("lance.compression.column.metadata.value", "zstd") + .createOrReplace() + ``` + +=== "Java" + ```java + df.writeTo("orders") + .tableProperty("lance.compression.column.customer.address.street", "zstd") + .tableProperty("lance.compression-level.column.customer.address.street", "3") + .tableProperty("lance.compression.column.items.element.sku", "fsst") + .tableProperty("lance.compression.column.tags.element", "lz4") + .tableProperty("lance.compression.column.metadata.value", "zstd") + .createOrReplace(); + ``` diff --git 
a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/LanceEncodingUtils.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/LanceEncodingUtils.java index 65277be4..8b214386 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/LanceEncodingUtils.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/LanceEncodingUtils.java @@ -13,7 +13,11 @@ */ package org.lance.spark.utils; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.function.BiConsumer; @@ -21,19 +25,57 @@ * Utility methods and constants for per-column Lance encoding configuration via Spark * TBLPROPERTIES. * - *

The TBLPROPERTIES key convention is {@code .lance.}. These map 1-to-1 to {@code - * lance-encoding:} Arrow field metadata keys, which the Lance Rust encoder reads at write - * time. + *

Two TBLPROPERTIES key formats are supported: * - *

Only connector-supported keys are handled (the five mapped below); dict/minichunk keys are - * deferred. Unmatched column names and type-incompatible combinations are silently ignored — - * semantic validation is left to the Lance Rust encoder. + *

    + *
  1. Legacy (top-level only): {@code <column>.lance.<key>} — e.g. {@code + * payload.lance.compression = zstd}. The {@code <column>} segment is taken verbatim (no + * inner-dot splitting), so legacy keys cannot reach nested fields.
  2. New (top-level or nested): {@code lance.<key>.column.<seg1>.<seg2>...<segN>} — + * e.g. {@code lance.compression.column.events.payload = zstd}. Path segments navigate struct + * children by name; literal {@code element}, {@code key}, {@code value} navigate array + * elements, map keys, and map values respectively.
+ * + *

Both formats accepted indefinitely. When two distinct properties target the same {@code (path, + * rule)}, the new-format entry wins. When a single literal key is parseable as both (because the + * user has a top-level column name that itself looks like a new-format key), the legacy + * interpretation is reserved — keys ending in a legacy suffix such as {@code .lance.compression} + * are never parsed as nested new-format paths. Field names containing dots are not reachable via + * the new format in this revision. + * + *

Nested encoding metadata that crosses an array or map boundary (which has no + * per-element {@link org.apache.spark.sql.types.StructField}) is smuggled on the nearest enclosing + * {@code StructField}'s {@code metadata} under the {@link #LANCE_NESTED_PREFIX} prefix; the Scala + * write path unpacks it onto the corresponding Arrow child {@code Field.metadata}. Metadata for + * paths that traverse only struct children is written natively on the deepest child's metadata with + * no smuggling required. */ public final class LanceEncodingUtils { - /** Domain segment used in TBLPROPERTIES keys: {@code .lance.}. */ + /** Domain segment used in TBLPROPERTIES keys. */ static final String LANCE_PROPERTY_DOMAIN = "lance"; + /** Separator that splits the new-format key into {@code lance.} and dotted path. */ + static final String NEW_FORMAT_SEGMENT = ".column."; + + /** + * Maximum number of dotted segments allowed in a new-format path. Bounds walker recursion cost on + * pathological keys. + */ + static final int MAX_PATH_DEPTH = 16; + + /** + * Prefix for keys smuggled on a {@link org.apache.spark.sql.types.StructField}'s metadata to + * carry nested encoding metadata. {@code LanceArrowUtils.toArrowField} (Scala) consumes these and + * emits them on the corresponding Arrow child {@code Field.metadata}. + * + *

Wire-format invariant: this prefix round-trips through {@code Metadata.json}; + * changing it is forwards-incompatible with datasets whose Spark schema was written under the + * previous value. + */ + public static final String LANCE_NESTED_PREFIX = "lance-nested."; + // TBLPROPERTIES key suffixes (after ".") static final String COMPRESSION_SUFFIX = LANCE_PROPERTY_DOMAIN + ".compression"; static final String COMPRESSION_LEVEL_SUFFIX = LANCE_PROPERTY_DOMAIN + ".compression-level"; @@ -76,6 +118,20 @@ public final class LanceEncodingUtils { LanceEncodingUtils::validateRleThreshold), rule(BSS_SUFFIX, LANCE_ENCODING_BSS, LanceEncodingUtils::validateBssMode)); + /** + * Rules sorted by suffix length descending so {@link #parseLegacy} matches the longest suffix + * first. None of the current five suffixes is a true suffix of another, so removing the sort + * would not change behavior today — kept as defense against future suffix ambiguity. + */ + private static final List RULES_BY_SUFFIX_LENGTH_DESC = sortRulesDesc(); + + private static List sortRulesDesc() { + List sorted = new ArrayList<>(SUPPORTED_ENCODING_PROPERTY_RULES); + sorted.sort( + (a, b) -> Integer.compare(b.getPropertySuffix().length(), a.getPropertySuffix().length())); + return List.copyOf(sorted); + } + private LanceEncodingUtils() { // Utility class } @@ -88,90 +144,312 @@ static List getSupportedEncodingPropertyRules() { return SUPPORTED_ENCODING_PROPERTY_RULES; } - private static void validateCompressionScheme(String columnName, String value) { + /** + * Parse legacy-format keys ({@code .lance.}). The {@code } prefix is taken + * verbatim — NOT split on inner dots — so legacy keys can only address top-level fields. + * Unrecognised keys are skipped silently. Validation of values is deferred to the walker so that + * path-not-found errors can suppress validation noise. 
+ */ + static List parseLegacy(Map properties) { + if (properties == null || properties.isEmpty()) { + return Collections.emptyList(); + } + List out = new ArrayList<>(); + for (Map.Entry e : properties.entrySet()) { + String key = e.getKey(); + if (key == null) { + continue; + } + for (EncodingPropertyRule rule : RULES_BY_SUFFIX_LENGTH_DESC) { + String marker = "." + rule.getPropertySuffix(); + if (!key.endsWith(marker)) { + continue; + } + String columnName = key.substring(0, key.length() - marker.length()); + if (columnName.isEmpty()) { + break; + } + out.add(new ParsedEncodingKey(Collections.singletonList(columnName), rule, e.getValue())); + break; + } + } + return out; + } + + /** + * Parse new-format keys ({@code lance..column.....}). + * + *

    + *
  • Keys not starting with {@code lance.}, keys that are valid legacy-format keys, or keys + * not containing {@code .column.} are skipped silently (other code paths may interpret + * them). + *
  • Keys whose shape matches but whose {@code <key>} portion is not recognised are + * skipped silently — users may have unrelated TBLPROPERTIES in this namespace.
  • Non-legacy keys whose shape matches and whose rule is recognised but whose path is empty, + * contains an empty segment, or exceeds {@link #MAX_PATH_DEPTH} segments throw {@link + * IllegalArgumentException}. + *
+ */ + static List parseNewFormat(Map properties) { + if (properties == null || properties.isEmpty()) { + return Collections.emptyList(); + } + String domainPrefix = LANCE_PROPERTY_DOMAIN + "."; + List out = new ArrayList<>(); + for (Map.Entry e : properties.entrySet()) { + String key = e.getKey(); + if (key == null || !key.startsWith(domainPrefix) || isLegacyEncodingPropertyKey(key)) { + continue; + } + int sepIdx = key.indexOf(NEW_FORMAT_SEGMENT); + if (sepIdx < 0) { + continue; + } + String rulePrefix = key.substring(0, sepIdx); + String pathString = key.substring(sepIdx + NEW_FORMAT_SEGMENT.length()); + + EncodingPropertyRule matchedRule = null; + for (EncodingPropertyRule rule : SUPPORTED_ENCODING_PROPERTY_RULES) { + if (rule.getPropertySuffix().equals(rulePrefix)) { + matchedRule = rule; + break; + } + } + if (matchedRule == null) { + continue; + } + + if (pathString.isEmpty()) { + throw new IllegalArgumentException( + "Invalid TBLPROPERTIES key '" + + sanitizeForMessage(key) + + "': missing column path after '" + + NEW_FORMAT_SEGMENT + + "'. Expected shape: 'lance..column.[....]'" + + ", e.g. 
'lance.compression.column.payload'."); + } + String[] segments = pathString.split("\\.", -1); + for (int i = 0; i < segments.length; i++) { + if (segments[i].isEmpty()) { + throw new IllegalArgumentException( + "Invalid TBLPROPERTIES key '" + + sanitizeForMessage(key) + + "': empty path segment at index " + + i + + " (segments cannot be empty; check for leading, trailing, " + + "or consecutive dots)."); + } + } + if (segments.length > MAX_PATH_DEPTH) { + throw new IllegalArgumentException( + "Invalid TBLPROPERTIES key '" + + sanitizeForMessage(key) + + "': nested column path has " + + segments.length + + " segments, but at most " + + MAX_PATH_DEPTH + + " are allowed."); + } + out.add(new ParsedEncodingKey(Arrays.asList(segments), matchedRule, e.getValue())); + } + return out; + } + + private static boolean isLegacyEncodingPropertyKey(String key) { + for (EncodingPropertyRule rule : RULES_BY_SUFFIX_LENGTH_DESC) { + String marker = "." + rule.getPropertySuffix(); + if (key.endsWith(marker) && key.length() > marker.length()) { + return true; + } + } + return false; + } + + private static void validateCompressionScheme(String fullPath, String value) { if (!VALID_COMPRESSION_SCHEMES.contains(value)) { throw new IllegalArgumentException( "Column '" - + columnName + + sanitizeForMessage(fullPath) + "': invalid compression scheme '" - + value + + sanitizeForMessage(value) + "'. Valid values: " + VALID_COMPRESSION_SCHEMES); } } // codec-specific upper-bound enforcement is left to the Rust encoder - private static void validateCompressionLevel(String columnName, String value) { + private static void validateCompressionLevel(String fullPath, String value) { int level; try { level = Integer.parseInt(value); } catch (NumberFormatException e) { throw new IllegalArgumentException( "Column '" - + columnName + + sanitizeForMessage(fullPath) + "': invalid compression-level '" - + value + + sanitizeForMessage(value) + "'. 
Must be a non-negative integer."); } if (level < 0) { throw new IllegalArgumentException( - "Column '" + columnName + "': compression-level '" + value + "' must be non-negative."); + "Column '" + + sanitizeForMessage(fullPath) + + "': compression-level '" + + sanitizeForMessage(value) + + "' must be non-negative."); } } - private static void validateStructuralEncoding(String columnName, String value) { + private static void validateStructuralEncoding(String fullPath, String value) { if (!VALID_STRUCTURAL_ENCODINGS.contains(value)) { throw new IllegalArgumentException( "Column '" - + columnName + + sanitizeForMessage(fullPath) + "': invalid structural-encoding '" - + value + + sanitizeForMessage(value) + "'. Valid values: " + VALID_STRUCTURAL_ENCODINGS); } } - private static void validateRleThreshold(String columnName, String value) { + private static void validateRleThreshold(String fullPath, String value) { float threshold; try { threshold = Float.parseFloat(value); } catch (NumberFormatException e) { throw new IllegalArgumentException( "Column '" - + columnName + + sanitizeForMessage(fullPath) + "': invalid rle-threshold '" - + value + + sanitizeForMessage(value) + "'. Must be a float in (0.0, 1.0]."); } - if (threshold <= 0.0f || threshold > 1.0f) { + // Use a positive predicate so NaN (for which every comparison returns false) is rejected. + if (!(threshold > 0.0f && threshold <= 1.0f)) { throw new IllegalArgumentException( "Column '" - + columnName + + sanitizeForMessage(fullPath) + "': rle-threshold '" - + value + + sanitizeForMessage(value) + "' is out of range. Must be in (0.0, 1.0]."); } } - private static void validateBssMode(String columnName, String value) { + private static void validateBssMode(String fullPath, String value) { if (!VALID_BSS_MODES.contains(value)) { throw new IllegalArgumentException( "Column '" - + columnName + + sanitizeForMessage(fullPath) + "': invalid bss '" - + value + + sanitizeForMessage(value) + "'. 
Valid values: " + VALID_BSS_MODES); } } + /** + * Replace control characters that some log appenders or terminals interpret as line/record + * separators or render-direction overrides with their printable escapes, before interpolating + * user-controlled strings into exception messages. Covers: + * + *
    + *
  • ASCII CR ({@code U+000D}) and LF ({@code U+000A}) — line separators. + *
  • NUL ({@code U+0000}) — truncates C-string consumers. + *
  • Unicode NEL ({@code U+0085}), Line Separator ({@code U+2028}), Paragraph Separator + * ({@code U+2029}) — line separators in many environments. + *
  • Bidi-override controls ({@code U+202A}–{@code U+202E}, {@code U+2066}–{@code U+2069}) — + * can reorder rendered text on terminals ("Trojan Source", CVE-2021-42574). + *
+ */ + private static String sanitizeForMessage(String s) { + if (s == null) { + return null; + } + StringBuilder sb = null; + for (int i = 0; i < s.length(); i++) { + char c = s.charAt(i); + String escape = controlEscape(c); + if (escape == null) { + if (sb != null) { + sb.append(c); + } + continue; + } + if (sb == null) { + sb = new StringBuilder(s.length() + 8); + sb.append(s, 0, i); + } + sb.append(escape); + } + return sb == null ? s : sb.toString(); + } + + private static String controlEscape(char c) { + switch (c) { + case '\r': + return "\\r"; + case '\n': + return "\\n"; + case '\u0000': + case '\u0085': + case '\u2028': + case '\u2029': + case '\u202A': // LRE + case '\u202B': // RLE + case '\u202C': // PDF + case '\u202D': // LRO + case '\u202E': // RLO + case '\u2066': // LRI + case '\u2067': // RLI + case '\u2068': // FSI + case '\u2069': // PDI + return String.format("\\u%04X", (int) c); + default: + return null; + } + } + private static EncodingPropertyRule rule( String propertySuffix, String arrowMetadataKey, BiConsumer validator) { return new EncodingPropertyRule(propertySuffix, arrowMetadataKey, validator); } + /** A parsed TBLPROPERTY key paired with the rule and value it carries. */ + static final class ParsedEncodingKey { + private final List pathSegments; + private final EncodingPropertyRule rule; + private final String value; + + ParsedEncodingKey(List pathSegments, EncodingPropertyRule rule, String value) { + this.pathSegments = List.copyOf(pathSegments); + this.rule = rule; + this.value = value; + } + + List getPathSegments() { + return pathSegments; + } + + EncodingPropertyRule getRule() { + return rule; + } + + String getValue() { + return value; + } + + /** + * {@code (pathSegments, arrowMetadataKey)} tuple used as a HashSet element to suppress legacy + * entries that a new-format key covers. Returning the segment list directly avoids the + * collision risk a delimited-string encoding would have for user-supplied control characters. 
+ */ + List identityKey() { + return List.of(pathSegments, rule.getArrowMetadataKey()); + } + } + /** Rule descriptor for mapping a Spark TBLPROPERTY to an Arrow field metadata key. */ - static class EncodingPropertyRule { + static final class EncodingPropertyRule { private final String propertySuffix; private final String arrowMetadataKey; private final BiConsumer validator; @@ -195,8 +473,22 @@ String createPropertyKey(String columnName) { return LanceEncodingUtils.createPropertyKey(columnName, propertySuffix); } - void validate(String columnName, String value) { - validator.accept(columnName, value); + /** + * Null-check first so {@link Float#parseFloat(String)} cannot leak an NPE through the + * validator's {@link NumberFormatException} catch, then dispatch to the rule-specific + * validator. The error label is derived from {@link #propertySuffix} (the part after {@code + * lance.}). + */ + void validate(String fullPath, String value) { + if (value == null) { + throw new IllegalArgumentException( + "Column '" + + sanitizeForMessage(fullPath) + + "': " + + propertySuffix.substring(LANCE_PROPERTY_DOMAIN.length() + 1) + + " value must not be null."); + } + validator.accept(fullPath, value); } } } diff --git a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/SchemaConverter.java b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/SchemaConverter.java index 4a6b046d..a72222a7 100644 --- a/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/SchemaConverter.java +++ b/lance-spark-base_2.12/src/main/java/org/lance/spark/utils/SchemaConverter.java @@ -18,13 +18,19 @@ import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.DoubleType; import org.apache.spark.sql.types.FloatType; +import org.apache.spark.sql.types.MapType; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.MetadataBuilder; import org.apache.spark.sql.types.StringType; import org.apache.spark.sql.types.StructField; import 
org.apache.spark.sql.types.StructType; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import static org.lance.spark.utils.BlobUtils.LANCE_ENCODING_BLOB_KEY; import static org.lance.spark.utils.BlobUtils.LANCE_ENCODING_BLOB_VALUE; @@ -292,22 +298,29 @@ private static StructType addLargeVarCharMetadata( } /** - * Adds Lance compression metadata to top-level fields based on connector-supported TBLPROPERTIES. - * Keys matching {@code .lance.} are validated and written as {@code - * lance-encoding:} Arrow field metadata. Invalid values throw {@link - * IllegalArgumentException} at call time. + * Attaches Lance compression metadata at any field depth, parsed from connector-supported + * TBLPROPERTIES (both the legacy {@code .lance.} and the new {@code + * lance..column.} formats — see {@link LanceEncodingUtils} for the format spec and + * smuggling protocol). * - *

Silent-ignore cases (no exception, no metadata written): + *

Throws {@link IllegalArgumentException} for: * *

    - *
  • TBLPROPERTIES keys whose {@code } segment does not match any top-level field — - * consistent with the behavior of the other {@code addX} methods in this class. - *
  • Unrecognised {@code lance.*} key suffixes (e.g. deferred dict/minichunk keys). - *
  • Type-incompatible combinations (e.g. {@code fsst} on a numeric column) — semantic - * validation is left to the Lance Rust encoder. + *
  • An invalid value on a key whose path resolves in the schema. Validation runs only after + * path resolution, so a bad value on an unresolved path is silently ignored. + *
  • A new-format key whose path is empty, contains an empty segment, or exceeds {@link + * LanceEncodingUtils#MAX_PATH_DEPTH} segments. (Keys ending in a legacy suffix are reserved + * for legacy parsing and never reach this throw branch.) *
* - *

Only top-level fields are processed; nested column paths are not supported yet. + *

Silently ignored: + * + *

    + *
  • Paths that don't resolve in the schema — matches the other {@code addX} methods. + *
  • Unrecognised {@code lance.*} suffixes (e.g. deferred dict/minichunk keys). + *
  • Unknown role tokens after an array/map (not {@code element}/{@code key}/{@code value}). + *
  • Type-incompatible combinations (e.g. {@code fsst} on numeric) — left to the Rust encoder. + *
* * @param sparkSchema the Spark StructType (already processed by earlier steps) * @param properties table properties that may contain compression metadata @@ -319,30 +332,274 @@ private static StructType addCompressionMetadata( return sparkSchema; } - StructField[] newFields = new StructField[sparkSchema.fields().length]; - for (int i = 0; i < sparkSchema.fields().length; i++) { - StructField field = sparkSchema.fields()[i]; - MetadataBuilder builder = new MetadataBuilder().withMetadata(field.metadata()); - boolean modified = false; - - for (LanceEncodingUtils.EncodingPropertyRule rule : - LanceEncodingUtils.getSupportedEncodingPropertyRules()) { - String propertyKey = rule.createPropertyKey(field.name()); - if (!properties.containsKey(propertyKey)) { - continue; + List legacy = LanceEncodingUtils.parseLegacy(properties); + List newFormat = + LanceEncodingUtils.parseNewFormat(properties); + + // New-format wins on (path, rule) collisions. Drop the colliding legacy entry BEFORE + // validation, so a stale invalid legacy value doesn't throw after migration. + List parsed = + new ArrayList<>(legacy.size() + newFormat.size()); + if (!legacy.isEmpty()) { + Set> overrides = new HashSet<>(newFormat.size() * 2); + for (LanceEncodingUtils.ParsedEncodingKey pek : newFormat) { + overrides.add(pek.identityKey()); + } + for (LanceEncodingUtils.ParsedEncodingKey pek : legacy) { + if (!overrides.contains(pek.identityKey())) { + parsed.add(pek); } - String value = properties.get(propertyKey); - rule.validate(field.name(), value); - builder.putString(rule.getArrowMetadataKey(), value); - modified = true; } + } + parsed.addAll(newFormat); + if (parsed.isEmpty()) { + return sparkSchema; + } - newFields[i] = - modified - ? 
new StructField(field.name(), field.dataType(), field.nullable(), builder.build()) - : field; + AttachResult result = attachMetadataAtPaths(sparkSchema, parsed, Collections.emptyList()); + return (StructType) result.type; + } + + /** + * Recursive walker that attaches encoding metadata at the paths described by {@code keys}. + * Struct-child targets get metadata written directly to {@link StructField#metadata()}. Targets + * reached only through {@link ArrayType} / {@link MapType} (which have no per-element {@code + * StructField}) bubble up as {@code lance-nested.*} keys to the nearest enclosing {@code + * StructField}, with a sub-path of role names ({@code element} / {@code key} / {@code value}). + * Top-level returns always have empty {@link AttachResult#bubbleUp}. + */ + private static AttachResult attachMetadataAtPaths( + DataType dt, List keys, List currentPath) { + if (keys.isEmpty()) { + return new AttachResult(dt, Collections.emptyList()); } - return new StructType(newFields); + if (dt instanceof StructType) { + return attachOnStruct((StructType) dt, keys, currentPath); + } + if (dt instanceof ArrayType) { + return attachOnArray((ArrayType) dt, keys, currentPath); + } + if (dt instanceof MapType) { + return attachOnMap((MapType) dt, keys, currentPath); + } + // Scalar / unsupported leaf — any keys reaching here with more segments are silently + // dropped (path-not-found policy). 
+ return new AttachResult(dt, Collections.emptyList()); + } + + private static AttachResult attachOnStruct( + StructType st, List keys, List currentPath) { + StructField[] children = st.fields(); + StructField[] newChildren = new StructField[children.length]; + boolean anyChange = false; + + for (int i = 0; i < children.length; i++) { + StructField f = children[i]; + List childPath = appendSegment(currentPath, f.name()); + + List directKeys = new ArrayList<>(); + List deeperKeys = new ArrayList<>(); + for (LanceEncodingUtils.ParsedEncodingKey pek : keys) { + List p = pek.getPathSegments(); + if (p.equals(childPath)) { + directKeys.add(pek); + } else if (startsWith(p, childPath)) { + deeperKeys.add(pek); + } + } + + MetadataBuilder builder = new MetadataBuilder().withMetadata(f.metadata()); + boolean fieldChanged = false; + + for (LanceEncodingUtils.ParsedEncodingKey pek : directKeys) { + String dotted = String.join(".", childPath); + pek.getRule().validate(dotted, pek.getValue()); + builder.putString(pek.getRule().getArrowMetadataKey(), pek.getValue()); + fieldChanged = true; + } + + DataType newChildType = f.dataType(); + if (!deeperKeys.isEmpty()) { + AttachResult r = attachMetadataAtPaths(f.dataType(), deeperKeys, childPath); + if (r.type != f.dataType()) { + newChildType = r.type; + fieldChanged = true; + } + for (NestedAssignment na : r.bubbleUp) { + builder.putString(buildNestedMetadataKey(na.subPath, na.arrowKey), na.value); + fieldChanged = true; + } + } + + newChildren[i] = + fieldChanged ? new StructField(f.name(), newChildType, f.nullable(), builder.build()) : f; + if (fieldChanged) { + anyChange = true; + } + } + + StructType newType = anyChange ? 
new StructType(newChildren) : st; + return new AttachResult(newType, Collections.emptyList()); + } + + private static AttachResult attachOnArray( + ArrayType at, List keys, List currentPath) { + List elementPath = appendSegment(currentPath, "element"); + + List bubbleUp = new ArrayList<>(); + List deeperKeys = new ArrayList<>(); + + for (LanceEncodingUtils.ParsedEncodingKey pek : keys) { + List p = pek.getPathSegments(); + if (!startsWith(p, elementPath)) { + // Silent ignore — segment after currentPath is not the array's role. + continue; + } + if (p.equals(elementPath)) { + bubbleUp.add(validateAndBuildLeafBubble("element", pek)); + } else { + deeperKeys.add(pek); + } + } + + DataType newElType = at.elementType(); + if (!deeperKeys.isEmpty()) { + AttachResult r = attachMetadataAtPaths(at.elementType(), deeperKeys, elementPath); + newElType = r.type; + bubbleUp.addAll(prependRole("element", r.bubbleUp)); + } + + DataType newType = + newElType == at.elementType() ? at : new ArrayType(newElType, at.containsNull()); + return new AttachResult(newType, bubbleUp); + } + + private static AttachResult attachOnMap( + MapType mt, List keys, List currentPath) { + List keyPath = appendSegment(currentPath, "key"); + List valuePath = appendSegment(currentPath, "value"); + + List bubbleUp = new ArrayList<>(); + List keyDeeper = new ArrayList<>(); + List valueDeeper = new ArrayList<>(); + + for (LanceEncodingUtils.ParsedEncodingKey pek : keys) { + List p = pek.getPathSegments(); + if (p.equals(keyPath)) { + bubbleUp.add(validateAndBuildLeafBubble("key", pek)); + } else if (p.equals(valuePath)) { + bubbleUp.add(validateAndBuildLeafBubble("value", pek)); + } else if (startsWith(p, keyPath)) { + keyDeeper.add(pek); + } else if (startsWith(p, valuePath)) { + valueDeeper.add(pek); + } + // else: silent ignore (next segment is not key/value). 
    }

    DataType newKeyType = mt.keyType();
    DataType newValueType = mt.valueType();

    if (!keyDeeper.isEmpty()) {
      AttachResult r = attachMetadataAtPaths(mt.keyType(), keyDeeper, keyPath);
      newKeyType = r.type;
      bubbleUp.addAll(prependRole("key", r.bubbleUp));
    }
    if (!valueDeeper.isEmpty()) {
      AttachResult r = attachMetadataAtPaths(mt.valueType(), valueDeeper, valuePath);
      newValueType = r.type;
      bubbleUp.addAll(prependRole("value", r.bubbleUp));
    }

    // Identity checks: unchanged key AND value types mean the original MapType instance is
    // returned, preserving the same-instance "no change" contract used by attachOnStruct.
    DataType newType =
        (newKeyType == mt.keyType() && newValueType == mt.valueType())
            ? mt
            : new MapType(newKeyType, newValueType, mt.valueContainsNull());
    return new AttachResult(newType, bubbleUp);
  }

  /**
   * Builds the smuggling key stamped on the nearest enclosing StructField:
   * {@code <LANCE_NESTED_PREFIX><role>.<role>....<arrowKey>} — the Scala reader
   * (LanceArrowUtils) parses this back during Arrow conversion.
   */
  private static String buildNestedMetadataKey(List<String> subPath, String arrowKey) {
    return LanceEncodingUtils.LANCE_NESTED_PREFIX + String.join(".", subPath) + "." + arrowKey;
  }

  /**
   * Validate the value of a parsed key whose target lands at an Array element / Map key / Map value
   * at the current level, and return the leaf bubble-up assignment. {@code role} is one of {@code
   * element} / {@code key} / {@code value}.
   */
  private static NestedAssignment validateAndBuildLeafBubble(
      String role, LanceEncodingUtils.ParsedEncodingKey pek) {
    // Use the full user-visible dotted path in validation errors, not just the role segment.
    String dotted = String.join(".", pek.getPathSegments());
    pek.getRule().validate(dotted, pek.getValue());
    return new NestedAssignment(
        Collections.singletonList(role), pek.getRule().getArrowMetadataKey(), pek.getValue());
  }

  /**
   * Returns a copy of {@code bubbles} with each assignment's {@code subPath} prefixed by {@code
   * role}. Used by {@code attachOnArray} / {@code attachOnMap} to compose role names as assignments
   * climb to the nearest enclosing {@link StructField}.
   */
  private static List<NestedAssignment> prependRole(String role, List<NestedAssignment> bubbles) {
    if (bubbles.isEmpty()) {
      return Collections.emptyList();
    }
    List<NestedAssignment> out = new ArrayList<>(bubbles.size());
    for (NestedAssignment na : bubbles) {
      List<String> newSub = new ArrayList<>(na.subPath.size() + 1);
      newSub.add(role);
      newSub.addAll(na.subPath);
      out.add(new NestedAssignment(newSub, na.arrowKey, na.value));
    }
    return out;
  }

  /** Returns {@code base + [seg]} as a fresh list; never mutates {@code base}. */
  private static List<String> appendSegment(List<String> base, String seg) {
    List<String> r = new ArrayList<>(base.size() + 1);
    r.addAll(base);
    r.add(seg);
    return r;
  }

  /**
   * Returns true if {@code a} has {@code b} as a prefix (segment-wise). Equal-length lists with
   * equal elements return true — call sites that need strict prefix-of check {@code
   * a.equals(b)} separately.
   */
  private static boolean startsWith(List<String> a, List<String> b) {
    if (a.size() < b.size()) {
      return false;
    }
    for (int i = 0; i < b.size(); i++) {
      if (!a.get(i).equals(b.get(i))) {
        return false;
      }
    }
    return true;
  }

  /** A metadata assignment bubbling up to be stamped on the nearest enclosing StructField. */
  private static final class NestedAssignment {
    // Role segments (element/key/value) from the enclosing StructField down to the target.
    private final List<String> subPath;
    // Target Arrow field-metadata key (a lance-encoding:* key).
    private final String arrowKey;
    private final String value;

    NestedAssignment(List<String> subPath, String arrowKey, String value) {
      // Defensive immutable snapshot — callers build subPath with mutable ArrayLists.
      this.subPath = List.copyOf(subPath);
      this.arrowKey = arrowKey;
      this.value = value;
    }
  }

  /** Result of {@link #attachMetadataAtPaths}: the rewritten type, plus pending bubble-ups.
*/ + private static final class AttachResult { + private final DataType type; + private final List bubbleUp; + + AttachResult(DataType type, List bubbleUp) { + this.type = type; + this.bubbleUp = List.copyOf(bubbleUp); + } } } diff --git a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala index b45ec8a4..29be3f8b 100644 --- a/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala +++ b/lance-spark-base_2.12/src/main/scala/org/apache/spark/sql/util/LanceArrowUtils.scala @@ -30,7 +30,7 @@ import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.sql.types._ import org.lance.spark.LanceConstant -import org.lance.spark.utils.{BlobUtils, Float16Utils, LargeVarCharUtils, VectorUtils} +import org.lance.spark.utils.{BlobUtils, Float16Utils, LanceEncodingUtils, LargeVarCharUtils, VectorUtils} import java.util.Locale import java.util.concurrent.atomic.AtomicInteger @@ -46,6 +46,59 @@ object LanceArrowUtils { val ENCODING_BLOB = BlobUtils.LANCE_ENCODING_BLOB_KEY val ARROW_LARGE_VAR_CHAR_KEY = LargeVarCharUtils.ARROW_LARGE_VAR_CHAR_KEY + private val LANCE_NESTED_PREFIX = LanceEncodingUtils.LANCE_NESTED_PREFIX + private val LANCE_ENCODING_KEY_MARKER = ".lance-encoding:" + + /** + * A nested-encoding metadata assignment threaded through the recursive `toArrowField` call. + * `subPath` lists the remaining role-segments (`element`/`key`/`value`) from the current level + * to the target child whose Arrow `FieldType.metadata` should carry `arrowKey -> value`. 
   */
  private case class NestedAssignment(subPath: List[String], arrowKey: String, value: String)

  /**
   * Parses a smuggled `lance-nested.<roles>.lance-encoding:<key>` metadata key produced by the
   * Java writer side. Returns `Some((roleSegments, arrowKey))` on success, `None` for anything
   * malformed. `arrowKey` retains its `lance-encoding:` prefix.
   */
  private def parseLanceNestedKey(key: String): Option[(List[String], String)] = {
    if (!key.startsWith(LANCE_NESTED_PREFIX)) return None
    val withoutPrefix = key.substring(LANCE_NESTED_PREFIX.length)
    val sepIdx = withoutPrefix.indexOf(LANCE_ENCODING_KEY_MARKER)
    if (sepIdx <= 0) return None
    val subPathStr = withoutPrefix.substring(0, sepIdx)
    // marker is ".lance-encoding:"; sepIdx points at the leading dot, +1 advances past it,
    // leaving "lance-encoding:".
    val arrowKey = withoutPrefix.substring(sepIdx + 1)
    // `subPathStr` is non-empty here (sepIdx > 0), so `split('.').toList` is non-empty too.
    // Reject malformed shapes (empty segment from leading/consecutive dots, unknown role) — the
    // Java writer never emits these, so this is defensive against corrupted/hand-edited Metadata.
    val subPath = subPathStr.split('.').toList
    if (subPath.exists(s => s.isEmpty || (s != "element" && s != "key" && s != "value"))) {
      return None
    }
    Some((subPath, arrowKey))
  }

  /**
   * Folds a batch of same-level assignments into a Spark `Metadata` blob, or `null` when there
   * are none. NOTE(review): `null` (not empty Metadata) is deliberate here — presumably
   * downstream `toArrowField` treats null metadata as "no metadata block"; confirm at call sites.
   */
  private def buildAssignmentMetadata(applyHere: List[NestedAssignment]): Metadata = {
    if (applyHere.isEmpty) {
      null
    } else {
      val mb = new MetadataBuilder()
      applyHere.foreach(a => mb.putString(a.arrowKey, a.value))
      mb.build()
    }
  }

  /**
   * Same payload as `buildAssignmentMetadata` but returned as a `java.util.Map[String, String]`
   * suitable for direct use in `FieldType.metadata`. Used by the FixedSizeList float16 inline
   * branch which constructs a `FieldType` without recursing through `toArrowField`.
+ */ + private def assignmentsToArrowMap( + applyHere: List[NestedAssignment]): java.util.Map[String, String] = { + if (applyHere.isEmpty) { + java.util.Collections.emptyMap[String, String]() + } else { + applyHere.map(a => a.arrowKey -> a.value).toMap.asJava + } + } + def fromArrowField(field: Field): DataType = { field.getType match { // Handle unsigned integers by mapping to larger signed types @@ -196,8 +249,20 @@ object LanceArrowUtils { timeZoneId: String, metadata: org.apache.spark.sql.types.Metadata = null, largeVarTypes: Boolean = false): Field = { + toArrowField(name, dt, nullable, timeZoneId, metadata, largeVarTypes, List.empty) + } + + private def toArrowField( + name: String, + dt: DataType, + nullable: Boolean, + timeZoneId: String, + metadata: org.apache.spark.sql.types.Metadata, + largeVarTypes: Boolean, + nestedAssignments: List[NestedAssignment]): Field = { var large: Boolean = largeVarTypes var meta: Map[String, String] = Map.empty + var carriedAssignments: List[NestedAssignment] = nestedAssignments if (metadata != null) { if (metadata.contains(ENCODING_BLOB) @@ -209,13 +274,28 @@ object LanceArrowUtils { large = true } - meta = mapper + val rawMeta = mapper .readValue(metadata.json, classOf[java.util.LinkedHashMap[_, _]]) .asScala.map { case (k, v) => (k.toString, String.valueOf(v)) }.toMap + + val (nestedRaw, others) = + rawMeta.partition { case (k, _) => k.startsWith(LANCE_NESTED_PREFIX) } + meta = others + + val parsedFromMeta = nestedRaw.toList.flatMap { case (k, v) => + parseLanceNestedKey(k).map { case (sp, ak) => NestedAssignment(sp, ak, v) } + } + carriedAssignments = carriedAssignments ++ parsedFromMeta } dt match { case ArrayType(elementType, containsNull) => + val (forElement, _) = + carriedAssignments.partition(_.subPath.headOption.contains("element")) + val (applyAtEl, deeperAtEl) = forElement.partition(_.subPath.size == 1) + val elMeta = buildAssignmentMetadata(applyAtEl) + val deeperEl = deeperAtEl.map(a => a.copy(subPath = 
a.subPath.tail)) + if (shouldBeFixedSizeList(metadata, elementType)) { val listSize = metadata.getLong(ARROW_FIXED_SIZE_LIST_SIZE_KEY).toInt val fieldType = @@ -233,7 +313,8 @@ object LanceArrowUtils { new FieldType( containsNull, new ArrowType.FloatingPoint(FloatingPointPrecision.HALF), - null), + null, + assignmentsToArrowMap(applyAtEl)), Seq.empty[Field].asJava) } else { toArrowField( @@ -241,7 +322,9 @@ object LanceArrowUtils { elementType, containsNull, timeZoneId, - largeVarTypes = largeVarTypes) + elMeta, + largeVarTypes, + deeperEl) } new Field( name, @@ -258,9 +341,13 @@ object LanceArrowUtils { elementType, containsNull, timeZoneId, - largeVarTypes = largeVarTypes)).asJava) + elMeta, + largeVarTypes, + deeperEl)).asJava) } case StructType(fields) => + // Carried assignments should normally be empty here — struct-child metadata flows + // natively via `field.metadata`. Drop any leftover defensively. val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null, meta.asJava) new Field( name, @@ -275,19 +362,44 @@ object LanceArrowUtils { largeVarTypes) }.toSeq.asJava) case MapType(keyType, valueType, valueContainsNull) => + val (forKey, restAfterKey) = + carriedAssignments.partition(_.subPath.headOption.contains("key")) + val (forValue, _) = restAfterKey.partition(_.subPath.headOption.contains("value")) + val (applyAtKey, deeperAtKey) = forKey.partition(_.subPath.size == 1) + val (applyAtVal, deeperAtVal) = forValue.partition(_.subPath.size == 1) + val keyMeta = buildAssignmentMetadata(applyAtKey) + val valMeta = buildAssignmentMetadata(applyAtVal) + val deeperKey = deeperAtKey.map(a => a.copy(subPath = a.subPath.tail)) + val deeperVal = deeperAtVal.map(a => a.copy(subPath = a.subPath.tail)) + + // Build key/value entries directly so per-child metadata + assignments can be plumbed + // through. The synthesized entries struct has no metadata of its own. 
+ val keyField = toArrowField( + MapVector.KEY_NAME, + keyType, + nullable = false, + timeZoneId, + keyMeta, + largeVarTypes, + deeperKey) + val valueField = toArrowField( + MapVector.VALUE_NAME, + valueType, + valueContainsNull, + timeZoneId, + valMeta, + largeVarTypes, + deeperVal) + val entriesField = new Field( + MapVector.DATA_VECTOR_NAME, + new FieldType( + false, + ArrowType.Struct.INSTANCE, + null, + java.util.Collections.emptyMap[String, String]()), + Seq(keyField, valueField).asJava) val mapType = new FieldType(nullable, new ArrowType.Map(false), null, meta.asJava) - // Note: Map Type struct can not be null, Struct Type key field can not be null - new Field( - name, - mapType, - Seq(toArrowField( - MapVector.DATA_VECTOR_NAME, - new StructType() - .add(MapVector.KEY_NAME, keyType, nullable = false) - .add(MapVector.VALUE_NAME, valueType, nullable = valueContainsNull), - nullable = false, - timeZoneId, - largeVarTypes = largeVarTypes)).asJava) + new Field(name, mapType, Seq(entriesField).asJava) case udt: UserDefinedType[_] => toArrowField(name, udt.sqlType, nullable, timeZoneId, largeVarTypes = largeVarTypes) case dataType => diff --git a/lance-spark-base_2.12/src/test/java/org/lance/spark/utils/SchemaConverterNestedCompressionTest.java b/lance-spark-base_2.12/src/test/java/org/lance/spark/utils/SchemaConverterNestedCompressionTest.java new file mode 100644 index 00000000..82728bf1 --- /dev/null +++ b/lance-spark-base_2.12/src/test/java/org/lance/spark/utils/SchemaConverterNestedCompressionTest.java @@ -0,0 +1,1071 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.spark.utils; + +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.LanceArrowUtils; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +/** + * Coverage for nested-column compression metadata on TBLPROPERTIES (issue #434). All cases call + * {@link SchemaConverter#processSchemaWithProperties} then {@link LanceArrowUtils#toArrowSchema} + * and walk the resulting Arrow {@link Schema} to assert metadata lands on the right Arrow {@link + * Field}. 
+ */ +public class SchemaConverterNestedCompressionTest { + + @Test + public void testNestedStructNewFormat() { + StructType schema = + topStruct( + structField( + "events", + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("id", DataTypes.IntegerType, false), + DataTypes.createStructField("payload", DataTypes.StringType, true), + }))); + Map props = new HashMap<>(); + props.put("lance.compression.column.events.payload", "zstd"); + + Schema arrow = process(schema, props); + Field payload = walk(arrow, "events", "payload"); + assertEquals("zstd", arrowMetadata(payload).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + assertOuterClean(walk(arrow, "events")); + } + + @Test + public void testLegacyFormatTopLevelStillWorks() { + StructType schema = topStruct(structField("payload", DataTypes.StringType)); + Map props = new HashMap<>(); + props.put("payload.lance.compression", "zstd"); + + Schema arrow = process(schema, props); + Field payload = walk(arrow, "payload"); + assertEquals("zstd", arrowMetadata(payload).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testLegacyFormatWinsForTopLevelColumnNameThatLooksLikeNewFormat() { + String columnName = "lance.compression.column.a..b"; + StructType schema = topStruct(structField(columnName, DataTypes.StringType)); + Map props = new HashMap<>(); + props.put(columnName + ".lance.compression", "zstd"); + + Schema arrow = process(schema, props); + Field field = walk(arrow, columnName); + assertEquals("zstd", arrowMetadata(field).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testStaleLegacyFormatThatLooksLikeMalformedNewFormatIsIgnored() { + StructType schema = topStruct(structField("payload", DataTypes.StringType)); + Map props = new HashMap<>(); + props.put("lance.compression.column.a..b.lance.compression", "zstd"); + + Schema arrow = process(schema, props); + Field field = walk(arrow, "payload"); + 
assertFalse(arrowMetadata(field).containsKey(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testLegacyFormatDoesNotStampNestedPathWhenColumnNameLooksLikeNewFormat() { + String columnName = "lance.compression.column.payload"; + StructType schema = + new StructType( + new StructField[] { + structField( + "payload", + DataTypes.createStructType( + new StructField[] { + structField( + "lance", + DataTypes.createStructType( + new StructField[] { + structField("compression", DataTypes.StringType) + })) + })), + structField(columnName, DataTypes.StringType) + }); + Map props = new HashMap<>(); + props.put(columnName + ".lance.compression", "zstd"); + + Schema arrow = process(schema, props); + Field legacyField = walk(arrow, columnName); + Field nestedField = walk(arrow, "payload", "lance", "compression"); + assertEquals( + "zstd", arrowMetadata(legacyField).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + assertFalse( + arrowMetadata(nestedField).containsKey(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testLegacyShapedKeyDoesNotTargetNestedPathWhenTopLevelColumnIsStale() { + StructType schema = + topStruct( + structField( + "payload", + DataTypes.createStructType( + new StructField[] { + structField( + "lance", + DataTypes.createStructType( + new StructField[] {structField("compression", DataTypes.StringType)})) + }))); + Map props = new HashMap<>(); + props.put("lance.compression.column.payload.lance.compression", "zstd"); + + Schema arrow = process(schema, props); + Field field = walk(arrow, "payload", "lance", "compression"); + assertFalse(arrowMetadata(field).containsKey(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testLegacyFormatDoesNotReachNested() { + StructType schema = + topStruct( + structField( + "events", + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("payload", DataTypes.StringType, true), + }))); + Map props = new 
HashMap<>(); + props.put("events.payload.lance.compression", "zstd"); + + Schema arrow = process(schema, props); + Field payload = walk(arrow, "events", "payload"); + assertNull(arrowMetadata(payload).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testNewFormatOverridesLegacyAtTopLevel() { + StructType schema = topStruct(structField("payload", DataTypes.StringType)); + Map props = new HashMap<>(); + props.put("payload.lance.compression", "zstd"); + props.put("lance.compression.column.payload", "lz4"); + + Schema arrow = process(schema, props); + Field payload = walk(arrow, "payload"); + assertEquals("lz4", arrowMetadata(payload).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testArrayElementListCompression() { + StructType schema = + topStruct(structField("tags", DataTypes.createArrayType(DataTypes.StringType, true))); + Map props = new HashMap<>(); + props.put("lance.compression.column.tags.element", "lz4"); + + Schema arrow = process(schema, props); + Field outer = walk(arrow, "tags"); + assertTrue(outer.getType() instanceof ArrowType.List, "Expected List, got " + outer.getType()); + Field element = walk(arrow, "tags", "element"); + assertEquals("lz4", arrowMetadata(element).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + assertOuterClean(outer); + } + + @Test + public void testArrayElementFixedSizeListCompression() { + StructType schema = + topStruct(structField("embeddings", DataTypes.createArrayType(DataTypes.FloatType, false))); + Map props = new HashMap<>(); + props.put("embeddings.arrow.fixed-size-list.size", "128"); + props.put("lance.compression.column.embeddings.element", "zstd"); + + Schema arrow = process(schema, props); + Field outer = walk(arrow, "embeddings"); + assertTrue( + outer.getType() instanceof ArrowType.FixedSizeList, + "Expected FixedSizeList, got " + outer.getType()); + Field element = walk(arrow, "embeddings", "element"); + assertEquals("zstd", 
arrowMetadata(element).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + assertOuterClean(outer); + } + + @Test + public void testMapKeyCompression() { + StructType schema = + topStruct( + structField( + "props", + DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType, true))); + Map props = new HashMap<>(); + props.put("lance.compression.column.props.key", "zstd"); + + Schema arrow = process(schema, props); + Field key = walk(arrow, "props", "key"); + assertEquals("zstd", arrowMetadata(key).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + assertOuterClean(walk(arrow, "props")); + } + + @Test + public void testMapValueCompression() { + StructType schema = + topStruct( + structField( + "props", + DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType, true))); + Map props = new HashMap<>(); + props.put("lance.compression.column.props.value", "zstd"); + + Schema arrow = process(schema, props); + Field value = walk(arrow, "props", "value"); + assertEquals("zstd", arrowMetadata(value).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + assertOuterClean(walk(arrow, "props")); + } + + @Test + public void testDeepNestedStruct() { + StructType schema = + topStruct( + structField( + "a", + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField( + "b", + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("c", DataTypes.StringType, true), + }), + true), + }))); + Map props = new HashMap<>(); + props.put("lance.compression.column.a.b.c", "zstd"); + + Schema arrow = process(schema, props); + Field c = walk(arrow, "a", "b", "c"); + assertEquals("zstd", arrowMetadata(c).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + assertOuterClean(walk(arrow, "a")); + assertOuterClean(walk(arrow, "a", "b")); + } + + @Test + public void testMixedStructInArrayInStruct() { + StructType inner = + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("body", 
DataTypes.StringType, true), + }); + StructType schema = + topStruct( + structField( + "outer", + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField( + "items", DataTypes.createArrayType(inner, true), true), + }))); + Map props = new HashMap<>(); + props.put("lance.compression.column.outer.items.element.body", "zstd"); + + Schema arrow = process(schema, props); + Field body = walk(arrow, "outer", "items", "element", "body"); + assertEquals("zstd", arrowMetadata(body).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testMultipleRulesOnSamePath() { + StructType schema = + topStruct( + structField( + "events", + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("payload", DataTypes.StringType, true), + }))); + Map props = new HashMap<>(); + props.put("lance.compression.column.events.payload", "zstd"); + props.put("lance.compression-level.column.events.payload", "3"); + + Schema arrow = process(schema, props); + Field payload = walk(arrow, "events", "payload"); + Map meta = arrowMetadata(payload); + assertEquals("zstd", meta.get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + assertEquals("3", meta.get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION_LEVEL)); + } + + @Test + public void testPathNotFoundIsSilent() { + StructType schema = topStruct(structField("payload", DataTypes.StringType)); + Map props = new HashMap<>(); + props.put("lance.compression.column.nonexistent.sub", "zstd"); + + Schema arrow = process(schema, props); + Field payload = walk(arrow, "payload"); + assertNull(arrowMetadata(payload).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testPathNotFoundLegacyIsSilent() { + StructType schema = topStruct(structField("payload", DataTypes.StringType)); + Map props = new HashMap<>(); + props.put("missing.lance.compression", "zstd"); + + Schema arrow = process(schema, props); + Field payload = walk(arrow, "payload"); + 
assertNull(arrowMetadata(payload).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testInvalidValueThrowsWithFullPath() { + StructType schema = + topStruct( + structField( + "events", + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("payload", DataTypes.StringType, true), + }))); + Map props = new HashMap<>(); + props.put("lance.compression.column.events.payload", "gzip"); + + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + () -> SchemaConverter.processSchemaWithProperties(schema, props)); + assertTrue( + ex.getMessage().contains("events.payload"), + "Expected error to mention full path 'events.payload', got: " + ex.getMessage()); + } + + @Test + public void testUnknownEncodingKeyIsSilent() { + StructType schema = topStruct(structField("payload", DataTypes.StringType)); + Map props = new HashMap<>(); + // Shape matches new format (lance prefix + .column.) but rule prefix "lance" matches no rule. + props.put("lance.column.foo", "zstd"); + + Schema arrow = process(schema, props); + Field payload = walk(arrow, "payload"); + assertNull(arrowMetadata(payload).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testMalformedKeyEmptyPath() { + StructType schema = topStruct(structField("payload", DataTypes.StringType)); + Map props = new HashMap<>(); + props.put("lance.compression.column.", "zstd"); + + assertThrows( + IllegalArgumentException.class, + () -> SchemaConverter.processSchemaWithProperties(schema, props)); + } + + @Test + public void testKeyMissingColumnSeparatorIsSilent() { + StructType schema = topStruct(structField("payload", DataTypes.StringType)); + Map props = new HashMap<>(); + // No trailing dot — '.column.' shape predicate fails, no legacy suffix match either. 
+ props.put("lance.compression.column", "zstd"); + + Schema arrow = process(schema, props); + Field payload = walk(arrow, "payload"); + assertNull(arrowMetadata(payload).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testPathDepthLimitThrows() { + StructType schema = topStruct(structField("payload", DataTypes.StringType)); + Map props = new HashMap<>(); + StringBuilder sb = new StringBuilder("lance.compression.column"); + for (int i = 1; i <= LanceEncodingUtils.MAX_PATH_DEPTH + 1; i++) { + sb.append(".a").append(i); + } + props.put(sb.toString(), "zstd"); + + IllegalArgumentException ex = + assertThrows( + IllegalArgumentException.class, + () -> SchemaConverter.processSchemaWithProperties(schema, props)); + String msg = ex.getMessage(); + assertTrue( + msg.contains(String.valueOf(LanceEncodingUtils.MAX_PATH_DEPTH)), + "Expected depth limit value in error, got: " + msg); + assertTrue( + msg.contains("segments"), + "Expected user-friendly 'segments' wording in error, got: " + msg); + } + + @Test + public void testStaleLegacyFormatThatLooksLikeOverDepthNewFormatIsIgnored() { + StructType schema = topStruct(structField("payload", DataTypes.StringType)); + Map props = new HashMap<>(); + StringBuilder sb = new StringBuilder("lance.compression.column"); + for (int i = 1; i <= LanceEncodingUtils.MAX_PATH_DEPTH + 1; i++) { + sb.append(".a").append(i); + } + sb.append(".lance.compression"); + props.put(sb.toString(), "zstd"); + + Schema arrow = process(schema, props); + Field payload = walk(arrow, "payload"); + assertFalse(arrowMetadata(payload).containsKey(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + @Test + public void testStructChildNamedElementInsideArrayIsUnambiguous() { + StructType inner = + DataTypes.createStructType( + new StructField[] { + DataTypes.createStructField("element", DataTypes.StringType, true), + }); + StructType schema = + topStruct( + structField( + "outer", + DataTypes.createStructType( + new 
StructField[] { + DataTypes.createStructField( + "items", DataTypes.createArrayType(inner, true), true), + }))); + Map props = new HashMap<>(); + // outer.items (array) -> element role -> struct -> child literally named "element" + props.put("lance.compression.column.outer.items.element.element", "zstd"); + + Schema arrow = process(schema, props); + Field el = walk(arrow, "outer", "items", "element", "element"); + assertEquals("zstd", arrowMetadata(el).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + // Float16 path is gated by Arrow 18+ (Spark 4.0+); the base module uses an older Arrow that + // lacks Float2Vector. Exercise prior-pass coexistence with plain FixedSizeList only here — + // float16-specific coverage lives in `testPreservesFloat16AndNestedCompression` below. + @Test + public void testPreservesEarlierMetadataPasses() { + StructType schema = + topStruct(structField("embeddings", DataTypes.createArrayType(DataTypes.FloatType, false))); + Map props = new HashMap<>(); + props.put("embeddings.arrow.fixed-size-list.size", "128"); + props.put("lance.compression.column.embeddings.element", "zstd"); + + Schema arrow = process(schema, props); + Field outer = walk(arrow, "embeddings"); + assertTrue(outer.getType() instanceof ArrowType.FixedSizeList); + Map outerMeta = arrowMetadata(outer); + assertTrue( + outerMeta.containsKey("arrow.fixed-size-list.size"), + "Outer metadata should retain fixed-size-list size: " + outerMeta); + assertOuterClean(outer); + + Field element = walk(arrow, "embeddings", "element"); + assertEquals("zstd", arrowMetadata(element).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + } + + // float16 + nested-compression coexistence. Skipped on Arrow <18 (Spark 3.4/3.5) + // and exercised on Spark 4.0+. Pinned here so the float16 inline-element FieldType path in + // toArrowField does pick up applyAtElement metadata. 
+ @Test + public void testPreservesFloat16AndNestedCompression() { + assumeTrue( + Float16Utils.isFloat2VectorAvailable(), + "Float16 vectors require Arrow 18+ (Spark 4.0+); skipping on this runtime."); + + StructType schema = + topStruct(structField("embeddings", DataTypes.createArrayType(DataTypes.FloatType, false))); + Map props = new HashMap<>(); + props.put("embeddings.arrow.fixed-size-list.size", "128"); + props.put("embeddings.arrow.float16", "true"); + props.put("lance.compression.column.embeddings.element", "zstd"); + + Schema arrow = process(schema, props); + Field outer = walk(arrow, "embeddings"); + assertTrue(outer.getType() instanceof ArrowType.FixedSizeList); + Map outerMeta = arrowMetadata(outer); + assertTrue( + outerMeta.containsKey("arrow.fixed-size-list.size"), + "Outer should retain fixed-size-list size: " + outerMeta); + assertOuterClean(outer); + + Field element = walk(arrow, "embeddings", "element"); + assertEquals("zstd", arrowMetadata(element).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION)); + assertTrue( + element.getType() instanceof ArrowType.FloatingPoint, + "Expected float16 element, got " + element.getType()); + } + + @Test + public void testNestedMetadataKeysStrippedFromOuterField() { + StructType schema = + topStruct(structField("tags", DataTypes.createArrayType(DataTypes.StringType, true))); + Map props = new HashMap<>(); + props.put("lance.compression.column.tags.element", "lz4"); + + Schema arrow = process(schema, props); + Field outer = walk(arrow, "tags"); + Map outerMeta = arrowMetadata(outer); + for (String k : outerMeta.keySet()) { + assertFalse( + k.startsWith(LanceEncodingUtils.LANCE_NESTED_PREFIX), + "lance-nested.* key leaked onto outer Arrow Field: " + k); + } + } + + // Pin: legacy `col.lance.compression-level` routes to compression-level, not compression. + // (No actual ambiguity today since neither suffix ends-with the other, but pin the behavior.) 
+  @Test
+  public void testLegacyCompressionLevelKeyRoutesToCompressionLevelRule() {
+    StructType schema = topStruct(structField("col", DataTypes.StringType));
+    Map props = new HashMap<>();
+    props.put("col.lance.compression-level", "3");
+
+    Schema arrow = process(schema, props);
+    Field col = walk(arrow, "col");
+    Map meta = arrowMetadata(col);
+    assertEquals("3", meta.get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION_LEVEL));
+    assertNull(
+        meta.get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION),
+        "compression-level key should NOT match the compression rule");
+  }
+
+  // Legacy format: a top-level column whose name itself contains a dot still resolves,
+  // because legacy parsing strips a known suffix rather than splitting on dots.
+  @Test
+  public void testLegacyDottedTopLevelColumnStillWorks() {
+    StructType schema = topStruct(structField("a.b", DataTypes.StringType));
+    Map props = new HashMap<>();
+    props.put("a.b.lance.compression", "zstd");
+
+    Schema arrow = process(schema, props);
+    Field f = walk(arrow, "a.b");
+    assertEquals("zstd", arrowMetadata(f).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+  }
+
+  // Boundary: builds a struct chain exactly MAX_PATH_DEPTH deep and targets the deepest
+  // leaf; a path at (not beyond) the depth limit must succeed.
+  @Test
+  public void testPathDepthAtLimitSucceeds() {
+    final int depth = LanceEncodingUtils.MAX_PATH_DEPTH;
+    DataType inner = DataTypes.StringType;
+    for (int i = depth; i >= 1; i--) {
+      inner =
+          DataTypes.createStructType(
+              new StructField[] {
+                DataTypes.createStructField("l" + i, inner, true),
+              });
+    }
+    // `inner` is now the top-level wrapper struct (l1 nested down to l<depth>); unwrap once
+    // so l1 becomes the single top-level column of the schema.
+    StructType wrap = (StructType) inner;
+    StructField only = wrap.fields()[0];
+    StructType schema = topStruct(only);
+
+    StringBuilder pathKey = new StringBuilder("lance.compression.column");
+    for (int i = 1; i <= depth; i++) {
+      pathKey.append(".l").append(i);
+    }
+    Map props = new HashMap<>();
+    props.put(pathKey.toString(), "zstd");
+
+    Schema arrow = process(schema, props);
+    String[] segs = new String[depth];
+    for (int i = 0; i < depth; i++) {
+      segs[i] = "l" + (i + 1);
+    }
+    Field deepest = walk(arrow, segs);
+    assertEquals("zstd", arrowMetadata(deepest).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+  }
+
+  // array<map<...>>: the nested path items.element.value is smuggled as a lance-nested.* key
+  // on the outer Spark field, then applied to the Arrow map-value field after conversion.
+  @Test
+  public void testMapInsideArrayElement() {
+    StructType schema =
+        topStruct(
+            structField(
+                "items",
+                DataTypes.createArrayType(
+                    DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType, true),
+                    true)));
+    Map props = new HashMap<>();
+    props.put("lance.compression.column.items.element.value", "zstd");
+
+    StructType processed = SchemaConverter.processSchemaWithProperties(schema, props);
+    StructField items = processed.apply("items");
+    String expectedKey =
+        LanceEncodingUtils.LANCE_NESTED_PREFIX
+            + "element.value."
+            + LanceEncodingUtils.LANCE_ENCODING_COMPRESSION;
+    assertTrue(
+        items.metadata().contains(expectedKey),
+        "Expected smuggling key '" + expectedKey + "' on items metadata, got: " + items.metadata());
+    assertEquals("zstd", items.metadata().getString(expectedKey));
+
+    Schema arrow = LanceArrowUtils.toArrowSchema(processed, "UTC", false);
+    Field value = walk(arrow, "items", "element", "value");
+    assertEquals("zstd", arrowMetadata(value).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+  }
+
+  // map<string, array<string>>: mirror case — role segments compose in the other order
+  // (value before element) and land on the array element inside the map value.
+  @Test
+  public void testArrayInsideMapValue() {
+    StructType schema =
+        topStruct(
+            structField(
+                "m",
+                DataTypes.createMapType(
+                    DataTypes.StringType,
+                    DataTypes.createArrayType(DataTypes.StringType, true),
+                    true)));
+    Map props = new HashMap<>();
+    props.put("lance.compression.column.m.value.element", "lz4");
+
+    StructType processed = SchemaConverter.processSchemaWithProperties(schema, props);
+    StructField m = processed.apply("m");
+    String expectedKey =
+        LanceEncodingUtils.LANCE_NESTED_PREFIX
+            + "value.element."
+            + LanceEncodingUtils.LANCE_ENCODING_COMPRESSION;
+    assertTrue(
+        m.metadata().contains(expectedKey),
+        "Expected smuggling key '" + expectedKey + "' on m metadata, got: " + m.metadata());
+
+    Schema arrow = LanceArrowUtils.toArrowSchema(processed, "UTC", false);
+    Field elementOfArray = walk(arrow, "m", "value", "element");
+    assertEquals(
+        "lz4", arrowMetadata(elementOfArray).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+  }
+
+  // regression: schema field literally named `column` must not collide with the
+  // `.column.` separator. New format: parser uses indexOf which finds the first occurrence.
+  @Test
+  public void testFieldNamedColumnReachableViaNewFormat() {
+    StructType schema = topStruct(structField("column", DataTypes.StringType));
+    Map props = new HashMap<>();
+    props.put("lance.compression.column.column", "zstd");
+
+    Schema arrow = process(schema, props);
+    Field f = walk(arrow, "column");
+    assertEquals("zstd", arrowMetadata(f).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+  }
+
+  // regression: a struct child literally named `column` is reachable.
+  @Test
+  public void testStructChildLiterallyNamedColumn() {
+    StructType schema =
+        topStruct(
+            structField(
+                "outer",
+                DataTypes.createStructType(
+                    new StructField[] {
+                      DataTypes.createStructField("column", DataTypes.StringType, true),
+                    })));
+    Map props = new HashMap<>();
+    props.put("lance.compression.column.outer.column", "zstd");
+
+    Schema arrow = process(schema, props);
+    Field child = walk(arrow, "outer", "column");
+    assertEquals("zstd", arrowMetadata(child).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+  }
+
+  // Error-message contract: validation failures on nested paths report the role-segmented
+  // path (e.g. "tags.element", "props.key", "props.value"), not just the column name.
+  @Test
+  public void testInvalidValueOnArrayElementErrorMessageContainsRoleSegmentedPath() {
+    StructType schema =
+        topStruct(structField("tags", DataTypes.createArrayType(DataTypes.StringType, true)));
+    Map props = new HashMap<>();
+    props.put("lance.compression.column.tags.element", "gzip");
+
+    IllegalArgumentException ex =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> SchemaConverter.processSchemaWithProperties(schema, props));
+    assertTrue(
+        ex.getMessage().contains("tags.element"),
+        "Expected role-segmented path 'tags.element' in error, got: " + ex.getMessage());
+  }
+
+  @Test
+  public void testInvalidValueOnMapKeyErrorMessageContainsRoleSegmentedPath() {
+    StructType schema =
+        topStruct(
+            structField(
+                "props",
+                DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType, true)));
+    Map p = new HashMap<>();
+    p.put("lance.compression.column.props.key", "gzip");
+
+    IllegalArgumentException ex =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> SchemaConverter.processSchemaWithProperties(schema, p));
+    assertTrue(
+        ex.getMessage().contains("props.key"),
+        "Expected role-segmented path 'props.key' in error, got: " + ex.getMessage());
+  }
+
+  @Test
+  public void testInvalidValueOnMapValueErrorMessageContainsRoleSegmentedPath() {
+    StructType schema =
+        topStruct(
+            structField(
+                "props",
+                DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType, true)));
+    Map p = new HashMap<>();
+    p.put("lance.compression.column.props.value", "gzip");
+
+    IllegalArgumentException ex =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> SchemaConverter.processSchemaWithProperties(schema, p));
+    assertTrue(
+        ex.getMessage().contains("props.value"),
+        "Expected role-segmented path 'props.value' in error, got: " + ex.getMessage());
+  }
+
+  // Multiple new-format keys on distinct columns (and distinct rules) apply independently.
+  @Test
+  public void testNewFormatSpansMultipleFieldsIndependently() {
+    StructType schema =
+        new StructType(
+            new StructField[] {
+              structField("a", DataTypes.StringType),
+              structField("b", DataTypes.StringType),
+              structField("c", DataTypes.StringType),
+            });
+    Map props = new HashMap<>();
+    props.put("lance.compression.column.a", "zstd");
+    props.put("lance.compression.column.b", "lz4");
+    props.put("lance.bss.column.c", "on");
+
+    Schema arrow = process(schema, props);
+    assertEquals(
+        "zstd", arrowMetadata(walk(arrow, "a")).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+    assertEquals(
+        "lz4", arrowMetadata(walk(arrow, "b")).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+    assertEquals("on", arrowMetadata(walk(arrow, "c")).get(LanceEncodingUtils.LANCE_ENCODING_BSS));
+  }
+
+  @Test
+  public void testNewFormatOverridesLegacyAvoidsValidationOnLegacyValue() {
+    // Stale legacy value 'gzip' (invalid) coexists with valid new-format 'lz4' on the same path.
+    // The walker must drop legacy before validation, so this should NOT throw.
+    StructType schema = topStruct(structField("payload", DataTypes.StringType));
+    Map props = new HashMap<>();
+    props.put("payload.lance.compression", "gzip");
+    props.put("lance.compression.column.payload", "lz4");
+
+    Schema arrow = process(schema, props);
+    Field payload = walk(arrow, "payload");
+    assertEquals("lz4", arrowMetadata(payload).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+  }
+
+  // A key with consecutive dots ("a..b") has an empty path segment and must be rejected
+  // eagerly with a position-bearing message, not silently skipped.
+  @Test
+  public void testEmptyMiddlePathSegmentThrows() {
+    StructType schema = topStruct(structField("payload", DataTypes.StringType));
+    Map props = new HashMap<>();
+    props.put("lance.compression.column.a..b", "zstd");
+
+    IllegalArgumentException ex =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> SchemaConverter.processSchemaWithProperties(schema, props));
+    assertTrue(
+        ex.getMessage().contains("empty path segment at index"),
+        "Expected 'empty path segment at index' message, got: " + ex.getMessage());
+  }
+
+  @Test
+  public void testInvalidStructuralEncodingThrows() {
+    StructType schema = topStruct(structField("col", DataTypes.StringType));
+    Map p = new HashMap<>();
+    p.put("lance.structural-encoding.column.col", "bogus");
+
+    IllegalArgumentException ex =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> SchemaConverter.processSchemaWithProperties(schema, p));
+    assertTrue(
+        ex.getMessage().contains("invalid structural-encoding"),
+        "Expected 'invalid structural-encoding' message, got: " + ex.getMessage());
+  }
+
+  @Test
+  public void testInvalidBssModeThrows() {
+    StructType schema = topStruct(structField("col", DataTypes.StringType));
+    Map p = new HashMap<>();
+    p.put("lance.bss.column.col", "yes");
+
+    IllegalArgumentException ex =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> SchemaConverter.processSchemaWithProperties(schema, p));
+    assertTrue(
+        ex.getMessage().contains("invalid bss"),
+        "Expected 'invalid bss' message, got: " + ex.getMessage());
+  }
+
+  @Test
+  public void testInvalidRleThresholdNonNumericThrows() {
+    StructType schema = topStruct(structField("col", DataTypes.StringType));
+    Map p = new HashMap<>();
+    p.put("lance.rle-threshold.column.col", "abc");
+
+    IllegalArgumentException ex =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> SchemaConverter.processSchemaWithProperties(schema, p));
+    assertTrue(
+        ex.getMessage().contains("invalid rle-threshold"),
+        "Expected 'invalid rle-threshold' message, got: " + ex.getMessage());
+  }
+
+  @Test
+  public void testInvalidRleThresholdNaNAndInfinityRejected() {
+    // Regression: `Float.parseFloat("NaN")` returns NaN, for which every numeric comparison is
+    // false; the previous range check `threshold <= 0 || threshold > 1` failed open. The fixed
+    // positive predicate `!(threshold > 0 && threshold <= 1)` catches NaN and ±Infinity.
+    StructType schema = topStruct(structField("col", DataTypes.StringType));
+    String[] badValues = {"NaN", "Infinity", "-Infinity"};
+    for (String v : badValues) {
+      Map p = new HashMap<>();
+      p.put("lance.rle-threshold.column.col", v);
+      IllegalArgumentException ex =
+          assertThrows(
+              IllegalArgumentException.class,
+              () -> SchemaConverter.processSchemaWithProperties(schema, p),
+              "Expected IAE for rle-threshold value '" + v + "'");
+      assertTrue(
+          ex.getMessage().contains("out of range"),
+          "Expected 'out of range' message for value '" + v + "', got: " + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testInvalidRleThresholdOutOfRangeThrows() {
+    StructType schema = topStruct(structField("col", DataTypes.StringType));
+    Map p = new HashMap<>();
+    p.put("lance.rle-threshold.column.col", "1.5");
+
+    IllegalArgumentException ex =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> SchemaConverter.processSchemaWithProperties(schema, p));
+    assertTrue(
+        ex.getMessage().contains("out of range"),
+        "Expected 'out of range' message, got: " + ex.getMessage());
+  }
+
+  // Control characters in column names must be escaped in error text (log-injection hygiene).
+  @Test
+  public void testErrorMessageEscapesNewlinesInColumnName() {
+    StructType schema = topStruct(structField("a\nb", DataTypes.StringType));
+    Map p = new HashMap<>();
+    p.put("lance.compression.column.a\nb", "gzip");
+
+    IllegalArgumentException ex =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> SchemaConverter.processSchemaWithProperties(schema, p));
+    assertTrue(
+        ex.getMessage().contains("\\n"),
+        "Expected newline in column name to be escaped in message, got: " + ex.getMessage());
+    assertFalse(
+        ex.getMessage().contains("a\nb"),
+        "Raw newline must not leak into error message: " + ex.getMessage());
+  }
+
+  // Pins current behavior: a non-"element" segment under an array is dropped, not an error.
+  // NOTE(review): silently ignoring a typo'd segment may be worth a warning — confirm intent.
+  @Test
+  public void testStrayNonRoleSegmentAfterArrayIsSilentlyIgnored() {
+    StructType schema =
+        topStruct(structField("tags", DataTypes.createArrayType(DataTypes.StringType, true)));
+    Map p = new HashMap<>();
+    p.put("lance.compression.column.tags.bogus", "zstd");
+
+    Schema arrow = process(schema, p);
+    Field tagsElement = walk(arrow, "tags", "element");
+    assertNull(
+        arrowMetadata(tagsElement).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION),
+        "Non-element segment after array must not stamp metadata on the element");
+    assertOuterClean(walk(arrow, "tags"));
+  }
+
+  @Test
+  public void testNullValueOnEachRuleThrowsIllegalArgumentException() {
+    // Regression: Float.parseFloat(null) throws NPE, not NumberFormatException — so
+    // validateRleThreshold previously leaked NPE through its try/catch. All validators must
+    // pre-check null and throw IAE with the standard "Column 'X':" message.
+    StructType schema = topStruct(structField("col", DataTypes.StringType));
+    String[] keys = {
+      "lance.compression.column.col",
+      "lance.compression-level.column.col",
+      "lance.structural-encoding.column.col",
+      "lance.rle-threshold.column.col",
+      "lance.bss.column.col",
+    };
+    for (String k : keys) {
+      Map p = new HashMap<>();
+      p.put(k, null);
+      IllegalArgumentException ex =
+          assertThrows(
+              IllegalArgumentException.class,
+              () -> SchemaConverter.processSchemaWithProperties(schema, p),
+              "Expected IAE for null value on key '" + k + "'");
+      assertTrue(
+          ex.getMessage().contains("Column 'col'"),
+          "Expected sanitized column message for key '" + k + "', got: " + ex.getMessage());
+    }
+  }
+
+  @Test
+  public void testLegacyOverrideIdentityIsCollisionFreeAcrossControlChars() {
+    // Regression: a column name containing U+0000 must not collide with an unrelated path's
+    // identityKey. Two separate fields, two separate legacy keys, no new-format overrides — both
+    // legacy entries must be applied.
+    String nameA = "a\u0000b";
+    String nameC = "c";
+    StructType schema =
+        new StructType(
+            new StructField[] {
+              structField(nameA, DataTypes.StringType), structField(nameC, DataTypes.StringType),
+            });
+    Map props = new HashMap<>();
+    props.put(nameA + ".lance.compression", "zstd");
+    props.put(nameC + ".lance.compression", "lz4");
+
+    Schema arrow = process(schema, props);
+    assertEquals(
+        "zstd",
+        arrowMetadata(walk(arrow, nameA)).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+    assertEquals(
+        "lz4",
+        arrowMetadata(walk(arrow, nameC)).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+  }
+
+  // Role segments ("key"/"value") only have role meaning directly under a map; a struct child
+  // that happens to share the literal name is addressed by name without ambiguity.
+  @Test
+  public void testMapKeyStructChildLiterallyNamedValueIsUnambiguous() {
+    StructType inner =
+        DataTypes.createStructType(
+            new StructField[] {DataTypes.createStructField("value", DataTypes.StringType, true)});
+    StructType schema =
+        topStruct(structField("m", DataTypes.createMapType(inner, DataTypes.StringType, true)));
+    Map props = new HashMap<>();
+    // m -> key (map role) -> struct -> child literally named "value"
+    props.put("lance.compression.column.m.key.value", "zstd");
+
+    Schema arrow = process(schema, props);
+    Field value = walk(arrow, "m", "key", "value");
+    assertEquals("zstd", arrowMetadata(value).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+  }
+
+  @Test
+  public void testMapValueStructChildLiterallyNamedKeyIsUnambiguous() {
+    StructType inner =
+        DataTypes.createStructType(
+            new StructField[] {DataTypes.createStructField("key", DataTypes.StringType, true)});
+    StructType schema =
+        topStruct(structField("m", DataTypes.createMapType(DataTypes.StringType, inner, true)));
+    Map props = new HashMap<>();
+    // m -> value (map role) -> struct -> child literally named "key"
+    props.put("lance.compression.column.m.value.key", "zstd");
+
+    Schema arrow = process(schema, props);
+    Field key = walk(arrow, "m", "value", "key");
+    assertEquals("zstd", arrowMetadata(key).get(LanceEncodingUtils.LANCE_ENCODING_COMPRESSION));
+  }
+
+  // ---------- helpers ----------
+
+  // Shorthand for a nullable StructField.
+  private static StructField structField(String name, DataType dt) {
+    return DataTypes.createStructField(name, dt, true);
+  }
+
+  // Wraps a single field as a one-column schema.
+  private static StructType topStruct(StructField only) {
+    return new StructType(new StructField[] {only});
+  }
+
+  // Applies TBLPROPERTIES processing, then converts the result to an Arrow schema.
+  // ("UTC" timezone; third arg false — presumably a large-types toggle, confirm in
+  // LanceArrowUtils.toArrowSchema.)
+  private static Schema process(StructType schema, Map props) {
+    StructType processed = SchemaConverter.processSchemaWithProperties(schema, props);
+    return LanceArrowUtils.toArrowSchema(processed, "UTC", false);
+  }
+
+  // Resolves a field by path segments from the schema root; throws AssertionError with a
+  // descriptive message if any segment is missing.
+  private static Field walk(Schema schema, String... path) {
+    Field current = null;
+    for (Field f : schema.getFields()) {
+      if (f.getName().equals(path[0])) {
+        current = f;
+        break;
+      }
+    }
+    if (current == null) {
+      throw new AssertionError("No top-level field: " + path[0]);
+    }
+    for (int i = 1; i < path.length; i++) {
+      current = walkChild(current, path[i]);
+    }
+    return current;
+  }
+
+  // Steps one path segment into a child field: list/fixed-size-list -> "element",
+  // map -> "key"/"value" (through the entries struct), struct -> child by name.
+  private static Field walkChild(Field parent, String segment) {
+    ArrowType type = parent.getType();
+    List children = parent.getChildren();
+    if (type instanceof ArrowType.List || type instanceof ArrowType.FixedSizeList) {
+      if (!"element".equals(segment)) {
+        throw new AssertionError("Expected 'element' under list, got " + segment);
+      }
+      return children.get(0);
+    }
+    if (type instanceof ArrowType.Map) {
+      Field entries = children.get(0);
+      List entryChildren = entries.getChildren();
+      if ("key".equals(segment)) return entryChildren.get(0);
+      if ("value".equals(segment)) return entryChildren.get(1);
+      throw new AssertionError("Expected key/value under map, got " + segment);
+    }
+    if (type instanceof ArrowType.Struct) {
+      for (Field f : children) {
+        if (f.getName().equals(segment)) return f;
+      }
+      throw new AssertionError(
+          "No struct child '"
+              + segment
+              + "' under "
+              + parent.getName()
+              + " (children="
+              + childrenNames(children)
+              + ")");
+    }
+    throw new AssertionError("Cannot walk into type " + type);
+  }
+
+  // Formats child field names as "[a, b, c]" for assertion messages.
+  private static String childrenNames(List children) {
+    StringBuilder sb = new StringBuilder("[");
+    for (int i = 0; i < children.size(); i++) {
+      if (i > 0) sb.append(", ");
+      sb.append(children.get(i).getName());
+    }
+    return sb.append("]").toString();
+  }
+
+  // Null-safe accessor for an Arrow Field's metadata map.
+  private static Map arrowMetadata(Field f) {
+    return f.getMetadata() == null ? new HashMap<>() : f.getMetadata();
+  }
+
+  /** Asserts that no `lance-nested.*` keys leaked onto an Arrow Field's own metadata. */
+  private static void assertOuterClean(Field f) {
+    Map meta = arrowMetadata(f);
+    for (String k : meta.keySet()) {
+      assertFalse(
+          k.startsWith(LanceEncodingUtils.LANCE_NESTED_PREFIX),
+          "lance-nested.* key leaked onto Field '" + f.getName() + "': " + k);
+    }
+  }
+}