Resource allocation is an important aspect of executing any Spark job. If not configured correctly, a Spark job can consume the entire cluster's resources and starve other applications.
Here I have tried to provide some insights into configuring resource allocation while running Spark. The focus is on how to configure the number of executors, the memory settings of each executor and the number of cores for a Spark job.
While launching an application using spark-submit, we use some commonly used options such as --class, --master, --deploy-mode, --conf, --num-executors, --executor-memory and --total-executor-cores.
The configurations and recommendations mentioned here may differ slightly depending on the cluster manager, such as YARN, Mesos or Spark Standalone. Let's understand a few of these options.
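For concreteness, a typical YARN submission using these options might look like the sketch below. The class name, jar path and resource values are placeholders; how to choose sensible sizing numbers is the subject of the rest of this article.

```
# Class name, jar path and all numbers below are illustrative placeholders.
spark-submit \
  --class com.example.MyApp \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 4G \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  /path/to/my-app.jar
```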
--executor-cores
With this flag, the number of cores can be specified while invoking spark-submit, spark-shell, or pyspark from the command line, or by setting the spark.executor.cores property in the spark-defaults.conf file or on a SparkConf object. This property controls the number of concurrent tasks an executor can run. For example, --executor-cores 3 means each executor can run at most three tasks at the same time.
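For instance, the two command-line forms below are equivalent (the value 5 and the application names are illustrative); the same property can also be set once for all jobs in spark-defaults.conf.

```
# Placeholder class and jar; the value 5 is only an example.
spark-submit --executor-cores 5 --class com.example.MyApp /path/to/my-app.jar
spark-submit --conf spark.executor.cores=5 --class com.example.MyApp /path/to/my-app.jar

# Or, in spark-defaults.conf:
#   spark.executor.cores   5
```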
--executor-memory
With this flag, the executor heap size can be configured. The same can also be set with the spark.executor.memory property. This property impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations and joins.
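For example (illustrative values again; note that this flag sets only the executor heap, and YARN's container request adds a memory overhead on top of it, as discussed in the use case below):

```
# Placeholder class and jar; requests a 4 GB heap per executor.
spark-submit --executor-memory 4G --class com.example.MyApp /path/to/my-app.jar

# Equivalent property form:
spark-submit --conf spark.executor.memory=4g --class com.example.MyApp /path/to/my-app.jar
```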
--num-executors
With this flag, the number of executors requested is specified. The same can also be set using the spark.executor.instances configuration property. If dynamic allocation is enabled, we can avoid setting this property.
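For example, the executor count can be pinned explicitly, or left to dynamic allocation (discussed further in the conclusion). The class, jar and the value 10 are placeholders.

```
# Fixed number of executors:
spark-submit --num-executors 10 --class com.example.MyApp /path/to/my-app.jar

# Equivalent property form:
spark-submit --conf spark.executor.instances=10 --class com.example.MyApp /path/to/my-app.jar

# Alternatively, let Spark scale the number of executors with the workload:
spark-submit --conf spark.dynamicAllocation.enabled=true --class com.example.MyApp /path/to/my-app.jar
```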
Keywords – Apache Spark, Number of Executors, Executor Memory, Executor Cores, YARN, Application Master, Dynamic Allocation
Use Case
Let's understand this through an example: a cluster with 6 nodes running NodeManagers, each node having 16 cores and 64 GB of memory. To summarise:
- Number of nodes: 6
- Cores per node: 16
- Memory per node: 64 GB
We avoid allocating 100% of a node's resources to YARN containers because each node needs some resources to run the OS and Hadoop daemons, so we leave 1 core and 1 GB of memory per node for them. (The application master additionally needs about 1 core and 1 GB on one of the nodes, which we account for separately below.)
After excluding this per-node reservation, we are left with:
- Number of nodes: 6
- Available cores per node: 16 - 1 = 15
- Available memory per node: 64 - 1 = 63 GB
Hence a first attempt at the arguments might be (see the full command sketched below):
- --num-executors 6
- --executor-cores 15
- --executor-memory 63G
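With a placeholder class and jar, that naive sizing spelled out as a spark-submit command would be:

```
# Class name and jar path are placeholders.
spark-submit \
  --class com.example.MyApp \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 6 \
  --executor-cores 15 \
  --executor-memory 63G \
  /path/to/my-app.jar
```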
But this approach is WRONG and here are the reasons.
- We have already established that each NodeManager has a capacity of 63 GB. A 63 GB executor heap plus the executor memory overhead would exceed that capacity, so such a container would not fit on the node.
- The application master will take up a core on one of the nodes, meaning that there won’t be room for a 15-core executor on that node.
- Allocating 15 cores per executor can lead to bad HDFS I/O throughput.
Better option / Correct Configuration
The number of cores determines how many concurrent tasks each executor can run, so we might think that more concurrent tasks per executor would give better performance. However, experience (see the Cloudera tuning guide in the references) shows that going beyond about 5 concurrent tasks per executor degrades performance, largely because the HDFS client struggles with too many concurrent threads.
This limit comes from the executor's ability to drive I/O, not from how many cores the machine has, so the number 5 stays the same even if the CPU has double the cores (32). From here on we use executor-cores = 5.
Starting again from the earlier (wrong) numbers:
- --num-executors 6
- --executor-cores 15 (now reduced to 5)
- --executor-memory 63G
With 5 cores per executor, the number of executors per node = 15 / 5 = 3. For 6 nodes, that gives 6 * 3 = 18 executors. One of those 18 slots is taken by the application master, so the number of executors becomes 18 - 1 = 17.
The available memory per node is 63 GB, so each of the 3 executors on a node gets 63 / 3 = 21 GB. However, a small amount of overhead memory must also be accounted for, because the full memory request to YARN for each executor is heap plus overhead.
The formula for that overhead is max(384 MB, 0.07 * spark.executor.memory).
Calculating the overhead: 0.07 * 21 GB = 1.47 GB (21 GB being the per-executor share computed above).
Since 1.47 GB > 384 MB, the overhead is 1.47 GB.
Subtracting it from each 21 GB share: 21 - 1.47 ≈ 19 GB (rounding down).
So, executor memory = 19 GB.
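The same arithmetic can be sanity-checked with a throwaway shell sketch (values taken straight from the example above; bash integer division rounds down, so the overhead step uses awk):

```
# Cluster figures from the example above.
nodes=6; cores_per_node=15; mem_per_node_gb=63; cores_per_executor=5

executors_per_node=$(( cores_per_node / cores_per_executor ))   # 15 / 5 = 3
total_executors=$(( nodes * executors_per_node - 1 ))           # 6 * 3 - 1 (AM) = 17
mem_per_slot=$(( mem_per_node_gb / executors_per_node ))        # 63 / 3 = 21 GB

# Subtract max(384 MB, 7% of the heap) as the memory overhead.
awk -v m="$mem_per_slot" -v n="$total_executors" \
  'BEGIN { oh = (0.07 * m > 0.384) ? 0.07 * m : 0.384;
           printf "%d executors, ~%d GB heap each\n", n, int(m - oh) }'
# Prints: 17 executors, ~19 GB heap each
```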
The final arguments recommended for spark-submit are (a full command is sketched below):
- --num-executors 17
- --executor-cores 5
- --executor-memory 19G
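Putting it all together, the submission would look roughly like this (the class name and jar path are placeholders for your own application):

```
# Class name and jar path are placeholders.
spark-submit \
  --class com.example.MyApp \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 17 \
  --executor-cores 5 \
  --executor-memory 19G \
  /path/to/my-app.jar
```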
Conclusion
By default, resources in Spark are allocated statically, which can lead to problems. Above all, it is difficult to estimate the exact workload and therefore to define the corresponding number of executors up front.
If --num-executors, --executor-memory and --total-executor-cores are not defined correctly, two situations can arise: underuse and starvation of resources. The first means that too many resources were reserved but only a small part of them is used. The second means that one application takes all the available resources and prevents other applications from starting.
Dynamic resource allocation is one solution to these problems: it adapts the resources used by an application to its workload. The feature is controlled by the spark.dynamicAllocation.enabled configuration entry and helps avoid the situation where the configured resources do not fit the workload. To use dynamic resource allocation, the external shuffle service must also be enabled, as sketched below.
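A minimal sketch of such a submission on YARN is shown below. The class and jar names are placeholders, and the min/max bounds are optional, illustrative values; note that enabling the property alone is not enough, because the external shuffle service must also be running on each NodeManager.

```
# Class name, jar path and the min/max bounds are placeholders.
spark-submit \
  --class com.example.MyApp \
  --master yarn \
  --deploy-mode cluster \
  --executor-cores 5 \
  --executor-memory 19G \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=17 \
  /path/to/my-app.jar
```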
References
https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
http://spark.apache.org/docs/latest/submitting-applications.html
https://mapr.com/blog/resource-allocation-configuration-spark-yarn/
https://www.cloudera.com/documentation/enterprise/5-8-x/topics/admin_spark_tuning.html