Welcome to ShortScience.org!

[link]
Strong consistency is easy to reason about, but it typically requires coordination, which increases latency. Weaker consistency can improve performance but is difficult to reason about. This paper presents a program analysis and a distributed protocol for running transactions with coordination-free strong consistency.
Analysing Transactions. We model a database as a finite map from objects to integers. Transactions are ordered sequences of reads, writes, computations, and prints; this is formalized below. A transaction T executes on a database D to produce a new database state D' and a log G' of printed values. Formally, eval(D, T) = <D', G'>.
Symbolic tables categorize the behavior of a transaction based on the initial database state. Formally, a symbolic table for a transaction T is a binary relation Q of pairs <P, T'> where P is a formula in first-order logic describing the contents of a database, and T' is a transaction such that T and T' are observationally equivalent when run on databases satisfying P. A symbolic table can also be built for a set of transactions.
Formally, transactions are expressed in a language L which is essentially IMP with database reads, database writes, and prints. A fairly simple recursive algorithm walks backwards through the program computing symbolic tables. Essentially, the algorithm traces all paths through the program's control flow.
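To make the path tracing concrete, here is a minimal Python sketch. The toy statement forms and the `symbolic_table` helper are illustrative inventions, not the paper's language L: a transaction is a list of statements, and the only branching form tests whether an object is positive.

```python
def symbolic_table(stmts, cond=()):
    """Return a list of (path condition, straight-line transaction) pairs.

    Statements are ("write", obj, value), ("print", obj), or
    ("if", obj, then_branch, else_branch), which tests obj > 0. Each path
    condition is a tuple of literals like ("x", ">0"); the paired
    transaction is branch-free and observationally equivalent to `stmts`
    on databases satisfying the condition.
    """
    for i, s in enumerate(stmts):
        if s[0] == "if":
            _, obj, then_b, else_b = s
            prefix = stmts[:i]
            table = []
            for lit, branch in ((">0", then_b), ("<=0", else_b)):
                # Inline the chosen branch and keep tracing the rest.
                rest = prefix + branch + stmts[i + 1:]
                table.extend(symbolic_table(rest, cond + ((obj, lit),)))
            return table
    return [(cond, stmts)]  # no branches left: one straight-line path

table = symbolic_table([
    ("if", "x", [("write", "y", 1)], [("write", "y", 2)]),
])
assert len(table) == 2  # one entry per path through the branch
```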
There is also a higher-level language L++ which can be compiled to L.
Homeostasis Protocol. Assume data is partitioned (not replicated) across a cluster of K nodes. We model a distributed database as a pair <D, Loc> where D is a database and Loc is a function from objects to an index between 1 and K. Each transaction T runs on a site i; formally, l(T) = i. For simplicity, we assume that transactions only write to objects local to the site they run on.
Each transaction runs on some site. It reads fresh versions of values on the site and stale versions of values on other sites. Nodes establish treaties with one another such that operating with stale data does not affect the correctness of the transaction. This is best explained by way of example. Imagine the following transaction is running on a site where x is remote.
```
x' = r(x)
if x' > 0:
  write(y = 1)
else:
  write(y = 2)
```
If we establish the treaty x > 0, then it doesn't matter what the actual value of x is. We now formalize this notion.
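As a quick sanity check, here is a tiny Python sketch (the `run` helper is a hypothetical stand-in for the transaction above) showing that under the treaty x > 0 the transaction's writes are independent of the stale value of x:

```python
def run(db):
    """The example transaction: read x, then write y based on its sign."""
    out = dict(db)
    out["y"] = 1 if db["x"] > 0 else 2
    return out

# Any database satisfying the treaty x > 0 yields the same write, so a
# site may safely execute against a stale x as long as the treaty holds.
results = {run({"x": x})["y"] for x in (1, 5, 1000)}
assert results == {1}
```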
Given a database D, a local-remote partition is a function p from objects to booleans. We can represent a database D with respect to a local-remote partition p as a pair (l, r) where l is a vector of the values of objects x such that p(x), and r is a vector of the values of objects x such that not p(x). In words, we can model a database as disjoint sets of local and remote values.
We say <(l, r), G> = <(l', r'), G'> if l = l', r = r', and G = G'. Given a database D, a local-remote partition p, a transaction T, and sets of vectors L and R, we say (L, R) is a local-remote slice (LR-slice) for T if Eval((l, r), T) = Eval((l, r'), T) for all l in L and all r, r' in R. In words, (L, R) is an LR-slice for T if T's output depends only on the local values.
A global treaty Gamma is a subset of possible database states. A global treaty is valid for a set of transactions {T1, ..., Tn} if ({l | (l, r) in Gamma}, {r | (l, r) in Gamma}) is an LR-slice for every Ti.
The homeostasis protocol proceeds in rounds, where each round has three phases:
1. Treaty generation. The system generates a treaty for the current database state.
2. Normal execution. Transactions execute without coordination, reading a snapshot of remote values. After a site executes a transaction, it checks whether the transaction brings the database to a state outside the treaty. If it doesn't, the transaction is committed. If it does, we enter the next phase.
3. Cleanup. All sites synchronize and communicate all values that have changed since the last round. All sites then run the transaction that caused the violation. Finally, we enter the next round.
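The three phases can be sketched as a round loop. All the helpers here (`generate_treaty`, `execute_locally`, `synchronize`) are hypothetical stand-ins for the machinery described above, not the paper's protocol:

```python
def homeostasis_round(db, transactions, generate_treaty,
                      execute_locally, synchronize):
    # Phase 1: treaty generation. A treaty is a predicate over db states.
    treaty = generate_treaty(db)
    committed = []
    for t in transactions:
        # Phase 2: normal execution against a snapshot of remote values.
        new_db = execute_locally(db, t)
        if treaty(new_db):
            db = new_db            # still inside the treaty: commit
            committed.append(t)
        else:
            # Phase 3: cleanup. Sites synchronize, exchange changed
            # values, and re-run the violating transaction; the next
            # round (with a fresh treaty) then begins.
            db = synchronize(db)
            db = execute_locally(db, t)
            committed.append(t)
            break
    return db, committed

# Toy run: three decrements under the treaty x > 0.
db, done = homeostasis_round(
    {"x": 3},
    [lambda d: {"x": d["x"] - 1}] * 3,
    generate_treaty=lambda d: (lambda s: s["x"] > 0),
    execute_locally=lambda d, t: t(d),
    synchronize=lambda d: d,
)
assert db == {"x": 0} and len(done) == 3
```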
Generating Treaties. Two big questions remain: how do we generate treaties, and how do we enforce treaties?
Given an initial database state D, we could always pick Gamma = {D}, but this requires that we synchronize after every single database modification. We want to pick treaties that let us run as long as possible before synchronizing. We could pick the predicate P in the symbolic table that D satisfies, but P isn't guaranteed to be a valid treaty. Instead, we take the predicate P and divide it into a set of local treaties P1, ..., PK such that the conjunction of the local treaties implies P and each local treaty is satisfied by the current database. The conjunction of the local treaties is our global treaty and is guaranteed to be valid.
Finding good local treaties is not easy; in fact, the general problem is undecidable. We limit ourselves to linear arithmetic and leverage SMT solvers to do the heavy lifting for us. First, we decompose the global treaty into a conjunction of linear constraints. We then generate templates from the constraints and instantiate them using Z3.
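A toy illustration of the decomposition, with brute-force checks over a small domain standing in for the SMT queries. The specific treaty x + y >= 10 and its split into x >= 5 and y >= 5 are invented for illustration:

```python
# Global treaty over two sites' objects x and y, and one local treaty
# per site.
global_treaty = lambda x, y: x + y >= 10
local_treaties = [lambda x, y: x >= 5, lambda x, y: y >= 5]

# Validity: the conjunction of the local treaties must imply the global
# treaty (checked here by enumeration instead of a solver).
domain = range(-20, 21)
assert all(
    global_treaty(x, y)
    for x in domain for y in domain
    if all(p(x, y) for p in local_treaties)
)

# The current database state must satisfy every local treaty, so each
# site can enforce only its own predicate without coordination.
db = {"x": 7, "y": 6}
assert all(p(db["x"], db["y"]) for p in local_treaties)
```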
Homeostasis in Practice. Roy et al. present a homeostasis prototype. An offline preprocessing component takes in L++ transactions and computes joint symbolic tables, using tricks to keep the tables small. It then initializes the global and local treaties. The online execution component executes the homeostasis protocol described above. The prototype is implemented in Java over MySQL; the analysis uses ANTLR-4 and Z3.
[link]
Impala is a distributed query engine built on top of Hadoop. That is, it builds off of existing Hadoop tools and frameworks and reads data stored in Hadoop file formats from HDFS. Impala's CREATE TABLE commands specify the location and file format of data stored in Hadoop. This data can also be partitioned into different HDFS directories based on certain column values. Users can then issue typical SQL queries against the data. Impala supports batch INSERTs but doesn't support UPDATE or DELETE. Data can also be manipulated directly by going through HDFS.

Impala is divided into three components.

1. An Impala daemon (impalad) runs on each machine and is responsible for receiving queries from users and for orchestrating the execution of queries.
2. A single Statestore daemon (statestored) is a pub/sub system used to disseminate system metadata asynchronously to clients. The statestore has weak semantics and doesn't persist anything to disk.
3. A single Catalog daemon (catalogd) publishes catalog information through the statestored. The catalogd pulls in metadata from external systems, puts it in Impala form, and pushes it through the statestored.

Impala has a Java frontend that performs the typical database frontend operations (e.g. parsing, semantic analysis, and query optimization). It uses a two-phase query planner.

1. Single-node planning. First, a single-node non-executable query plan tree is formed. Typical optimizations like join reordering are performed.
2. Plan parallelization. After a single-node plan is formed, it is fragmented and divided between multiple nodes with the goal of minimizing data movement and maximizing scan locality.

Impala has a C++ backend that uses Volcano-style iterators with exchange operators and runtime code generation using LLVM. To efficiently read data from disk, Impala bypasses the traditional HDFS protocols. The backend supports a lot of different file formats including Avro, RC, sequence, plain text, and Parquet.
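As an illustration of the Volcano-style iterator model the backend uses (a Python toy, nothing like Impala's generated C++): each operator exposes a stream of rows, and operators compose by pulling from their children one row at a time.

```python
def scan(rows):
    # Leaf operator: produce rows from a base table.
    for row in rows:
        yield row

def select(child, predicate):
    # Filter operator: pull from the child, keep matching rows.
    for row in child:
        if predicate(row):
            yield row

def project(child, columns):
    # Projection operator: keep only the requested columns.
    for row in child:
        yield {c: row[c] for c in columns}

# SELECT id FROM t WHERE qty > 0, as a pipeline of iterators.
plan = project(
    select(scan([{"id": 1, "qty": 3}, {"id": 2, "qty": 0}]),
           predicate=lambda r: r["qty"] > 0),
    columns=["id"],
)
result = list(plan)
assert result == [{"id": 1}]
```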
For cluster and resource management, Impala uses a homegrown system called Llama that sits over YARN.
[link]
Borg is Google's cluster manager. Users submit jobs, collections of tasks, to Borg, which are then run in a single cell; many cells live inside a single cluster. Borg jobs are either high-priority latency-sensitive production jobs (e.g. user-facing products and core infrastructure) or low-priority non-production batch jobs. Jobs have typical properties like name and owner and can also express constraints (e.g. only run on certain architectures). Tasks also have properties and state their resource demands. Borg jobs are specified in BCL and are bundled as statically linked executables. Jobs are labeled with a priority and must operate within quota limits. Resources are bundled into allocs in which multiple tasks can run. Borg also manages a naming service and exports a UI called Sigma to developers.

Cells are managed by five-way replicated Borgmasters. A Borgmaster communicates with Borglets running on each machine via RPC, manages the Paxos-replicated state of the system, and exports information to Sigma. There is also a high-fidelity Borgmaster simulator known as the Fauxmaster which can be used for debugging.

One subcomponent of the Borgmaster handles scheduling. Submitted jobs are placed in a queue and scheduled by priority, round-robin within a priority. Each job undergoes feasibility checking, where Borg checks that there are enough resources to run the job, and then scoring, where Borg determines the best place to run the job. Worst-fit scheduling spreads jobs across many machines, allowing for spikes in resource usage. Best-fit crams jobs as closely as possible, which is bad for bursty loads. Borg uses a scheduler that attempts to limit "stranded resources": resources on a machine which cannot be used because other resources on the same machine are depleted. Tasks that are preempted are placed back on the queue. Borg also tries to place jobs where their packages are already loaded, but offers no other form of locality.
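The worst-fit versus best-fit tradeoff can be illustrated with a toy scorer. The free-capacity model here is hypothetical, not Borg's actual scoring function:

```python
def pick_machine(machines, demand, policy):
    """Pick a machine for a job; machines maps name -> free capacity."""
    feasible = [m for m, free in machines.items() if free >= demand]
    if not feasible:
        return None
    if policy == "worst_fit":
        # Most leftover room: spreads load, leaves headroom for spikes.
        return max(feasible, key=lambda m: machines[m] - demand)
    if policy == "best_fit":
        # Least leftover room: packs tightly, bad for bursty loads.
        return min(feasible, key=lambda m: machines[m] - demand)

machines = {"m1": 10, "m2": 4, "m3": 6}
assert pick_machine(machines, 3, "worst_fit") == "m1"
assert pick_machine(machines, 3, "best_fit") == "m2"
```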
Borglets run on each machine and are responsible for starting and stopping tasks, managing logs, and reporting to the Borgmaster. The Borgmaster periodically polls the Borglets (as opposed to Borglets pushing to the Borgmaster) to avoid any need for flow control or recovery storms.

The Borgmaster performs a couple of tricks to achieve high scalability.

- The scheduler operates on slightly stale state, a form of "optimistic scheduling".
- The Borgmaster caches job scores.
- The Borgmaster performs feasibility checking and scoring for all equivalent jobs at once.
- Complete scoring is hard, so the Borgmaster uses randomization.

The Borgmaster puts the onus of fault tolerance on applications, expecting them to handle occasional failures. Still, it also performs a set of nice tricks for availability.

- It reschedules evicted tasks.
- It spreads tasks across failure domains.
- It limits the number of tasks in a job that can be taken down due to maintenance.
- It avoids past machine/task pairings that led to failure.

To measure cluster utilization, Google uses a cell compaction metric: the smallest a cell can be and still run a given workload. Better utilization leads directly to savings in money, so Borg is very focused on improving utilization. For example, it allows non-production jobs to reclaim unused resources from production jobs. Borg uses containers for isolation. It also makes sure to throttle or kill jobs appropriately to ensure performance isolation.
[link]
Building fault-tolerant systems is hard, like really hard. There are a couple of approaches to building fault-tolerant systems, but none are perfect.

- In a bottom-up approach, we verify that individual components of a system are fault-tolerant and then glue the components together. Unfortunately, fault tolerance is not closed under composition: the combination of two fault-tolerant systems may not be fault-tolerant.
- In a top-down approach, we inject faults into an existing system and see whether or not it fails. Fault injection can discover shallow bugs but has trouble surfacing bugs that require more complex failure scenarios (e.g. a network partition followed by a particular sequence of machine failures).

Lineage-driven Fault Injection (LDFI) is a top-down approach which uses lineage and fault injection to carefully discover fault-tolerance bugs in distributed systems. If a bug is found, the lineage of the bug is given to the user to help discover its root cause. If no bugs are found, LDFI provides some guarantees that there are no possible bugs for that particular configuration. In a nutshell, LDFI takes a program written in something like Bloom, inputs to the program, and some parameters (e.g. to bound the length of the execution, to bound the number of faults, etc.). It runs the given program on the given input and computes a provenance graph of the output. It then carefully selects a small number of faults that invalidate every derivation and injects these faults into the system to see if they surface a bug. This is repeated until a bug is found or it is determined that no such bugs exist. In this paper, Alvaro presents LDFI and an LDFI implementation named Molly.

#### System Model

LDFI injects faults, but not every kind of fault. We'll explore dropped messages, failed nodes, and network partitions. We won't explore message reordering or crash recovery. While we sacrifice generality, we gain tractability. LDFI is governed by three parameters:

1. LDFI does not operate over arbitrarily long executions. A parameter EOT (end of time) prescribes the maximum logical time of any execution examined by LDFI.
2. Distributed systems typically tolerate some number of faults, but cannot possibly tolerate complete message loss, for example. A parameter EFF (end of finite failures) < EOT sets a logical time after which LDFI will not introduce faults. The time between EFF and EOT allows a distributed system to recover from message losses.
3. A parameter Crashes sets an upper limit on the number of node crashes LDFI will consider.

A failure specification is a three-tuple (EOT, EFF, Crashes). Molly automatically finds a good failure specification by repeatedly increasing EOT until programs can create meaningful results and increasing EFF until faults occur. We assume programs are written in Dedalus and that pre- and postconditions are expressed as special relations pre and post in the program.

#### LDFI

Consider an interaction between Molly and a user trying to implement a fault-tolerant broadcast protocol between three nodes A, B, and C, where A begins with a message to broadcast. Our correctness condition asserts that if a message is delivered to any non-failed node, it is delivered to all of them.

- Implementation 1. Assume the user writes an incredibly naive broadcast protocol in which A sends a copy of the message to B and C once. Molly drops the message from A to B, inducing a bug.
- Implementation 2. The user then amends the program so that A continually sends the message to B and C. Molly drops the message from A to B and then crashes A. C has the message but B doesn't: a bug.
- Implementation 3. The user then amends the program so that all three nodes continuously broadcast the message. Now, Molly cannot find a set of dropped messages or node crashes to prevent all three nodes from obtaining the message.
- Implementation 4. While implementation 3 is fault-tolerant, it is also inefficient.
The user amends the protocol so that nodes broadcast messages until they receive an ack from the other nodes. Molly can devise a sequence of message drops and node crashes to prevent the message from being delivered to all three nodes, but when it runs the system again with the same faults, the messages still appear.
- Implementation 5. A "classic" implementation of broadcast, in which a node broadcasts any message it receives once, is found to be buggy by Molly.

#### Molly

Molly begins by rewriting a Dedalus program into a Datalog program. Each relation is augmented with a time column.

```
foo(A,B) ==> foo(A,B,T)
bar(B,C) ==> bar(B,C,T)
baz(A,C) ==> baz(A,C,T)
```

The time column of every predicate in the body of a rule is bound to the same variable T.

```
_ :- foo(A,B), bar(B,C) ==> _ :- foo(A,B,T), bar(B,C,T)
```

The head of every deductive rule is bound to T.

```
baz(A,C) :- foo(A,B), bar(B,C) ==> baz(A,C,T) :- foo(A,B,T), bar(B,C,T)
```

The head of every inductive rule is bound to T + 1.

```
baz(A,C) :- foo(A,B), bar(B,C) ==> baz(A,C,T+1) :- foo(A,B,T), bar(B,C,T)
```

For asynchronous rules, we introduce a Clock(From, To, T) relation which contains an entry (n, m, T) if node n sent a message to m at time T. The body of an asynchronous rule at node n whose head is destined for node m is augmented with a Clock(n, m, T) predicate, while the head is bound to T + 1. Molly can add and remove entries from Clock to simulate faults.

```
foo(A,B) :- foo(B,A) ==> foo(A,B,T+1) :- foo(B,A,T), Clock(B,A,T)
```

Molly then rewrites the Datalog program to maintain its own provenance graph and extracts lineage from the graph via recursive queries that walk it. Given an execution of the Datalog program, Molly generates a CNF formula in which the disjuncts inside each conjunct x1 or ... or xn represent the message drops or node failures that would invalidate a particular derivation. If the formula is unsatisfiable, no allowed combination of faults invalidates every derivation, and the program is fault-tolerant for this configuration. If the formula is satisfiable, a satisfying assignment represents a set of faults to inject. Molly uses an SMT solver to solve these formulas.
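The final solving step can be sketched with brute-force enumeration standing in for the solver; the broadcast-style faults below are invented for illustration. Each conjunct lists the faults that would invalidate one derivation of the good outcome, and a satisfying assignment is a fault set worth injecting:

```python
from itertools import combinations

def candidate_fault_sets(cnf, faults, max_faults):
    """Yield fault sets that hit every conjunct, i.e. invalidate every
    derivation. cnf is a list of sets of fault names."""
    for k in range(1, max_faults + 1):
        for fs in combinations(faults, k):
            if all(any(f in fs for f in conjunct) for conjunct in cnf):
                yield set(fs)

# Two derivations of delivery to B: directly via A->B, or via A->C
# followed by C->B.
cnf = [{"drop(A,B)"}, {"drop(A,C)", "drop(C,B)"}]
faults = ["drop(A,B)", "drop(A,C)", "drop(C,B)"]
sols = list(candidate_fault_sets(cnf, faults, max_faults=2))
# Dropping A->B alone is not enough; both routes must be cut.
assert {"drop(A,B)", "drop(A,C)"} in sols
```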
[link]
Many distributed databases geo-replicate data for (i) lower read latency and (ii) higher fault tolerance in the face of an extreme failure (e.g. lightning striking a data center). Implementing strong consistency over a geo-replicated system can incur tremendous write latency, as updates have to coordinate between geographically distant data centers. On the other hand, weak consistency is a real brain buster. This paper introduces a new consistency model between weak and strong consistency, explicit consistency, which takes into account user specified invariants. It also presents an explicitly consistent system which
1. performs static analysis to determine which operations can be executed without coordination,
2. uses invariant-repair or violation-avoidance to resolve or avoid conflicts, and
3. instruments user code with calls to middleware.
The system is called Indigo and is built on an existing causally consistent key-value store with various properties.
Explicit Consistency. A database is a collection of objects replicated across data centers. Users issue reads and writes as part of transactions, and these transactions are asynchronously replicated between data centers. We denote by t(S) the database state achieved by applying transaction t to database state S. S_n is the database state achieved after the first n transactions. That is, $S_n = t_n(...(t_1(S_{init}))...)$. $T(S_n) = \{t_1, ..., t_n\}$ is the set of transactions used to create S_n. We say a transaction $t_a$ happens before a transaction $t_b$, denoted $t_a \rightarrow t_b$, if $t_a$ is in $T(S)$ for the state $S$ on which $t_b$ executes. $O = (T,\rightarrow)$ is a partial order. $O' = (T, <)$ is a serialization of $O$ if $<$ is total and respects $\rightarrow$.
Given an invariant $I$, we say $S$ is $I$-valid if $I(S) = true$. $(T, <)$ is an I-valid serialization if I holds on all prefixes of the serialization. If a system ensures that all serializations are I-valid, it provides explicit consistency. In other words, an explicitly consistent database ensures invariants always hold. This builds off of Bailis et al.'s notion of invariant-confluence.
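On toy examples, explicit consistency can be checked by brute force. This is a hypothetical sketch; a real system never enumerates serializations:

```python
from itertools import permutations

def explicitly_consistent(init, txns, happens_before, invariant):
    """Check that every serialization respecting happens-before keeps
    the invariant true on every prefix."""
    for order in permutations(range(len(txns))):
        # Skip orders that violate the happens-before partial order.
        if any(order.index(a) > order.index(b) for a, b in happens_before):
            continue
        state = init
        for i in order:
            state = txns[i](state)
            if not invariant(state):
                return False
    return True

# x starts at 2; two concurrent decrements; invariant: x >= 0.
ok = explicitly_consistent(
    init=2,
    txns=[lambda x: x - 1, lambda x: x - 1],
    happens_before=[],
    invariant=lambda x: x >= 0,
)
assert ok  # every serialization keeps x nonnegative
```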
Determining $I$-offender Sets. An invariant is a universally quantified first order logic formula in prenex normal form. The invariant can include uninterpreted functions like Player($P$) and enrolled($P, T$).
A postcondition states how an operation affects the truth values of the uninterpreted functions in invariants. Every operation is annotated with postconditions. A predicate clause directly alters the truth assignment of a predicate (e.g. not Player(P)). A function clause relates the old and new database states (e.g. nrPlayers'(T) = nrPlayers(T) + 1, where the primed function denotes the new state).
This language is rather expressive, as evidenced by multiple examples in the paper.
A set of transactions is an I-offender if it is not invariant-confluent. First, pairs of operations are checked to see if they produce a contradictory truth assignment (e.g. Player(P) and not Player(P)). Then, every pair of transactions is considered: given the weakest liberal precondition of the transactions, we substitute the effects of the transactions into the invariant to get a formula and check its validity using Z3. If the formula is valid, the transactions are invariant-confluent.
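A brute-force sketch of the idea, with enumeration over a small domain standing in for the Z3 validity check; the enroll operation and cap invariant are invented for illustration:

```python
CAP = 2

def invariant(nr_players):
    return nr_players <= CAP

def enroll(nr_players):
    # Effect of one enroll operation on the nrPlayers function.
    return nr_players + 1

# Starting from any I-valid state where each enroll alone preserves the
# invariant (the precondition check), test whether applying both enrolls
# still preserves it. A counterexample means {enroll, enroll} is an
# I-offender: these operations need coordination.
offender = any(
    invariant(n) and invariant(enroll(n)) and not invariant(enroll(enroll(n)))
    for n in range(0, CAP + 1)
)
assert offender  # at n = CAP - 1, each enroll is fine alone but not together
```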
Handling I-offender Sets. There are two ways to handle I-offenders: invariant-repair and violation-avoidance. Invariant-repair involves CRDTs; the bulk of this paper focuses on violation-avoidance which leverages existing reservation and escrow transaction techniques.
- UID generation. Unique identifiers can easily be generated without coordination. For example, a node can concatenate an incrementing counter with its MAC address.
- Multi-level lock reservation. Locking is the most general form of reservation. Locks come in three flavors: (i) shared forbid, (ii) shared allow, and (iii) exclusive allow. Transactions acquire locks to avoid invariant violations. For example, an enrollTournament could acquire a sharedForbid lock on removing players, and removePlayer could acquire a sharedAllow lock. Exclusive allow locks are used for self-conflicting operations.
- Multi-level mask reservation. If our invariant is a disjunction P1 or ... or Pn, then to preserve the invariant, we only need to guarantee that at least one of the disjuncts remains true. A mask reservation is a vector of locks where an operation can falsify one of the disjuncts only after acquiring a lock on another true disjunct preventing it from being falsified.
- Escrow reservation. Imagine our invariant is x >= k and x has initial value x0. Escrow transactions allocate x0 - k rights. A transaction can decrement x only after acquiring and spending a right. When x is incremented, a right is generated. This gets trickier for invariants like |A| >= k where concurrent additions could generate too many rights leading to an invariant violation. Here, we use escrow transactions for conditions, where a primary is allocated for each reservation. Rights are not immediately generated; instead, the primary is responsible for generating rights.
- Partition lock reservation. Partition locks allow operations to lock a small part of an object. For example, an operation could lock part of a timeline to ensure there are no overlapping timespans.
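The escrow reservation described above can be sketched as follows (a toy single-process model, ignoring distribution and the primary-based variant):

```python
class Escrow:
    """Escrow rights for the invariant x >= k: x0 - k decrement rights
    are allocated up front, so no interleaving drives x below k."""

    def __init__(self, x0, k):
        self.x = x0
        self.rights = x0 - k   # decrements that can happen safely

    def decrement(self):
        if self.rights == 0:
            return False       # would risk violating x >= k
        self.rights -= 1
        self.x -= 1
        return True

    def increment(self):
        self.x += 1
        self.rights += 1       # each increment generates a new right

e = Escrow(x0=5, k=3)
assert e.decrement() and e.decrement()   # two rights available
assert not e.decrement()                 # rights exhausted: x stays at k
assert e.x == 3
```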
There are many ways to use reservations to avoid invariant violations. Indigo uses heuristics and estimated operation frequencies to try to minimize reservation acquisitions.
Implementation. Indigo can run over any key-value store that offers (i) causal consistency, (ii) snapshot transactions with CRDTs, and (iii) linearizability within a data center. Currently, it uses Swiftcloud. Its fault tolerance leverages the underlying fault tolerance of the key-value store. Each reservation is stored as an object in the key-value store, where operations are structured as transfers to avoid some concurrency oddities.