Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
1faf117
[Fix][Transform-V2][JsonPath] Fix date/time conversion error when mul…
yht0827 Jan 23, 2026
dba5272
[Fix][Transform-V2][JsonPath] Fix date/time conversion error when mul…
yht0827 Jan 25, 2026
abf95c9
[Fix][Transform-V2][JsonPath] Fix date/time conversion error when mul…
yht0827 Jan 25, 2026
5aac4a1
Merge remote-tracking branch 'origin/fix/jsonpath-destfield-conversio…
yht0827 Jan 25, 2026
03a69cb
[Test][Transform-V2][JsonPath] Add edge case tests for date/time colu…
yht0827 Jan 26, 2026
5f5c5d2
chore: trigger CI
yht0827 Jan 28, 2026
c8d3ae3
Fix: restore accidentally deleted files
yht0827 Jan 28, 2026
8824182
[Fix][Format][Json] Add fieldName null check in date/time converters
yht0827 Jan 29, 2026
7a60ab0
[Test][Transform-V2][JsonPath] Use specific exception type in assertions
yht0827 Jan 29, 2026
c1a7ed6
[Fix][Test][Transform-V2][JsonPath] Fix incorrect exception assertion…
yht0827 Jan 29, 2026
24bb52e
[Revert][Format][Json] Remove empty string null check in date/time co…
yht0827 Feb 1, 2026
3d37cb5
[Revert][Test][Transform-V2][JsonPath] Remove empty string date test
yht0827 Feb 1, 2026
7807f3c
[Test][Format][Json] Align multi-date format test with supported patt…
yht0827 Feb 2, 2026
4158a9f
`chore: trigger ci by whitespace change
yht0827 Mar 10, 2026
8e624b3
chore: trigger ci by whitespace change
yht0827 Mar 10, 2026
b4c05fe
[Fix][Transform-V2][JsonPath] Fix date/time conversion error when mul…
yht0827 Jan 23, 2026
1a0fb5c
[Fix][Transform-V2][JsonPath] Fix date/time conversion error when mul…
yht0827 Jan 25, 2026
deaa18c
[Test][Transform-V2][JsonPath] Add edge case tests for date/time colu…
yht0827 Jan 26, 2026
e982251
chore: trigger CI
yht0827 Jan 28, 2026
4e44f50
Fix: restore accidentally deleted files
yht0827 Jan 28, 2026
d9f2db4
[Fix][Format][Json] Add fieldName null check in date/time converters
yht0827 Jan 29, 2026
60a4c82
[Test][Transform-V2][JsonPath] Use specific exception type in assertions
yht0827 Jan 29, 2026
51425ca
[Fix][Test][Transform-V2][JsonPath] Fix incorrect exception assertion…
yht0827 Jan 29, 2026
dae1753
[Revert][Format][Json] Remove empty string null check in date/time co…
yht0827 Feb 1, 2026
8347f30
[Revert][Test][Transform-V2][JsonPath] Remove empty string date test
yht0827 Feb 1, 2026
b77b2ef
[Test][Format][Json] Align multi-date format test with supported patt…
yht0827 Feb 2, 2026
b3b366f
`chore: trigger ci by whitespace change
yht0827 Mar 10, 2026
cf92a02
chore: trigger ci by whitespace change
yht0827 Mar 10, 2026
0217326
Merge remote-tracking branch 'origin/fix/jsonpath-destfield-conversio…
yht0827 Mar 12, 2026
07cdaba
Merge branch 'apache:dev' into fix/jsonpath-destfield-conversion
yht0827 Mar 18, 2026
237f7a4
Merge branch 'apache:dev' into fix/jsonpath-destfield-conversion
yht0827 Mar 19, 2026
af7f4eb
Merge branch 'apache:dev' into fix/jsonpath-destfield-conversion
yht0827 Mar 19, 2026
f2f10b8
Merge branch 'apache:dev' into fix/jsonpath-destfield-conversion
yht0827 Mar 20, 2026
d12fc34
Merge branch 'apache:dev' into fix/jsonpath-destfield-conversion
yht0827 Mar 23, 2026
5ed57a5
Merge branch 'apache:dev' into fix/jsonpath-destfield-conversion
yht0827 Mar 25, 2026
8c8b70f
Merge branch 'apache:dev' into fix/jsonpath-destfield-conversion
yht0827 Mar 26, 2026
1e4936e
Merge branch 'apache:dev' into fix/jsonpath-destfield-conversion
yht0827 Mar 26, 2026
21d7d2b
Merge branch 'apache:dev' into fix/jsonpath-destfield-conversion
yht0827 Mar 28, 2026
51f016d
Merge branch 'apache:dev' into fix/jsonpath-destfield-conversion
yht0827 Mar 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,25 @@ private float convertToFloat(JsonNode jsonNode) {

private LocalDate convertToLocalDate(JsonNode jsonNode, String fieldName) {
String dateStr = jsonNode.asText();
DateTimeFormatter dateFormatter = fieldFormatterMap.get(fieldName);

DateTimeFormatter dateFormatter = null;

if (fieldName != null) {
dateFormatter = fieldFormatterMap.get(fieldName);
}

if (dateFormatter == null) {
dateFormatter = DateUtils.matchDateFormatter(dateStr);
fieldFormatterMap.put(fieldName, dateFormatter);
if (fieldName != null) {
fieldFormatterMap.put(fieldName, dateFormatter);
}
}

if (dateFormatter == null) {
throw CommonError.formatDateError(dateStr, fieldName);
}

return dateFormatter.parse(jsonNode.asText()).query(TemporalQueries.localDate());
return dateFormatter.parse(dateStr).query(TemporalQueries.localDate());
}

private LocalTime convertToLocalTime(JsonNode jsonNode) {
Expand All @@ -277,11 +286,20 @@ private LocalTime convertToLocalTime(JsonNode jsonNode) {

private LocalDateTime convertToLocalDateTime(JsonNode jsonNode, String fieldName) {
String datetimeStr = jsonNode.asText();
DateTimeFormatter dateTimeFormatter = fieldFormatterMap.get(fieldName);

DateTimeFormatter dateTimeFormatter = null;

if (fieldName != null) {
dateTimeFormatter = fieldFormatterMap.get(fieldName);
}

if (dateTimeFormatter == null) {
dateTimeFormatter = DateTimeUtils.matchDateTimeFormatter(datetimeStr);
fieldFormatterMap.put(fieldName, dateTimeFormatter);
if (fieldName != null) {
fieldFormatterMap.put(fieldName, dateTimeFormatter);
}
}

if (dateTimeFormatter == null) {
throw CommonError.formatDateTimeError(datetimeStr, fieldName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,4 +713,22 @@ public void testSerializationWithNumber() {
String expected = "{\"id\":1,\"code\":\"1001015\",\"fe_result\":80}";
assertEquals(new String(serialize), expected);
}

@Test
public void testMultipleDateColumnsWithDifferentFormats() throws IOException {
SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {"date_dash", "date_dot"},
new SeaTunnelDataType<?>[] {
LocalTimeType.LOCAL_DATE_TYPE, LocalTimeType.LOCAL_DATE_TYPE
});
JsonDeserializationSchema deserializationSchema =
new JsonDeserializationSchema(false, false, rowType);

String json = "{\"date_dash\":\"2024-01-15\",\"date_dot\":\"2024.06.20\"}";
SeaTunnelRow row = deserializationSchema.deserialize(json.getBytes());

assertEquals(LocalDate.of(2024, 1, 15), row.getField(0));
assertEquals(LocalDate.of(2024, 6, 20), row.getField(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private Object doTransform(
}
Object result = JSON_PATH_CACHE.get(columnConfig.getPath()).read(jsonString);
JsonNode jsonNode = JsonUtils.toJsonNode(result);
return converter.convert(jsonNode, null);
return converter.convert(jsonNode, columnConfig.getDestField());
} catch (JsonPathException e) {
if (columnConfig.errorHandleWay() != null
&& columnConfig.errorHandleWay().allowSkip()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.transform.common.ErrorHandleWay;
import org.apache.seatunnel.transform.common.TransformCommonOptions;
import org.apache.seatunnel.transform.exception.ErrorDataTransformException;
Expand All @@ -39,6 +40,8 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -544,4 +547,284 @@ public void testAllFieldsInSingleBatchConfig() {
"2023-10-30T12:00:00Z",
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("created_at")));
}

@Test
public void testMultipleDateColumnsWithDifferentFormats() {
Map<String, Object> configMap = new HashMap<>();
configMap.put(
JsonPathTransformConfig.COLUMNS.key(),
Arrays.asList(
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.birth",
JsonPathTransformConfig.DEST_FIELD.key(), "birth_date",
JsonPathTransformConfig.DEST_TYPE.key(), "date"),
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.hired",
JsonPathTransformConfig.DEST_FIELD.key(), "hire_date",
JsonPathTransformConfig.DEST_TYPE.key(), "date"),
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.created",
JsonPathTransformConfig.DEST_FIELD.key(), "created_at",
JsonPathTransformConfig.DEST_TYPE.key(), "timestamp"),
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.updated",
JsonPathTransformConfig.DEST_FIELD.key(), "updated_at",
JsonPathTransformConfig.DEST_TYPE.key(), "timestamp")));
ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
CatalogTable table =
CatalogTableUtil.getCatalogTable(
"test",
new SeaTunnelRowType(
new String[] {"data"},
new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
JsonPathTransform transform =
new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);

CatalogTable outputTable = transform.getProducedCatalogTable();
String jsonData =
"{\"birth\": \"1990/05/20\","
+ " \"hired\": \"2024-01-15\","
+ " \"created\": \"2024/01/15 10:30:00\","
+ " \"updated\": \"2024-03-20 14:00:00\"}";
SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] {jsonData}));

Assertions.assertEquals(
LocalDate.of(1990, 5, 20),
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("birth_date")));
Assertions.assertEquals(
LocalDate.of(2024, 1, 15),
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("hire_date")));
Assertions.assertEquals(
LocalDateTime.of(2024, 1, 15, 10, 30, 0),
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("created_at")));
Assertions.assertEquals(
LocalDateTime.of(2024, 3, 20, 14, 0, 0),
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("updated_at")));
}

