
Overview

The AWS Kinesis integration streams product updates and order events to Kinesis data streams, allowing downstream systems to consume and process e-commerce events in near real time for analytics, inventory management, and third-party integrations.

Configuration

The Kinesis client is configured with AWS credentials and a region; stream names are supplied separately via environment variables:
import boto3
import settings

client = boto3.client(
    'kinesis',
    aws_access_key_id=settings.AWS_ACCESS_KEY,
    aws_secret_access_key=settings.AWS_SECRET_KEY,
    region_name=settings.AWS_REGION
)

Required Environment Variables

# AWS Credentials
AWS_ACCESS_KEY=your_access_key
AWS_SECRET_KEY=your_secret_key
AWS_REGION=ap-south-1

# Kinesis Stream Names
KINESIS_PRODUCT_STREAM=product-updates
KINESIS_PRODUCT_UPDATE_STREAM=product-updates-only
KINESIS_ORDERS_STREAM=order-events

# Rate Limiting
KINESIS_INSERT_RATE_LIMIT=100/m

# Stream Separation
SEPARATE_SKU_CREATE_UPDATE_STREAM=true

Kinesis Streams

Product Stream

Streams product creation and update events:
  • Stream Name: KINESIS_PRODUCT_STREAM
  • Partition Key: Product ID
  • Event Types: Product create, product update

Product Update Stream

Optional separate stream for product updates only:
  • Stream Name: KINESIS_PRODUCT_UPDATE_STREAM
  • Partition Key: Product ID
  • Event Types: Product updates only (when SEPARATE_SKU_CREATE_UPDATE_STREAM=true)

Order Stream

Streams order events:
  • Stream Name: KINESIS_ORDERS_STREAM
  • Partition Key: Order ID
  • Event Types: Order create, status change, cancellation

Streaming Products

Trigger Product Sync

Products are synced to Kinesis via Celery tasks:
from tasks import sync_kinesis_products, insert_into_kinesis

# Sync product to Kinesis (triggered by product update)
result = sync_kinesis_products.delay([12345])

# Returns: (data, stream_name, partition_key)
data, stream_name, partition_key = result.get()

# Insert into Kinesis with rate limiting
insert_into_kinesis.delay([data, stream_name, partition_key])

Product Data Structure

Products are transformed into a comprehensive JSON structure:
{
  "product_id": 12345,
  "parent_product_id": 12340,
  "name": "Premium Cotton T-Shirt",
  "description": "Comfortable cotton t-shirt",
  "price": 999.00,
  "spl_price": 799.00,
  "status": 1,
  "stock": 150,
  "is_updated": true,
  "categories": [
    {
      "category_id": 10,
      "name": "Men's Clothing",
      "parent_id": 1
    }
  ],
  "tags": [
    {"tag_id": 5, "name": "trending"},
    {"tag_id": 12, "name": "new-arrival"}
  ],
  "variants": [
    {
      "variant_id": 12346,
      "size": "M",
      "color": "Blue",
      "stock": 50
    }
  ],
  "locations": [
    {
      "location_id": 1,
      "location_code": "MH-01",
      "stock": 25
    }
  ],
  "pricelists": [
    {
      "pricelist_id": 100,
      "price": 699.00,
      "type": "exclusive"
    }
  ],
  "images": [
    "https://cdn.example.com/products/12345/img1.jpg",
    "https://cdn.example.com/products/12345/img2.jpg"
  ],
  "created_at": "2026-03-08T10:30:00Z",
  "updated_at": "2026-03-08T15:45:00Z"
}

Product Sync Process

The product sync process (see tasks.py:199):
  1. Fetch Product Data - Get parent product ID if variant
  2. Build Relationships - Collect categories, tags, variants, locations, pricelists
  3. Transform Data - Create comprehensive product object
  4. Determine Stream - Select create or update stream based on is_updated flag
  5. Queue for Kinesis - Pass to rate-limited insertion task
  6. Insert Record - Write to Kinesis with product ID as partition key
from sync.Product import SyncKinesisProduct

# Initialize sync
sync_product = SyncKinesisProduct()
sync_product.product_ids = [12345]

# Build related data
sync_product.make_product_categories()
sync_product.make_product_tags()
sync_product.make_product_variants()
sync_product.make_product_locations()
sync_product.make_product_pricelists()

# Generate payload
data = sync_product.make_products()

Streaming Orders

Trigger Order Sync

from tasks import sync_kinesis_orders, insert_into_kinesis

# Sync order to Kinesis
result = sync_kinesis_orders.delay(order_id=54321)
data, stream_name, partition_key = result.get()

# Insert into Kinesis
insert_into_kinesis.delay([data, stream_name, partition_key])

Order Data Structure

{
  "order_id": 54321,
  "user_id": 12345,
  "order_number": "ORD-2026-54321",
  "status": "confirmed",
  "payment_status": "paid",
  "total_amount": 2499.00,
  "discount_amount": 200.00,
  "shipping_amount": 100.00,
  "tax_amount": 449.82,
  "line_items": [
    {
      "product_id": 12345,
      "variant_id": 12346,
      "quantity": 2,
      "price": 799.00,
      "discount": 100.00
    }
  ],
  "gift_vouchers": [
    {
      "voucher_code": "GIFT100",
      "amount": 100.00
    }
  ],
  "shipping_address": {
    "name": "John Doe",
    "phone": "9876543210",
    "address_line1": "123 Main St",
    "city": "Mumbai",
    "state": "Maharashtra",
    "pincode": "400001"
  },
  "created_at": "2026-03-08T10:30:00Z",
  "updated_at": "2026-03-08T10:35:00Z"
}

Order Sync Process

The order sync process (see tasks.py:247):
  1. Fetch Order Details - Get order from database
  2. Build Line Items - Collect product details for each order item
  3. Build Gift Vouchers - Include applied voucher information
  4. Transform Data - Create comprehensive order object
  5. Queue for Kinesis - Pass to insertion task
  6. Insert Record - Write to Kinesis with order ID as partition key
from sync.Order import SyncKinesisOrder

# Initialize sync
sync_order = SyncKinesisOrder()
sync_order.order_id = 54321

# Build related data
sync_order.make_line_items()
sync_order.make_gift_vouchers()

# Generate payload
data = sync_order.make_orders()

Kinesis Client Implementation

The Kinesis client is implemented in sync/Kinesis.py:
import json
import boto3
import settings
from utils.logger import get_logger

logger = get_logger()

class Kinesis:
    @staticmethod
    def insert_into_kinesis(data, stream_name, partition_key):
        """
        Insert record into Kinesis stream
        
        Args:
            data: Dictionary containing event data
            stream_name: Name of Kinesis stream
            partition_key: Partition key for sharding (product_id or order_id)
        """
        client = boto3.client(
            'kinesis',
            aws_access_key_id=settings.AWS_ACCESS_KEY,
            aws_secret_access_key=settings.AWS_SECRET_KEY,
            region_name=settings.AWS_REGION
        )
        
        partition_key = str(partition_key)
        logger.debug(f"Kinesis data: {data}")
        
        try:
            response = client.put_records(
                Records=[
                    {
                        'Data': json.dumps(data),
                        'PartitionKey': partition_key
                    },
                ],
                StreamName=stream_name
            )
            
            logger.info(f"Kinesis response: {response}, synced_id: {partition_key}")
            
            if response['FailedRecordCount'] > 0:
                logger.warning(f"Failed to sync some records for id: {partition_key}")
                # Failed records are logged and can be retried based on application logic
                
        except Exception as e:
            logger.error(f"Exception in sync for id: {partition_key}, error: {str(e)}")
            raise

Rate Limiting

Kinesis insertions are rate-limited to prevent throttling:
@app.task(rate_limit=settings.KINESIS_INSERT_RATE_LIMIT)
def insert_into_kinesis(data):
    data_to_insert = data[0]
    stream_name = data[1]
    partition_key = data[2]
    
    logger.debug(
        f"Kinesis insert details | "
        f"StreamName={stream_name} | "
        f"PartitionKey={partition_key} | "
        f"Data={data_to_insert}"
    )
    
    if data_to_insert and stream_name and partition_key:
        Kinesis.insert_into_kinesis(data_to_insert, stream_name, partition_key)
    else:
        logger.warning("Nothing to sync to Kinesis")
Configuration: Set KINESIS_INSERT_RATE_LIMIT using Celery's rate_limit format (e.g., 100/m for 100 tasks per minute). Note that Celery enforces rate limits per worker, not globally across the cluster.

Partition Keys

Partition keys determine data distribution across Kinesis shards:
  • Product Stream: Product ID ensures all updates for a product go to the same shard
  • Order Stream: Order ID ensures order events are processed in sequence
# Partition key is extracted from the payload:
# product events use product_id, order events use order_id
partition_key = data.get('product_id') or data.get('order_id')
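Kinesis hashes the partition key with MD5 and maps the 128-bit result onto shard hash-key ranges. A minimal sketch of how a key lands on a shard, assuming shards evenly split the hash space (the helper name is illustrative):

```python
import hashlib

def shard_for_key(partition_key: str, num_shards: int) -> int:
    """Approximate the shard a partition key maps to, assuming the
    hash space is evenly divided across shards."""
    # Kinesis computes MD5(partition_key) as a 128-bit integer and
    # routes the record to the shard whose hash-key range contains it
    hashed = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    return hashed * num_shards // 2 ** 128
```

The same key always maps to the same shard, which is why using the product/order ID keeps all events for one entity in order.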

Stream Separation

Optionally separate product creates and updates into different streams:
kinesis_product_stream = settings.KINESIS_PRODUCT_STREAM

if settings.SEPARATE_SKU_CREATE_UPDATE_STREAM:
    logger.info(f"Separating SKU Create Update Stream for Product {product_ids}")
    if data.get('is_updated'):
        kinesis_product_stream = settings.KINESIS_PRODUCT_UPDATE_STREAM

logger.info(f"Kinesis Stream for Product {product_ids}: {kinesis_product_stream}")
This allows different processing logic for new products vs. updates.

Error Handling

Failed Records

The put_records response includes a failed record count:
if response['FailedRecordCount'] > 0:
    logger.warning(f"Failed to sync some records for id: {partition_key}")
    # Implement retry logic
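put_records reports partial failures per record: each entry in the response's Records list carries either a SequenceNumber (success) or an ErrorCode (failure), in request order. A sketch of collecting the failed entries for resubmission (helper name illustrative):

```python
def collect_failed_records(sent_records, response):
    """Return the subset of sent records that Kinesis rejected.

    put_records returns results in the same order as the request;
    entries carrying an ErrorCode (e.g.
    ProvisionedThroughputExceededException) failed and can be resubmitted.
    """
    return [
        record
        for record, result in zip(sent_records, response["Records"])
        if "ErrorCode" in result
    ]
```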

Retry Logic

Implement exponential backoff for failed inserts:
from libs.celery import app

@app.task(bind=True, max_retries=3)
def insert_with_retry(self, data, stream_name, partition_key):
    try:
        Kinesis.insert_into_kinesis(data, stream_name, partition_key)
    except Exception as exc:
        # Retry with exponential backoff
        raise self.retry(
            exc=exc,
            countdown=2 ** self.request.retries  # 2, 4, 8 seconds
        )

Performance Metrics

Track sync performance:
import time

# Track time taken
sync_product.time_taken = time.time()
# ... sync operations ...
sync_product.time_taken = time.time() - sync_product.time_taken
sync_product.print_time_information()

Monitoring

Monitor Kinesis integration health:
  • CloudWatch Metrics: Monitor PutRecords success/failure rates
  • Task Queue Depth: Monitor Celery queue length for Kinesis tasks
  • Processing Lag: Track time between product update and Kinesis insertion
  • Failed Records: Alert on failed record count > 0
  • Rate Limit Hits: Monitor for throttling errors
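PutRecords.FailedRecords is a standard AWS/Kinesis CloudWatch metric. A sketch of building the query for it, keeping the actual boto3 call in a comment since it requires AWS credentials (the helper name is illustrative):

```python
import datetime

def putrecords_failure_query(stream_name, minutes=60):
    """Build kwargs for cloudwatch.get_metric_statistics covering the
    last `minutes` of PutRecords.FailedRecords for one stream."""
    now = datetime.datetime.now(datetime.timezone.utc)
    return {
        "Namespace": "AWS/Kinesis",
        "MetricName": "PutRecords.FailedRecords",
        "Dimensions": [{"Name": "StreamName", "Value": stream_name}],
        "StartTime": now - datetime.timedelta(minutes=minutes),
        "EndTime": now,
        "Period": 300,          # 5-minute buckets
        "Statistics": ["Sum"],
    }

# Usage (requires AWS credentials):
#   cloudwatch = boto3.client('cloudwatch')
#   stats = cloudwatch.get_metric_statistics(
#       **putrecords_failure_query('product-updates'))
```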

Best Practices

Kinesis Integration Guidelines:
  1. Use Appropriate Partition Keys - Ensures even shard distribution
  2. Rate Limit Insertions - Prevents API throttling
  3. Handle Failures Gracefully - Implement retry logic with backoff
  4. Monitor Failed Records - Set up alerts for insertion failures
  5. Separate Stream Types - Use different streams for creates vs. updates if needed
  6. Batch When Possible - Use put_records (plural) for multiple records
  7. Log Thoroughly - Track all insertions for debugging
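On point 6, note that a single put_records call accepts at most 500 records. A minimal batching sketch:

```python
def chunk_records(records, batch_size=500):
    """Yield batches no larger than the put_records 500-record limit."""
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]

# Each yielded batch can then be passed as the Records argument of a
# single client.put_records(...) call.
```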
Common Issues:
  • Throttling: Exceeding shard write capacity (1 MB/s or 1000 records/s per shard)
  • Partition Key Skew: Uneven distribution causing hot shards
  • Large Payloads: Records > 1 MB will be rejected
  • IAM Permissions: Ensure AWS credentials have kinesis:PutRecords permission
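The 1 MB record limit applies to the serialized data blob plus the partition key. A pre-flight size check, sketched under the assumption that records are JSON-serialized as in the client above (helper name illustrative):

```python
import json

MAX_RECORD_BYTES = 1024 * 1024  # Kinesis limit: data blob + partition key

def record_fits(data, partition_key):
    """Return True if the serialized record stays within the 1 MB limit."""
    size = (len(json.dumps(data).encode("utf-8"))
            + len(str(partition_key).encode("utf-8")))
    return size <= MAX_RECORD_BYTES
```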

Consumer Implementation

Consume Kinesis streams with AWS Lambda or KCL:
import base64
import json

def lambda_handler(event, context):
    """
    Lambda function to process Kinesis records
    """
    for record in event['Records']:
        # Kinesis data arrives base64-encoded in the Lambda event
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        
        # Process product/order event
        if 'product_id' in payload:
            process_product_event(payload)
        elif 'order_id' in payload:
            process_order_event(payload)
