The Flyte Spark plugin submits `@task(task_config=Spark(...))` tasks to the spark-on-k8s-operator, which manages the lifecycle of `SparkApplication` Kubernetes resources.
## Prerequisites

- A running Kubernetes cluster with Flyte installed
- `helm` and `kubectl` configured
## Step 1: Install the Spark operator

```shell
helm repo add spark-operator https://kubeflow.github.io/spark-operator
helm repo update

helm install spark-operator spark-operator/spark-operator \
  --namespace spark-operator \
  --create-namespace

kubectl get pods -n spark-operator
```
## Step 2: Enable the Spark plugin in Flyte

Create a `values-spark.yaml` override file:
**flyte-binary (AWS):**
```yaml
configuration:
  inline:
    tasks:
      task-plugins:
        enabled-plugins:
          - container
          - sidecar
          - k8s-array
          - spark
        default-for-task-types:
          - container: container
          - container_array: k8s-array
          - spark: spark
    cluster_resources:
      customData:
        - production:
            - defaultIamRole:
                value: <FLYTE_IAM_USER_ARN>
        - staging:
            - defaultIamRole:
                value: <FLYTE_IAM_USER_ARN>
        - development:
            - defaultIamRole:
                value: <FLYTE_IAM_USER_ARN>
    plugins:
      spark:
        spark-config-default:
          - spark.driver.cores: "1"
          - spark.hadoop.fs.s3a.aws.credentials.provider: "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
          - spark.kubernetes.allocation.batch.size: "50"
          - spark.hadoop.fs.s3a.acl.default: "BucketOwnerFullControl"
          - spark.hadoop.fs.s3n.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3n.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.hadoop.fs.s3a.impl: "org.apache.hadoop.fs.s3a.S3AFileSystem"
          - spark.hadoop.fs.AbstractFileSystem.s3a.impl: "org.apache.hadoop.fs.s3a.S3A"
          - spark.network.timeout: "600s"
          - spark.executorEnv.KUBERNETES_REQUEST_TIMEOUT: "100000"
          - spark.executor.heartbeatInterval: "60s"
clusterResourceTemplates:
  inline:
    001_namespace.yaml: |
      apiVersion: v1
      kind: Namespace
      metadata:
        name: '{{ namespace }}'
    002_serviceaccount.yaml: |
      apiVersion: v1
      kind: ServiceAccount
      metadata:
        name: default
        namespace: '{{ namespace }}'
        annotations:
          eks.amazonaws.com/role-arn: '{{ defaultIamRole }}'
    003_spark_role.yaml: |
      apiVersion: rbac.authorization.k8s.io/v1
      kind: Role
      metadata:
        name: spark-role
        namespace: '{{ namespace }}'
      rules:
        - apiGroups: ["*"]
          resources:
            - pods
            - services
            - configmaps
            - persistentvolumeclaims
          verbs: ["*"]
    004_spark_serviceaccount.yaml: |
      apiVersion: v1
      kind: ServiceAccount
      metadata:
        name: spark
        namespace: '{{ namespace }}'
        annotations:
          eks.amazonaws.com/role-arn: '{{ defaultIamRole }}'
    005_spark_rolebinding.yaml: |
      apiVersion: rbac.authorization.k8s.io/v1
      kind: RoleBinding
      metadata:
        name: spark-role-binding
        namespace: '{{ namespace }}'
      roleRef:
        apiGroup: rbac.authorization.k8s.io
        kind: Role
        name: spark-role
      subjects:
        - kind: ServiceAccount
          name: spark
          namespace: '{{ namespace }}'
```
**flyte-binary (GCP):**

```yaml
configuration:
  inline:
    tasks:
      task-plugins:
        enabled-plugins:
          - container
          - sidecar
          - k8s-array
          - spark
        default-for-task-types:
          - container: container
          - container_array: k8s-array
          - spark: spark
    cluster_resources:
      customData:
        - production:
            - gsa:
                value: <GoogleServiceAccount-EMAIL>
        - staging:
            - gsa:
                value: <GoogleServiceAccount-EMAIL>
        - development:
            - gsa:
                value: <GoogleServiceAccount-EMAIL>
    plugins:
      spark:
        spark-config-default:
          - spark.eventLog.enabled: "true"
          - spark.driver.cores: "1"
          - spark.executorEnv.HTTP2_DISABLE: "true"
          - spark.hadoop.fs.AbstractFileSystem.gs.impl: "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"
          - spark.kubernetes.allocation.batch.size: "50"
          - spark.network.timeout: "600s"
          - spark.executor.heartbeatInterval: "60s"
```
Apply the override:

```shell
helm upgrade flyte-backend flyteorg/flyte-binary \
  --namespace flyte \
  --values values.yaml \
  --values values-spark.yaml
```
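The `spark-config-default` entries act as cluster-wide defaults, while a task's `spark_conf` supplies per-task settings. A minimal sketch of the assumed precedence (task-level values win for keys set in both places), using plain Python dicts rather than any Flyte API:

```python
# Cluster-wide defaults, as set under spark-config-default above.
cluster_defaults = {
    "spark.driver.cores": "1",
    "spark.network.timeout": "600s",
    "spark.executor.heartbeatInterval": "60s",
}

# Hypothetical task-level spark_conf for illustration.
task_spark_conf = {
    "spark.driver.cores": "2",        # overrides the cluster default
    "spark.executor.instances": "2",  # task-only setting
}

# Defaults first, task settings last, so task keys win on conflict.
effective_conf = {**cluster_defaults, **task_spark_conf}

print(effective_conf["spark.driver.cores"])     # "2" (task value wins)
print(effective_conf["spark.network.timeout"])  # "600s" (default kept)
```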
## Step 3: Write a Spark task

Install the flytekit Spark plugin:

```shell
pip install flytekitplugins-spark
```
```python
import random

import flytekit
from flytekit import task, workflow
from flytekitplugins.spark import Spark


@task(
    task_config=Spark(
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
        }
    ),
    cache=True,
    cache_version="1",
)
def hello_spark(partitions: int) -> float:
    """Estimate pi with a Monte Carlo simulation spread across partitions."""
    from operator import add

    def sample(_: int) -> int:
        # 1 if a random point in the unit square lands inside the
        # quarter circle, else 0
        x, y = random.random(), random.random()
        return 1 if x * x + y * y < 1 else 0

    sess = flytekit.current_context().spark_session
    n = 100000 * partitions
    count = (
        sess.sparkContext.parallelize(range(1, n + 1), partitions)
        .map(sample)
        .reduce(add)
    )
    return 4.0 * count / n


@workflow
def my_spark_wf(partitions: int = 50) -> float:
    return hello_spark(partitions=partitions)
```
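The task estimates π by Monte Carlo: sample points uniformly in the unit square and count the fraction that fall inside the quarter circle; Spark only distributes the sampling across partitions. The same computation without Spark, as a plain-Python reference (`estimate_pi` is a name introduced here, not a Flyte API):

```python
import random


def estimate_pi(n: int, seed: int = 0) -> float:
    """Estimate pi by sampling n points in the unit square and
    counting how many satisfy x^2 + y^2 < 1 (the quarter circle)."""
    rng = random.Random(seed)
    inside = sum(
        1 for _ in range(n)
        if rng.random() ** 2 + rng.random() ** 2 < 1
    )
    # Quarter-circle area is pi/4, so scale the hit fraction by 4.
    return 4.0 * inside / n


print(estimate_pi(100_000))  # roughly 3.14; accuracy grows with n
```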
## Spark driver service account

The Spark driver pod must run with a service account that has permission to create executor pods. The `clusterResourceTemplates` above create a `spark` service account in each project namespace, along with a `spark-role` Role and RoleBinding.
Run Spark tasks with this service account:
```shell
pyflyte run --remote \
  --service-account spark \
  my_workflow.py my_spark_wf
```
## Resource quotas (optional)

To enforce per-project resource quotas for Spark tasks, add a `ResourceQuota` template:

```yaml
clusterResourceTemplates:
  inline:
    006_resourcequota.yaml: |
      apiVersion: v1
      kind: ResourceQuota
      metadata:
        name: project-quota
        namespace: '{{ namespace }}'
      spec:
        hard:
          limits.cpu: {{ projectQuotaCpu }}
          limits.memory: {{ projectQuotaMemory }}
configuration:
  inline:
    cluster_resources:
      customData:
        - production:
            - projectQuotaCpu:
                value: "5"
            - projectQuotaMemory:
                value: "4000Mi"
        - development:
            - projectQuotaCpu:
                value: "2"
            - projectQuotaMemory:
                value: "2000Mi"
```
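A Spark application's CPU footprint is roughly one driver pod plus `spark.executor.instances` executor pods, so you can sanity-check a configuration against a quota with simple arithmetic. A rough sketch (plain Python; `spark_cpu_request` is a name introduced here, and real Kubernetes accounting also includes memory and overhead, so treat this as an estimate):

```python
def spark_cpu_request(driver_cores: int, executor_cores: int,
                      executor_instances: int) -> int:
    """Approximate total CPU cores a Spark app requests:
    one driver pod plus executor_instances executor pods."""
    return driver_cores + executor_cores * executor_instances


# The example task above: 1 driver core, 2 executors with 1 core each.
total = spark_cpu_request(driver_cores=1, executor_cores=1,
                          executor_instances=2)
print(total)  # 3

# Fits the production quota (limits.cpu: 5) but exceeds the
# development quota (limits.cpu: 2), where pods would be rejected.
print(total <= 5, total <= 2)  # True False
```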
## Verify

```shell
# Check that the Spark operator is healthy
kubectl get pods -n spark-operator

# After running a Spark task, inspect the SparkApplication resource
kubectl get sparkapplication -n flytesnacks-development
```