@Test
public void testMultipleDateColumnsWithNullValues() {
Map<String, Object> configMap = new HashMap<>();
configMap.put(
JsonPathTransformConfig.COLUMNS.key(),
Arrays.asList(
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.birth",
JsonPathTransformConfig.DEST_FIELD.key(), "birth_date",
JsonPathTransformConfig.DEST_TYPE.key(), "date"),
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.hired",
JsonPathTransformConfig.DEST_FIELD.key(), "hire_date",
JsonPathTransformConfig.DEST_TYPE.key(), "date"),
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.created",
JsonPathTransformConfig.DEST_FIELD.key(), "created_at",
JsonPathTransformConfig.DEST_TYPE.key(), "timestamp")));
ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
CatalogTable table =
CatalogTableUtil.getCatalogTable(
"test",
new SeaTunnelRowType(
new String[] {"data"},
new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
JsonPathTransform transform =
new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);

CatalogTable outputTable = transform.getProducedCatalogTable();
String jsonData =
"{\"birth\": \"1990/05/20\","
+ " \"hired\": null,"
+ " \"created\": \"2024/01/15 10:30:00\"}";
SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] {jsonData}));

Assertions.assertNotNull(outputRow);
Assertions.assertEquals(
LocalDate.of(1990, 5, 20),
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("birth_date")));
Assertions.assertNull(
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("hire_date")));
Assertions.assertEquals(
LocalDateTime.of(2024, 1, 15, 10, 30, 0),
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("created_at")));
}

