Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions connector/kinesis-asl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,60 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>${aws.kinesis.client.version}</version>
<exclusions>
<!--
mbknor-jackson-jsonschema is necessary at runtime only if JSON format schema is
registered using GlueSchemaRegistry. kinesis-asl currently doesn't use this feature
so it should be safe to exclude the dependency.
-->
<exclusion>
<groupId>com.kjetland</groupId>
<artifactId>mbknor-jackson-jsonschema_2.12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>regions</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>cloudwatch</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws.java.sdk.version}</version>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sdk-core</artifactId>
<version>${aws.java.sdk.v2.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.kinesis</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.spark.examples.streaming;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -38,8 +39,10 @@
import scala.Tuple2;
import scala.reflect.ClassTag$;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;

/**
* Consumes messages from a Amazon Kinesis streams and does wordcount.
Expand All @@ -66,7 +69,7 @@
* There is a companion helper class called KinesisWordProducerASL which puts dummy data
* onto the Kinesis stream.
*
* This code uses the DefaultAWSCredentialsProviderChain to find credentials
* This code uses the DefaultCredentialsProvider to find credentials
* in the following order:
* Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
* Java System Properties - aws.accessKeyId and aws.secretKey
Expand Down Expand Up @@ -106,11 +109,19 @@ public static void main(String[] args) throws Exception {
String endpointUrl = args[2];

// Create a Kinesis client in order to determine the number of shards for the given stream
AmazonKinesisClient kinesisClient =
new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain());
kinesisClient.setEndpoint(endpointUrl);
KinesisClient kinesisClient =
KinesisClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.endpointOverride(URI.create(endpointUrl))
.httpClientBuilder(ApacheHttpClient.builder())
.build();

DescribeStreamRequest describeStreamRequest =
DescribeStreamRequest.builder()
.streamName(streamName)
.build();
int numShards =
kinesisClient.describeStream(streamName).getStreamDescription().getShards().size();
kinesisClient.describeStream(describeStreamRequest).streamDescription().shards().size();


// In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.streaming.kinesis;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStream;

import java.io.Serializable;
import java.util.Date;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ package org.apache.spark.examples.streaming

import scala.jdk.CollectionConverters._

import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesis
import software.amazon.awssdk.regions.servicemetadata.KinesisServiceMetadata

private[streaming] object KinesisExampleUtils {
def getRegionNameByEndpoint(endpoint: String): String = {
val uri = new java.net.URI(endpoint)
RegionUtils.getRegionsForService(AmazonKinesis.ENDPOINT_PREFIX)
val kinesisServiceMetadata = new KinesisServiceMetadata()
kinesisServiceMetadata.regions
.asScala
.find(_.getAvailableEndpoints.asScala.toSeq.contains(uri.getHost))
.map(_.getName)
.find(r => kinesisServiceMetadata.endpointFor(r).toString.equals(uri.getHost))
.map(_.id)
.getOrElse(
throw new IllegalArgumentException(s"Could not resolve region for endpoint: $endpoint"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
// scalastyle:off println
package org.apache.spark.examples.streaming

import java.net.URI
import java.nio.ByteBuffer

import scala.util.Random

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.{DescribeStreamRequest, PutRecordRequest}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -101,13 +104,22 @@ object KinesisWordCountASL extends Logging {

// Determine the number of shards from the stream using the low-level Kinesis Client
// from the AWS Java SDK.
val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
require(credentials != null,
val credentialsProvider = DefaultCredentialsProvider.create
require(credentialsProvider.resolveCredentials() != null,
"No AWS credentials found. Please specify credentials using one of the methods specified " +
"in http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/credentials.html")
val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
"in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html")
val kinesisClient = KinesisClient.builder()
.credentialsProvider(credentialsProvider)
.endpointOverride(URI.create(endpointUrl))
.httpClientBuilder(ApacheHttpClient.builder())
.build()
val describeStreamRequest = DescribeStreamRequest.builder()
.streamName(streamName)
.build()
val numShards = kinesisClient.describeStream(describeStreamRequest)
.streamDescription
.shards
.size


// In this example, we're going to create 1 Kinesis Receiver/input DStream for each shard.
Expand Down Expand Up @@ -221,8 +233,11 @@ object KinesisWordProducerASL {
val totals = scala.collection.mutable.Map[String, Int]()

// Create the low-level Kinesis Client from the AWS Java SDK.
val kinesisClient = new AmazonKinesisClient(new DefaultAWSCredentialsProviderChain())
kinesisClient.setEndpoint(endpoint)
val kinesisClient = KinesisClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create())
.endpointOverride(URI.create(endpoint))
.httpClientBuilder(ApacheHttpClient.builder())
.build()

println(s"Putting records onto stream $stream and endpoint $endpoint at a rate of" +
s" $recordsPerSecond records per second and $wordsPerRecord words per record")
Expand All @@ -247,12 +262,14 @@ object KinesisWordProducerASL {
val partitionKey = s"partitionKey-$recordNum"

// Create a PutRecordRequest with an Array[Byte] version of the data
val putRecordRequest = new PutRecordRequest().withStreamName(stream)
.withPartitionKey(partitionKey)
.withData(ByteBuffer.wrap(data.getBytes()))
val putRecordRequest = PutRecordRequest.builder()
.streamName(stream)
.partitionKey(partitionKey)
.data(SdkBytes.fromByteBuffer(ByteBuffer.wrap(data.getBytes())))
.build()

// Put the record onto the stream and capture the PutRecordResult
val putRecordResult = kinesisClient.putRecord(putRecordRequest)
kinesisClient.putRecord(putRecordRequest)
}

// Sleep for a second
Expand Down
Loading