[link]
In fear of fettering development and innovation, companies often give engineers free rein to generate and analyze datasets at will. This often leads to unorganized data lakes: ragtag collections of datasets from a diverse set of sources. Google Dataset Search (Goods) is a system which uses unobtrusive post-hoc metadata extraction and inference to organize Google's unorganized datasets and present curated dataset information, such as metadata and provenance, to engineers. Building a system like Goods at Google scale presents many challenges:

- Scale. There are 26 billion datasets. 26 billion (with a b)!
- Variety. Data comes from a diverse set of sources (e.g. BigTable, Spanner, logs).
- Churn. Roughly 5% of the datasets are deleted every day, and datasets are created roughly as quickly as they are deleted.
- Uncertainty. Some metadata inference is approximate and speculative.
- Ranking. To facilitate useful dataset search, datasets have to be ranked by importance: a difficult heuristic-driven process.
- Semantics. Extracting the semantic content of a dataset is useful but challenging. For example, consider a file of protos that doesn't reference the type of proto being stored.

The Goods catalog is a BigTable keyed by dataset name where each row contains metadata including

- basic metadata like timestamp, owners, and access permissions;
- provenance showing the lineage of each dataset;
- schema;
- data summaries extracted from source code; and
- user-provided annotations.

Moreover, similar datasets or multiple versions of the same logical dataset are grouped together to form clusters. Metadata for one element of a cluster can be used as metadata for other elements of the cluster, greatly reducing the amount of metadata that needs to be computed. Data is clustered by timestamp, data center, machine, version, and UID, all of which are extracted from dataset paths (e.g. /foo/bar/montana/August01/foo.txt); a sketch of this path abstraction appears at the end of this summary.

In addition to storing dataset metadata, each row also stores status metadata: information about the completion status of various jobs which operate on the catalog. The numerous concurrently executing batch jobs use status metadata as a weak form of synchronization and dependency resolution, potentially deferring the processing of a row until another job has processed it. The fault tolerance of these jobs is provided by a mix of job retries, BigTable's idempotent update semantics, and a watchdog that terminates divergent programs. Finally, a two-phase garbage collector tombstones rows that satisfy a garbage collection predicate and removes them one day later if they still match the predicate. Batch jobs do not process tombstoned rows.

The Goods frontend includes dataset profile pages, dataset search driven by a handful of heuristics to rank datasets by importance, and team dashboards.
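To make the path-based clustering concrete, here is a minimal sketch in Python. The abstraction rules (the regular expressions below) are invented for illustration; the summary above doesn't spell out the rules Goods actually uses:

```python
import re
from collections import defaultdict

# Hypothetical abstraction rules: which path components look like dates,
# versions, or UIDs. Goods' real rules are not described above.
_PATTERNS = [
    (re.compile(r"^\d{4}-\d{2}-\d{2}$"), "<date>"),
    (re.compile(r"^(January|February|March|April|May|June|July|August|"
                r"September|October|November|December)\d{2}$"), "<date>"),
    (re.compile(r"^v\d+$"), "<version>"),
    (re.compile(r"^\d+$"), "<uid>"),
]

def abstract_path(path):
    """Replace date/version/UID-like path components with placeholders."""
    parts = []
    for component in path.strip("/").split("/"):
        for pattern, placeholder in _PATTERNS:
            if pattern.match(component):
                component = placeholder
                break
        parts.append(component)
    return "/" + "/".join(parts)

def cluster(paths):
    """Group dataset paths whose abstracted forms are identical."""
    clusters = defaultdict(list)
    for path in paths:
        clusters[abstract_path(path)].append(path)
    return clusters

# /foo/bar/montana/August01/foo.txt and /foo/bar/montana/August02/foo.txt
# land in the same cluster, keyed by /foo/bar/montana/<date>/foo.txt.
```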
[link]
There's an enormous number of stream (a.k.a. real-time, interactive) processing systems in the wild: Twitter Storm, Twitter Heron, Google MillWheel, LinkedIn Samza, Spark Streaming, Apache Flink, etc. While broadly similar, these systems differ in their ease of use, performance, fault tolerance, scalability, correctness, etc. In this paper, Facebook discusses the design decisions that go into building a stream processing system and the choices it made for three of its real-time processing systems: Puma, Swift, and Stylus.

Systems Overview.

- Scribe. Scribe is a persistent messaging system for log data. Data is organized into categories, and Scribe buckets are the basic unit of stream processing. The data pushed into Scribe is persisted in HDFS.
- Puma. Puma is a real-time data processing system in which applications are written in a SQL-like language with user-defined functions written in Java. The system is designed for compiled, rather than ad-hoc, queries. It is used to compute aggregates and to filter Scribe streams.
- Swift. Swift is used to checkpoint Scribe streams. Checkpoints are made every N strings or every B bytes. Swift is used for low-throughput applications.
- Stylus. Stylus is a low-level, general-purpose stream processing system written in C++ that resembles Storm, MillWheel, etc. Stream processors are organized into DAGs, and the system provides estimated low watermarks.
- Laser. Laser is a high-throughput, low-latency key-value store built on RocksDB.
- Scuba. Scuba supports ad-hoc queries for debugging.
- Hive. Hive is a huge data warehouse which supports SQL queries.

Example Application. Imagine a stream of events, where each event belongs to a single topic. Consider a streaming application which computes the top k events for each topic over 5 minute windows, composed of four stages:

1. Filterer. The filterer filters events and shards them based on their dimension id.
2. Joiner. The joiner looks up dimension data by dimension id, infers the topic of the event, and shards output by (event, topic).
3. Scorer. The scorer maintains a recent history of event counts per topic as well as some long-term counts. It assigns a score to each event and shards output by topic.
4. Ranker. The ranker computes the top k events per topic.

The filterer and joiner are stateless; the scorer and ranker are stateful. The filterer and ranker can be implemented in Puma. All four can be implemented in Stylus.

Language Paradigm. The choice of the language in which users write applications can greatly impact a system's ease of use:

- Declarative. SQL is declarative, expressive, and everyone knows it. However, not all computations can be expressed in SQL.
- Functional. Frameworks like Dryad and Spark provide users with a set of built-in operators which they chain together. This is more flexible than SQL.
- Procedural. Systems like Storm, Heron, and Samza allow users to form DAGs of arbitrary processing units.

Puma uses SQL, Swift uses Python, and Stylus uses C++.

Data Transfer. Data must be transferred between nodes in a DAG:

- Direct message transfer. Data can be transferred directly with something like RPCs or ZeroMQ. MillWheel, Flink, and Spark Streaming do this.
- Broker based message transfer. Instead of direct communication, a message broker can be placed between nodes. This allows a stream to be multiplexed to multiple consumers. Moreover, brokers can implement back pressure. Heron does this.
- Persistent message based transfer. Storing messages in a persistent messaging layer allows data to be multiplexed, allows for different reader and writer speeds, allows data to be read again, and makes failures independent. Samza, Puma, Swift, and Stylus do this.

Facebook connects its systems with Scribe for the following benefits:

- Fault tolerance: if the producer of a stream fails, the consumer is not affected. This failure independence becomes increasingly useful at scale.
- Fault tolerance: recovery can be faster because only the failed nodes need to be replaced.
- Fault tolerance: multiple identical downstream nodes can be run to improve fault tolerance.
- Performance: different nodes can have different read and write speeds; back pressure isn't propagated upstream to slow the whole pipeline down.
- Ease of use: the ability to replay messages makes debugging easier.
- Ease of use: storing messages in Scribe makes monitoring and alerting easier.
- Ease of use: having Scribe as a common substrate lets different frameworks communicate with one another.
- Scalability: changing the number of Scribe buckets makes it easy to change the number of partitions.

Processing Semantics. Stream processors

1. process inputs,
2. generate output, and
3. checkpoint state, stream offsets, and outputs for recovery.

Each node has

- state semantics: does each input affect the state at least once, at most once, or exactly once? and
- output semantics: is each output produced at least once, at most once, or exactly once?

For state semantics, we can achieve

- at least once by saving state before saving stream offsets,
- at most once by saving stream offsets before saving state, and
- exactly once by saving both state and stream offsets atomically.

For output semantics, we can achieve

- at least once by saving output before offset/state,
- at most once by saving offset/state before output, and
- exactly once by saving output and offset/state atomically.

(A sketch of these checkpoint orderings appears at the end of this summary.) At-least-once semantics is useful when low latency is more important than avoiding duplicate records. At-most-once semantics is useful when loss is preferred over duplication. Puma guarantees at-least-once state and output semantics, and Stylus supports a whole bunch of combinations.

State-saving Mechanisms. Node state can be saved in one of many ways:

- Replication. Running multiple copies of a node provides fault tolerance.
- Local DB. Nodes can save their state to a local database like LevelDB or RocksDB.
- Remote DB. Nodes can save their state to remote databases. MillWheel does this.
- Upstream backup. Output messages can be buffered upstream in case downstream nodes fail.
- Global consistent snapshot. Flink maintains globally consistent snapshots.

Stylus can save state to a local RocksDB instance with data asynchronously backed up to HDFS. Alternatively, it can save state to a remote database. If a processing unit forms a monoid (a set with an identity element and an associative operator), then input data can be processed and later merged into the remote DB.

Backfill Processing. Being able to re-run old jobs or run new jobs on old data is useful for a number of reasons:

- Running new jobs on old data is great for debugging.
- Sometimes, we need to run a new metric on old data to generate historical metrics.
- If a node has a bug, we'd like to re-run the node on the data.

To re-run processing on old data, we have three choices:

- Stream only. Retain the input streams long enough that old data can be replayed through the streaming system.
- Separate batch and streaming systems. This can be very annoying and hard to manage.
- Combined batch and streaming system. This is what Spark Streaming, Flink, and Facebook do.
Puma and Stylus code can be run as either streaming or batch applications.
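To make the state-semantics trade-off above concrete, here is a minimal sketch in Python. The `CheckpointStore` and `Processor` classes are invented stand-ins, not Puma or Stylus APIs; the point is only that the order in which state and stream offsets are saved determines at-least-once versus at-most-once behavior:

```python
from collections import namedtuple

# Stand-in for a durable checkpoint store; a real system would write to
# stable storage. All names here are illustrative, not Puma or Stylus APIs.
class CheckpointStore:
    def __init__(self):
        self._data = {}

    def load(self, key, default):
        return self._data.get(key, default)

    def save(self, key, value):
        self._data[key] = value

    def save_atomically(self, entries):
        self._data.update(entries)  # pretend this single call is atomic


Event = namedtuple("Event", ["key", "value"])


class Processor:
    def __init__(self, store):
        self.store = store
        self.state = store.load("state", {})    # aggregation state, e.g. counts
        self.offset = store.load("offset", 0)   # next stream offset to read

    def process(self, event):
        self.state[event.key] = self.state.get(event.key, 0) + event.value
        self.offset += 1

    def checkpoint_at_least_once(self):
        # State before offset: after a crash, events already reflected in the
        # saved state get re-read and re-applied, so duplicates are possible.
        self.store.save("state", dict(self.state))
        self.store.save("offset", self.offset)

    def checkpoint_at_most_once(self):
        # Offset before state: after a crash, events whose effects were never
        # saved are skipped, so loss is possible.
        self.store.save("offset", self.offset)
        self.store.save("state", dict(self.state))

    def checkpoint_exactly_once(self):
        # Both saved atomically (e.g. in a single transaction).
        self.store.save_atomically({"state": dict(self.state),
                                    "offset": self.offset})
```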
[link]
Overview. Strong consistency is expensive. The alternative, weak consistency, is hard to program against. The lack of distributed synchronization or consensus in a weakly consistent system means that replica state can diverge. Existing systems try to hide this divergence with causal consistency to deal with read-write conflicts and per-object eventual convergence to deal with write-write conflicts, but neither is sufficient to deal with complex multi-object write-write conflicts.

As a motivating example, imagine a Wikipedia article for Donald Trump with three parts: some text, an image, and some references. In one partition, Hillary modifies the text to oppose Trump and subsequently, Tim changes the picture to a nastier image of Trump. In another partition, Trump modifies the text to support Trump and subsequently, Mike changes the references to link to pro-Trump websites. Later, the partitions need to be merged. The write-write conflict on the text needs to be reconciled. Moreover, the modifications to the image and references do not produce conflicts but still need to be updated to match the text. Existing systems fall short in one of two ways:

1. syntactic conflict resolution: some policy like last-writer-wins is chosen, or
2. lack of cross-object semantics: users are forced to merge individual objects.

TARDiS (Transactional Asynchronously Replicated Divergent Store) is a distributed, weakly consistent, transactional key-value store that supports branching computation as a core abstraction. When working off a single branch, applications are shielded from diverging computations. Conversely, applications can merge branches and reconcile conflicts. By allowing applications to merge entire branches rather than single objects, users have the ability to perform semantically rich multi-object merges.

TARDiS employs three techniques:

1. branch-on-conflict,
2. inter-branch isolation, and
3. application-driven cross-object merge,

and has the following properties:

1. TARDiS knows history: TARDiS maintains a DAG of branching execution and uses DAG compression to minimize memory overhead.
2. TARDiS merges branches, not objects: TARDiS allows applications to merge branches rather than single objects.
3. TARDiS is expressive: TARDiS supports various isolation levels.
4. TARDiS improves performance of the local site: branching on conflict and deferring merges until later can improve performance in a local setting as well as in a distributed setting.

Architecture. TARDiS is a distributed multi-master key-value store with asynchronous replication. There are four main layers to TARDiS:

1. Storage layer: this layer implements a disk-backed multiversion B-tree.
2. Consistency layer: this layer maintains the DAG of execution branches where each vertex is a logical state.
3. Garbage collection layer: this layer performs DAG compression and record pruning.
4. Replicator service layer: this layer propagates transactions.

Interface. Applications use TARDiS in either

1. single mode, where transactions execute on a single branch, or
2. multi mode, where a transaction can read from multiple branches and create a new merged branch.

TARDiS provides an API to find the set of conflicting writes for a set of branches, to find the fork points for a set of branches, and to get the versioned values of objects. It also supports varying begin and end constraints including serializability, snapshot isolation, read committed, etc. Here's an example TARDiS application that implements a counter.
```
func increment(counter)
  Tx t = begin(AncestorConstraint)
  int value = t.get(counter)
  t.put(counter, value + 1)
  t.commit(SerializabilityConstraint)

func decrement(counter)
  Tx t = begin(AncestorConstraint)
  int value = t.get(counter)
  t.put(counter, value - 1)
  t.commit(SerializabilityConstraint)

func merge()
  Tx t = beginMerge(AnyConstraint)
  forkPoint forkPt = t.findForkPoints(t.parents).first
  int forkVal = t.getForId(counter, forkPt)
  list<int> currentVals = t.getForId(counter, t.parents)
  int result = forkVal
  foreach c in currentVals
    result += (c - forkVal)
  t.put(counter, result)
  t.commit(SerializabilityConstraint)
```

Design and Implementation. When a transaction begins, TARDiS performs a BFS from the leaves of the state DAG to find the first state satisfying the begin constraint. When a transaction is committed, TARDiS walks down the state DAG as far as possible until a state is reached that doesn't satisfy the end constraint, branching if necessary. Branches are represented as a set of (b, i) pairs indicating the bth child of the ith node. Keys are mapped to a topologically sorted list of versions used during reading. The garbage collector performs DAG compression, eliminating unneeded states in the state DAG. It also prunes unneeded record versions after DAG compression.
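As a rough illustration of branch-on-conflict at commit time, here is a toy sketch in Python. The `State` DAG and the conflict check are simplified stand-ins (conflict = a child's writes intersect the committing transaction's read set, a crude proxy for the serializability end constraint); none of this is TARDiS's actual implementation:

```python
# A toy sketch of branch-on-conflict commit in a state DAG; simplified
# stand-ins, not TARDiS's actual data structures.

class State:
    def __init__(self, parent=None, writes=None):
        self.parent = parent
        self.writes = writes or {}   # key -> value written to reach this state
        self.children = []

    def read(self, key):
        # Walk up the ancestor chain to find the latest version of `key`.
        state = self
        while state is not None:
            if key in state.writes:
                return state.writes[key]
            state = state.parent
        return None


def commit(snapshot, writes, read_set):
    """Commit a transaction that began at `snapshot`.

    Walk down from the snapshot while some child's writes don't conflict
    with the transaction's read set; then attach a new state. If the chosen
    node already has children, this creates a new branch instead of aborting.
    """
    state = snapshot
    while True:
        compatible = [c for c in state.children
                      if not (set(c.writes) & read_set)]
        if not compatible:
            break
        state = compatible[0]
    new_state = State(parent=state, writes=dict(writes))
    state.children.append(new_state)
    return new_state


# Two concurrent increments that both read the same counter version end up
# on different branches rather than one of them aborting.
root = State(writes={"counter": 0})
s1 = commit(root, {"counter": 1}, read_set={"counter"})
s2 = commit(root, {"counter": 1}, read_set={"counter"})
assert s1.parent is root and s2.parent is root and len(root.children) == 2
```

The key point is that a conflicting commit never aborts; it simply attaches a new child, creating a branch that a later merge transaction can reconcile.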
[link]
For a traditional single-node database, the data that a transaction reads and writes is all on a single machine. For a distributed OLTP database, there are two types of transactions:

1. Local transactions are transactions that read and write data on a single machine, much like traditional transactions. A distributed database can process a local transaction as efficiently as a traditional single-node database can.
2. Distributed transactions are transactions that read and write data that is spread across multiple machines. Typically, distributed databases use two-phase commit (2PC) to commit or abort distributed transactions. Because 2PC requires multiple rounds of communication, distributed databases process distributed transactions less efficiently than local transactions.

This paper presents an alternative to 2PC, dubbed Localizing Executions via Aggressive Placement of data (LEAP), which tries to avoid the communication overheads of 2PC by aggressively moving all the data a distributed transaction reads and writes onto a single machine, effectively turning the distributed transaction into a local transaction.

#### LEAP

LEAP is based on the following assumptions and observations:

- Transactions in an OLTP workload don't read or write many tuples.
- Tuples in an OLTP database are typically very small.
- Multiple transactions issued one after another may access the same data again and again.
- As more advanced network technology becomes available (e.g. RDMA), the cost of moving data becomes smaller and smaller.

With LEAP, tuples are horizontally partitioned across a set of nodes, and each tuple is stored exactly once. Each node has two data structures:

- a data table which stores tuples, and
- an owner table: a horizontally partitioned key-value store which stores ownership information.

Consider a tuple $d = (k, v)$ with primary key $k$ and value $v$. The owner table contains an entry $(k, o)$ indicating that node $o$ owns the tuple with key $k$. The node $o$ contains a $(k, v)$ entry in its data table. The owner table is partitioned across nodes using any arbitrary partitioning scheme (e.g. hash-based, range-based). When a node initiates a transaction, it requests ownership of every tuple it reads and writes. This migrates the tuples to the initiating node and updates the ownership information to reflect the ownership transfer.

Here's how the ownership transfer protocol works. For a given tuple $d = (k, v)$, the requester is the node requesting ownership of $d$, the partitioner is the node with ownership information $(k, o)$, and the owner is the node that stores $d$.

- First, the requester sends an owner request with key $k$ to the partitioner.
- Then, the partitioner looks up the owner of the tuple with key $k$ in its owner table and sends a transfer request to the owner.
- The owner retrieves the value of the tuple and sends it in a transfer response back to the requester. It also deletes its copy of the tuple.
- Finally, the requester sends an inform message to the partitioner informing it that the ownership transfer is complete. The partitioner updates its owner table to reflect the new owner.

Also note that

- if the requester, partitioner, and owner are all different nodes, then this scheme requires 4 messages,
- if the partitioner and owner are the same node, then this scheme requires 3 messages, and
- if the requester and partitioner are the same node, then this scheme requires 2 messages.

On its own, this protocol is not fault tolerant: if the transfer response is dropped after the owner deletes its copy of the tuple, the data is lost.
See the appendix for information on how to make this ownership transfer fault tolerant. Also see the paper for a theoretical comparison of 2PC and LEAP.

#### LEAP-Based OLTP Engine

L-Store is a distributed OLTP database based on H-Store which uses LEAP to manage transactions. Transactions acquire read/write locks on individual tuples and use strict two-phase locking. Transactions are assigned globally unique identifiers, and deadlock prevention is implemented with a wait-die scheme in which lower-timestamped transactions have higher priority. That is, higher priority transactions wait on lower priority transactions, but lower priority transactions abort rather than wait on higher priority ones.

Concurrent local transactions are processed as usual; what's interesting is concurrent transfer requests. Imagine a transaction is requesting ownership of a tuple on another node.

- First, the requester creates a request lock locally indicating that it is currently trying to request ownership of the tuple. It then sends an owner request to the partitioner.
- The partitioner may receive multiple concurrent owner requests. It processes them serially using the wait-die scheme. As an optimization, it processes requests in decreasing timestamp order to avoid aborts whenever possible. It then forwards a transfer request to the owner.
- If the owner is currently accessing the tuple being requested, it again uses the wait-die scheme before sending the tuple back to the requester.
- Finally, the requester changes its request lock into a normal data lock and continues processing.

If a transaction cannot successfully get ownership of a tuple, it aborts. L-Store also uses logging and checkpointing for fault tolerance (see paper for details).
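To make the ownership transfer protocol concrete, here is a minimal sketch in Python in which nodes are plain objects and messages are direct method calls. All names are illustrative; locking, the wait-die scheme, and fault tolerance are ignored:

```python
# A minimal sketch of LEAP's ownership transfer; not L-Store's actual code.

class Node:
    def __init__(self, name):
        self.name = name
        self.data_table = {}    # key -> value, for tuples this node owns
        self.owner_table = {}   # key -> owning Node, for keys it partitions

    # Message 1: requester -> partitioner (owner request).
    def request_ownership(self, key, partitioner):
        partitioner.handle_owner_request(key, requester=self)

    # Message 2: partitioner -> owner (transfer request).
    def handle_owner_request(self, key, requester):
        owner = self.owner_table[key]
        owner.handle_transfer_request(key, requester, partitioner=self)

    # Message 3: owner -> requester (transfer response); owner drops its copy.
    def handle_transfer_request(self, key, requester, partitioner):
        value = self.data_table.pop(key)
        requester.handle_transfer_response(key, value, partitioner)

    # Message 4: requester -> partitioner (inform), after installing the tuple.
    def handle_transfer_response(self, key, value, partitioner):
        self.data_table[key] = value
        partitioner.handle_inform(key, new_owner=self)

    def handle_inform(self, key, new_owner):
        self.owner_table[key] = new_owner


# With three distinct nodes, the transfer takes all four messages; when the
# roles coincide, the corresponding calls become local operations instead.
requester, partitioner, owner = Node("r"), Node("p"), Node("o")
owner.data_table["k"] = "v"
partitioner.owner_table["k"] = owner
requester.request_ownership("k", partitioner)
assert requester.data_table["k"] == "v"
assert partitioner.owner_table["k"] is requester
```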