Cheng Lian (Databricks): Structured Data Analysis with Spark SQL
Data scientists are long familiar with traditional analysis frameworks such as R and Pandas. Although these provide intuitive, easy-to-use APIs, they are confined to a single machine and cannot handle large-scale distributed data. In Spark 1.3.0, Spark SQL introduced the DataFrame API on top of the earlier SchemaRDD: it not only provides R- and Pandas-style APIs for the Scala, Python, and Java language environments, but also naturally inherits Spark SQL's distributed processing capability. In addition, the external data source API introduced in Spark 1.2.0 has been further refined, with more complete data type support, filling in the final piece of the puzzle for Spark SQL's data source interoperability. Spark SQL thus brings the leverage of mature data analysis frameworks to big data analysis: a small effort that moves a great weight.
Figure 1: The rapid growth of Spark
Spark SQL is one of Spark's core components, first released with Spark 1.0 in April 2014. The left side of the figure above shows the growth in the number of open-source contributors since Spark 1.0 was released last April, a roughly linear trend. The right side shows the equally rapid growth in the number of monthly PRs. It is worth mentioning that in Spark 1.3, Spark SQL finally graduated from its alpha stage: apart from some developer APIs, all public APIs have stabilized and can be used with confidence.
As Shark's successor, one of Spark SQL's main functions is accessing existing Hive data. Alongside its Hive integration, Spark SQL also provides a JDBC/ODBC interface. Third-party tools such as Tableau and Qlik can connect to Spark SQL through this interface and process data with the help of Spark.
However, Spark SQL's applications are by no means limited to SQL. In fact, the name "Spark SQL" is something of a misnomer. The official Spark documentation defines it as: Spark SQL is a Spark component for processing structured data. The definition emphasizes "structured data", not "SQL". The newly released Spark 1.3 expresses Spark SQL's vision more completely: let developers process data with less code, while Spark SQL automatically optimizes the execution as much as possible, thereby lowering development cost and improving the execution efficiency of data analysis. To this end, Spark 1.3 introduced a DataFrame API similar to R and Python Pandas, carrying over the development experience of traditional single-machine data analysis and extending it to distributed big data scenarios.
Like an RDD, a DataFrame is a distributed data container. However, a DataFrame is more like a two-dimensional table in a traditional database: besides the data itself, it also records the structure of that data, i.e. its schema. Similar to Hive, DataFrame also supports nested data types (struct, array, and map). From the perspective of API usability, the DataFrame API provides a set of high-level relational operations that is friendlier than the functional RDD API, with a lower barrier to entry. And because DataFrame resembles R and Pandas, Spark DataFrame inherits the well-established development experience of traditional single-machine data analysis.
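The practical difference schema information makes can be sketched in plain Python (this is an illustrative analogy, not Spark code; the `Person` class, `select` helper, and schema layout are all assumptions for the sketch):

```python
from dataclasses import dataclass

@dataclass
class Person:
    name: str
    age: int

# RDD-style: an opaque collection of objects; the framework sees only
# Person values and cannot look inside them.
rdd_like = [Person("Alice", 30), Person("Bob", 25)]

# DataFrame-style: the same data plus an explicit schema the engine can inspect.
schema = [("name", str), ("age", int)]
rows = [("Alice", 30), ("Bob", 25)]

def select(rows, schema, col):
    """Knowing the schema, an engine can project a single column by name
    without understanding the rest of each record."""
    idx = [name for name, _ in schema].index(col)
    return [r[idx] for r in rows]

print(select(rows, schema, "age"))  # [30, 25]
```

With only the opaque `rdd_like` collection, the same projection would require user code that the framework cannot see through, and therefore cannot optimize.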
Figure 2: The difference between a DataFrame and an RDD
The figure above shows the difference between DataFrame and RDD intuitively. The RDD[Person] on the left takes Person as a type parameter, but the Spark framework itself does not understand the internal structure of the Person class. The DataFrame on the right, by contrast, provides detailed structural information, so Spark SQL knows exactly which columns the data set contains, and the name and type of each column. With this information, the Spark SQL query optimizer can perform targeted optimizations. A rough analogy: the difference resembles that between dynamically typed Python and statically typed C++. Because the latter has detailed type information at compile time, the compiler can emit more specialized, better-optimized executable code.
The external data source API
For users, however, a structured data abstraction alone is not enough. Data tends to be stored in a variety of formats and a variety of systems, and users want to conveniently fetch data from different sources, process it together, and write the results back to some data source in a specific format, or present them directly. The external data source API introduced in Spark 1.2 is meant to solve exactly this problem. A major advantage of Spark SQL's external data source API is that it can push various pieces of information from a query down to the data source, making full use of the source's own optimization capabilities, such as column pruning and filter pushdown, to reduce IO and improve execution efficiency. Since the 1.2 release, a variety of external data sources have emerged from the community. The figure below gives an overview of the data sources supported in Spark 1.3 (on the left are the data sources built into Spark SQL; on the right are data sources contributed by community developers). With the external data source API, DataFrame effectively becomes the medium of data exchange among many data formats and storage systems: within Spark SQL, data from all sources can be loaded as DataFrames, mixed, unified into a single form, and then analyzed to extract value.
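The contract between the engine and a data source can be sketched in plain Python (a minimal illustration, not Spark's actual data source interface; `CsvSource`, `scan`, and the `min_age` predicate are assumptions for the sketch):

```python
# A toy external data source that accepts pushed-down projections and filters.
class CsvSource:
    def __init__(self, header, rows):
        self.header = header  # e.g. ["name", "age"]
        self.rows = rows

    def scan(self, required_columns, min_age=None):
        """Materialize only the requested columns, and apply the pushed-down
        filter while reading, so unmatched rows never leave the source."""
        idxs = [self.header.index(c) for c in required_columns]
        age_idx = self.header.index("age")
        out = []
        for row in self.rows:
            if min_age is not None and row[age_idx] < min_age:
                continue  # filter pushdown: drop the row at the source
            out.append(tuple(row[i] for i in idxs))  # column pruning
        return out

source = CsvSource(["name", "age"], [["Alice", 30], ["Bob", 17]])
print(source.scan(["name"], min_age=18))  # [('Alice',)]
```

The point is that the engine hands the source its requirements (which columns, which predicates), and the source uses them to avoid reading and shipping data it knows will be discarded.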
Figure 3: DataFrame supports a variety of external data sources
Spark SQL powers big data analysis
Streamlining code
One of the clearest advantages DataFrame brings is helping users further simplify code. The figure below shows three code snippets implementing the same business logic, with Hadoop MR, the Python RDD API, and the Python DataFrame API respectively. Obviously, the Hadoop MR version requires the most code, and it is hard to see what the business logic is. The Python RDD API version is much shorter, but it is still not easy to tell what it is doing. The Python DataFrame API version goes a step further; more importantly, anyone who knows SQL can see at a glance what it is doing. Evidently, the DataFrame API not only makes code more concise but also significantly improves readability. Spark 1.3 provides DataFrame API bindings in three languages, Python, Scala, and Java, for users to choose as needed.
Figure 4: Hadoop MR, Python RDD API, Python DataFrame API code examples
Beyond this, Spark SQL provides convenient tools for several common scenarios and patterns in big data processing, letting users avoid writing duplicated or highly similar code for the same recurring pattern across different projects:
Automatic JSON schema inference
JSON is an important structured data format with good readability, and much raw data exists in JSON form. However, JSON is verbose, which makes it a poor fit for large-scale data analysis, so a common processing step is converting JSON into an efficient columnar storage format such as ORC or Parquet. Different versions of a JSON data set, though, tend to have different schemas (for example, the data returned by a new version of the Twitter API may have more columns than the data returned by the old version). Manually merging the schemas of all records in a JSON data set is an extremely tedious task. When processing JSON data, Spark SQL can automatically scan the entire data set, collect the complete set of columns across all records, and infer the complete schema. (For columns with the same name but different types, Spark SQL tries to derive a common type.)
Figure 5: Spark SQL processing irregular JSON data
The figure above shows Spark SQL sorting out three irregular JSON records of personal information and inferring their schema. Record 2 is similar to record 1, but has an extra age field; record 3 is very similar to the first two, but its height field is of type double rather than int. When Spark SQL processes this JSON data, the final schema includes every column that appears in any record, and for same-named columns of different types it takes their common parent type (for example, the common parent type of int and double is double). With this processing, we end up with the correct DataFrame.
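The merging step can be sketched in plain Python (a minimal illustration of the idea, not Spark's implementation; the `merge` rule here only widens int to float and falls back to a generic type otherwise):

```python
import json

def record_schema(rec):
    """Per-record schema: column name -> Python type."""
    return {k: type(v) for k, v in rec.items()}

def merge(s1, s2):
    """Union the columns; widen conflicting numeric types to float."""
    out = dict(s1)
    for col, t in s2.items():
        if col not in out:
            out[col] = t
        elif out[col] is not t:
            out[col] = float if {out[col], t} == {int, float} else object
    return out

records = [
    '{"name": "Alice", "height": 170}',
    '{"name": "Bob", "height": 168, "age": 30}',
    '{"name": "Carol", "height": 165.5}',
]
schema = {}
for line in records:
    schema = merge(schema, record_schema(json.loads(line)))
print(schema)  # {'name': <class 'str'>, 'height': <class 'float'>, 'age': <class 'int'>}
```

As in the figure, the age column appears even though record 1 lacks it, and height widens to a double-like type because record 3 uses a fractional value.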
Hive-style partitioned tables
A Hive partitioned table can be thought of as a simple index. Each partitioning column of a partitioned table corresponds to one level of partition directories, named in the <column-name>=<value> format. In Spark 1.3, the Parquet data source gained automatic partition discovery: when data is laid out in a Hive-style partitioned directory structure, Spark SQL can automatically recognize it as a partitioned table, without needing any metadata from the Hive metastore. Consequently, when processing such a table, special optimizations such as partition pruning can also be applied.
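The discovery step boils down to parsing `<column>=<value>` path components, which can be sketched in plain Python (an illustration of the directory convention only; `discover_partitions` and the sample paths are assumptions for the sketch):

```python
def discover_partitions(paths):
    """Map each file path to the partition-column values encoded in it."""
    tables = []
    for path in paths:
        parts = {}
        for comp in path.split("/"):
            if "=" in comp:  # a Hive-style partition directory component
                col, val = comp.split("=", 1)
                parts[col] = val
        tables.append((path, parts))
    return tables

paths = [
    "data/year=2014/month=04/part-00000.parquet",
    "data/year=2015/month=03/part-00000.parquet",
]
for path, parts in discover_partitions(paths):
    print(parts)
```

Once the paths are mapped to partition values this way, a query filtering on `year` can skip whole directories without opening a single file in them.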
Improving execution efficiency
With the DataFrame API, not only can code be more concise; more importantly, performance improves. The figure below compares the performance of four programs aggregating ten million integers, implemented with the Scala and Python RDD APIs and DataFrame APIs. As it shows, the Python DataFrame API runs nearly five times faster than the Python RDD API. This is because with the DataFrame API, the Python side only assembles a small logical query plan; once that plan is sent to the JVM, the lion's share of the computation is handled on the JVM side. With the Python RDD API, by contrast, a great deal of data must be exchanged across processes between the Python VM and the JVM, which slows the Python RDD API down.
It is worth noting that the significant performance gain is not limited to the Python API: even in Scala, the DataFrame API version is twice as fast as the RDD API version. The logic in this example is very simple, so the query optimizer has little effect; where does the speedup come from? The RDD API is functional and emphasizes immutability: in most scenarios it creates new objects rather than modifying old ones. This trait yields a clean API, but it also makes Spark applications tend to create large numbers of temporary objects at run time, putting pressure on the GC. On top of the existing RDD API, we can certainly use the mapPartitions method to rewrite how data is handled within a single partition, reusing mutable objects to reduce object allocation and GC overhead, but this sacrifices code readability and requires developers to understand Spark's run-time mechanisms, raising the bar. Inside the Spark SQL framework, on the other hand, objects are reused wherever possible; this breaks immutability internally, but data is converted back to immutable form before being returned to the user. Developing with the DataFrame API, one enjoys these optimizations for free.
Reducing data reads
When analyzing big data, the fastest way to process data is to ignore it. "Ignore" here does not mean turning a blind eye, but pruning properly according to the query conditions.
The partition pruning mentioned above in the discussion of partitioned tables is one such technique: when a query's filter conditions involve partitioning columns, we can prune away the partition directories that cannot contain the target data, thereby reducing IO.
For some "smart" data formats, Spark SQL can also prune based on statistics attached to the data files. In simple terms, in this kind of format the data is saved in segments, and each segment carries basic statistics such as the maximum, minimum, and number of null values. When the statistics indicate that a data segment certainly contains no data matching the query conditions, that segment can be skipped (for example, if an integer column's maximum in a segment is 100 while the query requires values > 200).
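The skipping logic can be sketched in plain Python (a minimal illustration; the segment layout and `scan_greater_than` helper are assumptions, not any real file format's API):

```python
# Each segment stores its column's min/max alongside the values, so
# segments that cannot satisfy the predicate are never scanned.
segments = [
    {"min": 1,   "max": 100, "values": [5, 42, 100]},
    {"min": 150, "max": 300, "values": [201, 250]},
]

def scan_greater_than(segments, threshold):
    hits, scanned = [], 0
    for seg in segments:
        if seg["max"] <= threshold:
            continue  # statistics prove no row can match; skip the segment
        scanned += 1
        hits.extend(v for v in seg["values"] if v > threshold)
    return hits, scanned

hits, scanned = scan_greater_than(segments, 200)
print(hits, scanned)  # [201, 250] 1
```

For the predicate `> 200`, the first segment's maximum of 100 rules it out entirely, so only one of the two segments is actually read.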
In addition, Spark SQL can take full advantage of columnar storage formats such as RCFile, ORC, and Parquet, scanning only the columns a query actually involves and ignoring the data in the remaining columns.
Spark SQL's third goal is to have the query optimizer optimize execution efficiency on our behalf, freeing up developer productivity so that even novices can write efficient programs.
Figure 6: the Spark SQL query optimization engine
Behind DataFrame sits the full Spark SQL query optimization engine, whose overall structure is shown above. A logical execution plan constructed via the SQL/HiveQL parser or the DataFrame API is analyzed by the analyzer and optimized by the optimizer into an optimized logical plan, then turned into a physical execution plan, and finally converted into an RDD DAG that the Spark engine executes.
Figure 7: demographic data analysis example
To illustrate query optimization, consider the population data analysis example shown above. In the figure, two DataFrames are constructed, joined, and then filtered. Executing this plan as-is would be inefficient: a join is an expensive operation and may also produce a large intermediate data set. If we can push the filter down below the join, filtering each DataFrame first and then joining the smaller filtered results, we can shorten execution time substantially. Spark SQL's query optimizer does exactly that. In short, logical query plan optimization is a process of using relational-algebra equivalence transformations to replace high-cost operations with low-cost ones.
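The equivalence can be demonstrated in plain Python (an illustration of the rewrite, not the optimizer itself; the tables and the age predicate are assumptions for the sketch):

```python
# Pushing a filter below a join: the join then runs over fewer rows,
# while the final result is provably the same.
people = [("alice", 30), ("bob", 17), ("carol", 45)]
cities = [("alice", "SF"), ("bob", "NY"), ("carol", "LA")]

def join(left, right):
    """A naive nested-loop equi-join on the first field."""
    return [(k, a, b) for (k, a) in left for (k2, b) in right if k == k2]

# Naive plan: join first, filter afterwards.
naive = [row for row in join(people, cities) if row[1] >= 18]

# Optimized plan: the filter is pushed below the join.
adults = [(k, age) for (k, age) in people if age >= 18]
pushed = join(adults, cities)

print(naive == pushed)  # True: same result, smaller join input
```

Both plans return the same rows, but the optimized plan joins 2 rows instead of 3; on real data sets the difference between joining filtered and unfiltered inputs can be enormous.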
In the process of converting the optimized logical plan into a physical execution plan, filter conditions can additionally be pushed down into the data source, according to the source's specific capabilities. In the physical plan on the right, the Filter operator has disappeared, because it has been folded into the table-scan node that performs the final read.
For ordinary developers, the significance of the query optimizer is that even suboptimal queries written by inexperienced programmers are, as far as possible, converted into efficient forms for execution.
DataFrame as the new RDD
In Spark 1.3, DataFrame has begun to replace RDD as the new data-sharing abstraction. The Spark ML examples set up a machine learning pipeline consisting of multiple stages such as tokenization, word counting, and logistic regression. The pipeline's input, the data exchanged between stages, and the pipeline's output are all represented as DataFrames.
Figure 8: A machine learning pipeline
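The pattern of stages exchanging schema-carrying tables can be sketched in plain Python (an illustration only, not Spark ML's Pipeline API; the stage functions and sample documents are assumptions for the sketch):

```python
# Each stage consumes a table (list of records) and produces a new one,
# so stages compose freely as long as their schemas line up.
def tokenize(table):
    return [{"id": r["id"], "words": r["text"].split()} for r in table]

def count_words(table):
    return [{"id": r["id"], "n_words": len(r["words"])} for r in table]

docs = [{"id": 1, "text": "spark sql query optimizer"},
        {"id": 2, "text": "dataframe api"}]

out = docs
for stage in (tokenize, count_words):
    out = stage(out)
print(out)  # [{'id': 1, 'n_words': 4}, {'id': 2, 'n_words': 2}]
```

Using one shared tabular abstraction between stages is what lets a pipeline mix arbitrary transformations and learners without per-pair glue code.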
Compared with RDD, DataFrame has several notable features:
1. It carries schema information, enabling targeted optimizations.
2. It offers a friendlier, more intuitive API.
3. Tightly integrated with the external data source API, it can serve as the medium of data exchange among a variety of storage formats and storage systems.
As a more efficient data-sharing abstraction than RDD, DataFrame lets us integrate the big data pipeline on a single platform much more easily.