## Introduction
A variable $x$ is said to obey a power law if it is drawn from a probability density function (pdf) of the form $p(x) = Cx^{-\alpha}$, where $C$ is called the **normalization constant** and $\alpha$ is called the **scaling parameter** or exponent. Often, the power law applies only for values greater than some minimum value, called $x_\text{min}$. The paper describes various statistical techniques to test whether a given dataset follows a power law.
Power-law distributions come in both continuous and discrete flavors, with the discrete case being more involved than the continuous one. So, discrete power-law behavior is often approximated by continuous power-law behavior for the sake of convenience. One reliable approximation is to assume that the discrete values of $x$ are generated from a continuous power law and then rounded to the nearest integer.
Sometimes, the complementary cumulative distribution function (or CDF) is also considered, where $P(x) = \Pr(X ≥ x)$.
## Fitting power-laws to empirical data
A power-law distribution appears as a straight line on a log-log plot, and the slope of this line can be estimated by least-squares linear regression. But a good straight-line fit does not guarantee that the data follows a power-law distribution. Moreover, the assumption of independent, Gaussian noise, which is a prerequisite for linear regression, does not hold in this case.
### Estimating scaling parameter
Assuming that we know the value of $x_\text{min}$, the value of $\alpha$ can be obtained by the *method of maximum likelihood*. The maximum likelihood estimator (MLE) for the continuous case is given as:
https://github.com/shagunsodhani/powerlaw/raw/master/paper/assets/MLEContinous.png
and that for the discrete case is given as:
https://github.com/shagunsodhani/powerlaw/raw/master/paper/assets/MLEDiscrete.png
The equation for the discrete case is only an approximation, as there is no exact closed-form MLE for the discrete case.
The MLE method outperforms several linear-regression-based approaches: line fitting on the log-log plot, line fitting after logarithmic binning (done to reduce fluctuations in the tail of the distribution), line fitting to the CDF with constant-size bins and line fitting to the CDF without any bins. For any finite sample size $n$ and any choice of $x_\text{min}$, the MLE is still biased, but the bias decays as $O(1/n)$ and can be ignored for $n ≥ 50$.
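As a rough illustration of the estimators discussed above, the sketch below assumes they take the standard closed forms $\hat{\alpha} = 1 + n \left[ \sum_{i=1}^{n} \ln \frac{x_i}{x_\text{min}} \right]^{-1}$ for the continuous case and the same expression with $x_\text{min}$ replaced by $x_\text{min} - \frac{1}{2}$ as the discrete approximation. The function names and the test sample (drawn by inverse-transform sampling with $\alpha = 2.5$) are assumptions made for this example only.

```python
import math
import random

def alpha_mle_continuous(xs, xmin):
    """Continuous-case MLE of alpha, using only observations with x >= xmin."""
    tail = [x for x in xs if x >= xmin]
    return 1.0 + len(tail) / sum(math.log(x / xmin) for x in tail)

def alpha_mle_discrete_approx(xs, xmin):
    """Approximate discrete-case MLE: same form with xmin shifted by 1/2."""
    tail = [x for x in xs if x >= xmin]
    return 1.0 + len(tail) / sum(math.log(x / (xmin - 0.5)) for x in tail)

def sample_power_law(n, alpha, xmin, rng):
    """Inverse-transform sampling from a continuous power law above xmin."""
    return [xmin * (1.0 - rng.random()) ** (-1.0 / (alpha - 1.0)) for _ in range(n)]

# Hypothetical check: recover alpha from data with a known exponent.
rng = random.Random(0)
data = sample_power_law(20000, 2.5, 1.0, rng)
alpha_hat = alpha_mle_continuous(data, 1.0)
print(round(alpha_hat, 2))
```

With 20000 points the estimate should land close to 2.5, since the statistical error of the estimator shrinks roughly as $(\alpha - 1)/\sqrt{n}$.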
### Estimating $x_\text{min}$
If we choose a value of $x_\text{min}$ less than the original value, then we will get a biased value of $\alpha$ as we will be fitting power-law to non power-law region as well. If we choose a value larger than the original value, we will be losing legitimate data points (leading to statistical errors). But it is more acceptable to make a higher estimate of $x_\text{min}$ than the original value.
One approach is to plot the PDF or CDF on a log-log plot and mark the point beyond which the distribution becomes roughly straight, or to plot $\alpha$ as a function of $x_\text{min}$ and mark the point beyond which the value appears relatively stable. But these approaches are not objective, as "roughly straight" and "relatively stable" are not quantified.
The approach proposed by Clauset et al. [Clauset, A., M. Young, and K. S. Gleditsch, 2007, Journal of Conflict Resolution 51, 58] is as follows:
Choose a value of $x_\text{min}$ such that the probability distribution of the measured data and best-fit power-law model are as similar as possible. Similarity between distributions can be measured using Kolmogorov-Smirnov (KS) statistic which is defined as:
https://github.com/shagunsodhani/powerlaw/raw/master/paper/assets/KSStatistics.png
where $S(x)$ is the CDF of the given data for observations with value greater than or equal to $x_\text{min}$ and $P(x)$ is the CDF of the power-law model that best fits the data in the region $x ≥ x_\text{min}$. This method works well for both continuous and discrete data and is recommended for the general case. Other measures, like the weighted KS or Kuiper statistics, can also be used in place of the KS statistic.
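The selection procedure above can be sketched for continuous data as follows, assuming the KS statistic is the maximum absolute gap $D = \max_{x ≥ x_\text{min}} |S(x) - P(x)|$ and that the model CDF is $P(x) = 1 - (x/x_\text{min})^{1-\alpha}$. The function names, the `min_tail` cutoff and the synthetic data (uniform below 5, power law with $\alpha = 2.5$ above 5) are assumptions of this example, not part of the paper.

```python
import math
import random

def fit_alpha(tail, xmin):
    """Continuous MLE for alpha on observations that are all >= xmin."""
    return 1.0 + len(tail) / sum(math.log(x / xmin) for x in tail)

def ks_distance(tail, xmin, alpha):
    """Largest gap between empirical CDF S(x) and model CDF P(x).

    `tail` must be sorted ascending and contain only values >= xmin.
    """
    n = len(tail)
    worst = 0.0
    for i, x in enumerate(tail):
        model = 1.0 - (x / xmin) ** (1.0 - alpha)
        # The empirical CDF jumps from i/n to (i+1)/n at x; check both sides.
        worst = max(worst, abs(model - i / n), abs(model - (i + 1) / n))
    return worst

def choose_xmin(data, min_tail=200):
    """Try every distinct observation as xmin; keep the one with smallest KS distance."""
    xs = sorted(data)
    best = None
    for i, xmin in enumerate(xs):
        if len(xs) - i < min_tail:
            break  # too few tail points for a stable fit (arbitrary cutoff)
        if i > 0 and xs[i - 1] == xmin:
            continue  # duplicate candidate, already evaluated
        tail = xs[i:]
        if tail[-1] == xmin:
            continue  # all tail values equal: the MLE is undefined
        alpha = fit_alpha(tail, xmin)
        d = ks_distance(tail, xmin, alpha)
        if best is None or d < best[2]:
            best = (xmin, alpha, d)
    return best

# Hypothetical data: non-power-law body below 5, power-law tail above 5.
rng = random.Random(1)
body = [rng.uniform(1.0, 5.0) for _ in range(800)]
tail = [5.0 * (1.0 - rng.random()) ** (-1.0 / 1.5) for _ in range(800)]
xmin_hat, alpha_hat, ks = choose_xmin(body + tail)
print(xmin_hat, alpha_hat, ks)
```

This brute-force scan is quadratic in the number of observations, which is acceptable for a sketch but would need a smarter implementation for large datasets.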
## Testing the power-law hypothesis
MLE and other approaches do not tell us whether power-law is a possible fit to the given data - all they do is find the best fit values of $x_\text{min}$ and $\alpha$ assuming the data comes from a power-law distribution. A basic approach would be to calculate the value of $x_\text{min}$ and $\alpha$ and use them to hypothesize a power-law distribution from which the data is drawn. We then check the validity of this hypothesis using the goodness-of-fit tests.
### Goodness-of-fit tests
A large number of synthetic datasets are generated from the hypothesized power-law distribution. Each synthetic dataset is then fitted to its own power-law model and the KS statistic is calculated for it relative to that fit. The *p-* value is defined to be the fraction of synthetic datasets where the distance (KS statistic value) is greater than the distance for the given dataset. A large value of *p* (close to 1) means that the difference between the given data and the hypothesized model could be due to statistical fluctuations alone, while a small value of *p* (close to 0) means that the model is not a plausible fit to the data.
### Dataset generation
The generated dataset needs to be such that it has a distribution similar to the given data below $x_\text{min}$ and follows the fitted power law above $x_\text{min}$. Suppose the given data has $n_\text{tail}$ observations with $x ≥ x_\text{min}$ and $n$ observations in total. With a probability of $n_\text{tail}/n$, a random number $x_i$ is generated from the hypothesized power-law distribution. With a probability of $1 - n_\text{tail}/n$, a number is picked uniformly at random from the observations in the given dataset with $x < x_\text{min}$. This way, the generated dataset of $n$ elements is expected to follow the power law for $x ≥ x_\text{min}$ and the same distribution as the given data below $x_\text{min}$.
If we want the *p-* values to be accurate to within about *ε* of the true value, then we should generate at least *1/4 ε<sup>-2</sup>* synthetic datasets. For example, *ε = 0.01* requires about 2500 synthetic datasets.
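The generation step described above can be sketched as follows for the continuous case. The function name, argument names and the toy dataset are assumptions of this example; the power-law draw uses inverse-transform sampling.

```python
import random

def synthetic_dataset(data, xmin, alpha, rng):
    """Build one synthetic dataset with the same size as `data`.

    Each element comes from the fitted power law with probability n_tail/n,
    and is otherwise resampled from the observed values below xmin.
    """
    body = [x for x in data if x < xmin]
    n = len(data)
    n_tail = n - len(body)
    out = []
    for _ in range(n):
        if rng.random() < n_tail / n:
            # Inverse-transform draw from a continuous power law above xmin.
            out.append(xmin * (1.0 - rng.random()) ** (-1.0 / (alpha - 1.0)))
        else:
            out.append(rng.choice(body))
    return out

# Hypothetical observations: four values below xmin = 10, six at or above it.
observed = [1, 2, 3, 4, 10, 20, 30, 40, 50, 60]
synth = synthetic_dataset(observed, xmin=10, alpha=2.5, rng=random.Random(0))
print(synth)
```

Repeating this many times, refitting each synthetic dataset and comparing KS distances gives the *p-* value defined in the previous section.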
The power law is ruled out if *p ≤ 0.1*. A large *p-* value does not mean that the power-law is the correct distribution for the data. There can be other distributions that can fit the data equally well or even better. Moreover, for small values of n, it is possible that the given distribution will follow a power law closely, and hence that the p-value will be large, even when the power law is the wrong model for the data.
## Alternate distributions
*p-* value test can only be used to reject the power-law hypothesis and not accept it. So even if *p-value > 0.1*, we can only say that power-law hypothesis is not rejected. It could be the case that some other distribution fits the data equally well or even better. To eliminate this possibility, we calculate a *p-* value for a fit to the alternate distribution and compare it with the *p-* value for the power-law. If the *p-* value for power-law is high and the *p-* value for the other distribution is low, we can say that data is more likely to be drawn from the power-law distribution (though we still can not be sure that it is **definitely** drawn from the power-law distribution).
### Likelihood Ratio Test
This test can be used to directly compare two distributions against one another to see which is a better fit for the given data. The idea is to compute the likelihood of the given data under the two competing distributions. The one with the higher likelihood is taken to be the better fit. Alternatively, the ratio of the two likelihoods, or the logarithm *R* of the ratio can be used. If *R* is close enough to zero, then it could go to either side of zero, depending on statistical fluctuations. So *R* value needs to be sufficiently far from zero. To check for this, Vuong's method [Vuong, Q. H., 1989, Econometrica 57, 307] is used which gives a *p-* value that can tell if the conclusion from the value of *R* is statistically significant. If this *p-* value is small (*p < 0.1*), the result is significant. Otherwise, the result is not taken to be reliable and the test does not favor either distribution.
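A minimal sketch of this comparison is shown below for a continuous power law against an exponential, both restricted to $x ≥ x_\text{min}$. It assumes the significance is computed as $p = \operatorname{erfc}\left(|R| / (\sqrt{2n}\,\sigma)\right)$, where $\sigma$ is the standard deviation of the per-observation log-likelihood differences. The function names, the choice of the exponential as the alternative and the synthetic sample are assumptions of this example.

```python
import math
import random

def loglik_power_law(tail, xmin):
    """Per-observation log-likelihoods under the MLE-fitted continuous power law."""
    alpha = 1.0 + len(tail) / sum(math.log(x / xmin) for x in tail)
    return [math.log((alpha - 1.0) / xmin) - alpha * math.log(x / xmin) for x in tail]

def loglik_exponential(tail, xmin):
    """Per-observation log-likelihoods under the MLE-fitted shifted exponential."""
    lam = 1.0 / (sum(tail) / len(tail) - xmin)
    return [math.log(lam) - lam * (x - xmin) for x in tail]

def likelihood_ratio_test(ll_a, ll_b):
    """Return (R, p): log likelihood ratio of A over B and its significance."""
    n = len(ll_a)
    diffs = [a - b for a, b in zip(ll_a, ll_b)]
    R = sum(diffs)
    mean = R / n
    sigma = math.sqrt(sum((d - mean) ** 2 for d in diffs) / n)
    p = math.erfc(abs(R) / (math.sqrt(2.0 * n) * sigma))
    return R, p

# Hypothetical sample drawn from a power law with alpha = 2.5 and xmin = 1.
rng = random.Random(0)
sample = [(1.0 - rng.random()) ** (-1.0 / 1.5) for _ in range(2000)]
R, p = likelihood_ratio_test(loglik_power_law(sample, 1.0),
                             loglik_exponential(sample, 1.0))
print(R, p)
```

A positive *R* with a small *p* favors the first model (here the power law); a *p* above the threshold means the sign of *R* should not be trusted.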
Other than the likelihood ratio, several other tests like minimum description length (MDL) or cross-validation can also be performed.
The [Pregel paper](https://kowshik.github.io/JPregel/pregel_paper.pdf) introduces a vertex-centric, large-scale graph computational model. Interestingly, the name Pregel comes from the name of the river which the Seven Bridges of Königsberg spanned.
### Computational Model
The system takes as input a directed graph with properties assigned to both vertices and edges. The computation consists of a sequence of iterations, called supersteps. In each superstep, a user-defined function is invoked on each vertex in parallel. This function essentially implements the algorithm by specifying the behaviour of a single vertex V during a single superstep S. The function can read messages sent to the vertex V during the previous superstep (S-1), change the state of the vertex or its out-going edges, mutate the graph topology by adding/removing vertices or edges, and send messages to other vertices that would be received in the next superstep (S+1). Since all computation during a superstep is performed locally, the model is well suited for distributed computing and synchronization is needed only between supersteps.
The computation terminates when every vertex is in the deactivated state and no messages are in transit. When computation starts, all vertices are in the active state. A vertex deactivates itself by voting to halt and, once deactivated, it does not take part in subsequent supersteps. But any time a deactivated vertex receives a message, it becomes activated again and takes part in subsequent supersteps. The resulting state machine therefore has two states, active and deactivated: a vote to halt moves a vertex to deactivated and an incoming message moves it back to active.
The output of the computation is the set of values produced by the vertices.
Pregel adopts a pure message passing model that eliminates the need for shared memory and remote reads. Messages can be delivered asynchronously, thereby reducing the latency.
Graph algorithms can also be expressed as a sequence of MapReduce jobs, but that requires passing the entire state of the graph from one stage to another.
It also requires coordinating the various steps of chained MapReduce. In contrast, Pregel keeps vertices and out-going edges on the machine performing the computation and only messages are transferred across. Though Pregel is similar in concept to MapReduce, it comes with a natural graph API and efficient support for running iterative algorithms over the graph.
### API
1. Pregel programs are written by subclassing the vertex class.
2. The programmer overrides the compute() method. This is the user-defined function that is invoked on each vertex.
3. Within compute(), the programmer can modify the value of the vertex or the properties of its out-going edges, and these changes are reflected immediately. These values are the only per-vertex state that persists across supersteps.
4. All messages sent to vertex V during superstep S are available via an iterator in superstep S+1. While it is guaranteed that each message would be delivered exactly once, there is no guarantee on the order of delivery.
5. User-defined handlers are invoked when the destination vertex does not exist.
6. Combiners are used to reduce the number of messages to be transferred by aggregating messages from different vertices (all of which are on the same machine) which have the same destination vertex. Since there is no guarantee about which messages will be aggregated and in what order, only commutative and associative operations can be used.
7. Aggregators can be used for capturing the global state of the graph, where each vertex provides a value to the aggregator during each superstep and these values are combined by the system using a reduce operation (which should be associative and commutative). The aggregated value is then available to all the vertices in the next superstep.
### Topology Mutation
Since the compute() method allows the graph topology to be modified, conflicting requests can be made in the same superstep.
Two mechanisms are used to handle the conflicts:
* Within a superstep, the following order is followed when resolving conflicting operations:
    * Edge removal
    * Vertex removal
    * Vertex addition
    * Edge addition
* User-defined handlers are used for conflicts that cannot be resolved by partial ordering alone. This would include scenarios where there are multiple requests to create a vertex with different initial values. In these cases, the user defines how to resolve the conflict.
The coordination mechanism is lazy in the sense that global mutations do not require coordination until the point they are applied.
### Execution Steps
* Several instances of the user program start executing on a cluster of machines. One of the instances becomes the master and the rest are the workers.
* The master partitions the graph based on vertex id, with each partition consisting of a set of vertices and their out-going edges.
* These partitions are assigned to workers. Each worker executes the compute method on all its vertices and manages messages to and from other vertices.
* The master instructs each worker to perform a superstep. The workers run the compute method on each vertex and tell the master how many vertices would be active in the next superstep. This continues as long as even one vertex is active.
* Once all the vertices become deactivated, the master may ask the workers to save their portion of the graph.
### Fault Tolerance
Fault tolerance is achieved through checkpointing, where the master instructs the workers to save the state of computation to persistent storage. The master issues regular "ping" messages to workers, and if a worker does not receive a message from the master in a specified time interval, the worker terminates itself. If the master does not hear back from a worker, the worker is marked as failed. In this case, the graph partitions assigned to the failed worker are reassigned to the active workers.
All the active workers then load the computation state from the last checkpoint and may repeat some supersteps. An alternative to this would be confined recovery where, along with basic checkpointing, the workers log out-going messages from their assigned partitions during graph loading and subsequent supersteps. This way, lost partitions can be recomputed from the logged messages and the entire system does not have to perform a rollback.
### Worker Implementation
A worker contains a mapping of vertex id to vertex state for its portion of the complete graph. The vertex state comprises its current value, its out-going edges, the queue containing incoming messages and a flag marking whether it is in the active state. Two copies of the queue and the flag are maintained, one for the current superstep and one for the next superstep.
While sending a message to another vertex, the worker checks if the destination vertex is on the same machine. If yes, it places the message directly on the receiver's queue instead of sending it via the network. In case the vertex lies on a remote machine, the messages are buffered and sent to the destination worker as a single network message. If a combiner is specified, it is applied both when messages are added to the outgoing message queue and when they are received at the incoming message queue.
### Master Implementation
The master coordinates the workers by maintaining a list of currently alive workers, their addressing information and the information on the portion of the graph assigned to them. The size of the master's data structures depends on the number of partitions, so a single master can be used for a very large graph. The master sends the computation task to workers and waits for a response. If any worker fails, the master enters recovery mode as discussed in the section on fault tolerance. Otherwise, it proceeds to the next superstep. It also runs an internal HTTP server to serve statistics about the graph and the state of the computation.
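The superstep loop, vote-to-halt and reactivation-on-message behaviour described in the sections above can be sketched as a single-process simulation. This is not Pregel's API: `run_supersteps`, the signature of the compute function and the maximum-value example are all assumptions made for illustration.

```python
def run_supersteps(out_edges, values, compute, max_supersteps=100):
    """Simulate supersteps on one machine.

    out_edges: dict vertex -> list of destination vertices
    values:    dict vertex -> current value (updated in place)
    compute:   fn(superstep, value, messages, neighbours, send) -> (new_value, vote_to_halt)
    """
    active = set(out_edges)              # every vertex starts active
    inbox = {v: [] for v in out_edges}   # messages sent in the previous superstep
    superstep = 0
    while superstep < max_supersteps:
        # A vertex runs if it is active or if a message has reactivated it.
        runnable = [v for v in out_edges if v in active or inbox[v]]
        if not runnable:
            break                        # all halted and no messages in transit
        outbox = {v: [] for v in out_edges}

        def send(dst, msg):
            outbox[dst].append(msg)      # delivered in the next superstep

        for v in runnable:
            active.add(v)
            values[v], halt = compute(superstep, values[v], inbox[v], out_edges[v], send)
            if halt:
                active.discard(v)
        inbox = outbox                   # barrier between supersteps
        superstep += 1
    return values, superstep

def max_value_compute(superstep, value, messages, neighbours, send):
    """Propagate the largest value seen so far; halt when nothing changed."""
    new_value = max([value] + messages)
    if superstep == 0 or new_value > value:
        for dst in neighbours:
            send(dst, new_value)
        return new_value, False
    return new_value, True

graph = {"a": ["b"], "b": ["c"], "c": ["a"]}
final, steps = run_supersteps(graph, {"a": 3, "b": 6, "c": 2}, max_value_compute)
print(final, steps)
```

Because a vertex only reads its own value and the messages from the previous superstep, the order in which vertices run inside a superstep does not affect the result, which is what makes the model easy to distribute.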
### Aggregators
Workers combine all the values supplied to an aggregator, by all the vertices in a superstep, into a single local value. At the end of the superstep, the workers perform a tree-based reduction on the local values and deliver the global value to the master. The tree-based reduction is better than pipelining with a chain of workers as it allows for more parallelization.
### Applications/Experiments
The paper describes how to implement the PageRank, ShortestPath, Bipartite Matching and Semi-Clustering algorithms on top of Pregel. The emphasis is on showing how to think of these algorithms in a vertex-centric manner and not on how to implement them on Pregel in the best possible way.
The experiments were conducted with the single-source shortest paths algorithm, with binary trees and log-normal graphs as input. The default partitioning strategy and a naive implementation of the algorithm were used to show that satisfactory performance can be achieved with little coding effort. The runtime increases approximately linearly in the graph size.
### Limitations
One obvious limitation is that the entire computation state resides in main memory. Secondly, Pregel is designed around sparse graphs and performance will take a hit in case of dense graphs where a lot of communication takes place between vertices. The paper counters this by arguing that realistic dense graphs and algorithms with dense computation are rare. Moreover, communication in such dense networks can be reduced by using aggregators and combiners. An add-on would be to support dynamic partitioning of the graph based on message traffic to minimize communication over the network.
Pregel's open-source implementation, called [Giraph](http://giraph.apache.org/), adds several features beyond the basic Pregel model, including out-of-core computation and edge-oriented input, which take away some of the original limitations.
Facebook is using Giraph to analyze its social network and has [scaled it to a trillion edges](https://code.facebook.com/posts/509727595776839/scaling-apache-giraph-to-a-trillion-edges/) showing the scalability of the Pregel model itself.
This week I read up on [GraphX](https://amplab.cs.berkeley.edu/wp-content/uploads/2014/02/graphx.pdf), a distributed graph computation framework that unifies graph-parallel and data-parallel computation. Graph-parallel systems efficiently express iterative algorithms (by exploiting the static graph structure) but do not perform well on operations that require a more general view of the graph, like operations that move data out of the graph. Data-parallel systems perform well on such tasks, but directly implementing graph algorithms on data-parallel systems is inefficient due to complex joins and excessive data movement. This is the gap that GraphX fills by allowing the same data to be viewed and operated upon both as a graph and as a table.
### Preliminaries
Let $G = (V, E)$ be a graph where $V = \{1, ..., n\}$ is the set of vertices and $E$ is the set of $m$ directed edges. Each directed edge is a tuple of the form $(i, j) \in E$ where $i \in V$ is the source vertex and $j \in V$ is the target vertex. The vertex properties are represented as $P_V(i)$ where $i \in V$ and edge properties as $P_E (i, j)$ for edge $(i, j) \in E$. The collection of all the properties is $P = (P_V, P_E)$. The combination of graph structure and properties defines a property graph $G(P) = (V, E, P)$.
Graph-Parallel Systems consist of a property graph $G = (V, E, P)$ and a vertex-program $Q$ that is instantiated simultaneously on all the vertices. The execution on vertex $v$, called $Q(v)$, interacts with execution on the adjacent vertices by message passing or shared state and can read/modify properties on the vertex, edges and adjacent vertices. $Q$ can run in two different modes:
* **bulk-synchronous mode** - all vertex programs run concurrently in a sequence of super-steps.
* **asynchronous mode** - vertex programs run as and when resources are available and impose constraints on whether neighbouring vertex-programs can run concurrently.
The **Gather-Apply-Scatter (GAS)** decomposition model breaks down a vertex-program into purely edge-parallel and vertex-parallel stages. The associative *gather* function collects the inbound messages on the vertices, the *apply* function operates only on a vertex and updates its value, and the *scatter* function computes the message to be sent along each edge and can be safely executed in parallel.
GraphX uses the bulk-synchronous model and adopts the GAS decomposition model.
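To make the three GAS stages concrete, here is a small single-machine sketch of PageRank written in that style. It is only an illustration of the decomposition: the function name, the damping factor of 0.85 and the un-normalized update rule are assumptions of this example, not code from GraphX.

```python
def pagerank_gas(edges, num_vertices, iterations=20, damping=0.85):
    """PageRank as repeated scatter / gather / apply stages.

    edges: list of (src, dst) pairs with vertex ids in range(num_vertices).
    """
    out_degree = [0] * num_vertices
    for src, _ in edges:
        out_degree[src] += 1
    rank = [1.0] * num_vertices
    for _ in range(iterations):
        # Scatter: compute one message per edge (edge-parallel).
        messages = [(dst, rank[src] / out_degree[src]) for src, dst in edges]
        # Gather: combine inbound messages with an associative sum.
        gathered = [0.0] * num_vertices
        for dst, contribution in messages:
            gathered[dst] += contribution
        # Apply: update each vertex from its gathered value (vertex-parallel).
        rank = [(1.0 - damping) + damping * g for g in gathered]
    return rank

ranks = pagerank_gas([(0, 1), (1, 2), (2, 0)], num_vertices=3)
print(ranks)
```

Because scatter touches only edges and apply touches only vertices, each stage can be parallelized independently, which is the property the decomposition is meant to expose.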
### GraphX Data Model
The GraphX Data Model consists of immutable collections and property graphs. Collections consist of unordered tuples (key-value pairs) and are used to represent unstructured data. The property graph combines the structural information (in the form of collections of vertices and edges) with properties describing this structure. Properties are just collections of form $(i, P_V (i))$ and $((i, j), P_E (i, j))$. The collection of vertices and edges are represented using RDDs (Resilient Distributed Datasets). Edges can be partitioned as per a user defined function. Within a partition, edges are clustered by source vertex id and there is an unclustered index on target vertex id. The vertices are hash partitioned by id and stored in a hash index within a partition. Each vertex partition contains a bitmask which allows for set intersection and filtering. It also contains a routing table that logically maps a vertex id to set of edge partitions containing the adjacent edges. This table is used when constructing triplets and is stored as a compressed bitmap.
### Operators
Other than standard data-parallel operators like `filter`, `map`, `leftJoin`, and `reduceByKey`, GraphX supports following graph-parallel operators:
* `graph` - constructs property graph given a collection of edges and vertices.
* `vertices`, `edges` - decompose the graph into a collection of vertices or edges by extracting vertex or edge RDDs.
* `mapV`, `mapE` - transform the vertex or edge collection.
* `triplets` - returns a collection of the form $((i, j), (P_V (i), P_E (i, j), P_V (j)))$. The operator essentially requires a multiway join between the vertex and edge RDDs. This operation is optimized by shifting the site of joins to edges, using the routing table, so that only vertex data needs to be shuffled.
* `leftJoin` - given a collection of vertices and a graph, returns a new graph which incorporates the property of matching vertices from the given collection into the given graph without changing the underlying graph structure.
* `subgraph` - returns a subgraph of the original graph by applying predicates on edges and vertices
* `mrTriplets` (MapReduce triplet) - logical composition of triplets followed by map and reduceByKey. It is the building block of graph-parallel algorithms.
All these operators can be expressed in terms of relational operators and can be composed together to express different graph-parallel abstractions. The paper shows how these operators can be used to construct an enhanced version of Pregel based on GAS. It also shows how to express the connected components algorithm and the `coarsen` operator.
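The semantics of `triplets` and `mrTriplets` can be sketched with plain Python dictionaries. This mirrors the logical definition given above (triplets, then map, then reduce by key) and is not the GraphX API; the function signatures and the "oldest follower" example data are assumptions of this sketch.

```python
def triplets(vertices, edges):
    """vertices: dict id -> property; edges: dict (i, j) -> property.

    Returns ((i, j), (P_V(i), P_E(i, j), P_V(j))) for every edge.
    """
    return [((i, j), (vertices[i], prop, vertices[j])) for (i, j), prop in edges.items()]

def mr_triplets(vertices, edges, map_fn, reduce_fn):
    """Map over triplets to emit (vertex id, message) pairs, then reduce by vertex id."""
    out = {}
    for (i, j), (src_prop, edge_prop, dst_prop) in triplets(vertices, edges):
        for target, msg in map_fn(i, j, src_prop, edge_prop, dst_prop):
            out[target] = reduce_fn(out[target], msg) if target in out else msg
    return out

# Hypothetical graph: vertex property is an age, an edge (i, j) means "i follows j".
ages = {1: 23, 2: 42, 3: 30}
follows = {(1, 3): None, (2, 3): None, (3, 1): None}

# For each vertex, find the age of its oldest follower.
oldest_follower = mr_triplets(
    ages, follows,
    map_fn=lambda i, j, src, edge, dst: [(j, src)],  # send the follower's age to the followee
    reduce_fn=max,
)
print(oldest_follower)
```

The reduce function has to be commutative and associative, since the order in which messages for the same vertex are combined is not fixed.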
### Structural Index Reuse
Collections and graphs, being immutable, share the structural indexes associated within each vertex and edge partition to both reduce memory overhead and accelerate local graph operations. Most of the operators preserve the structural indexes to reuse them. For operators like subgraph which restrict the graph, the bitmask is used to construct the restricted view.
### Distributed Join Optimization
##### Incremental View Maintenance
The number of vertices that change between different steps of iterative graph algorithms decreases as the computation converges. After each operation, GraphX tracks which vertices have been changed by maintaining a bitmask. When materializing a vertex view, it uses values from the previous view for vertices which have not changed and ships only those vertices which have changed. This also allows for another optimization when using the `mrTriplets` operation: `mrTriplets` supports an optional argument called *skipStale*. When this option is enabled, the `mrTriplets` function is not applied to edges originating from vertices that have not changed since the last iteration. This optimization uses the same bitmask that incremental views use.
##### Automatic Join elimination
GraphX has implemented a JVM bytecode analyzer that determines whether source/target vertex attributes are referenced in a `mrTriplets` UDF (for map) or not. Since edges already contain the vertex ids, a 3-way join can be brought down to a 2-way join if only the source or only the target vertex attributes are needed (as in the PageRank algorithm), or the join can be completely eliminated if none of the vertex attributes are referenced.
### Sequential Scan vs Index Scan
Reusing structural indices reduces the computation cost in iterative algorithms, but it prevents the physical data from shrinking as vertices become inactive. To counter this issue, GraphX switches from sequential scan to bitmap index scan when the fraction of active vertices drops below 0.8. Since edges are clustered by source vertex id, the bitmap index scan can efficiently join edges and vertices together.
### Other Optimizations
* Though GraphX uses Spark's shuffle mechanism, it materializes shuffled data in memory itself, unlike Spark which materializes shuffle data on disk and relies on the OS buffer cache to cache the data. The rationale behind this modification is that graph algorithms tend to be communication intensive and the inability to control when buffers are flushed can lead to additional overhead.
* When implementing the join step, vertices routed to the same target are batched, converted from row-orientation to column-orientation, compressed using the LZF algorithm and then sent to their destination.
* During shuffling, integers are encoded using a variable-length encoding scheme where, in each byte, 7 bits encode the value and the highest-order bit indicates if another byte is needed for encoding the value. So smaller integers can be encoded with fewer bytes and since, in most cases, vertex ids need far fewer than 64 bits, the technique helps to reduce the amount of data to be moved.
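The variable-length integer scheme in the last bullet can be sketched as below. This assumes the common little-endian layout (least significant 7-bit group first), which the summary does not specify; the function names are chosen for this example.

```python
def encode_varint(n):
    """Encode a non-negative integer, 7 value bits per byte.

    The highest-order bit of each byte is set when another byte follows.
    """
    if n < 0:
        raise ValueError("only non-negative integers are supported")
    out = bytearray()
    while True:
        byte = n & 0x7F          # low 7 bits carry the value
        n >>= 7
        if n:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def decode_varint(data):
    """Decode the first varint found at the start of `data`."""
    result = 0
    shift = 0
    for b in data:
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result
        shift += 7
    raise ValueError("truncated varint")

print(encode_varint(300).hex())   # ac02
print(len(encode_varint(2 ** 63)))  # 10
```

A small vertex id such as 300 takes two bytes instead of eight, while a value that really uses all 64 bits costs ten bytes, so the scheme pays off only when most ids are small.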
### System Evaluation
GraphX was evaluated against graph algorithms implemented over Spark 0.8.1, Giraph 1.0 and GraphLab 2.2 for both graph-parallel computation tasks and end-to-end graph analytic pipelines. Key observations:
* GraphLab benefits from its native runtime and performs best among all the implementations for both PageRank and Connected Components algorithm.
* For connected components algorithm, Giraph benefits from using edge cuts but suffers from Hadoop overhead.
* GraphX outperforms idiomatic implementation of PageRank on Spark, benefitting from various optimizations discussed earlier.
* As more machines are added, GraphX does not scale linearly but it still outperforms the speedup achieved by GraphLab (for PageRank).
* GraphX outperforms Giraph and GraphLab for a multi-step, end-to-end graph analytics pipeline that parses Wikipedia articles to make a link graph, runs PageRank on the link graph and joins top 20 articles with their text.
GraphX provides a small set of core graph-processing operators, implemented on top of relational operators, by efficiently encoding graphs as a collection of edges and vertices with two indexing data structures. While it does lag behind specialised systems like Giraph and GraphLab in terms of graph-parallel computation tasks, GraphX does not aim at speeding up such tasks. It instead aims to provide an efficient workflow in end-to-end graph analytics system by combining data-parallel and graph-parallel computations in the same framework. Given that it does outperform all the specialised systems in terms of end-to-end runtime for graph pipelines and makes the development process easier by eliminating the need to learn and maintain multiple systems, it does seem to be a promising candidate for the use case it is attempting to solve.
This paper, by Yahoo, describes a new language called Pig Latin which is intended to provide a middle ground between declarative SQL-style language (which many developers find unnatural) and procedural map-reduce model (which is very low-level and rigid). It also introduces a novel, interactive debugging environment called Pig Pen.
#### Overview
A Pig Latin program is a sequence of steps, each carrying out a single high-level transformation. Effectively the program specifies the query execution plan itself. The program then compiles into map-reduce jobs which are run on Hadoop (though other backends can also be plugged in). Pig Latin is more expressive than map-reduce, which is essentially limited to a one-input, two-stage data flow model. Moreover, since the map and reduce functions are opaque to each other, optimizations are hard to bake into the system itself. This limitation is also overcome with Pig Latin, which allows the programmer to order the execution of individual steps by specifying the execution plan.
Unlike a traditional DBMS, Pig does not require data to be imported into system-managed tables, as it is meant for offline, ad-hoc, scan-centric, read-only workloads. Pig supports User Defined Functions (UDFs) out of the box. Since it targets only parallel processing, there is no inbuilt support for operations like non-equi joins or correlated subqueries. These operations can still be performed using UDFs.
#### Nested Data Model
Pig supports a flexible, nested data model with 4 supported types:
1. Atom: Simple values like string or integer. eg ‘medium’
2. Tuple: Sequence of ‘fields’ which can be of any type. eg (‘medium’, 12)
3. Bag: Collection of tuples. eg {(‘medium’, 12), ((‘github’, ‘twitter’), 12)}
4. Map: Collection of key-value pairs where keys are atoms and values can be any type. eg [‘key’ -> ‘value’, ‘anotherKey’ -> (‘another’, ‘value’)]
#### Inbuilt functions
1. FOREACH — Specifies how each tuple is to be processed. The semantics require that there should be no dependence between processing of different input tuples to allow parallel processing.
2. COGROUP — Suppose we have two datasets:
results = (query, url) //a query and its result url
revenue = (query, amount) //a query and revenue generated by the query.
Cogrouping these two datasets, we get
grouped_data = COGROUP results BY query, revenue BY query;
grouped_data would contain one tuple for each group. The first field of the tuple is the group identifier and the other fields are bags, one for each dataset being cogrouped. So the first bag would correspond to results and the second to revenue.
COGROUP is one level lower than JOIN as it only groups together tuples into nested bags. It can be followed by an aggregate operation or cross product operation (leading to the result expected from JOIN operation). GROUP is same as COGROUP on a single dataset.
3. Nested Operations where commands can be nested within FOREACH command to process bags within tuples.
Other functions include — LOAD, STORE, FILTER, JOIN, CROSS, DISTINCT, UNION and ORDER and are similar in operation to equivalent SQL operations.
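The COGROUP semantics described above, and the way a JOIN can be obtained from it by a cross product within each group, can be sketched in plain Python. The helper names and the sample rows are made up for this illustration; they are not Pig built-ins or data from the paper.

```python
from collections import defaultdict

def cogroup(left, right, left_key, right_key):
    """Return one (group key, left bag, right bag) tuple per distinct key."""
    groups = defaultdict(lambda: ([], []))
    for t in left:
        groups[left_key(t)][0].append(t)
    for t in right:
        groups[right_key(t)][1].append(t)
    return [(key, bag_l, bag_r) for key, (bag_l, bag_r) in groups.items()]

def join_from_cogroup(grouped):
    """Equi-join: cross product of the two bags inside every group."""
    return [lt + rt for _, bag_l, bag_r in grouped for lt in bag_l for rt in bag_r]

# Hypothetical rows following the (query, url) and (query, amount) schemas.
results = [("lakers", "nba.com"), ("lakers", "espn.com"), ("kings", "nhl.com")]
revenue = [("lakers", 50), ("kings", 30), ("kings", 10)]

grouped_data = cogroup(results, revenue, lambda t: t[0], lambda t: t[0])
joined = join_from_cogroup(grouped_data)
for row in grouped_data:
    print(row)
```

Keeping the bags nested until a later step is what lets the programmer choose between a cross product, an aggregate, or some custom processing per group.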
One may ask how Pig differs from an SQL-style query language when it seems to use similar operations. Compare the following queries, which generate the same result. The first one is written in SQL (declarative) and the other in Pig (procedural):
SELECT category, AVG(pagerank) FROM urls WHERE pagerank > 0.2 GROUP BY category HAVING COUNT(*) > 1000000
good_urls = FILTER urls BY pagerank > 0.2;
groups = GROUP good_urls BY category;
big_groups = FILTER groups BY COUNT(good_urls) > 1000000;
output = FOREACH big_groups GENERATE category, AVG(good_urls.pagerank);
#### Implementation
The Pig interpreter parses the commands and verifies that the referred input files and bags are valid. It then builds a logical plan for every bag that is being defined using the logical plans for the input bags, and the current command. These plans are evaluated lazily to allow for in-memory pipelining and filter reordering. The parsing and logical plan construction are independent of the execution platform while the compilation of the logical plan into a physical plan depends on the execution platform.
For each COGROUP command, the compiler generates a map-reduce job where the map function assigns keys for grouping and the reduce function is initially a no-op. The commands intervening between two subsequent COGROUP commands A and B are pushed into the reduce function of A to reduce the amount of data to be materialized between different jobs. The operations before the very first COGROUP operation are pushed into the map function of A. The ORDER command compiles into two map-reduce jobs. The first job samples the input data to determine quantiles of the sort key. The second job range-partitions the input data according to the quantiles to provide equal-sized partitions, followed by local sorting in the reduce phase, resulting in a globally sorted file.
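The two-job structure of ORDER can be sketched in a single process. The code below is an illustration under simplifying assumptions (all keys fit in memory, a fixed sample size, hypothetical function names); it is not Pig's compiler output.

```python
import random
from bisect import bisect_right


def sample_quantiles(keys, num_partitions, sample_size=100, seed=0):
    """Job 1: sample the sort key and pick partition boundaries (quantiles)."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    # num_partitions - 1 cut points split the sample into roughly equal parts.
    return [sample[len(sample) * i // num_partitions] for i in range(1, num_partitions)]


def range_partition_and_sort(keys, boundaries):
    """Job 2: route each key to its range partition, then sort each partition."""
    partitions = [[] for _ in range(len(boundaries) + 1)]
    for k in keys:
        # "Map" side: the partition index is the number of boundaries <= k.
        partitions[bisect_right(boundaries, k)].append(k)
    # "Reduce" side: every partition is sorted locally and independently.
    return [sorted(p) for p in partitions]


def order(keys, num_partitions=4):
    if not keys:
        return []
    boundaries = sample_quantiles(keys, num_partitions)
    partitions = range_partition_and_sort(keys, boundaries)
    # Partitions cover disjoint, increasing key ranges, so concatenating them
    # in partition order gives a globally sorted sequence.
    return [k for p in partitions for k in p]
```

Because the boundaries come from a sample, partition sizes are only approximately equal; the global order, however, holds regardless of how good the sample is.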
The inflexibility of the map-reduce primitive results in some overheads while compiling Pig Latin into map-reduce jobs. For example, data must be materialized and replicated on the distributed file system between successive map-reduce jobs. When dealing with multiple data sets, an additional field must be inserted in every tuple to indicate which data set it came from. However, the Hadoop map-reduce implementation does provide many desired properties such as parallelism, load balancing, and fault-tolerance. Given the productivity gains to be had through Pig Latin, the associated overhead is often acceptable. Besides, there is the possibility of plugging in a different execution platform that can implement Pig Latin operations without such overheads.
Since the files reside in the Hadoop distributed file system, LOAD operation can run in parallel. Similarly, parallelism is achieved for FILTER and FOREACH operation as any map-reduce job runs several map and reduce instances in parallel. COGROUP operation runs in parallel by re-partitioning the output from multiple map instances to multiple reduce instances.
To work efficiently with nested bags, Pig uses Hadoop’s combiner function to perform a two-tier tree evaluation of algebraic functions. So all UDFs of an algebraic nature benefit from this optimization. Of course, non-algebraic functions cannot take advantage of this.
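As a rough illustration of what two-tier evaluation of an algebraic function looks like, consider an average computed from partial (sum, count) pairs. The three function names below are assumptions made for this sketch, and the chunks stand in for the outputs of separate map tasks.

```python
def initial(chunk):
    """Combiner step, run next to each map task: reduce a chunk to (sum, count)."""
    return (sum(chunk), len(chunk))


def intermediate(partials):
    """Merge partial (sum, count) pairs; safe to apply any number of times."""
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return (total, count)


def final(partials):
    """Reducer step: merge the remaining partials and produce the average."""
    total, count = intermediate(partials)
    return total / count


# Three hypothetical map tasks, each holding part of one nested bag.
map_outputs = [[1, 2, 3], [4, 5], [6]]
partials = [initial(chunk) for chunk in map_outputs]  # tier 1: per map task
average = final(partials)                             # tier 2: at the reducer
```

Only the small (sum, count) pairs cross the network instead of the full bag. A median, by contrast, cannot be rebuilt from such fixed-size partial results, which is why non-algebraic functions miss out on this optimization.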
#### Pig Pen
Pig Pen is the interactive debugging environment that also helps to construct Pig Latin programs. Typically, the programmer would write a program and run it on a dataset, or a subset of the dataset (if running over the entire dataset is too expensive), to check for correctness and change the program accordingly. Pig Pen can dynamically create a side dataset (a subset of the complete dataset), called the sandbox dataset, that can be used to test the program being constructed. This dataset aims to be real (i.e., a subset of actual data), concise, and complete (illustrating the key semantics of each command). While the paper does not go into the depth of how this dataset is created, it does mention that it starts by taking small, random samples of base data and synthesizes additional data tuples to improve completeness.
Within Yahoo, Pig Latin has been used in a variety of scenarios like computing rollup aggregates and performing temporal and session analysis. While Pig Latin does provide a powerful nested data model and supports UDFs making it easier to write and debug map-reduce jobs, it does not deal with issues like materializing and replicating data between successive map-reduce jobs. The paper argues that this overhead is acceptable given the numerous advantages Hadoop offers. Pig has come a long way since the paper was written. A lot of new functions have been added and it now comes with an interactive shell called Grunt. Moreover, now UDFs can be written in various scripting languages and not just Java. All these changes have made Pig more powerful and accessible than before.
[link]
The paper describes the architecture of RDDs (Resilient Distributed Datasets), what problems they can be used to solve, how they perform on different benchmarks and how they are different from existing solutions.
Many generalized cluster computing frameworks, like MapReduce and Dryad, lack in two areas:
1. Iterative algorithms where intermediate results are used across multiple computations.
2. Interactive data analysis where users run ad-hoc queries on the data.
One way around these problems is to use specialized frameworks like Pregel. But this leads to loss of generality. This is the problem that RDD intends to solve — by providing a general purpose, fault tolerant, distributed memory abstraction.
#### RDD Overview
RDDs are immutable partitioned collections that are created through deterministic operations on data in stable storage or other RDDs. They keep enough information about how they are derived from other sources (this information is called lineage). This lineage ensures that RDDs can be easily reconstructed in case of failures without having to perform explicit checkpointing. In fact, a program cannot reference an RDD that it cannot reconstruct after a failure. RDDs are lazy and ephemeral. They are constructed on demand and discarded after use. This allows for pipelining of many operations. For example:
rawData = spark.textFile(filepath) // read data from file
dataAfterApplyingFirstFilter = rawData.filter(condition1)
dataAfterApplyingSecondFilter = dataAfterApplyingFirstFilter.filter(condition2)
dataAfterApplyingSecondFilter.save()
Execution takes place only on line 4, when save() is called, and the two filter conditions can be merged into a single condition to avoid multiple passes over the data.
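The lazy behaviour described above can be imitated with a small class that merely records filters until a result is requested. This is a toy sketch, not Spark's API: `LazyDataset`, `collect`, and `read_numbers` are hypothetical names, and the source is an in-memory range rather than a file.

```python
class LazyDataset:
    """Records filter predicates and applies them all in one pass on demand."""

    def __init__(self, source, predicates=()):
        self._source = source              # callable that produces the raw records
        self._predicates = tuple(predicates)

    def filter(self, predicate):
        # No data is touched here; a new dataset description is returned.
        return LazyDataset(self._source, self._predicates + (predicate,))

    def collect(self):
        # The action: read the source once and apply the merged predicates.
        return [x for x in self._source() if all(p(x) for p in self._predicates)]


passes = []  # one entry is appended every time the source is actually read


def read_numbers():
    passes.append("read")
    return range(10)


raw_data = LazyDataset(read_numbers)
after_first_filter = raw_data.filter(lambda x: x % 2 == 0)
after_second_filter = after_first_filter.filter(lambda x: x > 3)
passes_before_action = len(passes)   # still zero: nothing has run yet
result = after_second_filter.collect()
```

Both predicates are evaluated per record inside a single pass, which is the effect of merging the two filter conditions.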
#### RDD Model
RDDs provide an interface based on fine-grained reads and coarse-grained updates. This means transformations (functions) are applied to all data items. These transformations can be logged to build a lineage graph so as to provide fault tolerance. But this coarse-grained update model makes RDDs unsuitable for applications like an incremental web crawler that need asynchronous fine-grained updates to a shared state. In such cases, DSM (Distributed Shared Memory) would be a better choice as it provides fine-grained reads and writes. However, RDDs offer many advantages over DSM. First, unlike DSM, RDDs do not need to incur checkpointing overhead. Second, RDDs, being immutable, can mitigate stragglers (slow nodes) by running backup tasks just like MapReduce. Third, since only bulk writes are supported, the runtime can schedule tasks based on data locality to enhance performance. Lastly, even if RDDs choose to take checkpoints (in cases where the lineage graph grows very big), consistency is not a concern because of the immutable nature of RDDs.
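A minimal sketch of lineage-based recovery, assuming a single coarse-grained map over partitioned input: because the transformation is logged, a lost partition can be rebuilt from its parent partition alone. The class and method names here are hypothetical and do not mirror Spark's internals.

```python
class MappedDataset:
    """Stand-in for an RDD produced by applying one function to every record."""

    def __init__(self, parent_partitions, fn):
        self.parent_partitions = parent_partitions  # input assumed to be in stable storage
        self.fn = fn                                # the logged transformation (lineage)
        self.cache = {}                             # partition index -> computed records
        self.computed = []                          # log of partition computations

    def partition(self, i):
        if i not in self.cache:
            # Rebuild from lineage: only the matching parent partition is read.
            self.computed.append(i)
            self.cache[i] = [self.fn(x) for x in self.parent_partitions[i]]
        return self.cache[i]

    def fail(self, i):
        """Simulate losing one in-memory partition, e.g. a node failure."""
        self.cache.pop(i, None)


squares = MappedDataset([[1, 2], [3, 4], [5, 6]], lambda x: x * x)
first_run = [squares.partition(i) for i in range(3)]
squares.fail(1)
second_run = [squares.partition(i) for i in range(3)]
```

After the simulated failure only partition 1 is recomputed; no checkpoint of the computed data was ever written.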
RDDs have been implemented in Spark to provide a language integrated API. Details about this implementation have been discussed here separately.
#### Representing RDDs
The paper proposes a graph-based representation of RDDs where an RDD is expressed through a common interface that exposes five functions:
1. partition: represents atomic pieces of the dataset.
2. dependencies: list of dependencies that an RDD has on its parent RDDs or data sources.
3. iterator: a function that computes an RDD based on its parents.
4. partitioner: whether data is range/hash partitioned.
5. preferredLocation: nodes where a partition can be accessed faster due to data locality.
The most interesting aspect of this representation is how dependencies are expressed. Dependencies belong to one of the two classes:
1. Narrow Dependencies — where each partition of the parent node is used by at most one child partition. For example, map and filter operations.
2. Wide Dependencies — where multiple child partitions use a single parent partition.
Narrow dependencies support pipelined execution on one cluster node while wide dependencies require data from all parent partitions to be available and to be shuffled across nodes. Recovery is easier with narrow dependencies while in the case of wide dependencies, failure of a single partition may require a complete re-execution. The figure shows some examples of narrow and wide dependencies. Note that join operation defines a narrow dependency when parents are hash-partitioned and wide dependency in other cases.
https://cdn-images-1.medium.com/max/800/1*9I0CaywrdzUpg7RKbGlMow.png
Figure 1: Example of narrow and wide dependencies.
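The narrow/wide distinction can be checked mechanically once a dependency is written down as "which parent partitions does each child partition read". The sketch below assumes that representation; the `classify` helper and the example mappings are illustrative and not part of the paper.

```python
from collections import Counter


def classify(dependency):
    """dependency maps each child partition to the parent partitions it reads.

    Parent partitions are (parent_name, index) pairs so that operations with
    several parents can be described as well.
    """
    uses = Counter(p for parents in dependency.values() for p in parents)
    # Narrow: every parent partition is used by at most one child partition.
    return "narrow" if all(n <= 1 for n in uses.values()) else "wide"


# map / filter: child partition i reads only parent partition i.
map_dep = {0: [("a", 0)], 1: [("a", 1)], 2: [("a", 2)]}

# groupByKey on unpartitioned data: every child reads every parent partition.
group_by_key_dep = {
    0: [("a", 0), ("a", 1), ("a", 2)],
    1: [("a", 0), ("a", 1), ("a", 2)],
}

# join of two parents that are already hash-partitioned the same way.
co_partitioned_join_dep = {0: [("a", 0), ("b", 0)], 1: [("a", 1), ("b", 1)]}

kinds = {
    "map": classify(map_dep),
    "groupByKey": classify(group_by_key_dep),
    "co-partitioned join": classify(co_partitioned_join_dep),
}
```

In the co-partitioned join each child partition has two parents, yet no parent partition is shared between children, so the dependency stays narrow.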
#### Job Scheduler
Whenever an “action” is executed, the scheduler builds a DAG (Directed Acyclic Graph) of stages based on the lineage graph. Each stage contains pipelined transformations with narrow dependencies. The boundaries between stages are the shuffle operations required by wide dependencies. Some of these stages may be precomputed (due to the persistence of previous computations). For the remaining tasks, the scheduler uses delay scheduling to assign tasks to machines based on data locality. For wide dependencies, intermediate records are materialized on the nodes holding the parent partitions.
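One way to picture how stages fall out of the lineage graph is to walk backwards from the final RDD, keeping narrow parents in the current stage and opening a new stage at every wide dependency. This is a simplified sketch under that assumption: the `build_stages` function and the string-labelled graph are hypothetical, and persistence, delay scheduling, and task placement are ignored.

```python
def build_stages(final, deps):
    """deps maps an RDD name to a list of (parent, kind) pairs, where kind is
    "narrow" or "wide". Returns stages as sets of RDD names, parents first."""
    stages = []
    seen = set()

    def visit(rdd):
        if rdd in seen:
            return
        stage = set()
        parents_across_shuffle = []
        stack = [rdd]
        while stack:
            node = stack.pop()
            if node in seen:
                continue
            seen.add(node)
            stage.add(node)
            for parent, kind in deps.get(node, []):
                if kind == "narrow":
                    stack.append(parent)                   # pipelined in this stage
                else:
                    parents_across_shuffle.append(parent)  # stage boundary
        for parent in parents_across_shuffle:
            visit(parent)        # parent stages must be scheduled first
        stages.append(stage)

    visit(final)
    return stages


# Hypothetical word-count style lineage: lines -> words -> (shuffle) -> counts -> output
lineage = {
    "output": [("counts", "narrow")],
    "counts": [("words", "wide")],
    "words": [("lines", "narrow")],
}
stages = build_stages("output", lineage)
```

Here the shuffle between `words` and `counts` is the only boundary, so the walk yields two stages.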
#### Evaluation
Spark outperforms Hadoop and HadoopBinMem for the following reasons:
1. Minimum overhead of Hadoop Stack as Hadoop incurs around 25 seconds of overhead to complete the minimal requirements of job setup, starting tasks, and cleaning up.
2. Overhead of HDFS while serving data as HDFS performs multiple memory copies and a checksum to serve each block.
3. Deserialization cost to convert binary data to in-memory Java objects.
Note that HadoopBinMem converts input data to low-overhead binary format and stores it in an in-memory HDFS instance.
Case studies also show that Spark performs well for interactive data analysis and other user applications. One limitation of the experiments is that in all the cases comparing the three systems, the cluster had sufficient RAM to keep all the data in memory. It would have been interesting to compare the performance of the three systems in the case where the cluster does not have sufficient RAM to keep the entire dataset in main memory.
#### Comparison with existing systems
RDDs and Spark learn from and improve the existing systems in many ways.
1. Data flow models like MapReduce share data through stable storage but have to incur the cost of data replication, I/O and serialization.
2. DryadLINQ and FlumeJava provide language integrated APIs and pipeline data across operators in the same query. But unlike Spark, they can not share data across multiple queries.
3. Piccolo and DSM do not provide a high-level programming interface like RDDs. Moreover, they use checkpointing and roll back which are more expensive than lineage based approach.
4. Nectar, Ciel and FlumeJava do not provide in-memory caching.
5. MapReduce and Dryad use lineage based recovery within a computation, but this information is lost after a job ends. In contrast, RDDs persist lineage information across computations.
RDDs can be used to express many existing models like MapReduce, DryadLINQ, Pregel, Batched Stream Processing, etc. This seems surprising given that RDDs offer only a limited interface due to their immutable nature and coarse-grained transformations. But these limitations have a negligible impact on many parallel applications. For example, many parallel programs prefer to apply the same operation to many records to keep the program simple. Similarly, multiple RDDs can be created to represent different versions of the same data.
The paper also offers an interesting insight on the question of why previous frameworks could not offer the same level of generality. It says previous frameworks did not observe that “the common cause of these problems was a lack of data sharing abstractions”.