Effectively-Once Semantics in Wallaroo

Wallaroo is an efficient, low-footprint, event-by-event data processing engine with on-demand scaling. Wallaroo falls into the distributed streaming category along with tools like Apache Flink, Spark Streaming, and Storm. One of our long-running design goals for Wallaroo has been to guarantee effectively-once semantics for use cases where strict correctness guarantees are important, even in the face of failure.

In this post, I’ll describe what we mean by effectively-once semantics, explore how Wallaroo is able to make this kind of guarantee, and finally describe the testing strategies we’ve employed to verify these claims.

What do we mean by “effectively-once semantics”?

Stream processing systems make certain kinds of guarantees around message processing. We can frame these guarantees in terms of the relationship between the set of inputs sent into the system and the set of outputs produced by the system. In the ideal case, a set of inputs will correspond to a set of valid outputs.

For deterministic applications, there will be one set of valid outputs. For nondeterministic applications, there can be more than one set of valid outputs, since different executions can lead to different results.

In what follows, we assume that any internal application logic is bug-free, meaning that, if a subset of inputs is successfully processed by a computation, that computation will always generate a valid subset of outputs.

When talking about stream processing semantics, we’re particularly interested in the point at which the outputs corresponding to a given input are “committed.” If an output is committed, then its effects are observable outside the closed system internal to Wallaroo.

For example, outputs can lead to updates to a database, or they can be written to a file or Kafka topic. Stream processing semantics involve guarantees around the following properties:

  1. Do the committed outputs leave out expected data? In this case, messages were lost.
  2. Do the committed outputs include duplicates?
  3. Do the committed outputs include incorrect data (even when the application logic itself is bug-free)?

With these properties in mind, we can distinguish five main categories of stream processing semantics: zero-guarantee, correct-if-committed, at-most-once, at-least-once, and effectively-once. Let’s explore the differences between these.

To begin with, ignoring questions about failure and distributed systems for the moment, imagine we have a simple doubler application that assigns exactly one output to each input integer. If we send inputs into this system, we expect to see one and only one output committed per input:

Expected results for a deterministic application.

This is the ideal case, and what one would like to guarantee even in the presence of failure. The outputs include both the input and the transformed value (e.g. “1:2”) as a shorthand for the fact that we need to correlate inputs to outputs in order to reason about our processing guarantees. We can understand different kinds of processing semantics by contrasting them with this ideal case in terms of “what can go wrong.”

Zero-guarantee

In the case of zero-guarantee semantics, given an application and a set of inputs, we can’t say anything ahead of time about which outputs will be committed. It’s possible that the set of outputs will be valid. But it’s also possible that some of the outputs will be duplicates.

Furthermore, it’s possible some of our outputs will have been lost. In fact, we don’t even know if the committed outputs will be correct (even with bug-free application logic). Here’s a possible run under these (effectively nonexistent) guarantees:

Zero-guarantee

In this example, we commit a correct answer for input 1, but we commit it twice. We commit no answer for 2 (it was apparently lost), and we get an incorrect answer for 3. This was just one possible run. It’s also possible to get fewer than 3 outputs or more than 3 outputs under these conditions. A system like this is untrustworthy, to say the least.

Correct-if-committed

A bare minimum for a useful system is the guarantee that all committed outputs are correct. Even with bug-free application logic, there are a number of ways things can go wrong. Data can be corrupted over the network, or by the framework itself. Or perhaps the framework is buggy and routes messages to the wrong computations.

Whatever the possible cause, a system must rule out this kind of corruption through extensive testing. Correct-if-committed makes no guarantees about lost or duplicated messages, but it does say that only correct outputs will ever be committed. Here is a possible run under this guarantee:

Correct-if-committed

We’ve lost the outputs for 1 and 2, and we’ve committed the output for 3 twice. But because we committed it, given this guarantee, at least we know that the doubler app’s calculations are correct, e.g., 3 * 2 is indeed 6.1 For the remaining three categories of processing semantics, we will assume that the correct-if-committed guarantee holds.

At-most-once

Perhaps for your application, if outputs are guaranteed to be correct, it’s acceptable to lose messages sometimes, but it’s crucial that the output set contains no duplicate messages. In this case, you’d prefer at-most-once semantics, which guarantees that no output will ever end up being committed twice.

From the viewpoint of any external system, each input will have been processed at most once. Fire-and-forget could achieve this goal, as long as we are able to prevent the same input from being fired twice. Here’s a possible run under this guarantee:

At-most-once

We have committed the correct outputs for 1 and 3, but the output for 2 appears to have been lost. The important point is that we have committed no duplicate outputs.

At-least-once

Perhaps you can tolerate duplicates, which are ruled out by at-most-once semantics, but you can’t tolerate lost messages. In this case, you’d prefer at-least-once semantics, which guarantees that at least one of each valid output will be committed (but says nothing about how many duplicates of each will show up). Under this guarantee we could see something like this:

At-least-once

There are no lost outputs, and all the results are correct. But because we’re only guaranteeing at-least-once, in this case we’ve ended up committing duplicate outputs for 1.

Effectively-once

If you want to rule out both lost messages and duplicates, then you need effectively-once semantics. Effectively-once guarantees that for a given application and set of inputs, the set of outputs committed will be exactly equivalent to some set of valid outputs. This output set will include all expected data, and will include no duplicate data.

We use the term “effectively-once” rather than the more familiar “exactly-once” because it is clearer that we do not mean to say, for example, that every output is literally sent over TCP exactly once. The guarantee is that from the viewpoint of any external system, it will look as though each input was processed exactly one time.

So with our deterministic Doubler application, and with the inputs 1, 2, and 3, we guarantee the following result:

Effectively-once

Every input has a corresponding (correct) output. And no output has been committed more than once.

How does Wallaroo guarantee effectively-once semantics?

Wallaroo supports end-to-end effectively-once semantics, provided that producers are replayable and consumers can participate in checkpoint-based transactions with Wallaroo sinks (or, in the case of a certain class of deterministic applications, if consumers are idempotent). Even in the absence of these guarantees at the producers and consumers, Wallaroo provides consistent distributed snapshots taken at tunable intervals that can be used to roll back in the presence of failures.

In order to achieve effectively-once semantics, we need to guarantee no committed outputs are lost or duplicated, and that all committed outputs are valid outputs. In a completely deterministic application, “all outputs are valid outputs” means that the one and only correct set of outputs is committed by a consumer. However, there are many ways that nondeterminism can enter into Wallaroo applications. Here are some examples:

  • Because of parallelism and Wallaroo’s key-based partitioning scheme, Wallaroo only guarantees ordering by routing key and not across keys. If application operations are not commutative, this can lead to different outputs on different runs of the same inputs (e.g. if the final operation is appending to a list).
  • If the operations themselves are not deterministic (e.g. they use clock time or randomization), then different runs with the same inputs can lead to different outputs.
  • Depending on how window policies are set via the data windowing API, receiving the same inputs at different times can lead to different outputs. For example, just because an input is received before a configured late data threshold on one run does not mean it won’t be considered late on another run where the relative timing of inputs is different.

There are other examples, but the point is that we must account for possible nondeterminism from run to run, given the same inputs. This is why “correct” is defined as an outcome in which “all outputs from Wallaroo are valid outputs given the set of input messages”. Think of “valid outputs” as the set of valid sets of outputs (each one corresponding to a possible valid execution).

In the face of failure, there are three core challenges that we must overcome in order to guarantee effectively-once semantics:

  1. Ensure we can replay any messages from a producer that were lost so that the corresponding outputs can be committed at the consumer.
  2. Ensure that any outputs we already committed do not end up getting committed at the consumer again when we replay.
  3. Ensure that we can recover any intermediate state that will not be reconstructed by replaying lost messages.

To achieve these three goals, we use a checkpointing protocol to periodically take global snapshots of Wallaroo state, including metadata about which inputs we have already seen and which outputs we have committed to a consumer.

Our protocol is based on the Chandy-Lamport algorithm↗(opens in a new tab) and some of the modifications proposed in “Lightweight Asynchronous Snapshots for Distributed Dataflows”↗(opens in a new tab). We chose our protocol because it ensures consistent checkpoints while adding relatively little overhead, and without the need to stop processing globally.

On recovery from failure, Wallaroo rolls back to the last successful checkpoint. The system can then request replay from producers when necessary. For any checkpoint, we partition all messages processed into those that happened before the checkpoint and those that happened after the checkpoint.

This means that for each producer, we know the last input received before the checkpoint, and can request that the producer starts replaying at the message immediately after that one. Wallaroo will also drop any inputs that it has already successfully processed to avoid duplicate processing.

