Build sophisticated applications that leverage the best of both the TypeScript and Python ecosystems. This example demonstrates how to create seamless multi-language workflows where TypeScript and Python steps work together as a unified system.

Overview

This example shows how to:
  • Combine TypeScript and Python steps in a single application
  • Share data between TypeScript and Python using queues
  • Leverage Python ML/AI libraries with TypeScript APIs
  • Build cohesive workflows across language boundaries
  • Deploy polyglot applications with Motia

Architecture

A multi-language application with complementary responsibilities:
  1. TypeScript API Layer: Handle HTTP requests, input validation, and client-facing endpoints.
  2. Python Processing Layer: Perform data science, machine learning, or compute-intensive operations.
  3. Queue-Based Communication: Exchange data between TypeScript and Python steps using typed queues (see the contract sketch after this list).
  4. Shared State Management: Access common state from both TypeScript and Python steps.
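
The queues and state scopes form the contract between the two layers. A minimal sketch of the message shapes, assuming the topic names and payload fields used in the steps below (the file path is illustrative):
// types/contracts.ts
export interface AnalysisOptions {
  sentiment?: boolean
  entities?: boolean
  summary?: boolean
}

// Payload TypeScript enqueues to 'python.analyze.text'
export interface AnalyzeTextMessage {
  analysisId: string
  text: string
  userId: string
  options: AnalysisOptions
}

// Payload Python enqueues back to 'ts.analysis.complete'
export interface AnalysisCompleteMessage {
  analysisId: string
  userId: string
  results: Record<string, unknown>
  processingTime: number
  completedAt: number
}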

Implementation

Step 1: TypeScript API Endpoint

Create a TypeScript endpoint that receives requests:
// steps/api/submit-analysis.step.ts
import { type Handlers, http, type StepConfig } from 'motia'
import { z } from 'zod'

const requestSchema = z.object({
  text: z.string(),
  userId: z.string(),
  options: z.object({
    sentiment: z.boolean().optional().default(true),
    entities: z.boolean().optional().default(true),
    summary: z.boolean().optional().default(false),
  }).optional().default({}),
})

export const config = {
  name: 'Submit Analysis Request',
  description: 'TypeScript API endpoint for text analysis',
  flows: ['multi-language-app'],
  triggers: [
    {
      type: 'http',
      method: 'POST',
      path: '/api/analyze',
      bodySchema: requestSchema,
    },
  ],
  enqueues: ['python.analyze.text'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { request },
  { logger, enqueue, state }
) => {
  const { text, userId, options } = request.body

  logger.info('Analysis request received', {
    userId,
    textLength: text.length,
    options,
  })

  // Generate analysis ID
  const analysisId = crypto.randomUUID()

  // Store initial state
  await state.set('analyses', analysisId, {
    userId,
    status: 'pending',
    createdAt: Date.now(),
    textLength: text.length,
    options,
  })

  // Enqueue for Python processing
  await enqueue({
    topic: 'python.analyze.text',
    data: {
      analysisId,
      text,
      userId,
      options,
    },
  })

  logger.info('Analysis request queued', { analysisId })

  return {
    status: 202,
    body: {
      message: 'Analysis request accepted',
      analysisId,
      status: 'pending',
    },
  }
}

Step 2: Python Text Analysis

Process the text using Python NLP libraries:
# steps/processing/analyze_text_step.py
import json
from typing import Any
from motia import FlowContext, queue

# Import Python ML/NLP libraries
try:
    from transformers import pipeline
    from textblob import TextBlob
except ImportError:
    print("Warning: ML libraries not installed")

config = {
    "name": "Analyze Text (Python)",
    "description": "Performs NLP analysis using Python libraries",
    "flows": ["multi-language-app"],
    "triggers": [
        {
            "type": "queue",
            "topic": "python.analyze.text",
        },
    ],
    "enqueues": ["ts.analysis.complete"],
}

# Initialize ML models (in production, use lazy loading)
sentiment_analyzer = None
entity_extractor = None

def get_sentiment_analyzer():
    global sentiment_analyzer
    if sentiment_analyzer is None:
        sentiment_analyzer = pipeline("sentiment-analysis")
    return sentiment_analyzer

def get_entity_extractor():
    global entity_extractor
    if entity_extractor is None:
        entity_extractor = pipeline("ner")
    return entity_extractor

async def handler(request: dict[str, Any], ctx: FlowContext[Any]) -> None:
    """Analyze text using Python NLP libraries."""
    analysis_id = request.get("analysisId")
    text = request.get("text", "")
    user_id = request.get("userId")
    options = request.get("options", {})

    ctx.logger.info(f"Starting Python analysis: {analysis_id}")

    # Record when processing started and mark the analysis as in progress
    started_at = ctx.now()
    await ctx.state.update("analyses", analysis_id, [
        {"type": "set", "path": "status", "value": "processing"},
        {"type": "set", "path": "processingStartedAt", "value": started_at},
    ])

    results = {}

    # Perform sentiment analysis
    if options.get("sentiment"):
        try:
            analyzer = get_sentiment_analyzer()
            sentiment_result = analyzer(text[:512])[0]  # Limit text length
            
            results["sentiment"] = {
                "label": sentiment_result["label"],
                "score": sentiment_result["score"],
            }
            
            ctx.logger.info(f"Sentiment: {sentiment_result['label']}")
        except Exception as e:
            ctx.logger.error(f"Sentiment analysis failed: {e}")
            results["sentiment"] = {"error": str(e)}

    # Extract named entities
    if options.get("entities"):
        try:
            extractor = get_entity_extractor()
            entities = extractor(text[:512])
            
            # Group entities by type
            entity_groups = {}
            for entity in entities:
                entity_type = entity["entity"]
                if entity_type not in entity_groups:
                    entity_groups[entity_type] = []
                entity_groups[entity_type].append({
                    "text": entity["word"],
                    "score": entity["score"],
                })
            
            results["entities"] = entity_groups
            ctx.logger.info(f"Found {len(entities)} entities")
        except Exception as e:
            ctx.logger.error(f"Entity extraction failed: {e}")
            results["entities"] = {"error": str(e)}

    # Generate summary using TextBlob
    if options.get("summary"):
        try:
            blob = TextBlob(text)
            sentences = blob.sentences[:3]  # First 3 sentences as summary
            
            results["summary"] = {
                "text": " ".join(str(s) for s in sentences),
                "sentenceCount": len(blob.sentences),
                "wordCount": len(blob.words),
            }
            
            ctx.logger.info(f"Summary generated: {len(blob.sentences)} sentences")
        except Exception as e:
            ctx.logger.error(f"Summary generation failed: {e}")
            results["summary"] = {"error": str(e)}

    # Calculate processing time from when this step started
    processing_time = ctx.now() - started_at

    # Enqueue results for TypeScript step to handle
    await ctx.enqueue(
        topic="ts.analysis.complete",
        data={
            "analysisId": analysis_id,
            "userId": user_id,
            "results": results,
            "processingTime": processing_time,
            "completedAt": ctx.now(),
        },
    )

    ctx.logger.info(
        f"Python analysis completed: {analysis_id}",
        {"processingTime": processing_time},
    )

Step 3: TypeScript Result Handler

Handle analysis results and notify users:
// steps/api/handle-analysis-results.step.ts
import { type Handlers, type StepConfig } from 'motia'
import { z } from 'zod'

const resultsSchema = z.object({
  analysisId: z.string(),
  userId: z.string(),
  results: z.record(z.any()),
  processingTime: z.number(),
  completedAt: z.number(),
})

export const config = {
  name: 'Handle Analysis Results',
  description: 'TypeScript handler for Python analysis results',
  flows: ['multi-language-app'],
  triggers: [
    {
      type: 'queue',
      topic: 'ts.analysis.complete',
      input: resultsSchema,
    },
  ],
  enqueues: ['notifications.send'],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  input,
  { logger, state, enqueue }
) => {
  const { analysisId, userId, results, processingTime, completedAt } = input

  logger.info('Analysis results received', {
    analysisId,
    processingTime,
  })

  // Update analysis state with results
  await state.update('analyses', analysisId, [
    { type: 'set', path: 'status', value: 'completed' },
    { type: 'set', path: 'results', value: results },
    { type: 'set', path: 'processingTime', value: processingTime },
    { type: 'set', path: 'completedAt', value: completedAt },
  ])

  // Format results for display
  const summary = formatResults(results)

  // Send notification to user
  await enqueue({
    topic: 'notifications.send',
    data: {
      userId,
      type: 'analysis_complete',
      message: `Your text analysis is complete: ${summary}`,
      analysisId,
    },
  })

  logger.info('Analysis results processed', { analysisId, userId })
}

function formatResults(results: Record<string, any>): string {
  const parts: string[] = []

  if (results.sentiment && !results.sentiment.error) {
    parts.push(`Sentiment: ${results.sentiment.label}`)
  }

  if (results.entities && !results.entities.error) {
    const entityCount = Object.values(results.entities)
      .reduce((sum: number, arr: any) => sum + arr.length, 0)
    parts.push(`${entityCount} entities found`)
  }

  if (results.summary && !results.summary.error) {
    parts.push(`${results.summary.wordCount} words`)
  }

  return parts.join(', ')
}

Step 4: TypeScript Results API

Provide an API to retrieve results:
// steps/api/get-analysis.step.ts
import { type Handlers, http, type StepConfig } from 'motia'

export const config = {
  name: 'Get Analysis Results',
  description: 'Retrieve analysis results by ID',
  flows: ['multi-language-app'],
  triggers: [
    {
      type: 'http',
      method: 'GET',
      path: '/api/analyze/:analysisId',
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { request },
  { logger, state }
) => {
  const { analysisId } = request.params

  logger.info('Fetching analysis results', { analysisId })

  const analysis = await state.get('analyses', analysisId)

  if (!analysis) {
    return {
      status: 404,
      body: { error: 'Analysis not found', analysisId },
    }
  }

  return {
    status: 200,
    body: {
      analysisId,
      ...analysis,
    },
  }
}

Step 5: Python Data Processing

Add additional Python processing capabilities:
# steps/processing/generate_insights_step.py
import numpy as np
import pandas as pd
from typing import Any
from motia import FlowContext

config = {
    "name": "Generate Insights (Python)",
    "description": "Generate data insights using Python data science libraries",
    "flows": ["multi-language-app"],
    "triggers": [
        {
            "type": "queue",
            "topic": "python.generate.insights",
        },
    ],
    "enqueues": ["ts.insights.ready"],
}

async def handler(request: dict[str, Any], ctx: FlowContext[Any]) -> None:
    """Generate insights from data using pandas and numpy."""
    data_id = request.get("dataId")
    data_points = request.get("dataPoints", [])

    ctx.logger.info(f"Generating insights for: {data_id}")

    # Convert to pandas DataFrame
    df = pd.DataFrame(data_points)

    # Calculate statistics
    insights = {
        "mean": float(df["value"].mean()) if "value" in df.columns else None,
        "median": float(df["value"].median()) if "value" in df.columns else None,
        "std": float(df["value"].std()) if "value" in df.columns else None,
        "count": len(df),
        "min": float(df["value"].min()) if "value" in df.columns else None,
        "max": float(df["value"].max()) if "value" in df.columns else None,
    }

    # Detect outliers using IQR method
    if "value" in df.columns:
        Q1 = df["value"].quantile(0.25)
        Q3 = df["value"].quantile(0.75)
        IQR = Q3 - Q1
        outliers = df[
            (df["value"] < Q1 - 1.5 * IQR) | (df["value"] > Q3 + 1.5 * IQR)
        ]
        insights["outliers"] = len(outliers)
        insights["outlierPercentage"] = (len(outliers) / len(df)) * 100

    # Calculate trends
    if "value" in df.columns and len(df) > 1:
        trend = np.polyfit(range(len(df)), df["value"].values, 1)
        insights["trend"] = "increasing" if trend[0] > 0 else "decreasing"
        insights["trendSlope"] = float(trend[0])

    # Enqueue results back to TypeScript
    await ctx.enqueue(
        topic="ts.insights.ready",
        data={
            "dataId": data_id,
            "insights": insights,
            "generatedAt": ctx.now(),
        },
    )

    ctx.logger.info(f"Insights generated for: {data_id}", insights)

Key Features

Seamless Integration

TypeScript and Python steps communicate naturally through typed queues and shared state.

Language Strengths

Leverage TypeScript for APIs and async I/O, Python for ML, data science, and compute-intensive tasks.

Type Safety

Use Zod schemas in TypeScript and type hints in Python for end-to-end type safety.

Unified Deployment

Deploy and scale TypeScript and Python steps together as a single application.

Shared State

Access the same state and streams from both languages with consistent APIs.

Testing

Test the multi-language workflow:
# Submit analysis request (hits TypeScript endpoint)
curl -X POST http://localhost:3111/api/analyze \
  -H "Content-Type: application/json" \
  -d '{
    "text": "Motia is an amazing framework for building multi-language applications!",
    "userId": "user-123",
    "options": {
      "sentiment": true,
      "entities": true,
      "summary": true
    }
  }'

# Get results (TypeScript API retrieves Python-processed data)
curl http://localhost:3111/api/analyze/{analysisId}

Advanced Patterns

Bidirectional Communication

Python can call back to TypeScript for additional processing:
# Python step
if needs_additional_processing:
    await ctx.enqueue(
        topic="ts.additional.processing",
        data={"analysisId": analysis_id, "reason": "complex_case"},
    )
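
On the TypeScript side, a queue-triggered step can pick this up, following the same config shape as the steps above. A minimal sketch (step name, file path, and input schema are illustrative):
// steps/api/additional-processing.step.ts
import { type Handlers, type StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'Additional Processing',
  description: 'Handles complex cases that Python hands back to TypeScript',
  flows: ['multi-language-app'],
  triggers: [
    {
      type: 'queue',
      topic: 'ts.additional.processing',
      input: z.object({ analysisId: z.string(), reason: z.string() }),
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (input, { logger }) => {
  // Perform the extra TypeScript-side work for this analysis
  logger.info('Additional processing requested by Python', input)
}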

Shared Types

Define types once, use in both languages:
// types/analysis.ts
import { z } from 'zod'

export const AnalysisSchema = z.object({
  analysisId: z.string(),
  status: z.enum(['pending', 'processing', 'completed', 'failed']),
  results: z.record(z.any()),
})

export type Analysis = z.infer<typeof AnalysisSchema>

Error Handling

Handle errors consistently across languages:
try:
    result = perform_analysis(text)
except Exception as e:
    ctx.logger.error(f"Analysis failed: {e}")
    await ctx.enqueue(
        topic="ts.analysis.failed",
        data={"analysisId": analysis_id, "error": str(e)},
    )
    return
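
A matching TypeScript step can consume the failure topic and mark the analysis as failed, so the results API reflects it. A minimal sketch, assuming the ts.analysis.failed payload from the Python snippet above (file path and step name are illustrative):
// steps/api/handle-analysis-failure.step.ts
import { type Handlers, type StepConfig } from 'motia'
import { z } from 'zod'

export const config = {
  name: 'Handle Analysis Failure',
  description: 'Marks an analysis as failed when Python reports an error',
  flows: ['multi-language-app'],
  triggers: [
    {
      type: 'queue',
      topic: 'ts.analysis.failed',
      input: z.object({ analysisId: z.string(), error: z.string() }),
    },
  ],
} as const satisfies StepConfig

export const handler: Handlers<typeof config> = async (
  { analysisId, error },
  { logger, state }
) => {
  logger.error('Python analysis failed', { analysisId, error })

  // Record the failure so GET /api/analyze/:analysisId returns it
  await state.update('analyses', analysisId, [
    { type: 'set', path: 'status', value: 'failed' },
    { type: 'set', path: 'error', value: error },
  ])
}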

Environment Setup

# Install TypeScript dependencies
npm install motia zod

# Install Python dependencies
pip install motia-py transformers torch textblob pandas numpy  # transformers pipelines need a PyTorch backend

# Download ML models
python -m textblob.download_corpora

Production Considerations

  1. Model Loading: Use lazy loading and model caching for ML models
  2. Resource Isolation: Run Python steps in separate containers if needed
  3. Error Recovery: Implement retry logic for both TypeScript and Python steps (see the retry sketch below)
  4. Monitoring: Track performance metrics for both languages separately
  5. Scaling: Scale TypeScript and Python workers independently based on load
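
For error recovery, a small retry helper with exponential backoff can wrap calls that may fail transiently in either language. A minimal TypeScript sketch (the helper is illustrative, not a Motia API):
// utils/retry.ts
export async function withRetry<T>(
  fn: () => Promise<T>,
  { attempts = 3, baseDelayMs = 500 } = {}
): Promise<T> {
  let lastError: unknown
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn()
    } catch (error) {
      lastError = error
      if (attempt < attempts) {
        // Exponential backoff between attempts
        await new Promise((resolve) => setTimeout(resolve, baseDelayMs * 2 ** (attempt - 1)))
      }
    }
  }
  throw lastError
}

// Usage inside a step handler (callSentimentService is a hypothetical external call):
// const sentiment = await withRetry(() => callSentimentService(text))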

Use Cases

  • AI/ML APIs: TypeScript APIs with Python ML processing
  • Data Pipelines: TypeScript orchestration with Python data processing
  • Image Processing: TypeScript uploads, Python OpenCV processing
  • Scientific Computing: TypeScript UI, Python NumPy/SciPy calculations
  • NLP Applications: TypeScript APIs, Python spaCy/transformers
  • Financial Analysis: TypeScript services, Python pandas/QuantLib

Next Steps

  • Add more Python processing capabilities (computer vision, time series)
  • Implement streaming responses from Python to TypeScript
  • Build shared configuration management
  • Add cross-language monitoring and tracing
  • Implement polyglot testing strategies