@Test
public void testSingleDateColumn() {
// Verify that the behavior of a single date column remains consistent with the
// modifications
Map<String, Object> configMap = new HashMap<>();
configMap.put(
JsonPathTransformConfig.COLUMNS.key(),
Arrays.asList(
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.birth",
JsonPathTransformConfig.DEST_FIELD.key(), "birth_date",
JsonPathTransformConfig.DEST_TYPE.key(), "date")));
ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
CatalogTable table =
CatalogTableUtil.getCatalogTable(
"test",
new SeaTunnelRowType(
new String[] {"data"},
new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
JsonPathTransform transform =
new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);

CatalogTable outputTable = transform.getProducedCatalogTable();
String jsonData = "{\"birth\": \"2024-01-15\"}";
SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] {jsonData}));

Assertions.assertEquals(
LocalDate.of(2024, 1, 15),
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("birth_date")));
}

@Test
public void testMultipleDateColumnsWithSameFormat() {
// Verify that multiple date columns with the same format can share cache
Map<String, Object> configMap = new HashMap<>();
configMap.put(
JsonPathTransformConfig.COLUMNS.key(),
Arrays.asList(
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.birth",
JsonPathTransformConfig.DEST_FIELD.key(), "birth_date",
JsonPathTransformConfig.DEST_TYPE.key(), "date"),
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.hired",
JsonPathTransformConfig.DEST_FIELD.key(), "hire_date",
JsonPathTransformConfig.DEST_TYPE.key(), "date")));
ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
CatalogTable table =
CatalogTableUtil.getCatalogTable(
"test",
new SeaTunnelRowType(
new String[] {"data"},
new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
JsonPathTransform transform =
new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);

CatalogTable outputTable = transform.getProducedCatalogTable();
String jsonData = "{\"birth\": \"2024-01-15\", \"hired\": \"2024-02-20\"}";
SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] {jsonData}));

