Description
Provide details of the setup you're running
I am running Kafka Connect File Pulse version 2.14.1 on a Linux-based operating system.
Outline your question
I am using the following configuration to deploy the connector:
{
"goyaltu-file-pulse-source-connector-2": {
"connector.name": "filepulse-source-connector",
"transforms.AlignSchemaWithRegistry.schema.registry.urls": "<>",
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"tasks.max": "1",
"tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
"topic": "raft.public.goyaltu.example_app.filepulse2",
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
"fs.listing.directory.path": "/codemill/goyaltu/example_streaming_webapp/csvfiles",
"fs.listing.interval.ms": "10000",
"file.filter.regex.pattern": ".*\\.csv",
"offset.strategy": "name + size + lastmodified",
"file.input.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
"filters": "ParseCSVLine",
"filters.ParseCSVLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
"filters.ParseCSVLine.extract.column.name": "headers",
"filters.ParseCSVLine.trim.column": "true",
"filters.ParseCSVLine.separator": ";",
"tasks.file.status.storage.bootstrap.servers": "<>",
"tasks.file.status.storage.topic": "raft.public.goyaltu.connect-file-pulse-status-2",
"tasks.file.status.storage.producer.security.protocol": "SASL_PLAINTEXT",
"tasks.file.status.storage.producer.sasl.mechanism": "GSSAPI",
"tasks.file.status.storage.producer.request.timeout.ms": "20000",
"tasks.file.status.storage.consumer.security.protocol": "SASL_PLAINTEXT",
"tasks.file.status.storage.consumer.sasl.mechanism": "GSSAPI",
"tasks.file.status.storage.consumer.request.timeout.ms": "20000"
}
}
It is not clear from the documentation (https://streamthoughts.github.io/kafka-connect-file-pulse/docs/developer-guide/) whether the internal topic and output topic need to be pre-created or if the connector will create them automatically.
Additionally, I frequently receive this WARN log, even though my connector is in a running state without creating the internal topic:
[2024-07-23T03:11:23,582 [connector-thread-goyaltu-file-pulse-source-connector-2] io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore.createTopic():78 WARN ]: Failed to create topic '(name=raft.public.goyaltu.connect-file-pulse-status-2, numPartitions=default, replicationFactor=default, replicasAssignments=null, configs={cleanup.policy=compact})'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[?:?]
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073) ~[?:?]
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165) ~[kafka-clients-3.1.2.jar:?]
at io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore.createTopic(KafkaFileObjectStateBackingStore.java:72) ~[kafka-connect-filepulse-plugin-2.14.1.jar:?]
at io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore.configure(KafkaFileObjectStateBackingStore.java:62) ~[kafka-connect-filepulse-plugin-2.14.1.jar:?]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:405) ~[kafka-clients-3.1.2.jar:?]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434) ~[kafka-clients-3.1.2.jar:?]
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419) ~[kafka-clients-3.1.2.jar:?]
at io.streamthoughts.kafka.connect.filepulse.config.CommonSourceConfig.getStateBackingStore(CommonSourceConfig.java:296) ~[kafka-connect-filepulse-plugin-2.14.1.jar:?]
at io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess.lambda$initSharedStateBackingStore$0(StateBackingStoreAccess.java:46) ~[kafka-connect-filepulse-plugin-2.14.1.jar:?]
at io.streamthoughts.kafka.connect.filepulse.state.FileObjectStateBackingStoreManager.getOrCreateSharedStore(FileObjectStateBackingStoreManager.java:58) [kafka-connect-filepulse-plugin-2.14.1.jar:?]
at io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess.initSharedStateBackingStore(StateBackingStoreAccess.java:43) [kafka-connect-filepulse-plugin-2.14.1.jar:?]
at io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess.<init>(StateBackingStoreAccess.java:33) [kafka-connect-filepulse-plugin-2.14.1.jar:?]
at io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector.start(FilePulseSourceConnector.java:97) [kafka-connect-filepulse-plugin-2.14.1.jar:2.14.1]
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:184) [connect-runtime-3.1.2.jar:?]
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:209) [connect-runtime-3.1.2.jar:?]
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:348) [connect-runtime-3.1.2.jar:?]
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:331) [connect-runtime-3.1.2.jar:?]
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:140) [connect-runtime-3.1.2.jar:?]
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:117) [connect-runtime-3.1.2.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) [?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics
Could you please clarify if the internal topic and output topic need to be pre-created, or if the connector should automatically create them? Additionally, any guidance on resolving the timeout issue would be greatly appreciated.
Thank you!