What Are Context Managers?
Context managers use Python’s `with` statement to manage the resource lifecycle:
- Entering: Resource is created/deployed when entering the context
- Working: You work with the resource within the context
- Exiting: Resource is automatically cleaned up when exiting
Context manager support is implemented in ocp_resources/resource.py:759 with the __enter__ and __exit__ methods.
Basic Usage
Simple Context Manager
from ocp_resources.namespace import Namespace
from ocp_resources.resource import get_client
client = get_client()
# Resource is created on entry, deleted on exit
with Namespace(client=client, name="test-namespace") as ns:
print(f"Namespace {ns.name} exists: {ns.exists}") # True
# Do work with the namespace
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=120)
# Namespace is automatically deleted here
print(f"Namespace exists: {ns.exists}") # False
Context Manager with Error Handling
from ocp_resources.pod import Pod
try:
with Pod(
client=client,
name="test-pod",
namespace="default",
containers=[{"name": "nginx", "image": "nginx:latest"}]
) as pod:
# Wait for pod to be ready
pod.wait_for_condition(
condition=Pod.Condition.READY,
status=Pod.Condition.Status.TRUE,
timeout=300
)
# Simulate an error
raise ValueError("Something went wrong")
except ValueError as e:
print(f"Error occurred: {e}")
# Pod is still cleaned up automatically!
Context Manager Lifecycle
Understanding the exact flow helps you use context managers effectively:
from ocp_resources.namespace import Namespace
client = get_client()
print("1. Before context manager")
ns = Namespace(client=client, name="lifecycle-demo")
print(f" Exists: {ns.exists}") # False
with ns as namespace:
print("2. Inside context (after __enter__)")
print(f" Exists: {namespace.exists}") # True
print(f" Status: {namespace.status}") # Active
# __enter__ calls deploy(wait=False) by default
# You can wait explicitly if needed
namespace.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
print("3. After context (after __exit__)")
print(f" Exists: {ns.exists}") # False (if teardown=True)
Lifecycle Flow Diagram
┌─────────────────────────────────────────┐
│ Resource object created │
│ (not yet on cluster) │
└───────────────┬─────────────────────────┘
│
▼
┌─────────────┐
│ __enter__() │
└──────┬──────┘
│
▼
┌─────────────┐
│ deploy() │ ──────► Resource created on cluster
└──────┬──────┘
│
▼
┌───────────────────────┐
│ Context body runs │
│ (your code here) │
└───────────┬───────────┘
│
▼
┌─────────────┐
│ __exit__() │
└──────┬──────┘
│
▼
┌─────────────┐
│ clean_up() │ ──────► Resource deleted from cluster
└──────┬──────┘
│
▼
┌─────────────────────────────────────────┐
│ Resource removed from cluster │
└─────────────────────────────────────────┘
Controlling Teardown Behavior
Disable Automatic Cleanup
from ocp_resources.namespace import Namespace
# Set teardown=False to keep resource after context exits
with Namespace(
client=client,
name="persistent-namespace",
teardown=False # Won't be deleted
) as ns:
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
print(f"Namespace created: {ns.name}")
# Namespace still exists after context
print(f"Still exists: {ns.exists}") # True
# Manual cleanup if needed
ns.clean_up()
Custom Cleanup Timeout
from ocp_resources.deployment import Deployment
# Configure delete timeout
with Deployment(
client=client,
namespace="default",
name="long-running-app",
delete_timeout=600, # 10 minutes for cleanup
replicas=3,
selector={"matchLabels": {"app": "myapp"}},
template={
"metadata": {"labels": {"app": "myapp"}},
"spec": {
"containers": [{
"name": "app",
"image": "myapp:latest"
}]
}
}
) as deployment:
# Work with deployment
deployment.wait_for_condition(
condition=Deployment.Condition.AVAILABLE,
status=Deployment.Condition.Status.TRUE,
timeout=300
)
# Cleanup will wait up to 600 seconds
Wait for Resource on Entry
By default, __enter__ calls deploy(wait=False). You can change this:
from ocp_resources.pod import Pod
# Create pod with wait_for_resource=True
with Pod(
client=client,
namespace="default",
name="nginx-pod",
containers=[{"name": "nginx", "image": "nginx:latest"}],
wait_for_resource=True # Waits for pod to exist before proceeding
) as pod:
# Pod is guaranteed to exist here
print(f"Pod status: {pod.status}")
Nested Context Managers
You can nest context managers to manage dependencies:
from ocp_resources.namespace import Namespace
from ocp_resources.config_map import ConfigMap
from ocp_resources.deployment import Deployment
from ocp_resources.service import Service
client = get_client()
# Create namespace first
with Namespace(client=client, name="my-app") as ns:
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
# Create config map in the namespace
with ConfigMap(
client=client,
namespace="my-app",
name="app-config",
data={"config.yaml": "setting: value"}
) as config:
# Create deployment that uses config map
with Deployment(
client=client,
namespace="my-app",
name="web-app",
replicas=2,
selector={"matchLabels": {"app": "web"}},
template={
"metadata": {"labels": {"app": "web"}},
"spec": {
"containers": [{
"name": "nginx",
"image": "nginx:latest",
"envFrom": [{"configMapRef": {"name": "app-config"}}]
}]
}
}
) as deployment:
# Create service
with Service(
client=client,
namespace="my-app",
name="web-service",
selector={"app": "web"},
ports=[{"port": 80, "targetPort": 80}]
) as service:
# Run tests or operations
print(f"Application ready at service: {service.name}")
# Everything will be cleaned up in reverse order:
# 1. Service deleted
# 2. Deployment deleted
# 3. ConfigMap deleted
# 4. Namespace deleted
Multiple Resources (Flat Structure)
For multiple independent resources, use a flat structure:
from ocp_resources.namespace import Namespace
from ocp_resources.pod import Pod
client = get_client()
with (
Namespace(client=client, name="test-ns") as ns,
Pod(
client=client,
namespace="test-ns",
name="pod-1",
containers=[{"name": "nginx", "image": "nginx"}]
) as pod1,
Pod(
client=client,
namespace="test-ns",
name="pod-2",
containers=[{"name": "busybox", "image": "busybox"}]
) as pod2
):
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
pod1.wait_for_condition(
condition=Pod.Condition.READY,
status=Pod.Condition.Status.TRUE,
timeout=300
)
pod2.wait_for_condition(
condition=Pod.Condition.READY,
status=Pod.Condition.Status.TRUE,
timeout=300
)
# All resources cleaned up automatically
Testing Patterns
Pytest Fixtures with Context Managers
import pytest
from ocp_resources.namespace import Namespace
from ocp_resources.pod import Pod
from ocp_resources.resource import get_client
@pytest.fixture(scope="function")
def test_namespace(request):
"""Provide a test namespace that's cleaned up after test."""
client = get_client()
ns_name = f"test-{request.node.name}"
with Namespace(client=client, name=ns_name) as ns:
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
yield ns
# Automatic cleanup
@pytest.fixture
def test_pod(test_namespace):
"""Provide a test pod in test namespace."""
with Pod(
client=test_namespace.client,
namespace=test_namespace.name,
name="test-pod",
containers=[{"name": "nginx", "image": "nginx:latest"}]
) as pod:
pod.wait_for_condition(
condition=Pod.Condition.READY,
status=Pod.Condition.Status.TRUE,
timeout=300
)
yield pod
# Automatic cleanup
def test_pod_operations(test_pod):
"""Test pod operations."""
assert test_pod.exists
assert test_pod.status == Pod.Status.RUNNING
# Test logs
logs = test_pod.log()
assert logs is not None
# Cleanup happens automatically
Conditional Cleanup
from ocp_resources.namespace import Namespace
import os
client = get_client()
# Keep resources on failure for debugging
keep_on_error = os.getenv("KEEP_ON_ERROR", "false") == "true"
ns = Namespace(client=client, name="test-ns", teardown=True)
try:
with ns:
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
# Your test code that might fail
perform_test_operations(ns)
except Exception as e:
if keep_on_error:
print(f"Error occurred: {e}")
print(f"Namespace {ns.name} kept for debugging")
ns.teardown = False # Disable cleanup
raise
Environment-Specific Cleanup
Skip Cleanup in Debug Mode
import os
from ocp_resources.namespace import Namespace
client = get_client()
# Check debug environment variable
debug_mode = os.getenv("DEBUG", "false") == "true"
with Namespace(
client=client,
name="debug-namespace",
teardown=not debug_mode # Don't cleanup in debug mode
) as ns:
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
# Do work
# In debug mode, namespace persists for inspection
Environment Variable Control
The wrapper supports environment variables for controlling cleanup:
# Skip teardown of specific resources
export SKIP_RESOURCE_TEARDOWN="{Pod: {my-pod: my-namespace}}"
# Skip teardown of all resources of a kind
export SKIP_RESOURCE_TEARDOWN="{Namespace: {}}"
# Skip teardown of multiple resources
export SKIP_RESOURCE_TEARDOWN="{Namespace: {test-ns:}, Pod: {test-pod: default}}"
from ocp_resources.pod import Pod
# This pod won't be deleted if SKIP_RESOURCE_TEARDOWN is set appropriately
with Pod(
client=client,
namespace="my-namespace",
name="my-pod",
containers=[{"name": "nginx", "image": "nginx"}]
) as pod:
# Do work
pass
# Cleanup skipped due to environment variable
Error Handling in Context Managers
Handle Teardown Errors
from ocp_resources.exceptions import ResourceTeardownError
from ocp_resources.pod import Pod
try:
with Pod(
client=client,
namespace="default",
name="test-pod",
containers=[{"name": "nginx", "image": "nginx"}]
) as pod:
# Work with pod
print(f"Pod created: {pod.name}")
except ResourceTeardownError as e:
print(f"Failed to clean up resource: {e}")
# Handle cleanup failure
Custom Context Manager Class
You can create custom resource classes with specialized cleanup:
from ocp_resources.secret import Secret
class SecretWithBackup(Secret):
"""Secret that backs up data before deletion."""
def __exit__(self, exc_type, exc_val, exc_tb):
if self.exists:
# Back up secret data
secret_data = self.instance.to_dict()
with open(f"{self.name}-backup.yaml", "w") as f:
import yaml
yaml.dump(secret_data, f)
print(f"Backed up secret to {self.name}-backup.yaml")
# Call parent cleanup
super().__exit__(exc_type, exc_val, exc_tb)
# Usage
with SecretWithBackup(
client=client,
namespace="default",
name="important-secret",
string_data={"key": "value"}
) as secret:
# Work with secret
pass
# Secret is backed up before deletion
Performance Considerations
Parallel Resource Creation
Context managers create resources sequentially. For parallel creation:
from concurrent.futures import ThreadPoolExecutor
from ocp_resources.pod import Pod
def create_pod(client, namespace, name):
"""Create a pod."""
pod = Pod(
client=client,
namespace=namespace,
name=name,
containers=[{"name": "nginx", "image": "nginx"}],
teardown=False # Manual cleanup
)
pod.deploy()
return pod
client = get_client()
pod_names = [f"pod-{i}" for i in range(10)]
# Create pods in parallel
with ThreadPoolExecutor(max_workers=5) as executor:
pods = list(executor.map(
lambda name: create_pod(client, "default", name),
pod_names
))
try:
# Work with pods
for pod in pods:
print(f"Pod {pod.name} exists: {pod.exists}")
finally:
# Clean up all pods
for pod in pods:
pod.clean_up(wait=False)
Best Practices
Use Context Managers
Always use context managers for temporary resources to ensure cleanup.
Set Appropriate Timeouts
Configure delete_timeout for resources that take time to clean up.
Handle Errors Gracefully
Catch ResourceTeardownError for critical cleanup failures.
Use teardown=False for Persistence
Disable teardown for resources that should outlive the context.
Common Patterns
Pattern 1: Test Setup and Teardown
from ocp_resources.namespace import Namespace
from ocp_resources.deployment import Deployment
def test_deployment_scaling():
"""Test deployment scaling."""
client = get_client()
with Namespace(client=client, name="test-scaling") as ns:
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=60)
with Deployment(
client=client,
namespace=ns.name,
name="test-app",
replicas=1,
selector={"matchLabels": {"app": "test"}},
template={
"metadata": {"labels": {"app": "test"}},
"spec": {
"containers": [{
"name": "nginx",
"image": "nginx:latest"
}]
}
}
) as deployment:
# Scale up
deployment_dict = deployment.instance.to_dict()
deployment_dict["spec"]["replicas"] = 3
deployment.update(resource_dict=deployment_dict)
# Verify scaling
import time
time.sleep(10)
assert deployment.instance.spec.replicas == 3
# Automatic cleanup
Pattern 2: Resource Factory
from contextlib import contextmanager
from ocp_resources.pod import Pod
@contextmanager
def temporary_pod(client, namespace, name, image):
"""Create a temporary pod for testing."""
pod = Pod(
client=client,
namespace=namespace,
name=name,
containers=[{"name": "main", "image": image}]
)
try:
pod.deploy()
pod.wait_for_condition(
condition=Pod.Condition.READY,
status=Pod.Condition.Status.TRUE,
timeout=300
)
yield pod
finally:
pod.clean_up(wait=True)
# Usage
client = get_client()
with temporary_pod(client, "default", "test-pod", "nginx:latest") as pod:
# Work with pod
logs = pod.log()
print(logs)
Next Steps
Resource Class
Learn about the base Resource class
Namespaced Resources
Working with namespaced resources
Testing Guide
Best practices for testing with resources
Exceptions
Handle errors and exceptions