Overview
Deployments provide declarative updates for Pods and ReplicaSets. The Deployment class allows you to manage application lifecycle, including scaling, rolling updates, and rollbacks.
Creating a Deployment
Basic Deployment
Create a simple deployment with required fields:
from ocp_resources.deployment import Deployment
from ocp_resources.resource import get_client
client = get_client()
deployment = Deployment(
client=client,
name="nginx-deployment",
namespace="default",
replicas=3,
selector={"matchLabels": {"app": "nginx"}},
template={
"metadata": {"labels": {"app": "nginx"}},
"spec": {
"containers": [{
"name": "nginx",
"image": "nginx:1.21",
"ports": [{"containerPort": 80}]
}]
}
}
)
deployment.deploy()
Both selector and template are required parameters. If not provided, a MissingRequiredArgumentError will be raised.
Deployment with Resource Limits
Create a deployment with resource requests and limits:
deployment = Deployment(
client=client,
name="app-deployment",
namespace="production",
replicas=5,
selector={"matchLabels": {"app": "myapp", "tier": "frontend"}},
template={
"metadata": {"labels": {"app": "myapp", "tier": "frontend"}},
"spec": {
"containers": [{
"name": "app",
"image": "myapp:v2.0",
"resources": {
"requests": {"memory": "256Mi", "cpu": "500m"},
"limits": {"memory": "512Mi", "cpu": "1000m"}
},
"ports": [{"containerPort": 8080}]
}]
}
}
)
deployment.deploy()
Using Context Managers
Automatic deployment and cleanup with context managers:
with Deployment(
client=client,
name="temporary-deployment",
namespace="default",
replicas=2,
selector={"matchLabels": {"app": "temp"}},
template={
"metadata": {"labels": {"app": "temp"}},
"spec": {
"containers": [{"name": "nginx", "image": "nginx:latest"}]
}
}
) as deployment:
deployment.wait_for_replicas(deployed=True, timeout=240)
# Deployment is automatically deleted when exiting
Scaling Deployments
Scale Replicas
Use the scale_replicas() method to change the number of replicas:
# Scale up to 10 replicas
deployment.scale_replicas(replica_count=10)
deployment.wait_for_replicas(deployed=True, timeout=240)
print(f"Deployment scaled to {deployment.instance.spec.replicas} replicas")
Scale Down
# Scale down to 2 replicas
deployment.scale_replicas(replica_count=2)
deployment.wait_for_replicas(deployed=True, timeout=240)
# Scale to zero (stop all pods)
deployment.scale_replicas(replica_count=0)
deployment.wait_for_replicas(deployed=False, timeout=240)
Use wait_for_replicas() after scaling to ensure all replicas are updated and available before proceeding.
Waiting for Deployment Readiness
The wait_for_replicas() method waits until all replicas are updated and available:
# Wait for deployment to be fully deployed
deployment.wait_for_replicas(deployed=True, timeout=240)
# Wait for all replicas to be removed
deployment.wait_for_replicas(deployed=False, timeout=240)
Understanding Replica States
Monitor different replica counts:
status = deployment.instance.status
print(f"Desired replicas: {deployment.instance.spec.replicas}")
print(f"Total replicas: {status.replicas or 0}")
print(f"Updated replicas: {status.updatedReplicas or 0}")
print(f"Ready replicas: {status.readyReplicas or 0}")
print(f"Available replicas: {status.availableReplicas or 0}")
Deployment Strategies
Rolling Update Strategy
deployment = Deployment(
client=client,
name="rolling-deployment",
namespace="default",
replicas=5,
selector={"matchLabels": {"app": "myapp"}},
template={
"metadata": {"labels": {"app": "myapp"}},
"spec": {
"containers": [{"name": "app", "image": "myapp:v1.0"}]
}
},
strategy={
"type": "RollingUpdate",
"rollingUpdate": {
"maxSurge": 1,
"maxUnavailable": 0
}
}
)
deployment.deploy()
Recreate Strategy
deployment = Deployment(
client=client,
name="recreate-deployment",
namespace="default",
replicas=3,
selector={"matchLabels": {"app": "myapp"}},
template={
"metadata": {"labels": {"app": "myapp"}},
"spec": {
"containers": [{"name": "app", "image": "myapp:v1.0"}]
}
},
strategy={"type": "Recreate"}
)
deployment.deploy()
Updating Deployments
Update Container Image
# Get the current deployment
deployment = Deployment(client=client, name="myapp", namespace="production")
# Update the deployment spec
deployment.res["spec"]["template"]["spec"]["containers"][0]["image"] = "myapp:v2.0"
deployment.update(resource_dict=deployment.res)
# Wait for rollout to complete
deployment.wait_for_replicas(deployed=True, timeout=300)
Update Environment Variables
deployment.res["spec"]["template"]["spec"]["containers"][0]["env"] = [
{"name": "LOG_LEVEL", "value": "DEBUG"},
{"name": "DATABASE_URL", "value": "postgresql://db:5432/myapp"}
]
deployment.update(resource_dict=deployment.res)
deployment.wait_for_replicas(deployed=True, timeout=240)
Querying Deployments
List All Deployments
# Get all deployments in a namespace
for deployment in Deployment.get(client=client, namespace="production"):
print(f"Deployment: {deployment.name}")
print(f" Replicas: {deployment.instance.spec.replicas}")
print(f" Available: {deployment.instance.status.availableReplicas or 0}")
# Get deployments with label selector
for deployment in Deployment.get(
client=client,
namespace="production",
label_selector="tier=frontend"
):
print(f"Frontend deployment: {deployment.name}")
Advanced Configuration
Revision History Limit
Control the number of old ReplicaSets to retain:
deployment = Deployment(
client=client,
name="limited-history",
namespace="default",
replicas=3,
revision_history_limit=5, # Keep only 5 old ReplicaSets
selector={"matchLabels": {"app": "myapp"}},
template={
"metadata": {"labels": {"app": "myapp"}},
"spec": {
"containers": [{"name": "app", "image": "myapp:v1.0"}]
}
}
)
deployment.deploy()
Progress Deadline
Set a deadline for deployment progress:
deployment = Deployment(
client=client,
name="deadline-deployment",
namespace="default",
replicas=3,
progress_deadline_seconds=300, # 5 minutes
selector={"matchLabels": {"app": "myapp"}},
template={
"metadata": {"labels": {"app": "myapp"}},
"spec": {
"containers": [{"name": "app", "image": "myapp:v1.0"}]
}
}
)
deployment.deploy()
Min Ready Seconds
deployment = Deployment(
client=client,
name="stable-deployment",
namespace="default",
replicas=3,
min_ready_seconds=30, # Wait 30 seconds before considering pod available
selector={"matchLabels": {"app": "myapp"}},
template={
"metadata": {"labels": {"app": "myapp"}},
"spec": {
"containers": [{"name": "app", "image": "myapp:v1.0"}]
}
}
)
deployment.deploy()
Pausing and Resuming Deployments
# Pause a deployment (stops rollouts)
deployment = Deployment(
client=client,
name="myapp",
namespace="default",
paused=True,
replicas=3,
selector={"matchLabels": {"app": "myapp"}},
template={
"metadata": {"labels": {"app": "myapp"}},
"spec": {
"containers": [{"name": "app", "image": "myapp:v1.0"}]
}
}
)
deployment.deploy()
# Resume by updating paused to False
deployment.res["spec"]["paused"] = False
deployment.update(resource_dict=deployment.res)
Troubleshooting
Deployment Not Progressing
-
Check deployment status conditions:
for condition in deployment.instance.status.conditions or []:
print(f"Type: {condition.type}")
print(f"Status: {condition.status}")
print(f"Reason: {condition.reason}")
print(f"Message: {condition.message}")
-
Verify replica counts match:
status = deployment.instance.status
spec_replicas = deployment.instance.spec.replicas
if status.availableReplicas != spec_replicas:
print(f"Expected {spec_replicas}, but only {status.availableReplicas} available")
-
Check pod status:
from ocp_resources.pod import Pod
label_selector = ",".join([f"{k}={v}" for k, v in
deployment.instance.spec.selector.matchLabels.items()])
for pod in Pod.get(client=client, namespace=deployment.namespace,
label_selector=label_selector):
print(f"Pod: {pod.name}, Phase: {pod.instance.status.phase}")
Replicas Not Scaling
If wait_for_replicas() times out, check for resource constraints or pod scheduling issues.
try:
deployment.scale_replicas(replica_count=10)
deployment.wait_for_replicas(deployed=True, timeout=240)
except TimeoutExpiredError:
print("Scaling timed out, checking status...")
status = deployment.instance.status
print(f"Updated: {status.updatedReplicas}, Available: {status.availableReplicas}")
# Check events for issues
from ocp_resources.event import Event
for event in Event.get(client=client, namespace=deployment.namespace):
if event.instance.involvedObject.name == deployment.name:
print(f"Event: {event.instance.message}")
Best Practices
- Use Rolling Updates: Prefer rolling updates for zero-downtime deployments
- Set Resource Limits: Always define CPU and memory limits
- Configure Health Checks: Add readiness and liveness probes
- Use Labels: Apply consistent labels for querying and management
- Wait for Readiness: Always use
wait_for_replicas() after changes
- Monitor Status: Check deployment conditions and replica counts
- Set Progress Deadlines: Define reasonable progress deadlines