Notice that we are assuming the producer has the capability of replaying from a given point in the input stream. If the producer does not have this capability, then we can’t even guarantee at-least-once semantics. For example, perhaps the producer believed it sent a message, but Wallaroo crashed before receiving it. No matter what Wallaroo does at this point, if the producer can’t replay the message, we’ve lost access to that message.

Things are even trickier on the consumer side. We must guarantee that all outputs are committed at the consumer, and that they are committed only once. But we must also guarantee that the set of outputs committed is valid.

Notice that idempotence at the consumer (without further qualifications) is not sufficient for effectively-once from that consumer’s perspective. That’s because, in the presence of nondeterminism, the same checkpoint subset of inputs can lead to a different set of outputs. Whenever we roll back to a checkpoint, we replay all messages starting from where that checkpoint left off.

Imagine we roll back to the same checkpoint 3 times in the course of a run. Each time, let’s say we replay only one message successfully before crashing again. In the presence of nondeterminism, we might produce a different number of outputs corresponding to that input each time.

Even with idempotence at the consumer, it might end up committing all of these outputs, which might not be a possible result of a single valid execution. Hence, we fail in this case to guarantee effectively-once processing. This is only one example of how things can go wrong if you rely solely on idempotence in the presence of nondeterminism.

Because of challenges like these on the consumer side, we use a 2-Phase Commit (2PC) protocol for consumers. This allows us to ensure that all (and only) outputs associated with a successful checkpoint are committed. In its first phase, this protocol waits for all consumers to durably pre-commit messages related to a checkpoint. In the second phase, if every consumer has successfully pre-committed, we tell every consumer to commit. Otherwise, we abort the checkpoint.

Wallaroo provides a producer/consumer protocol that allows different consumers to implement pre-commit and commit in different ways. A message should only be “processed” by a consumer if it has been committed according to 2PC. Here is a diagram showing the steps of a successful 2PC:

2 Phase Commit

Every checkpoint triggers 2PC, which involves all consumers participating in the checkpoint, and the checkpoint is only successful if the 2PC succeeds. There are two crucial properties ensured by 2PC:

  1. A consumer will never commit outputs unless they are associated with a successful checkpoint (meaning that we could roll back to that checkpoint, and that the inputs associated with that checkpoint will never be replayed). [Safety property]
  2. All outputs associated with a successful checkpoint will eventually be committed. [Liveness property]

If we are going to use checkpoints to support effectively-once semantics, we need to make sure that the checkpoints themselves are stored in a resilient fashion. After all, machines containing those checkpoints could die and their file systems could be lost. In order to address this problem, we need to use some form of data replication.

When Wallaroo is run in data replication mode, all file-altering I/O to resilience and checkpoint-related files is written to journals, or write-ahead logs. The journal files are replicated to a remote file server, to protect against catastrophic worker failure.

It was an explicit goal to limit any Wallaroo journal replication service to as few features as possible. For example, the ability to do random write I/O can violate safety: overwriting part of a journal can make Wallaroo roll back to impossible or incorrect state.

A limited-capability file server can stop bad behavior by a buggy client by simply not being able to do a bad thing. Wallaroo’s remote file server is called “SOS”, the Simple Object Service. It is quite similar to the FTP protocol, except with even fewer operations and with a binary protocol instead of ASCII-based protocol.

Conclusion

One of the long-running design goals for Wallaroo has been to guarantee effectively-once semantics. This means that, even in the face of failure, we guarantee that no messages will be lost, duplicated, or corrupted if all producers are replayable and all consumers can participate in our 2-Phase Commit transaction protocol. We support resilience through a lightweight, asynchronous checkpointing protocol and the journal-based replication scheme described above.

We’ve subjected our system to a variety of rigorous tests, checking that under a variety of crash scenarios, no expected outputs are lost, duplicated, or incorrect. But that’s a topic for another post!

Further Reading


  1. In general, not all inputs must have corresponding valid outputs. For example, if the app logic filters out a certain class of inputs, then we should not expect outputs corresponding to them in our set of committed outputs. In this doubler example, we expect every input to have a corresponding output, since it’s a simple one-to-one function. Since correct-if-committed makes no guarantees regarding lost outputs, however, it only tells us that any outputs committed will be in the set of valid outputs. Some outputs might be missing, as in the diagram. ↩︎