[link]
Serializability is the gold standard of consistency, but databases have always provided weaker consistency modes (e.g. Read Committed, Repeatable Read) that promise improved performance. In this paper, Bailis et al. determine which of these weaker consistency models can be implemented with high availability. First, why is high availability important? 1. Partitions. Partitions happen, and when they do, non-available systems become, well, unavailable. 2. Latency. Partitions may be transient, but latency is forever. Highly available systems can avoid latency by eschewing coordination costs. Second, are weaker consistency models consistent enough? In short, yeah probably. In a survey of databases, Bailis finds that many do not employ serializability by default and some do not even provide full serializability. Bailis also finds that four of the five transactions in the TPC-C benchmark can be implemented with highly available transactions. After defining availability, Bailis presents a taxonomy of which consistency models can be implemented as HATs and argues why some fundamentally cannot be. He also performs benchmarks on AWS to show the performance benefits of HATs. |
[link]
A parameter server is more or less a distributed key-value store optimized for training machine learning models. For example, imagine we're learning a weight vector $w = (w_1, w_2, w_3)$ using logistic regression. We can distribute $w$ across two shards of the parameter server where one shard stores $(1, w_1)$ and the other stores $(2, w_2)$ and $(3, w_3)$. Worker nodes can then read parts of the weight vector, perform some computation, and write back parts of the weight vector. This paper presents an optimized parameter server with the following features: 1. Efficient communication. 2. Flexible consistency models. 3. Elastic scalability. 4. Fault tolerance and durability. 5. Ease of use.

#### Machine Learning
Many machine learning algorithms try to find a weight vector $w \in \mathbb{R}^d$ that minimizes a regularized cost function of the following form: $$ F(w) = \Omega(w) + \sum_{i=1}^n l(x_i, y_i, w) $$ When $n$ and $d$ get really big, it becomes intractable to run these algorithms on a single machine. Instead, we have to parallelize the algorithm across multiple machines. As an example, consider how to perform distributed batch gradient descent across $m$ workers. We initialize $w$ and store it in a parameter server. Concurrently, each worker begins by reading $\frac{1}{m}$ of the training data. Then, every worker reads $w$ from the parameter server and computes a gradient with respect to its local training data (actually, it only needs to read the relevant parts of $w$). Then, it pushes its gradient to the parameter server. Once the server receives gradients from every worker, it aggregates them together and updates $w$. Finally, workers pull the updated $w$ and loop (a sketch of this loop appears at the end of this summary).

#### Architecture
A parameter server consists of a bunch of servers that store weights and a bunch of workers that perform computations with the weights (e.g. compute gradients). Servers are organized into a server group managed by a server manager. Workers are organized into multiple worker groups, and each worker group is managed by a task scheduler. The server manager manages which data is assigned to which server. The task scheduler assigns tasks to workers and monitors progress. Parameters are stored as key-value pairs. For example, a weight vector $w \in \mathbb{R}^d$ can be stored as a set of pairs $(i, w_i)$ for $1 \leq i \leq d$. To store sparse vectors more efficiently, only non-zero entries of $w$ must be explicitly stored. If a pair $(i, w_i)$ is missing, the parameter server assumes $w_i = 0$. Most machine learning algorithms do not update individual entries of a weight vector at a time. Instead, they typically update part of a matrix or part of a vector. To take advantage of this workload pattern, the parameter server allows workers to read and write ranges of values instead of single values at a time. This reduces network overhead. In addition to pushing and pulling entries of $w$, workers can also register user-defined functions to run at a server. For example, a server can compute the gradient of a regularization term. By default, the parameter server runs tasks asynchronously. That is, if a worker issues a pull or push request, it does not block. However, the parameter server also allows workers to explicitly mark dependencies between different requests which forces them to serialize. Some algorithms are robust to a bit of inconsistency. These algorithms can often run faster with weaker consistency. The parameter server provides three levels of consistency:
1. Sequential consistency, in which every request is totally serialized. 2. Eventual consistency, in which requests are run whenever they please. 3. Bounded delay, in which a request is delayed until all tasks that began τ time ago have completed. Users can also specify that a certain consistency model only apply to a certain subset of key-value pairs as specified by a filter.

#### Implementation
Data is partitioned across servers using consistent hashing, and the server manager records the assignment of key ranges to machines. When a new server joins: 1. The server manager assigns a new key range to the server. 2. The server fetches its data from other servers. 3. The server manager broadcasts the update to the other servers who relinquish data they no longer own. The parameter server uses chain replication to replicate data. Each node forms a chain with the $k$ previous nodes in the hashing ring. Workers send updates to the master, which are then chain replicated to the next $k$ servers. Logically, the parameter server tags each key-value pair with a vector clock (though honestly, I'm not exactly sure I understand why). Physically, each range of key-value pairs is associated with a vector clock. This range-based vector clock avoids storing redundant vector clock information. Messages sent from workers to servers are compressed with Snappy. Moreover, servers cache parts of messages, and workers can send a hash instead of a whole message if they think the server has it cached. |
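To make the push/pull workflow concrete, here is a minimal Python sketch of one worker's role in synchronous distributed gradient descent against a toy in-memory parameter server. The ps_pull/ps_push functions and the logistic-regression gradient are invented for illustration; they are not the paper's actual API.

```
import numpy as np

# Toy in-memory "parameter server": maps key -> weight. In the real system this
# map is sharded across server nodes and accessed with range-based push/pull.
params = {i: 0.0 for i in range(3)}

def ps_pull(keys):
    """Pull the requested entries of w from the server (hypothetical API)."""
    return np.array([params[k] for k in keys])

def ps_push(keys, grad):
    """Push a gradient; here the server applies a plain SGD update."""
    lr = 0.1
    for k, g in zip(keys, grad):
        params[k] -= lr * g

def worker_step(X, y, keys):
    """One worker iteration: pull w, compute a logistic-regression gradient on
    the local shard of training data, and push the gradient back."""
    w = ps_pull(keys)
    preds = 1.0 / (1.0 + np.exp(-X @ w))
    grad = X.T @ (preds - y) / len(y)
    ps_push(keys, grad)

# Each worker would call worker_step on its 1/m share of the data every round.
X_local = np.array([[1.0, 0.0, 1.0], [0.0, 1.0, 1.0]])
y_local = np.array([1.0, 0.0])
for _ in range(10):
    worker_step(X_local, y_local, keys=[0, 1, 2])
print(params)
```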
[link]
Modelling a distributed system as a replicated state machine provides the illusion that the distributed system is really just a single machine. At the core of the replicated state machine approach is a replicated log that is kept consistent by a consensus algorithm. Traditionally, consensus has been synonymous with Paxos. Paxos is taught in schools, and most consensus algorithm implementations are based on Paxos. However, Paxos has two main disadvantages: 1. It is hard to understand. Single-decree Paxos is nuanced, and composing single-decree Paxos into multi-Paxos is confusing. 2. It is hard to implement efficiently. Multi-Paxos is not very well described in the literature, and the algorithm is difficult to implement efficiently without modification. This paper presents the Raft consensus algorithm. Raft provides the same performance and safety as multi-Paxos but it is designed to be much easier to understand. Basics. Every node in a Raft cluster is in one of three states: leader, follower, or candidate. The leader receives requests from users and forwards them to followers. Followers are completely passive and receive messages from leaders. Candidates perform leader elections in an attempt to become a leader. In normal operation, there is a single leader, and every other node is a follower. Raft proceeds in a series of increasingly numbered terms. Each term consists of a leader election followed by (potentially) normal operation. There is exactly one leader elected per term. Moreover, each node participates in monotonically increasing terms. When a node sends a message in Raft, it annotates it with its term. If a leader receives a message from a later term, it immediately becomes a follower. Nodes ignore messages annotated with older terms. Raft uses two RPCs: RequestVote (for leader election) and AppendEntries (for replication and heartbeats). Leader Election. Leaders periodically send heartbeats (AppendEntries RPCs without any entries) to followers. As long as a follower continues to receive heartbeats, it continues to be a follower. If a follower does not receive a heartbeat after a certain amount of time, it begins leader election: it increments its term, enters the candidate state, votes for itself, and sends RequestVote RPCs in parallel to all other nodes. One of three things then happens: 1. It wins. Nodes issue a single vote per term on a first-come-first-served basis. If a candidate receives a vote from a majority of the nodes, then it becomes leader. 2. It hears from another leader. If a candidate receives a message from another leader in a term at least as large as its own, it becomes a follower. 3. It times out. It's possible that a split vote occurs and nobody becomes leader in a particular term. If this happens, the candidate times out after a certain amount of time and begins another election in the next term. Log Replication. During normal operation, a leader receives a request from a client, appends it to its log annotated with the current term, and issues AppendEntries to all nodes in parallel. An entry is considered committed after it is replicated to a majority of the nodes. Once a log entry is committed, all previous log entries are also committed. Once a log entry is committed, the leader can apply it and respond to the user. Moreover, once an entry is committed, it is guaranteed to eventually execute at all available nodes. The leader keeps track of the index of the largest committed entry and sends it to all other nodes so that they can also apply log entries.
Raft satisfies a powerful log matching invariant: 1. "If two entries in different logs have the same index and term, then they store the same command." 2. "If two entries in different logs have the same index and term, then the logs are identical in all preceding entries." 1 is ensured by the fact that a single leader is elected for any given term, the fact that a leader only creates a single log entry per index, and the fact that once a log entry is created, it never changes index. 2 is ensured by a runtime check. When a leader sends an AppendEntries RPC for a particular index, it also sends its log entry for the previous index. The follower only applies the AppendEntries RPC if it agrees on the previous index. Inductively, this guarantees 2. Followers may have missing or extraneous log entries. When this happens, the leader identifies the longest prefix on which the two agree. It then sends the rest of its log. The follower overwrites its log to match the leader. Safety. The protocol described so far is unsafe. If a new leader is elected, it can accidentally force followers to overwrite committed values with uncommitted values. Thus, we must ensure that leaders contain all committed entries. Other consensus algorithms ensure this by shipping committed values to newly elected leaders. Raft takes an alternative approach and guarantees that if a leader is elected, it has every committed entry. To ensure this, Raft must restrict which nodes can be elected. A follower rejects a RequestVote RPC if the requesting candidate's log is not as up-to-date as its log. One log is as up-to-date as another if its last entry has a higher term or has the same term but is longer. Since a candidate must receive a majority of votes and committed values have been replicated to a majority of nodes, a candidate must contact a node with all committed values during its election, which will prevent it from being elected if it doesn't have all the committed log entries. To prevent another subtle bug, leaders also do not directly commit values from previous terms. They only commit values from their own term which indirectly commits previous log entries from previous terms. Cluster Membership Changes. A Raft cluster cannot be instantaneously switched from one configuration to another. For example, consider a cluster moving from 3 to 5 nodes. It's possible that two different nodes are elected leader for the same term which can lead to a safety violation. Instead, the cluster transitions to a joint consensus phase where decisions require a majority from both the old and new configuration. Once a majority of nodes accept the new configuration, the cluster can transition to it. |
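The voting restriction is compact enough to sketch. Below is a minimal Python sketch (field names are assumed, and this is not a full Raft implementation) of the "at least as up-to-date" check a follower performs before granting its vote.

```
def candidate_log_ok(candidate_last_term, candidate_last_index,
                     my_last_term, my_last_index):
    """True if the candidate's log is at least as up-to-date as ours: its last
    entry has a higher term, or the same term and a log at least as long."""
    if candidate_last_term != my_last_term:
        return candidate_last_term > my_last_term
    return candidate_last_index >= my_last_index

def handle_request_vote(state, term, candidate_id, last_log_term, last_log_index):
    """Grant at most one vote per term, and only to sufficiently up-to-date candidates."""
    if term < state["current_term"]:
        return False
    if term > state["current_term"]:
        state["current_term"], state["voted_for"] = term, None
    up_to_date = candidate_log_ok(last_log_term, last_log_index,
                                  state["log"][-1]["term"] if state["log"] else 0,
                                  len(state["log"]))
    if state["voted_for"] in (None, candidate_id) and up_to_date:
        state["voted_for"] = candidate_id
        return True
    return False

state = {"current_term": 2, "voted_for": None, "log": [{"term": 1}, {"term": 2}]}
print(handle_request_vote(state, term=3, candidate_id="n2",
                          last_log_term=2, last_log_index=2))  # True
```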
[link]
When running an application in the cloud, users have to trust (i) the cloud provider's software, (ii) the cloud provider's staff, and (iii) law enforcement with the ability to access user data. Intel SGX partially solves this problem by allowing users to run small portions of a program on remote servers with guarantees of confidentiality and integrity. Haven leverages SGX and Drawbridge to run entire legacy programs with shielded execution. Haven assumes a very strong adversary which has access to all the system's software and most of the system's hardware. Only the processor and SGX hardware are trusted. Haven provides confidentiality and integrity, but not availability. It also does not prevent side-channel attacks. There are two main challenges that Haven's design addresses. First, most programs are written assuming a benevolent host. This leads to Iago attacks in which the OS subverts the application by exploiting its assumptions about the OS. Haven must operate correctly despite a malicious host. To do so, Haven uses a library operating system, LibOS, that is part of a Windows sandboxing framework called Drawbridge. LibOS implements a full OS API using only a few core host OS primitives. These core host OS primitives are used in a defensive way. A shield module sits below LibOS and takes great care to ensure that LibOS is not susceptible to Iago attacks. The user's application, LibOS, and the shield module are all run in an SGX enclave. Second, Haven aims to run unmodified binaries which were not written with knowledge of SGX. Real world applications allocate memory, load and run code dynamically, etc. Many of these things are not supported by SGX, so Haven (a) emulated them and (b) got the SGX specification revised to address them. Haven also implements an in-enclave encrypted file system in which only the root and leaf pages need to be written to stable storage. As of publication, however, Haven did not fully implement this feature. Haven is susceptible to replay attacks. Haven was evaluated by running Microsoft SQL Server and Apache HTTP Server. |
[link]
Storm is Twitter's stream processing system designed to be scalable, resilient, extensible, efficient, and easy to administer. In Storm, streams of tuples flow through (potentially cyclic) directed graphs, called topologies, of processing elements. Each processing element is either a spout (a source of tuples) or a bolt (a tuple processor). Storm Overview. Storm runs on a cluster, typically over something like Mesos. Each Storm cluster is managed by a single master node known as a Nimbus. The Nimbus oversees a cluster of workers. Each worker runs multiple worker processes; each worker process runs a JVM, which runs multiple executors; and each executor runs multiple tasks:

```
worker
+--------------------------------------------------------------+
| worker process (JVM)          worker process (JVM)            |
| +--------------------------+  +--------------------------+   |
| | executor      executor   |  | executor      executor   |   |
| | +--------+    +--------+ |  | +--------+    +--------+ |   |
| | | task   |    | task   | |  | | task   |    | task   | |   |
| | | task   |    | task   | |  | | task   |    | task   | |   |
| | +--------+    +--------+ |  | +--------+    +--------+ |   |
| +--------------------------+  +--------------------------+   |
|                                                               |
| supervisor                                                    |
| +-----------------------------------------------------------+|
| |                                                           ||
| +-----------------------------------------------------------+|
+--------------------------------------------------------------+
```

Users specify a topology which acts as a logical topology. Storm exploits data parallelism by expanding the logical topology into a physical topology in which each logical bolt is converted into a replicated set of physical bolts. Data is partitioned between producer and consumer bolts using one of the following partitioning schemes:

- shuffle: Data is randomly shuffled.
- fields: Data is hash partitioned on a subset of fields.
- all: All data is sent to all downstream bolts.
- global: All data is sent to a single bolt.
- local: Data is sent to a task running on the same executor.

Each worker runs a supervisor which communicates with the Nimbus. The Nimbus stores its state in Zookeeper. Storm Internals. Nimbus and Zookeeper. In Storm, topologies are represented as Thrift objects, and the Nimbus is a Thrift server which stores topologies in Zookeeper. This allows topologies to be constructed in any programming language or framework. For example, Summingbird is a Scala library which can compile dataflows to one of many data processing systems like Storm or Hadoop. Users also send over a JAR of the code to the Nimbus which stores it locally on disk. Supervisors advertise openings which the Nimbus fills. All communication between workers and the Nimbus is done through Zookeeper to increase the resilience of the system. Supervisor. Each worker runs a supervisor process which is responsible for communicating with the Nimbus, spawning workers, monitoring workers, restarting workers, etc. The supervisor consists of three threads: (1) a main thread, (2) an event manager thread, and (3) a process event manager thread. The main thread sends heartbeats to the Nimbus every 15 seconds.
The event manager thread looks for assignment changes every 10 seconds. The process event manager thread monitors workers every 3 seconds. Workers and Executors. Each executor is a thread running in a JVM. Each worker process has a thread to receive tuples and a thread to send tuples. The receiving thread multiplexes tuples to different tasks' input queues. Each executor runs (1) a user logic thread which reads tuples from its input queue and processes them and (2) an executor send thread which puts outbound tuples in a global outbound queue. Processing Semantics. Storm provides at-most-once and at-least-once semantics. Each tuple in the system is assigned a unique 64 bit identifier. When a bolt processes a tuple, it can generate new tuples. Each of these tuples is also given a unique identifier. The lineage of each tuple is tracked in a lineage tree. When a tuple leaves the system, all bolts that contributed to it are acknowledged and can retire their buffered output. Storm implements this using a memory-efficient representation that uses bitwise XORs. Commentary. The paper doesn't mention stateful operators. |
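The XOR trick mentioned above fits in a few lines. Here is a hedged Python sketch (an illustration of the idea, not Storm's actual acker code): the acker keeps one 64-bit accumulator per root tuple, XORs in every tuple id when it is emitted and again when it is acked, and the accumulator returns to zero exactly when every emitted tuple has been acked.

```
import os

class Acker:
    """Tracks a tuple tree with a single 64-bit XOR accumulator per root tuple."""
    def __init__(self):
        self.pending = {}  # root tuple id -> running XOR of outstanding tuple ids

    def emit(self, root, tuple_id):
        self.pending[root] = self.pending.get(root, 0) ^ tuple_id

    def ack(self, root, tuple_id):
        self.pending[root] ^= tuple_id
        if self.pending[root] == 0:        # every emitted id was acked exactly once
            del self.pending[root]
            return True                    # the whole tuple tree is fully processed
        return False

acker = Acker()
root = int.from_bytes(os.urandom(8), "big")
child = int.from_bytes(os.urandom(8), "big")
acker.emit(root, root)         # spout emits the root tuple
acker.emit(root, child)        # a bolt emits a child tuple anchored to the root
acker.ack(root, root)
print(acker.ack(root, child))  # True: the tree is complete
```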
[link]
Strong consistency is easy to reason about, but typically requires coordination which increases latency. Weaker consistency can improve performance but is difficult to reason about. This paper presents a program analysis and distributed protocol to run transactions with coordination-free strong consistency. Analysing Transactions. We model a database as a finite map from objects to integers. Transactions are ordered sequences of reads, writes, computations, and prints; this is formalized below. A transaction T executes on a database D to produce a new database state D' and a log G' of printed values. Formally, eval(D, T) = <D', G'>. Symbolic tables categorize the behavior of a transaction based on the initial database state. Formally, a symbolic table for transaction T is a binary relation Q of pairs <P, T'> where P is a formula in first order logic describing the contents of a database, and T' is a transaction such that T and T' are observationally equivalent when run on databases satisfying P. A symbolic table can also be built for a set of transactions. Formally, transactions are expressed in a language L which is essentially IMP with database reads, database writes, and prints. A somewhat simple recursive algorithm walks backwards through the program computing symbolic tables. Essentially, the algorithm traces all paths through the program's control flow. There is also a higher-level language L++ which can be compiled to L. Homeostasis Protocol. Assume data is partitioned (not replicated) across a cluster of K nodes. We model a distributed database as a pair <D, Loc> where D is a database and Loc is a function from objects to an index between 1 and K. Each transaction T runs on a site i; formally, l(T) = i. For simplicity, we assume that transactions only write to objects local to the site it is running on. Each transaction runs on some site. It reads fresh versions of values on the site and stale versions of values on other sites. Nodes establish treaties with one another such that operating with stale data does not affect the correctness of the transaction. This is best explained by way of example. Imagine the following transaction is running on a site where x is remote.

```
x' = r(x)
if x' > 0:
  write(y = 1)
else:
  write(y = 2)
```

If we establish the treaty x > 0, then it doesn't matter what the actual value of x is. We now formalize this notion. Given a database D, a local-remote partition is a function p from objects to booleans. We can represent a database D with respect to a local-remote partition p as a pair (l, r) where l is a vector of values x such that p(x), and r is a vector of values x such that not p(x). In words, we can model a database as disjoint sets of local and remote values. We say <(l, r), G> = <(l', r'), G'> if l = l' and r = r'. Given a database D, local-remote partition p, transaction T, and sets of vectors L and R, we say (L, R) is a local-remote slice (LR-slice) for T if Eval((l, r), T) = Eval((l, r'), T) for all l in L and r, r' in R. In words, (L, R) is a local-remote slice for T if T's output depends only on the local values. A global treaty Gamma is a subset of possible database states. A global treaty is valid for a set of transactions {T1, ..., Tn} if ({l | (l, r) in Gamma}, {r | (l, r) in Gamma}) is an LR-slice for all T. The homeostasis protocol proceeds in rounds where each round has three phases: 1. Treaty generation. The system generates a treaty for the current database state. 2. Normal execution.
Transactions can execute without coordination, reading a snapshot of remote values. After each site executes a transaction, it checks that it does not bring the database to a state outside the treaty. If it doesn't, the transaction is committed. If it does, we enter the next phase. 3. Cleanup. All sites synchronize and communicate all values that have changed since the last round. All sites then run the transaction that caused the violation. Finally, we enter the next round. Generating Treaties. Two big questions remain: how do we generate treaties, and how do we enforce treaties? Given an initial database state D, we could always pick Gamma = {D}. This requires that we synchronize after every single database modification. We want to pick the treaties that let us run as long as possible before synchronizing. We can pick the predicate P in the symbolic table that D satisfies, but this isn't guaranteed to be a valid treaty. Instead we take the predicate P and divide it into a set of local treaties P1, ..., PK where the conjunction of all local treaties implies the global treaty. Moreover, each local treaty must be satisfied by the database. The conjunction of the local treaties is our global treaty and is guaranteed to be valid. Finding good local treaties is not easy. In fact, the general problem quickly becomes undecidable. We limit ourselves to linear arithmetic and leverage SMT solvers to do the heavy lifting for us. First, we decompose the global treaty into a conjunction of linear constraints. We then generate templates from the constraints and instantiate them using Z3. Homeostasis in Practice. Roy et al. present a homeostasis prototype. An offline preprocessing component takes in L++ transactions and computes joint symbolic tables, using tricks to keep the tables small. It then initializes global and local treaties. The online execution component executes the homeostasis protocol described above. It is implemented in Java over MySQL. The analysis uses ANTLR-4 and Z3. |
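To make the round structure concrete, here is a small hypothetical Python sketch of the normal-execution phase: a site runs a transaction locally and checks its local treaty before committing, falling back to synchronization if the treaty would be violated. The treaty and transaction are invented examples in the spirit of the paper, not its protocol code.

```
# Invented example: this site owns x, and the global treaty includes the local
# treaty "x > 0" that remote sites rely on. Every local transaction is checked
# against the treaty before it commits.
local_treaty = lambda db: db["x"] > 0

def decrement_x(db, amount):
    """An example local transaction that writes a value this site owns."""
    new_db = dict(db)
    new_db["x"] -= amount
    return new_db

def execute(db, txn):
    """Normal-execution phase: run locally, commit only if the treaty still holds."""
    proposed = txn(db)
    if local_treaty(proposed):
        return proposed, "committed"   # no coordination with other sites needed
    return db, "synchronize"           # treaty would be violated: enter the cleanup phase

db = {"x": 5}
db, status = execute(db, lambda d: decrement_x(d, 3))
print(db, status)                      # {'x': 2} committed
db, status = execute(db, lambda d: decrement_x(d, 3))
print(db, status)                      # {'x': 2} synchronize
```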
[link]
Impala is a distributed query engine built on top of Hadoop. That is, it builds off of existing Hadoop tools and frameworks and reads data stored in Hadoop file formats from HDFS. Impala's CREATE TABLE commands specify the location and file format of data stored in Hadoop. This data can also be partitioned into different HDFS directories based on certain column values. Users can then issue typical SQL queries against the data. Impala supports batch INSERTs but doesn't support UPDATE or DELETE. Data can also be manipulated directly by going through HDFS. Impala is divided into three components. 1. An Impala daemon (impalad) runs on each machine and is responsible for receiving queries from users and for orchestrating the execution of queries. 2. A single Statestore daemon (statestored) is a pub/sub system used to disseminate system metadata asynchronously to clients. The statestore has weak semantics and doesn't persist anything to disk. 3. A single Catalog daemon (catalogd) publishes catalog information through the statestored. The catalogd pulls in metadata from external systems, puts it in Impala form, and pushes it through the statestored. Impala has a Java frontend that performs the typical database frontend operations (e.g. parsing, semantic analysis, and query optimization). It uses a two-phase query planner. 1. Single node planning. First, a single-node non-executable query plan tree is formed. Typical optimizations like join reordering are performed. 2. Plan parallelization. After a single node plan is formed, it is fragmented and divided between multiple nodes with the goal of minimizing data movement and maximizing scan locality. Impala has a C++ backend that uses Volcano-style iterators with exchange operators and runtime code generation using LLVM. To efficiently read data from disk, Impala bypasses the traditional HDFS protocols. The backend supports a lot of different file formats including Avro, RC, sequence, plain text, and Parquet. For cluster and resource management, Impala uses a home-grown system called Llama that sits over YARN. |
[link]
Borg is Google's cluster manager. Users submit jobs (collections of tasks) to Borg, which are then run in a single cell, many of which live inside a single cluster. Borg jobs are either high priority latency-sensitive production jobs (e.g. user facing products and core infrastructure) or low priority non-production batch jobs. Jobs have typical properties like name and owner and can also express constraints (e.g. only run on certain architectures). Tasks also have properties and state their resource demands. Borg jobs are specified in BCL and are bundled as statically linked executables. Jobs are labeled with a priority and must operate within quota limits. Resources are bundled into allocs in which multiple tasks can run. Borg also manages a naming service, and exports a UI called Sigma to developers. Cells are managed by five-way replicated Borgmasters. A Borgmaster communicates with Borglets running on each machine via RPC, manages the Paxos-replicated state of the system, and exports information to Sigma. There is also a high fidelity Borgmaster simulator known as the Fauxmaster which can be used for debugging. One subcomponent of the Borgmaster handles scheduling. Submitted jobs are placed in a queue and scheduled by priority and round-robin within a priority. Each job undergoes feasibility checking, where Borg checks that there are enough resources to run the job, and then scoring, where Borg determines the best place to run the job. Worst fit scheduling spreads jobs across many machines allowing for spikes in resource usage. Best fit crams jobs as closely as possible which is bad for bursty loads. Borg uses a scheduler which attempts to limit "stranded resources": resources on a machine which cannot be used because other resources on the same machine are depleted. Tasks that are preempted are placed back on the queue. Borg also tries to place jobs where their packages are already loaded, but offers no other form of locality. Borglets run on each machine and are responsible for starting and stopping tasks, managing logs, and reporting to the Borgmaster. The Borgmaster periodically polls the Borglets (as opposed to Borglets pushing to the Borgmaster) to avoid any need for flow control or recovery storms. The Borgmaster performs a couple of tricks to achieve high scalability. - The scheduler operates on slightly stale state, a form of "optimistic scheduling". - The Borgmaster caches job scores. - The Borgmaster performs feasibility checking and scoring for all equivalent jobs at once. - Complete scoring is hard, so the Borgmaster uses randomization. The Borgmaster puts the onus of fault tolerance on applications, expecting them to handle occasional failures. Still, the Borgmaster also performs a set of nice tricks for availability. - It reschedules evicted tasks. - It spreads tasks across failure domains. - It limits the number of tasks in a job that can be taken down due to maintenance. - It avoids past machine/task pairings that led to failures. To measure cluster utilization, Google uses a cell compaction metric: the smallest a cell can be to run a given workload. Better utilization leads directly to savings in money, so Borg is very focused on improving utilization. For example, it allows non-production jobs to reclaim unused resources from production jobs. Borg uses containers for isolation. It also makes sure to throttle or kill jobs appropriately to ensure performance isolation. |
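As a toy illustration of the scoring trade-off described above (this is not Borg's actual scorer, and it uses a single invented resource dimension), worst fit and best fit differ only in whether they prefer the emptiest or the fullest feasible machine:

```
def worst_fit(machines, task_cpu):
    """Spread load: pick the feasible machine with the most free CPU."""
    feasible = [m for m in machines if m["free_cpu"] >= task_cpu]   # feasibility check
    return max(feasible, key=lambda m: m["free_cpu"], default=None) # scoring

def best_fit(machines, task_cpu):
    """Pack tightly: pick the feasible machine with the least free CPU."""
    feasible = [m for m in machines if m["free_cpu"] >= task_cpu]
    return min(feasible, key=lambda m: m["free_cpu"], default=None)

machines = [{"name": "m1", "free_cpu": 8.0}, {"name": "m2", "free_cpu": 2.0}]
print(worst_fit(machines, 1.5)["name"])  # m1: leaves headroom for load spikes
print(best_fit(machines, 1.5)["name"])   # m2: packs tightly, risky under bursts
```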
[link]
Building fault tolerant systems is hard, like really hard. There are a couple of approaches to building fault-tolerant systems, but none are perfect. - In a bottom-up approach, we verify individual components of a system are fault-tolerant and then glue the components together. Unfortunately, fault-tolerance is not closed under composition: the combination of two fault-tolerant systems may not be fault tolerant. - In a top-down approach, we can inject faults into an existing system and see whether or not it fails. Fault-injection can discover shallow bugs, but has trouble surfacing bugs that require more complex failure scenarios (e.g. a network partition followed by a particular sequence of machine failures). Lineage-driven Fault Injection (LDFI) is a top-down approach which uses lineage and fault injection to carefully discover fault-tolerance bugs in distributed systems. If a bug is found, the lineage of the bug is given to the user to help discover the root cause of the bug. If no bugs are found, LDFI provides some guarantees that there are no possible bugs for that particular configuration. In a nutshell, LDFI takes a program written in something like Bloom, inputs to the program, and some parameters (e.g. to bound the length of the execution, to bound the number of faults, etc.). It runs the given program on the given input and computes a provenance graph of the output. It then carefully selects a small number of faults that invalidate every derivation. It then injects these faults into the system to see if it surfaces a bug. This is repeated until a bug is found or no such bugs exist. In this paper, Alvaro presents LDFI and an LDFI implementation named Molly.

#### System Model
LDFI injects faults, but not every kind of fault. We'll explore dropped messages, failed nodes, and network partitions. We won't explore message reordering or crash recovery. While we sacrifice generality, we gain tractability. LDFI is governed by three parameters: 1. LDFI does not operate over arbitrarily long executions. A parameter EOT (end of time) prescribes the maximum logical time of any execution examined by LDFI. 2. Distributed systems typically tolerate some number of faults, but cannot possibly tolerate complete message loss, for example. A parameter EFF (end of finite failures) < EOT sets a logical time after which LDFI will not introduce faults. The time between EFF and EOT allows a distributed system to recover from message losses. 3. A parameter Crashes sets an upper limit on the number of node crashes LDFI will consider. A failure specification is a three-tuple (EOT, EFF, Crashes). Molly will automatically find a good failure specification by repeatedly increasing EOT until programs can create meaningful results and increasing EFF until faults occur. We assume programs are written in Dedalus and that pre- and postconditions are expressed as special relations pre and post in the program.

#### LDFI
Consider an interaction between Molly and a user trying to implement a fault-tolerant broadcast protocol between three nodes A, B, and C where A begins with a message to broadcast. Our correctness condition asserts that if a message is delivered to any non-failed node, it is delivered to all of them. - Implementation 1. Assume a user writes an incredibly naive broadcast protocol in which A sends a copy of the message to B and C once. Molly drops the message from A to B inducing a bug. - Implementation 2. The user then amends the program so that A continually sends the message to B and C.
Molly drops the message from A to B and then crashes A. C has the message but B doesn't: a bug. - Implementation 3. The user then amends the program so that all three nodes continuously broadcast the message. Now, Molly cannot find a set of dropped messages or node crashes to prevent all three nodes from obtaining the message. - Implementation 4. While implementation 3 is fault-tolerant, it is also inefficient. The user amends the protocol so that nodes broadcast messages until they receive an ack from the other nodes. Molly can devise a sequence of message drops and node crashes to prevent the message from being delivered to all three nodes, but when it runs the system again with the same faults, the messages still appear. - Implementation 5. A "classic" implementation of broadcast in which a node broadcasts any message it receives once is found to be buggy by Molly.

#### Molly
Molly begins by rewriting a Dedalus program into a Datalog program. Each relation is augmented with a time column.

```
foo(A,B) ==> foo(A,B,T)
bar(B,C) ==> bar(B,C,T)
baz(A,C) ==> baz(A,C,T)
```

The time column of every predicate in the body of a rule is bound to the same variable T.

```
_ :- foo(A,B), bar(B,C) ==> _ :- foo(A,B,T), bar(B,C,T)
```

The head of every deductive rule is bound to T.

```
baz(A,C) :- foo(A,B), bar(B,C) ==> baz(A,C,T) :- foo(A,B,T), bar(B,C,T)
```

The head of every inductive rule is bound to T + 1.

```
baz(A,C) :- foo(A,B), bar(B,C) ==> baz(A,C,T+1) :- foo(A,B,T), bar(B,C,T)
```

For asynchronous rules, we introduce a Clock(From, To, T) relation which contains an entry (n, m, T) if node n sent a message to m at time T. Then, the body of an asynchronous rule at node n whose head is destined for node m is augmented with a Clock(n, m, T) predicate, while the head is bound to T + 1. Molly can add and remove entries from Clock to simulate faults.

```
foo(A,B) :- foo(B,A) ==> foo(A,B,T+1) :- foo(B,A,T), Clock(B,A,T)
```

It then rewrites the Datalog program to maintain its own provenance graph, and extracts lineage from the graph via recursive queries that walk the graph. Given an execution of the Datalog program, Molly generates a CNF formula where the disjuncts inside each conjunct x1 or ... or xn represent the message drops or node failures that would invalidate a particular derivation. If the formula is unsatisfiable, then no combination of faults within the failure specification invalidates every derivation, and the program is fault-tolerant for this configuration. If the formula is satisfiable, then each satisfying assignment represents a set of faults to inject as a potential counterexample. Molly uses an SMT solver to solve these formulas. |
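As a flavor of the solving step (a brute-force stand-in for the CNF encoding, with an invented toy provenance graph, not Molly's actual code), the search below looks for a small set of message drops that invalidates every derivation of the correctness goal:

```
from itertools import combinations

# Hypothetical provenance: each derivation of the correctness goal is supported
# by a set of messages (sender, receiver, time). Dropping at least one support
# message from every derivation invalidates all of them.
derivations = [
    {("A", "B", 1)},                   # B learned the message directly from A
    {("A", "C", 1), ("C", "B", 2)},    # B learned the message via C's relay
]

def candidate_faults(derivations, max_drops):
    """Brute-force search for a small set of drops that hits every derivation.
    A real implementation encodes this as CNF and hands it to a solver."""
    messages = set().union(*derivations)
    for k in range(1, max_drops + 1):
        for drops in combinations(messages, k):
            if all(d & set(drops) for d in derivations):
                yield set(drops)

for faults in candidate_faults(derivations, max_drops=2):
    print("try dropping:", faults)  # e.g. {(A,B,1), (A,C,1)} or {(A,B,1), (C,B,2)}
```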
[link]
Many distributed databases geo-replicate data for (i) lower read latency and (ii) higher fault tolerance in the face of an extreme failure (e.g. lightning striking a data center). Implementing strong consistency over a geo-replicated system can incur tremendous write latency, as updates have to coordinate between geographically distant data centers. On the other hand, weak consistency is a real brain buster. This paper introduces a new consistency model between weak and strong consistency, explicit consistency, which takes into account user specified invariants. It also presents an explicitly consistent system which 1. performs static analysis to determine which operations can be executed without coordination, 2. uses invariant-repair or violation-avoidance to resolve or avoid conflicts, and 3. instruments user code with calls to middleware. The system is called Indigo and is built on an existing causally consistent key-value store with various properties. Explicit Consistency. A database is a collection of objects replicated across data centers. Users issue reads and writes as part of transactions, and these transactions are asynchronously replicated between data centers. We denote by t(S) the database state achieved by applying transaction t to database state S. S_n is the database state achieved after the nth transaction. That is, $S_n = t_n(...(t_1(t_init))...)$. $T(S_n) = \{t_1, ..., t_n\}$ is the set of transactions used to create S_n. We say a transaction t_a happens before a transaction $t_b$, denoted $t_a \rightarrow t_b$, if t_a is in $T(S_b)$. $O = (T,\rightarrow)$ is a partial order. $O' = (T, <)$ is a serialization of $O$ if $<$ is total and respects $\rightarrow$. Given an invariant $I$, we say $S$ is $I$-valid if $I(S) = true$. $(T, <)$ is an I-valid serialization if I holds on all prefixes of the serialization. If a system ensures that all serializations are I-valid, it provides explicit consistency. In other words, an explicitly consistent database ensures invariants always hold. This builds off of Bailis et al.'s notion of invariant-confluence. Determining $I$-offender Sets. An invariant is a universally quantified first order logic formula in prenex normal form. The invariant can include uninterpreted functions like Player($P$) and enrolled($P, T$). A postcondition states how operations affect the truth values of the uninterpreted functions in invariants. Every operation is annotated with postconditions. A predicate clause directly alters the truth assignments of a predicate (e.g. not Player(P)). A function clause relates old and new database states (e.g. nrPlayers(T) = nrPlayers(T) + 1). This language is rather expressive, as evidenced by multiple examples in the paper. A set of transactions is an I-offender if it is not invariant-confluent. First, pairs of operations are checked to see if a contradictory truth assignment is formed (e.g. Player(P) and not Player(P)). Then, every pair of transactions is considered. Given the weakest liberal precondition of the transactions, we substitute the effects of the transactions into the invariant to get a formula. We then check for the validity of the formula using Z3. If the formula is valid, the transactions are invariant-confluent. Handling I-offender Sets. There are two ways to handle I-offenders: invariant-repair and violation-avoidance. Invariant-repair involves CRDTs; the bulk of this paper focuses on violation-avoidance which leverages existing reservation and escrow transaction techniques. - UID generation. 
Unique identifiers can easily be generated without coordination. For example, a node can concatenate an incrementing counter with its MAC address. - Multi-level lock reservation. Locking is the most general form of reservation. Locks come in three flavors: (i) shared forbid, (ii) shared allow, and (iii) exclusive allow. Transactions acquire locks to avoid invariant violation. For example, an enrollTournament could acquire a sharedForbid lock on removing players, and removePlayer could acquire a sharedAllow lock. Exclusive allow locks are used for self-conflicting operations. - Multi-level mask reservation. If our invariant is a disjunction P1 or ... or Pn, then to preserve the invariant, we only need to guarantee that at least one of the disjuncts remains true. A mask reservation is a vector of locks where an operation can falsify one of the disjuncts only after acquiring a lock on another true disjunct preventing it from being falsified. - Escrow reservation. Imagine our invariant is x >= k and x has initial value x0. Escrow transactions allocate x0 - k rights. A transaction can decrement x only after acquiring and spending a right. When x is incremented, a right is generated. This gets trickier for invariants like |A| >= k where concurrent additions could generate too many rights leading to an invariant violation. Here, we use escrow transactions for conditions, where a primary is allocated for each reservation. Rights are not immediately generated; instead, the primary is responsible for generating rights. - Partition lock reservation. Partition locks allow operations to lock a small part of an object. For example, an operation could lock part of a timeline to ensure there are no overlapping timespans. There are many ways to use reservations to avoid invariant violations. Indigo uses heuristics and estimated operation frequencies to try and minimize reservation acquisitions. Implementation. Indigo can run over any key-value store that offers (i) causal consistency, (ii) snapshot transactions with CRDTs, and (iii) linearizability within a data center. Currently, it uses Swiftcloud. Its fault tolerance leverages the underlying fault tolerance of the key-value store. Each reservation is stored as an object in the key-value store, where operations are structured as transfers to avoid some concurrency oddities. |
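Here is a minimal Python sketch of the escrow idea for an invariant of the form x >= k (illustrative only; Indigo's actual reservations are objects stored in the underlying key-value store):

```
class Escrow:
    """Escrow reservation for the invariant x >= k: x0 - k decrement rights are
    split among replicas, so decrements never coordinate while rights remain."""
    def __init__(self, x0, k, replicas):
        self.k = k
        per_replica = (x0 - k) // len(replicas)
        self.rights = {r: per_replica for r in replicas}

    def decrement(self, replica, amount=1):
        """Spend rights to decrement; fail locally if not enough rights remain."""
        if self.rights[replica] >= amount:
            self.rights[replica] -= amount
            return True
        return False   # would risk violating x >= k: must borrow rights or abort

    def increment(self, replica, amount=1):
        """Incrementing x generates fresh rights at the local replica."""
        self.rights[replica] += amount

escrow = Escrow(x0=100, k=0, replicas=["dc1", "dc2"])
print(escrow.decrement("dc1", 30))   # True: dc1 still holds enough rights
print(escrow.decrement("dc1", 30))   # False: dc1 has run out of rights
```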
[link]
Data processing frameworks like MapReduce and Spark can do things that relational databases can't do very easily. For example, they can operate over semi-structured or unstructured data, and they can perform advanced analytics. On the other hand, Spark's API allows users to run arbitrary code (e.g. rdd.map(some_arbitrary_function)) which prevents Spark from performing certain optimizations. Spark SQL marries imperative Spark-like data processing with declarative SQL-like data processing into a single unified interface. Spark's main abstraction was an RDD. Spark SQL's main abstraction is a DataFrame: the Spark analog of a table which supports a nested data model of standard SQL types as well as structs, arrays, maps, unions, and user defined types. DataFrames can be manipulated as if they were RDDs of row objects (e.g. dataframe.map(row_func)), but they also support a set of standard relational operators which take ASTs, built using a DSL, as arguments. For example, the code users.where(users("age") < 40) constructs an AST from the expression users("age") < 40 and passes it as an argument to filter the users DataFrame. By passing in ASTs as arguments rather than arbitrary user code, Spark is able to perform optimizations it previously could not do. DataFrames can also be queried using SQL. Notably, integrating queries into an existing programming language (e.g. Scala) makes writing queries much easier. Intermediate subqueries can be reused, queries can be constructed using standard control flow, etc. Moreover, Spark eagerly typechecks queries even though their execution is lazy. Furthermore, Spark SQL allows users to create DataFrames of language objects (e.g. Scala objects), and UDFs are just normal Scala functions. DataFrame queries are optimized and manipulated by a new extensible query optimizer called Catalyst. The query optimizer manipulates ASTs written in Scala using rules, which are just functions from trees to trees that typically use pattern matching. Queries are optimized in four phases: 1. Analysis. First, relations and columns are resolved, queries are typechecked, etc. 2. Logical optimization. Typical logical optimizations like constant folding, filter pushdown, boolean expression simplification, etc are performed. 3. Physical planning. Cost based optimization is performed. 4. Code generation. Scala quasiquoting is used for code generation. Catalyst also makes it easy for people to add new data sources and user defined types. Spark SQL also supports schema inference, ML integration, and query federation: useful features for big data. |
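The paper's examples are written in Scala; the same idea in PySpark (assuming a local SparkSession and an invented toy schema) looks like the following, where users.age < 40 builds a Column expression tree that Catalyst can analyze and optimize rather than an opaque user function:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

# Hypothetical "users" DataFrame; the (name, age) schema is assumed for illustration.
users = spark.createDataFrame([("ada", 36), ("bob", 52)], ["name", "age"])

# users.age < 40 constructs an expression AST, so the optimizer can, e.g.,
# push the predicate down into the scan.
young = users.where(users.age < 40)
young.show()

# The same query expressed in SQL over a registered view.
users.createOrReplaceTempView("users")
spark.sql("SELECT * FROM users WHERE age < 40").show()
```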
[link]
Storm was Twitter's first stream processing system. Unfortunately, it wasn't good enough for a number of reasons. Heron is a Storm API compliant rewrite of Storm. Motivation. In Storm, computation is expressed as a directed graph, or topology, where vertexes are computations and edges transport tuples. Storm topologies are run on a cluster of workers overseen by a central Nimbus. Each worker runs multiple worker processes; each worker process runs a JVM, which runs multiple executors; and each executor runs multiple tasks. The executors run multiple threads, and each worker also has a multi-threaded supervisor. This worker architecture was far too complex. Multiple components were making scheduling decisions (e.g. the OS schedules processes, the JVM schedules executors, and executors schedule tasks) which made it hard to predict when certain tasks would be run. Moreover, putting different types of tasks on the same executor complicated logs, exception handling, garbage collection, etc. The Storm scheduler was also not good at scheduling tasks with different resource requirements. The fact that workers were very multi-threaded meant that messages were traversing a lot of thread boundaries. The Nimbus was a complicated piece of code that did too much stuff. It also often became a bottleneck and was a single point of failure. Its scheduling was so poor that Twitter used to reserve nodes to exclusively run a single topology. The Nimbus also communicated with workers through ZooKeeper which became a bottleneck. Storm also did not implement backpressure: when bolts became overloaded, tuples were just dropped. Design Alternatives. Twitter considered extending and modifying Storm to fix its problems, but its flaws were deeply entrenched in its design, so a rewrite would be difficult. They considered using other existing stream processing systems, but didn't want to break the Storm API and have to rewrite a bunch of applications. In the end, they felt like a rewrite was the best bet. Data Model and API. Heron follows the exact same API as Storm. Computation is expressed as a directed graph where vertexes are spouts (sources of tuples) or bolts (tuple processors) and edges transfer tuples between vertexes. Users provide logical plans which are expanded to physical plans in order to exploit data parallelism. Heron provides at-least-once and at-most-once semantics. Architecture Overview. Users submit Heron topologies to Aurora, though Heron is able to run on top of Mesos, YARN, ECS, etc. Each topology is run as a set of containers. One container runs the Topology Master (TM). The rest run a Stream Manager (SM), a Metrics Manager (MM), and multiple Heron Instances (HI). Topology state is kept in ZooKeeper, and the TM can have a standby. All communication is done via protobufs. Topology Master. The TM is responsible for overseeing the execution of a topology and reporting its status. The TM holds an ephemeral node in ZooKeeper to ensure there is only ever one TM and so that other things can discover it. Stream Manager. Stream Managers are responsible for routing tuples. There are k Stream Managers that form a clique. Though O(k^2) connections is a lot, the number of Heron Instances can scale independently of k. Stream Managers communicate via TCP, short-circuiting if delivering within a container. Heron, unlike Storm, implements backpressure. Here are three kinds of backpressure implementations: - TCP Backpressure. Heron Instances communicate with Stream Managers via TCP. This provides a natural form of backpressure.
If a TCP consumer is too slow, the TCP producer will slow down. This form of backpressure is easy to implement. However, multiple logical edges are mapped over a single SM to SM TCP connection which means that sometimes nodes are inadvertently slowed down when they shouldn't be. - Spout Backpressure. When a bolt is overloaded, the SM in charge of the bolt can tell the spout that is feeding into it to slow down. This is somewhat inefficient in that slowing an intermediate node may be sufficient. - Stage-by-Stage Backpressure. Backpressure can be propagated backwards from vertex to vertex. Heron implements TCP and Spout Backpressure. Each socket is associated with a queue whose size has a high and low watermark. If the size exceeds the high watermark, backpressure is applied until it drops below the low watermark. Heron Instances. Each Heron instance runs a single JVM which runs a single task which makes debugging significantly easier. Heron instances cannot be single-threaded because slow user code could prevent things like metrics from being reported in a timely manner. So, Heron implements Heron Instances with two threads: a Gateway Thread and a Task Execution Thread. The Gateway Thread communicates with the Task Execution Thread and also communicates with the SM and MM. The Task Execution Thread runs user code and gathers metrics. The Gateway Thread communicates with the Task Execution Thread using a set of one-directional queues. The sizes of these queues can be adjusted to avoid bad GC. Metrics Manager. The metrics manager, well, manages metrics. It reports metrics to the TM and to a Twitter monitoring system. |
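A small Python sketch of the high/low watermark rule used for spout backpressure (hypothetical structure, not Heron's C++ implementation): backpressure turns on when a queue crosses its high watermark and stays on until the queue drains below its low watermark, which prevents rapid oscillation.

```
class BackpressureQueue:
    """Queue with watermarks: producers are told to stop above the high
    watermark and may resume only once the queue drains below the low one."""
    def __init__(self, low, high):
        self.low, self.high = low, high
        self.items = []
        self.backpressure = False

    def put(self, item):
        self.items.append(item)
        if len(self.items) >= self.high:
            self.backpressure = True          # e.g. tell spouts to stop emitting

    def get(self):
        item = self.items.pop(0)
        if self.backpressure and len(self.items) <= self.low:
            self.backpressure = False         # hysteresis: release only below the low watermark
        return item

q = BackpressureQueue(low=2, high=5)
for i in range(5):
    q.put(i)
print(q.backpressure)   # True: crossed the high watermark
for _ in range(3):
    q.get()
print(q.backpressure)   # False: drained below the low watermark
```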
[link]
Google has spent the last decade developing three container management systems. Borg is Google's main cluster management system that manages long running production services and non-production batch jobs on the same set of machines to maximize cluster utilization. Omega is a clean-slate rewrite of Borg using a more principled architecture. In Omega, all system state lives in a consistent Paxos-based storage system that is accessed by a multitude of components which act as peers. Kubernetes is the latest open source container manager that draws on lessons from both previous systems. All three systems use containers for security and performance isolation. Container technology has evolved greatly since the inception of Borg, from chroot to jails to cgroups. Of course, containers cannot prevent all forms of performance interference. Today, containers also contain program images. Containers allow the cloud to shift from a machine-oriented design to an application-oriented design, which touts a number of advantages. - The gap between where an application is developed and where it is deployed is shrunk. - Application writers don't have to worry about the details of the operating system on which the application will run. - Infrastructure operators can upgrade hardware without worrying about breaking a lot of applications. - Telemetry is tied to applications rather than machines which improves introspection and debugging. Container management systems typically also provide a host of other niceties including: - naming services, - autoscaling, - load balancing, - rollout tools, and - monitoring tools. In Borg, these features were integrated over time in ad-hoc ways. Kubernetes organizes these features under a unified and flexible API. Google's experience has led to a number of lessons about what to avoid: - Container management systems shouldn't manage ports. Kubernetes gives each job a unique IP address allowing it to use any port it wants. - Containers should have labels, not just numbers. Borg gave each task an index within its job. Kubernetes allows jobs to be labeled with key-value pairs and be grouped based on these labels. - In Borg, every task belongs to a single job. Kubernetes makes task management more flexible by allowing a task to belong to multiple groups. - Omega exposed the raw state of the system to its components. Kubernetes avoids this by arbitrating access to state through an API. Despite the decade of experience, there are still open problems yet to be solved: - Configuration. Configuration languages begin simple but slowly evolve into complicated and poorly designed Turing complete programming languages. It's ideal to have configuration files be simple data files and let real programming languages manipulate them. - Dependency management. Programs have lots of dependencies, but developers rarely state them manually. This makes automated dependency management very tough. |
[link]
Strong consistency increases latency; weak consistency is hard to reason about. Compromising between the two, a number of databases allow each operation to run with either strong or weak consistency. Ideally, users choose the minimal set of strongly consistent operations needed to enforce some application specific invariant. However, deciding which operations to run with strong consistency and which to run with weak consistency can be very challenging. This paper - introduces a formal hybrid consistency model which subsumes many existing consistency models, - introduces a modular proof rule which can determine whether a given consistency model enforces an invariant, and - implements a prototype using a standard SMT solver. Consistency Model, Informally. Consider - a set of states $s, s_{init} \in$ State, - a set of operations Op = {o, ...}, - a set of values Val containing $\bot$, and - a set of replicas $r_1, r_2,$ .... The denotation of an operation o is denoted F_o where - F_o: State -> (Val x (State -> State)), - F_o^val(s) = F_o(s)[0] (the value returned by the operation), and - F_o^eff(s) = F_o(s)[1] (the effect of the operation). For example, a banking operation may let states range over natural numbers where - F_deposit_a(s) = (\bot, fun s' -> s' + a) - F_interest(s) = (\bot, fun s' -> s' + 0.05 * s) - F_query(s) = (s, fun s' -> s') If all operations commute (i.e. forall o1, o2, s1, s2. F_o1^eff(s1) o F_o2^eff(s2) = F_o2^eff(s2) o F_o1^eff(s1)), then all replicas are guaranteed to converge. However, convergence does not guarantee all application invariants are maintained. For example, an invariant I = {s | s >= 0} could be violated by merging concurrent withdrawals. To enforce invariants, we introduce a token system which can be used to order certain operations. A token system TS = (Token, #) is a set of tokens Token and a symmetric relation # over Token. We say two sets of tokens T1 # T2 if exists t1 in T1, t2 in T2. t1 # t2. We also update our definition of operations to acquire tokens: - F_o: State -> (Val x (State -> State) x P(Token)), - F_o^val(s) = F_o(s)[0] (the value returned by the operation), - F_o^eff(s) = F_o(s)[1] (the effect of the operation), and - F_o^tok(s) = F_o(s)[2] (the tokens acquired by the operation). Our consistency model will ensure that two operations that acquire conflicting tokens are ordered. Formal Semantics. Recall that a strict partial order is a relation that is transitive and irreflexive (e.g. sets ordered by strict subset). Given a partial order R, we say (a, b) \in R or a -R-> b. Consider a countably infinite set Event of events ranged over by e, f, g. If operations are like transactions, an event is like applying a transaction at a replica. - Definition 1. Given token system TS = (Token, #), an execution is a tuple X = (E, oper, rval, tok, hb) where - E \subset Event is a finite subset of events, - oper: E -> Op designates the operation of each event, - rval: E -> Val designates the return value of each event, - tok: E -> P(Token) designates the tokens acquired by each event, and - hb \subset Event x Event is a happens-before strict partial order where forall e, f. tok(e) # tok(f) => (e-hb->f or f-hb->e). An execution formalizes operations executing at various replicas concurrently, and the happens-before relation captures how these operations are propagated between replicas. The transitivity of the happens-before relation ensures at least causal consistency.
Let - Exec(TS) be the set of all executions over token system TS, and - ctxt(e, X) = (E, X.oper|E, X.rval|E, X.tok|E, X.hb|E) be the context of e where E = X.hb^-1(e). Intuitively, ctxt(e, X) is the subexecution of X that only includes operations causally preceding e. Executions are directed graphs of operations, but without a semantics, they are rather meaningless. Here, we define a relation evald_F \subset Exec(TS) x P(State) where evald_F(Y) is the set of all final states Y can be in after all operations are propagated to all replicas. We'll see shortly that if all non-token-conflicting operations commute, then evald_F is a function. - evald_F(Y) = {} if Y.E = {}, and - evald_F(Y) = {F_e^eff(s)(s') | e \in max(Y), s \in evald_F(ctxt(e, Y)), s' \in evald_F(Y|Y.E - {e})} otherwise. Now - Definition 2. An execution X \in Exec(TS) is consistent with TS and F, denoted X |= TS, F, if forall e \in X.E. exists s \in evald_F(ctxt(e, X)). X.rval(e) = F_X.oper(e)^val(s) and X.tok(e) = F_X.oper(e)^tok(s). We let Exec(TS, F) = {X | X |= TS, F}. Consistent executions are closed under context. Furthermore, evald_F is a function when restricted to consistent executions where non-token-conflicting operations commute. We call this function eval_F. This model can capture a number of consistency models: - Causal consistency. Let Token = {}. - Sequential consistency. Let Token = {t}, t # t, and F_o^tok(s) = {t} for all o. - RedBlue Consistency. Let Token = {t}, t # t, and F_o^tok(s) = {t} for all red o and F_o^tok(s) = {} for all blue o. State Based Proof Rule. We want to construct a proof rule to establish the fact that Exec(TS, F) \subset eval_F^-1(I). That is, every execution results in a state that satisfies the invariant. Since executions are closed under context, this also means that all operations execute on a state that satisfies the invariant. Our proof rule involves a guarantee relation G(t) over states which describes all possible state changes that can occur while holding token t. Similarly, G_0 describes the state transitions that can occur without holding any tokens. Here is the proof rule. - S1: s_init \in I. - S2: G_0(I) \subset I and forall t. G(t)(I) \subset I. - S3: forall o, s, s'. - s \in I and - (s, s') \in (G_0 \cup G(F_o^tok(s)^\bot))* => - (s', F_o^eff(s)(s')) \in G_0 \cup G(F_o^tok(s)). In English, - S1: s_init satisfies the invariant. - S2: G and G_0 preserve the invariant. - S3: If we start in any state s that satisfies the invariant and can transition in any finite number of steps to any state s' without acquiring any tokens conflicting with o, then we can transition from s' to F_o^eff(s)(s') in a single step using the tokens acquired by o. Event Based Proof Rule and Soundness. Instead of looking at states, we can instead look at executions. That is, if we let invariants I \subset Exec(TS), then we want to write a proof rule to ensure Exec(TS, F) \subset I. That is, all consistent executions satisfy the invariant. Again, we use a guarantee G \subset Exec(TS) x Exec(TS). - E1: X_init \in I. - E2: G(I) \subset I. - E3: forall X, X', X''. forall e in X''.E. - X'' |= TS, F and - X' = X''|X''.E - {e} and - e \in max(X'') and - X = ctxt(e, X'') and - X \in I and - (X, X') \in G* => - (X', X'') \in G. This event-based proof rule is proven sound, and the state-based rule and its soundness are derived from it. Examples and Automation. The authors have built a banking, auction, and courseware application in this style.
They have also built a prototype to which you give TS, F, and I, and which determines whether Exec(TS, F) \subset eval_F^-1(I). Their prototype modifies the state-based proof rule, eliminating the transitive closure and introducing intermediate predicates. |
[link]
Decibel is like git for relational data. Often, teams need to simultaneously query, analyze, clean, or curate a collection of data without clobbering each other's work. Currently, the best solution available involves each team member making copies of the entire database. This is undesirable for a number of reasons: - Data is stored very redundantly, wasting a huge amount of storage. - It is difficult to share or merge changes made to local snapshots of the data. - There is no systematic way to say which version of the data was used for a given experiment. Version control solves these problems, but existing version control systems (e.g. git) have a couple of shortcomings when applied naively to large datasets: - Using distributed version control systems like git, clients clone an entire copy of a repository's contents. This is infeasible when working with large datasets. - Version control systems like git operate on arbitrary data and do not understand the structure of relational data. This makes certain operations like generic diffing a bad fit. - Systems like git do not support high-level data management or query APIs. Decibel manages datasets which are collections of relations, and each relation's schema includes an immutable primary key which is used to track the version of each row. Beginning with an initial snapshot of a dataset, users can check out, branch, and commit changes to the data set in a style very similar to git. When data is merged, non-conflicting changes to separate columns of the same row are both applied. If conflicting changes to the same column of the same row occur, one branch's changes take priority. Diffing two tables across two branches results in a table of insertions and deletions. Finally, data can be queried across versions using VQuel: a query language which is notably not SQL. This paper describes three physical realizations of Decibel's logical data model. 1. In a tuple-first representation, tables contain every version of every row, and each row is annotated with a bitmap indicating the set of versions in which the row is live. In a tuple-oriented approach, each of N tuples comes with a B-bit bitmap for B branches. In a branch-oriented approach, there are B N-bit bitmaps. 2. In a version-first representation, all the changes made to a table in a branch are stored together in the same file, and these branched files contain pointers to their ancestors forming a directed acyclic graph. 3. In a hybrid representation, data is stored similarly to the version-first approach, but each of the branched files (called segments) includes a segment index: a bitmap, like in the tuple-first representation, that tracks the liveness of each row for all descendent branches. Moreover, there is a single branch-segment bitmap which for each branch, records the set of segments with a tuple live in that branch. The tuple-first representation is good for multi-version queries, the version-first representation is good for single-version queries, and the hybrid approach is good at both. This paper also presents a versioned database benchmarking framework in the form of a set of branching strategies and characteristic queries to analyze Decibel and any future versioned databases. Commentary. A git-like versioned relational database seems like a great idea! This paper focuses on how to implement and analyze such a database efficiently; it focuses less on the higher-level semantics of data versioning. 
Notably, two unanswered questions stuck out to me that seem like interesting areas for future research.

1. The current merging strategy seems inconvenient, if not inconsistent. Imagine a table R(key, a, b) with the invariant that a + b < 10. Imagine branch b1 updates a tuple (0, 0, 0) to (0, 9, 0) and branch b2 updates it to (0, 0, 9). If these tuples are merged, the tuple becomes (0, 9, 9), violating the invariant. In systems like git, users can manually inspect and verify the correctness of the merge, but if a large dataset is being merged, this becomes infeasible. I think more research is needed to determine whether this strategy is good enough and won't cause problems in practice or, if it is insufficient, what merging strategies can be applied to large datasets. Perhaps ideas could be borrowed from the CRDT literature; if all columns were semilattices, the columns could be merged in a sane way.
2. How useful is diffing when dealing with large datasets? When working with git, even a modestly sized diff can become confusing. Would a non-trivial diff of a large dataset be incomprehensible? |
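To make the physical representations above more concrete, here is a minimal sketch (my own; the class and method names are hypothetical, and Decibel's actual bitmaps are compressed and disk-resident) of the tuple-first layout, where every stored tuple version carries a per-branch liveness bitmap.

```python
# Hypothetical sketch: every tuple version is stored once, tagged with a
# bitmap recording the branches in which that version is live.
class TupleFirstTable:
    def __init__(self):
        self.rows = []  # list of (primary_key, columns, liveness_bitmap)

    def insert(self, key, columns, branch):
        self.rows.append((key, columns, 1 << branch))

    def fork(self, parent, child):
        # A new branch starts out live wherever its parent is live.
        for i, (key, cols, bits) in enumerate(self.rows):
            if bits & (1 << parent):
                self.rows[i] = (key, cols, bits | (1 << child))

    def scan(self, branch):
        # Single-version scan: keep rows whose bitmap has this branch's bit set.
        return [(k, c) for (k, c, bits) in self.rows if bits & (1 << branch)]

    def diff(self, b1, b2):
        # Multi-version query: rows live in one branch but not the other.
        only_b1 = [(k, c) for (k, c, bits) in self.rows
                   if bits & (1 << b1) and not bits & (1 << b2)]
        only_b2 = [(k, c) for (k, c, bits) in self.rows
                   if bits & (1 << b2) and not bits & (1 << b1)]
        return only_b1, only_b2

t = TupleFirstTable()
t.insert("k1", {"a": 1}, branch=0)
t.fork(parent=0, child=1)
t.insert("k2", {"a": 2}, branch=1)          # visible only on branch 1
assert t.scan(0) == [("k1", {"a": 1})]
assert t.diff(0, 1) == ([], [("k2", {"a": 2})])
```

A branch-oriented variant stores the transpose of this: one bitmap per branch with a bit per tuple.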
[link]
In the face of latency, partitions, and failures, applications sometimes turn to weak consistency for improved performance. However, strong consistency cannot always be avoided, so some data stores (e.g. Cassandra, Riak) allow applications to operate on some data with weak consistency and on other data with strong consistency. However, it is difficult to ensure that data read with weak consistency does not leak into the strongly consistent data. For example, imagine a ticketing application that displays the number of remaining tickets for an upcoming movie. To improve performance, the application may choose to read the data using weak consistency; it's okay if the displayed number is slightly erroneous. However, now imagine that we want to increase the price of the last 10 tickets. If we used the same weakly consistent value to determine the price of a ticket, we may make less money than we expect. Weak and strong consistency cannot be carelessly mixed. The Inconsistent, Performance-bound, Approximate (IPA) storage system uses a type system of consistency types to ensure consistency safety: the guarantee that weakly consistent data does not leak into strongly consistent data. They also introduce error-bounded consistency which allows a runtime to dynamically choose the strongest consistency that satisfies certain constraints (e.g. latency or accuracy). Programming Model. The IPA programming model involves three components: abstract data types, consistency policies, and consistency types. Users annotate ADTs with consistency policies which determine the consistency types returned by the ADT's methods.

1. ADTs. IPA is bundled with a set of common ADTs (e.g. sets, maps, lists) that can be implemented with any number of backing stores (e.g. Cassandra, Riak, Redis). The collection of ADTs is also extensible; users can add their own ADTs.
2. Consistency Policies. ADT instances (or their methods) are annotated with consistency policies which designate the level of consistency the user desires. Static policies, like strong consistency, are fixed throughout the lifetime of the ADT. Dynamic policies allow the runtime to dynamically choose the consistency of the system to meet some constraint. For example, a user can set a policy LatencyBound(x), and the system will choose the strongest consistency that can be served while still satisfying the latency bound. Or, a user can set an ErrorTolerance(x) policy, allowing the system to return approximate results with weaker consistency.
3. Consistency Types. Consistency types are parameterized on a type T and are partially ordered:

```
        Consistent[T]
       /      |      \
Interval[T] Rushed[T] ...
       \      |      /
       Inconsistent[T]
```

Users can explicitly endorse, or upcast, weak types to stronger types. Rushed types are produced by ADTs with LatencyBound policies and are a sum of other consistency types. Interval types represent a numeric interval that is guaranteed to contain the correct value. Enforcing Consistency Policies. Static consistency policies are easy to enforce. The ADT operations simply set flags that are used by the underlying store. Dynamic policies are trickier to implement.

- Latency bounds. To service a read with a latency bound of x milliseconds, for example, a read is issued at every single consistency level. The strongest to return in less than x milliseconds is used. This naive approach can put unwanted load on a system, so IPA can also monitor and predict the strongest consistency model for a given latency bound in order to issue only a couple of reads.
- Error bounds.
Error bounds are enforced using a reservation scheme similar to the one used by bounded CRDTs. For example, imagine a counter with value 10 that should be concurrently decremented but never drop below 0. We can allocate 10 decrement tokens and distribute them to nodes. A node can decrement so long as it has enough tokens. IPA involves reservation servers which are assigned tokens. A node can return an approximate read by knowing the number of outstanding tokens. For example, given a value of 100 and knowing there are 10 outstanding increment tokens, the true value is somewhere in the range [100, 110]. The more outstanding tokens there are, the weaker the error bounds. To avoid unnecessarily granting tokens, IPA uses an allocation table which maps reservation servers to tokens. Tokens are only allocated when necessary. Implementation. IPA is implemented in Scala on top of Cassandra with some middleware for the reservation servers. Type checking is done by leveraging Scala's type system. |
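Here is a small sketch (mine, not IPA's Scala API) of the reservation idea behind error-bounded reads: the number of outstanding increment tokens determines the width of the interval a weak read can return. All names are hypothetical.

```python
# Hypothetical sketch of an error-bounded counter in the style of IPA's
# reservation scheme: outstanding tokens widen the returned interval.
class ReservationCounter:
    def __init__(self, value=0, error_tolerance=0.1):
        self.value = value                  # last durably applied value
        self.outstanding = 0                # tokens handed out but not yet applied
        self.error_tolerance = error_tolerance

    def reserve_increments(self, n=1):
        # Grant tokens only while the resulting interval stays within tolerance.
        if self.outstanding + n <= self.error_tolerance * max(self.value, 1):
            self.outstanding += n
            return True
        return False

    def apply_increments(self, n):
        # A replica applies n of its reserved increments.
        self.value += n
        self.outstanding -= n

    def read_interval(self):
        # Interval[int]: the true value lies somewhere in [value, value + outstanding].
        return (self.value, self.value + self.outstanding)

c = ReservationCounter(value=100, error_tolerance=0.1)
assert c.reserve_increments(10)            # 10 outstanding tokens allowed
assert c.read_interval() == (100, 110)
c.apply_increments(10)
assert c.read_interval() == (110, 110)
```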
[link]
For fear of fettering development and innovation, companies often allow engineers free rein to generate and analyze datasets at will. This often leads to unorganized data lakes: a ragtag collection of datasets from a diverse set of sources. Google Dataset Search (Goods) is a system which uses unobtrusive post-hoc metadata extraction and inference to organize Google's unorganized datasets and present curated dataset information, such as metadata and provenance, to engineers. Building a system like Goods at Google scale presents many challenges.

- Scale. There are 26 billion datasets. 26 billion (with a b)!
- Variety. Data comes from a diverse set of sources (e.g. BigTable, Spanner, logs).
- Churn. Roughly 5% of the datasets are deleted every day, and datasets are created roughly as quickly as they are deleted.
- Uncertainty. Some metadata inference is approximate and speculative.
- Ranking. To facilitate useful dataset search, datasets have to be ranked by importance: a difficult heuristic-driven process.
- Semantics. Extracting the semantic content of a dataset is useful but challenging. For example, consider a file of protos that doesn't reference the type of proto being stored.

The Goods catalog is a BigTable keyed by dataset name where each row contains metadata including

- basic metadata like timestamp, owners, and access permissions;
- provenance showing the lineage of each dataset;
- schema;
- data summaries extracted from source code; and
- user provided annotations.

Moreover, similar datasets or multiple versions of the same logical dataset are grouped together to form clusters. Metadata for one element of a cluster can be used as metadata for other elements of the cluster, greatly reducing the amount of metadata that needs to be computed. Data is clustered by timestamp, data center, machine, version, and UID, all of which is extracted from dataset paths (e.g. /foo/bar/montana/August01/foo.txt). In addition to storing dataset metadata, each row also stores status metadata: information about the completion status of various jobs which operate on the catalog. The numerous concurrently executing batch jobs use status metadata as a weak form of synchronization and dependency resolution, potentially deferring the processing of a row until another job has processed it. The fault tolerance of these jobs is provided by a mix of job retries, BigTable's idempotent update semantics, and a watchdog that terminates divergent programs. Finally, a two-phase garbage collector tombstones rows that satisfy a garbage collection predicate and removes them one day later if they still match the predicate. Batch jobs do not process tombstoned rows. The Goods frontend includes dataset profile pages, dataset search driven by a handful of heuristics to rank datasets by importance, and team dashboards. |
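As a rough illustration of the path-based clustering described above (my own sketch; the regexes and function names are hypothetical and much simpler than what Goods actually does), paths that differ only in components like dates or versions can be abstracted to a shared cluster key.

```python
# Hypothetical sketch: cluster datasets whose paths differ only in
# date-like or version-like components by abstracting those components away.
import re

ABSTRACTIONS = [
    (re.compile(r"\d{4}-\d{2}-\d{2}"), "<date>"),   # e.g. 2016-08-01
    (re.compile(r"(?i)august\d{2}"), "<date>"),      # e.g. August01
    (re.compile(r"v\d+"), "<version>"),              # e.g. v7
]

def cluster_key(path):
    parts = []
    for part in path.strip("/").split("/"):
        for pattern, placeholder in ABSTRACTIONS:
            if pattern.fullmatch(part):
                part = placeholder
                break
        parts.append(part)
    return "/" + "/".join(parts)

paths = ["/foo/bar/montana/August01/foo.txt",
         "/foo/bar/montana/August02/foo.txt"]
assert len({cluster_key(p) for p in paths}) == 1   # both land in one cluster
```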
[link]
There's an enormous number of stream (a.k.a. real-time, interactive) processing systems in the wild: Twitter Storm, Twitter Heron, Google Millwheel, LinkedIn Samza, Spark Streaming, Apache Flink, etc. While all similar, the stream processing systems differ in their ease of use, performance, fault-tolerance, scalability, correctness, etc. In this paper, Facebook discusses the design decisions that go into developing a stream processing system and discusses the decisions they made with three of their real-time processing systems: Puma, Swift, and Stylus. Systems Overview.

- Scribe. Scribe is a persistent messaging system for log data. Data is organized into categories, and Scribe buckets are the basic unit of stream processing. The data pushed into Scribe is persisted in HDFS.
- Puma. Puma is a real-time data processing system in which applications are written in a SQL-like language with user defined functions written in Java. The system is designed for compiled, rather than ad-hoc, queries. It is used to compute aggregates and to filter Scribe streams.
- Swift. Swift is used to checkpoint Scribe streams. Checkpoints are made every N strings or every B bytes. Swift is used for low-throughput applications.
- Stylus. Stylus is a low-level general purpose stream processing system written in C++ and resembles Storm, Millwheel, etc. Stream processors are organized into DAGs, and the system provides estimated low watermarks.
- Laser. Laser is a high throughput, low latency key-value store built on RocksDB.
- Scuba. Scuba supports ad-hoc queries for debugging.
- Hive. Hive is a huge data warehouse which supports SQL queries.

Example Application. Imagine a stream of events, where each event belongs to a single topic. Consider a streaming application which computes the top k events for each topic over 5 minute windows, composed of four stages:

1. Filterer. The filterer filters events and shards them based on their dimension id.
2. Joiner. The joiner looks up dimension data by dimension id, infers the topic of the event, and shards output by (event, topic).
3. Scorer. The scorer maintains a recent history of event counts per topic as well as some long-term counts. It assigns a score to each event and shards output by topic.
4. Ranker. The ranker computes the top k events per topic.

The filterer and joiner are stateless; the scorer and ranker are stateful. The filterer and ranker can be implemented in Puma. All can be implemented in Stylus. Language Paradigm. The choice of the language in which users write applications can greatly impact a system's ease of use:

- Declarative. SQL is declarative, expressive, and everyone knows it. However, not all computations can be expressed in SQL.
- Functional. Frameworks like Dryad and Spark provide users with a set of built-in operators which they chain together. This is more flexible than SQL.
- Procedural. Systems like Storm, Heron, and Samza allow users to form DAGs of arbitrary processing units.

Puma uses SQL, Swift uses Python, and Stylus uses C++. Data Transfer. Data must be transferred between nodes in a DAG:

- Direct message transfer. Data can be transferred directly with something like RPCs or ZeroMQ. Millwheel, Flink, and Spark Streaming do this.
- Broker based message transfer. Instead of direct communication, a message broker can be placed between nodes. This allows an output to be multiplexed to multiple consumers. Moreover, brokers can implement back pressure. Heron does this.
- Persistent message based transfer.
Storing messages in a persistent messaging layer allows data to be multiplexed, allows for different reader and writer speeds, allows data to be read again, and makes failures independent. Samza, Puma, Swift, and Stylus do this. Facebook connects its systems with Scribe for the following benefits:

- Fault Tolerance: If the producer of a stream fails, the consumer is not affected. This failure independence becomes increasingly useful at scale.
- Fault Tolerance: Recovery can be faster because only the failed nodes need to be replaced.
- Fault Tolerance: Multiple identical downstream nodes can be run to improve fault-tolerance.
- Performance: Different nodes can have different read and write latencies, and slow nodes don't propagate back pressure that slows down the rest of the system.
- Ease of Use: The ability to replay messages makes debugging easier.
- Ease of Use: Storing messages in Scribe makes monitoring and alerting easier.
- Ease of Use: Having Scribe as a common substrate lets different frameworks communicate with one another.
- Scalability: Changing the number of Scribe buckets makes it easy to change the number of partitions.

Processing Semantics. Stream processors

1. process inputs,
2. generate output, and
3. checkpoint state, stream offsets, and outputs for recovery.

Each node has

- state semantics: is each input reflected in the node's state at least once, at most once, or exactly once; and
- output semantics: is each output produced at least once, at most once, or exactly once.

For state semantics, we can achieve

- at least once by saving state before saving stream offsets,
- at most once by saving stream offsets before saving state, and
- exactly once by saving both state and stream offsets atomically.

For output semantics, we can achieve

- at least once by saving output before offset/state,
- at most once by saving offset/state before output, and
- exactly once by saving output and offset/state atomically.

A small sketch of these save orderings is given at the end of this summary. At-least-once semantics is useful when low latency is more important than duplicate records. At-most-once is useful when loss is preferred over duplication. Puma guarantees at-least-once state and output semantics, and Stylus supports a whole bunch of combinations. State-saving Mechanisms. Node state can be saved in one of many ways:

- Replication. Running multiple copies of a node provides fault tolerance.
- Local DB. Nodes can save their state to a local database like LevelDB or RocksDB.
- Remote DB. Nodes can save their state to remote databases. Millwheel does this.
- Upstream backup. Output messages can be buffered upstream in case downstream nodes fail.
- Global consistent snapshot. Flink maintains globally consistent snapshots.

Stylus can save to a local RocksDB instance with data asynchronously backed up to HDFS. Alternatively, it can store to a remote database. If a processing unit forms a monoid (an identity element with an associative operator), then input data can be processed and later merged into the remote DB. Backfill Processing. Being able to re-run old jobs or run new jobs on old data is useful for a number of reasons:

- Running new jobs on old data is great for debugging.
- Sometimes, we need to run a new metric on old data to generate historical metrics.
- If a node has a bug, we'd like to re-run the node on the data.

To re-run processing on old data, we have three choices:

- Stream only.
- Separate batch and streaming systems. This can be very annoying and hard to manage.
- Combined batch and streaming systems. This is what Spark Streaming, Flink, and Facebook do.
Puma and Stylus code can be run as either streaming or batch applications. |
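Here is the save-ordering sketch referenced in the processing semantics discussion above: a toy model (mine, not Facebook's code) of why saving state before offsets yields at-least-once state semantics while the opposite order yields at-most-once.

```python
# Hypothetical sketch: the order in which a stream processor persists its
# state and its input offset determines its state semantics across a crash.
class Crash(Exception):
    pass

CRASH_BETWEEN_SAVES = True

def maybe_crash():
    if CRASH_BETWEEN_SAVES:
        raise Crash()

def process_batch(store, events, new_offset, order):
    new_state = store["state"] + sum(events)      # fold the events into the state
    if order == "at_least_once":
        store["state"] = new_state                # state saved first...
        maybe_crash()                             # ...crash: offset is stale, events replay
        store["offset"] = new_offset
    elif order == "at_most_once":
        store["offset"] = new_offset              # offset saved first...
        maybe_crash()                             # ...crash: the events are lost
        store["state"] = new_state
    else:                                         # "exactly_once": save both atomically
        store["state"], store["offset"] = new_state, new_offset

store = {"state": 0, "offset": 0}
try:
    process_batch(store, events=[1, 2, 3], new_offset=3, order="at_least_once")
except Crash:
    pass
# The state reflects the batch but the offset does not, so recovery re-reads
# the same events and double-counts them: at-least-once state semantics.
assert store == {"state": 6, "offset": 0}
```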
[link]
Overview. Strong consistency is expensive. The alternative, weak consistency, is hard to program against. The lack of distributed synchronization or consensus in a weakly consistent system means that replica state can diverge. Existing systems try to hide this divergence with causal consistency to deal with read-write conflicts and per-object eventual convergence to deal with write-write conflicts, but neither is sufficient to deal with complex multi-object write-write conflicts. As a motivating example, imagine a Wikipedia article for Donald Trump with three parts: some text, an image, and some references. In one partition, Hillary modifies the text to oppose Trump and subsequently, Tim changes the picture to a nastier image of Trump. In another partition, Trump modifies the text to support Trump and subsequently, Mike changes the references to link to pro-Trump websites. Later, the partitions need to be merged. The write-write conflict on the text needs to be reconciled. Moreover, the modifications to the image and references do not produce conflicts but still need to be updated to match the text. Existing systems handle this in one of two unsatisfying ways:

1. syntactic conflict resolution: some policy like last-writer-wins is chosen, or
2. per-object resolution with no cross-object semantics: users are forced to merge individual objects.

TARDiS (Transactional Asynchronously Replicated Divergent Store) is a distributed weakly-consistent transactional key-value store that supports branching computation as a core abstraction. When working off a single branch, applications are shielded from diverging computations. Conversely, applications can merge branches and reconcile conflicts. By allowing applications to merge entire branches rather than single objects, users have the ability to perform semantically rich multi-object merges. TARDiS employs three techniques:

1. branch-on-conflict,
2. inter-branch isolation, and
3. application-driven cross-object merge,

and has the following properties:

1. TARDiS knows history: TARDiS maintains a DAG of branching execution and uses DAG compression to minimize memory overhead.
2. TARDiS merges branches, not objects: TARDiS allows applications to merge branches rather than single objects.
3. TARDiS is expressive: TARDiS supports various isolation levels.
4. TARDiS improves performance of the local site: branching on conflict and deferring merge until later can improve performance in a local setting as well as in a distributed setting.

Architecture. TARDiS is a distributed multi-master key-value store with asynchronous replication. There are four main layers to TARDiS:

1. Storage layer: this layer implements a disk-backed multiversion B-tree.
2. Consistency layer: this layer maintains the DAG of execution branches where each vertex is a logical state.
3. Garbage collection layer: this layer performs DAG compression and record pruning.
4. Replicator service layer: this layer propagates transactions.

Interface. Applications use TARDiS in either

1. single mode, where transactions execute on a single branch, or
2. multi mode, where a transaction can read from multiple branches and create a new merged branch.

TARDiS provides an API to find the set of conflicting writes for a set of branches, to find the fork points for a set of branches, and to get the versioned values of objects. It also supports varying levels of begin and end constraints including serializability, snapshot isolation, read committed, etc. Here's an example TARDiS application that implements a counter.
```
func increment(counter)
  Tx t = begin(AncestorConstraint)
  int value = t.get(counter)
  t.put(counter, value + 1)
  t.commit(SerializabilityConstraint)

func decrement(counter)
  Tx t = begin(AncestorConstraint)
  int value = t.get(counter)
  t.put(counter, value - 1)
  t.commit(SerializabilityConstraint)

func merge()
  Tx t = beginMerge(AnyConstraint)
  forkPoint forkPt = t.findForkPoints(t.parents).first
  int forkVal = t.getForId(counter, forkPt)
  list<int> currentVals = t.getForId(counter, t.parents)
  int result = forkVal
  foreach c in currentVals
    result += (c - forkVal)
  t.put(counter, result)
  t.commit(SerializabilityConstraint)
```

Design and Implementation. When a transaction begins, TARDiS performs a BFS from the leaves of the state DAG to find the first state satisfying the begin constraint. When a transaction is committed, TARDiS walks down the state DAG as far as possible until a state is reached that doesn't satisfy the end constraint, branching if necessary. Branches are represented as a set of (b, i) pairs to indicate the bth child of the ith node. Keys are mapped to a topologically sorted list of versions used during reading. The garbage collector performs DAG compression, eliminating unneeded states in the state DAG. It also prunes unneeded record versions after DAG compression. |
[link]
For a traditional single-node database, the data that a transaction reads and writes is all on a single machine. For a distributed OLTP database, there are two types of transactions: 1. Local transactions are transactions that read and write data on a single machine, much like traditional transactions. A distributed database can process a local transaction as efficiently as a traditional single-node database can. 2. Distributed transactions are transactions that read and write data that is spread across multiple machines. Typically, distributed databases use two-phase commit (2PC) to commit or abort distributed transactions. Because 2PC requires multiple rounds of communications, distributed databases process distributed transactions less efficiently than local transactions. This paper presents an alternative to 2PC, dubbed Localizing Executions via Aggressive Placement of data (LEAP), which tries to avoid the communication overheads of 2PC by aggressively moving all the data a distributed transaction reads and writes onto a single machine, effectively turning the distributed transaction into a local transaction. #### LEAP LEAP is based on the following assumptions and observations: - Transactions in an OLTP workload don't read or write many tuples. - Tuples in an OLTP database are typically very small. - Multiple transactions issued one after another may access the same data again and again. - As more advanced network technology becomes available (e.g. RDMA), the cost of moving data becomes smaller and smaller. With LEAP, tuples are horizontally partitioned across a set of nodes, and each tuple is stored exactly once. Each node has two data structures: - a data table which stores tuples, and - a horizontally partitioned owner table key-value store which stores ownership information. Consider a tuple $d = (k, v)$ with primary key $k$ and value $v$. The owner table contains an entry $(k, o)$ indicating that node o owns the tuple with key $k$. The node $o$ contains a $(k, v)$ entry in its data table. The owner table key-value store is partitioned across nodes using any arbitrary partitioning scheme (e.g. hash-based, range-based). When a node initiates a transaction, it requests ownership of every tuple it reads and writes. This migrates the tuples to the initiating node and updates the ownership information to reflect the ownership transfer. Here's how the ownership transfer protocol works. For a given tuple $d = (k, v)$, the requester is the node requesting ownership of $d$, the partitioner is the node with ownership information $(k, o)$, and the owner is the node that stores $d$. - First, the requester sends an owner request with key k to the partitioner. - Then, the partitioner looks up the owner of the tuple with k in its owner table and sends a transfer request to the owner. - The owner retrieves the value of the tuple and sends it in a transfer response back to the requester. It also deletes its copy of the tuple. - Finally, the requester sends an inform message to the partitioner informing it that the ownership transfer was complete. The partitioner updates its owner table to reflect the new owner. Also note that - if the requester, partitioner, and owner are all different nodes, then this scheme requires 4 messages, - if the partitioner and owner are the same, then this scheme requires 3 messages, and - if the requester and partitioner are the same, then this scheme requires 2 messages. If the transfer request is dropped and the owner deletes the tuple, data is lost. 
See the appendix for information on how to make this ownership transfer fault tolerant. Also see the paper for a theoretical comparison of 2PC and LEAP.

#### LEAP-Based OLTP Engine

L-Store is a distributed OLTP database based on H-Store which uses LEAP to manage transactions. Transactions acquire read/write locks on individual tuples and use strict two-phase locking. Transactions are assigned globally unique identifiers, and deadlock prevention is implemented with a wait-die scheme where lower timestamped transactions have higher priority. That is, higher priority threads wait on lower priority threads, but lower priority threads abort rather than wait on higher priority threads. Concurrent local transactions are processed as usual; what's interesting is concurrent transfer requests. Imagine a transaction is requesting ownership of a tuple on another node.

- First, the requester creates a request lock locally indicating that it is currently trying to request ownership of the tuple. It then sends an owner request to the partitioner.
- The partitioner may receive multiple concurrent owner requests. It processes them serially using the wait-die scheme. As an optimization, it processes requests in decreasing timestamp order to avoid aborts whenever possible. It then forwards a transfer request to the owner.
- If the owner is currently accessing the tuple being requested, it again uses a wait-die scheme to access the tuple before sending it back to the requester.
- Finally, the requester changes the request lock into a normal data lock and continues processing.

If a transaction cannot successfully get ownership of a tuple, it aborts. L-Store also uses logging and checkpointing for fault tolerance (see paper for details). |
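As a rough sketch of the four-message ownership transfer (my own; plain function calls stand in for the owner request, transfer request, transfer response, and inform messages, and fault tolerance is ignored):

```python
# Hypothetical sketch of the LEAP ownership transfer for a tuple with key k.
class Node:
    def __init__(self, name):
        self.name = name
        self.data_table = {}    # key -> value, for tuples this node owns
        self.owner_table = {}   # key -> owning node, for keys it partitions

def request_ownership(requester, partitioner, key):
    # 1. owner request: requester -> partitioner
    owner = partitioner.owner_table[key]
    # 2. transfer request: partitioner -> owner
    value = owner.data_table.pop(key)          # the owner gives up its copy
    # 3. transfer response: owner -> requester
    requester.data_table[key] = value
    # 4. inform: requester -> partitioner, which updates the owner table
    partitioner.owner_table[key] = requester
    return value

a, b, c = Node("requester"), Node("partitioner"), Node("owner")
c.data_table["k"] = 42
b.owner_table["k"] = c
request_ownership(a, b, "k")
assert a.data_table == {"k": 42} and b.owner_table["k"] is a
```

If the requester and partitioner (or the partitioner and owner) are the same node, the corresponding messages collapse, which is where the 4, 3, and 2 message counts above come from.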
[link]
The Fast Filesystem (FFS) improved the read and write throughput of the original Unix file system by 10x by

1. increasing the block size,
2. dividing blocks into fragments, and
3. performing smarter allocation.

The original Unix file system, dubbed "the old file system", divided disk drives into partitions and loaded a file system onto each partition. The filesystem included a superblock containing metadata, a linked list of free data blocks known as the free list, and an inode for every file. Notably, the file system was composed of 512 byte blocks; no more than 512 bytes could be transferred from the disk at once. Moreover, the file system had poor data locality. Files were often sprayed across the disk, requiring lots of random disk accesses. The "new file system" improved performance by increasing the block size to any power of two at least as big as 4096 bytes. In order to handle small files efficiently and avoid high internal fragmentation and wasted space, blocks were further divided into fragments at least as large as the disk sector size. |
[link]
Ed Codd proposed the relational model in 1970. As opposed to the navigational data models that came before it, the relational model boasted data independence: the ability for data storage and access methods to change independently of applications. Some worried that data independence necessitated poor performance. System R was one of the first relational databases and proved that the relational model could be implemented efficiently. System R development proceeded in three phases. Phase 0 (1974-1975) was a single-user PL/I interpreter prototype that processed a subset of SQL (e.g. no joins) using the XRM access method. The Phase 0 prototype was always intended to be thrown away. Instead, the focus was on tuning SQL, the user interface. User studies and interviews were performed to increase the usability and understandability of SQL. Every tuple in the database was labelled with a TID which contained a page number. Each tuple contained pointers into separate domains, and inversions existed to map domain values to TIDs. The Phase 0 query optimizer aimed to minimize the number of fetched tuples and would perform tricks like TID intersection to evaluate conjunctions. The prototype also introduced the design that the system catalog should be stored as relations. Phase 0 brought about the following ideas:

1. The optimizer should consider more than the cost of fetching tuples. It should also take into account the costs of TID manipulation, data fetching, etc.
2. Number of I/Os would have been a better metric than the number of tuples fetched. This would have also exposed the deficiency of the XRM access method.
3. The Phase 0 optimizer was CPU bound! This encouraged the later optimizer to use a weighted cost of CPU and I/O.
4. SQL joins are very important.
5. The query optimizer was complicated; more care should be given towards simpler and more common queries.

Phase 1 ranged from 1976 to 1977 and included the implementation of a full blown multi-user relational database. Phase 1 was divided into two pieces:

1. The Relational Data System (RDS) was an optimizing SQL processor responsible for query optimization.
2. The Research Storage System (RSS) was the access method that replaced XRM and was responsible for things like locking and logging.

Users could query System R using interactive queries or by embedding SQL queries in PL/I or Cobol. A preprocessor would compile the embedded SQL queries into an access module using a repository of hand-compiled fragments. Of course, the compiled query plan could be invalidated over time. For example, the query plan could use an index which is later dropped. Thus, each query's dependencies were put in the system catalog, and queries were recompiled when their dependencies were invalidated. Unlike the XRM, the RSS stored data directly in the tuples. This meant that certain column values were stored redundantly, but an entire row could be read in a single I/O. RSS also supported B+ tree indexes, tuple links, index scans, full table scans, link scans, tuple nested loop joins, index nested loop joins, and sort merge joins. The query optimizer minimized a weighted sum of RSS calls and I/Os using a dynamic programming approach. It avoided using some of the TID list intersection tricks that the Phase 0 optimizer used. Views were stored as parse trees and merged back into the SQL queries used to query them. Updates were only allowed on single-table views. Views were the atomic unit of authorization using a grant/revoke mechanism.
System R used a combination of logging and shadow pages to implement recovery. During recovery, pages were restored to their old shadow pages, and the log was processed backwards. Since Phase 1 was a multi-user database, it introduced multiple granularity locking in the form of intention locks. Originally, it had predicate locking, but this was abandoned because (1) it was difficult to check for predicate disjointness, (2) predicates were sometimes falsely marked as overlapping, and (3) predicate locking broke the abstraction barrier of the RSS. Phase 2 was a two-year period in which System R was evaluated. Users generally enjoyed the uniformity of SQL, and their recommendations led to the introduction of EXISTS, LIKE, prepared statements, and outer joins. The query optimizer was evaluated assuming that data was uniformly distributed and that all columns were independent. Shadow pages led to poor locality, extra bookkeeping, and semi-expensive page swapping. System R provided read uncommitted, read committed, and full serializable transactions. Read uncommitted wasn't as fast as it should have been. Read committed had more overhead than expected. Serializable transactions ended up being the most commonly used.

#### Commentary

System R introduced a bevy of influential and perennial ideas in the field of databases. Unix introduced a bevy of influential and perennial ideas in the field of operating systems. It's no coincidence that there are a striking number of system design principles that System R and Unix---as presented in The Unix Time-Sharing System---share:

1. Unified Abstractions. Unix unified the file and I/O device abstraction into a single interface. System R unified the table and catalog/metadata API into a single interface (i.e. everything is a relation). System R also unified SQL as the query language used for ad-hoc queries, program-embedded queries, and view definitions. System R's decision to use relations to represent the catalog can also be seen as a form of dogfooding.
2. Simple is Better. Unix started as Ken Thompson's pet project as an effort to make development simpler and more enjoyable. Unix's simplicity stuck and was one of its selling points. Similarly, System R spent a considerable amount of effort simplifying the SQL interface to make it as easy to use as possible. If a system's interface is too complicated, nobody will use it.
3. Performance isn't Everything. Thompson and Ritchie implemented Unix in C instead of assembly despite the fact that the kernel size increased by one third because C greatly improved the readability and maintainability of the system. Similarly, the System R paper comments that the relational model may never exceed the performance of a hand-optimized navigational database, but the abstraction it provides is worth the cost. Somewhat comically, today's compilers and query optimizers are so good that compiled C is likely smaller than hand-written assembly and optimized queries are likely faster than hand-optimized ones. This is an example of another systems principle of favoring higher-level declarative APIs which leave room for optimization. |
[link]
This paper introduces the B-link tree: a variant of a B+ tree (the paper says B* tree, but they mean B+ tree) which allows for concurrent searches, insertions, and deletions. Operations on a B-link tree lock at most three nodes at any given time, and searches are completely lock free.

#### Storage Model

We assume that every node of a B+ tree or B-link tree is stored on disk. Threads read nodes from disk into memory, modify them in memory, and then write them back to disk. Threads can also lock a specific node which will block other threads trying to acquire a lock on the same node. However, we'll also see that threads performing a search will not acquire locks at all and may read a node which is locked by another thread.

#### Concurrent B+ Trees

Let's see what goes wrong when we concurrently search and insert into a B+ tree without any form of concurrency control. Consider a fragment of a B+ tree shown below with two nodes x and y. Imagine a thread is searching for the value 2 and reads the pointer to y from x.

```
        +-------+
   x    | 5 |   |
        +-------+
       /    |
  +-------+ ...
y | 1 | 2 |
  +-------+
```

Then, another thread inserts the value 3 which reorganizes the tree like this:

```
        +-------+
   x    | 2 | 5 |
        +-------+
       /    |    \
  +-------+ +-------+ ...
y | 1 |   | | 2 | 3 |
  +-------+ +-------+
```

Next, the searching thread reads from y but cannot find the value 2! Clearly, concurrently searching and inserting into a B+ tree requires some sort of locking. There are already a number of locking protocols:

- The simplest protocol requires searches and insertions to lock every node along their path from root to leaf. This protocol is correct, but limits concurrency.
- Smarter protocols have insertions place write intention locks along a path and upgrade those locks to exclusive locks when performing a write. Searches can read nodes with write intention locks on them but not with exclusive locks on them.
- Even smarter protocols lock a subsection of the tree and bubble this subsection upwards through the tree.

B-link trees will do something similar but will guarantee that at most three nodes are locked at any given point in time.

#### B-link Trees

- Typically, an internal node in a B+ tree with $n$ keys has $n + 1$ pointers. For example, if an internal node has keys $(5, 10, 15)$, then it has four pointers for values in the range $[-\infty, 5)$, $[5, 10)$, $[10, 15)$, and $[15, \infty)$. Internal nodes in a B-link tree with $n$ keys have $n$ pointers where the last key is known as the **high key**. For example, if an internal node has keys $(5, 10, 15)$ then it has three pointers for values in the range $[-\infty, 5)$, $[5, 10)$, and $[10, 15)$.
- In a B+ tree, leaves are linked together, but internal nodes are not. In a B-link tree, all sibling nodes (internal nodes and leaves) are linked together left to right.

#### Search Algorithm

The search algorithm for a B-link tree is very similar to the search algorithm for a B+ tree. To search for a key $k$, we traverse the tree from root to leaf. At every internal node, we compare $k$ against the internal node's keys to determine which child to visit next. However, unlike with a B+ tree, we might have to walk rightward along the B-link tree to find the correct child pointer. For example, imagine we are searching for the key $20$ at an internal node $(5, 10, 15)$. Because $20 \gt 15$, we have to walk rightward to the next internal node which might have keys $(22, 27, 35)$. We do something similar at leaves as well to find the correct value.
Note that searching does not acquire any locks.

#### Insertion Algorithm

To insert a key $k$ into a B-link tree, we begin by traversing the tree from root to leaf in exactly the same way as we did for the search algorithm. We walk downwards and rightwards and do not acquire locks. One difference is that we maintain a stack of the rightmost visited node in each level of the tree. Later, we'll use the stack to walk backwards up the tree. Once we reach a leaf node, we acquire a lock on it and crab rightward until we reach the correct leaf node $a$. If $a$ is not full, then we insert $k$ and unlock $a$. If $a$ is full, then we split it into $a'$ (previously $a$) and $b'$ (freshly allocated). We flush $b'$ to disk and then flush $a'$ to disk. Next, we have to adjust the parent of $a'$ (formerly $a$). We acquire a lock on the parent node and then crab rightward until we reach the correct parent node. At this point, we repeat our procedure upwards through the tree. At worst, we hold three locks at a time.

#### Correctness Proof

To prove that the B-link tree works as we intend, we have to prove three things:

- First, we have to prove that multiple threads operating on a B-link tree cannot deadlock. This is straightforward. If we totally order nodes bottom-to-top and left-to-right, then threads always acquire locks according to this total order.
- Second, we have to prove that whenever a thread modifies a B-link tree, the tree still appears like a valid tree to all threads except the modifying one. Again, this is straightforward. The insertion procedure only makes three writes to disk.
  - Writing to a node that is not full clearly does not invalidate the tree.
  - Writing a newly allocated $b'$ node does not affect the tree because there are not yet any pointers to it.
  - Writing $a'$ atomically transitions the tree from one legal state to another.
- Finally, we have to ensure that concurrently executing operations do not interfere with one another. See the paper for the proof (it's complicated).

#### Deletion

B-link trees punt on deletion. They simply let leaf nodes get underfull, and they periodically lock the whole tree and reorganize it if things get too sparse. |
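Here is a small sketch (mine, not the paper's pseudocode) of the lock-free search step: at each node we either follow a child pointer chosen by the keys or, when the search key is at least the node's high key, follow the right-sibling link. The node layout is simplified; leaves keep only a high key and a key-to-value map.

```python
# Hypothetical sketch of B-link search: descend, walking rightward whenever
# the search key falls past a node's high key.
class Node:
    def __init__(self, keys, children=None, right=None, values=None):
        self.keys = keys           # separator keys; keys[-1] is the high key
        self.children = children   # child pointers (internal nodes only)
        self.right = right         # link to the right sibling
        self.values = values       # key -> value (leaves only)

def search(node, key):
    while node.children is not None:                       # internal nodes
        if key >= node.keys[-1] and node.right is not None:
            node = node.right                               # walk past the high key
            continue
        i = next((i for i, k in enumerate(node.keys) if key < k),
                 len(node.keys) - 1)                        # pick the child range
        node = node.children[i]
    while key >= node.keys[-1] and node.right is not None:  # leaves also link right
        node = node.right
    return node.values.get(key)

# Two leaves covering [-inf, 15) and [15, 40), with matching internal nodes.
leaf1 = Node(keys=[15], values={5: "a", 10: "b"})
leaf2 = Node(keys=[40], values={15: "c", 20: "d"})
leaf1.right = leaf2
root = Node(keys=[15], children=[leaf1], right=Node(keys=[40], children=[leaf2]))
assert search(root, 20) == "d"     # requires a rightward walk at the root
assert search(root, 10) == "b"
```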
[link]
In 1980, synchronization primitives like semaphores, monitors, and condition variables had been well studied in the literature, but there weren't any large systems implementing them. Mesa was a programming language that was being developed to write the Pilot operating system at Xerox. Due to the inherent concurrency of an operating system, Mesa was designed to ease the development of concurrent programs. The Mesa designers considered implementing a message passing interface, but deemed it too complex. They considered semaphores, but found them too undisciplined. They considered cooperative multi-threading but came upon a number of serious disadvantages:

- Cooperative multithreading cannot take advantage of multiple cores.
- Preemption is already required to service time-sensitive I/O devices.
- Cooperation is at odds with modularity. Critical sections have no principled way of knowing if they are calling a function which yields control.

Eventually, Mesa settled on implementing monitors and condition variables and exposed a number of previously undiscussed issues:

- What is the interface for dynamically spawning threads and waiting for them to terminate?
- What is the interface for dynamically constructing monitors?
- How are threads scheduled when waiting and notifying each other?
- What are the semantics of wait when one monitor calls into another monitor which also calls wait?
- How are exceptions and I/O devices handled?

Mesa allowed an arbitrary function call to be forked and run by a separate thread and eventually joined: https://i.imgur.com/McaDktY.png Moreover, if a forked thread was not intended to be joined, it could instead be detached via detach[p]. This fork/join style process management had a number of advantages---(i) processes were first class values, (ii) thread forking was type checked, and (iii) any procedure could be forked---but also introduced lots of opportunities for dangling references. Monitors are objects that only allow a single thread to be executing one of their functions at any given time. They unify data, synchronization of the data, and access of the data into one lexical bundle. Mesa monitors included public entry procedures and private internal procedures that operated with the monitor locked, as well as public external procedures that operated without locking the monitor. Monitors, in conjunction with condition variables, were used to maintain an invariant about an object that was true upon entering and exiting any of the monitor's methods. Monitors also lead to potential deadlocks:

- Two monitor methods could wait for one another.
- Two different monitors could enter each other.
- A monitor M could enter a monitor N, then wait on a condition that could only be enabled by another thread entering M through N.

Special care also had to be taken to avoid priority inversion. Mesa also introduced Mesa semantics, best explained with this code snippet: ``` """ Condition variables typically obey one of two semantics: 1. Under *Hoare semantics* [1], when a thread calls `notify` on a condition variable, the execution of one of the threads waiting on the condition variable is immediately resumed. Thus, a thread that calls `wait` can assume very strong invariants are held when it is awoken. 2. Under *Mesa semantics* [2], a call to `notify` is nothing more than a hint. Threads calling `wait` can be woken up at any time for any reason. Understanding the distinction between Hoare and Mesa semantics can be solidified by way of example.
This program implements a concurrent queue (aka pipe) to which data can be written and from which data can be read. It spawns a single thread which iteratively writes data into the pipe, and it spawns `NUM_CONSUMERS` threads that read from the pipe. The producer produces the same number of items that the consumers consume, so if all goes well, the program will run and terminate successfully. Run the program; not all goes well: Exception in thread Thread-3: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 763, in run self.__target(*self.__args, **self.__kwargs) File "hoare_mesa.py", line 66, in consume pipe.pop() File "hoare_mesa.py", line 52, in pop return self.xs.pop(0) IndexError: pop from empty list Why? The pipe is implemented assuming Python condition variables obey Hoare semantics. They do not. Modify the pipe's implementation assuming Mesa semantics and re-run the program. Everything should run smoothly! [1]: https://scholar.google.com/scholar?cluster=16665458100449755173&hl=en&as_sdt=0,5 [2]: https://scholar.google.com/scholar?cluster=492255216248422903&hl=en&as_sdt=0,5 """ import threading # The number of objects read from and written to the pipe. NUM_OBJECTS = 10000 # The number of threads consuming from the pipe. NUM_CONSUMERS = 2 # An asynchronous queue (a.k.a. pipe) that assumes (erroneously) that Python # condition variables follow Hoare semantics. class HoarePipe(object): def __init__(self): self.xs = [] self.lock = threading.Lock() self.data_available = threading.Condition(self.lock) # Pop the first element from the pipe, blocking if necessary until data is # available. def pop(self): with self.lock: # This code is incorrect beacuse Python condition variables follows # Mesa, not Hoare, semantics. To correct the code, simply replace # the `if` with a `while`. if len(self.xs) == 0: self.data_available.wait() return self.xs.pop(0) # Push a value to the pipe. def push(self, x): with self.lock: self.xs.append(x) self.data_available.notify() def produce(pipe): for i in range(NUM_OBJECTS): pipe.push(i) def consume(pipe): assert NUM_OBJECTS % NUM_CONSUMERS == 0 for i in range(NUM_OBJECTS / NUM_CONSUMERS): pipe.pop() def main(): pipe = HoarePipe() producer = threading.Thread(target=produce, args=(pipe,)) consumers = [threading.Thread(target=consume, args=(pipe,)) for i in range(NUM_CONSUMERS)] producer.start() for consumer in consumers: consumer.start() producer.join() for consumer in consumers: consumer.join() if __name__ == "__main__": main() ``` Threads waiting on a condition variable could also be awoken by a timeout, an abort, or a broadcast (e.g. notify_all). Mesa's implementation was divided between the processor, a runtime, and the compiler. The processor was responsible for process management and scheduling. Each process was on a ready queue, monitor lock queue, condition variable queue, or fault queue. The runtime was responsible for providing the fork/join interface. The compiler performed code generation and a few static sanity checks. Mesa was evaluated by Pilot (an OS), Violet (a distributed calendar), and Gateway (a router). |
[link]
Lauer and Needham explain the duality in expressiveness and performance between

- message-oriented concurrency models in which there are a small number of fixed tasks that communicate explicitly, and
- process-oriented concurrency models in which there are a larger number of dynamic processes that share memory.

Message-oriented systems can be characterized by the following hallmarks, consequences, and provided facilities.

| Hallmark | Consequences | Facilities |
|----------|--------------|------------|
| Long standing communication channels are typically created during program initialization | Synchronization is implicitly performed in the queues connecting processes | Messages and message ids |
| There are a fewer number of long lasting processes | Shared structures are passed by reference; only processes with a reference to a structure can act on it | Message channels and ports that provide the ability to Send, WaitForReply, WaitForMessage, or SendReply |
| Processes don't share memory | Peripheral devices are treated like processes and communicated with | Process creation (but no deletion) |
| Processes read a small number of messages at a time | | |

Process-oriented systems can be similarly characterized:

| Hallmark | Consequences | Facilities |
|----------|--------------|------------|
| Global data can be protected and accessed via interfaces | Synchronization is performed in locks | Procedures |
| Process creation and deletion is a lightweight task | Data is shared directly, with small portions being locked | fork/join procedure invocation |
| Processes typically have a single job | Peripheral interaction typically involves locking and sharing memory | Modules and monitors |
| Module instantiation | | |
| Condition variables | | |

There is a duality between the two concurrency models. Any program in one has a corresponding program written in the other. Lauer and Needham demonstrate the duality not by simulating one model's primitives using the other, but by drawing similarities between the two's components:

| Message-oriented | Process-oriented |
|------------------|------------------|
| Processes, CreateProcess | Monitors, NEW/START |
| Message Channels | External Procedure identifiers |
| Message Ports | ENTRY procedure identifiers |
| SendMessage; AwaitReply | simple procedure call |
| SendMessage; ... AwaitReply | FORK; ... JOIN |
| SendReply | RETURN |
| WaitForMessage loop with case statement | monitor lock, ENTRY attribute |
| arms of case statement | ENTRY procedure declarations |
| selective waiting for messages | condition variables, WAIT, SIGNAL |

This correspondence can be used to directly rewrite a canonical program between the two models. In fact, the differences between the two models become simply a matter of keyword choice. Moreover, if both models are implemented with identical blocking and scheduling mechanisms, then the two models lead to identical performance as well. Since the choice of model does not affect the user or implementor, the decision of which model to use should be based on the architecture of the underlying hardware. |
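As a toy illustration of the correspondence (mine, in Python rather than Mesa-style pseudocode), here is the same counter written in the process-oriented style, where a monitor lock guards shared state, and in the message-oriented style, where a server loop owns the state and clients do SendMessage; AwaitReply.

```python
# Hypothetical sketch: one counter, two dual implementations.
import queue
import threading

class MonitorCounter:                      # process-oriented
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()      # the monitor lock

    def increment(self):                   # ENTRY procedure
        with self._lock:
            self._value += 1

    def get(self):                         # ENTRY procedure
        with self._lock:
            return self._value

class MessageCounter:                      # message-oriented
    def __init__(self):
        self._inbox = queue.Queue()        # message port
        threading.Thread(target=self._serve, daemon=True).start()

    def _serve(self):
        value = 0                          # state is private to the server process
        while True:
            op, reply = self._inbox.get()  # WaitForMessage
            if op == "increment":          # arms of the case statement
                value += 1
                reply.put(None)            # SendReply
            elif op == "get":
                reply.put(value)

    def _call(self, op):                   # SendMessage; AwaitReply
        reply = queue.Queue(maxsize=1)
        self._inbox.put((op, reply))
        return reply.get()

    def increment(self):
        self._call("increment")

    def get(self):
        return self._call("get")

for counter in (MonitorCounter(), MessageCounter()):
    threads = [threading.Thread(target=counter.increment) for _ in range(10)]
    for t in threads: t.start()
    for t in threads: t.join()
    assert counter.get() == 10
```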
[link]
SQL queries declaratively describe some subset of the tuples in a database, but they do not specify the method by which these tuples should be retrieved. The same SQL query can be implemented in many different ways. For example, a single-relation query can be implemented with a full table scan, with an index scan, with an index-only scan, etc. Each of these different methods of accessing tuples is called an access path, and it's the job of the query optimizer to select the most efficient access path. This paper explores how System R's query optimizer selects access paths.

#### Processing of a SQL Statement

A SQL query goes through four phases: parsing, optimization, code generation, and execution. First, the query is parsed and decomposed into a set of SELECT-FROM-WHERE query blocks. Then, it is optimized. The optimizer first typechecks the query against type information in the catalog. It then chooses an order in which to evaluate the blocks and, for each block, chooses an access path. Each access path is expressed in the Access Specification Language (ASL). Then, the ASL plans are compiled to machine code by a table-driven code generator that maps specific forms of joins to precompiled machine code. Subqueries are treated as subroutines. Finally, the query is ready to be executed.

#### Research Storage System

The Research Storage System (RSS) is System R's storage subsystem that is responsible for managing the physical layout and physical access of relations. Tuples are stored in 4KB pages, and pages are logically organized into segments. Tuples from different relations can share the same pages (each tuple is annotated with the id of its relation), but each relation is sequestered to a single segment. Tuples are accessed through the tuple-oriented Relation Storage Interface (RSI) which supports an OPEN/NEXT/CLOSE scan interface. The RSI supports full segment scans as well as B-tree backed index scans (including range scans). Moreover, the RSI accepts a set of search arguments (SARGS)—a collection of predicates of the form column op value in disjunctive normal form—to filter the returned tuples. If a predicate is in the form column op value, we say it's a sargable predicate.

#### Costs for Single Relation Access Paths

System R's query optimizer tries to select an access path which minimizes cost as defined by the following formula:

```
COST = PAGE_FETCHES + w*RSI_CALLS
```

PAGE_FETCHES (the number of page fetches) is a measure of the amount of I/O a query has to perform, and RSI_CALLS (the number of calls to the RSI) is a measure of the amount of CPU a query has to perform. RSI_CALLS also approximates the number of tuples a query returns. The System R catalog maintains the following statistics which are used by the query optimizer. They are updated periodically.

- NCARD(T): the cardinality of relation T.
- TCARD(T): the number of pages that hold tuples from relation T.
- P(T): TCARD(T) divided by the number of pages in T's segment.
- ICARD(I): the number of distinct keys in index I.
- NINDX(I): the number of pages in index I.

The WHERE clause of a query is considered in conjunctive normal form, and each conjunct is called a boolean factor. The query optimizer estimates a selectivity factor F for each boolean factor with the following rules.

| Boolean factor | Selectivity factor F | Condition |
|----------------|----------------------|-----------|
| column = value | F = 1 / ICARD(column index) | If there exists an index. |
| column = value | F = 1 / 10 | If there does not exist an index. |
| column1 = column2 | F = 1 / MAX(ICARD(column1 index), ICARD(column2 index)) | If both columns have indexes. |
| column1 = column2 | F = 1 / ICARD(indexed column's index) | If only one column has an index. |
| column1 = column2 | F = 1 / 10 | If neither column has an index. |
| column > value | F = (high key - value) / (high key - low key) | If column is arithmetic. |
| column > value | F = 1/3 | If column is not arithmetic. |
| column BETWEEN value1 AND value2 | F = (value2 - value1) / (high key - low key) | If column is arithmetic. |
| column BETWEEN value1 AND value2 | F = 1/4 | If column is not arithmetic. |
| column IN (list of values) | F = (number of items in list) * (F for column = value) | Capped at 1/2. |
| column IN subquery | F = (expected cardinality of subquery result) / (product of subquery FROM cardinalities) | |
| a OR b | F = F(a) + F(b) - F(a)*F(b) | |
| a AND b | F = F(a)*F(b) | |
| NOT a | F = 1 - F(a) | |

The cardinality of a query (QCARD) is the product of the sizes of the relations in the FROM clause multiplied by the selectivity factor of every boolean factor in the WHERE clause. The number of RSI calls (RSICARD) is the product of the sizes of the relations in the FROM clause multiplied by the selectivity of the sargable boolean factors. Some access paths produce tuples in a particular order. For example, an index scan produces tuples in the order of the index key. If this order is consistent with the order of a GROUP BY or ORDER BY clause, we say it is an interesting order. The query optimizer computes the minimum cost unordered plan and the minimum cost plan for every interesting order. After taking into account the (potential) additional overhead of sorting unordered tuples for a GROUP BY or ORDER BY, the least cost plan is selected. The following costs include the number of index pages fetched, then the number of data pages fetched, and then the number of RSI calls weighted by W.

| Access path | Cost |
|-------------|------|
| Unique index matching an equal predicate. | 1 + 1 + W |
| Clustered index I matching one or more boolean factors. | F(preds)*(NINDX(I) + TCARD) + W*RSICARD |
| Non-clustered index I matching one or more boolean factors. | F(preds)*(NINDX(I) + NCARD) + W*RSICARD |
| Clustered index I not matching any boolean factors. | NINDX(I) + TCARD + W*RSICARD |
| Non-clustered index I not matching any boolean factors. | NINDX(I) + NCARD + W*RSICARD |
| Segment scan. | TCARD/P + W*RSICARD |

#### Access Path Selection for Joins

The System R query optimizer considers access plans with (pipelined) tuple-nested loop joins and sort-merge joins. The most critical part of choosing an access plan is choosing a join order. There are n! left-deep access plans for n relations (that's a lot). To avoid enumerating all of them, the query optimizer uses dynamic programming. First, it determines the cheapest single-relation access path for each relation and for each interesting order. Note that interesting orders now include ordering by a GROUP BY or ORDER BY clause and any joining predicates which could take advantage of the order with a sort-merge join. Then, it determines the cheapest 2-way join with each single-relation access path as the outer relation.
#### Access Path Selection for Joins

The System R query optimizer considers access plans with (pipelined) tuple-nested loop joins and sort-merge joins. The most critical part of choosing an access plan is choosing a join order. There are n! left-deep access plans for n relations (that's a lot). To avoid enumerating all of them, the query optimizer uses dynamic programming. First, it determines the cheapest single-relation access path for each relation and for each interesting order. Note that interesting orders now include ordering by a GROUP BY or ORDER BY clause as well as any join predicates that could take advantage of the order with a sort-merge join. Then, it determines the cheapest 2-way join with each single-relation access path as the outer relation. Then, it determines the cheapest 3-way join with the 2-way joins as the outer relation. And so on. The query optimizer performs a couple of tricks to speed up this algorithm. First, it does not consider a cross join if other, more selective joins are possible. Second, it computes interesting order equivalence classes to avoid computing redundant interesting orders. For example, if there are predicates E.DNO = D.DNO and D.DNO = F.DNO, then all three columns belong to the same equivalence class. This algorithm computes at worst on the order of $2^n$ times the number of interesting orders intermediate access paths (one per subset of relations per order). A small sketch of this enumeration appears at the end of this summary.

#### Nested Queries

Non-correlated subqueries are evaluated once before their parent query. Correlated subqueries are evaluated every time the parent query is evaluated. As an optimization, we can sort the parent tuples by the correlated column and compute the subquery once for every unique value of the correlated column.
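Here is the promised sketch of the dynamic program over left-deep join orders, in Python. The relations, cardinalities, selectivities, and cost functions are made-up stand-ins, and the interesting-order and cross-join refinements are omitted; it is meant only to show the shape of the enumeration, not System R's actual implementation.

```python
from itertools import combinations

# A minimal sketch of left-deep join enumeration via dynamic programming.
# Cardinalities, selectivities, and cost functions are made-up stand-ins.

card = {"E": 10_000, "D": 100, "F": 1_000}              # relation cardinalities
sel = {frozenset("ED"): 0.01, frozenset("DF"): 0.001}   # join selectivities

def scan_cost(rel):
    # Stand-in for the single-relation access path costs above.
    return card[rel]

def join_cost(outer_cost, outer_card, inner):
    # Stand-in for a tuple-nested loop join: scan the inner once per outer tuple.
    return outer_cost + outer_card * scan_cost(inner)

def result_card(rel_set):
    # Product of cardinalities times the selectivity of applicable join predicates.
    out = 1
    for r in rel_set:
        out *= card[r]
    for pair, f in sel.items():
        if pair <= rel_set:
            out *= f
    return out

# best[S] = (cost, plan) for the cheapest left-deep plan joining the set S.
best = {frozenset([r]): (scan_cost(r), (r,)) for r in card}
all_rels = frozenset(card)
for size in range(2, len(card) + 1):
    for subset in map(frozenset, combinations(all_rels, size)):
        candidates = []
        for inner in subset:
            outer = subset - {inner}
            outer_cost, outer_plan = best[outer]
            cost = join_cost(outer_cost, result_card(outer), inner)
            candidates.append((cost, outer_plan + (inner,)))
        best[subset] = min(candidates)

print(best[all_rels])  # cheapest left-deep join order over all three relations
```
|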
[link]
Locks are needed in a database system to ensure that transactions are isolated from one another. But what exactly should be locked? At one extreme, we could lock the entire database using a single lock. This coarse-grained approach has incredibly low locking overhead; only one lock is ever acquired. However, it limits the amount of concurrency in the system. Even if two transactions operate on disjoint data, they cannot run concurrently using a single global lock. At the other extreme, we could lock individual fields inside of records. This fine-grained approach has incredibly high concurrency. Two transactions could execute concurrently on the same record, so long as they access disjoint fields! However, this approach has very high locking overhead. If a transaction needs to read a lot of fields from a lot of records, it will spend a lot of time acquiring a lot of locks.

A compromise between these two extremes is to use multiple granularity locking, where a transaction can choose the granularity of its locks. For example, one transaction may lock a table, another may lock a page, and another may lock a record. Note, however, that unlike with single granularity locking, care must be taken to ensure that locks at different granularities do not conflict. For example, imagine one transaction has an exclusive lock on a page; another transaction must be prohibited from acquiring an exclusive lock on the table that the page belongs to.

In this paper, Gray et al. present an implementation of multiple granularity locking that exploits the hierarchical nature of databases. Imagine a database's resources are organized into a hierarchy. For example, a database has tables, each table has pages, and each page has records. A transaction can acquire a lock on any node in this hierarchy with one of the following types:

- IS: An intention shared lock on a node indicates that a transaction plans on acquiring a shared lock on one of the descendants of the node.
- IX: An intention exclusive lock on a node indicates that a transaction plans on acquiring an exclusive lock on one of the descendants of the node.
- S: A shared lock on a node implicitly grants the transaction shared read access to the subtree rooted at the node.
- SIX: A SIX lock on a node implicitly grants the transaction shared read access to the subtree rooted at the node and simultaneously indicates that the same transaction may acquire an exclusive lock on one of the descendants of the node.
- X: An exclusive lock on a node implicitly grants the transaction exclusive read and write access to the subtree rooted at the node.

Transactions acquire locks starting at the root and obey the following compatibility matrix (a small sketch of this matrix appears at the end of this summary): https://i.imgur.com/ECnVvXB.png

More specifically, these are the rules for acquiring locks:

1. If a transaction wants an S or IS lock on a node, it must first acquire an IS or IX lock on its parent.
2. If a transaction wants an X, SIX, or IX lock on a node, it must first acquire an IX or SIX lock on its parent.
3. Locks are either released all at once at the end of the transaction (in any order) or released from leaf to root.

This locking protocol can easily be extended to directed acyclic graphs (DAGs) as well. Now, a node is implicitly shared locked if one of its parents is implicitly or explicitly shared locked. A node is implicitly exclusive locked if all of its parents are implicitly or explicitly exclusive locked. Thus, when a shared lock is acquired on a node, it implicitly locks all nodes reachable from it.
When an exclusive lock is acquired on a node, it implicitly locks all nodes dominated by it. The paper proves that if two lock graphs are compatible, then the implicit locks on the leaves are compatible. Intuitively, this means that the locking protocol is equivalent to the naive scheme of explicitly locking the leaves, but without the locking overhead. The protocol can again be extended to dynamic lock graphs where the set of resources changes over time. For example, we can introduce index interval locks that lock an interval of the index. To migrate a node between parents, we simply acquire X locks on the old and new locations.

#### Degrees of Consistency

Ensuring serializability is expensive, and some applications can get away with weaker consistency models. In this paper, Gray et al. present three definitions of four degrees of consistency. First, we can informally define what it means for a transaction to observe degree i consistency.

- Degree 0: no dirty writes.
- Degree 1: Degree 0 plus no writes are committed until the end of the transaction.
- Degree 2: Degree 1 plus no dirty reads.
- Degree 3: Degree 2 plus repeatable reads.

Second, we can provide definitions based on locking protocols.

- Degree 0: short X locks.
- Degree 1: long X locks.
- Degree 2: long X locks and short read locks.
- Degree 3: long X locks and long read locks.

Finally, we can define what it means for a schedule to have degree i consistency. A transaction is a sequence of begin, end, S, X, unlock, read, and write actions beginning with a begin and ending with an end. A schedule is a shuffling of multiple transactions. A schedule is serial if every transaction is run one after another. A schedule is legal if it obeys a locking protocol. A schedule is degree i consistent if every transaction observes degree i consistency according to the first definition.

- Assertion 1. Definition 2 implies definition 3. That is, using the locking protocol for degree i ensures degree i consistent schedules.
- Assertion 2. Transactions can pick their own consistency degrees.

https://i.imgur.com/PFmcMJv.png
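As noted above, here is a small Python sketch of the compatibility matrix and the parent-intention rules for the five lock modes. The data structures and function names are illustrative, not Gray et al.'s implementation.

```python
# A minimal sketch of the multiple granularity lock modes. The matrix follows
# the compatibility figure above; names and structure are made up.

COMPATIBLE = {
    "IS":  {"IS": True,  "IX": True,  "S": True,  "SIX": True,  "X": False},
    "IX":  {"IS": True,  "IX": True,  "S": False, "SIX": False, "X": False},
    "S":   {"IS": True,  "IX": False, "S": True,  "SIX": False, "X": False},
    "SIX": {"IS": True,  "IX": False, "S": False, "SIX": False, "X": False},
    "X":   {"IS": False, "IX": False, "S": False, "SIX": False, "X": False},
}

# Rules 1 and 2: which modes a transaction must already hold on the parent
# node before requesting a given mode on a child node.
REQUIRED_ON_PARENT = {
    "IS":  {"IS", "IX"},
    "S":   {"IS", "IX"},
    "IX":  {"IX", "SIX"},
    "SIX": {"IX", "SIX"},
    "X":   {"IX", "SIX"},
}

def can_grant(requested, held_by_others):
    """A request is granted only if the requested mode is compatible with
    every mode currently held on the same node by other transactions."""
    return all(COMPATIBLE[requested][h] for h in held_by_others)

# Example: one transaction holds IX on a table (intending to X-lock a page).
# Another transaction's S request on the table must wait, but its IS request
# (say, to read a different page) is granted.
print(can_grant("S", {"IX"}))   # False
print(can_grant("IS", {"IX"}))  # True
```
|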
[link]
Unix was an operating system developed by Dennis Ritchie, Ken Thompson, and others at Bell Labs. It was the successor to Multics and is probably the single most influential piece of software ever written. Earlier versions of Unix were written in assembly, but the project was later ported to C: probably the single most influential programming language ever developed. This resulted in a 1/3 increase in size, but the code was much more readable and the system included new features, so it was deemed worth it.

The most important feature of Unix was its file system. Ordinary files were simple arrays of bytes physically stored as 512-byte blocks: a rather simple design. Each file was given an inumber: an index into an ilist of inodes. Each inode contained metadata about the file and pointers to the actual data of the file in the form of direct and indirect blocks. This representation made it easy to support (hard) linking. Each file was protected with 9 bits: the same protection model Linux uses today. Directories were themselves files which stored mappings from filenames to inumbers. Devices were modeled simply as files in the /dev directory. This unifying abstraction allowed devices to be accessed with the same API. File systems could be mounted using the mount command. Notably, Unix didn't support user level locking, as it was neither necessary nor sufficient.

Processes in Unix could be created using a fork followed by an exec, and processes could communicate with one another using pipes. The shell was nothing more than an ordinary process. Unix included file redirection, pipes, and the ability to run programs in the background. All this was implemented using fork, exec, wait, and pipes. Unix also supported signals.
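As a rough illustration of that process model, here is a minimal sketch of how a shell-like program could run `ls | wc -l` using fork, exec, pipe, and wait. It uses Python's os module on a Unix-like system as a stand-in for the original C system call interface.

```python
import os

# A minimal sketch of how a shell might run "ls | wc -l" using fork, exec,
# pipe, and wait. Python's os module is used as a stand-in for the C calls.

read_end, write_end = os.pipe()

if os.fork() == 0:                  # first child: runs "ls"
    os.dup2(write_end, 1)           # stdout -> write end of the pipe
    os.close(read_end)
    os.close(write_end)
    os.execvp("ls", ["ls"])         # replace this process image with ls

if os.fork() == 0:                  # second child: runs "wc -l"
    os.dup2(read_end, 0)            # stdin <- read end of the pipe
    os.close(read_end)
    os.close(write_end)
    os.execvp("wc", ["wc", "-l"])   # replace this process image with wc

# Parent: close its copies of the pipe ends (so wc sees EOF) and wait for
# both children to finish.
os.close(read_end)
os.close(write_end)
os.wait()
os.wait()
```
|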
[link]
Consider a hard-real-time environment in which tasks must finish within some time after they are requested. We make the following assumptions.

- (A1) Tasks are periodic with fixed periods.
- (A2) Tasks must finish before they are next requested.
- (A3) Tasks are independent.
- (A4) Tasks have constant runtime.
- (A5) Non-periodic tasks are not real-time.

Thus, we can model each task $t_i$ with a period $T_i$ and a runtime $C_i$. A scheduling algorithm that immediately preempts tasks to guarantee that the task with the highest priority is running is called a preemptive priority scheduling algorithm. We consider three preemptive priority scheduling algorithms: a static/fixed priority scheduler (in which priorities are assigned ahead of time), a dynamic priority scheduler (in which priorities are assigned at runtime), and a mixed scheduling algorithm.

#### Fixed Priority Scheduling Algorithm

First, a few definitions:

- The deadline of a task is the time at which its next request is issued.
- An overflow occurs at time $t$ if $t$ is the deadline for an unfulfilled task.
- A schedule is feasible if there is no overflow.
- The response time of a task is the time between the task's request and the task's finish time.
- A critical instant for a task is the instant at which a request for that task has the largest response time.

It can be shown that the critical instant for any task occurs when the task is requested simultaneously with all higher priority tasks. This result lets us easily determine whether a feasible fixed priority schedule exists by pessimistically assuming all tasks are requested at their critical instants. It also suggests that given two tasks with periods $T_1$ and $T_2$ where $T_1 < T_2$, we should give higher priority to the task with the shorter period $T_1$. This leads to the rate-monotonic priority scheduling algorithm, which assigns higher priorities to tasks with shorter periods. A feasible fixed priority assignment exists if and only if the rate-monotonic priority assignment is feasible.

Define processor utilization to be the fraction of time the processor spends running tasks. We say a set of tasks fully utilizes the processor if there exists a feasible schedule for them, but increasing the runtime of any of the tasks implies there is no feasible schedule. The least upper bound on processor utilization is the minimum processor utilization over task sets that fully utilize the processor. For $m$ tasks, the least upper bound is $m(2^{1/m} - 1)$, which approaches $\ln(2)$ for large $m$. A small sketch of this bound appears at the end of this summary.

#### Deadline Driven Scheduling Algorithm

The deadline driven scheduling algorithm (or earliest deadline first scheduling algorithm) dynamically assigns the highest priority to the task with the most imminent deadline. This scheduling algorithm has a least upper bound of 100% processor utilization. Moreover, if any feasible schedule exists for a set of tasks, a feasible deadline driven schedule exists.

#### Mixed Scheduling Algorithm

Scheduling hardware (at the time) resembled a fixed priority scheduler, but a dynamic scheduler could be implemented for less frequent tasks. A hybrid scheduling algorithm schedules the $k$ most frequent tasks using the rate-monotonic scheduling algorithm and schedules the rest using the deadline driven algorithm.
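As referenced above, here is a minimal Python sketch of the rate-monotonic least upper bound and the corresponding sufficient utilization-based schedulability test; the task set is made up for illustration.

```python
import math

def rm_utilization_bound(m):
    """Least upper bound on processor utilization for m tasks under
    rate-monotonic scheduling: m * (2^(1/m) - 1)."""
    return m * (2 ** (1 / m) - 1)

def utilization(tasks):
    """Total processor utilization of a set of (period T, runtime C) tasks."""
    return sum(c / t for (t, c) in tasks)

# A made-up task set of (period, runtime) pairs.
tasks = [(4, 1), (5, 1), (20, 2)]

u = utilization(tasks)                    # 0.25 + 0.20 + 0.10 = 0.55
bound = rm_utilization_bound(len(tasks))  # 3 * (2^(1/3) - 1) ~= 0.7798

# If utilization is at or below the bound, a rate-monotonic schedule is
# feasible. Above the bound, this test is inconclusive. A deadline driven
# (EDF) schedule is feasible whenever utilization <= 1.
print(f"U = {u:.3f}, RM bound = {bound:.3f}, ln(2) = {math.log(2):.3f}")
print("RM schedulable by the utilization test:", u <= bound)
print("EDF schedulable:", u <= 1.0)
```
|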
[link]
In this paper, Ed Codd introduces the relational data model. Codd begins by motivating the importance of data independence: the independence of the way data is queried from the way data is stored. He argues that existing database systems at the time lacked data independence; namely, the ordering of relations, the indexes on relations, and the way the data was accessed were all made explicit when the data was queried. This made it impossible for the database to evolve the way data was stored without breaking existing programs which queried the data. The relational model, on the other hand, allowed for a much greater degree of data independence. After Codd introduces the relational model, he provides an algorithm to convert a relation (which may contain other relations) into first normal form (i.e. relations cannot contain other relations); a small sketch of this kind of normalization appears at the end of this summary. He then describes basic relational operators, data redundancy, and methods to check for database consistency.

Commentary:

1. Codd's advocacy for data independence and a declarative query language has stood the test of time. I particularly enjoy one excerpt from the paper where Codd says, "The universality of the data sublanguage lies in its descriptive ability (not its computing ability)".
2. Database systems at the time generally had two types of data: collections and links between those collections. The relational model represented both as relations. Today, this seems rather mundane, but I can imagine this being counterintuitive at the time. This is also yet another example of a unifying interface which is demonstrated in both the Unix and System R papers.
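As a rough illustration of the normalization mentioned above, here is a small Python sketch that flattens a nested relation into first normal form. The relations and fields are my own made-up example, not Codd's algorithm verbatim.

```python
# A simplified illustration of putting a nested relation into first normal
# form: the nested "children" sub-relation is pulled out into its own flat
# relation, keyed by the parent's primary key. The data is made up.

employees = [
    {"emp_id": 1, "name": "alice", "children": [{"child": "bob", "age": 4}]},
    {"emp_id": 2, "name": "carol", "children": []},
]

# First normal form: two flat relations, with the parent key repeated in the
# child relation so the nesting can be reconstructed by a join.
employee = [{"emp_id": e["emp_id"], "name": e["name"]} for e in employees]
child = [
    {"emp_id": e["emp_id"], "child": c["child"], "age": c["age"]}
    for e in employees
    for c in e["children"]
]

print(employee)  # [{'emp_id': 1, 'name': 'alice'}, {'emp_id': 2, 'name': 'carol'}]
print(child)     # [{'emp_id': 1, 'child': 'bob', 'age': 4}]
```
|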