
Tuning My Apache Spark Data Processing Cluster on Amazon EMR

Lately, I had the chance to work on some data integration at Pocketmath, where I wrote a bunch of Spark scripts in Scala to run transformations on a data set of about 250GB on a monthly basis. In this post, I talk about some of the problems I encountered, some considerations while setting up the cluster, and how I improved the performance of the Spark tasks.

Dataset Size

The size of the data set is only 250GB, which probably isn't even close to the scale other data engineers handle, but it is easily one of the bigger sets for me. Nonetheless, I do think the transformations are on the heavy side; they involve a chain of rather expensive operations.

Multiple Shuffle Operations

The sequence of execution is something like JOIN, JOIN, UNION, EXPLODE, SORT within partition, then GROUP, COLLECT and finally another SORT.

Each of these is a pretty expensive shuffle operation. Not surprisingly, these operations posed some problems even at this moderate data size.

Spark Flow Chart
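To make that more concrete, here is a rough sketch of the shape of the chain in DataFrame terms. The table and column names are made up for illustration and the real job does quite a bit more, but the sequence of shuffles looks roughly like this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode, collect_list}

// Hypothetical inputs; assumes `backfill` shares the schema of the joined result.
def transform(users: DataFrame, segments: DataFrame, campaigns: DataFrame, backfill: DataFrame): DataFrame =
  users
    .join(segments, Seq("user_id"))                      // JOIN
    .join(campaigns, Seq("campaign_id"))                 // JOIN
    .union(backfill)                                     // UNION
    .withColumn("segment", explode(col("segment_ids")))  // EXPLODE
    .sortWithinPartitions("user_id")                     // SORT within partition
    .groupBy("user_id")                                  // GROUP
    .agg(collect_list("segment").as("segments"))         // COLLECT
    .sort("user_id")                                     // final SORT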

Another Difficulty

On top of that, I also had to split a file that looks like the example below.

Essentially, I think of it as generating ranks or row numbers (of course, with a bunch of other data transformations).

From this format:

51abebcfab2ef2abeed2f 8,120,384,898 
21abfbbeef5791adef3f2 1,9,1214,8827 
...

Into these two files:

# 1,2 are like IDs
51abebcfab2ef2abeed2f 1
21abfbbeef5791adef3f2 2
...
1 8,120,384,898
2 1,9,1214,8827
...

It turns out that generating a consistent row number like this is a difficult operation for Spark to handle. As a matter of fact, it is probably an expensive task for any distributed system to perform.

I work mainly with DataFrames, the primary data structure in Spark 2.0. The only way I currently know of to generate these row numbers from a DataFrame is to first convert it into an RDD and do a zipWithIndex on it.

import org.apache.spark.sql.Row
// Convert the DataFrame to an RDD and prepend a 1-based index to each row
val segRdd = segmentIdGroups.rdd
val rows = segRdd.zipWithIndex.map { case (r: Row, id: Long) => Row.fromSeq((id + 1) +: r.toSeq) }

Okay, there is actually another method that involves windows and partitions but unfortunately it basically moves all the data into one partition, which isn’t feasible for me.
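For reference, this is roughly what the window-based approach looks like (the ordering column here is hypothetical). Because there is no partitionBy on the window, Spark has to pull the entire data set into a single partition to number the rows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// No partitionBy: the whole data set ends up in one partition.
val numbered = segmentIdGroups.withColumn("id", row_number().over(Window.orderBy("segment_group")))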

Executor Cores and Memory Allocation

While starting the Spark task in Amazon EMR, I manually set the --executor-cores and --executor-memory configurations. The calculation is somewhat non-intuitive at first because I have to manually take into account the overheads of YARN, the application master/driver cores and memory usage et cetera.

As a guideline, YARN overheads take roughly 7%, and allocating a bit more than that is generally good practice. This ensures enough is left for system processes, Hadoop daemons, and so on.

Using an instance of r3.8xlarge as an example:

# 122GB RAM, 16 cores

$CORES_PER_DRIVER=4
$MEMORY_PER_DRIVER=24G

$CORES_PER_EXECUTOR=4
$MEMORY_PER_EXECUTOR=24G

Each instance could potentially run 4 executors, with 4 cores per executor. Available memory for the HEAP is 122/4 = 30.5G.

To account for the overheads, I multiply the available memory by 0.93, which works out to about 28G. For my experiment, I used 24G to leave some extra leeway, but in hindsight, 26G should have been enough too.

Recap:

total_memory_available / total_cores_available * cores_per_executor * (1 - 0.07)

// 122GB / 16 * 4 * (1 - 0.07) ≈ 28G
// Leave some leeway, so 24-26G

This directly affects how many executors can be deployed per instance, the memory available for each task, and consequently the memory available for each shuffle operation.
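The same arithmetic, wrapped in a small helper so the numbers are easy to play with. The function and its names are just my own sketch, not anything from Spark or EMR:

// Rough per-executor memory for a given instance and core split,
// leaving ~7% for YARN and system overheads.
def executorMemoryGb(instanceMemGb: Double, instanceCores: Int, coresPerExecutor: Int): Double = {
  val executorsPerInstance = instanceCores / coresPerExecutor
  (instanceMemGb / executorsPerInstance) * (1 - 0.07)
}

executorMemoryGb(122, 16, 4)  // ≈ 28.4G, rounded down to 24-26G for extra leeway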

Task / Partition Size

Another critical configuration is the task size; it is something that should be considered carefully, because tasks slow down by quite a bit once they start to spill to disk.

Initially, I just set spark.default.parallelism and expected the system to handle the rest automatically, and was surprised to see some stages spilling to disk and slowing the cluster down. I later found out that spark.default.parallelism is only used for certain operations; the rest of the time, Spark infers the number of partitions from the input of the previous stage, which in my case was the number of files it read: 300.

Each of these files is roughly 800-900MB, and having that few partitions is not ideal since each of these tasks / partitions will be too big to fit into memory.

Well alright, this actually depends on your executor setup too.

I had to force a repartition via df.repartition(2000) right after reading the files. This immediately adds a shuffle step, but in my opinion the later tasks perform better for it; YMMV though.
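In code, this is just a repartition chained straight onto the read; the input path and format below are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("segment-transform").getOrCreate()

// Force ~2000 partitions (~125MB each for 250GB) instead of the 300 input files (~850MB each).
val df = spark.read.parquet("s3://my-bucket/input/").repartition(2000)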

Shuffle Memory Usage, Executor Memory-to-CPU ratio

In general, I tried to optimize the system to avoid any form of spilling, both memory and disk. If the entire shuffle operation can fit into memory, there will be no spilling.

Each core in an executor runs a single task at any one time. Hence, with 24GB per executor and 4 cores per executor, the HEAP_SIZE allocated to each task is 24G/4, or 6G.

However, not all the memory allocated to the executor is used for shuffle operations.

The memory available for shuffle can be calculated as such:

// Per task
24/4 * 0.2 * 0.8 = 0.96GB

// 0.2 -> spark.shuffle.memoryFraction
// 0.8 -> spark.shuffle.safetyFraction

If your task is already spilling to disk, try using this formula to find out how much space it actually needs. This might help you better fine-tune the RAM-to-CPU ratio for your executor tasks.

shuffle_write * shuffle_spill_mem * (4)executor_cores
------------------------------------------------------------------------------------------------
shuffle_spill_disk * (24)executor_mem * (0.2)shuffle_mem_fraction * (0.8)shuffle_safety_fraction
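As a worked example with hypothetical numbers off the Spark UI: say each task writes about 0.5GB of shuffle output, and the stage shows 3GB of shuffle spill (memory) against 1GB of shuffle spill (disk), i.e. the data is roughly 3x bigger in memory than serialized on disk. With the 24G / 4-core executors above:

0.5 * (3 / 1) = 1.5GB needed per task
24/4 * 0.2 * 0.8 = 0.96GB available per task

// 1.5 / 0.96 ≈ 1.6, so roughly 1.6x more partitions (or more shuffle memory per core) is needed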

Splitting the task size properly was probably one of the bigger improvements I made while tuning my cluster. Key takeaway: it is definitely better to err on the side of a higher number of partitions, which results in a smaller task bite size.

Counter Example

To quickly illustrate how things can go wrong in a problematic configuration, I’ll use one of my iterations as an example.

I first used c3 & m3 instances and only allocated 10G per 3 cores, which works out to about 500MB of shuffle memory per task. I had only 300 partitions, and my task sizes were well beyond 850MB.

It resulted in a ton of needless computation:

Shuffle Spills

Getting the Right Partition Size and Instance Type

It is crucial to get the partition size right for it to run smoothly, but getting the right instance type makes it much more efficient.

Back to my setup: with about 2000 partitions and 250GB of data, each partition or task works out to be only about 125MB, which is close to the 128MB recommended in the official docs.

At that partition size, it is more efficient to run c3.8xlarge instances with a lower memory-to-core ratio. I chose to use r3.8xlarge memory instances to eliminate any possibility of a memory constraint issue, but the c3.8xlarge would have been much faster.

# c3.8xlarge: 32 vCPUs, 60GB memory
$CORES_PER_EXECUTOR=4
$MEMORY_PER_EXECUTOR=6.5G

# Available memory for shuffle, more than enough for a 125MB task
6.5 / 4 * 0.2 * 0.8 = 0.26G

Spot Instances and HDFS

Amazon EMR allows you to bid for spot instances at a fraction of the on-demand price. I use them frequently and have found them to be massively discounted during some hours.

I had HDFS running for the cluster, and the results of each stage are stored in HDFS for future use. At first, I ran a test using spot instances exclusively, even for the CORE instance group, which turned out to be a big mistake.

When I inevitably lose instances from being outbid during peak hours, the cluster loses data. In my experience, Spark is unable to fully recover from the lost data, even after taking extra time for stage retries.

With everything taken into consideration, I found it easier to just use on-demand instances for the CORE instance group entirely.

Instance Setup

I recommend using non-spot, disk-optimized instances for the CORE instance group. For example, I got two 800GB SSDs with an i2.2xlarge, which costs only ~$1.70 per hour. In comparison, c3 or r3 instances give you far less disk space.

For the actual computation in the task instance group, I would switch to using only spot instances (r3.8xlarge or c3.8xlarge). I found this to be the most cost-efficient way to run my tasks.

To bring it all together, I used 20 spot instances of r3.8xlarge. I mentioned this above, but I'll say it again: memory instances were used just to eliminate any potential issue of shuffle spills, but c3.8xlarge would definitely have been more efficient, as I've shown above. For the core group, there were 3 i2.2xlarge on-demand instances for the HDFS.

To Persist or Not?

In some of the heavy shuffles, I found that it was faster to persist the results to disk to prevent re-calculation. This is especially true if you're re-using the same Scala variables (DataFrames) further down the chain. Obviously, you'll need to look at the total calculation time and compare it with the network read bytes (divided by an average network throughput) to see whether it is worthwhile to persist.

One good thing is that Amazon EMR handles the HDFS integration seamlessly, which makes it effortless to do a DISK_ONLY persist. One thing to note: since I was doing some disk persistence, I ended up using more disk space than the total data size. This is also why I chose the i2 instances for HDFS.

Recalculations vs Persistence

Recalculations will yield different sequences every time.

In my case, having to generate a key for each row was the one requirement I couldn’t remove.

Without persisting it to disk first, using the variable again would trigger a re-calculation through the stages, and zipWithIndex could potentially produce different results each time, rendering the generated IDs absolutely useless.

So I was forced to persist them to disk, but since they were heavy operations anyway, persisting made sense regardless.
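A minimal sketch of what that looks like, reusing the zipWithIndex snippet from earlier; segmentIdGroups is the same DataFrame as before:

import org.apache.spark.sql.Row
import org.apache.spark.storage.StorageLevel

// Persist the indexed rows to disk so that downstream stages reuse
// the same generated IDs instead of recomputing zipWithIndex.
val indexed = segmentIdGroups.rdd.zipWithIndex
  .map { case (r: Row, id: Long) => Row.fromSeq((id + 1) +: r.toSeq) }
  .persist(StorageLevel.DISK_ONLY)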

Resources

All of this information has been pieced together from multiple sources, including, but not limited to:
