Skip to main content

Overview

VoicePact implements a defense-in-depth security model designed for the unique challenges of African informal markets:
  • Cryptographic integrity - Ed25519 signatures and hash verification
  • Immutable audit logs - Complete event trail for dispute resolution
  • Data encryption - Sensitive information protected at rest
  • Webhook verification - HMAC signatures for external API callbacks
  • Rate limiting - Protection against SMS/USSD abuse (planned)

Security Principles

From the README (README.md:107-121):
Principles:
- Least privilege separation between processing steps
- No private keys in frontend – signatures are server-side
- Audio encryption at rest (planned envelope encryption)
- Ed25519 signatures for contract canonical form hash
- Immutable event log (append-only table) for evidentiary chain

Controls (Current / Planned):
- OTP-backed phone identity binding
- Rate limiting: SMS / USSD confirmation attempts
- Integrity hashing of audio + transcript bundles
- Secure webhook verification (HMAC headers – planned)
- PII minimization: Partial phone masking in UI

Cryptographic Services

The CryptoService class (crypto_service.py) provides all cryptographic operations.

Contract Hashing

Purpose: Ensure contract immutability and detect tampering Implementation (crypto_service.py:51-63):
def generate_contract_hash(self, content: str) -> str:
    try:
        content_bytes = content.encode('utf-8')
        
        # Use BLAKE2b for speed or SHA-256 for compatibility
        if settings.contract_hash_algorithm == "blake2b":
            hash_obj = hashlib.blake2b(content_bytes, digest_size=32)
        else:
            hash_obj = hashlib.sha256(content_bytes)
        
        return hash_obj.hexdigest()
    except Exception as e:
        logger.error(f"Hash generation failed: {e}")
        raise CryptographicError(f"Failed to generate hash: {e}")
Usage in contract generation (contract_generator.py:54-60):
def generate_contract_hash(self, transcript: str, terms: Dict[str, Any]) -> str:
    # Create deterministic content representation
    content = f"{transcript}:{str(sorted(terms.items()))}"
    
    if settings.contract_hash_algorithm == "blake2b":
        return hashlib.blake2b(content.encode()).hexdigest()
    else:
        return hashlib.sha256(content.encode()).hexdigest()
Integrity validation (crypto_service.py:238-244):
def validate_contract_integrity(
    self, 
    original_hash: str, 
    current_content: str
) -> bool:
    try:
        current_hash = self.generate_contract_hash(current_content)
        # Constant-time comparison prevents timing attacks
        return hmac.compare_digest(original_hash, current_hash)
    except Exception as e:
        logger.error(f"Contract integrity validation failed: {e}")
        return False
BLAKE2b vs SHA-256:
  • BLAKE2b - Faster, smaller output, modern
  • SHA-256 - Wider compatibility, FIPS approved
VoicePact defaults to BLAKE2b for performance but supports SHA-256 for regulatory compliance.

Ed25519 Digital Signatures

Purpose: Cryptographically bind parties to contract terms Key pair generation (crypto_service.py:27-49):
def generate_key_pair(self) -> Tuple[str, str]:
    try:
        # Generate Ed25519 key pair
        private_key = ed25519.Ed25519PrivateKey.generate()
        public_key = private_key.public_key()
        
        # Serialize to PEM format
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )
        
        public_pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        
        # Base64 encode for storage
        return (
            base64.b64encode(private_pem).decode('utf-8'),
            base64.b64encode(public_pem).decode('utf-8')
        )
    except Exception as e:
        logger.error(f"Key pair generation failed: {e}")
        raise CryptographicError(f"Failed to generate key pair: {e}")
Phone-specific key derivation (crypto_service.py:246-259):
def _derive_signing_key(self, phone_number: str) -> bytes:
    """Derive deterministic signing key from phone number"""
    try:
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,  # Ed25519 requires 32-byte seed
            salt=self.salt.encode('utf-8'),
            iterations=100000,  # OWASP recommended minimum
        )
        
        # Combine master key with phone number
        key_material = f"{self.master_key}:{phone_number}".encode('utf-8')
        return kdf.derive(key_material)
    except Exception as e:
        logger.error(f"Key derivation failed: {e}")
        raise CryptographicError(f"Failed to derive signing key: {e}")
Why derive keys from phone numbers?
  1. No key storage - Keys regenerated on-demand from phone number
  2. Consistent - Same phone always produces same key
  3. Isolated - Each phone has unique cryptographic identity
  4. Recovery - No key backup needed; derived from master secret
Contract signing (crypto_service.py:65-76):
def sign_contract(self, contract_data: str, phone_number: str) -> str:
    try:
        # Derive signing key
        signing_key = self._derive_signing_key(phone_number)
        private_key = ed25519.Ed25519PrivateKey.from_private_bytes(signing_key)
        
        # Create timestamped message
        message = f"{contract_data}:{phone_number}:{datetime.utcnow().isoformat()}"
        signature = private_key.sign(message.encode('utf-8'))
        
        return base64.b64encode(signature).decode('utf-8')
    except Exception as e:
        logger.error(f"Contract signing failed: {e}")
        raise CryptographicError(f"Failed to sign contract: {e}")

