Doubling Apache Spark Sort Performance

By Laura Sullivan, 2015-10-18 11:49

    What chiefly distinguishes computation engines such as MapReduce and Apache Spark (the next-generation data processing engine for Apache Hadoop) from ordinary "embarrassingly parallel" systems is their support for "all-to-all" operations. Like many distributed engines, MapReduce and Spark usually operate on a data set that has been split into partitions. Many operations process one record at a time, and the data each one touches lives entirely within a single partition. All-to-all operations must treat the data set as a whole, and each output record can be aggregated from records in different partitions. Spark's groupByKey, sortByKey, and reduceByKey are common operations that require such a shuffle.

    In these distributed computing engines, the shuffle is the repartitioning and aggregation of data that an all-to-all operation requires. Unsurprisingly, in production we have found that most of the performance, scalability, and stability problems in Spark deployments arise during the shuffle.

    Cloudera and Intel engineers are working together to extend Spark's shuffle so that it handles large data sets faster and more reliably. Spark has advantages over MapReduce in many respects, but stability and scalability are areas where it still has room to improve. Here, we borrow lessons from MapReduce's mature shuffle implementation to improve the performance of shuffles that produce sorted output.

    In this article, we walk step by step through how Spark's shuffle currently works, the changes we propose, and how they affect performance. Further progress can be followed in the ongoing SPARK-2926.

    How Spark's shuffle currently works

    A shuffle involves two sets of tasks: (1) tasks that produce the shuffle data and (2) tasks that consume it. For historical reasons, the tasks that write the data are called "map tasks" and the tasks that read it are called "reduce tasks", but these roles apply only to one particular shuffle within a job. A task that acts as a reduce task in one shuffle may be a map task in another, because it reads data in the former and writes data, to be consumed by a later stage, in the latter.

    Both MapReduce and Spark use a "pull" model for the shuffle. Each map task writes its data to local disk, and the reduce tasks then issue remote requests to fetch it. Because a shuffle is all-to-all, any map task may output records destined for any reduce task. The map side of a shuffle therefore follows one principle: all results destined for the same reduce task are written next to one another, so that fetching them is cheap. Spark's default shuffle implementation (the hash-based shuffle) has each map task open a separate file for each reduce task. This is simple, but in practice it causes problems: Spark must either keep a large amount of memory in use or incur a large amount of random disk I/O. In addition, if M and R are the numbers of map and reduce tasks in a shuffle, the hash-based shuffle produces M * R temporary files in total. Shuffle consolidation reduces this number to C * R (where C is the number of map tasks that can run at the same time), but even with this change, jobs with many reducers often hit the "too many open files" limit.
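
    The file counts above are simple arithmetic. The sketch below is only an illustration: the function names are invented for this example, and the task counts (M = 1000, R = 1000, C = 24) are made-up values rather than figures from a real deployment.

```python
def hash_shuffle_files(num_map_tasks, num_reduce_tasks):
    """Temporary files in the plain hash-based shuffle: one per (map, reduce) pair."""
    return num_map_tasks * num_reduce_tasks


def consolidated_shuffle_files(concurrent_map_tasks, num_reduce_tasks):
    """Temporary files with shuffle consolidation: one per (concurrent map slot, reduce) pair."""
    return concurrent_map_tasks * num_reduce_tasks


# Hypothetical sizes chosen only for illustration.
M, R, C = 1000, 1000, 24
print(hash_shuffle_files(M, R))          # 1000000
print(consolidated_shuffle_files(C, R))  # 24000
```

    Even the consolidated count grows linearly with R, which is why a large number of reducers can still exhaust the open-file limit.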

A single map task in the hash-based shuffle

    A single map task in the sort-based shuffle

    To further improve the stability and performance of the shuffle, Spark introduced a "sort-based shuffle" implementation starting with version 1.1, which works much like the map side of MapReduce. Each map task keeps its output in memory (until available memory runs out), then sorts it by reduce task and spills it to a single file. If this happens more than once within a task, the spilled outputs of that task are merged.
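
    The buffer, sort, spill, and merge steps described above can be sketched in a few lines. This is a toy model rather than Spark's implementation: the modulo partitioner, the `max_buffered` record limit standing in for available memory, and the in-memory lists standing in for spill files are all assumptions made for illustration.

```python
import heapq


def partition_of(key, num_reducers):
    """Toy partitioner (assumption): integer keys are assigned by modulo."""
    return key % num_reducers


def sort_based_map_output(records, num_reducers, max_buffered):
    """Buffer records, spill sorted runs when the buffer fills, then merge the runs.

    Returns one list of (partition, key, value) tuples ordered by partition,
    standing in for the single output file a map task would write.
    """
    spills = []   # each spill is a run already sorted by partition id
    buffer = []
    for key, value in records:
        buffer.append((partition_of(key, num_reducers), key, value))
        if len(buffer) >= max_buffered:   # "memory" exhausted: sort and spill
            spills.append(sorted(buffer, key=lambda r: r[0]))
            buffer = []
    if buffer:
        spills.append(sorted(buffer, key=lambda r: r[0]))
    # Merge the sorted runs so that each reducer's records end up contiguous.
    return list(heapq.merge(*spills, key=lambda r: r[0]))


records = [(5, "a"), (2, "b"), (4, "c"), (1, "d"), (3, "e")]
output = sort_based_map_output(records, num_reducers=2, max_buffered=2)
print([p for p, _, _ in output])   # [0, 0, 1, 1, 1]
```

    Because the merged output is ordered by partition, a reduce task can fetch its share as one contiguous range instead of one file per map task.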

    On the reduce side, a set of threads is responsible for fetching remote map output blocks. As the data arrives, it is deserialized and placed into a data structure suited to the all-to-all operation being performed. For aggregation operations such as groupByKey, reduceByKey, and aggregateByKey, that structure is an ExternalAppendOnlyMap (essentially an in-memory hash map that spills to disk when it overflows). For sorting operations such as sortByKey, it is an ExternalSorter (which sorts the results, possibly spilling to disk, and returns an iterator over the sorted results).

    Fully sort-based shuffle

    The approach described above has two disadvantages:

    • Each Spark reduce task must hold a large number of deserialized records in memory at once. This consumes a lot of memory, and the large number of Java objects puts pressure on JVM garbage collection, which causes slowdowns and pauses. Because the deserialized form takes far more memory than the serialized form, Spark also has to spill earlier and more often, which means more disk I/O. In addition, because the memory footprint of deserialized objects is hard to estimate with 100% accuracy, holding many of them increases the likelihood of running out of memory.

    • When an operation requires sorting within partitions, we end up sorting twice: once by partition on the map side and once by key on the reduce side.

    Our change has the map side sort its results by key within each partition, so that the reduce side only needs to merge the sorted blocks from each map task. We can keep each block in memory in serialized form and deserialize records one at a time as we merge. At any moment, therefore, the maximum number of deserialized records in memory is the number of blocks being merged.
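
    A minimal sketch of that reduce-side merge follows. It assumes each block arrives already sorted by key, and it uses Python's `pickle` purely as a stand-in for Spark's serializer; the helper names are hypothetical. `heapq.merge` pulls one record from each input at a time, so only about one deserialized record per block is alive at once.

```python
import heapq
import pickle


def serialize_block(sorted_records):
    """Stand-in for a map output block: each record is kept as serialized bytes."""
    return [pickle.dumps(record) for record in sorted_records]


def read_block(block):
    """Deserialize one record at a time, only when the merge asks for it."""
    for raw in block:
        yield pickle.loads(raw)


def merge_sorted_blocks(blocks):
    """K-way merge of blocks that are each already sorted by key."""
    return heapq.merge(*(read_block(b) for b in blocks), key=lambda kv: kv[0])


blocks = [
    serialize_block([(1, "a"), (4, "d")]),
    serialize_block([(2, "b"), (5, "e")]),
    serialize_block([(3, "c")]),
]
print([k for k, _ in merge_sorted_blocks(blocks)])   # [1, 2, 3, 4, 5]
```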

    A single map task in the fully sort-based shuffle

    A single reduce task may receive blocks from thousands of map tasks. To make this multi-way merge efficient, especially when the data exceeds available memory, we introduce the concept of tiered merging. When many on-disk blocks need to be merged, this minimizes the number of disk seeks. Tiered merging also applies to the internal merge steps of ExternalAppendOnlyMap and ExternalSorter, but we have not modified those for the time being.

    High-performance merging

    Each task has a set of threads responsible for fetching shuffle data, and each task has a 48 MB memory pool used to hold the fetched data.

    We introduced SortShuffleReader, which first takes blocks from this memory pool and then returns an iterator over (key, value) pairs to user code.

    Spark has a memory region shared by all the tasks of a shuffle; by default it is 20% of the executor heap. As blocks arrive, SortShuffleReader tries to acquire the memory it needs from this shared shuffle region. When the region is full and a request fails, data must be spilled to disk to free memory. SortShuffleReader writes all of the in-memory blocks (well, not quite all; sometimes it spills only a small part) to a single file on disk. As blocks accumulate on disk, a background thread monitors them and, when necessary, merges these files into larger on-disk blocks. The "final merge" merges all the remaining on-disk and in-memory blocks together.
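
    The acquire-or-spill behaviour can be modelled roughly as follows. The class names `ShuffleMemoryPool` and `BlockBuffer` are hypothetical and do not come from Spark; the pool is sized in bytes by hand rather than as 20% of a heap, and "spilling" just moves blocks into a list that stands in for a file on disk.

```python
class ShuffleMemoryPool:
    """Toy stand-in for the memory region shared by a shuffle's tasks."""

    def __init__(self, capacity_bytes):
        self.capacity = capacity_bytes
        self.used = 0

    def try_acquire(self, num_bytes):
        """Grant the request only if it fits; never over-commit."""
        if self.used + num_bytes > self.capacity:
            return False
        self.used += num_bytes
        return True

    def release(self, num_bytes):
        self.used = max(0, self.used - num_bytes)


class BlockBuffer:
    """Keeps fetched blocks in memory and spills them when the pool refuses."""

    def __init__(self, pool):
        self.pool = pool
        self.in_memory = []
        self.spilled_files = []   # each entry stands in for one file on disk

    def add(self, block):
        if not self.pool.try_acquire(len(block)):
            self.spill()          # free this buffer's memory, then retry once
            if not self.pool.try_acquire(len(block)):
                # Still no room (e.g. block larger than the pool): write it out.
                self.spilled_files.append([block])
                return
        self.in_memory.append(block)

    def spill(self):
        if self.in_memory:
            self.spilled_files.append(self.in_memory)
            self.pool.release(sum(len(b) for b in self.in_memory))
            self.in_memory = []


pool = ShuffleMemoryPool(capacity_bytes=10)
buf = BlockBuffer(pool)
for block in [b"aaaa", b"bbbb", b"cccc", b"dd"]:
    buf.add(block)
print(len(buf.spilled_files), len(buf.in_memory))   # 1 2
```

    In this run the third block does not fit, so the first two are spilled together into one "file" and the remaining blocks stay in memory for the final merge.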

    How do we decide when an intermediate "disk-to-disk" merge is needed?

    spark.shuffle.maxMergeFactor (default 100) controls the maximum number of on-disk blocks that can be merged at once. When the number of on-disk blocks exceeds this limit, the background thread runs a merge to reduce it (although not immediately; see the code for details). When deciding how many blocks to merge, the thread first minimizes the number of merges it has to perform and, within that number, merges as few blocks as possible. So if spark.shuffle.maxMergeFactor is 100 and there are 110 on-disk blocks in the end, only 11 blocks are merged, which leaves exactly 100 on-disk blocks. Merging even one block fewer would require an additional merge, and merging more would cause unnecessary disk I/O.

Tiered merging with maxMergeWidth set to 4. Each rectangle represents a segment; three segments are first merged into one, and then the final four segments are merged into an iterator, ready for the next operation.
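
    The merge-count rule and the tiered merge in the figure can be sketched together. This is an illustration under stated assumptions, not the code from the patch: `blocks_to_merge` and `tiered_merge` are hypothetical names, runs are in-memory lists standing in for on-disk blocks, and choosing the smallest runs for an intermediate merge is a guess at a reasonable policy.

```python
import heapq


def blocks_to_merge(num_blocks, max_merge_factor):
    """Fewest blocks one intermediate merge must combine to reach the limit.

    Merging k blocks into one removes k - 1 blocks, so reaching the limit needs
    k = num_blocks - max_merge_factor + 1, capped at the merge factor itself.
    """
    if num_blocks <= max_merge_factor:
        return 0
    return min(max_merge_factor, num_blocks - max_merge_factor + 1)


def tiered_merge(runs, max_merge_factor):
    """Merge sorted runs, never combining more than max_merge_factor at once.

    Returns (iterator over all records in order, number of intermediate merges).
    """
    if max_merge_factor < 2:
        raise ValueError("max_merge_factor must be at least 2")
    runs = [list(r) for r in runs]
    intermediate_merges = 0
    while True:
        k = blocks_to_merge(len(runs), max_merge_factor)
        if k == 0:
            break
        runs.sort(key=len)                      # assumption: merge smallest runs first
        merged = list(heapq.merge(*runs[:k]))   # stands in for a new, larger disk block
        runs = runs[k:] + [merged]
        intermediate_merges += 1
    return heapq.merge(*runs), intermediate_merges


print(blocks_to_merge(110, 100))   # 11
print(blocks_to_merge(6, 4))       # 3

runs = [[1, 7], [2, 8], [3, 9], [4, 10], [5, 11], [6, 12]]
merged, passes = tiered_merge(runs, max_merge_factor=4)
print(list(merged) == list(range(1, 13)), passes)   # True 1
```

    With six runs and a limit of four, one intermediate merge of three runs leaves exactly four inputs for the final merge, which matches the figure.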

    sortByKey performance comparison

    We used SparkPerf's sortByKey test to measure how performance changed with our modifications. We chose two data sets of different sizes, in order to compare the gains when memory is sufficient to hold all the shuffle data and when it is not.

    Spark's sortByKey results in two jobs and three stages:

    • Sample stage: samples the data to create a partitioning in which the partitions are equal in size.

    • Map stage: writes the shuffle buckets for the reduce stage.

    • Reduce stage: fetches the relevant shuffle output for its particular partition of the data set.
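
    To make the sample stage concrete, here is a rough sketch of how a sample of the keys can yield boundaries for roughly equal-sized range partitions. It is not Spark's partitioner code; the function names, the sample size, and the fixed random seeds are assumptions chosen for illustration.

```python
import bisect
import random


def range_boundaries(keys, num_partitions, sample_size, seed=0):
    """Pick num_partitions - 1 split points from a random sample of the keys."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    step = len(sample) / num_partitions
    return [sample[int(step * i)] for i in range(1, num_partitions)]


def partition_for(key, boundaries):
    """Index of the range partition that a key falls into."""
    return bisect.bisect_right(boundaries, key)


keys = list(range(1000))
random.Random(1).shuffle(keys)
bounds = range_boundaries(keys, num_partitions=4, sample_size=100)

sizes = [0] * 4
for k in keys:
    sizes[partition_for(k, bounds)] += 1
print(bounds, sizes)   # partition sizes should be roughly, not exactly, equal
```

    Because every key in partition i is no larger than any key in partition i + 1, sorting each partition independently produces a globally sorted result.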


    We ran the benchmark on a 6-node cluster. Each executor had 24 cores and 36 GB of memory. The large data set had 20 billion records and occupied 409.8 GB compressed on HDFS; the small data set had 2 billion records and occupied 15.9 GB compressed on HDFS. Each record was a key-value pair of two 10-character strings. In both cases we ran the test with more than 1,000 partitions. The figures below show the running time of each stage and of the whole job:

    Large data set (lower is better)

    Small data set (lower is better)

    The sample stage takes the same time in both versions, because it involves no shuffle. In the map stage, our change sorts the data by key within each partition, which increases that stage's running time (by 37% for the large data set and 27% for the small one). But this increase is more than made up for in the reduce stage, which now only needs to merge sorted data: the reduce-stage time for the two data sets fell by 66% and 27%, so that the whole job ran faster on the large data set and 17% faster on the small one.

    What's next?

    SPARK-2926 is the result of one of several planned efforts to improve Spark's shuffle. There are several other ways in which the shuffle could manage memory better:

    • SPARK-4550 stores the map output in the in-memory buffer as raw data instead of Java objects. The map output then takes less space, which leads to fewer spills, and comparisons run faster on the raw data.

    • SPARK-4452 tracks the memory allocated to the different shuffle data structures in more detail, and returns memory that is no longer needed as early as possible.

    • SPARK-3461 tracks streaming the values that correspond to a particular key after a groupBy, rather than loading them all into memory at once.
