-
Notifications
You must be signed in to change notification settings - Fork 2.9k
NIFI-15286 Add connection verification to Kafka Connection Services #10689
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Co-authored-by: Dariusz Seweryn <[email protected]>
pvillard31
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| } | ||
|
|
||
| private List<InetSocketAddress> getConnectedAddresses(final ComponentLog verificationLogger, final List<ConfigVerificationResult> results, final List<InetSocketAddress> bootstrapAddresses) { | ||
| final List<InetSocketAddress> connectedAddresses = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use a thread-safe list here? Given that we have virtual threads that execute in parallel, we could have a ConcurrentModificationException, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, yes, I will wrap this in a synchronized List.
| new ConfigVerificationResult.Builder() | ||
| .verificationStepName(ADDRESSES_STEP) | ||
| .outcome(SUCCESSFUL) | ||
| .explanation("Addresses Validated [%d]".formatted(validatedAddresses.size())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| .explanation("Addresses Validated [%d]".formatted(validatedAddresses.size())) | |
| .explanation("Addresses validated [%d]".formatted(validatedAddresses.size())) |
To be consistent with, for example, "Topics found"
|
Thanks for the review @pvillard31, I pushed an update to synchronize the List and adjust the message capitalization. |
pvillard31
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, +1, will merge once checks are green (but likely tomorrow now - you can self merge before if you want)


Summary
NIFI-15286 Adds manual connection verification to Kafka Connection Services.
The implementation builds on the initial proposal in PR #10596 and provides the following approach:
KafkaConnectionVerifierclass with one reusable instance of the class for each instance of the Kafka Connection ServiceTopic Listingto the final verification stepThe implementation includes updated integration tests against Kafka running in a container to evaluate common failure and success paths.
Successful verification includes the following steps:
Broker Addressesdisplaying number of addresses validatedNode Connectiondisplaying host and port for each Node with socket connectionCluster Descriptiondisplaying number of Nodes foundTopic Listingdisplaying the number of Topics foundTracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000Pull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation