
SerializationException: Error registering Avro schema #500

@neeru-bhaskar

Description

Hi, I am trying to create an S3 source connector with the configuration below:

curl \
  -i -X PUT \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:8080/connectors/s3_source_connector/config \
  -d '{
        "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "name": "s3_source_connector",
        "topic": "topicname",
        "tasks.max": 1,

        "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3FileSystemListing",
        "fs.listing.interval.ms": 10000,
        "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
        "file.filter.regex.pattern": "test\\.csv",
        
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",

        "aws.access.key.id":"key",
        "aws.secret.access.key":"secret",
        "aws.s3.bucket.name":"bucket",
        "aws.s3.region":"us-east-1",
        "aws.secret.session.token": "session_token",
        "aws.s3.bucket.prefix": "bucket",
        "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.AmazonS3RowFileInputReader",

        "skip.headers": "1",
        "offset.attributes.string": "uri",

        "tasks.file.status.storage.bootstrap.servers":"bootstrapserver",
        "tasks.file.status.storage.topic":"storage-topic",
        "tasks.file.status.storage.topic.partitions":10,
        "tasks.file.status.storage.topic.replication.factor":1,
        "tasks.file.status.storage.topic.creation.enable": false,

        "filters":"CSVFilter",
        "filters.CSVFilter.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
        "filters.CSVFilter.extractColumnName": "headers",
        "filters.CSVFilter.trimColumn": "true",
        "filters.CSVFilter.separator": ",",

        "tasks.file.status.storage.producer.security.protocol": "SSL",
        "tasks.file.status.storage.producer.ssl.endpoint.identification.algorithm": "",
        "tasks.file.status.storage.producer.sasl.mechanism": "GSSAPI",
        "tasks.file.status.storage.producer.ssl.key.password": "",
        "tasks.file.status.storage.producer.ssl.keystore.location": "keystore.jks",
        "tasks.file.status.storage.producer.ssl.keystore.password": "password",
        "tasks.file.status.storage.producer.ssl.truststore.location": "truststore.jks",
        "tasks.file.status.storage.producer.ssl.truststore.password": "password",
        "tasks.file.status.storage.consumer.security.protocol": "SSL",
        "tasks.file.status.storage.consumer.ssl.endpoint.identification.algorithm": "",
        "tasks.file.status.storage.consumer.sasl.mechanism": "GSSAPI",
        "tasks.file.status.storage.consumer.ssl.key.password": "",
        "tasks.file.status.storage.consumer.ssl.keystore.location": "keystore.jks",
        "tasks.file.status.storage.consumer.ssl.keystore.password": "password",
        "tasks.file.status.storage.consumer.ssl.truststore.location": "truststore.jks",
        "tasks.file.status.storage.consumer.ssl.truststore.password": "password",

        "errors.log.include.messages": "true",
        "errors.log.enable": "true"

    }'
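
For reference, the connector itself is created and its task does start (the log below shows it reading the file); the task state can also be checked with the standard Connect REST API:

# Standard Kafka Connect REST endpoint; same worker host/port as above.
curl -s http://localhost:8080/connectors/s3_source_connector/status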

I am getting the following exception:

[2023-07-13 22:49:25,231] ERROR Error encountered in task jdbc_sink_connector_s3_src_connect-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where source record is = SourceRecord{sourcePartition={uri=s3://bucket-aws-useast1-apps-dev-1-dev/bucket/test.csv}, sourceOffset={position=132, rows=2, timestamp=1689288565168}} ConnectRecord{topic='topicname', kafkaPartition=null, key=null, keySchema=null, value=Struct{empName=John}, valueSchema=Schema{STRUCT}, timestamp=1689288565168, headers=ConnectHeaders(headers=[ConnectHeader(key=connect.file.name, value=bucket/test.csv, schema=Schema{STRING}), ConnectHeader(key=connect.file.uri, value=s3://bucket-aws-useast1-apps-dev-1-dev/bucket/test.csv, schema=Schema{STRING}), ConnectHeader(key=connect.file.contentLength, value=132, schema=Schema{INT64}), ConnectHeader(key=connect.file.lastModified, value=1689275469000, schema=Schema{INT64}), ConnectHeader(key=connect.file.s3.object.summary.key, value=bucket/test.csv, schema=Schema{STRING}), ConnectHeader(key=connect.file.s3.object.summary.etag, value=df96017ba0e96ddacd2de1736ade34eb, schema=Schema{STRING}), ConnectHeader(key=connect.file.s3.object.summary.bucketName, value=bucket, schema=Schema{STRING}), ConnectHeader(key=connect.task.hostname, value=kafka-connect-s3sourceconn-79cdd7466f-fnjzw, schema=Schema{STRING})])}. (org.apache.kafka.connect.runtime.errors.LogReporter)
org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic topicname :
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:93)
	at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:329)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:329)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:355)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"empName","type":["null","string"],"default":null}]}
	at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:259)
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:156)
	at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:153)
	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:86)
	... 15 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:297)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:367)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:544)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:532)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:490)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:257)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:366)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:337)
	at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:115)
	... 17 more
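
The "Unexpected character ('<' (code 60))" in the innermost cause suggests the Schema Registry client received an HTML page rather than JSON, i.e. the serializer reached something that is not (or cannot serve) the registry REST API — a wrong URL, port, or HTTP-vs-HTTPS mismatch — rather than the schema itself being rejected. As a quick sanity check (the URL and port here are placeholders for my registry), the /subjects endpoint should return a JSON array:

# /subjects is a standard Schema Registry endpoint; -k only for a quick TLS check.
# A JSON array (even an empty []) means the registry answered; HTML means a wrong URL/port/scheme.
curl -sk https://schema-registry-url:8081/subjects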

I tried adding the key and value converters below, with the Schema Registry URL, keystore, and truststore, to the connector config, but I am still getting the same error:

        "key.converter.schemas.enable": "false",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",

        "value.converter.schema.registry.ssl.keystore.password": "password",
        "value.converter.schema.registry.ssl.truststore.password": "password",
        "value.converter.schema.registry.ssl.truststore.location": "truststore.jks",
        "value.converter.schema.registry.url": "sehema-registry-url",
        "value.converter.schema.registry.ssl.key.password": "",
        "value.converter.schema.registry.ssl.keystore.location": "keystore.jks",
        "value.converter.schemas.enable": "true",
        "value.converter": "io.confluent.connect.avro.AvroConverter",

Any suggestions on how to fix this?

Labels: wontfix (This will not be worked on)