Use MSK CreateTopic API, upgrade Kafka to 3.8.x, and replace pykafka with confluent-kafka #4

Open
revsystem wants to merge 14 commits into aws-samples:main from revsystem:contrib/msk-create-topic-api

@revsystem

Issue #, if available: #3

Description of changes:

Summary

This PR simplifies the setup process and replaces deprecated or unmaintained dependencies across the sample project.

Changes

CloudFormation template (templates/bedrock-kb-stream-ingest.yml)

  • Upgrade MSK Kafka version from 2.8.0 to 3.8.x (AWS recommended; required for CreateTopic API)
  • Add KnowledgeBaseName and DataSourceName as CloudFormation Parameters with sensible defaults
  • Add stack Outputs for KnowledgeBaseName and DataSourceName
  • Add IAM permissions for MSK topic management APIs (kafka:CreateTopic, kafka:ListTopics, kafka:DescribeTopic, kafka-cluster:CreateTopic) and cloudformation:DescribeStacks

1.Setup.ipynb

  • Replace 7-step terminal procedure (Java, wget, Kafka client, IAM auth JAR installation) with 2 notebook cells using MSK CreateTopic and ListTopics APIs via boto3
  • Add boto3/botocore upgrade cell with guidance for SageMaker Studio dependency conflict messages
  • Retrieve KB/DS names from CloudFormation stack Outputs via DescribeStacks, then look up Knowledge Base and Data Source by exact name match
  • Store KBName and DSName via %store for use in subsequent notebooks
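
The name-based lookup described above can be sketched roughly as follows. This is a minimal illustration, not the notebook's actual code: the helper names `stack_outputs` and `find_id_by_name` are hypothetical, and the sample IDs are made up. The dictionary shapes follow the boto3 responses for CloudFormation `describe_stacks` and Bedrock Agent `list_knowledge_bases` / `list_data_sources`.

```python
from typing import Any, Dict, List


def stack_outputs(describe_stacks_response: Dict[str, Any]) -> Dict[str, str]:
    """Flatten the Outputs of the first stack into {OutputKey: OutputValue}."""
    stacks = describe_stacks_response.get("Stacks", [])
    if not stacks:
        raise LookupError("DescribeStacks returned no stacks")
    return {o["OutputKey"]: o["OutputValue"] for o in stacks[0].get("Outputs", [])}


def find_id_by_name(summaries: List[Dict[str, Any]], name: str, id_key: str) -> str:
    """Return the ID of the single summary whose 'name' equals `name` exactly."""
    matches = [s[id_key] for s in summaries if s.get("name") == name]
    if len(matches) != 1:
        raise LookupError(f"expected one match for {name!r}, found {len(matches)}")
    return matches[0]


# Hypothetical sample data shaped like the real API responses.
sample_stack = {
    "Stacks": [{
        "Outputs": [
            {"OutputKey": "KnowledgeBaseName", "OutputValue": "demo-kb"},
            {"OutputKey": "DataSourceName", "OutputValue": "demo-ds"},
        ]
    }]
}
sample_kbs = [
    {"knowledgeBaseId": "KB111", "name": "demo-kb"},
    {"knowledgeBaseId": "KB222", "name": "demo-kb-old"},  # prefix match must not win
]

outputs = stack_outputs(sample_stack)
kb_id = find_id_by_name(sample_kbs, outputs["KnowledgeBaseName"], "knowledgeBaseId")
print(kb_id)
```

In a notebook, the inputs would presumably come from `boto3.client("cloudformation").describe_stacks(StackName=...)` and from the `knowledgeBaseSummaries` / `dataSourceSummaries` lists returned by the `bedrock-agent` client. Those list calls are paginated, so a full implementation would also follow `nextToken`.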

2.StreamIngest.ipynb

  • Replace pykafka (archived 2021, last release 2018) with confluent-kafka v2.13.0 (actively maintained, Kafka 3.9.x compatible)
  • Add %store -r guard before the data ingestion cell to provide a clear error message if variables from 1.Setup are missing
  • Use timezone-aware datetime (timezone.utc) instead of deprecated datetime.utcfromtimestamp()
  • Place TestData.csv in notebooks/ (same directory as notebook) for simpler path handling
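
As a rough sketch of the producer and timestamp changes above (not the notebook's actual code): the function names `utc_iso` and `publish_rows` and the row fields are hypothetical. The producer is passed in so the logic does not depend on a live cluster. Any object with the `produce`, `poll`, and `flush` methods of `confluent_kafka.Producer` will work.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Iterable


def utc_iso(epoch_seconds: float) -> str:
    """Timezone-aware replacement for the deprecated datetime.utcfromtimestamp()."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).isoformat()


def publish_rows(producer: Any, topic: str, rows: Iterable[Dict[str, Any]]) -> int:
    """Send each row as a JSON message; `producer` mimics confluent_kafka.Producer."""
    sent = 0
    for row in rows:
        payload = json.dumps(row).encode("utf-8")
        producer.produce(topic, value=payload)
        producer.poll(0)  # serve delivery callbacks without blocking
        sent += 1
    producer.flush()  # block until queued messages are delivered
    return sent


class PrintingProducer:
    """Stand-in used here only so the sketch runs without a Kafka cluster."""

    def produce(self, topic: str, value: bytes = None) -> None:
        print(f"{topic}: {value.decode('utf-8')}")

    def poll(self, timeout: float) -> int:
        return 0

    def flush(self) -> int:
        return 0


rows = [{"ticker": "ABC", "price": 10.5, "timestamp": utc_iso(0)}]
publish_rows(PrintingProducer(), "demo-topic", rows)
```

With the real library, the stand-in would be replaced by `confluent_kafka.Producer({...})`, configured with the MSK bootstrap servers and whatever authentication settings the cluster requires. Those settings are not listed in this description, so they are left out here.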

Other

  • Add notebooks/TestData.csv (the notebook previously expected data/TestData.csv, which did not exist in the repository)

Testing

All changes verified end-to-end:

  1. Deployed CloudFormation stack with default parameters
  2. Ran 1.Setup.ipynb: Kafka topic created via CreateTopic API, KB/DS discovered by name
  3. Ran 2.StreamIngest.ipynb: Messages produced via confluent-kafka, ingested into Bedrock KB
  4. Ran 3.Cleanup.ipynb: Event source mapping deleted successfully

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Made with Cursor

revsystem and others added 14 commits February 15, 2026 04:50
…de TestData.csv for stream ingestion testing; update notebooks for Kafka integration using confluent-kafka
…a in 2.StreamIngest

- Template: KafkaVersion 2.8.0 -> 3.8.x (AWS recommended)
- Template: Add Parameters KnowledgeBaseName, DataSourceName with defaults; Outputs; DescribeStacks IAM
- Template: SageMakerMSKAccessPolicy + kafka:CreateTopic, kafka:ListTopics, kafka:DescribeTopic, kafka-cluster:CreateTopic
- 1.Setup: Replace terminal topic creation with boto3 upgrade + CreateTopic + ListTopics; get KBName/DSName from stack outputs; KB/DS lookup by KBName/DSName
- 2.StreamIngest: Migrate from pykafka to confluent-kafka; ../data/TestData.csv; timezone-aware datetime
- 3.Cleanup: No change (identical to upstream)

Co-authored-by: Cursor <cursoragent@cursor.com>
…aint

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
- Move data/TestData.csv to notebooks/TestData.csv
- Update 2.StreamIngest to pd.read_csv('TestData.csv')
- Update CLAUDE.md references

Co-authored-by: Cursor <cursoragent@cursor.com>
…enhancing clarity

This update aims to simplify the setup process and improve the overall readability of the notebook.
…instructions and code structure improvements

- Added markdown cell in 1.Setup notebook to clarify boto3/botocore upgrade requirements and handling dependency conflicts.
- Improved code structure in 2.StreamIngest notebook by removing redundant lines and ensuring consistent formatting.
- Included installation command for confluent-kafka in 2.StreamIngest notebook.
Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
…nused Export)

- Align notebook JSON indentation with upstream (1-space)
- Preserve upstream trailing whitespace in unchanged lines
- Remove unused Export from CloudFormation Outputs

Co-authored-by: Cursor <cursoragent@cursor.com>
…3 layer integration

- Add instructions for creating and using a boto3 Lambda layer in CLAUDE.md
- Update CloudFormation template to include Boto3LayerArn parameter and reference it in the Lambda function
- Modify .gitignore to exclude additional build and configuration files

Co-authored-by: Cursor <cursoragent@cursor.com>
revsystem force-pushed the contrib/msk-create-topic-api branch from 60b4930 to 11e35dc February 26, 2026 15:06