Skip to main content

Architecture Overview

The backend is a Python FastAPI application designed to run in AWS Lambda, using DynamoDB for data storage and AWS Bedrock for AI capabilities.

Tech Stack

FastAPI 0.104.1

Modern, fast web framework for building APIs

AWS Lambda

Serverless compute with Mangum adapter

DynamoDB

Single-table NoSQL database

Bedrock (Claude)

AWS Bedrock with Claude Sonnet 4 for AI

Cognito

User authentication and authorization

S3 + S3 Vectors

Document storage and vector embeddings

Project Structure

app/
├── database/               # DynamoDB client
│   └── client.py          # Boto3 wrapper with helpers
├── handlers/               # Lambda entry points
│   └── lambda_handler.py  # Main Lambda handler
├── middleware/             # Request/response middleware
│   └── authmiddleware.py # JWT token verification
├── routers/                # Legacy routers (deprecated)
├── shared/                 # Shared across features
│   ├── ai/
│   │   └── bedrockservice.py  # Bedrock AI client
│   ├── database/
│   │   └── historyservice.py  # Query history tracking
│   ├── documents/
│   │   └── routes.py          # Document upload/download
│   ├── health/
│   │   └── routes.py          # Health check endpoint
│   ├── schemas/
│   │   └── promptmodel.py     # Pydantic models
│   └── vectors/
│       └── vectorservice.py   # S3 Vectors integration
├── tools/                  # Feature modules (tools)
│   ├── proposalwriter/    # Main feature
│   │   ├── routes.py      # All proposal endpoints
│   │   ├── rfpanalysis/
│   │   │   ├── service.py # RFP analysis logic
│   │   │   └── config.py  # AI settings
│   │   ├── conceptevaluation/
│   │   ├── conceptdocumentgeneration/
│   │   ├── structureworkplan/
│   │   ├── proposaltemplategeneration/
│   │   └── workflow/
│   │       └── worker.py  # Lambda worker for async tasks
│   ├── admin/
│   │   ├── promptsmanager/ # CRUD for AI prompts
│   │   └── settings/       # System settings
│   └── auth/
│       ├── routes.py       # Login, signup, password reset
│       └── service.py      # Cognito integration
├── utils/                  # Utility functions
│   ├── awssession.py      # Boto3 session management
│   ├── bedrockclient.py   # Bedrock client factory
│   └── docextraction.py   # PDF/DOCX text extraction
└── main.py                 # FastAPI app entry point
This follows screaming architecture - the tools/ directory immediately shows what the platform does (proposalwriter, admin, auth).

FastAPI Application

Main Application Setup

# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import os

app = FastAPI(
    title="IGAD Innovation Hub API",
    description="AI-powered proposal writing platform",
    version="1.0.0",
)

# CORS configuration
allowedOrigins = os.getenv(
    "CORSALLOWEDORIGINS",
    "http://localhost:3000"
).split(",")

app.addmiddleware(
    CORSMiddleware,
    alloworigins=allowedOrigins,
    allowcredentials=True,
    allowmethods=["*"],
    allowheaders=["*"],
)

# Register routers
from app.tools.proposalwriter.routes import router as proposalrouter
from app.tools.auth.routes import router as authrouter
from app.shared.health.routes import router as healthrouter

app.includerouter(proposalrouter)
app.includerouter(authrouter)
app.includerouter(healthrouter)

Environment Variables

Required environment variables:
VariableDescriptionExample
AWSREGIONAWS regionuseast1
TABLENAMEDynamoDB tableigadtestingmaintable
PROPOSALSBUCKETS3 bucket for documentsigadproposaldocuments123456
COGNITOUSERPOOLIDCognito user pooluseast1IMi3kSuB8
COGNITOCLIENTIDCognito app client7p11hp6gcklhctcr9qffne71vl
WORKERFUNCTIONNAMELambda worker ARNigadtestingAnalysisWorkerFunction
ENVIRONMENTEnvironment nametesting or production
CORSALLOWEDORIGINSAllowed CORS originshttp://localhost:3000,https://app.example.com

Code Patterns

Module Structure

Each feature module follows this pattern:
featurename/
├── __init__.py          # Empty or exports
├── routes.py            # FastAPI router with endpoints
├── subfeature/
│   ├── __init__.py
│   ├── service.py       # Business logic class
│   └── config.py        # Configuration constants
└── workflow/
    └── worker.py        # Lambda worker (if async)

Router Pattern

# routes.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel
from typing import Optional

router = APIRouter(prefix="/api/proposals", tags=["proposals"])
security = HTTPBearer()
authmiddleware = AuthMiddleware()

