PySpark Carpentry: How to Launch a PySpark Job with Yarn-cluster
Using PySpark to process large amounts of data in a distributed fashion is a great way to gain business insights. However, the machine from which jobs are launched can quickly become overwhelmed. This article will show you how to run PySpark jobs so that the Spark driver runs on the cluster, rather than on the submission node.
Introduction
A Spark job is composed of two types of processes: the executors and the driver. The driver manages the workflow by maintaining metadata about the RDDs and assigning work to each of the executors. When launching a job, the default behavior is for the driver to run on the gateway machine. This is a problem because, in addition to the resources (for example, memory or system threads) needed to manage the executors, the driver can also collect and process data locally. As a result, this machine can quickly become the choke point for job submissions.
One solution to this problem is to launch jobs in such a way that the driver is not executed on the gateway node but rather on one of the machines in the cluster. There are several gotchas when doing this:
- Strange approach to setting the SPARK_HOME environment variable
- Dependencies have to be shipped with the job
- It is not possible to use files from the gateway node directly
- It is not possible to write the results to the gateway node directly
- Monitoring jobs can be more tricky
- If the node your driver process is on goes away, your job hangs
Set up
To make this whole article reproducible, here is a very simple Spark script called “go_to_sleep.py”. As the name suggests, it has a simple mapper that just sleeps for a defined number of seconds. Before completing, the script writes a success message to a separate file.
import time
from pyspark import SparkContext, SparkConf
import logging
import json


def sleep_mapper(seconds):
    time.sleep(seconds)


if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)
    with open("go_to_sleep.in") as f_in:
        data = json.load(f_in)
    conf = SparkConf().setAppName("sleep_mapper")
    sc = SparkContext(conf=conf)
    sc.\
        parallelize(
            [data.get("duration") for number in xrange(10000)]
        ).\
        map(sleep_mapper).\
        count()  # count() is an action: it forces the lazy map to execute
    with open("go_to_sleep.out", "w") as f:
        f.write("Congratulations!")
        f.write("\n")
To run this example, you will need to create the companion go_to_sleep.in file and put this one line in it.
{"duration": 10}
Here is how you would run this script with the driver residing on the gateway machine.
/usr/bin/spark-submit --master yarn-client --queue default \
--num-executors 20 --executor-memory 1G --executor-cores 2 \
--driver-memory 1G \
go_to_sleep.py
Getting the logs for your spark application
When a spark application fails, the logs that are dumped to the gateway machine are not always very informative.
Here is a sample output on the gateway machine:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/03/30 11:46:34 INFO yarn.Client: Requesting a new application from cluster with 40 NodeManagers
16/03/30 11:46:34 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (61440 MB per container)
16/03/30 11:46:34 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
16/03/30 11:46:34 INFO yarn.Client: Setting up container launch context for our AM
16/03/30 11:46:34 INFO yarn.Client: Preparing resources for our AM container
16/03/30 11:46:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/03/30 11:46:35 INFO yarn.Client: Uploading resource file:/usr/lib/spark/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar -> hdfs://magnetic-hadoop-dev/user/thomas/.sparkStaging/application_1458811581197_13197/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar
16/03/30 11:46:36 INFO yarn.Client: Uploading resource file:/srv/thomas/go_to_sleep.in -> hdfs://magnetic-hadoop-dev/user/thomas/.sparkStaging/application_1458811581197_13197/go_to_sleep.in
16/03/30 11:46:36 INFO yarn.Client: Uploading resource file:/srv/thomas/go_to_sleep.py -> hdfs://magnetic-hadoop-dev/user/thomas/.sparkStaging/application_1458811581197_13197/go_to_sleep.py
16/03/30 11:46:36 INFO yarn.Client: Setting up the launch environment for our AM container
16/03/30 11:46:36 INFO spark.SecurityManager: Changing view acls to: thomas
16/03/30 11:46:36 INFO spark.SecurityManager: Changing modify acls to: thomas
16/03/30 11:46:36 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(thomas); users with modify permissions: Set(thomas)
16/03/30 11:46:36 INFO yarn.Client: Submitting application 13197 to ResourceManager
16/03/30 11:46:36 INFO impl.YarnClientImpl: Submitted application application_1458811581197_13197
16/03/30 11:46:37 INFO yarn.Client: Application report for application_1458811581197_13197 (state: ACCEPTED)
16/03/30 11:46:37 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: root.thomas
start time: 1459352796264
final status: UNDEFINED
tracking URL: http://ds-hnn001.dev.aws.mgnt.cc:8088/proxy/application_1458811581197_13197/
user: thomas
16/03/30 11:46:38 INFO yarn.Client: Application report for application_1458811581197_13197 (state: ACCEPTED)
16/03/30 11:47:01 INFO yarn.Client: Application report for application_1458811581197_13197 (state: FAILED)
16/03/30 11:47:01 INFO yarn.Client:
client token: N/A
diagnostics: Application application_1458811581197_13197 failed 2 times due to AM Container for appattempt_1458811581197_13197_000002 exited with exitCode: 1
For more detailed output, check application tracking page:http://ds-hnn001.dev.aws.mgnt.cc:8088/proxy/application_1458811581197_13197/Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e103_1458811581197_13197_02_000001
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: root.thomas
start time: 1459352796264
final status: FAILED
tracking URL: http://ds-hnn001.dev.aws.mgnt.cc:8088/cluster/app/application_1458811581197_13197
user: thomas
Exception in thread "main" org.apache.spark.SparkException: Application finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:622)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:647)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Luckily this output does give us two very valuable pieces of information:
- The application ID: In this case, the application ID is application_1458811581197_13197. This ID can be used to obtain the application logs using the following command:
yarn logs -applicationId application_1458811581197_13197
The output of this command contains the entire application log, including any error message that your job generated. We will be using these logs many times in this article in order to diagnose problems and fix them.
- The application tracking URL: In this case, the application tracking URL is http://ds-hnn001.dev.aws.mgnt.cc:8088/proxy/application_1458811581197_13197. Navigating to this URL and clicking on the “logs” links associated with each node of your cluster will give you similar insights. One drawback of this approach is that the information you are looking for may be scattered across several nodes.
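If you launch spark-submit from a wrapper script, you can recover the application ID programmatically by scanning the submission output shown above for the application_&lt;cluster&gt;_&lt;sequence&gt; pattern. This is a small sketch of my own, not part of the original workflow:

```python
import re

def extract_application_id(submit_output):
    """Return the first YARN application ID found in spark-submit output,
    e.g. application_1458811581197_13197, or None if there is none."""
    match = re.search(r"application_\d+_\d+", submit_output)
    return match.group(0) if match else None
```

Feed it the captured stderr of spark-submit, then pass the result straight to yarn logs -applicationId.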
Gotcha 1: Strange approach to setting the SPARK_HOME environment variable
This environment variable needs to be set in the appMaster environment and in the executor environment in order for the job to run. However, in both cases the variable is not actually used, so it can be set to /dev/null. (This is a known issue in some Spark versions.)
Here is the command used to launch the go_to_sleep job with the driver residing in the cluster.
/usr/bin/spark-submit --master yarn-cluster --queue default \
--num-executors 20 --executor-memory 1G --executor-cores 2 \
--driver-memory 1G \
--conf spark.yarn.appMasterEnv.SPARK_HOME=/dev/null \
--conf spark.executorEnv.SPARK_HOME=/dev/null \
--files go_to_sleep.in \
go_to_sleep.py
This is what the log message would look like if the SPARK_HOME environment variable were not set properly (see the getting your logs section for more information):
Container: container_e103_1458811581197_13197_02_000001 on ds-hdp001.dev.aws.mgnt.cc_8041
===========================================================================================
LogType:stderr
Log Upload Time:Wed Mar 30 11:47:02 -0400 2016
LogLength:1734
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/03/30 11:46:51 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
16/03/30 11:46:54 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1458811581197_13197_000002
16/03/30 11:46:57 INFO spark.SecurityManager: Changing view acls to: yarn,thomas
16/03/30 11:46:57 INFO spark.SecurityManager: Changing modify acls to: yarn,thomas
16/03/30 11:46:57 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, thomas); users with modify permissions: Set(yarn, thomas)
16/03/30 11:46:57 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
16/03/30 11:46:57 INFO yarn.ApplicationMaster: Waiting for spark context initialization
16/03/30 11:46:57 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
16/03/30 11:46:59 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0, (reason: Shutdown hook called before final status was reported.)
16/03/30 11:46:59 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before final status was reported.)
16/03/30 11:46:59 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1458811581197_13197
LogType:stdout
Log Upload Time:Wed Mar 30 11:47:02 -0400 2016
LogLength:745
Log Contents:
Traceback (most recent call last):
File "go_to_sleep.py", line 21, in <module>
conf = SparkConf().setAppName('sleep_mapper')
File "/mnt/ephemeral10/yarn/nm/usercache/thomas/filecache/90/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/conf.py", line 97, in __init__
File "/mnt/ephemeral10/yarn/nm/usercache/thomas/filecache/90/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/context.py", line 222, in _ensure_initialized
File "/mnt/ephemeral10/yarn/nm/usercache/thomas/filecache/90/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar/pyspark/java_gateway.py", line 32, in launch_gateway
File "/usr/lib64/python2.6/UserDict.py", line 22, in __getitem__
raise KeyError(key)
KeyError: 'SPARK_HOME'
Gotcha 2: Dependencies have to be shipped with the job
When the driver process runs on the gateway node, it is able to make use of all of the scripts and packages that you have available (and added to PYTHONPATH). This is not the case when the driver runs in the cluster.
- Third-party packages: If you require a third-party package to be installed on the executors, do so when setting up the node (for example with pip).
- Scripts: If you require custom scripts, create a *.zip (or *.egg) file containing all of your dependencies and ship it to the executors using the --py-files command line option. Two things to look out for:
- Make sure that you also include “nested dependencies”
- Make sure that your *.py files are at the top level of the *.zip file.
Note: If the third-party packages are available as *.zip or *.egg files, you can also follow the process described above for scripts.
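To make the second point concrete, here is a sketch of how a --py-files archive can be built so that modules end up at the top level of the zip. The helper name and paths are my own illustration, not from the article:

```python
import os
import zipfile

def build_dependency_zip(source_dir, zip_path):
    """Bundle every *.py under source_dir into zip_path, with archive paths
    relative to source_dir so the modules sit at the top of the archive."""
    with zipfile.ZipFile(zip_path, "w") as zf:
        for root, _dirs, files in os.walk(source_dir):
            for name in files:
                if name.endswith(".py"):
                    full_path = os.path.join(root, name)
                    # arcname strips the source_dir prefix from the stored path
                    zf.write(full_path, arcname=os.path.relpath(full_path, source_dir))
```

The resulting archive would then be shipped with --py-files deps.zip on the spark-submit command line.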
Here is what the error log would look like if your dependencies are not handled properly (see the getting your logs section for more information).
Container: container_e103_1458811581197_11796_01_000001 on ds-hdp001.dev.aws.mgnt.cc_8041
===========================================================================================
LogType:stderr
Log Upload Time:Tue Mar 29 21:57:47 -0400 2016
LogLength:1734
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/03/29 21:57:35 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
16/03/29 21:57:36 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1458811581197_11796_000001
16/03/29 21:57:37 INFO spark.SecurityManager: Changing view acls to: yarn,thomas
16/03/29 21:57:37 INFO spark.SecurityManager: Changing modify acls to: yarn,thomas
16/03/29 21:57:37 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, thomas); users with modify permissions: Set(yarn, thomas)
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Waiting for spark context initialization
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0, (reason: Shutdown hook called before final status was reported.)
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before final status was reported.)
16/03/29 21:57:37 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1458811581197_11796
LogType:stdout
Log Upload Time:Tue Mar 29 21:57:47 -0400 2016
LogLength:152
Log Contents:
Traceback (most recent call last):
File "go_to_sleep.py", line 5, in <module>
import missing_package
ImportError: No module named missing_package
It is clear who the culprit is: the missing_package package is not available to the executors.
Gotcha 3: It is not possible to use files from the gateway node directly
When the driver process runs on the gateway node, all local files are available to it, since it runs as an ordinary local process on that machine. This is not the case when the driver runs in the cluster. Fortunately, the logs shed more light on the problem:
Container: container_e103_1458811581197_12732_01_000001 on ds-hdp001.dev.aws.mgnt.cc_8041
===========================================================================================
LogType:stderr
Log Upload Time:Wed Mar 30 09:06:06 -0400 2016
LogLength:1734
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/03/30 09:05:52 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
16/03/30 09:05:54 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1458811581197_12732_000001
16/03/30 09:05:57 INFO spark.SecurityManager: Changing view acls to: yarn,thomas
16/03/30 09:05:57 INFO spark.SecurityManager: Changing modify acls to: yarn,thomas
16/03/30 09:05:57 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, thomas); users with modify permissions: Set(yarn, thomas)
16/03/30 09:05:57 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
16/03/30 09:05:57 INFO yarn.ApplicationMaster: Waiting for spark context initialization
16/03/30 09:05:57 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
16/03/30 09:05:59 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0, (reason: Shutdown hook called before final status was reported.)
16/03/30 09:05:59 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before final status was reported.)
16/03/30 09:05:59 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1458811581197_12732
LogType:stdout
Log Upload Time:Wed Mar 30 09:06:06 -0400 2016
LogLength:190
Log Contents:
Traceback (most recent call last):
File "go_to_sleep.py", line 18, in <module>
with open('go_to_sleep.in', 'r') as f_in:
IOError: [Errno 2] No such file or directory: 'go_to_sleep.in'
Container: container_e103_1458811581197_12732_02_000001 on ds-hdp001.dev.aws.mgnt.cc_8041
===========================================================================================
LogType:stderr
Log Upload Time:Wed Mar 30 09:06:06 -0400 2016
LogLength:1734
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/flume-ng/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/03/30 09:06:01 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
16/03/30 09:06:02 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1458811581197_12732_000002
16/03/30 09:06:03 INFO spark.SecurityManager: Changing view acls to: yarn,thomas
16/03/30 09:06:03 INFO spark.SecurityManager: Changing modify acls to: yarn,thomas
16/03/30 09:06:03 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, thomas); users with modify permissions: Set(yarn, thomas)
16/03/30 09:06:03 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread
16/03/30 09:06:03 INFO yarn.ApplicationMaster: Waiting for spark context initialization
16/03/30 09:06:03 INFO yarn.ApplicationMaster: Waiting for spark context initialization ...
16/03/30 09:06:04 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0, (reason: Shutdown hook called before final status was reported.)
16/03/30 09:06:04 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with SUCCEEDED (diag message: Shutdown hook called before final status was reported.)
16/03/30 09:06:04 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1458811581197_12732
LogType:stdout
Log Upload Time:Wed Mar 30 09:06:06 -0400 2016
LogLength:190
Log Contents:
Traceback (most recent call last):
File "go_to_sleep.py", line 18, in <module>
with open('go_to_sleep.in', 'r') as f_in:
IOError: [Errno 2] No such file or directory: 'go_to_sleep.in'
There are two workarounds for this problem:
- Ship the files at launch time: This can be done by passing the --files command line option followed by a comma-separated list of files. These files will be placed in the working directory of each executor. The drawback of this approach is that everything needs to be passed at launch time. Here is an example showing how to do this (repeated from the Set up section):
/usr/bin/spark-submit --master yarn-cluster --queue default \
--num-executors 20 --executor-memory 1G --executor-cores 2 \
--driver-memory 1G \
--conf spark.yarn.appMasterEnv.SPARK_HOME=/dev/null \
--conf spark.executorEnv.SPARK_HOME=/dev/null \
--files go_to_sleep.in \
go_to_sleep.py
- Pull the files from HDFS when needed: Upload the files to HDFS and download them inside the job’s working directory.
Here is what the go_to_sleep script would look like if we downloaded the files explicitly.
import time
from pyspark import SparkContext, SparkConf
import logging
import json
import subprocess


def sleep_mapper(seconds):
    time.sleep(seconds)


if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)
    subprocess.call(
        ["hadoop", "fs", "-get", "/user/thomas/go_to_sleep.in"]
    )
    with open("go_to_sleep.in", "r") as f_in:
        data = json.load(f_in)
    conf = SparkConf().setAppName("sleep_mapper")
    sc = SparkContext(conf=conf)
    sc.\
        parallelize(
            [data.get("duration") for number in xrange(10000)]
        ).\
        map(sleep_mapper).\
        count()  # count() is an action: it forces the lazy map to execute
    with open("go_to_sleep.out", "w") as f:
        f.write("Congratulations!")
        f.write("\n")
Gotcha 4: It is not possible to write the results to the gateway node directly
There are two ways to work around this:
- Transform the results into an RDD and use saveAsTextFile: This is by far the cleanest way to achieve this. Granted, there are drawbacks (it appears counter-intuitive, it is not possible to create a file with a single header such as a *.csv, etc.), but the simplicity of this solution makes the trade-off worth it.
- Materialize the output on the executor and manually push the file to a desired output location: There are many ways to achieve this. Examples include uploading the file to HDFS, uploading it to Amazon S3, or copying it back to the gateway node using scp.
Here is what using saveAsTextFile to persist the results looks like in the case of our demo script:
import time
from pyspark import SparkContext, SparkConf
import logging
import json
import subprocess


def sleep_mapper(seconds):
    time.sleep(seconds)


if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)
    subprocess.call(
        ["hadoop", "fs", "-get", "/user/thomas/go_to_sleep.in"]
    )
    with open("go_to_sleep.in", "r") as f_in:
        data = json.load(f_in)
    conf = SparkConf().setAppName("sleep_mapper")
    sc = SparkContext(conf=conf)
    sc.\
        parallelize(
            [data.get("duration") for number in xrange(10000)]
        ).\
        map(sleep_mapper).\
        count()  # count() is an action: it forces the lazy map to execute
    sc.\
        parallelize(["Congratulations!"]).\
        saveAsTextFile("/user/thomas/go_to_sleep.out")
Note that when using saveAsTextFile, Spark will create a directory with the name you give and populate it with files named part-XXXXX. If the directory already exists, PySpark will crash when it tries to perform the saveAsTextFile operation. Be sure the output is not present prior to launching the job (and delete it if it is).
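One defensive pattern is to clear any stale output just before writing. This sketch only builds the hadoop fs -rm command line (the -f flag keeps the call from failing when the path is absent); the helper name is my own, not from the article:

```python
import subprocess

def hdfs_rm_command(path):
    """Build the argument list that recursively deletes an HDFS path.
    -f makes the command succeed even when the path does not exist."""
    return ["hadoop", "fs", "-rm", "-r", "-f", path]

# In the job, just before saveAsTextFile:
# subprocess.call(hdfs_rm_command("/user/thomas/go_to_sleep.out"))
```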
Gotcha 5: Monitoring jobs can be more tricky
When a job runs in cluster mode, we also lose the benefits of having the driver run on the node from which the job was submitted. These benefits include logging and the ability to easily stop an application.
- Logging: When the job is launched in client mode, the driver logs are immediately available on the gateway node. This is no longer the case in cluster mode. The good news is that obtaining the complete application log is not too hard (see this section for more information). Unfortunately, there is no longer a clean separation between “executor-node” and “gateway-node” logs, meaning the logs can be very verbose. Therefore, I recommend that you make your logging calls easily “grepable”.
- Stopping applications: When the driver program resides on the gateway node, it is very easy to kill your jobs (via ctrl + c or kill). This is not the case when the driver is running in the cluster. Jobs that are not behaving properly need to be explicitly killed using the yarn application -kill command. Not doing so could leave the cluster clogged with non-functioning jobs.
Here is what this command would look like for the example in Getting the logs for your spark application:
yarn application -kill application_1458811581197_13197
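Sketching the logging recommendation above: a fixed tag prefix on every driver log line makes the lines trivial to grep out of the combined yarn logs output. The tag and helper name here are arbitrary choices of mine:

```python
import logging
import sys

def get_grepable_logger(name, tag="GO_TO_SLEEP", stream=None):
    """Return a logger that prefixes every line with a fixed tag, so that
    `yarn logs -applicationId <id> | grep GO_TO_SLEEP` isolates our lines."""
    logger = logging.getLogger(name)
    handler = logging.StreamHandler(stream or sys.stderr)
    handler.setFormatter(logging.Formatter(tag + " %(levelname)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```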
Gotcha 6: If the node your driver process is on goes away, your job hangs
Because the driver resides in the cluster, it is possible for the node it is on to disappear. This in turn causes your application to hang. Since there is nothing you can do to prevent this, you need to live with it by:
- Writing robust Spark code that can be stopped and restarted easily, e.g. by saving partial results and designing your job to resume from there
- Checking the UI to see how your job is doing, and killing then restarting it if you don’t see any progress over time (see the previous section for more information).
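To make the first bullet concrete, here is a minimal local-filesystem sketch of the restart pattern: each stage leaves a marker file behind, and a rerun skips stages that already completed. In a real job the markers would live on HDFS (for example, Spark’s own _SUCCESS files); the helper is a hypothetical illustration:

```python
import os

def run_resumable(stages, marker_dir):
    """Run (name, fn) stages in order, skipping any whose marker file
    already exists, so a restarted job resumes after the last finished stage."""
    for name, fn in stages:
        marker = os.path.join(marker_dir, name + ".done")
        if os.path.exists(marker):
            continue  # completed by a previous attempt
        fn()
        open(marker, "w").close()  # record completion
```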