Commit 4850db0

Handle AvroSchemaConverterWithTimestampNTZ
1 parent 4401c99 commit 4850db0

File tree

11 files changed: +1380 -33 lines changed

.github/workflows/bot.yml

Lines changed: 77 additions & 1 deletion
@@ -58,6 +58,22 @@ jobs:
     strategy:
       matrix:
         include:
+          - scalaProfile: "scala-2.11"
+            sparkProfile: "spark2.4"
+            sparkModules: "hudi-spark-datasource/hudi-spark2"
+
+          - scalaProfile: "scala-2.12"
+            sparkProfile: "spark3.0"
+            sparkModules: "hudi-spark-datasource/hudi-spark3.0.x"
+
+          - scalaProfile: "scala-2.12"
+            sparkProfile: "spark3.1"
+            sparkModules: "hudi-spark-datasource/hudi-spark3.1.x"
+
+          - scalaProfile: "scala-2.12"
+            sparkProfile: "spark3.2"
+            sparkModules: "hudi-spark-datasource/hudi-spark3.2.x"
+
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.3"
             sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
@@ -117,6 +133,22 @@ jobs:
     strategy:
       matrix:
         include:
+          - scalaProfile: "scala-2.11"
+            sparkProfile: "spark2.4"
+            sparkModules: "hudi-spark-datasource/hudi-spark2"
+
+          - scalaProfile: "scala-2.12"
+            sparkProfile: "spark3.0"
+            sparkModules: "hudi-spark-datasource/hudi-spark3.0.x"
+
+          - scalaProfile: "scala-2.12"
+            sparkProfile: "spark3.1"
+            sparkModules: "hudi-spark-datasource/hudi-spark3.1.x"
+
+          - scalaProfile: "scala-2.12"
+            sparkProfile: "spark3.2"
+            sparkModules: "hudi-spark-datasource/hudi-spark3.2.x"
+
           - scalaProfile: "scala-2.12"
             sparkProfile: "spark3.3"
             sparkModules: "hudi-spark-datasource/hudi-spark3.3.x"
@@ -636,4 +668,48 @@ jobs:
           SCALA_PROFILE: ${{ matrix.scalaProfile }}
         run: |
           HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
-          ./packaging/bundle-validation/ci_run.sh hudi_docker_java17 $HUDI_VERSION openjdk17
+          ./packaging/bundle-validation/ci_run.sh hudi_docker_java17 $HUDI_VERSION openjdk17
+
+  integration-tests:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        include:
+          - sparkProfile: 'spark2.4'
+            sparkArchive: 'spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz'
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up JDK 8
+        uses: actions/setup-java@v3
+        with:
+          java-version: '8'
+          distribution: 'temurin'
+          architecture: x64
+          cache: maven
+      - name: Build Project
+        env:
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SCALA_PROFILE: '-Dscala-2.11 -Dscala.binary.version=2.11'
+        run:
+          mvn clean install -T 2 $SCALA_PROFILE -D"$SPARK_PROFILE" -Pintegration-tests -DskipTests=true $MVN_ARGS
+      - name: 'UT integ-test'
+        env:
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SCALA_PROFILE: '-Dscala-2.11 -Dscala.binary.version=2.11'
+        run:
+          mvn test $SCALA_PROFILE -D"$SPARK_PROFILE" -Pintegration-tests -DskipUTs=false -DskipITs=true -pl hudi-integ-test $MVN_ARGS
+      - name: 'IT'
+        env:
+          SPARK_PROFILE: ${{ matrix.sparkProfile }}
+          SPARK_ARCHIVE: ${{ matrix.sparkArchive }}
+          SCALA_PROFILE: '-Dscala-2.11 -Dscala.binary.version=2.11'
+        run: |
+          echo "Downloading $SPARK_ARCHIVE"
+          curl https://archive.apache.org/dist/spark/$SPARK_ARCHIVE --create-dirs -o $GITHUB_WORKSPACE/$SPARK_ARCHIVE
+          tar -xvf $GITHUB_WORKSPACE/$SPARK_ARCHIVE -C $GITHUB_WORKSPACE/
+          mkdir /tmp/spark-events/
+          SPARK_ARCHIVE_BASENAME=$(basename $SPARK_ARCHIVE)
+          export SPARK_HOME=$GITHUB_WORKSPACE/${SPARK_ARCHIVE_BASENAME%.*}
+          rm -f $GITHUB_WORKSPACE/$SPARK_ARCHIVE
+          docker system prune --all --force
+          mvn verify $SCALA_PROFILE -D"$SPARK_PROFILE" -Pintegration-tests -pl !hudi-flink-datasource/hudi-flink $MVN_ARGS

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkInternalSchemaConverter.java

Lines changed: 17 additions & 2 deletions
@@ -57,12 +57,12 @@
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructType$;
-import org.apache.spark.sql.types.TimestampNTZType$;
 import org.apache.spark.sql.types.TimestampType;
 import org.apache.spark.sql.types.TimestampType$;
 import org.apache.spark.sql.types.UserDefinedType;
 import org.apache.spark.sql.types.VarcharType;
 
+import java.lang.reflect.Field;
 import java.sql.Date;
 import java.util.ArrayList;
 import java.util.Deque;
@@ -83,6 +83,21 @@ private SparkInternalSchemaConverter() {
   public static final String HOODIE_TABLE_PATH = "hoodie.tablePath";
   public static final String HOODIE_VALID_COMMITS_LIST = "hoodie.valid.commits.list";
 
+  /**
+   * Get TimestampNTZType$ using reflection, as it's only available in Spark 3.3+.
+   * Falls back to TimestampType$ if TimestampNTZType is not available.
+   */
+  private static DataType getTimestampNTZType() {
+    try {
+      Class<?> timestampNTZTypeClass = Class.forName("org.apache.spark.sql.types.TimestampNTZType$");
+      Field moduleField = timestampNTZTypeClass.getField("MODULE$");
+      return (DataType) moduleField.get(null);
+    } catch (ClassNotFoundException | NoSuchFieldException | IllegalAccessException e) {
+      // TimestampNTZType is not available in this Spark version, fall back to TimestampType
+      return TimestampType$.MODULE$;
+    }
+  }
+
   public static Type buildTypeFromStructType(DataType sparkType, Boolean firstVisitRoot, AtomicInteger nextId) {
     if (sparkType instanceof StructType) {
       StructField[] fields = ((StructType) sparkType).fields();
@@ -275,7 +290,7 @@ private static DataType constructSparkSchemaFromType(Type type) {
       return TimestampType$.MODULE$;
     case LOCAL_TIMESTAMP_MILLIS:
     case LOCAL_TIMESTAMP_MICROS:
-      return TimestampNTZType$.MODULE$;
+      return getTimestampNTZType();
     case STRING:
      return StringType$.MODULE$;
    case UUID:
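
For reference, the MODULE$ lookup above is the standard way to reach a Scala `object` from Java reflection: the object compiles to a class named `<Name>$` holding a static `MODULE$` singleton field, so resolving it by name avoids a compile-time dependency on Spark 3.3+ (which is presumably why the `TimestampNTZType$` import is dropped). Below is a self-contained sketch of the same fallback, not part of this commit; the class name `TimestampNtzProbe` is invented for illustration:

```java
import java.lang.reflect.Field;

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.TimestampType$;

public class TimestampNtzProbe {

  // Resolve TimestampNTZType$ if it exists on the classpath, otherwise fall back.
  static DataType resolveTimestampNtzOrFallback() {
    try {
      Class<?> ntzClass = Class.forName("org.apache.spark.sql.types.TimestampNTZType$");
      Field module = ntzClass.getField("MODULE$");
      return (DataType) module.get(null);   // Spark 3.3+: TimestampNTZType
    } catch (ReflectiveOperationException e) {
      return TimestampType$.MODULE$;        // older Spark: plain TimestampType
    }
  }

  public static void main(String[] args) {
    // Prints "timestamp_ntz" on Spark 3.3+, "timestamp" on older versions.
    System.out.println(resolveTimestampNtzOrFallback().typeName());
  }
}
```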

hudi-common/src/main/java/org/apache/hudi/avro/ConvertingGenericData.java

Lines changed: 84 additions & 17 deletions
@@ -25,6 +25,7 @@
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
 
+import java.lang.reflect.Constructor;
 import java.util.Map;
 
 /**
@@ -42,12 +43,12 @@ public class ConvertingGenericData extends GenericData {
   private static final TimeConversions.TimeMicrosConversion TIME_MICROS_CONVERSION = new TimeConversions.TimeMicrosConversion();
   private static final TimeConversions.TimestampMicrosConversion TIMESTAMP_MICROS_CONVERSION = new TimeConversions.TimestampMicrosConversion();
 
-  // NOTE: Those are not supported in Avro 1.8.2
-  // TODO re-enable upon upgrading to 1.10
-  private static final TimeConversions.TimestampMillisConversion TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.TimestampMillisConversion();
-  private static final TimeConversions.TimeMillisConversion TIME_MILLIS_CONVERSION = new TimeConversions.TimeMillisConversion();
-  private static final TimeConversions.LocalTimestampMillisConversion LOCAL_TIMESTAMP_MILLIS_CONVERSION = new TimeConversions.LocalTimestampMillisConversion();
-  private static final TimeConversions.LocalTimestampMicrosConversion LOCAL_TIMESTAMP_MICROS_CONVERSION = new TimeConversions.LocalTimestampMicrosConversion();
+  // NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)
+  // Use reflection to conditionally initialize them only if available
+  private static final Object TIMESTAMP_MILLIS_CONVERSION = createConversionIfAvailable("org.apache.avro.data.TimeConversions$TimestampMillisConversion");
+  private static final Object TIME_MILLIS_CONVERSION = createConversionIfAvailable("org.apache.avro.data.TimeConversions$TimeMillisConversion");
+  private static final Object LOCAL_TIMESTAMP_MILLIS_CONVERSION = createConversionIfAvailable("org.apache.avro.data.TimeConversions$LocalTimestampMillisConversion");
+  private static final Object LOCAL_TIMESTAMP_MICROS_CONVERSION = createConversionIfAvailable("org.apache.avro.data.TimeConversions$LocalTimestampMicrosConversion");
   public static final GenericData INSTANCE = new ConvertingGenericData();
 
   private ConvertingGenericData() {
@@ -56,11 +57,20 @@ private ConvertingGenericData() {
     addLogicalTypeConversion(DATE_CONVERSION);
     addLogicalTypeConversion(TIME_MICROS_CONVERSION);
     addLogicalTypeConversion(TIMESTAMP_MICROS_CONVERSION);
-    // NOTE: Those are not supported in Avro 1.8.2
-    addLogicalTypeConversion(TIME_MILLIS_CONVERSION);
-    addLogicalTypeConversion(TIMESTAMP_MILLIS_CONVERSION);
-    addLogicalTypeConversion(LOCAL_TIMESTAMP_MILLIS_CONVERSION);
-    addLogicalTypeConversion(LOCAL_TIMESTAMP_MICROS_CONVERSION);
+    // NOTE: Those are not supported in Avro 1.8.2 (used by Spark 2)
+    // Only add conversions if they're available
+    if (TIME_MILLIS_CONVERSION != null) {
+      addLogicalTypeConversionReflectively(TIME_MILLIS_CONVERSION);
+    }
+    if (TIMESTAMP_MILLIS_CONVERSION != null) {
+      addLogicalTypeConversionReflectively(TIMESTAMP_MILLIS_CONVERSION);
+    }
+    if (LOCAL_TIMESTAMP_MILLIS_CONVERSION != null) {
+      addLogicalTypeConversionReflectively(LOCAL_TIMESTAMP_MILLIS_CONVERSION);
+    }
+    if (LOCAL_TIMESTAMP_MICROS_CONVERSION != null) {
+      addLogicalTypeConversionReflectively(LOCAL_TIMESTAMP_MICROS_CONVERSION);
+    }
   }
 
   @Override
@@ -123,12 +133,31 @@ public boolean validate(Schema schema, Object datum) {
         return isInteger(datum)
             || DATE_CONVERSION.getConvertedType().isInstance(datum);
       case LONG:
-        return isLong(datum)
-            || TIME_MICROS_CONVERSION.getConvertedType().isInstance(datum)
-            || TIMESTAMP_MICROS_CONVERSION.getConvertedType().isInstance(datum)
-            || TIMESTAMP_MILLIS_CONVERSION.getConvertedType().isInstance(datum)
-            || LOCAL_TIMESTAMP_MICROS_CONVERSION.getConvertedType().isInstance(datum)
-            || LOCAL_TIMESTAMP_MILLIS_CONVERSION.getConvertedType().isInstance(datum);
+        if (isLong(datum)) {
+          return true;
+        }
+        if (TIME_MICROS_CONVERSION.getConvertedType().isInstance(datum)
+            || TIMESTAMP_MICROS_CONVERSION.getConvertedType().isInstance(datum)) {
+          return true;
+        }
+        // Check optional conversions that may not be available in Avro 1.8.2
+        Class<?> convertedType;
+        if (TIMESTAMP_MILLIS_CONVERSION != null
+            && (convertedType = getConvertedType(TIMESTAMP_MILLIS_CONVERSION)) != null
+            && convertedType.isInstance(datum)) {
+          return true;
+        }
+        if (LOCAL_TIMESTAMP_MICROS_CONVERSION != null
+            && (convertedType = getConvertedType(LOCAL_TIMESTAMP_MICROS_CONVERSION)) != null
+            && convertedType.isInstance(datum)) {
+          return true;
+        }
+        if (LOCAL_TIMESTAMP_MILLIS_CONVERSION != null
+            && (convertedType = getConvertedType(LOCAL_TIMESTAMP_MILLIS_CONVERSION)) != null
+            && convertedType.isInstance(datum)) {
+          return true;
+        }
+        return false;
       case FLOAT:
         return isFloat(datum);
       case DOUBLE:
@@ -141,5 +170,43 @@ public boolean validate(Schema schema, Object datum) {
         return false;
     }
   }
+
+  /**
+   * Creates a conversion instance using reflection if the class is available.
+   * Returns null if the class doesn't exist (e.g., in Avro 1.8.2).
+   */
+  private static Object createConversionIfAvailable(String className) {
+    try {
+      Class<?> clazz = Class.forName(className);
+      Constructor<?> constructor = clazz.getConstructor();
+      return constructor.newInstance();
+    } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException
+        | IllegalAccessException | java.lang.reflect.InvocationTargetException e) {
+      // Class doesn't exist or can't be instantiated (e.g., Avro 1.8.2)
+      return null;
+    }
+  }
+
+  /**
+   * Gets the converted type from a conversion object using reflection.
+   */
+  private static Class<?> getConvertedType(Object conversion) {
+    try {
+      return (Class<?>) conversion.getClass().getMethod("getConvertedType").invoke(conversion);
+    } catch (Exception e) {
+      // Should not happen if conversion is valid, but handle gracefully
+      return null;
+    }
+  }
+
+  /**
+   * Adds a logical type conversion using unchecked cast to avoid compile-time dependency
+   * on classes that may not exist in older Avro versions.
+   */
+  private void addLogicalTypeConversionReflectively(Object conversion) {
+    // Cast to Conversion<?> since we know it's a Conversion if not null
+    // This avoids compile-time dependency on specific Conversion subclasses
+    addLogicalTypeConversion((org.apache.avro.Conversion<?>) conversion);
+  }
 }
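
The `createConversionIfAvailable` helper boils down to "instantiate the conversion by name, or return null if the class is missing". A minimal standalone sketch of that pattern follows; it is illustrative only, and the class name `OptionalConversionProbe` is made up rather than part of the commit:

```java
import org.apache.avro.Conversion;

public class OptionalConversionProbe {

  // Instantiate an Avro Conversion by class name, or return null if the class
  // (or its no-arg constructor) is not present, as on Avro 1.8.2.
  static Conversion<?> conversionIfAvailable(String className) {
    try {
      return (Conversion<?>) Class.forName(className).getConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      return null;
    }
  }

  public static void main(String[] args) {
    Conversion<?> c =
        conversionIfAvailable("org.apache.avro.data.TimeConversions$TimestampMillisConversion");
    if (c == null) {
      System.out.println("timestamp-millis conversion not available on this Avro version");
    } else {
      // On newer Avro this prints the Java type the conversion produces, e.g. java.time.Instant.
      System.out.println("converts to: " + c.getConvertedType().getName());
    }
  }
}
```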

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

Lines changed: 39 additions & 4 deletions
@@ -49,6 +49,7 @@
 import org.apache.avro.Conversions;
 import org.apache.avro.Conversions.DecimalConversion;
 import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.LogicalTypes.Decimal;
 import org.apache.avro.Schema;
@@ -1056,12 +1057,12 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche
       if (newSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
         return DateTimeUtils.microsToMillis((Long) oldValue);
       }
-    } else if (oldSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMillis) {
-      if (newSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMicros) {
+    } else if (isLocalTimestampMillis(oldSchema.getLogicalType())) {
+      if (isLocalTimestampMicros(newSchema.getLogicalType())) {
         return DateTimeUtils.millisToMicros((Long) oldValue);
       }
-    } else if (oldSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMicros) {
-      if (newSchema.getLogicalType() instanceof LogicalTypes.LocalTimestampMillis) {
+    } else if (isLocalTimestampMicros(oldSchema.getLogicalType())) {
+      if (isLocalTimestampMillis(newSchema.getLogicalType())) {
         return DateTimeUtils.microsToMillis((Long) oldValue);
       }
     }
@@ -1458,4 +1459,38 @@ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
     }
   }
 
+  /**
+   * Checks if a logical type is an instance of LocalTimestampMillis using reflection.
+   * Returns false if the class doesn't exist (e.g., in Avro 1.8.2).
+   */
+  private static boolean isLocalTimestampMillis(LogicalType logicalType) {
+    if (logicalType == null) {
+      return false;
+    }
+    try {
+      Class<?> localTimestampMillisClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMillis");
+      return localTimestampMillisClass.isInstance(logicalType);
+    } catch (ClassNotFoundException e) {
+      // Class doesn't exist (e.g., Avro 1.8.2)
+      return false;
+    }
+  }
+
+  /**
+   * Checks if a logical type is an instance of LocalTimestampMicros using reflection.
+   * Returns false if the class doesn't exist (e.g., in Avro 1.8.2).
+   */
+  private static boolean isLocalTimestampMicros(LogicalType logicalType) {
+    if (logicalType == null) {
+      return false;
+    }
+    try {
+      Class<?> localTimestampMicrosClass = Class.forName("org.apache.avro.LogicalTypes$LocalTimestampMicros");
+      return localTimestampMicrosClass.isInstance(logicalType);
+    } catch (ClassNotFoundException e) {
+      // Class doesn't exist (e.g., Avro 1.8.2)
+      return false;
+    }
+  }
+
 }
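
The two `isLocalTimestamp*` helpers are the same reflective instance check specialized to Avro's newer local-timestamp logical types. A hypothetical, self-contained sketch is shown below; the class name `LocalTimestampCheck` is invented here and the snippet only demonstrates how such a check degrades to `false` on Avro versions that lack the class:

```java
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;

public class LocalTimestampCheck {

  // Reflective "instanceof" against a class that may not exist in this Avro version.
  static boolean isInstanceOf(String className, LogicalType logicalType) {
    if (logicalType == null) {
      return false;
    }
    try {
      return Class.forName(className).isInstance(logicalType);
    } catch (ClassNotFoundException e) {
      return false;  // logical type class not present, e.g. Avro 1.8.2
    }
  }

  public static void main(String[] args) {
    // timestamp-millis exists in all supported Avro versions and is *not*
    // a LocalTimestampMillis, so this prints "false" everywhere.
    LogicalType ts = LogicalTypes.timestampMillis();
    System.out.println(isInstanceOf("org.apache.avro.LogicalTypes$LocalTimestampMillis", ts));
  }
}
```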
