Skip to content

Commit 3efea4c

Browse files
committed
Fix logical timestamp issue
1 parent ed1fa57 commit 3efea4c

File tree

31 files changed: 1520 additions & 58 deletions

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkInternalSchemaConverter.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,7 @@
5757
import org.apache.spark.sql.types.StructField;
5858
import org.apache.spark.sql.types.StructType;
5959
import org.apache.spark.sql.types.StructType$;
60+
import org.apache.spark.sql.types.TimestampNTZType$;
6061
import org.apache.spark.sql.types.TimestampType;
6162
import org.apache.spark.sql.types.TimestampType$;
6263
import org.apache.spark.sql.types.UserDefinedType;
@@ -267,10 +268,14 @@ private static DataType constructSparkSchemaFromType(Type type) {
267268
case DATE:
268269
return DateType$.MODULE$;
269270
case TIME:
271+
case TIME_MILLIS:
270272
throw new UnsupportedOperationException(String.format("cannot convert %s type to Spark", type));
271273
case TIMESTAMP:
272-
// todo support TimeStampNTZ
274+
case TIMESTAMP_MILLIS:
273275
return TimestampType$.MODULE$;
276+
case LOCAL_TIMESTAMP_MILLIS:
277+
case LOCAL_TIMESTAMP_MICROS:
278+
return TimestampNTZType$.MODULE$;
274279
case STRING:
275280
return StringType$.MODULE$;
276281
case UUID:

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,6 @@
4141
import org.apache.avro.generic.GenericRecord;
4242
import org.apache.hadoop.conf.Configuration;
4343
import org.apache.hadoop.fs.Path;
44-
import org.apache.parquet.avro.AvroSchemaConverter;
4544
import org.apache.parquet.format.converter.ParquetMetadataConverter;
4645
import org.apache.parquet.hadoop.ParquetFileReader;
4746
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -58,6 +57,7 @@
5857

5958
import static org.apache.hudi.io.HoodieBootstrapHandle.METADATA_BOOTSTRAP_RECORD_SCHEMA;
6059
import static org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
60+
import static org.apache.parquet.avro.HoodieAvroParquetSchemaConverter.getAvroSchemaConverter;
6161

6262
class ParquetBootstrapMetadataHandler extends BaseBootstrapMetadataHandler {
6363

@@ -71,7 +71,7 @@ Schema getAvroSchema(StoragePath sourceFilePath) throws IOException {
7171
(Configuration) table.getStorageConf().unwrap(), new Path(sourceFilePath.toUri()),
7272
ParquetMetadataConverter.NO_FILTER);
7373
MessageType parquetSchema = readFooter.getFileMetaData().getSchema();
74-
return new AvroSchemaConverter().convert(parquetSchema);
74+
return getAvroSchemaConverter((Configuration) table.getStorageConf().unwrap()).convert(parquetSchema);
7575
}
7676

7777
@Override

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,8 @@ trait SparkAdapter extends Serializable {
5757
*/
5858
def isColumnarBatchRow(r: InternalRow): Boolean
5959

60+
def isTimestampNTZType(dataType: DataType): Boolean
61+
6062
/**
6163
* Creates Catalyst [[Metadata]] for Hudi's meta-fields (designating these w/
6264
* [[METADATA_COL_ATTR_KEY]] if available (available in Spark >= 3.2)

hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java

Lines changed: 21 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.hudi.avro;
2020

21+
import org.apache.hudi.common.util.Option;
22+
import org.apache.hudi.common.util.StringUtils;
2123
import org.apache.hudi.exception.HoodieAvroSchemaException;
2224
import org.apache.hudi.exception.InvalidUnionTypeException;
2325
import org.apache.hudi.exception.MissingSchemaFieldException;
@@ -199,6 +201,25 @@ private static boolean isProjectionOfInternal(Schema sourceSchema,
199201
return atomicTypeEqualityPredicate.apply(sourceSchema, targetSchema);
200202
}
201203

204+
public static Option<Schema> findNestedFieldSchema(Schema schema, String fieldName) {
205+
if (StringUtils.isNullOrEmpty(fieldName)) {
206+
return Option.empty();
207+
}
208+
String[] parts = fieldName.split("\\.");
209+
for (String part : parts) {
210+
Schema.Field foundField = resolveNullableSchema(schema).getField(part);
211+
if (foundField == null) {
212+
throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema);
213+
}
214+
schema = foundField.schema();
215+
}
216+
return Option.of(resolveNullableSchema(schema));
217+
}
218+
219+
public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName) {
220+
return findNestedFieldSchema(schema, fieldName).map(Schema::getType);
221+
}
222+
202223
/**
203224
* Appends provided new fields at the end of the given schema
204225
*

hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java

Lines changed: 12 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -44,11 +44,10 @@ public class ConvertingGenericData extends GenericData {
4444

4545
// NOTE: The conversions below are not supported in Avro 1.8.2
4646
// They are enabled now that the Avro version in use supports them (1.10+)
47-
// private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion();
48-
// private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion();
49-
// private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion();
50-
// private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion();
51-
47+
private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion();
48+
private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion();
49+
private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion();
50+
private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion();
5251
public static final GenericData INSTANCE = new ConvertingGenericData();
5352

5453
private ConvertingGenericData() {
@@ -58,11 +57,10 @@ private ConvertingGenericData() {
5857
addLogicalTypeConversion(TIME_MICROS_CONVERSION);
5958
addLogicalTypeConversion(TIMESTAMP_MICROS_CONVERSION);
6059
// NOTE: The conversions below require Avro 1.10+ (they are not supported in Avro 1.8.2)
61-
// TODO re-enable upon upgrading to 1.10
62-
// addLogicalTypeConversion(TIME_MILLIS_CONVERSION);
63-
// addLogicalTypeConversion(TIMESTAMP_MILLIS_CONVERSION);
64-
// addLogicalTypeConversion(LOCAL_TIMESTAMP_MILLIS_CONVERSION);
65-
// addLogicalTypeConversion(LOCAL_TIMESTAMP_MICROS_CONVERSION);
60+
addLogicalTypeConversion(TIME_MILLIS_CONVERSION);
61+
addLogicalTypeConversion(TIMESTAMP_MILLIS_CONVERSION);
62+
addLogicalTypeConversion(LOCAL_TIMESTAMP_MILLIS_CONVERSION);
63+
addLogicalTypeConversion(LOCAL_TIMESTAMP_MICROS_CONVERSION);
6664
}
6765

6866
@Override
@@ -127,7 +125,10 @@ public boolean validate(Schema schema, Object datum) {
127125
case LONG:
128126
return isLong(datum)
129127
|| TIME_MICROS_CONVERSION.getConvertedType().isInstance(datum)
130-
|| TIMESTAMP_MICROS_CONVERSION.getConvertedType().isInstance(datum);
128+
|| TIMESTAMP_MICROS_CONVERSION.getConvertedType().isInstance(datum)
129+
|| TIMESTAMP_MILLIS_CONVERSION.getConvertedType().isInstance(datum)
130+
|| LOCAL_TIMESTAMP_MICROS_CONVERSION.getConvertedType().isInstance(datum)
131+
|| LOCAL_TIMESTAMP_MILLIS_CONVERSION.getConvertedType().isInstance(datum);
131132
case FLOAT:
132133
return isFloat(datum);
133134
case DOUBLE:

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

Lines changed: 23 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@
3333
import org.apache.hudi.common.model.HoodieAvroRecord;
3434
import org.apache.hudi.common.model.HoodieOperation;
3535
import org.apache.hudi.common.model.HoodieRecord;
36+
import org.apache.hudi.common.util.DateTimeUtils;
3637
import org.apache.hudi.common.util.Option;
3738
import org.apache.hudi.common.util.SpillableMapUtils;
3839
import org.apache.hudi.common.util.StringUtils;
@@ -1040,12 +1041,33 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche
10401041
case NULL:
10411042
case BOOLEAN:
10421043
case INT:
1043-
case LONG:
10441044
case FLOAT:
10451045
case DOUBLE:
10461046
case BYTES:
10471047
case STRING:
10481048
return oldValue;
1049+
case LONG:
1050+
if (oldSchema.getLogicalType() != newSchema.getLogicalType()) {
1051+
if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
1052+
if (newSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros) {
1053+
return DateTimeUtils.millisToMicros((Long) oldValue);
1054+
}
1055+
} else if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros) {
1056+
if (newSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
1057+
return DateTimeUtils.microsToMillis((Long) oldValue);
1058+
}
1059+
} else if (oldSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMillis) {
1060+
if (newSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMicros) {
1061+
return DateTimeUtils.millisToMicros((Long) oldValue);
1062+
}
1063+
} else if (oldSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMicros) {
1064+
if (newSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMillis) {
1065+
return DateTimeUtils.microsToMillis((Long) oldValue);
1066+
}
1067+
}
1068+
throw new HoodieAvroSchemaException("Long type logical change from " + oldSchema.getLogicalType() + " to " + newSchema.getLogicalType() + " is not supported");
1069+
}
1070+
return oldValue;
10491071
case FIXED:
10501072
if (oldSchema.getFixedSize() != newSchema.getFixedSize()) {
10511073
// Check whether this is a [[Decimal]]'s precision change

hudi-common/src/main/java/org/apache/hudi/common/util/DateTimeUtils.java

Lines changed: 45 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,12 @@ public static Instant microsToInstant(long microsFromEpoch) {
5252
return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
5353
}
5454

55+
public static Instant nanosToInstant(long nanosFromEpoch) {
56+
long epochSeconds = nanosFromEpoch / (1_000_000_000L);
57+
long nanoAdjustment = nanosFromEpoch % (1_000_000_000L);
58+
return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
59+
}
60+
5561
/**
5662
* Converts provided {@link Instant} to microseconds (from epoch)
5763
*/
@@ -71,6 +77,45 @@ public static long instantToMicros(Instant instant) {
7177
}
7278
}
7379

80+
/**
81+
* Converts provided {@link Instant} to nanoseconds (from epoch); mirrors instantToMicros above.
82+
*/
83+
public static long instantToNanos(Instant instant) {
84+
long seconds = instant.getEpochSecond();
85+
int nanos = instant.getNano();
86+
87+
if (seconds < 0 && nanos > 0) {
88+
// Shift seconds by +1, then subtract a full second in nanos
89+
long totalNanos = Math.multiplyExact(seconds + 1, 1_000_000_000L);
90+
long adjustment = nanos - 1_000_000_000L;
91+
return Math.addExact(totalNanos, adjustment);
92+
} else {
93+
long totalNanos = Math.multiplyExact(seconds, 1_000_000_000L);
94+
return Math.addExact(totalNanos, nanos);
95+
}
96+
}
97+
98+
public static final long MICROS_PER_MILLIS = 1000L;
99+
100+
/**
101+
* Converts the timestamp to milliseconds since epoch. In Spark timestamp values have microseconds
102+
* precision, so this conversion is lossy.
103+
*/
104+
public static Long microsToMillis(Long micros) {
105+
// When the timestamp is negative i.e before 1970, we need to adjust the milliseconds portion.
106+
// Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
107+
// In millis precision the above needs to be represented as (-157700927877).
108+
return Math.floorDiv(micros, MICROS_PER_MILLIS);
109+
}
110+
111+
/**
112+
* Converts milliseconds since the epoch to microseconds.
113+
*/
114+
public static Long millisToMicros(Long millis) {
115+
return Math.multiplyExact(millis, MICROS_PER_MILLIS);
116+
}
117+
118+
74119
/**
75120
* Parse input String to a {@link java.time.Instant}.
76121
*

hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -64,7 +64,12 @@ enum TypeID {
6464
TIME(Long.class),
6565
TIMESTAMP(Long.class),
6666
DECIMAL(BigDecimal.class),
67-
UUID(UUID.class);
67+
UUID(UUID.class),
68+
TIME_MILLIS(Integer.class),
69+
TIMESTAMP_MILLIS(Long.class),
70+
LOCAL_TIMESTAMP_MILLIS(Long.class),
71+
LOCAL_TIMESTAMP_MICROS(Long.class);
72+
6873
private final String name;
6974
private final Class<?> classTag;
7075

hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java

Lines changed: 72 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -383,6 +383,78 @@ public int hashCode() {
383383
}
384384
}
385385

386+
public static class TimeMillisType extends PrimitiveType {
387+
private static final TimeMillisType INSTANCE = new TimeMillisType();
388+
389+
public static TimeMillisType get() {
390+
return INSTANCE;
391+
}
392+
393+
@Override
394+
public TypeID typeId() {
395+
return TypeID.TIME_MILLIS;
396+
}
397+
398+
@Override
399+
public String toString() {
400+
return "time-millis";
401+
}
402+
}
403+
404+
public static class TimestampMillisType extends PrimitiveType {
405+
private static final TimestampMillisType INSTANCE = new TimestampMillisType();
406+
407+
public static TimestampMillisType get() {
408+
return INSTANCE;
409+
}
410+
411+
@Override
412+
public TypeID typeId() {
413+
return TypeID.TIMESTAMP_MILLIS;
414+
}
415+
416+
@Override
417+
public String toString() {
418+
return "timestamp-millis";
419+
}
420+
}
421+
422+
public static class LocalTimestampMillisType extends PrimitiveType {
423+
private static final LocalTimestampMillisType INSTANCE = new LocalTimestampMillisType();
424+
425+
public static LocalTimestampMillisType get() {
426+
return INSTANCE;
427+
}
428+
429+
@Override
430+
public TypeID typeId() {
431+
return TypeID.LOCAL_TIMESTAMP_MILLIS;
432+
}
433+
434+
@Override
435+
public String toString() {
436+
return "local-timestamp-millis";
437+
}
438+
}
439+
440+
public static class LocalTimestampMicrosType extends PrimitiveType {
441+
private static final LocalTimestampMicrosType INSTANCE = new LocalTimestampMicrosType();
442+
443+
public static LocalTimestampMicrosType get() {
444+
return INSTANCE;
445+
}
446+
447+
@Override
448+
public TypeID typeId() {
449+
return TypeID.LOCAL_TIMESTAMP_MICROS;
450+
}
451+
452+
@Override
453+
public String toString() {
454+
return "local-timestamp-micros";
455+
}
456+
}
457+
386458
/**
387459
* UUID primitive type.
388460
*/

hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java

Lines changed: 22 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -260,15 +260,18 @@ private static Type visitAvroPrimitiveToBuildInternalType(Schema primitive) {
260260
} else if (logical instanceof LogicalTypes.Date) {
261261
return Types.DateType.get();
262262

263-
} else if (
264-
logical instanceof LogicalTypes.TimeMillis
265-
|| logical instanceof LogicalTypes.TimeMicros) {
263+
} else if (logical instanceof LogicalTypes.TimeMillis) {
264+
return Types.TimeMillisType.get();
265+
} else if (logical instanceof LogicalTypes.TimeMicros) {
266266
return Types.TimeType.get();
267-
268-
} else if (
269-
logical instanceof LogicalTypes.TimestampMillis
270-
|| logical instanceof LogicalTypes.TimestampMicros) {
267+
} else if (logical instanceof LogicalTypes.TimestampMillis) {
268+
return Types.TimestampMillisType.get();
269+
} else if (logical instanceof LogicalTypes.TimestampMicros) {
271270
return Types.TimestampType.get();
271+
} else if (logical instanceof LogicalTypes.LocalTimestampMillis) {
272+
return Types.LocalTimestampMillisType.get();
273+
} else if (logical instanceof LogicalTypes.LocalTimestampMicros) {
274+
return Types.LocalTimestampMicrosType.get();
272275
} else if (LogicalTypes.uuid().getName().equals(name)) {
273276
return Types.UUIDType.get();
274277
}
@@ -457,9 +460,21 @@ private static Schema visitInternalPrimitiveToBuildAvroPrimitiveType(Type.Primit
457460
case TIME:
458461
return LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
459462

463+
case TIME_MILLIS:
464+
return LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
465+
460466
case TIMESTAMP:
461467
return LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
462468

469+
case TIMESTAMP_MILLIS:
470+
return LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
471+
472+
case LOCAL_TIMESTAMP_MICROS:
473+
return LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
474+
475+
case LOCAL_TIMESTAMP_MILLIS:
476+
return LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
477+
463478
case STRING:
464479
return Schema.create(Schema.Type.STRING);
465480

0 commit comments

Comments
 (0)