The paper introduced the MapReduce paradigm, which had a huge impact on the Big Data world. Many systems, including Hadoop MapReduce and Spark, were developed along the lines of the paradigm described in the paper. MapReduce was conceived for cases where the computation itself is straightforward but parallelizing it and handling the other aspects of distributed computing is painful.

A MapReduce computation takes a set of key/value pairs as input and produces another set of key/value pairs as output. The computation consists of 2 parts:

1. Map: A function that processes input key/value pairs to generate a set of intermediate key/value pairs. All the values corresponding to each intermediate key are grouped together and sent to the Reduce function.
2. Reduce: A function that merges all the intermediate values associated with the same intermediate key.

The Map/Reduce primitives are inspired by similar primitives in Lisp and other functional programming languages. A program written in MapReduce is automatically parallelized; the programmer does not have to care about partitioning the input data, scheduling the computations or handling failures. The paper mentions many interesting applications, like distributed grep, inverted index and distributed sort, which can be expressed in MapReduce logic.

#### Execution Overview

When a MapReduce function is invoked, the following steps take place:

1. The input data is partitioned into a set of M splits of equal size.
2. One of the nodes in the cluster becomes the master and the rest become workers. There are M Map and R Reduce tasks.
3. The Map invocations are distributed across multiple machines containing the input partitions.
4. Each worker reads the content of its partition and applies the Map function to it. Intermediate results are buffered in memory and periodically written to local disk.
5. The locations of these intermediate files are passed on to the master, which forwards them to the reduce workers. These workers read the intermediate data and sort it.
6. Each reduce worker iterates over the sorted data and, for each unique intermediate key, passes the key and its intermediate values to the Reduce function. The output is appended to an output file.
7. Once all map and reduce tasks are over, control is returned to the user program.

As we saw, the map phase is divided into M tasks and the reduce phase into R tasks. Keeping M and R much larger than the number of nodes improves dynamic load balancing and speeds up failure recovery. But since the master has to make O(M+R) scheduling decisions and keep O(M*R) states in memory, M and R cannot be arbitrarily large.

#### Master Node

The master maintains the state of each map and reduce task and the identity of each worker machine. The locations of the intermediate files also travel from the map tasks to the reduce tasks via the master. The master pings each worker periodically. If it does not hear back, it marks the worker as failed and assigns its tasks to some other worker. If a map task fails, all the reduce workers are notified about the newly assigned worker. Master failure terminates the computation, in which case the client may choose to restart it.

#### Locality

MapReduce takes advantage of the fact that input data is stored on the machines performing the computations. The master tries to schedule a map job on a machine that contains the corresponding input data. Thus, most of the data is read locally and network IO is saved. This locality optimization is inspired by active disks, where computation is pushed to processing elements close to the disk.

#### Backup Tasks

Stragglers are machines that take an unusually long time to complete one of the last few Map or Reduce tasks and can increase the overall execution time of the program.
To account for these, when a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. This may lead to some redundant computation but can help to reduce the start-to-end execution time.

#### Refinements

MapReduce supports a variety of refinements including:

1. Users can specify an optional combiner function that performs a partial merge over the data before it is sent over the network. The combiner runs after the Map function and reduces the amount of data to be transferred.
2. MapReduce can be configured to detect records that fail deterministically and skip them. This is very useful in scenarios where a few missing records can be tolerated.
3. Within a given partition, the intermediate key/value pairs are guaranteed to be processed in increasing key order.
4. Side effects are supported in the sense that MapReduce tasks can produce auxiliary files as additional outputs of their operation.
5. MapReduce can be run in sequential mode on the local machine to facilitate debugging and profiling.
6. The master runs an internal HTTP server that exports status pages showing metadata about the computation and links to output and error files.
7. MapReduce provides a counter facility to keep track of occurrences of various events, like the number of documents processed. Counters like the number of key/value pairs processed are tracked by default.

Other refinements include custom partitioning functions and support for different input/output types.

MapReduce was profiled for two computations (grep and sort) running on a large cluster of commodity machines. The results were solid: the grep program scanned ten billion records in 150 seconds and the sort program sorted ten billion records in 891 seconds (including the startup overhead). Moreover, the profiling showed the benefit of backup tasks and that the system is resilient to node failures.
Other than performance, MapReduce offers many more benefits. The user code tends to be small and understandable because the code that handles distribution is abstracted away, so the user does not have to worry about issues like machine failures and networking errors. Moreover, the system can easily be scaled horizontally. Lastly, the performance is good enough that conceptually unrelated computations can be maintained separately instead of being mixed together just to save an extra pass over the data.
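As a rough illustration of the Map, partition, sort and Reduce steps summarized above, here is a single-process Python sketch of a word count. The names `map_fn`, `reduce_fn` and `run_mapreduce`, and the use of `zlib.crc32` for partitioning, are assumptions made for this example and are not the paper's API; nothing here is actually distributed.

```python
import zlib
from collections import defaultdict


def map_fn(_key, text):
    """Map: emit an intermediate (word, 1) pair for every word."""
    for word in text.split():
        yield word, 1


def reduce_fn(_key, values):
    """Reduce: merge all intermediate values of one key."""
    return sum(values)


def run_mapreduce(inputs, map_fn, reduce_fn, num_reducers=2):
    # Map phase: every input record produces intermediate pairs, which are
    # assigned to one of R partitions by hashing the intermediate key
    # (hypothetical partitioning function, chosen only for this sketch).
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    for key, value in inputs:
        for ikey, ivalue in map_fn(key, value):
            r = zlib.crc32(ikey.encode("utf-8")) % num_reducers
            partitions[r][ikey].append(ivalue)

    # Reduce phase: each partition processes its keys in increasing order.
    output = {}
    for partition in partitions:
        for ikey in sorted(partition):
            output[ikey] = reduce_fn(ikey, partition[ikey])
    return output


documents = [("doc1", "the quick brown fox"), ("doc2", "the lazy dog the end")]
counts = run_mapreduce(documents, map_fn, reduce_fn)
```

In a real deployment the map calls, the partitions and the reduce calls would live on different machines; the sketch only shows how the two user-supplied functions fit into the surrounding framework.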
Bigtable is the distributed, scalable, highly available and high-performing database that powers multiple applications across Google, each with different demands in terms of the size of the data to be stored and the latency with which results are expected. Though Bigtable itself is not open source, the paper was crucial in the development of powerful open source databases like Cassandra (which borrows Bigtable’s data model), HBase and LevelDB. The paper, Bigtable: A Distributed Storage System for Structured Data, describes how Bigtable is designed and how it works.

#### Data Model

Bigtable is a sorted, multidimensional key-value store indexed by a row key, a column key, and a timestamp. For example, in the context of storing web pages, the page URL could be the row key, various attributes of the page could be the column keys and the time when the page was fetched could be the timestamp. Rows are maintained in lexicographic order and the row range is dynamically partitioned, with each row range being referred to as a tablet. Reads over short row ranges are very fast due to locality, and all read/write operations under a single row are atomic.

Column keys are grouped into sets called column families. These sets are created before any data is stored. A table should have a small number of distinct column families. A column key has the syntax family:qualifier. For example, for the family language, the keys could be language:english or language:hindi.

A row in a Bigtable can contain multiple versions of the same data, indexed by timestamps and stored in decreasing order of timestamp. Old versions can be garbage collected: the client can specify that only the last n versions are to be kept, or that only versions from the last n days (or hours, weeks, etc.) are to be kept.

#### Building Blocks

Bigtable uses the Google File System (GFS) for storing logs and data in the SSTable file format. SSTable provides an immutable, ordered (key, value) map.
Immutability provides certain advantages which will be discussed later. An SSTable consists of a sequence of blocks and a block index to locate the blocks. When the SSTable is opened, this index is loaded into main memory for fast lookup.

Bigtable also uses a highly available and persistent distributed lock service called Chubby for handling synchronization issues. Chubby provides a namespace consisting of directories and small files which can be used as locks. Read and write access to a file is atomic. Clients maintain sessions with the Chubby service, and these sessions need to be renewed regularly. Bigtable depends on Chubby so much that if Chubby is unavailable for an extended period of time, Bigtable will also be unavailable.

#### Implementation

There are 3 major components: a library linked into the client, one master server and multiple tablet servers. The master server assigns tablets to tablet servers, balances load across these servers and garbage collects files in GFS. Client data does not move through the master; clients contact tablet servers directly for read and write operations. Tablet servers manage a set of tablets: they handle the read/write operations directed to these tablets and also split tablets that have grown too large.

A 3-level hierarchy is maintained to store tablet location information. The first level is a file stored in Chubby that holds the location of the root tablet. The root tablet contains the locations of all tablets of a special METADATA table. Each METADATA tablet contains the locations of a set of user tablets. These tablet locations are cached in the client library; when a location is missing from the cache or turns out to be stale, the client fetches it again by recursively moving up the hierarchy.

Each tablet is assigned to one tablet server at a time. The master server keeps track of which servers are alive, which tablet is assigned to which server and which tablets are unassigned.
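The three-level location lookup described above can be sketched in a few lines of Python. This is only a toy model under stated assumptions: the `Tablet` class, the Chubby path `/hypothetical/root-location`, the server names and the idea of indexing each range by its last row key are all made up for illustration, and the client-side cache is left out.

```python
import bisect


class Tablet:
    """Toy tablet: a sorted map from the last row key of a range to a location."""

    def __init__(self, entries):
        self.end_keys = sorted(entries)                   # last row key of each range
        self.locations = [entries[k] for k in self.end_keys]

    def lookup(self, row_key):
        # The first range whose end key is >= row_key covers the row.
        i = bisect.bisect_left(self.end_keys, row_key)
        if i == len(self.end_keys):
            raise KeyError(row_key)
        return self.locations[i]


# Level 3: METADATA tablets map user-tablet ranges to (hypothetical) tablet servers.
meta_a = Tablet({"g": "tabletserver-1", "m": "tabletserver-2"})
meta_b = Tablet({"t": "tabletserver-3", "\uffff": "tabletserver-1"})
# Level 2: the root tablet maps METADATA ranges to METADATA tablets.
root = Tablet({"m": meta_a, "\uffff": meta_b})
# Level 1: a Chubby file (modelled as a dict entry) points at the root tablet.
chubby = {"/hypothetical/root-location": root}


def locate(row_key):
    """Walk Chubby file -> root tablet -> METADATA tablet -> tablet server."""
    root_tablet = chubby["/hypothetical/root-location"]
    metadata_tablet = root_tablet.lookup(row_key)
    return metadata_tablet.lookup(row_key)


server = locate("com.cnn.www")
```

A real client would remember the result of `locate` and only repeat the walk when the cached location is missing or wrong.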
When a tablet server starts, it creates and acquires an exclusive lock on a uniquely named file in the servers directory (a Chubby directory), which is monitored by the master server. If the tablet server loses its exclusive lock, it stops serving its tablets, though it will attempt to reacquire the lock as long as the file exists in the servers directory. If the file has been deleted, the tablet server kills itself. The master tracks tablets that are not being served and reassigns them. To avoid network partition issues, the master kills itself if its Chubby session expires; the assignment of tablets to tablet servers is not affected by this.

A master performs the following tasks at startup time:

1. Grab the unique master lock to prevent concurrent master instantiation.
2. Find live servers by scanning the servers directory.
3. Find out which tablets are assigned to each server by messaging the servers.
4. Find the set of all tablets by scanning the METADATA table.

Tablet creation, deletion and merging are initiated by the master server, while tablet splits are initiated by tablet servers, which then notify the master. If the notification is lost, the master learns about the split when it asks a tablet server to load the tablet that has since been split.

Memtables are in-memory buffers that store recently committed updates to Bigtable. These updates are later written to GFS for persistent storage. Tablets can be recovered by reading their metadata from the METADATA table, which contains a set of redo points and the list of SSTables comprising the tablet.

There are 3 kinds of compactions:

1. Minor compaction: As the memtable grows and reaches a size threshold, it is frozen, a new memtable is created and the frozen memtable is written to GFS as an SSTable.
2. Merging compaction: Minor compactions keep increasing the number of SSTables. A merging compaction reads the contents of a few SSTables and the memtable and writes them out as a new SSTable.
3. Major compaction: All the SSTables are rewritten into a single SSTable.
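The interplay between the memtable and the compactions listed above can be illustrated with a small in-memory Python sketch. The class `TabletStore`, its `memtable_limit` parameter and the use of tuples as stand-ins for SSTable files are assumptions for this example; there is no GFS, commit log or deletion handling here.

```python
class TabletStore:
    """Toy model of one tablet: a mutable memtable plus immutable SSTables."""

    def __init__(self, memtable_limit=2):
        self.memtable = {}
        self.sstables = []            # oldest first; each is a sorted tuple of pairs
        self.memtable_limit = memtable_limit

    def write(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.minor_compaction()

    def minor_compaction(self):
        # Freeze the memtable, persist it as a new SSTable, start a fresh memtable.
        if self.memtable:
            self.sstables.append(tuple(sorted(self.memtable.items())))
            self.memtable = {}

    def major_compaction(self):
        # Rewrite everything into a single SSTable; newer values win.
        self.minor_compaction()
        merged = {}
        for sstable in self.sstables:     # oldest first, so later updates overwrite
            merged.update(sstable)
        self.sstables = [tuple(sorted(merged.items()))] if merged else []

    def read(self, key):
        # Check the memtable first, then SSTables from newest to oldest.
        if key in self.memtable:
            return self.memtable[key]
        for sstable in reversed(self.sstables):
            for k, v in sstable:
                if k == key:
                    return v
        return None


store = TabletStore(memtable_limit=2)
for key, value in [("a", 1), ("b", 2), ("a", 3), ("c", 4)]:
    store.write(key, value)
sstables_before = len(store.sstables)
store.major_compaction()
```

After the four writes the store holds two SSTables; the major compaction folds them into one while keeping the newest value for every key.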
#### Refinements

Along with the described implementation, several refinements are required to make the system reliable and high performing.

Clients can group column families into locality groups, for which separate SSTables are generated. This allows for more efficient reads. If the SSTables are small enough, they can be kept in main memory as well. SSTables can be compressed in various formats. The compression ratio gets even better when multiple versions of the same data are stored.

**Caching**: Tablet servers employ 2 levels of caching.

1. Scan Cache: It caches (key, value) pairs returned by the SSTable and is useful for applications that read the same data multiple times.
2. Block Cache: It caches SSTable blocks read from GFS and is useful when the application exhibits locality of reference.

**Bloom filters** are created for SSTables (particularly for the locality groups). They help to reduce the number of disk accesses by indicating whether an SSTable may contain data for a particular (row, column) pair.

**Commit logs** are maintained at the tablet server level instead of the tablet level to keep the number of log files small. Each tablet server maintains 2 log writing threads, each writing to its own separate log file, and only one of the threads is active at a time. If the active thread is performing poorly (say, due to network congestion), writing switches to the other thread. Log entries have sequence numbers so that the recovery process can discard duplicate entries caused by switching between logs.

We saw earlier that SSTables are immutable. The advantage is that no synchronization is needed during read operations. This also makes it easier to split tablets. Permanently removing deleted data is taken care of by the garbage collector.
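To make the Bloom filter refinement more concrete, here is a minimal Python sketch. The sizes, the SHA-256-based hashing and the key format `row/column` are assumptions chosen for readability, not details taken from the paper.

```python
import hashlib


class BloomFilter:
    """Minimal Bloom filter: may report false positives, never false negatives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)   # one byte per bit, for clarity

    def _positions(self, key):
        # Derive several bit positions from one key by salting the hash input.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key):
        return all(self.bits[pos] for pos in self._positions(key))


sstable_filter = BloomFilter()
empty_filter_answer = sstable_filter.might_contain("com.cnn.www/anchor:cnnsi.com")
sstable_filter.add("com.cnn.www/contents:")
stored_key_answer = sstable_filter.might_contain("com.cnn.www/contents:")
```

When `might_contain` returns `False`, the SSTable certainly has no data for that key and the disk read can be skipped; a `True` answer only means the SSTable has to be checked.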
This paper talks about how Spark SQL integrates relational processing with Spark itself. It builds on the experience of previous efforts like Shark and introduces two major additions to bridge the gap between relational and procedural processing:

1. A DataFrame API that provides a tight integration between relational and procedural processing by allowing both kinds of operations on multiple data sources.
2. Catalyst, a highly extensible optimizer which makes it easy to add new data sources and algorithms.

#### Programming Interface

Spark SQL uses a nested data model based on Hive and supports all major SQL data types along with complex (e.g. array, map, etc.) and user-defined data types. It ships with a schema inference algorithm for JSON and other semistructured data. This algorithm is also used for inferring the schemas of RDDs (Resilient Distributed Datasets) of Python objects. The algorithm attempts to infer a static tree structure of STRUCT types (which in turn may contain basic types, arrays, etc.) in one pass over the data. It starts by finding the most specific Spark SQL type for each record and then merges these types using an associative most-specific-supertype function that generalizes the types of each field.

A DataFrame is a distributed collection of rows with the same schema. It is equivalent to a table in an RDBMS. DataFrames are similar to the native RDDs of Spark in that they are evaluated lazily, but unlike RDDs, they have a schema. A DataFrame represents a logical plan, and a physical plan is built only when an output function like save is called. Deferring execution in this way leaves more room for optimizations. Moreover, DataFrames are analyzed eagerly to check whether the column names and data types are valid.

DataFrames support queries using both SQL and a DSL which includes all common relational operators like select, where, join and groupBy.
All these operators build up an abstract syntax tree (AST) of the expression (think of an expression as a column in a table), which is then optimized by Catalyst. Spark SQL can cache data in memory using columnar storage, which is more efficient than Spark’s native cache, which simply stores data as JVM objects. The DataFrame API supports user-defined functions (UDFs), which can use the full Spark API internally and can be registered easily.

To query native datasets, Spark SQL creates a logical data scan operator (pointing to the RDD), which is compiled into a physical operator that accesses fields of the native objects in place, extracting only the fields needed for a query. This is better than traditional object-relational mapping (ORM), which translates an entire object into a different format.

Spark MLlib implemented a new API based on the pipeline concept (think of a pipeline as a graph of transformations on the data) and chose DataFrame as the format for exchanging data between pipeline stages. This makes it much easier to expose MLlib’s algorithms in Spark SQL.

#### Catalyst

Catalyst is an extensible optimizer based on Scala’s standard features. It supports both rule-based and cost-based optimizations and makes it easy to add new optimization techniques and data sources to Spark SQL. At its core, Catalyst is powered by a general library that represents trees and manipulates them by applying rules. The tree is the main data type in Catalyst and is composed of node objects, where a node has a type and zero or more children. Rules are functions that transform one tree into another. Trees offer a transform method that applies a pattern matching function recursively to all the nodes of the tree, transforming only the matching nodes. Rules can match multiple patterns in the same transform call and can contain arbitrary Scala code, which removes the restriction of using only a domain-specific language (DSL).
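The idea of rules as tree-to-tree functions can be imitated in Python, keeping in mind that Catalyst itself is written in Scala and relies on Scala pattern matching. The node classes, the `transform_up` helper and the bottom-up traversal order below are assumptions for this sketch; the rule shown is simple constant folding.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Literal:
    value: int


@dataclass(frozen=True)
class Attribute:
    name: str


@dataclass(frozen=True)
class Add:
    left: object
    right: object


def transform_up(node, rule):
    """Apply `rule` to every node, children first.

    `rule` returns a replacement node, or None when it does not match,
    in which case the node is kept unchanged.
    """
    if isinstance(node, Add):
        node = Add(transform_up(node.left, rule), transform_up(node.right, rule))
    replacement = rule(node)
    return node if replacement is None else replacement


def fold_constants(node):
    """Rule matching several patterns: literal + literal, x + 0 and 0 + x."""
    if isinstance(node, Add):
        if isinstance(node.left, Literal) and isinstance(node.right, Literal):
            return Literal(node.left.value + node.right.value)
        if node.right == Literal(0):
            return node.left
        if node.left == Literal(0):
            return node.right
    return None


# x + (1 + 2) becomes x + 3; nodes are immutable, so a new tree is returned.
tree = Add(Attribute("x"), Add(Literal(1), Literal(2)))
optimized = transform_up(tree, fold_constants)
```

Because the nodes are frozen dataclasses, the original `tree` is left untouched and the rule only has to describe the local pattern it cares about.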
Catalyst groups rules into batches and executes each batch until it reaches a fixed point (i.e. the tree stops changing). This means that each rule can be simple and self-contained while still producing a global effect on the tree. Since both nodes and trees are immutable, optimizations can easily be performed in parallel as well.

Spark SQL uses Catalyst in four phases:

**Logical Plan Analysis** resolves unresolved attribute references (those whose type is not yet known or which have not been matched to an input table). It uses a Catalog object that tracks the tables in all data sources to resolve references.

**Logical Optimization** applies standard rule-based optimizations to the logical plan, including constant folding, predicate pushdown, projection pruning, null propagation, Boolean expression simplification, etc.

In the **Physical Planning** phase, Spark SQL generates multiple physical plans corresponding to a single logical plan and then selects one of them using a cost model. It also performs some rule-based physical optimizations, as in the previous phase.

In the **Code Generation** phase, Catalyst uses quasiquotes (provided by Scala) to construct an AST that is fed to the Scala compiler, which generates bytecode at runtime.

Extensions can be added even without understanding how Catalyst works. For example, to add a data source, one needs to implement a createRelation function that takes a set of key-value parameters and returns a BaseRelation object if the source is loaded successfully. This BaseRelation can then implement interfaces that give Spark SQL access to the data. Similarly, to add user-defined types (UDTs), the UDTs are mapped to Catalyst’s built-in types: one provides a mapping from an object of the UDT to a Catalyst row of built-in types, and an inverse mapping back.

#### Future Work

Some recent work has also shown that it is quite easy to add special planning rules to optimize for specific use cases.
For example, researchers in the ADAM project added a new rule to use interval trees instead of normal joins for a computational genomics problem. Similarly, other works have used Catalyst to improve the generality of online aggregation to support nested aggregate queries. These examples show that Spark and Spark SQL are quite easy to adapt to new use cases as well.

As I mentioned previously, I am experimenting with Spark SQL and it does look promising. I have implemented some operators and it is indeed quite easy to extend. I am now looking forward to developing something more concrete on top of it.
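Going back to the schema inference algorithm from the Programming Interface section, the "most specific type per record, then merge with a supertype function" idea can be sketched as follows. The type names, the tiny lattice (NULL below everything, INT widening to FLOAT, STRING as the fallback) and both function names are simplifying assumptions, not Spark SQL's actual rules.

```python
from functools import reduce


def infer_type(value):
    """Most specific (simplified) type of a single Python value."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):
        return "BOOLEAN"
    if isinstance(value, int):
        return "INT"
    if isinstance(value, float):
        return "FLOAT"
    if isinstance(value, dict):
        # A nested record becomes a STRUCT, modelled here as a dict of field types.
        return {field: infer_type(v) for field, v in value.items()}
    return "STRING"


def merge_types(a, b):
    """Most specific supertype of two inferred types (toy lattice)."""
    if a == b:
        return a
    if a == "NULL":
        return b
    if b == "NULL":
        return a
    if isinstance(a, dict) and isinstance(b, dict):
        # Merge STRUCTs field by field; a missing field behaves like NULL.
        return {f: merge_types(a.get(f, "NULL"), b.get(f, "NULL"))
                for f in sorted(a.keys() | b.keys())}
    if isinstance(a, str) and isinstance(b, str) and {a, b} == {"INT", "FLOAT"}:
        return "FLOAT"
    return "STRING"   # fall back to the most general type


records = [
    {"id": 1, "loc": {"lat": 45.1}},
    {"id": 2.5, "loc": {"lat": 3, "long": 7}, "tag": "x"},
    {"id": None},
]
schema = reduce(merge_types, map(infer_type, records))
```

Since the merge only ever combines two inferred types at a time, the per-record types can be folded together in a single pass over the data, which is the property the summary attributes to the real algorithm.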
The paper introduces a new framework called Spark which focuses on use cases where MapReduce falls short. While a lot of applications fit MapReduce’s acyclic data flow model, there are use cases requiring iterative jobs where MapReduce is not very efficient. Many machine learning algorithms fall into this category. Secondly, with MapReduce each query incurs a significant overhead because it is effectively a different job each time, so MapReduce is not the ideal solution for interactive analysis. This is the space which Spark intends to fill. Spark is implemented in Scala and now supports APIs in Java, Python, and R.

#### Programming Model

The model supports RDDs, parallel operations and 2 types of shared variables. A driver program implements the high-level control flow and launches different operations in parallel.

Resilient Distributed Datasets (RDDs) are read-only collections of objects, partitioned across a set of machines in a cluster. A handle to an RDD contains enough information to recompute the RDD from data in case of partition failure. RDDs can be constructed:

1. From a file in a Hadoop-supported file system.
2. By “parallelizing” a Scala collection.
3. By transforming an existing RDD using operations like flatMap, map, and filter.
4. By changing the persistence of an existing RDD.

flatMap, map and filter are standard operations supported by various functional programming languages. For example, map takes a list of items as input and returns a new list obtained by applying a function to each item of the original list.

RDDs are lazy and ephemeral: they are constructed on demand and discarded after use. The persistence can be changed to cache, which means they are still lazy but are kept in memory (or on disk if they cannot fit in memory), or to save, where they are saved to disk only.

Some supported parallel operations (at the time of writing the paper) include:

1. reduce: Combines dataset elements using an associative function to produce a result at the driver program.
2. collect: Sends all elements of the dataset to the driver program.
3. foreach: Passes each element through a user-provided function.

Since the paper was authored, Spark has come a long way and supports many more transformations (sample, union, distinct, groupByKey, reduceByKey, sortByKey, join, cartesian, etc.), parallel operations (count, first, take, takeSample, etc.), and persistence options (memory only, memory and disk, disk only, memory only serialized, etc.).

Spark supports 2 kinds of shared variables:

Broadcast variables: These are read-only variables that are distributed to worker nodes once and can then be read multiple times. A use case would be training data which is sent to all the worker nodes once and used for learning different models, instead of sending the same data with each model.

Accumulators: These variables are also shared with workers, the difference being that only the driver program can read them and workers can only perform associative operations on them. A use case would be counting the total number of entries in a data set: each worker fills up its count accumulator and sends it to the driver, which adds up all the received values.

#### Implementation

The core of Spark is the implementation of RDDs. Suppose we start by reading a file, then filter the lines to get those containing the word “ERROR”, then cache the results and then count the number of such lines using the standard map-reduce trick. An RDD is formed for each of these steps, and these RDDs are stored as a linked list to capture the lineage of each RDD.

https://cdn-images-1.medium.com/max/800/1*6I7aiD2bPrsw32U76Q5q2g.png

Lineage chain for distributed dataset objects

Each RDD contains a pointer to its parent and information about how it was transformed.
This lineage information is sufficient to reconstruct any lost partition, so checkpointing of any kind is not required. There is no overhead if no node fails, and even if some nodes fail, only select RDDs need to be reconstructed.

Internally, an RDD object implements a simple interface consisting of three operations:

1. getPartitions, which returns a list of partition IDs.
2. getIterator(partition), which iterates over a partition.
3. getPreferredLocations(partition), which is used for task scheduling to achieve data locality.

Spark is similar to MapReduce in that it sends computation to the data instead of the other way round. This requires shipping closures to workers, both the closures that define a distributed dataset and those that process it. This is straightforward because Scala closures are Java objects and can be serialized using Java serialization. However, unlike MapReduce, operations are performed on RDDs that can persist across operations.

Shared variables are implemented using classes with custom serialization formats. When a broadcast variable b is created with a value v, v is saved to a file in the shared file system. The serialized form of b is a path to this file. When b’s value is queried, Spark checks whether v is in the local cache. If not, it is read from the file system. Each accumulator is given a unique ID upon creation, and its serialized form contains its ID and the “zero” value. On the workers, a separate copy of the accumulator is created for each thread and is reset to the “zero” value. Once the task finishes, the updated value is sent to the driver program.

#### Future Work

The paper describes how an early-stage implementation performs on logistic regression, alternating least squares, and interactive mode. Spark appears to outperform MapReduce in these experiments, largely because it caches the results of previous computations. This makes Spark a good alternative for use cases where the same data is read into memory again and again (iterative jobs fit this category).

Spark has come a long way since the paper was written.
It now supports libraries for handling SQL-like queries (Spark SQL), streaming data (Spark Streaming), graphs (GraphX) and machine learning (MLlib), along with more transformations and parallel operations. I came across Spark while working at Adobe Analytics and have been reading about it to learn more. The cool thing about Spark is that it supports interactive analysis and has APIs in Python, R and Java, which makes it easy to adopt. While I have not done much work with Spark so far, I am looking forward to building something on top of it.
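The lineage idea from the Implementation section (each dataset keeps a pointer to its parent plus the transformation that produced it) can be mimicked on a single machine. The class `SketchRDD` and its methods are invented for this illustration and are not Spark's API; there are no partitions or workers here, only lazy recomputation from the parent chain.

```python
import functools


class SketchRDD:
    """Toy stand-in for an RDD: a parent pointer plus a transformation."""

    def __init__(self, parent=None, transform=None, source=None):
        self.parent = parent
        self.transform = transform
        self.source = source          # callable producing the base data (root only)
        self.persist = False
        self._cached = None

    def map(self, fn):
        return SketchRDD(self, lambda items: [fn(x) for x in items])

    def filter(self, pred):
        return SketchRDD(self, lambda items: [x for x in items if pred(x)])

    def cache(self):
        # Only marks the dataset; nothing is computed until an action runs.
        self.persist = True
        return self

    def compute(self):
        if self._cached is not None:
            return self._cached
        if self.parent is None:
            data = list(self.source())
        else:
            data = self.transform(self.parent.compute())   # follow the lineage
        if self.persist:
            self._cached = data
        return data

    def reduce(self, fn):
        return functools.reduce(fn, self.compute())

    def lose_cache(self):
        """Simulate losing the in-memory copy, e.g. after a node failure."""
        self._cached = None


log_lines = ["INFO start", "ERROR disk full", "INFO ok", "ERROR timeout"]
lines = SketchRDD(source=lambda: log_lines)
errors = lines.filter(lambda line: "ERROR" in line).cache()
ones = errors.map(lambda _: 1)
count = ones.reduce(lambda a, b: a + b)

errors.lose_cache()                       # cached data is gone ...
recount = ones.reduce(lambda a, b: a + b) # ... but lineage lets us rebuild it
```

Both reductions give the same answer: the second one simply walks the parent chain back to the source and recomputes the filtered lines.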
Twitter’s Real-Time Related Query Suggestion Architecture. The paper tells the story behind the architecture that supports Twitter’s real-time related query suggestions, why the architecture had to be designed twice and what lessons can be learned from this exercise. It does not talk much about the algorithms; rather, it talks about the design decisions that led to the current architecture. The focus is not on how things were done but on why they were done a certain way.

Twitter has an interesting use case, search assistance, which boils down to things like a user searching for “Obama” and getting results for related queries like “White House” as well. Spelling correction is also a part of search assistance. The problem is quite well researched from the perspective of data volume, but in Twitter’s context velocity mattered as much as volume. The results had to adapt to rapidly evolving global conversations in real time, where real time loosely translates to a target latency of 10 minutes. The real-time aspect is important in Twitter’s context, where “relevance” has a temporal aspect as well. For example, after the Nepal earthquake in 2015, the query “Nepal” led to results related to “earthquake” as well. Then, when the new constitution was passed in Nepal, the query “Nepal” led to results related to “constitution”. The time frame in which suggestions have maximum impact is very narrow. A suggestion made too early would seem irrelevant, while a suggestion made too late would seem obvious and hence less impactful. These fast-moving signals have to be mixed with slow-moving ones for insightful results. Sometimes the query volume is very low, in which case a longer observation period is needed before suggestions can be made.

Twitter noticed that 17% of the top 1000 query terms churn on an hourly basis, which means they are no longer in the top 1000 after an hour. Similarly, around 13% of the top 1000 query terms are churned out every day.
This suggested that fine-grained tracking of search terms was needed.

Twitter started with a basic idea: if two queries are seen in the same context, then they are related. This left a large open design space. For example, context can be defined by the user’s search session, by a tweet, or by both. Measures like log likelihood, the chi-square test, etc. can be used to quantify how often 2 queries appear together. To account for the temporal effect, counts are decayed over time. Finally, Twitter has to combine these factors, and some more, to come up with a ranking mechanism. The paper does not focus on which algorithms were chosen for these tasks; it focuses on how an end-to-end system was created.

#### Hadoop Solution

Twitter has a powerful petabyte-scale Hadoop-based analytics platform. Both real-time and batch processes write data to the Hadoop Distributed File System (HDFS). These include bulk exports from databases, application logs, and many other sources. Contents are serialized using either Protocol Buffers or Thrift, and are LZO-compressed. There is a workflow manager, called Oink, which schedules recurring jobs and handles dataflow dependencies between jobs. For example, if job B requires data generated by job A, A will be scheduled first.

Twitter wanted to take advantage of this stack, and the first version was deployed in the form of a Pig script that aggregated user search sessions to compute term and co-occurrence statistics and ranked related queries, all on top of the existing stack. While the results were pretty good, the latency was too high: results were not available for several hours.

#### Bottleneck #1: Log Imports

Twitter uses Scribe to aggregate streaming log data in an efficient manner. These logs are rich with user interactions and are used by the search assistant.
A Scribe daemon runs on each production server, where it collects local log data (consisting of a category and a message) and sends it to a cluster of aggregators which are co-located with a staging Hadoop cluster. This cluster merges per-category streams from the server daemons and writes the results to the HDFS of the staging cluster. These logs are then transformed and moved to the main Hadoop data warehouse in hourly chunks. The log messages are put in per-category, per-hour directories and are bundled into a small number of large files. Only now can the search assistant start its computations. The hierarchical aggregation is required to “roll up” data into a few large files, as HDFS is not good at handling large numbers of small files. As a result, there is a huge delay between when the logs are generated and when they are available for processing. Twitter estimated that they could bring this latency down to tens of minutes by re-engineering their stack, though even that would be too high.

#### Bottleneck #2: Hadoop

Hadoop is not meant for latency-sensitive jobs. For example, a large job could take tens of seconds just to start up, irrespective of the amount of data crunched. Moreover, the Hadoop cluster was a shared resource across Twitter. Using a scheduler (in this case, FairScheduler) is not the ideal solution, as the goal is a predictable end-to-end latency bound and not resource allocation. Lastly, the job completion time depends on stragglers. For some scenarios, a simple hash partitioning scheme created chunks of “work” of varying size. This led to widely varying running times for different map-reduce jobs. For scripts that chain together Hadoop jobs, the slowest task becomes the bottleneck. Just as with log imports, Twitter estimated the best-case latency for computing query suggestions to be on the order of ten minutes.
Starting with the Hadoop stack had many advantages: a working prototype was built quickly, and ad hoc analysis was easy. It also helped Twitter understand the query churn and make some important observations about which factors to use in the search assistant. For example, Twitter discovered that only two sources of context (search sessions and tweets) were good enough for an initial implementation. But due to the high latency, Twitter had to restrict this solution to the experimental stage.

#### Current Architecture

Firehose is the streaming API that provides access to all tweets in real time. The frontend, called Blender, brokers all requests and provides a streaming API for queries, also called the query hose. These two streams are used by EarlyBird, the inverted indexing engine, and by the search assistant engine. Client logs are no longer needed, as Blender has all search sessions. Twitter’s search assistance is an in-memory processing engine comprising two decoupled components:

1. Frontend Nodes: These are lightweight in-memory caches which periodically read fresh results from HDFS. They are implemented as a Thrift service, and can be scaled out to handle increased query load.
2. Backend Nodes: These nodes perform the real computations. The backend processing engine is replicated but not sharded.

Every five minutes, computed results are persisted to HDFS, and every minute the frontend caches poll a known HDFS location for updated results. Request routing to the replicas is handled by a ServerSet, which provides client-side load-balanced access to a replicated service, coordinated by ZooKeeper for automatic resource discovery and robust failover.

Each backend instance is a multi-threaded application consisting of:

1. Stats collector: Reads the firehose and query hose.
2. In-memory stores: Hold the most up-to-date statistics.
3. Rankers: Periodically execute one or more ranking algorithms, getting raw features from the in-memory stores.

There are three separate in-memory stores to keep track of relevant statistics:

1. Sessions store: Keeps track of (anonymized) user sessions observed in the query hose and, for each session, the history of the queries issued, kept in a linked list. Sessions older than a threshold are discarded. Metadata is also tracked separately.
2. Query statistics store: Retains up-to-date statistics, such as session count, about individual queries. These also include a weighted count based on a custom scoring function. This function captures, for example, that the association between two consecutively typed queries is stronger than that between two consecutively clicked hashtags. These weights are periodically decayed to reflect decreasing importance over time. The store also keeps additional metadata about the query, such as its language.
3. Query co-occurrence statistics store: Holds data about pairs of co-occurring queries. Weighting and decaying are applied as in the query statistics store.

**Query Flow**: As a user query flows through the query hose, its statistics are updated in the query statistics store, it is added to the sessions store, and some old queries may be removed. For each previous query in the session, a query co-occurrence is formed with the new query, and the statistics in the query co-occurrence statistics store are updated as well.

**Tweet Flow**: As a tweet flows through the firehose, its n-grams are checked to determine whether they are query-like or not. All matching n-grams are processed just like the query above, except that the “session” is the tweet itself.

**Decay/Prune cycles**: Periodically, all weights are decayed, and queries or co-occurrences with scores below predefined thresholds are removed to control the overall memory footprint of the service. User sessions with no recent activity are pruned as well.

**Ranking cycles**: Rankers are triggered periodically to generate suggestions for each query based on the various accumulated statistics. Top results are then persisted to HDFS.
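
The stores and cycles described above can be sketched in a few lines of Python. This is a minimal illustration, not Twitter's implementation: the class name, the multiplicative decay factor, the pruning threshold, the capped session history, and the ranking by raw co-occurrence weight are all assumptions made for the example. The summary does not say which scoring or ranking functions were used, and session expiry by age is left out.

```python
from collections import defaultdict, deque


class SearchAssistStores:
    """Toy sessions, query statistics and co-occurrence stores (hypothetical)."""

    def __init__(self, decay=0.5, prune_below=0.1, max_session_len=10):
        self.sessions = defaultdict(deque)       # session id -> recent queries
        self.query_weight = defaultdict(float)   # query -> decayed weight
        self.pair_weight = defaultdict(float)    # (query, query) -> decayed weight
        self.decay = decay
        self.prune_below = prune_below
        self.max_session_len = max_session_len

    def observe(self, session_id, query, weight=1.0):
        """Query flow: update query stats, pair with earlier queries, extend session."""
        history = self.sessions[session_id]
        self.query_weight[query] += weight
        for previous in history:
            if previous != query:
                # Pairs are stored in sorted order so (a, b) and (b, a) coincide.
                self.pair_weight[tuple(sorted((previous, query)))] += weight
        history.append(query)
        if len(history) > self.max_session_len:
            history.popleft()  # drop the oldest query in the session

    def decay_and_prune(self):
        """Decay/prune cycle: shrink every weight and drop entries below the threshold."""
        for store in (self.query_weight, self.pair_weight):
            for key in list(store):
                store[key] *= self.decay
                if store[key] < self.prune_below:
                    del store[key]

    def suggestions(self, query, k=3):
        """Ranking cycle stand-in: rank co-occurring queries by raw pair weight."""
        scored = []
        for (a, b), weight in self.pair_weight.items():
            if a == query:
                scored.append((weight, b))
            elif b == query:
                scored.append((weight, a))
        scored.sort(key=lambda item: (-item[0], item[1]))
        return [other for _, other in scored[:k]]


stores = SearchAssistStores()
stores.observe("session-1", "earthquake")
stores.observe("session-1", "tsunami")
stores.observe("session-2", "earthquake")
stores.observe("session-2", "tsunami")
stores.observe("session-2", "japan")
print(stores.suggestions("earthquake"))
```

The tweet flow maps onto the same `observe` call by passing a tweet identifier as the session id and each query-like n-gram as the query. Keeping everything in plain dictionaries mirrors the in-memory design, and it also shows why pruning matters: without `decay_and_prune`, the pair store grows with every new combination of queries seen.
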
#### Scalability

1. Since there is no sharding, each instance of the backend processing engine must consume the entire firehose and query hose to keep up with the incoming data.
2. The memory footprint for retaining the various statistics, without any pruning, is very large. But if the footprint is reduced, say by pruning, the quality and coverage of results may be affected. Another approach could be to store less session history and decay the weights more aggressively, though this may again affect the quality of the results.

#### Lessons Learnt

Twitter managed to solve the problem of fast-moving big data, but their solution is far from ideal. It works well, but only for the scenario it is fine-tuned for. What is needed is a unified data platform to process big and fast-moving data with varying latency requirements. Twitter’s current implementation is an in-memory engine which mostly uses tweets and search sessions to build the context. Rich parameters like clicks and impressions are left out for now to keep the latency in check. Twitter described it as “a patchwork of different processing paradigms”. Though incomplete, it is still an important step in the direction of unifying big data and fast data processing systems. Many systems exist that solve the problem in pieces, e.g. message queues like Kafka for moving data in real time and Facebook’s ptail for running Hadoop operations in real time, but there is no end-to-end general data platform which can adapt itself to perform analytics on both short-term and long-term data and combine their results according to the latency bounds of different contexts.