Overview
All resources in the OpenShift Python Wrapper follow a unified CRUD (Create, Read, Update, Delete) pattern. This guide demonstrates common operations that apply to all resource types.
Getting a Client
All resource operations require a client connection:
from ocp_resources.resource import get_client
# Connect using default kubeconfig
client = get_client()
# Connect with specific kubeconfig file
client = get_client(config_file="/path/to/kubeconfig")
# Connect with specific context
client = get_client(context="production-cluster")
# Use fake client for testing
client = get_client(fake=True)
The get_client() function automatically handles authentication and cluster connection. For testing, use fake=True to create resources without a real cluster.
Create Operations
Deploy a Resource
The deploy() method creates a resource in the cluster:
from ocp_resources.namespace import Namespace
client = get_client()
ns = Namespace(client=client, name="my-namespace")
ns.deploy()
assert ns.exists
print(f"Namespace {ns.name} created successfully")
Create Method
Alternatively, use the create() method:
from ocp_resources.config_map import ConfigMap
configmap = ConfigMap(
client=client,
name="app-config",
namespace="default",
data={
"database.url": "postgresql://localhost:5432/mydb",
"log.level": "INFO"
}
)
configmap.create()
Using Context Managers
When a resource is used as a context manager, it is created on entry and automatically cleaned up on exit:
with Namespace(client=client, name="temporary-ns") as ns:
# Wait for namespace to be active
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=120)
# Use the namespace
print(f"Namespace {ns.name} is active")
# Namespace is automatically deleted here
Always prefer context managers when possible. They ensure resources are cleaned up even if exceptions occur.
Disable Automatic Teardown
# Keep resource after context exits
with Namespace(client=client, name="persistent-ns", teardown=False) as ns:
ns.deploy()
# Namespace will NOT be deleted
Read Operations
Check if Resource Exists
ns = Namespace(client=client, name="my-namespace")
if ns.exists:
print(f"Namespace {ns.name} exists")
else:
print(f"Namespace {ns.name} not found")
Get Resource Instance
Access the underlying Kubernetes object:
from ocp_resources.deployment import Deployment
deployment = Deployment(client=client, name="myapp", namespace="production")
# Access the raw instance
print(f"Replicas: {deployment.instance.spec.replicas}")
print(f"Available: {deployment.instance.status.availableReplicas}")
Query Resources with Get
The get() class method returns a generator of resources:
from ocp_resources.pod import Pod
# Get all pods in a namespace
for pod in Pod.get(client=client, namespace="default"):
print(f"Pod: {pod.name}")
# Get pods with label selector
for pod in Pod.get(client=client, label_selector="app=nginx"):
print(f"Nginx pod: {pod.name}, Node: {pod.node.name}")
# Get pods across all namespaces
for pod in Pod.get(client=client):
print(f"Pod: {pod.name} in namespace {pod.namespace}")
Get Single Resource
Get a specific resource by name:
from ocp_resources.service import Service
# Get a specific service
service = Service(client=client, name="nginx-service", namespace="default")
if service.exists:
print(f"Service type: {service.instance.spec.type}")
print(f"Ports: {service.instance.spec.ports}")
Update Operations
Update Resource
Modify and update a resource:
from ocp_resources.deployment import Deployment
deployment = Deployment(client=client, name="myapp", namespace="production")
# Modify the resource
deployment.res["spec"]["replicas"] = 5
deployment.res["spec"]["template"]["spec"]["containers"][0]["image"] = "myapp:v2.0"
# Apply the update
deployment.update(resource_dict=deployment.res)
print(f"Deployment {deployment.name} updated")
Patch Resource
Apply partial updates:
# Patch with specific changes
patch_data = {
"spec": {
"replicas": 10
}
}
deployment.update(resource_dict=patch_data)
Update Labels
from ocp_resources.pod import Pod
pod = Pod(client=client, name="my-pod", namespace="default")
# Add or update labels
pod.res["metadata"]["labels"] = {
"app": "myapp",
"version": "v2.0",
"environment": "production"
}
pod.update(resource_dict=pod.res)
Delete Operations
Clean Up Resource
The clean_up() method deletes a resource:
ns = Namespace(client=client, name="my-namespace")
ns.clean_up()
print(f"Namespace {ns.name} deleted")
Wait for Deletion
Wait until a resource is fully deleted:
from ocp_resources.pod import Pod
pod = Pod(client=client, name="my-pod", namespace="default")
pod.clean_up()
pod.wait_deleted(timeout=60)
print(f"Pod {pod.name} fully deleted")
Some resources have finalizers that prevent immediate deletion. Always use wait_deleted() to ensure complete removal.
Delete with Grace Period
# Delete immediately (grace period = 0)
pod.clean_up()
pod.delete(body={"gracePeriodSeconds": 0})
Wait Operations
Wait for Status
Wait for a resource to reach a specific status:
from ocp_resources.namespace import Namespace
ns = Namespace(client=client, name="my-namespace")
ns.deploy()
# Wait for namespace to be active
ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=120)
print(f"Namespace is active")
Wait for Condition
Wait for specific conditions:
from ocp_resources.deployment import Deployment
deployment = Deployment(client=client, name="myapp", namespace="production")
# Wait for deployment to be fully deployed
deployment.wait_for_replicas(deployed=True, timeout=240)
print(f"All replicas are ready")
Custom Wait Conditions
from timeout_sampler import TimeoutSampler
# Wait for custom condition
for sample in TimeoutSampler(
wait_timeout=120,
sleep=5,
func=lambda: pod.instance.status.phase == "Running"
):
if sample:
print("Pod is running")
break
Working with Multiple Resources
ResourceList
Create multiple similar resources:
from ocp_resources.resource import ResourceList
from ocp_resources.namespace import Namespace
# Create 3 namespaces: ns-1, ns-2, ns-3
namespaces = ResourceList(
resource_class=Namespace,
num_resources=3,
client=client,
name="ns"
)
namespaces.deploy()
assert namespaces[0].name == "ns-1"
assert namespaces[2].name == "ns-3"
# Clean up all
namespaces.clean_up()
NamespacedResourceList
Create resources across multiple namespaces:
from ocp_resources.resource import NamespacedResourceList
from ocp_resources.role import Role
# Create one role in each namespace
roles = NamespacedResourceList(
client=client,
resource_class=Role,
name="reader-role",
namespaces=["ns-1", "ns-2", "ns-3"],
rules=[{
"apiGroups": [""],
"resources": ["pods"],
"verbs": ["get", "list", "watch"]
}]
)
roles.deploy()
assert roles[0].namespace == "ns-1"
roles.clean_up()
Use ResourceList and NamespacedResourceList when you need to create multiple similar resources efficiently.
Working with YAML Files
Create from YAML
from ocp_resources.deployment import Deployment
# Create from YAML file
deployment = Deployment(
client=client,
yaml_file="/path/to/deployment.yaml"
)
deployment.deploy()
Create from Dictionary
resource_dict = {
"apiVersion": "v1",
"kind": "Namespace",
"metadata": {
"name": "my-namespace",
"labels": {"environment": "production"}
}
}
ns = Namespace(client=client, kind_dict=resource_dict)
ns.deploy()
Validation
Schema Validation
Validate resources before deployment:
from ocp_resources.pod import Pod
from ocp_resources.exceptions import ValidationError
pod = Pod(
name="test-pod",
namespace="default",
containers=[{"name": "nginx", "image": "nginx:latest"}]
)
try:
pod.validate()
print("Pod configuration is valid")
except ValidationError as e:
print(f"Validation failed: {e}")
Auto-validation on Create
# Enable auto-validation
pod = Pod(
client=client,
name="validated-pod",
namespace="default",
containers=[{"name": "nginx", "image": "nginx:latest"}],
schema_validation_enabled=True
)
# Validation happens automatically during create
pod.create() # Raises ValidationError if invalid
Error Handling
Common Exceptions
from ocp_resources.exceptions import (
MissingRequiredArgumentError,
ResourceTeardownError,
ValidationError
)
from kubernetes.dynamic.exceptions import (
NotFoundError,
ConflictError,
ForbiddenError
)
try:
ns.deploy()
except MissingRequiredArgumentError as e:
print(f"Missing required field: {e}")
except ConflictError:
print("Resource already exists")
except ForbiddenError:
print("Insufficient permissions")
except Exception as e:
print(f"Unexpected error: {e}")
Retry on Errors
from timeout_sampler import TimeoutSampler, TimeoutExpiredError
try:
for sample in TimeoutSampler(
wait_timeout=60,
sleep=5,
func=lambda: ns.deploy()
):
if sample:
break
except TimeoutExpiredError:
print("Deployment timed out")
Best Practices
- Use Context Managers: Ensure automatic cleanup with context managers
- Check Exists: Always check that a resource exists before operating on it
- Wait for Readiness: Use wait methods after create/update operations
- Handle Errors: Wrap operations in try-except blocks
- Use Label Selectors: Query resources efficiently with labels
- Validate Before Deploy: Enable validation to catch errors early
- Clean Up Resources: Always clean up test resources
- Use Fake Client for Tests: Test without a real cluster using fake=True