Overview
Pods are the smallest deployable units in Kubernetes and OpenShift. The Pod class provides a Pythonic interface for creating, managing, and interacting with pods, including executing commands and retrieving logs.
Creating a Pod
Basic Pod Creation
Create a simple pod with a single container:
from ocp_resources.pod import Pod
from ocp_resources.resource import get_client
client = get_client()
pod = Pod(
client=client,
name="nginx-pod",
namespace="default",
containers=[{
"name": "nginx",
"image": "nginx:latest",
"ports": [{"containerPort": 80}]
}]
)
pod.deploy()
Pod with Advanced Configuration
Create a pod with resource limits, security context, and volumes:
pod = Pod(
client=client,
name="app-pod",
namespace="my-namespace",
containers=[{
"name": "app",
"image": "myapp:v1.0",
"resources": {
"requests": {"memory": "64Mi", "cpu": "250m"},
"limits": {"memory": "128Mi", "cpu": "500m"}
}
}],
security_context={
"runAsNonRoot": True,
"runAsUser": 1000
},
restart_policy="Always",
volumes=[{
"name": "config-volume",
"configMap": {"name": "app-config"}
}]
)
pod.deploy()
The containers parameter is required when creating a pod. If not provided, a MissingRequiredArgumentError will be raised.
Using Context Managers
Context managers provide automatic cleanup:
with Pod(
client=client,
name="temporary-pod",
namespace="default",
containers=[{"name": "nginx", "image": "nginx:latest"}]
) as pod:
pod.wait_for_status(status=Pod.Status.RUNNING, timeout=120)
# Pod is automatically deleted when exiting the context
Executing Commands in Pods
The execute() method runs commands inside a pod container:
# Execute a command in the default container
output = pod.execute(command=["ls", "-la", "/"], timeout=60)
print(output)
# Execute in a specific container
output = pod.execute(
command=["cat", "/etc/hostname"],
container="nginx",
timeout=30
)
# Ignore return codes (useful for commands that may fail)
output = pod.execute(
command=["test", "-f", "/nonexistent"],
ignore_rc=True
)
The execute() method will raise an ExecOnPodError if the command fails (returns non-zero exit code), unless ignore_rc=True is specified.
Retrieving Pod Logs
Get logs from a pod using the log() method:
# Get all logs
logs = pod.log()
print(logs)
# Get logs from a specific container
logs = pod.log(container="nginx")
# Get recent logs with tail
logs = pod.log(tail_lines=100)
# Follow logs (streaming)
logs = pod.log(follow=True)
# Get logs with timestamps
logs = pod.log(timestamps=True)
Querying Pods
Get Pods with Label Selector
Query pods using label selectors:
# Get all pods with a specific label
for pod in Pod.get(client=client, label_selector="app=nginx"):
print(f"Pod: {pod.name}, Status: {pod.status}")
# Get pods in a specific namespace
for pod in Pod.get(client=client, namespace="production", label_selector="tier=frontend"):
print(f"Pod: {pod.name}, Node: {pod.node.name}")
Access Pod Properties
# Get the node where the pod is running
node = pod.node
print(f"Pod running on node: {node.name}")
# Get pod IP address
ip_address = pod.ip
print(f"Pod IP: {ip_address}")
# Check if pod exists
if pod.exists:
print("Pod is running")
Pod Scheduling and Affinity
Node Selector
Schedule pods on specific nodes:
pod = Pod(
client=client,
name="scheduled-pod",
namespace="default",
containers=[{"name": "nginx", "image": "nginx:latest"}],
node_selector={"disktype": "ssd", "zone": "us-west-1"}
)
pod.deploy()
Pod Affinity
pod = Pod(
client=client,
name="affinity-pod",
namespace="default",
containers=[{"name": "nginx", "image": "nginx:latest"}],
affinity={
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [{
"matchExpressions": [{
"key": "kubernetes.io/hostname",
"operator": "In",
"values": ["node-1", "node-2"]
}]
}]
}
}
}
)
pod.deploy()
Init Containers
Use init containers for setup tasks:
pod = Pod(
client=client,
name="init-pod",
namespace="default",
init_containers=[{
"name": "init-setup",
"image": "busybox:latest",
"command": ["sh", "-c", "echo Initializing && sleep 5"]
}],
containers=[{
"name": "app",
"image": "nginx:latest"
}]
)
pod.deploy()
Troubleshooting
Pod Not Starting
-
Check pod status and events:
print(f"Status: {pod.instance.status.phase}")
from ocp_resources.event import Event
for event in Event.get(client=client, namespace=pod.namespace):
if event.instance.involvedObject.name == pod.name:
print(f"Event: {event.instance.message}")
-
Check container statuses:
for container_status in pod.instance.status.containerStatuses or []:
print(f"Container: {container_status.name}")
print(f"Ready: {container_status.ready}")
if container_status.state.waiting:
print(f"Waiting: {container_status.state.waiting.reason}")
-
Review pod logs for errors:
try:
logs = pod.log()
print(logs)
except Exception as e:
print(f"Could not get logs: {e}")
Command Execution Failures
If execute() times out, increase the timeout parameter or check if the pod is in Running state before executing commands.
# Wait for pod to be ready before executing
pod.wait_for_status(status=Pod.Status.RUNNING, timeout=120)
try:
output = pod.execute(command=["ls", "/app"], timeout=120)
except Exception as e:
print(f"Execution failed: {e}")
# Check pod status
print(f"Pod phase: {pod.instance.status.phase}")
Resource Cleanup
Ensure proper cleanup of pod resources:
try:
pod.deploy()
# ... use the pod ...
finally:
if pod.exists:
pod.clean_up()
pod.wait_deleted(timeout=60)
Best Practices
- Use Context Managers: Always prefer context managers for automatic cleanup
- Set Resource Limits: Define CPU and memory limits to prevent resource exhaustion
- Use Readiness Probes: Add health checks to ensure containers are ready
- Label Your Pods: Use meaningful labels for easier querying and management
- Handle Errors: Always wrap pod operations in try-except blocks
- Wait for Status: Use
wait_for_status() before performing operations on pods