Wed 30 Mar 2016

PySpark Carpentry: How to Launch a PySpark Job with Yarn-cluster

Using PySpark to process large amounts of data in a distributed fashion is a great way to gain business insights. However, the machine from which tasks are launched can quickly become overwhelmed. This article will show you how to run PySpark jobs so that the Spark driver runs on the cluster, rather than on the submission node.

Introduction

A Spark job is composed of two types of processes: the executors and the driver. The driver manages the workflow by maintaining metadata about the RDDs and assigning work to each of the executors. When launching a job, the default behavior is for the driver to run on the gateway machine. This is a problem because, in addition to the resources (for example, memory or system threads) needed to manage the executors, the driver can also collect and process data locally. As a result, this machine can quickly become the choke point for job submissions.

One solution for this problem is to launch the jobs in such a way that the driver does not get executed on the gateway node but rather on one of the machines in the cluster. There are several gotchas when doing this:

  1. Strange approach to setting the SPARK_HOME environment variable
  2. Dependencies have to be shipped with the job
  3. It is not possible to use files from the gateway node directly
  4. It is not possible to write the results to the gateway node directly
  5. Monitoring jobs can be more tricky
  6. If the node your driver process is on goes away, your job hangs

Set up

To make this whole article repeatable, here is a very simple Spark script called “go_to_sleep.py”. As you can guess from the name, this script has a simple mapper that just sleeps for a defined number of seconds. Prior to completing, the script writes a success message to a separate file.

import time
from pyspark import SparkContext, SparkConf
import logging
import json


def sleep_mapper(seconds):
    time.sleep(seconds)


if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)

    with open("go_to_sleep.in") as f_in:
        data = json.load(f_in)

    conf = SparkConf().setAppName("sleep_mapper")
    sc = SparkContext(conf=conf)
    sc.\
        parallelize(
            [data.get("duration") for number in xrange(10000)]
        ).\
        map(sleep_mapper).\
        count()  # count() is an action; without it the lazy map never executes

    with open("go_to_sleep.out", "w") as f:
        f.write("Congratulations!")
        f.write("\n")

To run this example, you will need to create the companion go_to_sleep.in and put this one line in it.

{"duration": 10}

Here is how you would run this script with the driver residing on the gateway machine.

/usr/bin/spark-submit --master yarn-client --queue default \
    --num-executors 20 --executor-memory 1G --executor-cores 2 \
    --driver-memory 1G \
    go_to_sleep.py

Getting the logs for your spark application

When a Spark application fails, the logs that are dumped to the gateway machine are not always very informative.

Here is a sample output on the gateway machine:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/03/30 11:46:34 INFO yarn.Client: Requesting a new application from cluster with 40 NodeManagers
16/03/30 11:46:34 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (61440 MB per container)
16/03/30 11:46:34 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
16/03/30 11:46:34 INFO yarn.Client: Setting up container launch context for our AM
16/03/30 11:46:34 INFO yarn.Client: Preparing resources for our AM container
16/03/30 11:46:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/03/30 11:46:35 INFO yarn.Client: Uploading resource file:/usr/lib/spark/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar -> hdfs://magnetic-hadoop-dev/user/thomas/.sparkStaging/applicati
on_1458811581197_13197/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar
16/03/30 11:46:36 INFO yarn.Client: Uploading resource file:/srv/thomas/go_to_sleep.in -> hdfs://magnetic-hadoop-dev/user/thomas/.sparkStagin
g/application_1458811581197_13197/go_to_sleep.in
16/03/30 11:46:36 INFO yarn.Client: Uploading resource file:/srv/thomas/go_to_sleep.py -> hdfs://magnetic-hadoop-dev/user/thomas/.sparkStagin
g/application_1458811581197_13197/go_to_sleep.py
16/03/30 11:46:36 INFO yarn.Client: Setting up the launch environment for our AM container
16/03/30 11:46:36 INFO spark.SecurityManager: Changing view acls to: thomas
16/03/30 11:46:36 INFO spark.SecurityManager: Changing modify acls to: thomas
16/03/30 11:46:36 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(thomas); users with modify permissions: Set(thomas)
16/03/30 11:46:36 INFO yarn.Client: Submitting application 13197 to ResourceManager
16/03/30 11:46:36 INFO impl.YarnClientImpl: Submitted application application_1458811581197_13197
16/03/30 11:46:37 INFO yarn.Client: Application report for application_1458811581197_13197 (state: ACCEPTED)
16/03/30 11:46:37 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: root.thomas
         start time: 1459352796264
         final status: UNDEFINED
         tracking URL: http://ds-hnn001.dev.aws.mgnt.cc:8088/proxy/application_1458811581197_13197/
         user: thomas
16/03/30 11:46:38 INFO yarn.Client: Application report for application_1458811581197_13197 (state: ACCEPTED)
16/03/30 11:47:01 INFO yarn.Client: Application report for application_1458811581197_13197 (state: FAILED)
16/03/30 11:47:01 INFO yarn.Client:
         client token: N/A
         diagnostics: Application application_1458811581197_13197 failed 2 times due to AM Container for appattempt_1458811581197_13197_000002 exited with  exitCode: 1
For more detailed output, check application tracking page:http://ds-hnn001.dev.aws.mgnt.cc:8088/proxy/application_1458811581197_13197/Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e103_1458811581197_13197_02_000001
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
        at org.apache.hadoop.util.Shell.run(Shell.java:455)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
        at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: root.thomas
         start time: 1459352796264
         final status: FAILED
         tracking URL: http://ds-hnn001.dev.aws.mgnt.cc:8088/cluster/app/application_1458811581197_13197
         user: thomas
Exception in thread "main" org.apache.spark.SparkException: Application finished with failed status
        at org.apache.spark.deploy.yarn.Client.run(Client.scala:622)
        at org.apache.spark.deploy.yarn.Client$.main(Client.scala:647)
        at org.apache.spark.deploy.yarn.Client.main(Client.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Luckily this output does give us two very valuable pieces of information: the application ID (application_1458811581197_13197) and the tracking URL. With the application ID we can retrieve the complete aggregated logs from YARN once the application terminates:

yarn logs -applicationId application_1458811581197_13197

The container logs quoted in the gotcha sections below were obtained this way.

Gotcha 1: Strange approach to setting the SPARK_HOME environment variable

This environment variable needs to be set in both the ApplicationMaster environment and the executor environment in order for the job to run. However, in both cases the value is never actually used, so it can simply be set to /dev/null. (This is a known issue in some Spark versions.)

Here is the command used to launch the go_to_sleep job with the driver residing in the cluster.

/usr/bin/spark-submit --master yarn-cluster --queue default \
    --num-executors 20 --executor-memory 1G --executor-cores 2 \
    --driver-memory 1G \
    --conf spark.yarn.appMasterEnv.SPARK_HOME=/dev/null \
    --conf spark.executorEnv.SPARK_HOME=/dev/null \
    --files go_to_sleep.in \
    go_to_sleep.py

This is what the log message would look like if the SPARK_HOME environment variable were not set properly (see the getting your logs section for more information):

Container: container_e103_1458811581197_13197_02_000001 on ds-hdp001.dev.aws.mgnt.cc_8041
===========================================================================================
LogType:stderr
Log Upload Time:Wed Mar 30 11:47:02 -0400 2016
LogLength:1734
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/03/30 11:46:51 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
16/03/30 11:46:54 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1458811581197_13197_000002
16/03/30 11:46:57 INFO spark.SecurityManager: Changing view acls to: yarn,thomas
16/03/30 11:46:57 INFO spark.SecurityManager: Changing modify acls to: yarn,thomas
16/03/30 11:46:57 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, thomas); users with modify permissions: Set(yarn, thomas)
16/03/30 11:46:57 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
16/03/30 11:46:57 INFO yarn.ApplicationMaster: Waiting for spark context initialization
16/03/30 11:46:57 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
16/03/30 11:46:59 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0, (reason: Shutdown hook called before final status was reported.)
16/03/30 11:46:59 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before final status was reported.)
16/03/30 11:46:59 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1458811581197_13197

LogType:stdout
Log Upload Time:Wed Mar 30 11:47:02 -0400 2016
LogLength:745
Log Contents:
Traceback (most recent call last):
  File "go_to_sleep.py", line 21, in <module>
    conf = SparkConf().setAppName('sleep_mapper')
  File "/mnt/ephemeral10/yarn/nm/usercache/thomas/filecache/90/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/conf.py", line 97, in __init__
  File "/mnt/ephemeral10/yarn/nm/usercache/thomas/filecache/90/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/context.py", line 222, in _ensure_initialized
  File "/mnt/ephemeral10/yarn/nm/usercache/thomas/filecache/90/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/java_gateway.py", line 32, in launch_gateway
  File "/usr/lib64/python2.6/UserDict.py", line 22, in __getitem__
    raise KeyError(key)
KeyError: 'SPARK_HOME'

Gotcha 2: Dependencies have to be shipped with the job

When the driver process runs on the gateway node, it can use all of the scripts and packages that you have available locally (and added to PYTHONPATH). This is not the case when the driver runs inside the cluster: every dependency has to be shipped along with the job, for example via the --py-files option of spark-submit.

Note: if the third-party packages are available as *.zip or *.egg files, they can be passed to --py-files in the same way as plain Python scripts.

Here is what the error log would look like if your dependencies are not handled properly (see the getting your logs section for more information).

Container: container_e103_1458811581197_11796_01_000001 on ds-hdp001.dev.aws.mgnt.cc_8041
===========================================================================================
LogType:stderr
Log Upload Time:Tue Mar 29 21:57:47 -0400 2016
LogLength:1734
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/03/29 21:57:35 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
16/03/29 21:57:36 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1458811581197_11796_000001
16/03/29 21:57:37 INFO spark.SecurityManager: Changing view acls to: yarn,thomas
16/03/29 21:57:37 INFO spark.SecurityManager: Changing modify acls to: yarn,thomas
16/03/29 21:57:37 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, thomas); users with modify permissions: Set(yarn, thomas)
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Waiting for spark context initialization
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0, (reason: Shutdown hook called before final status was reported.)
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before final status was reported.)
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1458811581197_11796

LogType:stdout
Log Upload Time:Tue Mar 29 21:57:47 -0400 2016
LogLength:152
Log Contents:
Traceback (most recent call last):
  File "go_to_sleep.py", line 5, in <module>
    import missing_package
ImportError: No module named missing_package

The culprit is clear: the missing_package module is not available on the cluster node where the driver runs.

Gotcha 3: It is not possible to use files from the gateway node directly

When the driver process runs on the gateway node, all local files are available to it, since it runs as an ordinary local process on that machine. This is not the case when the driver runs inside the cluster. Fortunately, the logs are able to shed more light on the problem:

Container: container_e103_1458811581197_12732_01_000001 on ds-hdp001.dev.aws.mgnt.cc_8041
===========================================================================================
LogType:stderr
Log Upload Time:Wed Mar 30 09:06:06 -0400 2016
LogLength:1734
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/03/30 09:05:52 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
16/03/30 09:05:54 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1458811581197_12732_000001
16/03/30 09:05:57 INFO spark.SecurityManager: Changing view acls to: yarn,thomas
16/03/30 09:05:57 INFO spark.SecurityManager: Changing modify acls to: yarn,thomas
16/03/30 09:05:57 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, thomas); users with modify permissions: Set(yarn, thomas)
16/03/30 09:05:57 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
16/03/30 09:05:57 INFO yarn.ApplicationMaster: Waiting for spark context initialization
16/03/30 09:05:57 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
16/03/30 09:05:59 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0, (reason: Shutdown hook called before final status was reported.)
16/03/30 09:05:59 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before final status was reported.)
16/03/30 09:05:59 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1458811581197_12732

LogType:stdout
Log Upload Time:Wed Mar 30 09:06:06 -0400 2016
LogLength:190
Log Contents:
Traceback (most recent call last):
  File "go_to_sleep.py", line 18, in <module>
    with open('go_to_sleep.in', 'r') as f_in:
IOError: [Errno 2] No such file or directory: 'go_to_sleep.in'



Container: container_e103_1458811581197_12732_02_000001 on ds-hdp001.dev.aws.mgnt.cc_8041
===========================================================================================
LogType:stderr
Log Upload Time:Wed Mar 30 09:06:06 -0400 2016
LogLength:1734
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/03/30 09:06:01 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
16/03/30 09:06:02 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1458811581197_12732_000002
16/03/30 09:06:03 INFO spark.SecurityManager: Changing view acls to: yarn,thomas
16/03/30 09:06:03 INFO spark.SecurityManager: Changing modify acls to: yarn,thomas
16/03/30 09:06:03 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, thomas); users with modify permissions: Set(yarn, thomas)
16/03/30 09:06:03 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
16/03/30 09:06:03 INFO yarn.ApplicationMaster: Waiting for spark context initialization
16/03/30 09:06:03 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
16/03/30 09:06:04 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0, (reason: Shutdown hook called before final status was reported.)
16/03/30 09:06:04 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before final status was reported.)
16/03/30 09:06:04 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1458811581197_12732

LogType:stdout
Log Upload Time:Wed Mar 30 09:06:06 -0400 2016
LogLength:190
Log Contents:
Traceback (most recent call last):
  File "go_to_sleep.py", line 18, in <module>
    with open('go_to_sleep.in', 'r') as f_in:
IOError: [Errno 2] No such file or directory: 'go_to_sleep.in'

There are two workarounds for this problem: ship the file along with the job using the --files option of spark-submit, or store the file on HDFS and download it explicitly from within the script. Here is the submit command for the first workaround:

/usr/bin/spark-submit --master yarn-cluster --queue default \
    --num-executors 20 --executor-memory 1G --executor-cores 2 \
    --driver-memory 1G \
    --conf spark.yarn.appMasterEnv.SPARK_HOME=/dev/null \
    --conf spark.executorEnv.SPARK_HOME=/dev/null \
    --files go_to_sleep.in \
    go_to_sleep.py

Here is what the go_to_sleep script would look like if we instead downloaded the input file explicitly from HDFS.

import time
from pyspark import SparkContext, SparkConf
import logging
import json
import subprocess

def sleep_mapper(seconds):
    time.sleep(seconds)


if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)

    subprocess.call(
        ["hadoop", "fs", "-get", "/user/thomas/go_to_sleep.in"]
    )

    with open("go_to_sleep.in", 'r') as f_in:
        data = json.load(f_in)

    conf = SparkConf().setAppName("sleep_mapper")
    sc = SparkContext(conf=conf)
    sc.\
        parallelize(
            [data.get("duration") for number in xrange(10000)]
        ).\
        map(sleep_mapper).\
        count()  # count() is an action; without it the lazy map never executes

    with open("go_to_sleep.out", "w") as f:
        f.write("Congratulations!")
        f.write("\n")

Gotcha 4: It is not possible to write the results to the gateway node directly

There are two ways to work around this: persist the results to HDFS with Spark's saveAsTextFile, or write a local file from the driver and upload it with hadoop fs -put (mirroring the hadoop fs -get trick above).

Here is what using saveAsTextFile to persist the results looks like in the case of our demo script:

import time
from pyspark import SparkContext, SparkConf
import logging
import json
import subprocess

def sleep_mapper(seconds):
    time.sleep(seconds)


if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)

    subprocess.call(
        ["hadoop", "fs", "-get", "/user/thomas/go_to_sleep.in"]
    )

    with open("go_to_sleep.in", 'r') as f_in:
        data = json.load(f_in)

    conf = SparkConf().setAppName("sleep_mapper")
    sc = SparkContext(conf=conf)
    sc.\
        parallelize(
            [data.get("duration") for number in xrange(10000)]
        ).\
        map(sleep_mapper).\
        count()  # count() is an action; without it the lazy map never executes

    sc.\
        parallelize(["Congratulations!"]).\
        saveAsTextFile("/user/thomas/go_to_sleep.out")

Note that when using saveAsTextFile, Spark will create a directory with the name you give, and populate the contents as files named part-XXXXX. If the directory already exists, pyspark will crash when it tries to perform the saveAsTextFile operation. Be sure the output is not present prior to launching the job (and delete it if it is).
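One defensive pattern is to delete the output path right before calling saveAsTextFile. The helper below is a sketch that shells out to the hadoop CLI; remove_hdfs_path is a hypothetical name and not part of the article's script.

```python
import subprocess


def remove_hdfs_path(path, dry_run=False):
    # Recursively delete an HDFS path before a job writes to it.
    # The -f flag makes the command succeed even if the path is absent.
    cmd = ["hadoop", "fs", "-rm", "-r", "-f", path]
    if dry_run:
        # Return the command instead of executing it (useful for testing).
        return cmd
    return subprocess.call(cmd)
```

Calling remove_hdfs_path("/user/thomas/go_to_sleep.out") just before saveAsTextFile ensures the destination directory is gone.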

Gotcha 5: Monitoring jobs can be more tricky

When a task runs in cluster mode, we also lose the benefits of having the driver run on the node from which the job was submitted. In particular, driver log output no longer shows up in the local terminal, and stopping the application is no longer as simple as killing a local process: it has to be done through YARN with yarn application -kill.

Here is what this command would look like for the example in Getting the logs for your spark application:

yarn application -kill application_1458811581197_13197

Gotcha 6: If the node your driver process is on goes away, your job hangs

Because the driver resides in the cluster, it is possible for the node it runs on to disappear. This in turn causes your application to hang. There is nothing you can do to prevent this, so you need to plan around it: monitor the application state from outside the cluster (for example with yarn application -status) and kill and resubmit the job if it stops making progress.
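An external watchdog can poll YARN for the application state and trigger a resubmission when something looks wrong. The snippet below only shows the parsing half; parse_state is a hypothetical helper, and the exact layout of the yarn application -status report can differ between Hadoop versions.

```python
import re


def parse_state(status_output):
    # Extract the "State : <VALUE>" line from a YARN application report.
    match = re.search(r"^\s*State\s*:\s*([A-Z_]+)", status_output, re.MULTILINE)
    return match.group(1) if match else None


# Example report fragment in the shape printed by `yarn application -status`.
sample_report = (
    "Application Report :\n"
    "\tApplication-Id : application_1458811581197_13197\n"
    "\tState : RUNNING\n"
    "\tFinal-State : UNDEFINED\n"
)
```

A full watchdog would fetch the report with subprocess.check_output(["yarn", "application", "-status", app_id]), call parse_state on the text, and kill and resubmit the job when the state stops changing or becomes FAILED.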

Tags: spark
