Skip to main content
The test utilities module provides tools for testing workflows, activities, and handlers.

Overview

The SDK includes several testing utilities:
  • Hypothesis Strategies: Property-based testing strategies
  • E2E Testing: End-to-end testing utilities
  • Scale Data Generator: Generate test data at scale
  • Workflow Monitoring: Monitor workflow execution in tests

Hypothesis Strategies

The `application_sdk.test_utils.hypothesis.strategies` module provides property-based testing strategies for generating test data.

Available Strategies

from application_sdk.test_utils.hypothesis.strategies import (
    temporal_strategies,
    sql_client_strategies,
    json_input_strategies,
    parquet_input_strategies,
    statestore_output_strategies
)

E2E Testing

The `application_sdk.test_utils.e2e` module provides end-to-end testing utilities for workflows.

Base Test Class

from application_sdk.test_utils.e2e.base import E2ETestBase

class TestMyWorkflow(E2ETestBase):
    async def test_workflow_execution(self):
        """Test complete workflow execution.

        Runs MyWorkflow through the E2ETestBase helper and verifies
        the reported status.
        """
        # Setup: execute_workflow expects a dict of workflow arguments.
        # (The original `{...}` placeholder was actually a set literal
        # containing Ellipsis, not a dict.)
        workflow_args = {"workflow_id": "test-workflow"}

        # Execute
        result = await self.execute_workflow(
            workflow_class=MyWorkflow,
            workflow_args=workflow_args
        )

        # Assert
        assert result["status"] == "success"

Scale Data Generator

The `application_sdk.test_utils.scale_data_generator` module generates large-scale test data for performance testing.

DataGenerator

from application_sdk.test_utils.scale_data_generator import DataGenerator

# Describe the entities to synthesize: 1k tables and 10k columns,
# each with a fixed set of fields.
gen_config = {
    "entities": {
        "tables": {"count": 1000, "fields": ["name", "schema", "database"]},
        "columns": {"count": 10000, "fields": ["name", "type", "table_id"]},
    }
}

# Build the generator from the config above.
generator = DataGenerator(config=gen_config)

# Produce the dataset described by the config.
data = generator.generate()

Workflow Monitoring

The `application_sdk.test_utils.workflow_monitoring` module monitors workflow execution during tests.

WorkflowMonitor

from application_sdk.test_utils.workflow_monitoring import WorkflowMonitor

# Create monitor
monitor = WorkflowMonitor(workflow_id="test-workflow-123")

# Start monitoring
await monitor.start()

# Check status
status = await monitor.get_status()
assert status == "RUNNING"

# Wait for completion
result = await monitor.wait_for_completion(timeout=300)
assert result["status"] == "COMPLETED"

Example Usage

Basic Unit Test

import pytest
from application_sdk.activities import ActivitiesInterface
from application_sdk.handlers.base import BaseHandler
from unittest.mock import AsyncMock, Mock

class TestMyActivities:
    """Unit tests for MyActivities with a fully mocked handler."""

    @pytest.fixture
    def handler(self):
        """Create mock handler."""
        mock_handler = Mock(spec=BaseHandler)
        mock_handler.fetch_metadata = AsyncMock(return_value={"data": []})
        return mock_handler

    @pytest.fixture
    def activities(self, handler):
        """Create activities instance."""
        instance = MyActivities()
        # Seed internal state keyed by workflow id so the activity can
        # resolve its handler and args without a running workflow.
        instance._state = {
            "test-workflow": Mock(
                handler=handler,
                workflow_args={"workflow_id": "test-workflow"},
            )
        }
        return instance

    @pytest.mark.asyncio
    async def test_fetch_data(self, activities, handler):
        """Test data fetching activity."""
        args = {
            "workflow_id": "test-workflow",
            "metadata": {"source": "test"},
        }

        outcome = await activities.fetch_data(args)

        # The activity must return something and delegate exactly one
        # metadata fetch to the handler.
        assert outcome is not None
        handler.fetch_metadata.assert_called_once()

Integration Test

import pytest
from application_sdk.application import BaseApplication
from application_sdk.workflows import WorkflowInterface
from application_sdk.activities import ActivitiesInterface

