Overview
AWS Kinesis integration enables real-time streaming of product updates and order events to Kinesis data streams. This allows downstream systems to consume and process e-commerce events in near real-time for analytics, inventory management, and third-party integrations.
Configuration
Kinesis is configured with AWS credentials and stream names.
Required Environment Variables
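As an illustration, the settings could be gathered from the environment in one place. The stream, flag, and rate-limit variable names below are the ones used on this page. The three `AWS_*` names are boto3's standard credential variables and are only an assumption about how this project supplies credentials; the `KinesisSettings` class and the `100/m` default are likewise hypothetical.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class KinesisSettings:
    """Hypothetical container for the Kinesis settings named on this page."""

    product_stream: str
    orders_stream: str
    product_update_stream: Optional[str]
    separate_create_update: bool
    insert_rate_limit: str

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "KinesisSettings":
        # Assumption: credentials are provided through boto3's standard variables.
        required = [
            "AWS_ACCESS_KEY_ID",
            "AWS_SECRET_ACCESS_KEY",
            "AWS_DEFAULT_REGION",
            "KINESIS_PRODUCT_STREAM",
            "KINESIS_ORDERS_STREAM",
        ]
        missing = [name for name in required if not env.get(name)]
        if missing:
            raise RuntimeError("Missing environment variables: " + ", ".join(missing))

        separate = env.get("SEPARATE_SKU_CREATE_UPDATE_STREAM", "false").lower() == "true"
        update_stream = env.get("KINESIS_PRODUCT_UPDATE_STREAM") or None
        if separate and update_stream is None:
            raise RuntimeError(
                "KINESIS_PRODUCT_UPDATE_STREAM must be set when "
                "SEPARATE_SKU_CREATE_UPDATE_STREAM=true"
            )

        return cls(
            product_stream=env["KINESIS_PRODUCT_STREAM"],
            orders_stream=env["KINESIS_ORDERS_STREAM"],
            product_update_stream=update_stream,
            separate_create_update=separate,
            # Assumption: fall back to 100/m, the example value on this page.
            insert_rate_limit=env.get("KINESIS_INSERT_RATE_LIMIT", "100/m"),
        )
```

Failing fast on a missing variable at startup is a design choice of this sketch; it keeps misconfiguration from surfacing later as a failed insert.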
Kinesis Streams
Product Stream
Streams product creation and update events:
- Stream Name: KINESIS_PRODUCT_STREAM
- Partition Key: Product ID
- Event Types: Product create, product update
Product Update Stream
Optional separate stream for product updates only:
- Stream Name: KINESIS_PRODUCT_UPDATE_STREAM
- Partition Key: Product ID
- Event Types: Product updates only (when SEPARATE_SKU_CREATE_UPDATE_STREAM=true)
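The routing rule implied by the two product streams can be sketched as a small function. The function name and parameters are hypothetical; only the behaviour (updates go to the update stream when separation is enabled, everything else goes to the product stream) is taken from the descriptions above.

```python
from typing import Optional


def select_product_stream(
    is_updated: bool,
    separate_streams: bool,
    product_stream: str,
    update_stream: Optional[str] = None,
) -> str:
    """Pick the target stream name for one product event."""
    # Updates are diverted only when separation is on and an update stream exists.
    if is_updated and separate_streams and update_stream:
        return update_stream
    # Creates, and updates without separation, share the main product stream.
    return product_stream
```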
Order Stream
Streams order events:
- Stream Name: KINESIS_ORDERS_STREAM
- Partition Key: Order ID
- Event Types: Order create, status change, cancellation
Streaming Products
Trigger Product Sync
Products are synced to Kinesis via Celery tasks.
Product Data Structure
Products are transformed into a comprehensive JSON structure.
Product Sync Process
The product sync process (see tasks.py:199):
- Fetch Product Data - Get parent product ID if variant
- Build Relationships - Collect categories, tags, variants, locations, pricelists
- Transform Data - Create comprehensive product object
- Determine Stream - Select create or update stream based on the is_updated flag
- Queue for Kinesis - Pass to rate-limited insertion task
- Insert Record - Write to Kinesis with product ID as partition key
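The steps above can be sketched roughly as follows. This is not the code in tasks.py: the function names and the payload field names are illustrative assumptions, and the Celery queueing step is left out so the example runs on its own. `client` stands for a boto3 Kinesis client, or any object exposing the same `put_record` call.

```python
import json
from typing import Any, Dict, Optional


def build_product_record(
    product: Dict[str, Any], parent_id: Optional[int] = None
) -> Dict[str, Any]:
    """Assemble one JSON-serialisable product object (field names are illustrative)."""
    return {
        "id": product["id"],
        "parent_id": parent_id,  # set when the product is a variant
        "categories": product.get("categories", []),
        "tags": product.get("tags", []),
        "variants": product.get("variants", []),
        "locations": product.get("locations", []),
        "pricelists": product.get("pricelists", []),
    }


def sync_product(
    client: Any,
    product: Dict[str, Any],
    *,
    is_updated: bool,
    create_stream: str,
    update_stream: Optional[str] = None,
    parent_id: Optional[int] = None,
) -> Any:
    """Transform a product and write it to the selected stream."""
    record = build_product_record(product, parent_id)
    # Determine Stream: updates use the update stream only if one is configured.
    stream = update_stream if (is_updated and update_stream) else create_stream
    # Insert Record: the product ID is the partition key.
    return client.put_record(
        StreamName=stream,
        Data=json.dumps(record).encode("utf-8"),
        PartitionKey=str(record["id"]),
    )
```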
Streaming Orders
Trigger Order Sync
Order Data Structure
Order Sync Process
The order sync process (see tasks.py:247):
- Fetch Order Details - Get order from database
- Build Line Items - Collect product details for each order item
- Build Gift Vouchers - Include applied voucher information
- Transform Data - Create comprehensive order object
- Queue for Kinesis - Pass to insertion task
- Insert Record - Write to Kinesis with order ID as partition key
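As a rough sketch of the transform and insert steps above, the example below builds an order payload and the arguments for a Kinesis write. The actual order schema is not shown on this page, so every field name here (`status`, `line_items`, `gift_vouchers`, and the keys inside them) is an assumption made for illustration, as are the function names.

```python
import json
from typing import Any, Dict, List


def build_order_record(
    order: Dict[str, Any],
    items: List[Dict[str, Any]],
    vouchers: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Combine an order with its line items and applied gift vouchers."""
    return {
        "id": order["id"],
        "status": order.get("status"),
        "line_items": [
            {
                "product_id": item["product_id"],
                "quantity": item["quantity"],
                "unit_price": item.get("unit_price"),
            }
            for item in items
        ],
        "gift_vouchers": [
            {"code": voucher["code"], "amount": voucher.get("amount")}
            for voucher in vouchers
        ],
    }


def order_kinesis_entry(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return the Data and PartitionKey arguments for one Kinesis write."""
    return {
        "Data": json.dumps(record).encode("utf-8"),
        # The order ID is the partition key, as described in the last step.
        "PartitionKey": str(record["id"]),
    }
```

The returned dictionary can be passed to a boto3 Kinesis client as `client.put_record(StreamName=..., **entry)`.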
Kinesis Client Implementation
The Kinesis client is implemented in sync/Kinesis.py.
Rate Limiting
Kinesis insertions are rate-limited to prevent throttling. The limit is set with KINESIS_INSERT_RATE_LIMIT (e.g., 100/m for 100 records per minute).
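To make the meaning of a value such as `100/m` concrete, here is a standalone sketch that parses it and spaces calls accordingly. It assumes the setting follows Celery's `rate_limit` string format (`/s`, `/m`, `/h`, with a bare number meaning per second); in a Celery deployment the value would normally be handed to the task's `rate_limit` option rather than enforced by hand. `parse_rate_limit` and `MinIntervalLimiter` are hypothetical names.

```python
import time
from typing import Callable

_UNIT_SECONDS = {"s": 1.0, "m": 60.0, "h": 3600.0}


def parse_rate_limit(value: str) -> float:
    """Convert a string such as '100/m' into allowed records per second."""
    count, _, unit = value.strip().partition("/")
    unit = unit or "s"  # a bare number is read as per second
    if unit not in _UNIT_SECONDS:
        raise ValueError(f"Unknown rate limit unit: {unit!r}")
    return float(count) / _UNIT_SECONDS[unit]


class MinIntervalLimiter:
    """Space calls so that they never exceed the configured rate."""

    def __init__(
        self,
        rate_per_second: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self._interval = 1.0 / rate_per_second
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = 0.0

    def wait(self) -> None:
        """Block until the next insert is allowed, then reserve a slot."""
        now = self._clock()
        start = now
        if now < self._next_allowed:
            self._sleep(self._next_allowed - now)
            start = self._next_allowed
        self._next_allowed = start + self._interval
```

Calling `limiter.wait()` before each insert keeps a single worker under the limit; it does not coordinate across multiple workers.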
Partition Keys
Partition keys determine data distribution across Kinesis shards:
- Product Stream: Product ID ensures all updates for a product go to the same shard
- Order Stream: Order ID ensures all events for an order go to the same shard, so they are processed in sequence
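The reason a fixed ID always lands on the same shard is that Kinesis maps each partition key to a 128-bit integer with MD5 and assigns the record to the shard whose hash key range contains that integer. The sketch below reproduces the hash and, as a simplifying assumption, treats the stream as `shard_count` equal hash ranges; real shard ranges can differ after resharding and should be read from the stream description. Both function names are hypothetical.

```python
import hashlib


def partition_hash_key(partition_key: str) -> int:
    """Return the 128-bit MD5 hash key that Kinesis derives from a partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Map a partition key to a shard, assuming equal-sized hash key ranges."""
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    # The hash is below 2**128, so the result is always in [0, shard_count).
    return partition_hash_key(partition_key) * shard_count // 2**128
```

Because the mapping depends only on the key, every event for a given product ID or order ID resolves to the same shard for as long as the shard layout is unchanged.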
Stream Separation
Product creates and updates can optionally be sent to different streams by setting SEPARATE_SKU_CREATE_UPDATE_STREAM=true.
Error Handling
Failed Records
The Kinesis API returns the number of failed records.
Retry Logic
Implement exponential backoff for failed inserts.
Performance Metrics
Track sync performance.
Monitoring
Monitor Kinesis integration health:
- CloudWatch Metrics: Monitor PutRecords success/failure rates
- Task Queue Depth: Monitor Celery queue length for Kinesis tasks
- Processing Lag: Track time between product update and Kinesis insertion
- Failed Records: Alert on failed record count > 0
- Rate Limit Hits: Monitor for throttling errors
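The failed record count and throttling errors in the list above both surface in the PutRecords response, so the exponential backoff described under Retry Logic can be sketched as follows. `client` is assumed to be a boto3 Kinesis client (or a compatible object); the function name, the attempt count, and the base delay are illustrative choices rather than values from this project.

```python
import time
from typing import Any, Callable, Dict, List


def put_records_with_retry(
    client: Any,
    stream_name: str,
    entries: List[Dict[str, Any]],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Dict[str, Any]]:
    """Send entries with PutRecords and retry only those that failed.

    Each entry is a dict with "Data" (bytes) and "PartitionKey" (str).
    PutRecords accepts at most 500 entries per call, so larger batches
    need to be split by the caller. Returns the entries that still
    failed after the last attempt (an empty list on full success).
    """
    pending = list(entries)
    for attempt in range(max_attempts):
        if not pending:
            return []
        response = client.put_records(StreamName=stream_name, Records=pending)
        if response.get("FailedRecordCount", 0) == 0:
            return []
        # Results come back in request order; failed ones carry an ErrorCode.
        pending = [
            entry
            for entry, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        if attempt < max_attempts - 1:
            # Exponential backoff: 0.5 s, 1 s, 2 s, ... with the defaults.
            sleep(base_delay * 2 ** attempt)
    return pending
```

A non-empty return value is the natural place to raise the "failed record count > 0" alert.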
Best Practices
Consumer Implementation
Consume Kinesis streams with AWS Lambda or the Kinesis Client Library (KCL).
Related Resources
- Celery Tasks: Background tasks for Kinesis sync operations
- Search API: Dgraph synchronization for search functionality