A CRDT Primer Part II: Convergent CRDTs

In Part I of this series, we looked at the basics of order theory in order to explore the concept of a join semi-lattice, which is the basis for Convergent CRDTs (or CvRDTs). If you haven’t read that post, I highly recommend reading it before continuing since we’ll be building on it here. In this post, we’ll look at CvRDTs in detail, starting with how they work in general before implementing a simple example of a grow-only distributed counter.

Convergent CRDTs

Convergent CRDTs (or CvRDTs) are replicated data structures that, when merged, converge towards a value. There are two basic components of a CvRDT that we need to keep in mind. First, we have state. All the possible states are elements of a set, and that set is ordered by some binary relation. Imagine, for example, that our state is a counter. We can think of all the possible states as all the integers, and we can think of our order as less than or equal to on those integers.

The other key component of CvRDTs is a merge() function. The whole point of CRDTs is to replicate state across nodes. We need a merge function in order to ultimately keep that state in sync. For CvRDTs, the merge function acts as a join for our order.

This will be clearer if we look at some examples. Again, we can think of our set as a set of integers ordered by less than or equal to. In this case, integers are the possible states and less than or equal to is the order on those states. Our merge function needs to act as a join in terms of our order.

Recall that for a total order, the join of any two elements is going to be one of those two elements. And for less than or equal to, the join is always going to be the greater of the two integers. This means that our merge function can simply be max(). Here are a few examples:

$$merge(1, 3) = 3$$ $$merge(9, 5) = 9$$ $$merge(8, 8) = 8$$

In each case, the max of the two integers is the least upper bound on those integers.

Now imagine that our set of states is not the integers, but vector clock timestamps. In this case, we can use coordinatewise max as our merge function. Here are a few examples:

$$merge((1, 0, 0), (0, 1, 1)) = (1, 1, 1)$$ $$merge((0, 0, 0), (2, 0, 2)) = (2, 0, 2)$$ $$merge((5, 3, 1), (1, 9, 2)) = (5, 9, 2)$$

Finally, consider the case where our set of states are locations and our order is located-in. In this case, we can use “least-common-enclosing-location” as our merge function. By least-common-enclosing-location, I simply mean the smallest location that contains both locations we’re merging. Here are a few examples:

$$merge(Seattle, Mumbai) = Earth$$ $$merge(Bronx, NYC) = NYC$$ $$merge(Mumbai, Delhi) = India$$

If this is all a little unclear at this point, don’t worry. We’re going to continually return to these same examples in what follows. But the important point to keep in mind is that, when defining a CvRDT, we have to determine a set of states, a relation $\leq$ that orders that set, and a merge function that acts like a join for that order.

Systems

Before getting to CvRDTs proper, it will be useful to introduce a couple of concepts that will help us work up to them. First, let’s call a set of available states a system. Here are a few examples, using the states we’ve discussed so far:

$$[2, 5, 7]$$ $$[Seattle, Delhi, Mumbai]$$ $$[(0, 0, 1), (1, 0, 0), (1, 1, 0)]$$

It’s important to draw a distinction between the particular set of states we have at the moment (what I’m calling our system) and the background set. In our first example above, our system consists of three integers, $2$, $5$, and $7$. Our background set is the set of all integers. In order to keep this clear, I’m going to start talking about systems and background sets from now on.

Here’s an interesting fact. For any system of states with a merge operation that acts as a join and that’s defined for all pairs in the system, we can draw a join semi-lettice diagram. Let’s look at our three example systems in turn. For our system of three integers, we can draw the following diagram:

Integer System

Recall that what makes something a join semi-lattice is that for any two elements in the diagram we can find a least upper bound on both of them. In this case, any two elements can be directly related by less than or equal to, and the join is simply the max of the two. For a more interesting example, we need to look at a partial order, like located-in. So let’s turn to our system of three locations. If you drew a diagram of these three locations alone, it would look like this:

Location System

