Description
Hi team,
I am using the configuration below with the File Pulse connector to read a JSON file every 5 minutes. Sometimes it works, but most of the time it fails with the error below. I have tried many approaches with no luck. Kindly help me read this JSON file.
{
"name": "FilePulseConnector-Node1-Metric-last-data",
"config": {
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"tasks.max": "1",
"fs.listing.directory.path": "/var/lib/zookeeper/worker-node-1",
"fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
"file.filter.regex.pattern": "last_data.json",
"fs.scan.interval.ms": "300000",
"topic": "DT-Node1-Metric-last-data",
"read.max.wait.ms": "100000",
"read.max.bytes": "20485760",
"reader.buffer.bytes": "20485760",
"reader.json.with.schema": true,
"reader.json.fail.on.missing.field": false,
"reader.json.ignore.null": false,
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
"tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalBytesArrayInputReader",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
"offset.strategy": "attributes",
"offset.attributes.string": "name+lastModified",
"file.offset.strategy": "path+modification-date",
"always.reload": "false",
"max.uncommitted.offsets": "1",
"file.offset.hash.algorithm": "MD5",
"filters": "ParseJSON",
"filters.ParseJSON.type": "io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
"internal.kafka.reporter.bootstrap.servers": "10.187.XX.XX:9091",
"internal.kafka.reporter.topic": "filepulse-Node1-Metric-last-data",
"tasks.file.status.storage.bootstrap.servers": "10.187.XX.X:9091",
"tasks.file.status.storage.topic": "filepulse-Node1-last-data-status",
"tasks.file.status.storage.partitions": 1,
"tasks.file.status.storage.replication.factor": 1,
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"errors.retry.timeout": "300000",
"errors.retry.delay.max.ms": "10000",
"consumer.override.auto.offset.reset": "latest",
"producer.override.max.request.size": "25971520",
"producer.override.buffer.memory": "33554432",
"producer.override.batch.size": "16384"
}
}
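For reference, a minimal standalone check that can confirm whether the file parses as valid JSON at the moment the connector fails. This is a sketch, not part of File Pulse: it assumes jackson-databind 2.x on the classpath, the class name is hypothetical, and the path is taken from fs.listing.directory.path above.

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;

// Hypothetical helper for illustration: parse last_data.json the same way
// any Jackson-based consumer would; a truncated or half-written file fails
// here with a parse exception instead of succeeding silently.
public class ValidateLastData {
    public static void main(String[] args) {
        File file = new File("/var/lib/zookeeper/worker-node-1/last_data.json");
        try {
            new ObjectMapper().readTree(file);
            System.out.println("Valid JSON, size=" + file.length() + " bytes");
        } catch (Exception e) {
            System.out.println("Invalid or partial JSON: " + e.getMessage());
        }
    }
}

Running this in a loop around the connector's scan interval would show whether the file is ever unreadable at scan time.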
Error
[2025-04-02 09:25:04,550] ERROR [FilePulseConnector-Node1-Metric-last-data|task-0] Caught unexpected error while processing file. Ignore and continue (io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask:226)
io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException: Failed to convert data into Kafka Connect record at offset [position=4312993, rows=-1, timestamp=1743557104139] from object-file: [uri=file:/var/lib/zookeeper/worker-node-1/last_data.json, name='last_data.json', contentLength=4312993, lastModified=1743557101260, contentDigest=[digest=470470922, algorithm='CRC32'], userDefinedMetadata={system.inode=132, system.hostname=BDK-DI-APP01}]'
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.buildSourceRecord(FilePulseSourceTask.java:331)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.lambda$poll$0(FilePulseSourceTask.java:211)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:1003)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1845)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask.poll(FilePulseSourceTask.java:212)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:465)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:353)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:78)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.NullPointerException
The behavior seems inconsistent. I even tried LocalRowFileInputReader, but no luck; I still get the same issue.
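One thing I still want to rule out (my assumption, not confirmed by the logs) is that the connector sometimes scans last_data.json while the producing process is rewriting it, so the reader sees truncated JSON; that would match the intermittent failure during record conversion. If so, a sketch of writing the file atomically on the producing side, so a reader only ever observes a complete file (class name and payload are placeholders for illustration):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Write the payload to a temp file in the same directory, then rename.
// On Linux, a rename within one filesystem is atomic, so the connector
// sees either the old complete file or the new complete file, never a
// partially written one.
public class AtomicJsonWriter {
    public static void main(String[] args) throws Exception {
        Path dir = Path.of("/var/lib/zookeeper/worker-node-1");
        Path tmp = dir.resolve("last_data.json.tmp");
        Path target = dir.resolve("last_data.json");
        Files.writeString(tmp, "{\"example\":true}"); // placeholder payload
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }
}

Keeping the temp file in the same directory matters: the rename then stays on one filesystem, which is what makes the move atomic.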