class TestWorkflowIntegration:
    """Integration tests driving a workflow through BaseApplication."""

    @pytest.fixture
    async def app(self):
        """Create application instance.

        NOTE(review): an async generator fixture relies on pytest-asyncio;
        under strict mode this may need `@pytest_asyncio.fixture` — confirm
        against the project's pytest configuration.
        """
        application = BaseApplication(name="test-app")

        await application.setup_workflow(
            workflow_and_activities_classes=[
                (TestWorkflow, TestActivities)
            ]
        )

        yield application

        # Cleanup: shut the worker down if one was started.
        if application.worker:
            await application.worker.shutdown()

    @pytest.mark.asyncio
    async def test_workflow_execution(self, app):
        """Test end-to-end workflow execution."""
        args = {
            "workflow_id": "test-workflow",
            "metadata": {"source": "test"},
        }

        # Persist the workflow args so activities can load them later.
        from application_sdk.services.statestore import StateStore, StateType
        await StateStore.save_state_object(
            id="test-workflow",
            value=args,
            type=StateType.WORKFLOWS
        )

        # Kick off the workflow; only the id is passed at start time.
        result = await app.start_workflow(
            workflow_args={"workflow_id": "test-workflow"},
            workflow_class=TestWorkflow
        )

        assert result is not None

Handler Test

import pytest
from application_sdk.handlers.base import BaseHandler
from application_sdk.clients.base import BaseClient
from unittest.mock import AsyncMock, Mock
import httpx

class TestMyHandler:
    """Unit tests for MyHandler's preflight check with a mocked client."""

    @pytest.fixture
    def client(self):
        """Create mock client."""
        mock_client = Mock(spec=BaseClient)
        mock_client.execute_http_get_request = AsyncMock()
        return mock_client

    @pytest.fixture
    def handler(self, client):
        """Create handler instance."""
        instance = MyHandler()
        instance.client = client
        return instance

    @pytest.mark.asyncio
    async def test_preflight_check_success(self, handler, client):
        """Test successful preflight check."""
        # Simulate a healthy endpoint returning a JSON body.
        response = Mock(spec=httpx.Response)
        response.json.return_value = {"status": "ok"}
        client.execute_http_get_request.return_value = response

        result = await handler.preflight_check(metadata={})

        assert result["authentication"]["success"] is True

    @pytest.mark.asyncio
    async def test_preflight_check_failure(self, handler, client):
        """Test failed preflight check."""
        # Simulate an unreachable endpoint (no response at all).
        client.execute_http_get_request.return_value = None

        result = await handler.preflight_check(metadata={})

        # Failure must be reported with a non-empty message.
        assert result["authentication"]["success"] is False
        assert result["authentication"]["failureMessage"] != ""

Property-Based Test

import pytest
from hypothesis import given, strategies as st
from application_sdk.transformers import TransformerInterface
import daft

class TestTransformer:
    """Property-based tests for MyTransformer.transform_metadata."""

    @given(
        typename=st.sampled_from(["Table", "Column", "Schema"]),
        workflow_id=st.uuids().map(str),
        workflow_run_id=st.uuids().map(str)
    )
    def test_transform_metadata_properties(self, typename, workflow_id, workflow_run_id):
        """Test transformer with generated inputs."""
        transformer = MyTransformer()

        # Two-row frame exercising the name/qualified_name columns.
        frame = daft.from_pydict({
            "name": ["test1", "test2"],
            "qualified_name": ["db.schema.test1", "db.schema.test2"]
        })

        transformed = transformer.transform_metadata(
            typename=typename,
            dataframe=frame,
            workflow_id=workflow_id,
            workflow_run_id=workflow_run_id
        )

        # Properties that must hold for any generated input: both
        # workflow identifiers are stamped on, and no rows are lost.
        assert "workflow_id" in transformed.column_names
        assert "workflow_run_id" in transformed.column_names
        assert len(transformed) == 2

Performance Test

import pytest
import time
from application_sdk.test_utils.scale_data_generator import DataGenerator

class TestPerformance:
    """Throughput checks against generated scale data."""

    @pytest.mark.performance
    @pytest.mark.asyncio
    async def test_large_batch_processing(self):
        """Test processing large batches."""
        # Synthesize 10k items to process.
        generator = DataGenerator(config={
            "entities": {
                "items": {"count": 10000}
            }
        })
        payload = generator.generate()

        # Time the batch end to end.
        started = time.time()
        result = await process_batch(payload["items"])
        duration = time.time() - started

        # Budget: a full batch within a minute, with nothing dropped.
        assert duration < 60  # Should complete within 60 seconds
        assert result["processed_count"] == 10000

Best Practices

Unit Testing

  • Use mocks for external dependencies
  • Test individual functions in isolation
  • Verify both success and failure cases
  • Use fixtures for common setup

Integration Testing

  • Test complete workflows end-to-end
  • Use real services when possible
  • Clean up resources after tests
  • Test error handling and recovery

Performance Testing

  • Use scale data generators for realistic loads
  • Measure and assert on performance metrics
  • Test with production-like data volumes
  • Monitor resource usage

Test Organization

  • Group related tests in classes
  • Use descriptive test names
  • Add docstrings to tests
  • Mark slow tests appropriately

Build docs developers (and LLMs) love