Skip to content

Commit 62c297e

Browse files
pratyakshsharmabhasudha
authored andcommitted
[HUDI-1177]: fixed TaskNotSerializableException in TimestampBasedKeyGenerator (#1987)
Co-authored-by: Bhavani Sudha Saktheeswaran <[email protected]>
1 parent 85a17b2 commit 62c297e

File tree

3 files changed

+41
-28
lines changed

3 files changed

+41
-28
lines changed

hudi-spark/src/main/java/org/apache/hudi/keygen/TimestampBasedKeyGenerator.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.hudi.DataSourceWriteOptions;
2323
import org.apache.hudi.avro.HoodieAvroUtils;
2424
import org.apache.hudi.common.config.TypedProperties;
25+
import org.apache.hudi.common.util.Option;
2526
import org.apache.hudi.exception.HoodieDeltaStreamerException;
2627
import org.apache.hudi.exception.HoodieException;
2728
import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -40,7 +41,6 @@
4041
import java.io.UnsupportedEncodingException;
4142
import java.net.URLEncoder;
4243
import java.nio.charset.StandardCharsets;
43-
import java.text.ParseException;
4444
import java.util.concurrent.TimeUnit;
4545

4646
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -61,7 +61,8 @@ public enum TimestampType implements Serializable {
6161
private final TimeUnit timeUnit;
6262
private final TimestampType timestampType;
6363
private final String outputDateFormat;
64-
private DateTimeFormatter inputFormatter;
64+
private transient Option<DateTimeFormatter> inputFormatter;
65+
private transient DateTimeFormatter partitionFormatter;
6566
private final HoodieDateTimeParser parser;
6667

6768
// TimeZone detailed settings reference
@@ -108,13 +109,8 @@ public TimestampBasedKeyGenerator(TypedProperties config) throws IOException {
108109
this.parser = DataSourceUtils.createDateTimeParser(config, dateTimeParserClass);
109110
this.outputDateTimeZone = parser.getOutputDateTimeZone();
110111
this.outputDateFormat = parser.getOutputDateFormat();
111-
this.inputFormatter = parser.getInputFormatter();
112112
this.timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
113113

114-
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
115-
this.inputFormatter = parser.getInputFormatter();
116-
}
117-
118114
switch (this.timestampType) {
119115
case EPOCHMILLISECONDS:
120116
timeUnit = MILLISECONDS;
@@ -146,18 +142,29 @@ public String getPartitionPath(GenericRecord record) {
146142
}
147143
}
148144

145+
/**
146+
* The function takes care of lazily initialising dateTimeFormatter variables only once.
147+
*/
148+
private void initIfNeeded() {
149+
if (this.inputFormatter == null) {
150+
this.inputFormatter = parser.getInputFormatter();
151+
}
152+
if (this.partitionFormatter == null) {
153+
this.partitionFormatter = DateTimeFormat.forPattern(outputDateFormat);
154+
if (this.outputDateTimeZone != null) {
155+
partitionFormatter = partitionFormatter.withZone(outputDateTimeZone);
156+
}
157+
}
158+
}
159+
149160
/**
150161
* Parse and fetch partition path based on data type.
151162
*
152163
* @param partitionVal partition path object value fetched from record/row
153164
* @return the parsed partition path based on data type
154-
* @throws ParseException on any parse exception
155165
*/
156-
private String getPartitionPath(Object partitionVal) throws ParseException {
157-
DateTimeFormatter partitionFormatter = DateTimeFormat.forPattern(outputDateFormat);
158-
if (this.outputDateTimeZone != null) {
159-
partitionFormatter = partitionFormatter.withZone(outputDateTimeZone);
160-
}
166+
private String getPartitionPath(Object partitionVal) {
167+
initIfNeeded();
161168
long timeMs;
162169
if (partitionVal instanceof Double) {
163170
timeMs = convertLongTimeToMillis(((Double) partitionVal).longValue());
@@ -166,13 +173,16 @@ private String getPartitionPath(Object partitionVal) throws ParseException {
166173
} else if (partitionVal instanceof Long) {
167174
timeMs = convertLongTimeToMillis((Long) partitionVal);
168175
} else if (partitionVal instanceof CharSequence) {
169-
DateTime parsedDateTime = inputFormatter.parseDateTime(partitionVal.toString());
176+
if (!inputFormatter.isPresent()) {
177+
throw new HoodieException("Missing inputformatter. Ensure " + Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP + " config is set when timestampType is DATE_STRING or MIXED!");
178+
}
179+
DateTime parsedDateTime = inputFormatter.get().parseDateTime(partitionVal.toString());
170180
if (this.outputDateTimeZone == null) {
171181
// Use the timezone that came off the date that was passed in, if it had one
172182
partitionFormatter = partitionFormatter.withZone(parsedDateTime.getZone());
173183
}
174184

175-
timeMs = inputFormatter.parseDateTime(partitionVal.toString()).getMillis();
185+
timeMs = inputFormatter.get().parseDateTime(partitionVal.toString()).getMillis();
176186
} else {
177187
throw new HoodieNotSupportedException(
178188
"Unexpected type for partition field: " + partitionVal.getClass().getName());

hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParser.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
package org.apache.hudi.keygen.parser;
1919

20+
import org.apache.hudi.common.util.Option;
2021
import org.joda.time.DateTimeZone;
2122
import org.joda.time.format.DateTimeFormatter;
2223

23-
public interface HoodieDateTimeParser {
24+
import java.io.Serializable;
25+
26+
public interface HoodieDateTimeParser extends Serializable {
2427

2528
/**
2629
* Returns the output date format in which the partition paths will be created for the hudi dataset.
@@ -32,7 +35,7 @@ public interface HoodieDateTimeParser {
3235
* Returns input formats in which datetime based values might be coming in incoming records.
3336
* @return
3437
*/
35-
DateTimeFormatter getInputFormatter();
38+
Option<DateTimeFormatter> getInputFormatter();
3639

3740
/**
3841
* Returns the datetime zone one should expect the incoming values into.

hudi-spark/src/main/java/org/apache/hudi/keygen/parser/HoodieDateTimeParserImpl.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.hudi.DataSourceUtils;
2121
import org.apache.hudi.common.config.TypedProperties;
22+
import org.apache.hudi.common.util.Option;
2223
import org.apache.hudi.keygen.TimestampBasedKeyGenerator.Config;
2324
import org.apache.hudi.keygen.TimestampBasedKeyGenerator.TimestampType;
2425
import org.joda.time.DateTimeZone;
@@ -37,7 +38,6 @@ public class HoodieDateTimeParserImpl implements HoodieDateTimeParser, Serializa
3738
private String configInputDateFormatList;
3839
private final String configInputDateFormatDelimiter;
3940
private final TypedProperties config;
40-
private DateTimeFormatter inputFormatter;
4141

4242
// TimeZone detailed settings reference
4343
// https://docs.oracle.com/javase/8/docs/api/java/util/TimeZone.html
@@ -48,14 +48,6 @@ public HoodieDateTimeParserImpl(TypedProperties config) {
4848
DataSourceUtils.checkRequiredProperties(config, Arrays.asList(Config.TIMESTAMP_TYPE_FIELD_PROP, Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP));
4949
this.inputDateTimeZone = getInputDateTimeZone();
5050
this.configInputDateFormatDelimiter = getConfigInputDateFormatDelimiter();
51-
52-
TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
53-
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
54-
DataSourceUtils.checkRequiredProperties(config,
55-
Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
56-
this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "");
57-
inputFormatter = getInputDateFormatter();
58-
}
5951
}
6052

6153
private String getConfigInputDateFormatDelimiter() {
@@ -94,8 +86,16 @@ public String getOutputDateFormat() {
9486
}
9587

9688
@Override
97-
public DateTimeFormatter getInputFormatter() {
98-
return this.inputFormatter;
89+
public Option<DateTimeFormatter> getInputFormatter() {
90+
TimestampType timestampType = TimestampType.valueOf(config.getString(Config.TIMESTAMP_TYPE_FIELD_PROP));
91+
if (timestampType == TimestampType.DATE_STRING || timestampType == TimestampType.MIXED) {
92+
DataSourceUtils.checkRequiredProperties(config,
93+
Collections.singletonList(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP));
94+
this.configInputDateFormatList = config.getString(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "");
95+
return Option.of(getInputDateFormatter());
96+
}
97+
98+
return Option.empty();
9999
}
100100

101101
@Override

0 commit comments

Comments
 (0)