Skip to main content
Context managers provide automatic resource creation and cleanup, ensuring resources are properly deleted even if errors occur. This is one of the most powerful features of the OpenShift Python Wrapper.

What Are Context Managers?

Context managers use Python’s with statement to manage resource lifecycle:
  1. Entering: Resource is created/deployed when entering the context
  2. Working: You work with the resource within the context
  3. Exiting: Resource is automatically cleaned up when exiting
This pattern is implemented by the __enter__ and __exit__ methods of the base Resource class in ocp_resources/resource.py.

Basic Usage

Simple Context Manager

from ocp_resources.namespace import Namespace
from ocp_resources.resource import get_client

client = get_client()

# Resource is created on entry, deleted on exit
# (__enter__ deploys the namespace; __exit__ removes it)
with Namespace(client=client, name="test-namespace") as ns:
    print(f"Namespace {ns.name} exists: {ns.exists}")  # True
    # Do work with the namespace
    ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=120)

# Namespace is automatically deleted here
print(f"Namespace exists: {ns.exists}")  # False

Context Manager with Error Handling

from ocp_resources.pod import Pod

try:
    with Pod(
        client=client,
        name="test-pod",
        namespace="default",
        containers=[{"name": "nginx", "image": "nginx:latest"}]
    ) as pod:
        # Wait for pod to be ready
        pod.wait_for_condition(
            condition=Pod.Condition.READY,
            status=Pod.Condition.Status.TRUE,
            timeout=300
        )

        # Simulate an error
        raise ValueError("Something went wrong")

# __exit__ ran (and deleted the pod) before this handler was reached
except ValueError as e:
    print(f"Error occurred: {e}")
    # Pod is still cleaned up automatically!

Context Manager Lifecycle

Understanding the exact flow helps you use context managers effectively:
from ocp_resources.namespace import Namespace

client = get_client()

print("1. Before context manager")
# Constructing the wrapper object does not touch the cluster yet
ns = Namespace(client=client, name="lifecycle-demo")
print(f"   Exists: {ns.exists}")  # False

with ns as namespace:
    print("2. Inside context (after __enter__)")
    print(f"   Exists: {namespace.exists}")  # True
    print(f"   Status: {namespace.status}")  # Active

    # __enter__ calls deploy(wait=False) by default
    # You can wait explicitly if needed
    namespace.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)

print("3. After context (after __exit__)")
print(f"   Exists: {ns.exists}")  # False (if teardown=True)

Lifecycle Flow Diagram

┌─────────────────────────────────────────┐
│ Resource object created                 │
│ (not yet on cluster)                    │
└───────────────┬─────────────────────────┘


         ┌─────────────┐
         │ __enter__() │
         └──────┬──────┘


         ┌─────────────┐
         │  deploy()   │ ──────► Resource created on cluster
         └──────┬──────┘


    ┌───────────────────────┐
    │  Context body runs    │
    │  (your code here)     │
    └───────────┬───────────┘


         ┌─────────────┐
         │ __exit__()  │
         └──────┬──────┘


         ┌─────────────┐
         │  clean_up() │ ──────► Resource deleted from cluster
         └──────┬──────┘


┌─────────────────────────────────────────┐
│ Resource removed from cluster           │
└─────────────────────────────────────────┘

Controlling Teardown Behavior

Disable Automatic Cleanup

from ocp_resources.namespace import Namespace

# Set teardown=False to keep resource after context exits
with Namespace(
    client=client,
    name="persistent-namespace",
    teardown=False  # Won't be deleted
) as ns:
    ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
    print(f"Namespace created: {ns.name}")

# Namespace still exists after context
print(f"Still exists: {ns.exists}")  # True

# Manual cleanup if needed (clean_up deletes the resource on demand)
ns.clean_up()

Custom Cleanup Timeout

from ocp_resources.deployment import Deployment

# Configure delete timeout
# (how long cleanup waits for the deletion to finish before giving up)
with Deployment(
    client=client,
    namespace="default",
    name="long-running-app",
    delete_timeout=600,  # 10 minutes for cleanup
    replicas=3,
    selector={"matchLabels": {"app": "myapp"}},
    template={
        "metadata": {"labels": {"app": "myapp"}},
        "spec": {
            "containers": [{
                "name": "app",
                "image": "myapp:latest"
            }]
        }
    }
) as deployment:
    # Work with deployment
    deployment.wait_for_condition(
        condition=Deployment.Condition.AVAILABLE,
        status=Deployment.Condition.Status.TRUE,
        timeout=300
    )
    # Cleanup will wait up to 600 seconds