Assertions.assertEquals(
LocalDate.of(2024, 1, 15),
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("birth_date")));
Assertions.assertEquals(
LocalDate.of(2024, 2, 20),
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("hire_date")));
}

@Test
public void testMixedTypeColumns() {
// Verify mixed configuration of date columns with string and integer columns
Map<String, Object> configMap = new HashMap<>();
configMap.put(
JsonPathTransformConfig.COLUMNS.key(),
Arrays.asList(
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.name",
JsonPathTransformConfig.DEST_FIELD.key(), "user_name",
JsonPathTransformConfig.DEST_TYPE.key(), "string"),
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.age",
JsonPathTransformConfig.DEST_FIELD.key(), "user_age",
JsonPathTransformConfig.DEST_TYPE.key(), "int"),
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.birth",
JsonPathTransformConfig.DEST_FIELD.key(), "birth_date",
JsonPathTransformConfig.DEST_TYPE.key(), "date")));
ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
CatalogTable table =
CatalogTableUtil.getCatalogTable(
"test",
new SeaTunnelRowType(
new String[] {"data"},
new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
JsonPathTransform transform =
new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);

CatalogTable outputTable = transform.getProducedCatalogTable();
String jsonData = "{\"name\": \"John\", \"age\": 30, \"birth\": \"2024-01-15\"}";
SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] {jsonData}));

Assertions.assertEquals(
"John", outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("user_name")));
Assertions.assertEquals(
30, outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("user_age")));
Assertions.assertEquals(
LocalDate.of(2024, 1, 15),
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("birth_date")));
}

@Test
public void testInvalidDateFormat() {
Map<String, Object> configMap = new HashMap<>();
configMap.put(
JsonPathTransformConfig.COLUMNS.key(),
Arrays.asList(
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.birth",
JsonPathTransformConfig.DEST_FIELD.key(), "birth_date",
JsonPathTransformConfig.DEST_TYPE.key(), "date")));
ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
CatalogTable table =
CatalogTableUtil.getCatalogTable(
"test",
new SeaTunnelRowType(
new String[] {"data"},
new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
JsonPathTransform transform =
new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);

String jsonData = "{\"birth\": \"invalid-date\"}";
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
() -> transform.map(new SeaTunnelRow(new Object[] {jsonData})));
}

@Test
public void testLeapYearDate() {
Map<String, Object> configMap = new HashMap<>();
configMap.put(
JsonPathTransformConfig.COLUMNS.key(),
Arrays.asList(
ImmutableMap.of(
JsonPathTransformConfig.SRC_FIELD.key(), "data",
JsonPathTransformConfig.PATH.key(), "$.birth",
JsonPathTransformConfig.DEST_FIELD.key(), "birth_date",
JsonPathTransformConfig.DEST_TYPE.key(), "date")));
ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
CatalogTable table =
CatalogTableUtil.getCatalogTable(
"test",
new SeaTunnelRowType(
new String[] {"data"},
new SeaTunnelDataType[] {BasicType.STRING_TYPE}));
JsonPathTransform transform =
new JsonPathTransform(JsonPathTransformConfig.of(config, table), table);

CatalogTable outputTable = transform.getProducedCatalogTable();
String jsonData = "{\"birth\": \"2024-02-29\"}";
SeaTunnelRow outputRow = transform.map(new SeaTunnelRow(new Object[] {jsonData}));

Assertions.assertEquals(
LocalDate.of(2024, 2, 29),
outputRow.getField(outputTable.getSeaTunnelRowType().indexOf("birth_date")));
}
}
Loading