None of these elements can be compared directly to each other in terms of located-in. So if this system were also our entire background set, then we couldn’t draw a semi-lattice diagram. Luckily, our background set here is all locations. So we can proceed to transform this diagram into a semi-lattice diagram by progressively taking the joins of pairs of elements. And as we’ll stress later, it doesn’t matter what order we take these joins in. So let’s start by adding the join of Mumbai and Delhi to the diagram:

Location System

Notice that we’re taking an element from the background set that wasn’t originally part of our system and adding it to our diagram. In this case, it was India. I’ve marked it a different color to indicate that it’s not actually part of our system (yet).

Now we continue by choosing another pair, Seattle and Mumbai. In this case our join is Earth:

Location System

Finally, let’s take the join of Seattle and India. Well, that’s just Earth again, which means that we don’t add any more locations to the diagram, but we can add a new arrow from India to Earth:

Location System

Now if you look carefully at our diagram, you’ll see that no matter which two elements we pick, we can find the join on the diagram. That’s because Earth is an upper bound on everything else in the diagram, so we can always at least get to that upper bound.

As long as our background set and our $\leq$ relation form a join semi-lattice, then for any system, we can always draw a corresponding semi-lattice diagram by pulling elements join by join from the background set. This leads us to our next important concept. Let’s define the value of a system as the upper bound on the corresponding semi-lattice diagram. Here are a few examples of taking the value of a system:

$$value([2, 5, 7]) = 7$$ $$value([Seattle, Delhi, Mumbai]) = Earth$$ $$value([(0, 0, 1), (1, 0, 0), (1, 1, 0)]) = (1, 1, 1)$$

The key point is that the states in a system converge toward the value of the system as we merge them. Imagine choosing pairs of states at random from a system and merging them, each time adding the result of the merge to the system. This process should eventually add the value to the system. Now, each merge acts as a join. And the properties of joins ensure two important things:

  1. The order of merges doesn’t matter. This is guaranteed by the associativity and commutativity of joins.

  2. It doesn’t matter how many times we repeat a particular merge. This is guaranteed by the idempotence of joins.

Implementing a CvRDT

We now have everything we need to implement our first CvRDT. Let’s think of a system as corresponding to a network of nodes, each node containing its own version of global state. For example, here’s a network corresponding to the system of integers from above:

Network

If our nodes pass states around randomly, merging any incoming states, they would all converge toward the value of the system. In this case, the value of our system is 5. That’s because 5 is the upper bound on the three states in the system. As we pass integers around from node to node, we perform a merge by updating our state to the max of our local integer and the incoming integer. The following animation should help make this clear:

Network Animation

The great thing about CvRDT’s is that they allow us to abstract away these network/system details. Let’s implement a counter to illustrate this idea.

Our counter has a simple interface:

increment(): increment the counter

value(): gets the value of the counter

We’d like for this counter to be replicated across three nodes. The idea is that users should be able to interact with any one of these three nodes but should never see inconsistent results as long as they remain connected to the same node. Furthermore, we need these nodes to stay in sync over time.

Abstractly, whenever some user calls increment() on any three of the nodes, it should increment the value of our system. This is because our replicated counter is keeping track of all of the increment() calls by all of the users. This means we can draw our system as follows, where calls are abstractly made to the system as a whole (even though in reality they are always made to a particular node):

Calls to System

The starting value of our system is $0$. Now imagine that increment() is called once on node $X$, twice on node $Y$, and three times on node $Z$. The value should then be equal to six. But remember the important point about the value of a system: The value doesn’t necessarily exist on any one node. Instead, it’s the upper bound of our join semi-lattice. That is, it’s the value towards which our merges are converging.

The idea is that the value will eventually be reflected in all nodes. And no coordination is required beyond passing states around. The order of merges doesn’t matter. And it also doesn’t matter how many times we repeat a particular merge. What this means is that we can simply pass states around whenever it’s convenient. There is no need to keep track of where merges have occurred in the past or in what order they’re going to occur.

Let’s try to actually implement our counter. We’re going to start with what’s called a G-counter, or grow-only counter. Our interface is just increment() and value(). Recall that we need two things:

  1. A state type $S$ ordered by some $\leq$ relation.

  2. A merge() operation that acts as a join for our order <$S, \leq$>.