SMS OTP Generation

Purpose: Phone-based identity verification Code generation (crypto_service.py:101-111):
def generate_sms_confirmation_code(
    self, 
    contract_id: str, 
    phone_number: str
) -> str:
    try:
        # Daily-rotated deterministic code
        content = f"{contract_id}:{phone_number}:{datetime.utcnow().date().isoformat()}"
        hash_obj = hashlib.sha256(content.encode('utf-8'))
        hex_hash = hash_obj.hexdigest()
        
        # Convert to 6-digit numeric code
        numeric_code = int(hex_hash[:8], 16) % 1000000
        return f"{numeric_code:06d}"
    except Exception as e:
        logger.error(f"SMS code generation failed: {e}")
        raise CryptographicError(f"Failed to generate SMS code: {e}")
Properties:
  • Deterministic - Same contract + phone + day = same code
  • Time-limited - Changes daily (no database needed)
  • Collision-resistant - SHA-256 hash prevents guessing

Payment Reference Generation

Purpose: Unique, verifiable payment identifiers Implementation (crypto_service.py:121-128):
def generate_payment_reference(
    self, 
    contract_id: str, 
    amount: float, 
    phone_number: str
) -> str:
    try:
        content = f"{contract_id}:{amount}:{phone_number}"
        # Use BLAKE2b with 8-byte digest for compact references
        hash_obj = hashlib.blake2b(content.encode('utf-8'), digest_size=8)
        return hash_obj.hexdigest().upper()
    except Exception as e:
        logger.error(f"Payment reference generation failed: {e}")
        raise CryptographicError(f"Failed to generate payment reference: {e}")
Example: AG-260306-A1B2C3 + 150000.00 + +254712345678 → 4F7A3B9D1E2C8A6F

Webhook Signature Verification

