The paper introduced the MapReduce paradigm, which had a huge impact on the Big Data world. Many systems, including Hadoop MapReduce and Spark, were developed along the lines of the paradigm described in the paper. MapReduce was conceived for cases where the computation itself is straightforward but parallelizing it and handling the other aspects of distributed computing is painful.

A MapReduce computation takes a set of key/value pairs as input and produces another set of key/value pairs as output. The computation consists of two parts:

1. Map: A function that processes input key/value pairs to generate a set of intermediate key/value pairs. All the values corresponding to each intermediate key are grouped together and sent to the Reduce function.
2. Reduce: A function that merges all the intermediate values associated with the same intermediate key.

The Map/Reduce primitives are inspired by similar primitives in Lisp and other functional programming languages. A program written in MapReduce is automatically parallelized, so the programmer does not have to care about the underlying details of partitioning the input data, scheduling the computations or handling failures. The paper mentions many interesting applications, such as distributed grep, inverted index and distributed sort, which can be expressed in MapReduce logic.

#### Execution Overview

When a MapReduce function is invoked, the following steps take place:

1. The input data is partitioned into a set of M splits of equal size.
2. One of the nodes in the cluster becomes the master and the rest become workers. There are M Map tasks and R Reduce tasks.
3. The Map invocations are distributed across multiple machines containing the input partitions.
4. Each worker reads the content of its partition and applies the Map function to it. Intermediate results are buffered in memory and periodically written to local disk.
5. The locations of these intermediate results are passed on to the master, which forwards them to the reduce workers. These workers read the intermediate data and sort it by key.
6. Each reduce worker iterates over the sorted data and, for each unique intermediate key, passes the key and its intermediate values to the Reduce function. The output is appended to an output file.
7. Once all map and reduce tasks are over, control is returned to the user program.

As we saw, the map phase is divided into M tasks and the reduce phase into R tasks. Keeping M and R much larger than the number of nodes improves dynamic load balancing and speeds up failure recovery. But since the master has to make O(M+R) scheduling decisions and keep O(M*R) states in memory, M and R cannot be arbitrarily large.

#### Master Node

The master maintains the state of each map and reduce task and the identity of each worker machine. The locations of the intermediate files also travel from the map tasks to the reduce tasks via the master. The master pings each worker periodically. If it does not hear back, it marks the worker as failed and assigns its task to some other worker. If a map task fails, all the reduce workers are notified about the newly assigned worker. Master failure terminates the computation, in which case the client may choose to restart it.

#### Locality

MapReduce takes advantage of the fact that input data is stored on the machines performing the computations. The master tries to schedule a map job on a machine that contains the corresponding input data. Thus, most of the data is read locally and network IO is saved. This locality optimization is inspired by active disks, where computation is pushed to processing elements close to the disk.

#### Backup Tasks

Stragglers are machines that take an unusually long time to complete one of the last few Map or Reduce tasks and can increase the overall execution time of the program. To account for these, when a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. This may lead to some redundant computation but can reduce the start-to-end execution time.

#### Refinements

MapReduce supports a variety of refinements, including:

1. Users can specify an optional combiner function that performs a partial merge over the data before it is sent over the network. The combiner runs after the Map function and reduces the amount of data to be sent over the network.
2. MapReduce can be configured to detect records that fail deterministically and skip them. This is very useful in scenarios where a few missing records can be tolerated.
3. Within a given partition, the intermediate key/value pairs are guaranteed to be processed in increasing key order.
4. Side effects are supported in the sense that MapReduce tasks can produce auxiliary files as additional outputs of their operation.
5. MapReduce can be run in sequential mode on the local machine to facilitate debugging and profiling.
6. The master runs an internal HTTP server that exports status pages showing metadata about the computation and links to output and error files.
7. MapReduce provides a counter facility to keep track of occurrences of various events, like the number of documents processed. Counters like the number of key/value pairs processed are tracked by default.

Other refinements include a custom partitioning function and support for different input/output types.

MapReduce was profiled for two computations (grep and sort) running on a large cluster of commodity machines. The grep program scanned ten billion records in 150 seconds and the sort program sorted ten billion records in 891 seconds (including the startup overhead). Moreover, the profiling showed the benefit of backup tasks and that the system is resilient to node failure.
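Returning to the combiner refinement listed above, the effect of a partial merge on the map side can be illustrated with a small single-process sketch. The function names `map_words` and `combine` and the sample input are hypothetical and chosen only for illustration; this is not the paper's API.

```python
from collections import Counter

def map_words(text):
    """Map step: emit one (word, 1) pair per word occurrence."""
    return [(word, 1) for word in text.split()]

def combine(pairs):
    """Combiner: partially merge pairs with the same key on the map side."""
    partial = Counter()
    for key, value in pairs:
        partial[key] += value
    return sorted(partial.items())

# Made-up content of one input split.
split = "to be or not to be"

raw_pairs = map_words(split)         # what would be sent without a combiner
combined_pairs = combine(raw_pairs)  # what is sent with a combiner

print(len(raw_pairs), "pairs without combiner")
print(len(combined_pairs), "pairs with combiner:", combined_pairs)
```

The saving grows with the number of repeated keys per split, which is why a combiner tends to help for skewed keys such as common words in a word count.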
Other than performance, MapReduce offers several more benefits. The user code tends to be small and understandable because the code that takes care of the distributed aspects is abstracted away. So the user does not have to worry about issues like machine failures and network errors. Moreover, the system can be easily scaled horizontally. Lastly, the performance is good enough that conceptually unrelated computations can be maintained separately instead of being mixed together just to save an extra pass over the data.
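To give a feel for how small the user code can be, here is a minimal single-process sketch of the model, using word count as the example. Only `map_fn` and `reduce_fn` play the role of user code; `partition` and `run_mapreduce` are hypothetical stand-ins for what the framework would do across machines, and the CRC-based partitioning is an assumption made for determinism, not something the paper prescribes.

```python
import zlib
from collections import defaultdict
from typing import Iterable, Iterator

def map_fn(doc_name: str, text: str) -> Iterator[tuple[str, int]]:
    """User-supplied Map: emit (word, 1) for every word in the document."""
    for word in text.split():
        yield word.lower(), 1

def reduce_fn(word: str, counts: Iterable[int]) -> int:
    """User-supplied Reduce: merge all values for one intermediate key."""
    return sum(counts)

def partition(key: str, num_reducers: int) -> int:
    """Assign an intermediate key to one of the R reduce partitions."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def run_mapreduce(inputs: dict[str, str], num_reducers: int = 2) -> dict[str, int]:
    """Stand-in for the framework: map, group by key per partition, reduce."""
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    # Map phase: one map invocation per input split.
    for name, text in inputs.items():
        for key, value in map_fn(name, text):
            partitions[partition(key, num_reducers)][key].append(value)
    # Reduce phase: keys within a partition are processed in sorted order.
    output = {}
    for bucket in partitions:
        for key in sorted(bucket):
            output[key] = reduce_fn(key, bucket[key])
    return output

counts = run_mapreduce({"doc1": "the cat the dog", "doc2": "The end"})
print(counts)
```

In a real deployment each split and each partition would live on a different worker; the sketch only mirrors the data flow.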
This paper, by Yahoo, describes a new language called Pig Latin, which is intended to provide a middle ground between the declarative SQL style (which many developers find unnatural) and the procedural map-reduce model (which is very low-level and rigid). It also introduces a novel, interactive debugging environment called Pig Pen.

#### Overview

A Pig Latin program is a sequence of steps, each carrying out a single high-level transformation. Effectively, the program specifies the query execution plan itself. The program is then compiled into map-reduce jobs which are run on Hadoop (though other backends can also be plugged in). Pig Latin is more expressive than map-reduce, which is essentially limited to a one-input, two-stage data flow model. Moreover, since the map and reduce functions are opaque to each other, optimizations are hard to bake into the system itself. This limitation is also overcome with Pig Latin, which allows the programmer to order the execution of individual steps by specifying the execution plan.

Unlike a traditional DBMS, Pig does not require data to be imported into system-managed tables, as it is meant for offline, ad-hoc, scan-centric, read-only workloads. Pig supports User Defined Functions (UDFs) out of the box. Since it targets only parallel processing, there is no built-in support for operations like non-equi joins or correlated subqueries. These operations can still be performed using UDFs.

#### Nested Data Model

Pig supports a flexible, nested data model with four types:

1. Atom: Simple values like a string or an integer, e.g. 'medium'
2. Tuple: Sequence of 'fields' which can be of any type, e.g. ('medium', 12)
3. Bag: Collection of tuples, e.g. {('medium', 12), (('github', 'twitter'), 12)}
4. Map: Collection of key-value pairs where keys are atoms and values can be of any type, e.g. ['key' -> 'value', 'anotherKey' -> ('another', 'value')]

#### Inbuilt Functions

1. FOREACH: Specifies how each tuple is to be processed. The semantics require that there be no dependence between the processing of different input tuples, to allow parallel processing.
2. COGROUP: Suppose we have two datasets:

        results = (query, url)      // a query and its result url
        revenue = (query, amount)   // a query and the revenue generated by the query

   Cogrouping these two datasets gives:

        grouped_data = COGROUP results BY query, revenue BY query;

   grouped_data contains one tuple for each group. The first field of the tuple is the group identifier and the other fields are bags, one for each dataset being cogrouped. So the first bag corresponds to results and the second to revenue. COGROUP is one level lower than JOIN as it only groups tuples together into nested bags. It can be followed by an aggregate operation or a cross product operation (leading to the result expected from a JOIN operation). GROUP is the same as COGROUP on a single dataset.
3. Nested operations: Commands can be nested within a FOREACH command to process bags within tuples.

Other functions include LOAD, STORE, FILTER, JOIN, CROSS, DISTINCT, UNION and ORDER, which are similar in operation to the equivalent SQL operations.

One may ask how Pig differs from an SQL-style query language when it seems to use similar operations. Compare the following queries, which generate the same result. The first one is written in SQL (declarative) and the second in Pig Latin (procedural):

    SELECT category, AVG(pagerank)
    FROM urls
    WHERE pagerank > 0.2
    GROUP BY category
    HAVING COUNT(*) > 10^6

    good_urls = FILTER urls BY pagerank > 0.2;
    groups = GROUP good_urls BY category;
    big_groups = FILTER groups BY COUNT(good_urls) > 10^6;
    output = FOREACH big_groups GENERATE category, AVG(good_urls.pagerank);

#### Implementation

The Pig interpreter parses the commands and verifies that the referenced input files and bags are valid. It then builds a logical plan for every bag that is being defined, using the logical plans for the input bags and the current command. These plans are evaluated lazily to allow for in-memory pipelining and filter reordering. The parsing and logical plan construction are independent of the execution platform, while the compilation of the logical plan into a physical plan depends on the execution platform.

For each COGROUP command, the compiler generates a map-reduce job where the map function assigns keys for grouping and the reduce function is initially a no-op. The commands between two consecutive COGROUP commands A and B are pushed into the reduce function of A to reduce the amount of data to be materialized between different jobs. The commands before the very first COGROUP command are pushed into the map function of that first COGROUP's job. The ORDER command compiles into two map-reduce jobs. The first job samples the input data to determine quantiles of the sort key. The second job range-partitions the input data according to the quantiles to provide equal-sized partitions, followed by local sorting in the reduce phase, resulting in a globally sorted file.

The inflexibility of the map-reduce primitive results in some overhead when compiling Pig Latin into map-reduce jobs. For example, data must be materialized and replicated on the distributed file system between successive map-reduce jobs. When dealing with multiple datasets, an additional field must be inserted in every tuple to indicate which dataset it came from. However, the Hadoop map-reduce implementation does provide many desired properties such as parallelism, load balancing, and fault tolerance. Given the productivity gains to be had through Pig Latin, the associated overhead is often acceptable. Besides, there is the possibility of plugging in a different execution platform that can implement Pig Latin operations without such overheads.
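The way a COGROUP maps onto a single map-reduce job, as described above, can be sketched in plain Python: the map side emits the grouping key together with a tag recording the source dataset, and the reduce side collects tuples into one bag per dataset. The function names `map_phase` and `reduce_phase` and the sample rows are assumptions made for illustration; Pig's actual compiler output is not shown in the summary.

```python
from collections import defaultdict

# Made-up sample rows for the two datasets used in the COGROUP example.
results = [("lakers", "nba.com"), ("lakers", "espn.com"), ("kings", "nhl.com")]
revenue = [("lakers", 50), ("kings", 20), ("lakers", 10)]

def map_phase(datasets):
    """Emit (group_key, (dataset_tag, row)); the tag is the extra field
    that records which dataset each tuple came from."""
    for tag, rows in enumerate(datasets):
        for row in rows:
            yield row[0], (tag, row)

def reduce_phase(pairs, num_datasets):
    """Collect rows per key into one bag (here a list) per input dataset."""
    groups = defaultdict(lambda: [[] for _ in range(num_datasets)])
    for key, (tag, row) in pairs:
        groups[key][tag].append(row)
    return [(key, *bags) for key, bags in sorted(groups.items())]

grouped_data = reduce_phase(map_phase([results, revenue]), num_datasets=2)
for group in grouped_data:
    print(group)

# A JOIN is the cross product of the bags within each group.
joined = [
    (query, url, amount)
    for query, result_bag, revenue_bag in grouped_data
    for _, url in result_bag
    for _, amount in revenue_bag
]
```

Keeping the bags nested until a later step is what lets an aggregate or a cross product be applied afterwards, which is the sense in which COGROUP sits one level below JOIN.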
Since the files reside in the Hadoop distributed file system, the LOAD operation can run in parallel. Similarly, parallelism is achieved for the FILTER and FOREACH operations, as any map-reduce job runs several map and reduce instances in parallel. The COGROUP operation runs in parallel by re-partitioning the output from multiple map instances to multiple reduce instances. To achieve efficiency when working with nested bags, Pig uses Hadoop's combiner function to achieve a two-tier tree evaluation of algebraic functions. So all UDFs of an algebraic nature benefit from this optimization. Non-algebraic functions cannot take advantage of it.

#### Pig Pen

Pig Pen is the interactive debugging environment, which also helps in constructing Pig Latin programs. Typically, the programmer would write a program and run it on a dataset, or a subset of the dataset (if running over the entire dataset is too expensive), to check for correctness and change the program accordingly. Pig Pen can dynamically create a side dataset (a subset of the complete dataset), called the sandbox dataset, that can be used to test the program being constructed. This dataset aims to be real (i.e. a subset of the actual data), concise and complete (illustrating the key semantics of each command). While the paper does not go into the depth of how this dataset is created, it does mention that it starts by taking small, random samples of the base data and synthesizes additional data tuples to improve completeness.

Within Yahoo, Pig Latin has been used in a variety of scenarios, like computing rollup aggregates and performing temporal and session analysis. While Pig Latin does provide a powerful nested data model and supports UDFs, making it easier to write and debug map-reduce jobs, it does not remove the overhead of materializing and replicating data between successive map-reduce jobs. The paper argues that this overhead is acceptable given the numerous advantages Hadoop offers.
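The two-tier evaluation of algebraic functions mentioned above can be illustrated with AVG. An average cannot be merged directly, but a (sum, count) pair can, so partial results are computed per map-side chunk, merged, and only then turned into the final value. The three function names and the sample chunks below are hypothetical and chosen for illustration; they are not claimed to be Pig's interface.

```python
def initial(values):
    """First tier (map/combiner side): summarize one chunk as (sum, count)."""
    return (sum(values), len(values))

def merge(partials):
    """Merge partial (sum, count) pairs; this step can be applied repeatedly."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return (total, count)

def final(partial):
    """Second tier (reduce side): turn the merged summary into the average."""
    total, count = partial
    return total / count

# Made-up values for one group, spread over three map instances.
chunks = [[1, 2], [3], [4, 5, 6]]

partials = [initial(chunk) for chunk in chunks]
average = final(merge(partials))
print(partials, average)
```

A function such as median has no comparable small summary that can be merged, which is why non-algebraic functions do not benefit from the combiner.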
Pig has come a long way since the paper was written. Many new functions have been added, and it now comes with an interactive shell called Grunt. Moreover, UDFs can now be written in various scripting languages and not just Java. All these changes have made Pig more powerful and accessible than before.