Each node is going to have a value we’ll call local state which represents that node’s view on the current value of the system. We need to be able to update local state via an increment() call on that node. And we also need to be able to read local state via a value() call on that node. To begin, we’ll just naively represent local state as an integer.

Now we need to think about merge. Remember that a node can receive state from another node at any time. It shouldn’t matter which node it’s receiving it from and it shouldn’t matter if it’s already received that same state. Regardless, our local state should always be converging toward the value of the system, which is just the total number of times increment() has been called anywhere in the system.

A naive implementation of merge() would be to simply add the value of the incoming state to our local value. But this naive approach will definitely fail. The problem is that addition is not idempotent. If we merge in $5$ multiple times, we’ll keep adding $5$ to our local total. And that means that pretty soon we’ll run past the value of the system. This implementation of merge() is not acting as a join.

In our last post, we saw that max() over the integers acts as a join. So another naive approach would be to just take the max of the incoming state and our local state. But imagine the following history:

  1. Node 1 increments 3 times.

  2. Node 2 increments 2 times.

  3. Node 3 increments 1 time.

What is the value of the system after all these calls? Since the value is the total number of times increment was called anywhere in our network, the answer here is $6$. The following animation shows what would happen though if we use max() on integers as our merge function:

Nodes Max() Animation

We begin with three integers in our system: $3$, $2$, and $1$. No matter how many times we call max() between any randomly chosen pair, we’re never going to get a value higher than $3$. But the value toward which we’re converging should have been $6$. We need to try again.

A much better approach borrows from vector clocks. Instead of using an integer as our local state, we use a vector of integers. Each element in the vector corresponds to a node. So in our last example we would start with the following distribution of local states:

$$X: (3, 0, 0)$$ $$Y: (0, 2, 0)$$ $$Z: (0, 0, 1)$$

This time, in order to merge an incoming value, we take the coordinatewise max. And we implement value() as the sum of all the elements in the vector. The following animation illustrates what would happen in this case:

Nodes Coordinatewise Max Animation

Each node gradually picks up the latest value for the other nodes. And here, taking the max at a coordinate is in effect taking the latest value we’ve seen for that coordinate so far. Now that we’ve got a working implementation, let’s define our interface operations:

increment(): Increment the integer at the vector index corresponding to this node.

value(): Sums all integers in the vector.

merge(incoming_state): Replace our local state with the coordinatewise max of our local state and incoming_state.

Let’s draw the semi-lattice diagram for the system we just considered:

G-Counter System Join Semi-Lattice

We see that the vector corresponding to the value of our system is the upper bound on the semi-lattice. Our merge() function corresponds exactly to the join operation on any two of these elements. These joins converge toward the upper bound.

You can see for yourself that it doesn’t matter which order we take the joins in. It also doesn’t matter if we merge in the same value multiple times. In essence, we forget about values lower on the diagram, and either stay where we are or move up to a higher point.

In case you’d like to see this represented in code, here’s an example of a G-Counter implemented as a simple mutable data structure in Python (local state is represented as a list of integers called state_list):

class GCounter:
  def __init__(self, nodeId, state_list):
    self.nodeId = nodeId
    self.state_list = state_list

  def value(self):
    return sum(self.state_list)

  def increment(self):
    self.state_list[self.nodeId] += 1

  def merge(self, incoming):
    for idx in range(0, len(self.state_list)):
      self.state_list[idx] = max(self.state_list[idx],
        incoming.state_list[idx])

Conclusion

There are many other kinds of data structures that can be modeled as convergent CRDTs. You can have registers, sets, maps, and graphs. In each case we need to start by defining a value() method and the merge() method. And perhaps in a future post, we’ll look at how to implement some of these.

In the meantime, if you’d like to dig deeper into the theory behind CRDTs (including operation-based CRDTs, which we haven’t discussed yet), check out A Comprehensive Study of Convergent and Commutative Replicated Data Types↗(opens in a new tab) by Marc Shapiro, Nuno Preguiça, Carlos Baquero, and Marek Zawirski.