# Pydantic models for request/response
class ProposalCreate(BaseModel):
    title: str
    description: str = ""
    templateid: Optional[str] = None

class ProposalUpdate(BaseModel):
    title: Optional[str] = None
    status: Optional[str] = None

# Authentication dependency
async def getCurrentUser(
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    return authmiddleware.verifyToken(credentials)

# Endpoint
@router.post("", responsemodel=None)
async def createProposal(
    body: ProposalCreate,
    user: dict = Depends(getCurrentUser),
):
    try:
        # Business logic
        proposal = await createProposalLogic(body, user)
        
        # Filter DynamoDB internal keys from response
        response = {k: v for k, v in proposal.items() 
                   if k not in ["PK", "SK", "GSI1PK", "GSI1SK"]}
        return {"proposal": response}
    except HTTPException:
        raise  # Re-raise HTTP exceptions unchanged
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed: {str(e)}")

Service Class Pattern

# service.py
import os
from typing import Any, Dict, Optional
import boto3
from app.shared.ai.bedrockservice import BedrockService
from .config import FEATURESETTINGS

class FeatureService:
    """
    Service for [feature description].
    
    Workflow:
    1. Load data from DynamoDB
    2. Process with AI
    3. Save results back to DynamoDB
    """
    
    def __init__(self):
        """Initialize AWS clients and configuration."""
        self.s3 = boto3.client("s3")
        self.bucket = os.environ.get("PROPOSALSBUCKET")
        self.bedrock = BedrockService()
        self.dynamodb = boto3.resource("dynamodb")
        self.tablename = os.environ.get("TABLENAME", "igadtestingmaintable")
    
    def process(self, proposalid: str) -> Dict[str, Any]:
        """
        Process the feature.
        
        Args:
            proposalid: Unique proposal identifier
            
        Returns:
            Dict with result data and status
        """
        print(f"Step 1: Loading data for {proposalid}")
        # Implementation
        
        print(f"Step 2: Processing with AI...")
        response = self.bedrock.invokeClaude(
            systemprompt=prompt["systemprompt"],
            userprompt=prompt["userprompt"],
            maxtokens=FEATURESETTINGS.get("maxtokens", 12000),
            temperature=FEATURESETTINGS.get("temperature", 0.2),
        )
        
        print(f"Completed successfully")
        return {"status": "completed", "data": response}

Configuration Pattern

# config.py
"""
Configuration for RFP Analysis.

Purpose:
- Analyze Request for Proposal (RFP) documents
- Extract requirements, deadlines, and evaluation criteria

Settings:
- model: Claude Sonnet 4
- maxtokens: 12000 (long documents)
- temperature: 0.2 (more deterministic)
"""

RFPANALYSISSETTINGS = {
    # ==================== AI Model Configuration ====================
    "model": "us.anthropic.claude-sonnet-4-20250514-v1:0",
    "maxtokens": 12000,
    "temperature": 0.2,
    "topp": 0.9,
    "topk": 250,
    
    # ==================== Processing Settings ====================
    "timeout": 300,  # 5 minutes
    "maxpages": 100,
    
    # ==================== DynamoDB Prompt Lookup ====================
    "section": "proposalwriter",
    "subsection": "step1",
    "category": "RFP Analysis",
}

DynamoDB Patterns

Single-Table Design

The application uses a single DynamoDB table with composite keys:
# Primary keys
PK = f"PROPOSAL#{proposalcode}"  # Partition key
SK = "METADATA"                     # Sort key

# Alternative sort keys for different data types
SK = "RFPANALYSIS"              # Analysis result
SK = "CONCEPTANALYSIS"          # Concept evaluation
SK = "OUTLINE"                   # Proposal outline
SK = "CONCEPTDOCUMENTv2"       # Generated document

# Global Secondary Index (GSI1) for querying by user
GSI1PK = f"USER#{userid}"       # Query all proposals by user
GSI1SK = f"PROPOSAL#{timestamp}" # Sort by creation date

Database Client

from app.database.client import dbclient

# Get single item
proposal = await dbclient.getItem(
    pk=f"PROPOSAL#{proposalid}",
    sk="METADATA"
)

# Put item
await dbclient.putItem({
    "PK": f"PROPOSAL#{proposalid}",
    "SK": "METADATA",
    "title": "My Proposal",
    "status": "draft",
    "createdat": datetime.utcnow().isoformat(),
})

# Update item
await dbclient.updateItem(
    pk=f"PROPOSAL#{proposalid}",
    sk="METADATA",
    updateexpression="SET status = :s, updatedat = :u",
    expressionattributevalues={
        ":s": "completed",
        ":u": datetime.utcnow().isoformat()
    }
)

# Query items (by user)
proposals = await dbclient.queryItems(
    pk=f"USER#{userid}",
    indexname="GSI1"
)

Pagination (Critical!)

DynamoDB returns maximum 1MB per query. Always handle pagination:
def getAllItems(self, pk: str) -> List[Dict[str, Any]]:
    """
    Query all items with pagination support.
    
    CRITICAL: DynamoDB limits responses to 1MB.
    Without pagination, you may get incomplete results!
    """
    allitems = []
    
    kwargs = {
        "KeyConditionExpression": Key("PK").eq(pk)
    }
    
    # First query
    response = self.table.query(**kwargs)
    allitems.extend(response.get("Items", []))
    
    # Continue fetching while there are more pages
    while "LastEvaluatedKey" in response:
        kwargs["ExclusiveStartKey"] = response["LastEvaluatedKey"]
        response = self.table.query(**kwargs)
        allitems.extend(response.get("Items", []))
    
    return allitems

AI Integration with Bedrock

Bedrock Service

from app.shared.ai.bedrockservice import BedrockService

bedrock = BedrockService()

# Invoke Claude
response = bedrock.invokeClaude(
    systemprompt="You are an expert proposal writer.",
    userprompt=f"Analyze this RFP: {rfptext}",
    maxtokens=12000,
    temperature=0.2,
    modelid="us.anthropic.claude-sonnet-4-20250514-v1:0",
)

Loading Prompts from DynamoDB

Prompts are stored in DynamoDB and managed via the Admin UI:
from boto3.dynamodb.conditions import Attr

def getPromptFromDynamodb(self) -> Optional[Dict[str, str]]:
    """
    Load active prompt matching feature criteria.
    
    Returns:
        Dict with systemprompt and userprompt keys, or None
    """
    table = self.dynamodb.Table(self.tablename)
    
    # Query for active prompt with matching criteria
    filterexpr = (
        Attr("isactive").eq(True)
        & Attr("section").eq("proposalwriter")
        & Attr("subsection").eq("step1")
        & Attr("categories").contains("RFP Analysis")
    )
    
    response = table.scan(FilterExpression=filterexpr)
    items = response.get("Items", [])
    
    if not items:
        return None
    
    prompt = items[0]
    return {
        "systemprompt": prompt.get("systemprompt", ""),
        "userprompt": prompt.get("userprompttemplate", ""),
    }

Prompt Placeholder Replacement

Critical: Always handle both placeholder formats:
def replacePlaceholders(template: str, context: Dict[str, Any]) -> str:
    """
    Replace placeholders in prompt template.
    
    Handles both {{KEY}} and {[KEY]} formats.
    """
    for key, value in context.items():
        # Format 1: {{KEY}}
        template = template.replace("{{" + key + "}}", str(value))
        # Format 2: {[KEY]}
        template = template.replace("{[" + key + "]}", str(value))
    return template

# Usage
promptContext = {
    "RFPTEXT": rfpContent,
    "DEADLINE": deadline,
    "BUDGET": budget,
}

userprompt = replacePlaceholders(promptTemplate, promptContext)

Async Operations Pattern

Long-running AI operations use Lambda workers with polling:

1. Trigger Endpoint

@router.post("/{proposalid}/analyzerfp")
async def analyzeRfp(
    proposalid: str,
    user: dict = Depends(getCurrentUser),
):
    """Trigger RFP analysis (returns immediately)."""
    
    # Check if already completed
    if proposal.get("rfpanalysis"):
        return {
            "status": "completed",
            "data": proposal["rfpanalysis"],
            "cached": True
        }
    
    # Check if already in progress
    if proposal.get("analysisstatusrfp") == "processing":
        return {"status": "processing"}
    
    # Set status to processing
    await dbclient.updateItem(
        pk=pk, sk="METADATA",
        updateexpression="SET analysisstatusrfp = :s",
        expressionattributevalues={":s": "processing"}
    )
    
    # Invoke worker Lambda asynchronously
    lambdaclient = boto3.client("lambda")
    lambdaclient.invoke(
        FunctionName=workerarn,
        InvocationType="Event",  # Async!
        Payload=json.dumps({
            "proposalid": proposalid,
            "task": "analyzerfp"
        })
    )
    
    return {
        "status": "processing",
        "message": "Poll /rfpanalysisstatus for completion"
    }

2. Status Endpoint

@router.get("/{proposalid}/rfpanalysisstatus")
async def getRfpAnalysisStatus(
    proposalid: str,
    user: dict = Depends(getCurrentUser),
):
    """Poll for RFP analysis status."""
    
    status = proposal.get("analysisstatusrfp", "notstarted")
    
    if status == "completed":
        return {
            "status": "completed",
            "data": proposal.get("rfpanalysis")
        }
    elif status == "failed":
        return {
            "status": "failed",
            "error": proposal.get("rfpanalysiserror")
        }
    else:
        return {"status": status}

3. Lambda Worker

# workflow/worker.py
import json
from app.database.client import dbclient
from ..rfpanalysis.service import RFPAnalyzer

def handler(event, context):
    """
    Lambda worker for async proposal operations.
    
    Event format:
    {
        "proposalid": "abc123",
        "task": "analyzerfp"
    }
    """
    task = event.get("task")
    proposalid = event.get("proposalid")
    
    if task == "analyzerfp":
        handleRfpAnalysis(proposalid)
    elif task == "analyzeconcept":
        handleConceptAnalysis(proposalid)
    # ... more tasks

def handleRfpAnalysis(proposalid: str):
    """Handle RFP analysis task."""
    try:
        analyzer = RFPAnalyzer()
        result = analyzer.analyzeRfp(proposalid)
        
        # Save result and update status to completed
        dbclient.updateItemSync(
            pk=f"PROPOSAL#{proposalid}",
            sk="METADATA",
            updateexpression="SET analysisstatusrfp = :s, rfpanalysis = :r",
            expressionattributevalues={
                ":s": "completed",
                ":r": result
            }
        )
    except Exception as e:
        # Save error and update status to failed
        dbclient.updateItemSync(
            pk=f"PROPOSAL#{proposalid}",
            sk="METADATA",
            updateexpression="SET analysisstatusrfp = :s, rfpanalysiserror = :e",
            expressionattributevalues={
                ":s": "failed",
                ":e": str(e)
            }
        )

Authentication with Cognito

import boto3
from fastapi import HTTPException

class CognitoService:
    def __init__(self):
        self.client = boto3.client('cognitoidentityprovider')
        self.userpoolid = os.environ.get('COGNITOUSERPOOLID')
        self.clientid = os.environ.get('COGNITOCLIENTID')
    
    def signin(self, email: str, password: str) -> Dict[str, Any]:
        """Sign in user with email and password."""
        try:
            response = self.client.adminInitiateAuth(
                UserPoolId=self.userpoolid,
                ClientId=self.clientid,
                AuthFlow='ADMINNOSPRPAUTH',
                AuthParameters={
                    'USERNAME': email,
                    'PASSWORD': password
                }
            )
            
            return {
                'accesstoken': response['AuthenticationResult']['AccessToken'],
                'idtoken': response['AuthenticationResult']['IdToken'],
                'refreshtoken': response['AuthenticationResult']['RefreshToken'],
            }
        except ClientError as e:
            raise HTTPException(status_code=401, detail="Invalid credentials")

Testing

Run Tests

# Run all tests
make test
# Or: pytest tests/

# Run specific file
pytest tests/testrfpanalysis.py

# Run specific test
pytest tests/testrfpanalysis.py::testAnalyzeRfp

# Run tests matching pattern
pytest tests/ -k "rfp"

# With coverage
make testcov
# Or: pytest tests/ --cov=app --covreport=html

Test Example

import pytest
from fastapi.testclient import TestClient
from app.main import app

client = TestClient(app)

@pytest.mark.usermanagement
def testCreateProposal(authHeaders):
    """Test proposal creation."""
    response = client.post(
        "/api/proposals",
        json={"title": "Test Proposal"},
        headers=authHeaders,
    )
    assert response.statuscode == 201
    assert "proposal" in response.json()

Code Quality

Formatting and Linting

# Format code
make format  # Black + isort

# Lint code
make lint  # Flake8

# Type check
make typecheck  # Mypy

# All checks
make allchecks

Pre-commit Hooks

The project uses precommit hooks that automatically:
  • Format code with Black
  • Sort imports with isort
  • Lint with Flake8
  • Type check with Mypy

Deployment

The backend is deployed as a Lambda function using AWS SAM:
# Deploy to testing
cd igadapp
./scripts/deployFullstacktesting.sh --backendonly

# Deploy to production (requires confirmation)
./scripts/deployFullstackproduction.sh --backendonly
See Infrastructure Guide for details.

Next Steps

Infrastructure

Learn about AWS CDK, SAM templates, and deployment

Frontend

Understand how the React frontend consumes these APIs

Build docs developers (and LLMs) love