-
Notifications
You must be signed in to change notification settings - Fork 106
FLOW-7395 React to channel error count #1232
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FLOW-7395 React to channel error count #1232
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
64883d8 to
60b9d91
Compare
3526c08 to
dfa462d
Compare
60b9d91 to
e2d9666
Compare
dfa462d to
614b2cf
Compare
e2d9666 to
3d66650
Compare
614b2cf to
9a1578a
Compare
9a1578a to
d7ad3f9
Compare
9313fa9 to
ef65dc7
Compare
6ee0b38 to
a53b634
Compare
ef65dc7 to
be5834f
Compare
a53b634 to
7e1b773
Compare
be5834f to
55ce579
Compare
7e1b773 to
1b44bbd
Compare
55ce579 to
5a911f8
Compare
1b44bbd to
d99a1fc
Compare
5a911f8 to
01bc9c4
Compare
d99a1fc to
6e3b256
Compare
01bc9c4 to
128048b
Compare
6e3b256 to
3a25fc6
Compare
| // Check for channel errors detected in preCommit and fail the task | ||
| if (this.channelErrorToFailOn != null) { | ||
| SnowflakeKafkaConnectorException error = this.channelErrorToFailOn; | ||
| this.channelErrorToFailOn = null; // Clear so we don't throw again on restart |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is redundant. After exception is thrown in the next line a task will essentially die and be removed from Kafka Connect state. After restart a new instance is created that will have null set on construction.
| channel.setChannelStatus(statusWithErrors); | ||
| } | ||
|
|
||
| // Produce more messages to trigger preCommit which checks channel status |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no need to insert any records to trigger precommit. It's time-based operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
| Map<String, String> workerConfig = new HashMap<>(); | ||
| workerConfig.put("plugin.discovery", "hybrid_warn"); | ||
| // Set a short offset flush interval for faster preCommit calls | ||
| workerConfig.put("offset.flush.interval.ms", "5000"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reducing it from 5000 to 1000 speeds up test execution by 8 seconds. Imo it's worth it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
sfc-gh-mbobowski
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left two comments about redundant code, but generally it's ok.
3a7e83f to
fcb18d2
Compare
3a25fc6 to
5cb9de0
Compare
fcb18d2 to
f64ffed
Compare
5cb9de0 to
b328611
Compare
f64ffed to
c88f053
Compare
20996cd to
9ed4eca
Compare
9ed4eca to
b9aecc6
Compare

Overview
FLOW-7395 React to channel error count
Pre-review checklist
snowflake.ingestion.method.Yes- Added end to end and Unit Tests.No- Suggest why it is not param protected