This paper describes how Spark SQL integrates relational processing with Spark itself. It builds on the experience of earlier efforts like Shark and introduces two major additions to bridge the gap between relational and procedural processing:

1. A DataFrame API that tightly integrates relational and procedural processing by allowing both kinds of operations on multiple data sources.
2. Catalyst, a highly extensible optimizer that makes it easy to add new data sources and algorithms.

#### Programming Interface

Spark SQL uses a nested data model based on Hive and supports all major SQL data types along with complex types (e.g. array, map) and user-defined data types. It ships with a schema inference algorithm for JSON and other semistructured data. The same algorithm is used to infer the schemas of RDDs (Resilient Distributed Datasets) of Python objects. It attempts to infer a static tree structure of STRUCT types (which in turn may contain basic types, arrays, etc.) in one pass over the data. It starts by finding the most specific Spark SQL type for each record and then merges these types using an associative "most specific supertype" function that generalizes the types of each field.

A DataFrame is a distributed collection of rows with the same schema. It is equivalent to a table in an RDBMS. DataFrames are similar to Spark's native RDDs in that they are evaluated lazily, but unlike RDDs, they have a schema. A DataFrame represents a logical plan, and a physical plan is built only when an output function like save is called. Deferring execution in this way leaves more room for optimization. At the same time, DataFrames are analyzed eagerly to check that the column names and data types are valid. DataFrames support queries through both SQL and a DSL that includes all common relational operators like select, where, join and groupBy.
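As a rough illustration of how a DSL of this kind can record a query instead of running it, here is a minimal Python sketch. It is not Spark's implementation: `ToyFrame`, `Column`, `Literal`, `LessThan` and `collect` are hypothetical names invented for the example. Operator overloading turns a comparison into a small tree, `where` only stores that tree, and nothing is evaluated until an output operation is called.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Literal:
    """A constant value appearing in an expression."""
    value: Any


@dataclass(frozen=True)
class Column:
    """A reference to a named column; comparing it builds a tree node."""
    name: str

    def __lt__(self, other: Any) -> "LessThan":
        # Record the comparison instead of evaluating it.
        return LessThan(self, Literal(other))


@dataclass(frozen=True)
class LessThan:
    """Expression node for `column < literal`."""
    left: Column
    right: Literal


class ToyFrame:
    """Hypothetical stand-in for a DataFrame: rows plus pending filters."""

    def __init__(self, rows, filters=()):
        self.rows = list(rows)
        self.filters = tuple(filters)

    def __call__(self, name: str) -> Column:
        # Mirrors the users("age") style of column lookup.
        return Column(name)

    def where(self, predicate: LessThan) -> "ToyFrame":
        # Lazy: only remember the predicate tree, do not touch the rows.
        return ToyFrame(self.rows, self.filters + (predicate,))

    def collect(self):
        # Output operation: only now are the recorded predicates evaluated.
        return [row for row in self.rows
                if all(row[p.left.name] < p.right.value for p in self.filters)]


users = ToyFrame([{"name": "ann", "age": 31}, {"name": "bob", "age": 52}])
young = users.where(users("age") < 40)
```

Because `young.filters` holds a data structure rather than an opaque function, an optimizer could inspect or rewrite it before `collect()` runs, which is the property the summary attributes to the DataFrame DSL.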
The DSL operators build up an abstract syntax tree (AST) of the expression (think of an expression as a column in a table), which is then optimized by Catalyst. Spark SQL can cache data in memory using columnar storage, which is more efficient than Spark's native cache, which simply stores data as JVM objects. The DataFrame API supports user-defined functions (UDFs), which can use the full Spark API internally and can be registered easily.

To query native datasets, Spark SQL creates a logical data scan operator (pointing to the RDD), which is compiled into a physical operator that accesses fields of the native objects in place, extracting only the fields needed for a query. This is better than traditional object-relational mapping (ORM), which translates an entire object into a different format.

Spark MLlib implemented a new API based on the pipeline concept (think of a pipeline as a graph of transformations on the data) and chose DataFrame as the format for exchanging data between pipeline stages. This makes it much easier to expose MLlib's algorithms in Spark SQL.

#### Catalyst

Catalyst is an extensible optimizer based on Scala's standard features. It supports both rule-based and cost-based optimizations and makes it easy to add new optimization techniques and data sources to Spark SQL. At its core, Catalyst is powered by a general library that represents and manipulates trees by applying rules to them. The tree is the main data type in Catalyst and is composed of node objects, where a node has a type and zero or more children. Rules are functions that transform one tree into another. Trees offer a transform method that applies a pattern matching function recursively to all the nodes of the tree, transforming only the matching nodes. Rules can match multiple patterns in the same transform call and can contain arbitrary Scala code, so rule authors are not restricted to a domain-specific language (DSL).
Catalyst groups rules into batches and executes each batch until it reaches a fixed point (i.e. the tree stops changing). This means that each rule can be simple and self-contained while still producing a global effect on the tree. Since both nodes and trees are immutable, optimizations can easily be performed in parallel as well.

Spark SQL uses Catalyst in four phases:

1. **Logical Plan Analysis** resolves attribute references that are still unresolved, that is, attributes whose type is not yet known or that have not been matched to an input table. It uses a Catalog object that tracks the tables in all data sources to resolve these references.
2. **Logical Optimization** applies standard rule-based optimizations to the logical plan, including constant folding, predicate pushdown, projection pruning, null propagation and Boolean expression simplification.
3. **Physical Planning** generates multiple physical plans for a single logical plan and then selects one of them using a cost model. It also applies some rule-based physical optimizations, as in the logical optimization phase.
4. **Code Generation** uses quasiquotes (provided by Scala) to construct an AST that is fed to the Scala compiler, which generates bytecode at runtime.

Extensions can be added even without understanding how Catalyst works. For example, to add a data source, one needs to implement a createRelation function that takes a set of key-value parameters and returns a BaseRelation object if the data is loaded successfully. This BaseRelation can then implement interfaces that give Spark SQL access to the data. Similarly, user-defined types (UDTs) are added by mapping them to Catalyst's built-in types: one provides a mapping from an object of the UDT to a Catalyst row of built-in types and an inverse mapping back.

#### Future Work

Some recent work has also shown that it is quite easy to add special planning rules to optimize for specific use cases.
For example, researchers in the ADAM Project added a new rule that uses interval trees instead of normal joins for a computational genomics problem. Similarly, other work has used Catalyst to improve the generality of online aggregation so that it supports nested aggregate queries. These examples show that Spark and Spark SQL are quite easy to adapt to new use cases as well.

As I mentioned previously, I am experimenting with Spark SQL and it does look promising. I have implemented some operators and it is indeed quite easy to extend. I am now looking forward to developing something more concrete on top of it.
Data processing frameworks like MapReduce and Spark can do things that relational databases can't do very easily. For example, they can operate over semi-structured or unstructured data, and they can perform advanced analytics. On the other hand, Spark's API allows users to run arbitrary code (e.g. `rdd.map(some_arbitrary_function)`), which prevents Spark from performing certain optimizations. Spark SQL marries imperative Spark-like data processing with declarative SQL-like data processing in a single unified interface.

Spark's main abstraction was an RDD. Spark SQL's main abstraction is a DataFrame: the Spark analog of a table, which supports a nested data model of standard SQL types as well as structs, arrays, maps, unions, and user-defined types. DataFrames can be manipulated as if they were RDDs of row objects (e.g. `dataframe.map(row_func)`), but they also support a set of standard relational operators which take ASTs, built using a DSL, as arguments. For example, the code `users.where(users("age") < 40)` constructs an AST from `users("age") < 40` and passes it as the argument used to filter the users DataFrame. By passing in ASTs as arguments rather than arbitrary user code, Spark is able to perform optimizations it previously could not do. DataFrames can also be queried using SQL.

Notably, integrating queries into an existing programming language (e.g. Scala) makes writing queries much easier. Intermediate subqueries can be reused, queries can be constructed using standard control flow, etc. Moreover, Spark eagerly typechecks queries even though their execution is lazy. Furthermore, Spark SQL allows users to create DataFrames of language objects (e.g. Scala objects), and UDFs are just normal Scala functions.

DataFrame queries are optimized and manipulated by a new extensible query optimizer called Catalyst. The query optimizer manipulates ASTs written in Scala using rules, which are just functions from trees to trees that typically use pattern matching.
Queries are optimized in four phases:

1. Analysis. First, relations and columns are resolved, queries are typechecked, etc.
2. Logical optimization. Typical logical optimizations like constant folding, filter pushdown, and boolean expression simplification are performed.
3. Physical planning. Cost-based optimization is performed.
4. Code generation. Scala quasiquoting is used for code generation.

Catalyst also makes it easy for people to add new data sources and user-defined types. Spark SQL also supports schema inference, ML integration, and query federation: useful features for big data.
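The schema inference mentioned here is described in the first summary as a single pass that finds the most specific type of each record and merges the results with a "most specific supertype" function. A minimal Python sketch of that idea follows. The type names, the small lattice (null, then int, then float, with string as the fallback for conflicts) and the functions `infer_type`, `merge_types` and `infer_schema` are assumptions made for illustration, not Spark SQL's actual rules.

```python
from functools import reduce


def infer_type(value):
    """Most specific type of one JSON-like value (simplified type names)."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # check bool before int: bool subclasses int
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "float"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return ("array", reduce(merge_types, map(infer_type, value), "null"))
    if isinstance(value, dict):
        # A STRUCT is modelled as a dict from field name to field type.
        return {field: infer_type(v) for field, v in value.items()}
    raise TypeError(f"unsupported value: {value!r}")


def merge_types(a, b):
    """Most specific supertype of two inferred types (assumed lattice)."""
    if a == b:
        return a
    if a == "null":
        return b
    if b == "null":
        return a
    if isinstance(a, dict) and isinstance(b, dict):
        # Union of fields; a field missing on one side is treated as null.
        return {f: merge_types(a.get(f, "null"), b.get(f, "null"))
                for f in sorted(a.keys() | b.keys())}
    if isinstance(a, tuple) and isinstance(b, tuple):
        return ("array", merge_types(a[1], b[1]))
    if a in ("int", "float") and b in ("int", "float"):
        return "float"
    return "string"  # assumed fallback when types conflict


def infer_schema(records):
    """Infer per-record types, then fold them together with merge_types."""
    return reduce(merge_types, map(infer_type, records), "null")


records = [
    {"id": 1, "loc": {"lat": 45.1}, "tags": ["a"]},
    {"id": 2.5, "loc": {"lat": 45, "lng": 3.2}},
    {"id": 3, "tags": [], "note": "hi"},
]
schema = infer_schema(records)
```

Because the merge is written as a binary function folded over the records, the same function could in principle combine partial schemas computed on separate partitions of the data.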