Purpose: Validate callbacks from Africa’s Talking are authentic Signature generation (crypto_service.py:162-174):
def generate_webhook_signature(self, payload: str) -> str:
    try:
        webhook_secret = settings.get_secret_value('webhook_secret')
        
        # HMAC-SHA256 signature
        signature = hmac.new(
            webhook_secret.encode('utf-8'),
            payload.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        
        return f"sha256={signature}"
    except Exception as e:
        logger.error(f"Webhook signature generation failed: {e}")
        raise CryptographicError(f"Failed to generate webhook signature: {e}")
Verification (crypto_service.py:176-182):
def verify_webhook_signature(self, payload: str, signature: str) -> bool:
    try:
        expected_signature = self.generate_webhook_signature(payload)
        # Constant-time comparison
        return hmac.compare_digest(signature, expected_signature)
    except Exception as e:
        logger.error(f"Webhook signature verification failed: {e}")
        return False
Usage in webhook handlers:
@router.post("/payments/webhook")
async def payment_webhook(
    request: Request,
    crypto_service: CryptoService = Depends(get_crypto_service)
):
    # Get signature from header
    signature = request.headers.get("X-AT-Signature")
    
    # Get raw body
    body = await request.body()
    payload = body.decode('utf-8')
    
    # Verify signature
    if not crypto_service.verify_webhook_signature(payload, signature):
        raise HTTPException(status_code=401, detail="Invalid webhook signature")
    
    # Process webhook...
Why HMAC for webhooks?
  • Shared secret - Both parties know the webhook secret
  • No PKI - Simpler than certificate management
  • Fast - Critical for real-time webhook processing
  • Standard - Used by GitHub, Stripe, etc.

Data Encryption

Purpose: Protect sensitive data at rest (PII, payment details) Encryption (crypto_service.py:184-203):
def encrypt_sensitive_data(self, data: str, context: str = "") -> str:
    try:
        # Generate random salt
        salt = secrets.token_bytes(16)
        
        # Derive encryption key using PBKDF2
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        
        key_material = f"{self.master_key}:{context}".encode('utf-8')
        key = kdf.derive(key_material)
        
        # XOR encryption (simple but effective for this use case)
        encrypted = self._xor_encrypt(data.encode('utf-8'), key)
        
        # Prepend salt to ciphertext
        combined = salt + encrypted
        
        return base64.b64encode(combined).decode('utf-8')
    except Exception as e:
        logger.error(f"Data encryption failed: {e}")
        raise CryptographicError(f"Failed to encrypt data: {e}")
Decryption (crypto_service.py:205-225):
def decrypt_sensitive_data(self, encrypted_data: str, context: str = "") -> str:
    try:
        combined = base64.b64decode(encrypted_data.encode('utf-8'))
        
        # Extract salt and ciphertext
        salt = combined[:16]
        encrypted = combined[16:]
        
        # Derive same key
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        
        key_material = f"{self.master_key}:{context}".encode('utf-8')
        key = kdf.derive(key_material)
        
        # Decrypt
        decrypted = self._xor_encrypt(encrypted, key)
        return decrypted.decode('utf-8')
    except Exception as e:
        logger.error(f"Data decryption failed: {e}")
        raise CryptographicError(f"Failed to decrypt data: {e}")

Audit Trail

Purpose: Immutable evidence chain for disputes Audit signature creation (crypto_service.py:130-144):
def create_audit_signature(
    self, 
    action: str, 
    contract_id: str, 
    actor: str, 
    data: Dict[str, Any]
) -> str:
    try:
        timestamp = datetime.utcnow().isoformat()
        
        # Create canonical representation
        content = f"{action}:{contract_id}:{actor}:{timestamp}:{str(sorted(data.items()))}"
        
        # HMAC signature
        signature = hmac.new(
            self.master_key.encode('utf-8'),
            content.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()
        
        return f"{timestamp}:{signature}"
    except Exception as e:
        logger.error(f"Audit signature creation failed: {e}")
        raise CryptographicError(f"Failed to create audit signature: {e}")
Audit log model (contract.py:271-309):
class AuditLog(Base):
    __tablename__ = "audit_logs"
    
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    contract_id: Mapped[str] = mapped_column(
        String(50),
        ForeignKey("contracts.id", ondelete="CASCADE")
    )
    action: Mapped[str] = mapped_column(String(50), index=True)
    actor_phone: Mapped[Optional[str]] = mapped_column(String(20))
    old_values: Mapped[Optional[dict]] = mapped_column(JSON, default=dict)
    new_values: Mapped[Optional[dict]] = mapped_column(JSON, default=dict)
    details: Mapped[Optional[str]] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, 
        default=datetime.utcnow,
        index=True
    )
Key features:
  • Append-only - Logs never deleted or modified
  • Timestamped - Every action has precise timing
  • Actor tracking - Know who did what
  • State diff - Record old and new values
  • Cryptographically signed - Tamper detection

Database Security Constraints

Integrity checks (contract.py:133-139):
__table_args__ = (
    # Prevent negative amounts
    CheckConstraint("total_amount >= 0", name="check_positive_amount"),
    
    # Ensure expiration is after creation
    CheckConstraint("created_at <= expires_at", name="check_valid_expiry"),
    
    # Composite indexes for security queries
    Index("idx_contract_status_created", "status", "created_at"),
    Index("idx_contract_type_status", "contract_type", "status"),
)

Session Security

USSD session tokens (crypto_service.py:227-236):
def generate_session_token(
    self, 
    phone_number: str, 
    session_type: str = "ussd"
) -> str:
    try:
        timestamp = datetime.utcnow().timestamp()
        content = f"{phone_number}:{session_type}:{timestamp}:{secrets.token_hex(8)}"
        
        # BLAKE2b for fast token generation
        token_hash = hashlib.blake2b(content.encode('utf-8'), digest_size=16)
        return base64.urlsafe_b64encode(token_hash.digest()).decode('utf-8').rstrip('=')
    except Exception as e:
        logger.error(f"Session token generation failed: {e}")
        raise CryptographicError(f"Failed to generate session token: {e}")

Contract Verification Codes

Purpose: Human-readable integrity checks Implementation (crypto_service.py:265-275):
def generate_contract_verification_code(self, contract_id: str) -> str:
    try:
        content = f"{contract_id}:{datetime.utcnow().date().isoformat()}"
        hash_obj = hashlib.sha256(content.encode('utf-8'))
        hex_hash = hash_obj.hexdigest()
        
        # Generate 8-character code
        verification_code = hex_hash[:8].upper()
        return f"VC-{verification_code}"
    except Exception as e:
        logger.error(f"Verification code generation failed: {e}")
        raise CryptographicError(f"Failed to generate verification code: {e}")
Example: Contract AG-260306-A1B2C3 → Verification code VC-4F7A3B9D Parties can verify contract authenticity by comparing codes.

Security Best Practices

Implementation guidelines:
  1. Never log secrets - Redact keys, tokens, passwords in logs
  2. Constant-time comparison - Always use hmac.compare_digest() for equality checks
  3. Key rotation - Rotate master secrets periodically (not yet implemented)
  4. Secure randomness - Use secrets module, never random
  5. Input validation - Sanitize all user inputs before processing
  6. Rate limiting - Prevent brute-force attacks (planned)
  7. HTTPS only - Never transmit sensitive data over HTTP
  8. PII minimization - Only store necessary personal data

Configuration

Security settings (.env):
# Master cryptographic key (keep secret!)
SIGNATURE_PRIVATE_KEY=your_ed25519_private_key_base64

# Salt for key derivation
PASSWORD_SALT=random_salt_string

# Webhook verification secret
WEBHOOK_SECRET=shared_secret_with_africastalking

# Hash algorithm
CONTRACT_HASH_ALGORITHM=blake2b  # or sha256

Threat Model

Mitigated threats:
  • Contract tampering - Cryptographic hashes detect modifications
  • Signature forgery - Ed25519 prevents impersonation
  • Webhook spoofing - HMAC verification blocks fake callbacks
  • Replay attacks - Timestamped signatures expire
  • Timing attacks - Constant-time comparisons
Planned mitigations:
  • Rate limiting - Prevent SMS/USSD flooding
  • Audio encryption - Encrypt voice recordings at rest
  • Key rotation - Automated master key updates
  • 2FA - Optional second factor for high-value contracts

Build docs developers (and LLMs) love