Get up and running with OpenShift Python Wrapper in minutes
This guide will help you perform your first operations with the OpenShift Python Wrapper. You’ll learn how to connect to a cluster, create resources, and use the wrapper’s key features.
Let’s create a simple Namespace resource to understand the basic workflow.
from ocp_resources.namespace import Namespacefrom ocp_resources.resource import get_client# Get a client to connect to your clusterclient = get_client()# Create a namespacens = Namespace(client=client, name="my-first-namespace")ns.deploy()# Verify it existsprint(f"Namespace exists: {ns.exists}")print(f"Namespace status: {ns.status}")# Clean upns.clean_up()
The context manager pattern (with statement) is recommended as it ensures automatic resource cleanup, even if an error occurs.
Pods are fundamental workload resources. Here’s how to create and interact with them:
from ocp_resources.pod import Podfrom ocp_resources.resource import get_clientclient = get_client()# Create a pod with an nginx containerwith Pod( client=client, name="nginx-pod", namespace="default", containers=[{ "name": "nginx", "image": "nginx:latest", "ports": [{"containerPort": 80}] }]) as pod: # Wait for pod to be running pod.wait_for_status(status=Pod.Status.RUNNING, timeout=300) print(f"Pod {pod.name} is running on node: {pod.node.name}") # Get pod logs logs = pod.log() print(f"Pod logs:\n{logs}")
from ocp_resources.network import Networkfrom ocp_resources.resource import get_clientclient = get_client()# Default API group (config.openshift.io)network_config = Network(client=client, name="cluster")# Specific API group (operator.openshift.io)network_operator = Network( client=client, name="cluster", api_group=Network.ApiGroup.OPERATOR_OPENSHIFT_IO)
The wrapper includes built-in schema validation to catch errors early:
from ocp_resources.pod import Podfrom ocp_resources.exceptions import ValidationError# Create a pod and validate before deployingpod = Pod( name="nginx-pod", namespace="default", containers=[{"name": "nginx", "image": "nginx:latest"}])try: pod.validate() print("Pod configuration is valid!")except ValidationError as e: print(f"Validation failed: {e}")
from ocp_resources.pod import Podfrom ocp_resources.namespace import Namespacefrom ocp_resources.resource import get_clientclient = get_client()# Get all pods in a namespacefor pod in Pod.get(client=client, namespace="kube-system"): print(f"Pod: {pod.name}")# Filter by labelsfor pod in Pod.get(client=client, label_selector="app=nginx,tier=frontend"): print(f"Found pod: {pod.name}")# Get by field selectorfor pod in Pod.get(client=client, field_selector="status.phase=Running"): print(f"Running pod: {pod.name}")
from ocp_resources.namespace import Namespacefrom ocp_resources.resource import get_clientfrom ocp_resources.exceptions import ResourceTeardownErrorfrom timeout_sampler import TimeoutExpiredErrorfrom kubernetes.dynamic.exceptions import NotFoundErrorclient = get_client()try: ns = Namespace(client=client, name="my-namespace") ns.deploy() ns.wait_for_status(status=Namespace.Status.ACTIVE, timeout=120)except TimeoutExpiredError: print("Resource did not reach desired state in time")except NotFoundError: print("Resource not found on cluster")except ResourceTeardownError: print("Failed to clean up resource")finally: # Ensure cleanup if ns.exists: ns.clean_up()
Always prefer context managers (with statements) for automatic cleanup:
# Goodwith Namespace(client=client, name="test") as ns: # Work with namespace pass # Automatic cleanup# Avoidns = Namespace(client=client, name="test")ns.deploy()# ... do work ...ns.clean_up() # Easy to forget!
Enable Schema Validation
Catch configuration errors early:
pod = Pod( client=client, name="my-pod", namespace="default", containers=[...], schema_validation_enabled=True)
Wait for Resource Ready State
Don’t assume immediate availability:
pod.deploy()pod.wait_for_status(status=Pod.Status.RUNNING, timeout=300)# Now safe to interact with the pod
Use Appropriate Timeouts
Set realistic timeouts based on resource type:
# Pods might take a few minutespod.wait_for_status(status=Pod.Status.RUNNING, timeout=300)# Deployments can take longerdeployment.wait_for_replicas(timeout=600)
Handle Cleanup Failures
Ensure resources are cleaned up even on errors:
from ocp_resources.exceptions import ResourceTeardownErrortry: with Namespace(client=client, name="test") as ns: # ... work ... passexcept ResourceTeardownError as e: print(f"Warning: Cleanup failed: {e}") # Handle or log the error