A comprehensive notification processing system built with Python, FastAPI, and Kafka for scalable, real-time event-driven notifications across multiple channels.
NotificationHub consists of two main services that work together to process events and deliver notifications:
```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Event Sources  │───▶│  Core Service   │───▶│ Channels Service│
│ (order, payment,│    │ (transformation │    │  (email, sms,   │
│  user events)   │    │   & routing)    │    │ webhook, slack) │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                      │
         ▼                      ▼
┌─────────────────┐    ┌─────────────────┐
│  Kafka Topics   │    │  Notification   │
│ (input events)  │    │    Delivery     │
└─────────────────┘    └─────────────────┘
```
- Event Ingestion: External systems send events (orders, payments, user actions) to Kafka input topics
- Event Processing: Core Service consumes events, transforms them into notifications, and applies routing rules
- Notification Delivery: Channels Service consumes notifications and delivers them through appropriate channels
- Multi-Channel Support: Each notification channel (email, SMS, webhook, Slack) runs as an independent consumer
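The transform step in the flow above can be sketched in miniature. This is illustrative only (the real logic lives in `app/core/transformer.py`, and the field names are assumptions, not the service's actual schema); it shows the shape of the operation: one input event fans out into one notification per target channel.

```python
# Illustrative sketch of the Core Service's transform step: one input
# event becomes one notification payload per target channel.
# Field names are assumptions, not the service's actual schema.

def transform(event: dict, channels: list[str]) -> list[dict]:
    """Fan one raw event out into channel-specific notifications."""
    return [
        {
            "channel": channel,
            "topic": f"notifications.{channel}",  # per-channel output topic
            "user_id": event["user_id"],
            "summary": f"{event['event_type']} for user {event['user_id']}",
        }
        for channel in channels
    ]
```

An event with no matching channels simply produces an empty list, so nothing is delivered.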
```
NotificationHub/
├── main.py                                # Main application entry point
├── requirements.txt                       # Python dependencies
├── kafka-local/                           # Local Kafka setup
│   ├── docker-compose.yml                 # Kafka cluster configuration
│   └── scripts/                           # Kafka management scripts
├── services/
│   ├── notification-core-service/         # Event processing service
│   │   ├── app/
│   │   │   ├── main.py                    # Core service entry point
│   │   │   ├── core/                      # Business logic components
│   │   │   │   ├── consumer.py            # Kafka event consumer
│   │   │   │   ├── producer.py            # Kafka notification producer
│   │   │   │   └── transformer.py         # Event-to-notification transformer
│   │   │   ├── config/                    # Configuration management
│   │   │   └── utils/                     # Utility functions
│   │   ├── config.env                     # Service configuration
│   │   └── requirements.txt               # Service dependencies
│   └── notification-channels-service/     # Notification delivery service
│       ├── consumers/                     # Channel-specific consumers
│       │   ├── email_consumer.py          # Email notification consumer
│       │   ├── sms_consumer.py            # SMS notification consumer
│       │   ├── webhook_consumer.py        # Webhook notification consumer
│       │   └── slack_consumer.py          # Slack notification consumer
│       ├── common/                        # Shared utilities
│       ├── config.env                     # Service configuration
│       ├── docker-compose.yml             # Multi-container setup
│       └── requirements.txt               # Service dependencies
└── tests/                                 # Integration tests
    ├── e2e/                               # End-to-end tests
    ├── run_e2e_test.py                    # Test runner
    └── requirements.txt                   # Test dependencies
```
- Docker & Docker Compose (for Kafka)
- Python 3.10+
- Git
```bash
git clone <repository-url>
cd NotificationHub
```
```bash
# Create virtual environment
python3 -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install main dependencies
pip install -r requirements.txt
```

```bash
cd kafka-local
docker-compose up -d
```

This starts:
- Zookeeper (port 2181)
- Kafka broker (port 9094)
- Kafka UI (port 8080)
```bash
cd services/notification-core-service
pip install -r requirements.txt
python app/main.py
```

```bash
cd services/notification-channels-service
pip install -r requirements.txt
docker-compose up -d
```

This starts all notification consumers:
- Email consumer
- SMS consumer
- Webhook consumer
- Slack consumer
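Each consumer follows the same pattern: subscribe to its `notifications.*` topic and hand every message to a channel-specific handler. A hedged sketch of the email case, assuming the `kafka-python` client and the topic and port named in this README (the real consumers live in `consumers/`, and the handler body here is a stand-in for an actual email provider call):

```python
import json

def handle_email(notification: dict) -> str:
    """Channel-specific delivery step. A real consumer would call an
    email provider here instead of formatting a string."""
    return f"email to {notification['user_id']}: {notification['summary']}"

def run() -> None:
    # Kafka wiring, kept out of the handler so the handler stays testable.
    from kafka import KafkaConsumer  # assumed dependency (kafka-python)
    consumer = KafkaConsumer(
        "notifications.email",
        bootstrap_servers="localhost:9094",  # broker port from the local setup
        group_id="email-consumer",           # illustrative group id
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    )
    for message in consumer:
        print(handle_email(message.value))
```

Keeping the Kafka loop separate from the handler lets unit tests exercise the delivery logic without a running broker.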
```bash
# Run integration tests
cd tests
pip install -r requirements.txt
python run_e2e_test.py
```

The system uses these Kafka topics:
Input Topics (Core Service consumes):
- `order.events` - Order-related events
- `payment.events` - Payment-related events
- `user.events` - User-related events
Output Topics (Channels Service consumes):
- `notifications.email` - Email notifications
- `notifications.sms` - SMS notifications
- `notifications.webhook` - Webhook notifications
- `notifications.slack` - Slack notifications
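On the producing side, the Core Service writes each notification to its per-channel output topic. A minimal sketch, assuming `kafka-python`'s `KafkaProducer.send(topic, value)` API; the function names and payload shape are illustrative:

```python
import json

def serialize(notification: dict) -> bytes:
    # Stable key order makes payloads easier to diff in tests and logs.
    return json.dumps(notification, sort_keys=True).encode("utf-8")

def publish(producer, notifications: list[dict]) -> None:
    """Send each notification to its notifications.<channel> topic."""
    for n in notifications:
        producer.send(f"notifications.{n['channel']}", serialize(n))
```

Because `publish` only needs an object with a `send` method, a fake producer is enough to test the routing of payloads to topic names.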
| Event Type | Channels |
|---|---|
| order.created | email, webhook, slack |
| order.cancelled | email, sms |
| payment.success | |
| payment.failed | email, sms |
| user.registered | |
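In code, a routing table like this is naturally a mapping from event type to channel list. A sketch built only from the rows that list their channels here; the actual rules live in the Core Service configuration, so treat this as illustrative:

```python
# Illustrative routing table mirroring the rows above that list channels.
ROUTING = {
    "order.created": ["email", "webhook", "slack"],
    "order.cancelled": ["email", "sms"],
    "payment.failed": ["email", "sms"],
}

def channels_for(event_type: str) -> list[str]:
    """Unknown event types route nowhere rather than raising."""
    return ROUTING.get(event_type, [])
```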
- Core Service: Check logs for event processing
- Channels Service: Check Docker container status
- Kafka: Access Kafka UI at http://localhost:8080
```bash
# Core Service logs
tail -f services/notification-core-service/logs/app.log

# Channels Service logs
docker-compose -f services/notification-channels-service/docker-compose.yml logs -f
```

```bash
cd tests
python run_e2e_test.py
```

Tests verify:
- Event ingestion from input topics
- Event transformation and routing
- Notification delivery through all channels
- End-to-end message flow
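That flow can be exercised without a broker by substituting an in-memory topic map, which is roughly the property an end-to-end test has to verify: an event on an input topic yields the right messages on the output topics. All names and the routing entry below are illustrative stand-ins for the real services:

```python
from collections import defaultdict

# In-memory stand-in for Kafka: topic name -> list of messages.
broker: dict[str, list[dict]] = defaultdict(list)

ROUTING = {"order.created": ["email", "webhook", "slack"]}  # illustrative

def process(event: dict) -> None:
    """Mimic the Core Service: consume an event, publish notifications."""
    for channel in ROUTING.get(event["event_type"], []):
        broker[f"notifications.{channel}"].append(
            {"channel": channel, "user_id": event["user_id"]}
        )

process({"event_type": "order.created", "user_id": "test123"})
assert broker["notifications.email"][0]["user_id"] == "test123"
assert len(broker["notifications.slack"]) == 1
```

The real `run_e2e_test.py` does the same thing against live topics, with polling and timeouts in place of direct dictionary lookups.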
```bash
# Send test events to Kafka
cd kafka-local/scripts
./producer.sh order.events '{"event_type":"order.created","user_id":"test123","order_id":"ORD001","amount":99.99}'
```

To add a new notification channel:

- Create a consumer in `services/notification-channels-service/consumers/`
- Add the topic configuration in `common/config.py`
- Update the routing rules in the Core Service
- Add the consumer to the Docker Compose configuration

To modify routing rules:

- Update the rules in `services/notification-core-service/app/config/settings.py`
- Test with the integration tests
- Update the documentation
Kafka Connection Failed

```bash
# Check Kafka status
docker-compose -f kafka-local/docker-compose.yml ps

# Check logs
docker-compose -f kafka-local/docker-compose.yml logs kafka
```

Messages Not Consumed

```bash
# Check consumer group offsets
cd kafka-local/scripts
./consumer.sh notifications.email
```

Services Not Starting

```bash
# Check dependencies
pip install -r requirements.txt

# Check configuration
cat services/*/config.env
```

- Throughput: Handles thousands of events per second
- Latency: Sub-second notification delivery
- Scalability: Horizontal scaling via Kafka partitioning
- Reliability: At-least-once delivery guarantees
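At-least-once delivery means a consumer can see the same notification twice (for example, after a rebalance before an offset commit), so delivery handlers should be idempotent. A minimal dedup sketch keyed on a notification id; the `notification_id` field is an assumption, and the in-process set would be a shared store such as Redis in production:

```python
# Seen-ids store; in production this would be shared across processes.
_seen: set[str] = set()

def deliver_once(notification: dict, send) -> bool:
    """Skip duplicates by notification id; return True if actually sent."""
    key = notification["notification_id"]
    if key in _seen:
        return False
    _seen.add(key)
    send(notification)
    return True
```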
- Environment-based configuration
- Kafka SASL authentication support
- Input validation on all events
- Secure Docker container deployment
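With SASL enabled, producer and consumer clients need matching auth settings. A sketch of the relevant `kafka-python` parameters, reading credentials from the environment rather than hard-coding them; the environment variable names are illustrative:

```python
import os

def kafka_auth_config() -> dict:
    """SASL/PLAIN settings for kafka-python clients; values come from env."""
    return {
        "security_protocol": "SASL_PLAINTEXT",  # or SASL_SSL when TLS is on
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": os.environ.get("KAFKA_USERNAME", ""),
        "sasl_plain_password": os.environ.get("KAFKA_PASSWORD", ""),
    }
```

The dict can be splatted into `KafkaConsumer(**kafka_auth_config(), ...)` alongside the usual topic and broker arguments.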
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
MIT License - see LICENSE file for details.
Built with ❤️ using Python, FastAPI, and Kafka