Wait for Resource on Entry

By default, __enter__ calls deploy(wait=False). You can change this:
from ocp_resources.pod import Pod

# Create pod with wait_for_resource=True
# (makes __enter__ block until the object appears on the cluster)
with Pod(
    client=client,
    namespace="default",
    name="nginx-pod",
    containers=[{"name": "nginx", "image": "nginx:latest"}],
    wait_for_resource=True  # Waits for pod to exist before proceeding
) as pod:
    # Pod is guaranteed to exist here
    print(f"Pod status: {pod.status}")

Nested Context Managers

You can nest context managers to manage dependencies:
from ocp_resources.namespace import Namespace
from ocp_resources.config_map import ConfigMap
from ocp_resources.deployment import Deployment
from ocp_resources.service import Service

client = get_client()

# Create namespace first
with Namespace(client=client, name="my-app") as ns:
    ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)

    # Create config map in the namespace
    with ConfigMap(
        client=client,
        namespace="my-app",
        name="app-config",
        data={"config.yaml": "setting: value"}
    ) as config:

        # Create deployment that uses config map
        with Deployment(
            client=client,
            namespace="my-app",
            name="web-app",
            replicas=2,
            selector={"matchLabels": {"app": "web"}},
            template={
                "metadata": {"labels": {"app": "web"}},
                "spec": {
                    "containers": [{
                        "name": "nginx",
                        "image": "nginx:latest",
                        "envFrom": [{"configMapRef": {"name": "app-config"}}]
                    }]
                }
            }
        ) as deployment:

            # Create service
            with Service(
                client=client,
                namespace="my-app",
                name="web-service",
                selector={"app": "web"},
                ports=[{"port": 80, "targetPort": 80}]
            ) as service:

                # Run tests or operations
                print(f"Application ready at service: {service.name}")

                # Everything will be cleaned up in reverse order
                # (inner contexts exit first):
                # 1. Service deleted
                # 2. Deployment deleted
                # 3. ConfigMap deleted
                # 4. Namespace deleted
Multiple Resources (Flat Structure)

For multiple independent resources, use flat structure:
from ocp_resources.namespace import Namespace
from ocp_resources.pod import Pod

client = get_client()

# Parenthesized multi-manager form (Python 3.10+); cleanup still runs
# in reverse order of creation on exit.
with (
    Namespace(client=client, name="test-ns") as ns,
    Pod(
        client=client,
        namespace="test-ns",
        name="pod-1",
        containers=[{"name": "nginx", "image": "nginx"}]
    ) as pod1,
    Pod(
        client=client,
        namespace="test-ns",
        name="pod-2",
        containers=[{"name": "busybox", "image": "busybox"}]
    ) as pod2
):
    ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
    pod1.wait_for_condition(
        condition=Pod.Condition.READY,
        status=Pod.Condition.Status.TRUE,
        timeout=300
    )
    pod2.wait_for_condition(
        condition=Pod.Condition.READY,
        status=Pod.Condition.Status.TRUE,
        timeout=300
    )

    # All resources cleaned up automatically

Testing Patterns

Pytest Fixtures with Context Managers

import pytest
from ocp_resources.namespace import Namespace
from ocp_resources.pod import Pod
from ocp_resources.resource import get_client

@pytest.fixture(scope="function")
def test_namespace(request):
    """Provide a test namespace that's cleaned up after the test.

    The namespace name is derived from the test node name and sanitized to
    a valid RFC 1123 DNS label: Kubernetes rejects names containing
    underscores, uppercase letters, or the brackets pytest adds for
    parametrized tests (e.g. "test_foo[case1]").
    """
    import re

    client = get_client()
    # "test_pod_operations[x]" -> "test-test-pod-operations-x"
    sanitized = re.sub(r"[^a-z0-9-]+", "-", request.node.name.lower()).strip("-")
    ns_name = f"test-{sanitized}"[:63].rstrip("-")  # labels max out at 63 chars

    with Namespace(client=client, name=ns_name) as ns:
        ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
        yield ns
    # Automatic cleanup: __exit__ deletes the namespace after the test

@pytest.fixture
def test_pod(test_namespace):
    """Provide a ready nginx pod inside the test namespace."""
    pod = Pod(
        client=test_namespace.client,
        namespace=test_namespace.name,
        name="test-pod",
        containers=[{"name": "nginx", "image": "nginx:latest"}],
    )
    with pod:
        # Block until the pod reports Ready before handing it to the test.
        pod.wait_for_condition(
            condition=Pod.Condition.READY,
            status=Pod.Condition.Status.TRUE,
            timeout=300,
        )
        yield pod
    # Pod is deleted by __exit__ once the test finishes

