@sarutak sarutak commented Nov 28, 2025

What changes were proposed in this pull request?

This PR proposes to upgrade KCL to 2.7.1, based on @junyuc25's PR with some updates.
By upgrading KCL, we can remove AWS SDK for Java 1.x dependency.

Why are the changes needed?

Does this PR introduce any user-facing change?

The behavior is expected to remain unchanged.

How was this patch tested?

Confirmed that all Kinesis tests pass with the following commands.

  • SBT
$ ENABLE_KINESIS_TESTS=1 nohup ./build/sbt -Pkinesis-asl 'streaming-kinesis-asl/test'
  • Maven
$ ENABLE_KINESIS_TESTS=1 build/mvn -Pkinesis-asl -Dtest=org.apache.spark.streaming.kinesis.JavaKinesisInputDStreamBuilderSuite -DwildcardSuites=org.apache.spark.streaming.kinesis test

Also confirmed that the existing examples work.

# Need to do `build/sbt -Pkinesis-asl package` beforehand

# Producer
$ bin/run-example streaming.KinesisWordProducerASL kinesis-example-stream https://kinesis.us-west-2.amazonaws.com 10 5

# Consumer
$ bin/run-example streaming.KinesisWordCountASL my-stream-app kinesis-example-stream https://kinesis.us-west-2.amazonaws.com

Was this patch authored or co-authored using generative AI tooling?

No.

<!-- Should be consistent with SparkBuild.scala and docs -->
<avro.version>1.12.1</avro.version>
- <aws.kinesis.client.version>1.15.3</aws.kinesis.client.version>
+ <aws.kinesis.client.version>2.7.1</aws.kinesis.client.version>
Member Author


2.7.1 is not the latest version, but it works with AWS SDK for Java 2.29.52, which Spark currently depends on.
2.7.2 depends on SDK 2.33.0, and I noticed that it doesn't work with 2.29.52.

val testData3 = 21 to 30

- eventually(timeout(1.minute), interval(10.seconds)) {
+ eventually(timeout(2.minute), interval(10.seconds)) {
Member Author

@sarutak sarutak Nov 28, 2025


* @param shardEndedInput provides access to a checkpointer method for completing processing of
* the shard.
*/
override def shutdown(
Member Author


shutdown is split into three methods, one for each shutdown reason.

  • TERMINATE => shardEnded
  • REQUESTED => shutdownRequested
  • ZOMBIE => leaseLost

The current code does not handle REQUESTED, but a checkpoint is possible in that case, so the checkpoint is now done in shutdownRequested.
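
The mapping above can be sketched in plain Scala without depending on KCL. This is only an illustrative model, not the code in this PR: `ShutdownMapping`, `ShutdownReason`, `Handler`, and `handlerFor` are hypothetical names. The three reason-to-method pairs come from the list above. That no checkpoint is possible after a lost lease is an assumption about KCL 2.x rather than something stated here.

```scala
object ShutdownMapping {
  // Hypothetical stand-in for the shutdown reasons passed to the old
  // single shutdown() callback; not the real KCL type.
  sealed trait ShutdownReason
  case object Terminate extends ShutdownReason // shard has been fully consumed
  case object Requested extends ShutdownReason // graceful shutdown was requested
  case object Zombie extends ShutdownReason    // lease was taken by another worker

  // Name of the callback that replaces shutdown() for a given reason,
  // and whether a checkpoint can still be taken inside it.
  final case class Handler(method: String, canCheckpoint: Boolean)

  def handlerFor(reason: ShutdownReason): Handler = reason match {
    case Terminate => Handler("shardEnded", canCheckpoint = true)
    case Requested => Handler("shutdownRequested", canCheckpoint = true)
    // Assumption: once the lease is lost, checkpointing is no longer possible.
    case Zombie    => Handler("leaseLost", canCheckpoint = false)
  }
}
```

Because `ShutdownReason` is sealed, the compiler warns if a reason is left unhandled, which is the kind of gap the old code had for REQUESTED.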

@dongjoon-hyun
Member

Thank you so much, @sarutak.
