Spark on Docker on Yarn on Azure HDInsight

Feb 18, 2023

devtip, docker, yarn, spark, azure, hdinsight

This post builds on Docker on Yarn on Azure HDInsight and covers the tricks needed to run Spark workloads inside Docker containers on Yarn on Azure HDInsight.

We will first run the Spark Pi example, followed by a sample numpy script, inside Docker containers.

Prerequisite

An HDInsight Spark cluster, ideally scaled down to a single worker node. Keeping it to one worker node gives more control over where the containers get created, which means it's easier to debug should things go wrong.

All the steps in this post can be completed from the worker node itself. First ssh to one of the headnodes, and from there ssh to the worker node to follow the steps below.
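
For example (the hostnames here are placeholders; substitute your cluster's ssh endpoint and the worker node's internal hostname, which is listed on the Ambari hosts page):

$ ssh sshuser@mycluster-ssh.azurehdinsight.net   # headnode
$ ssh sshuser@wn0-mycluster                      # worker node, internal hostname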

Prepare the cluster for running Docker workload

Install Docker on the worker node.

Run the following script, either using a custom script action or by sshing to the node directly, as $ sudo bash -x install_docker.sh

#!/bin/bash
# install_docker.sh
set -e

sudo apt-get -y remove docker docker-engine docker.io containerd runc
sudo apt-get -y update
sudo apt-get -y install \
    ca-certificates \
    curl \
    gnupg \
    lsb-release

sudo rm -rf /etc/docker /var/lib/docker /run/docker.sock /etc/systemd/system/docker.service.d

curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg

echo \
  "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu \
  $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

sudo apt-get update
sudo apt-get -y autoremove
sudo apt-get -y autoclean
sudo apt-get -y install docker-ce docker-ce-cli containerd.io
sudo usermod -aG docker sshuser
sudo service docker restart
sudo chmod 666 /var/run/docker.sock

# Verify that we have a working docker setup now.
docker run hello-world
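
If the script completes successfully, the node now has a working Docker daemon. A few quick sanity checks (output will vary):

$ docker --version
$ docker ps
$ id sshuser    # should now show membership in the docker group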

Configure Yarn for running Docker workloads

Run the following Python script as $ sudo python configure_docker_on_yarn.py

Copying the formatted script below can leave stray whitespace on the otherwise empty lines that end up in container-executor.cfg. Before running the Python script, make sure to remove the whitespace from the empty lines before the [docker], [gpu] and [cgroups] section headers; otherwise container-executor will complain about an invalid configuration.
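
One way to clean this up, assuming the script is saved as configure_docker_on_yarn.py, is to strip trailing whitespace with sed before running it:

$ sed -i 's/[[:space:]]\+$//' configure_docker_on_yarn.py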

#!/usr/bin/python
# configure_docker_on_yarn.py
import sys
import subprocess
from datetime import datetime
from hdinsight_common.AmbariHelper import AmbariHelper

ambari_helper = AmbariHelper()
current_ts=datetime.today().strftime('%Y_%m_%d_%H_%M_%S')
# Parse the HDP version out of the 'hdp-select status spark3-client' output
hdp_version=subprocess.check_output(["/usr/bin/hdp-select", "status", "spark3-client"]).split("-")[2].strip()

new_yarn_config = {
    "yarn.nodemanager.container-executor.class": "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor",
    "yarn.nodemanager.linux-container-executor.group": "hadoop",
    "yarn.nodemanager.linux-container-executor.nonsecure-mode.limit-users": "false",
    "yarn.nodemanager.runtime.linux.allowed-runtimes": "default,docker",
    "yarn.nodemanager.runtime.linux.docker.allowed-container-networks": "host,none,bridge",
    "yarn.nodemanager.runtime.linux.docker.default-container-network": "host",
    "yarn.nodemanager.runtime.linux.docker.host-pid-namespace.allowed": "false",
    "yarn.nodemanager.runtime.linux.docker.privileged-containers.allowed": "false",
    "yarn.nodemanager.runtime.linux.docker.privileged-containers.acl": "",
    "yarn.nodemanager.runtime.linux.docker.capabilities": "CHOWN,DAC_OVERRIDE,FSETID,FOWNER,MKNOD,NET_RAW,SETGID,SETUID,SETFCAP,SETPCAP,NET_BIND_SERVICE,SYS_CHROOT,KILL,AUDIT_WRITE",
    "yarn.nodemanager.runtime.linux.docker.delayed-removal.allowed": "true",
    "yarn.nodemanager.delete.debug-delay-sec": "900"
}

# Apply the configurations to yarn-site
ambari_helper.update_latest_service_config("yarn-site", "YARN", "update_yarn_site_for_docker_" + current_ts, new_yarn_config)

container_executor_cfg="""
yarn.nodemanager.local-dirs=/mnt/resource/hadoop/yarn/local
yarn.nodemanager.log-dirs=/mnt/resource/hadoop/yarn/log
yarn.nodemanager.linux-container-executor.group=hadoop
banned.users=hdfs,yarn,mapred,bin
min.user.id=1000

[docker]
  module.enabled=true
  docker.binary=/usr/bin/docker
  docker.allowed.capabilities=CHOWN,DAC_OVERRIDE,FSETID,FOWNER,MKNOD,NET_RAW,SETGID,SETUID,SETFCAP,SETPCAP,NET_BIND_SERVICE,SYS_CHROOT,KILL,AUDIT_WRITE
  docker.allowed.devices=
  docker.allowed.networks=host,none,bridge
  docker.allowed.ro-mounts=/etc/passwd,/etc/group,/etc/hadoop/conf,/mnt/resource/hadoop/yarn/local,/usr/lib/hdinsight-common,/usr/lib/hdinsight-spark,/usr/lib/hdinsight-logging,/usr/hdp/{}/spark3/jars
  docker.allowed.rw-mounts=/mnt/resource/hadoop/yarn/local,/mnt/resource/hadoop/yarn/log
  docker.privileged-containers.enabled=false
  docker.trusted.registries=local
  docker.allowed.volume-drivers=

[gpu]
  module.enabled=false

[cgroups]
  root=/sys/fs/cgroup
  yarn-hierarchy=yarn
""".format(hdp_version)
container_executor_properties = {
    "content": container_executor_cfg
}
# Update container-executor.cfg 
ambari_helper.update_latest_service_config("container-executor", "YARN", "update_container_executor_configs_" + current_ts, container_executor_properties)

# Restart all services so the new configurations take effect.
request_id=ambari_helper.restart_all_stale_services()
if request_id:
  ambari_helper.wait_for_request_completion(request_id, 600, 60)
else:
  print("Failed to restart all stale services")

Verify that container-executor.cfg looks good

HDP_VERSION=$(hdp-select status hadoop-yarn-nodemanager | awk '{print $3}')
sudo -u yarn /usr/hdp/${HDP_VERSION}/hadoop-yarn/bin/container-executor --checksetup && echo "Successful!"
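
To eyeball the rendered file itself:

$ cat /etc/hadoop/conf/container-executor.cfg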

Clean up the usercache directory on the worker node

This is required so that when container-executor runs, it can set up the directory with the appropriate permissions that allow Docker containers to mount local paths in ro and rw modes as necessary. The directory is configured via yarn.nodemanager.local-dirs in /etc/hadoop/conf/yarn-site.xml and defaults to /mnt/resource/hadoop/yarn/local. Example command if the default value is used:

NODEMANAGER_LOCAL_DIR=/mnt/resource/hadoop/yarn/local
sudo rm -rf ${NODEMANAGER_LOCAL_DIR}/usercache/spark
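
If you are unsure what value your cluster uses, check it before deleting:

$ grep -A1 'yarn.nodemanager.local-dirs' /etc/hadoop/conf/yarn-site.xml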

Set up a container image locally on the worker node

We will use the following as our Dockerfile

FROM adoptopenjdk/openjdk8

RUN apt-get update -qq > /dev/null \
    # Required for hadoop
    && apt-get install -y libsnappy-dev \
    # Useful tools for debugging/development
    && apt-get install -y wget vim telnet lsof

# Python 2 is needed for some utility scripts on HDInsight clusters
RUN apt-get install -y python python-dev \
    && curl https://bootstrap.pypa.io/pip/2.7/get-pip.py -o get-pip.py \
    && python get-pip.py \
    && python -V \
    && pip install virtualenv \
    && pip install --upgrade pip

RUN apt-get install -y python3 python3-dev python3-pip python3-virtualenv \
    && python3 -V \
    && pip3 install --upgrade pip

# Default to python3 since python2 has been completely removed from latest Ubuntu release and is not recommended for any new development
ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3

RUN pip3 install numpy pandas \
    && python3 -c "import numpy as np"

Now build a container image that we will call local/pyspark. In production the image could come from another repository that is specifically configured and allowed via docker.trusted.registries in container-executor.cfg (see above).

$ ls -l pyspark/
total 4
-rw-rw-r-- 1 sshuser sshuser 478 Feb 18 02:03 Dockerfile
$ docker build -t local/pyspark pyspark/
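
Confirm the image is now available locally:

$ docker images local/pyspark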

Verify that local image works

$ docker run -it local/pyspark bash -c "(java -version && python -V && python3 -V)"
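
Since the Spark jobs below depend on them, it's also worth verifying that the Python packages import cleanly:

$ docker run -it local/pyspark bash -c "python3 -c 'import numpy, pandas'"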

Run the Spark Pi job on the cluster

Run the following:

PYSPARK_IMAGE=local/pyspark
SPARK_EXAMPLES_JAR=/usr/hdp/current/spark3-client/examples/jars/spark-examples.jar
HDP_VERSION=$(hdp-select status spark3-client | awk '{print $3}')
SPARK_JARS_PATH=/usr/hdp/${HDP_VERSION}/spark3/jars
SPARK_CUSTOM_JARS_PATH=/usr/lib/hdinsight-spark
HDI_LOGGING=/usr/lib/hdinsight-logging
HDI_COMMON=/usr/lib/hdinsight-common
HADOOP_CONF_DIR=/etc/hadoop/conf
CUSTOM_CLASSPATH="${SPARK_JARS_PATH}/*:${SPARK_CUSTOM_JARS_PATH}/*:${HDI_LOGGING}/*"
MOUNTS="/etc/passwd:/etc/passwd:ro,/etc/group:/etc/group:ro,${HADOOP_CONF_DIR}:${HADOOP_CONF_DIR}:ro,${HDI_COMMON}:${HDI_COMMON}:ro,${HDI_LOGGING}:${HDI_LOGGING}:ro,${SPARK_JARS_PATH}:${SPARK_JARS_PATH}:ro,${SPARK_CUSTOM_JARS_PATH}:${SPARK_CUSTOM_JARS_PATH}:ro"

spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn --deploy-mode cluster --num-executors 1 \
    --driver-memory 2g --executor-memory 1g --executor-cores 2 \
    --conf "spark.driver.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf "spark.executor.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf "spark.yarn.am.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${PYSPARK_IMAGE} \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=${MOUNTS} \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL=true \
    --conf spark.executor.extraClassPath="${CUSTOM_CLASSPATH}" \
    --conf spark.driver.extraClassPath="${CUSTOM_CLASSPATH}" \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${PYSPARK_IMAGE} \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=${MOUNTS} \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL=true \
    ${SPARK_EXAMPLES_JAR} 10
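
While the job is running, you can watch the executor containers come and go on the worker node from a second terminal (the container names are generated by Yarn):

$ docker ps --format '{{.Names}}\t{{.Image}}\t{{.Status}}'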

At the end, we should see something like the following:

...
...
23/02/19 03:36:44 INFO Client [main]: Application report for application_1676773095123_0019 (state: RUNNING)
23/02/19 03:36:45 INFO Client [main]: Application report for application_1676773095123_0019 (state: RUNNING)
23/02/19 03:36:46 INFO Client [main]: Application report for application_1676773095123_0019 (state: RUNNING)
23/02/19 03:36:47 INFO Client [main]: Application report for application_1676773095123_0019 (state: RUNNING)
23/02/19 03:36:48 INFO Client [main]: Application report for application_1676773095123_0019 (state: RUNNING)
23/02/19 03:36:49 INFO Client [main]: Application report for application_1676773095123_0019 (state: RUNNING)
23/02/19 03:36:50 INFO Client [main]: Application report for application_1676773095123_0019 (state: RUNNING)
23/02/19 03:36:51 INFO Client [main]: Application report for application_1676773095123_0019 (state: FINISHED)
...

The output is available in stdout for the application and can be seen on the Yarn UI by going to https://[clustername].azurehdinsight.net/yarnui (example below):

Log Type: stdout
Log Upload Time: Sun Feb 19 03:36:52 +0000 2023
Log Length: 33

Pi is roughly 3.1445191445191445
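
Alternatively, the same stdout can be fetched from the command line with yarn logs, substituting your own application id:

$ yarn logs -applicationId application_1676773095123_0019 -log_files stdout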

Next we are going to run a simple numpy job. We already installed numpy in our local/pyspark container image, so we don't need anything extra here. First create the sample script:

# numpy_example.py
from pyspark.sql import SparkSession

# Create the Spark session; on Yarn this is what starts the application
spark = SparkSession.builder.appName("numpy_example").getOrCreate()
sc = spark.sparkContext

import numpy as np
from numpy import random

# Exercise numpy inside the driver's Docker container
arr = np.arange(15).reshape(3, 5)
print("Original array: ", arr)
random.shuffle(arr)  # shuffles the rows in place
print("Array after shuffling: ", arr)

spark.stop()

Now run the following (the output will again be available in stdout for the application on the Yarn UI):

PYSPARK_IMAGE=local/pyspark
SPARK_EXAMPLES_JAR=/usr/hdp/current/spark3-client/examples/jars/spark-examples.jar
HDP_VERSION=$(hdp-select status spark3-client | awk '{print $3}')
SPARK_JARS_PATH=/usr/hdp/${HDP_VERSION}/spark3/jars
SPARK_CUSTOM_JARS_PATH=/usr/lib/hdinsight-spark
HDI_LOGGING=/usr/lib/hdinsight-logging
HDI_COMMON=/usr/lib/hdinsight-common
HADOOP_CONF_DIR=/etc/hadoop/conf
CUSTOM_CLASSPATH="${SPARK_JARS_PATH}/*:${SPARK_CUSTOM_JARS_PATH}/*:${HDI_LOGGING}/*"
MOUNTS="/etc/passwd:/etc/passwd:ro,/etc/group:/etc/group:ro,${HADOOP_CONF_DIR}:${HADOOP_CONF_DIR}:ro,${HDI_COMMON}:${HDI_COMMON}:ro,${HDI_LOGGING}:${HDI_LOGGING}:ro,${SPARK_JARS_PATH}:${SPARK_JARS_PATH}:ro,${SPARK_CUSTOM_JARS_PATH}:${SPARK_CUSTOM_JARS_PATH}:ro"

spark-submit --master yarn --deploy-mode cluster --num-executors 1 \
    --driver-memory 2g --executor-memory 1g --executor-cores 2 \
    --conf "spark.driver.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf "spark.executor.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf "spark.yarn.am.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${PYSPARK_IMAGE} \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=${MOUNTS} \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL=true \
    --conf spark.executor.extraClassPath="${CUSTOM_CLASSPATH}" \
    --conf spark.driver.extraClassPath="${CUSTOM_CLASSPATH}" \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${PYSPARK_IMAGE} \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=${MOUNTS} \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL=true \
    numpy_example.py

The output should look something like this in stdout on the Yarn UI:

Log Type: stdout
Log Upload Time: Sun Feb 19 04:37:08 +0000 2023
Log Length: 151

Original array:  [[ 0  1  2  3  4]
 [ 5  6  7  8  9]
 [10 11 12 13 14]]
Array after shuffling:  [[ 0  1  2  3  4]
 [10 11 12 13 14]
 [ 5  6  7  8  9]]

A similar command to launch spark-shell:

PYSPARK_IMAGE=local/pyspark
SPARK_EXAMPLES_JAR=/usr/hdp/current/spark3-client/examples/jars/spark-examples.jar
HDP_VERSION=$(hdp-select status spark3-client | awk '{print $3}')
SPARK_JARS_PATH=/usr/hdp/${HDP_VERSION}/spark3/jars
SPARK_CUSTOM_JARS_PATH=/usr/lib/hdinsight-spark
HDI_LOGGING=/usr/lib/hdinsight-logging
HDI_COMMON=/usr/lib/hdinsight-common
HADOOP_CONF_DIR=/etc/hadoop/conf
CUSTOM_CLASSPATH="${SPARK_JARS_PATH}/*:${SPARK_CUSTOM_JARS_PATH}/*:${HDI_LOGGING}/*"
MOUNTS="/etc/passwd:/etc/passwd:ro,/etc/group:/etc/group:ro,${HADOOP_CONF_DIR}:${HADOOP_CONF_DIR}:ro,${HDI_COMMON}:${HDI_COMMON}:ro,${HDI_LOGGING}:${HDI_LOGGING}:ro,${SPARK_JARS_PATH}:${SPARK_JARS_PATH}:ro,${SPARK_CUSTOM_JARS_PATH}:${SPARK_CUSTOM_JARS_PATH}:ro"

spark-shell --master yarn --num-executors 1 \
    --driver-memory 2g --executor-memory 1g --executor-cores 2 \
    --conf "spark.driver.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf "spark.executor.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf "spark.yarn.am.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${PYSPARK_IMAGE} \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=${MOUNTS} \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL=true \
    --conf spark.executor.extraClassPath="${CUSTOM_CLASSPATH}" \
    --conf spark.driver.extraClassPath="${CUSTOM_CLASSPATH}" \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${PYSPARK_IMAGE} \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=${MOUNTS} \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL=true

And for pyspark:

PYSPARK_IMAGE=local/pyspark
SPARK_EXAMPLES_JAR=/usr/hdp/current/spark3-client/examples/jars/spark-examples.jar
HDP_VERSION=$(hdp-select status spark3-client | awk '{print $3}')
SPARK_JARS_PATH=/usr/hdp/${HDP_VERSION}/spark3/jars
SPARK_CUSTOM_JARS_PATH=/usr/lib/hdinsight-spark
HDI_LOGGING=/usr/lib/hdinsight-logging
HDI_COMMON=/usr/lib/hdinsight-common
HADOOP_CONF_DIR=/etc/hadoop/conf
CUSTOM_CLASSPATH="${SPARK_JARS_PATH}/*:${SPARK_CUSTOM_JARS_PATH}/*:${HDI_LOGGING}/*"
MOUNTS="/etc/passwd:/etc/passwd:ro,/etc/group:/etc/group:ro,${HADOOP_CONF_DIR}:${HADOOP_CONF_DIR}:ro,${HDI_COMMON}:${HDI_COMMON}:ro,${HDI_LOGGING}:${HDI_LOGGING}:ro,${SPARK_JARS_PATH}:${SPARK_JARS_PATH}:ro,${SPARK_CUSTOM_JARS_PATH}:${SPARK_CUSTOM_JARS_PATH}:ro"

pyspark --master yarn --num-executors 1 \
    --driver-memory 2g --executor-memory 1g --executor-cores 2 \
    --conf "spark.driver.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf "spark.executor.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf "spark.yarn.am.extraJavaOptions=-Dhdp.version=${HDP_VERSION}" \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${PYSPARK_IMAGE} \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=${MOUNTS} \
    --conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL=true \
    --conf spark.executor.extraClassPath="${CUSTOM_CLASSPATH}" \
    --conf spark.driver.extraClassPath="${CUSTOM_CLASSPATH}" \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=${PYSPARK_IMAGE} \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=${MOUNTS} \
    --conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_DELAYED_REMOVAL=true

In this post, we ran the Spark Pi job and a simple numpy script in Docker containers on Yarn. We also launched spark-shell and pyspark so that the driver and executors run in Docker containers. Running the shells can be very helpful when debugging container issues, because the containers don't exit even on job failure, which means we can run $ docker exec -it [container_name] bash to attach a shell to a running container.
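
For example, from the worker node (the container name below is hypothetical; Yarn names the Docker containers after their Yarn container ids, so list them first):

$ docker ps --format '{{.Names}}'
$ docker exec -it container_e01_1676773095123_0019_01_000002 bash   # use a name from the previous command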