def test_pod_operations(test_pod):
    """Test pod operations."""
    # The fixture guarantees the pod exists and is Ready.
    assert test_pod.exists
    assert test_pod.status == Pod.Status.RUNNING

    # Fetching logs from a running container should yield a value.
    pod_logs = test_pod.log()
    assert pod_logs is not None

    # Fixture teardown removes the pod automatically.

Conditional Cleanup

from ocp_resources.namespace import Namespace
import os

client = get_client()

# Keep resources on failure for debugging.
# .lower() makes the flag case-insensitive ("true", "True", "TRUE" all work).
keep_on_error = os.getenv("KEEP_ON_ERROR", "false").lower() == "true"

ns = Namespace(client=client, name="test-ns", teardown=True)

with ns:
    try:
        ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)

        # Your test code that might fail
        perform_test_operations(ns)

    except Exception as e:
        # The teardown flag must be flipped BEFORE __exit__ runs. Catching
        # the error outside the `with` block would be too late: by the time
        # that handler executed, __exit__ would already have deleted the
        # namespace.
        if keep_on_error:
            print(f"Error occurred: {e}")
            print(f"Namespace {ns.name} kept for debugging")
            ns.teardown = False  # Disable cleanup
        raise

Environment-Specific Cleanup

Skip Cleanup in Debug Mode

import os
from ocp_resources.namespace import Namespace

client = get_client()

# Check debug environment variable.
# .lower() accepts "true", "True", or "TRUE" (the original exact-match
# comparison silently ignored anything but lowercase "true").
debug_mode = os.getenv("DEBUG", "false").lower() == "true"

with Namespace(
    client=client,
    name="debug-namespace",
    teardown=not debug_mode  # Don't cleanup in debug mode
) as ns:
    ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
    # Do work

# In debug mode, namespace persists for inspection

Environment Variable Control

The wrapper supports environment variables for controlling cleanup:
# Skip teardown of specific resources
export SKIP_RESOURCE_TEARDOWN="{Pod: {my-pod: my-namespace}}"

# Skip teardown of all resources of a kind
export SKIP_RESOURCE_TEARDOWN="{Namespace: {}}"

# Skip teardown of multiple resources
export SKIP_RESOURCE_TEARDOWN="{Namespace: {test-ns:}, Pod: {test-pod: default}}"
from ocp_resources.pod import Pod

# This pod won't be deleted if SKIP_RESOURCE_TEARDOWN is set appropriately
with Pod(
    client=client,
    namespace="my-namespace",
    name="my-pod",
    containers=[{"name": "nginx", "image": "nginx"}]
) as pod:
    # Do work
    pass
# Cleanup skipped due to environment variable
# NOTE: see the SKIP_RESOURCE_TEARDOWN examples above for the value format

Error Handling in Context Managers

Handle Teardown Errors

from ocp_resources.exceptions import ResourceTeardownError
from ocp_resources.pod import Pod

try:
    with Pod(
        client=client,
        namespace="default",
        name="test-pod",
        containers=[{"name": "nginx", "image": "nginx"}]
    ) as pod:
        # Work with pod
        print(f"Pod created: {pod.name}")

# The try wraps the whole with-block because teardown failures surface
# from __exit__, i.e. after the context body has finished.
except ResourceTeardownError as e:
    print(f"Failed to clean up resource: {e}")
    # Handle cleanup failure

Custom Context Manager Class

You can create custom resource classes with specialized cleanup:
from ocp_resources.secret import Secret

class SecretWithBackup(Secret):
    """Secret that backs up its data to a local YAML file before deletion."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Dump the secret to <name>-backup.yaml, then run normal cleanup.

        Returns the parent's __exit__ result. Propagating that value
        matters: a truthy return from __exit__ suppresses an in-flight
        exception, so dropping it (as the naive override would) silently
        changes error-handling behavior.
        """
        import yaml  # hoisted out of the `with` block; one import per call

        if self.exists:
            # Snapshot the live object before clean_up deletes it.
            secret_data = self.instance.to_dict()
            with open(f"{self.name}-backup.yaml", "w") as f:
                yaml.dump(secret_data, f)
            print(f"Backed up secret to {self.name}-backup.yaml")

        # Call parent cleanup and forward its exception-suppression result.
        return super().__exit__(exc_type, exc_val, exc_tb)

# Usage
with SecretWithBackup(
    client=client,
    namespace="default",
    name="important-secret",
    string_data={"key": "value"}
) as secret:
    # Work with secret
    pass
