Allow null values to be set to null for jsonformat #751

Open · wants to merge 1 commit into base: master
File: S3SinkConnectorConfig.java
@@ -24,7 +24,6 @@
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.SSEAlgorithm;
import io.confluent.connect.storage.common.util.StringUtils;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@@ -67,6 +66,7 @@
import io.confluent.connect.storage.common.GenericRecommender;
import io.confluent.connect.storage.common.ParentValueRecommender;
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.common.util.StringUtils;
import io.confluent.connect.storage.format.Format;
import io.confluent.connect.storage.partitioner.DailyPartitioner;
import io.confluent.connect.storage.partitioner.DefaultPartitioner;
@@ -184,6 +184,13 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
+ " This value is case insensitive and can be either 'BASE64' (default) or 'NUMERIC'";
private static final String DECIMAL_FORMAT_DISPLAY = "Decimal Format";

public static final String REPLACE_NULL_WITH_DEFAULT_CONFIG = "json.replace.null.with.default";
public static final boolean REPLACE_NULL_WITH_DEFAULT_DEFAULT = true;
private static final String REPLACE_NULL_WITH_DEFAULT_DOC = "Whether to replace fields that"
+ " have a default value and that are null to the default value."
+ " When set to true, the default value is used, otherwise null is used.";
private static final String REPLACE_NULL_WITH_DEFAULT_DISPLAY = "Replace null with default";

public static final String STORE_KAFKA_KEYS_CONFIG = "store.kafka.keys";
public static final String STORE_KAFKA_HEADERS_CONFIG = "store.kafka.headers";
public static final String KEYS_FORMAT_CLASS_CONFIG = "keys.format.class";
@@ -300,11 +307,11 @@ public static ConfigDef newConfigDef() {
configDef.define(
S3_BUCKET_CONFIG,
Type.STRING,
Importance.HIGH,
ConfigDef.Importance.HIGH,
"The S3 Bucket.",
group,
++orderInGroup,
Width.LONG,
ConfigDef.Width.LONG,
"S3 Bucket"
);

@@ -537,6 +544,18 @@ public static ConfigDef newConfigDef() {
DECIMAL_FORMAT_DISPLAY
);

configDef.define(
REPLACE_NULL_WITH_DEFAULT_CONFIG,
Type.BOOLEAN,
REPLACE_NULL_WITH_DEFAULT_DEFAULT,
Importance.LOW,
REPLACE_NULL_WITH_DEFAULT_DOC,
group,
++orderInGroup,
Width.SHORT,
REPLACE_NULL_WITH_DEFAULT_DISPLAY
);

configDef.define(
S3_PART_RETRIES_CONFIG,
Type.INT,
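
The new property slots in next to the existing JSON format options. A minimal sketch of how it might appear in a sink connector's properties file (only the JSON-related keys shown; values illustrative, not taken from this PR):

    format.class=io.confluent.connect.s3.format.json.JsonFormat
    json.replace.null.with.default=false
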
File: JsonFormat.java
@@ -16,11 +16,11 @@
package io.confluent.connect.s3.format.json;

import org.apache.kafka.connect.json.JsonConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.confluent.connect.s3.S3SinkConnectorConfig;
import io.confluent.connect.s3.storage.S3Storage;
@@ -46,6 +46,11 @@ public JsonFormat(S3Storage storage) {
"decimal.format",
String.valueOf(storage.conf().getJsonDecimalFormat())
);
converterConfig.put(
"replace.null.with.default",
storage.conf().getBoolean(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG)
);

this.converter.configure(converterConfig, false);
}

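The hunk above forwards the sink option to JsonConverter's own replace.null.with.default setting. A standalone sketch of the behavior being toggled (class name illustrative; assumes a Connect runtime whose JsonConverter supports this option, which the PR presupposes):

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.json.JsonConverter;

    public class ReplaceNullWithDefaultDemo {
        public static void main(String[] args) {
            // An optional boolean field with a schema default of true
            Schema schema = SchemaBuilder.struct()
                .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
                .build();
            Struct value = new Struct(schema).put("withDefault", null);

            JsonConverter converter = new JsonConverter();
            Map<String, Object> config = new HashMap<>();
            config.put("schemas.enable", false);             // plain JSON payload, no envelope
            config.put("replace.null.with.default", false);  // the option this PR forwards from the sink config
            converter.configure(config, false);

            // Prints {"withDefault":null}; with the default setting (true) it would print {"withDefault":true}
            byte[] json = converter.fromConnectData("topic", schema, value);
            System.out.println(new String(json, StandardCharsets.UTF_8));
        }
    }
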
File: S3SinkConnectorConfigTest.java
@@ -17,16 +17,12 @@

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;

import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
import io.confluent.connect.s3.format.parquet.ParquetFormat;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

@@ -38,9 +34,12 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import io.confluent.connect.avro.AvroDataConfig;
import io.confluent.connect.s3.auth.AwsAssumeRoleCredentialsProvider;
import io.confluent.connect.s3.format.avro.AvroFormat;
import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
import io.confluent.connect.s3.format.json.JsonFormat;
import io.confluent.connect.s3.format.parquet.ParquetFormat;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.partitioner.DailyPartitioner;
@@ -50,20 +49,16 @@
import io.confluent.connect.storage.partitioner.Partitioner;
import io.confluent.connect.storage.partitioner.PartitionerConfig;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.avro.AvroDataConfig;

import static io.confluent.connect.s3.S3SinkConnectorConfig.AffixType;
import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_DEFAULT;
import static io.confluent.connect.s3.S3SinkConnectorConfig.HEADERS_FORMAT_CLASS_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.SCHEMA_PARTITION_AFFIX_TYPE_CONFIG;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

public class S3SinkConnectorConfigTest extends S3SinkConnectorTestBase {

@@ -458,6 +453,16 @@ public void testJsonDecimalFormat() {
assertEquals(DecimalFormat.NUMERIC.name(), connectorConfig.getJsonDecimalFormat());
}

@Test
public void testJsonReplaceNullWithDefault() {
connectorConfig = new S3SinkConnectorConfig(properties);
assertTrue(connectorConfig.getBoolean("json.replace.null.with.default"));

properties.put(S3SinkConnectorConfig.REPLACE_NULL_WITH_DEFAULT_CONFIG, "false");
connectorConfig = new S3SinkConnectorConfig(properties);
assertFalse(connectorConfig.getBoolean("json.replace.null.with.default"));
}

@Test
public void testValidCompressionLevels() {
IntStream.range(-1, 9).boxed().forEach(i -> {
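
Outside the test suite, the declared default can also be read straight off the ConfigDef that the earlier hunks modify; a brief sketch (class name hypothetical):

    import java.util.Map;

    import org.apache.kafka.common.config.ConfigDef;

    import io.confluent.connect.s3.S3SinkConnectorConfig;

    public class DefaultValueCheck {
        public static void main(String[] args) {
            // newConfigDef() is the method the hunks above modify
            ConfigDef def = S3SinkConnectorConfig.newConfigDef();
            Map<String, Object> defaults = def.defaultValues();
            System.out.println(defaults.get("json.replace.null.with.default")); // true
        }
    }
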
File: integration test base (package io.confluent.connect.s3.integration)
@@ -15,10 +15,6 @@

package io.confluent.connect.s3.integration;

import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
@@ -29,27 +25,8 @@
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.IntegrationTest;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.RestApp;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import io.confluent.connect.s3.util.S3Utils;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
@@ -79,6 +56,30 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.connect.s3.util.S3Utils;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.RestApp;

import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_ACCESS_KEY_ID_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.AWS_SECRET_ACCESS_KEY_CONFIG;
import static io.confluent.kafka.schemaregistry.ClusterTestHarness.KAFKASTORE_TOPIC;
import static org.assertj.core.api.Assertions.assertThat;

@Category(IntegrationTest.class)
@@ -332,15 +333,15 @@ protected Schema getSampleStructSchema() {
.field("myFloat32", Schema.FLOAT32_SCHEMA)
.field("myFloat64", Schema.FLOAT64_SCHEMA)
.field("myString", Schema.STRING_SCHEMA)
.field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
.build();
}

protected Struct getSampleStructVal(Schema structSchema) {
Date sampleDate = new Date(1111111);
sampleDate.setTime(0);
return new Struct(structSchema)
.put("ID", (long) 1)
.put("myBool", true)
.put("withDefault", null)
.put("myInt32", 32)
.put("myFloat32", 3.2f)
.put("myFloat64", 64.64)
@@ -409,12 +410,15 @@ protected static void clearBucket(String bucketName) {
* @param bucketName the name of the s3 test bucket
* @param expectedRowsPerFile the number of rows a file should have
* @param expectedRow the expected row data in each file
* @param useDefaultValues whether rows are expected to contain schema default values in place of nulls
*
* @return whether every row of the files read equals the expected row
*/
protected boolean fileContentsAsExpected(
String bucketName,
int expectedRowsPerFile,
Struct expectedRow
Struct expectedRow,
boolean useDefaultValues
) {
log.info("expectedRow: {}", expectedRow);
for (String fileName :
@@ -427,7 +431,7 @@ protected boolean fileContentsAsExpected(
String fileExtension = getExtensionFromKey(fileName);
List<JsonNode> downloadedFileContents = contentGetters.get(fileExtension)
.apply(destinationPath);
if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow)) {
if (!fileContentsMatchExpected(downloadedFileContents, expectedRowsPerFile, expectedRow, useDefaultValues)) {
return false;
}
downloadedFile.delete();
@@ -481,20 +485,23 @@ protected boolean keyfileContentsAsExpected(
* @param fileContents the file contents as a list of JsonNodes
* @param expectedRowsPerFile the number of rows expected in the file
* @param expectedRow the expected values of each row
* @param useDefaultValues whether rows are expected to contain schema default values in place of nulls
*
* @return whether the file contents match the expected row
*/
protected boolean fileContentsMatchExpected(
List<JsonNode> fileContents,
int expectedRowsPerFile,
Struct expectedRow
Struct expectedRow,
boolean useDefaultValues
) {
if (fileContents.size() != expectedRowsPerFile) {
log.error("Number of rows in file do not match the expected count, actual: {}, expected: {}",
fileContents.size(), expectedRowsPerFile);
return false;
}
for (JsonNode row : fileContents) {
if (!fileRowMatchesExpectedRow(row, expectedRow)) {
if (!fileRowMatchesExpectedRow(row, expectedRow, useDefaultValues)) {
return false;
}
}
@@ -512,18 +519,34 @@ private List<String> getS3KeyFileList(List<S3ObjectSummary> summaries) {
/**
* Compare the row in the file and its values to the expected row's values.
*
* @param fileRow the row read from the file as a JsonNode
* @param expectedRow the expected contents of the row
* @param fileRow the row read from the file as a JsonNode
* @param expectedRow the expected contents of the row
* @param useDefaultValues whether null fields are expected to be replaced by their schema defaults
*
* @return whether the file row matches the expected row
*/
private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow) {
log.debug("Comparing rows: file: {}, expected: {}", fileRow, expectedRow);
private boolean fileRowMatchesExpectedRow(JsonNode fileRow, Struct expectedRow, boolean useDefaultValues) {
log.info("Comparing rows: file: {}, expected: {}", fileRow, expectedRow);
// compare the field values
for (Field key : expectedRow.schema().fields()) {
String expectedValue = expectedRow.get(key).toString();
String rowValue = fileRow.get(key.name()).toString().replaceAll("^\"|\"$", "");
log.debug("Comparing values: {}, {}", expectedValue, rowValue);
if (!rowValue.equals(expectedValue)) {
String expectedValue = null;
if (useDefaultValues) {
expectedValue = expectedRow.get(key).toString();
} else {
Object withoutDefault = expectedRow.getWithoutDefault(key.name());
if (withoutDefault != null) {
expectedValue = withoutDefault.toString();
}
}

JsonNode jsonValue = fileRow.get(key.name());
String rowValue = null;
if (!(jsonValue instanceof NullNode)) {
rowValue = jsonValue.toString().replaceAll("^\"|\"$", "");
}

log.info("Comparing values: {}, {}, {}, {}", key.name(), expectedValue, rowValue, Objects.equals(rowValue, expectedValue));
if (!Objects.equals(rowValue, expectedValue)) {
return false;
}
}
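
The rewritten comparison hinges on the difference between Struct.get, which substitutes the schema default for a null field, and Struct.getWithoutDefault, which returns the raw value. A minimal sketch of that difference (class name illustrative):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class GetWithoutDefaultDemo {
        public static void main(String[] args) {
            Schema schema = SchemaBuilder.struct()
                .field("withDefault", SchemaBuilder.bool().optional().defaultValue(true).build())
                .build();
            Struct row = new Struct(schema).put("withDefault", null);

            System.out.println(row.get("withDefault"));               // true: get() falls back to the schema default
            System.out.println(row.getWithoutDefault("withDefault")); // null: raw value, what the test compares against
        }
    }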