# Secret is backed up before deletion
# (the custom __exit__ writes important-secret-backup.yaml, then deletes)

Performance Considerations

Parallel Resource Creation

Nested or stacked context managers create resources sequentially, one after another. To create many independent resources in parallel, deploy them manually with teardown disabled and clean them up yourself:
from concurrent.futures import ThreadPoolExecutor
from ocp_resources.pod import Pod

def create_pod(client, namespace, name):
    """Deploy a single nginx pod and return its wrapper object.

    teardown is disabled because the caller owns cleanup (see the
    try/finally below).
    """
    nginx_container = {"name": "nginx", "image": "nginx"}
    pod = Pod(
        client=client,
        namespace=namespace,
        name=name,
        containers=[nginx_container],
        teardown=False,  # Manual cleanup
    )
    pod.deploy()
    return pod

client = get_client()
pod_names = [f"pod-{i}" for i in range(10)]

# Create pods in parallel
# (threads are appropriate here: each worker mostly waits on API calls)
with ThreadPoolExecutor(max_workers=5) as executor:
    pods = list(executor.map(
        lambda name: create_pod(client, "default", name),
        pod_names
    ))

try:
    # Work with pods
    for pod in pods:
        print(f"Pod {pod.name} exists: {pod.exists}")
finally:
    # Clean up all pods
    # wait=False: issue each delete without blocking on its completion
    for pod in pods:
        pod.clean_up(wait=False)

Best Practices

Use Context Managers

Always use context managers for temporary resources to ensure cleanup.

Set Appropriate Timeouts

Configure delete_timeout for resources that take time to clean up.

Handle Errors Gracefully

Catch ResourceTeardownError for critical cleanup failures.

Use teardown=False for Persistence

Disable teardown for resources that should outlive the context.

Common Patterns

Pattern 1: Test Setup and Teardown

from ocp_resources.namespace import Namespace
from ocp_resources.deployment import Deployment

def test_deployment_scaling():
    """Test deployment scaling.

    Scales a deployment from 1 to 3 replicas and polls (with a deadline)
    until the cluster reports the new replica count.
    """
    import time

    client = get_client()

    with Namespace(client=client, name="test-scaling") as ns:
        ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)

        with Deployment(
            client=client,
            namespace=ns.name,
            name="test-app",
            replicas=1,
            selector={"matchLabels": {"app": "test"}},
            template={
                "metadata": {"labels": {"app": "test"}},
                "spec": {
                    "containers": [{
                        "name": "nginx",
                        "image": "nginx:latest"
                    }]
                }
            }
        ) as deployment:
            # Scale up by patching the full resource dict
            deployment_dict = deployment.instance.to_dict()
            deployment_dict["spec"]["replicas"] = 3
            deployment.update(resource_dict=deployment_dict)

            # Verify scaling with a bounded poll instead of a fixed sleep:
            # sleep(10) is flaky on slow clusters and wastes time on fast ones.
            deadline = time.monotonic() + 60
            while deployment.instance.spec.replicas != 3:
                assert time.monotonic() < deadline, (
                    "deployment did not scale to 3 replicas within 60s"
                )
                time.sleep(2)
            assert deployment.instance.spec.replicas == 3

        # Automatic cleanup

Pattern 2: Resource Factory

from contextlib import contextmanager
from ocp_resources.pod import Pod

@contextmanager
def temporary_pod(client, namespace, name, image):
    """Yield a ready pod and always delete it on exit.

    Cleanup lives in the ``finally`` clause, so the pod is removed even
    when deployment, the readiness wait, or the caller's body raises.
    """
    new_pod = Pod(
        client=client,
        namespace=namespace,
        name=name,
        containers=[{"name": "main", "image": image}],
    )
    try:
        new_pod.deploy()
        # Don't hand the pod to the caller until it is actually Ready.
        new_pod.wait_for_condition(
            condition=Pod.Condition.READY,
            status=Pod.Condition.Status.TRUE,
            timeout=300,
        )
        yield new_pod
    finally:
        new_pod.clean_up(wait=True)

# Usage
client = get_client()
with temporary_pod(client, "default", "test-pod", "nginx:latest") as pod:
    # Work with pod; the factory already waited for readiness
    logs = pod.log()
    print(logs)

Next Steps

Resource Class

Learn about the base Resource class

Namespaced Resources

Working with namespaced resources

Testing Guide

Best practices for testing with resources

Exceptions

Handle errors and exceptions

Build docs developers (and LLMs) love