Latency-Adaptive Real-Time with Reactive Caching on Microsoft Orleans

The Reactive Caching pattern allows many service clients to stay in sync with the latest data snapshot from an origin, regardless of their relative network latency. This pattern exploits Reactive Polling, enables Real-Time CQRS Projection Streaming and opens the doors to opportunistic Reactive Replication.

This article talks about my team’s experience applying this approach with Microsoft Orleans to solve a multi-geography user latency challenge.

TLDR;

Overview

Reactive Caching is useful when the cost of streaming individual items to machines in disparate geographical locations causes these machines to go out of sync due to network latency. Reactive Caching allows machines in different locations to stay in sync with the same data snapshot, using a best-effort approach and as fast as their network allows.

When implemented in Orleans, everything happens within application memory, often uses the fewest network hops necessary, and does not require external distributed caching services.

Background

In the industry I work in, relative network latency is a source of constant problems.

Here is an example. Two or more users are on a conference call, in far-apart geographical locations, looking at the same data screens on their desktops. Due to their locations, each client’s network latency relative to a common data source can vary significantly.

graph LR; C[Service Cluster] --> |">100ms"| SC[Slow Client]; C --> |"~50ms"| AV[Average Client]; C --> |"<1ms"| FS[Fast Client];

Their screens are alive with real-time activity from various streaming data sources. Look at the header image. That’s a stock image but they’re kinda like that. This activity happens in bursts, with many events attempting to reach each machine in quick succession to feed their active real-time views.

graph LR; C[Service Cluster] --> |10K Events| SC[Slow Client]; C --> |10K Events| AV[Average Client]; C --> |10K Events| FS[Fast Client];

Although these users are talking on the phone and looking at equivalent screens on their respective desktops, they will have a hard time seeing the same consistent data during periods of high streaming activity, when tens of thousands of individual events are attempting to reach each client machine per second. While the fastest client may indeed update in real-time, the slowest client can take several minutes to reach a consistent state.

It goes without saying, this can make our users very grumpy.

What’s Going On

While users near the source of data will often have latencies in the microsecond range, users in far-away geographical locations often have latencies over 100ms, even with expensive network links.

Classical streaming approaches that push individual data items to client applications suffer from this when data volumes become significant. We haven’t yet commoditized quantum entanglement and there’s only so much you can send at the same time. Sending batches only helps up to a point. And geographically splitting your HPC cluster makes Alfred come with a can of worms on a silver platter, asking if we’d like some cookies and milk to help them down.

This is where the Reactive Caching pattern can give a hand.

Why Reactive Caching

Reactive Caching can replace streaming approaches when:

  • The client does not require the full event flow.
  • The client just wants to always get the latest snapshot of some data view as fast as their own network allows.
  • The client prefers skipping interim snapshots when the network cannot cope.

We happened to already use a CQRS-style projection solution to create data snapshots.

graph LR; ES1[Event Source 1] --> C1[Conflation 1]; C1 --> P1[Always-On Projection 1]; C1 --> P2[Always-On Projection 2]; ES2[Event Source 2] --> C2[Conflation 2]; C2 --> P2; C2 --> P3[Always-On Projection 3]; P2 --> OP1(On-Demand Projection 1); P2 --> OP2(On-Demand Projection 2); P2 --> OP3(On-Demand Projection 3); P1 --> OP1; P1 --> OP2; P3 --> OP2; P3 --> OP3;

Okay, so the above may look like a spider web, but don’t mind all the arrows.

In-Memory CQRS is super simple to implement in Orleans and it is what we use to compute and maintain real-time data aggregations plus point-in-time snapshots of such aggregations. This approach lets us turn millions of individual data items into many small aggregations of a few thousand items each tops.

That said, you don’t need CQRS to apply Reactive Caching. You just need to have some snapshot data that you want to propagate.

Our underlying CQRS architecture meant that we already had access to latest data snapshots on-demand and in real-time. And we were already using these for plain request-response queries. All we needed was to solve the user latency problem.

What We Did

By applying the reactive caching approach, we moved from:

Streaming all data items to all clients, regardless of who can handle them.

To:

Pushing the latest aggregated data snapshot each screen needs, at the point in time each client can handle it.

graph RL; C1[Fast Client] --> P[Projection]; C2[Slow Client] --> P[Projection];

In this approach, both fast and slow clients will hang from whatever projections they are interested in. They do so by sending a message to the projection.

Upon this request, each projection decides whether to send the latest data immediately to an individual client or to delay response until some new data is available. This is very similar to long-polling in HTTP, but without the connection cost drawbacks. The MSR paper calls this reactive polling to highlight the difference.

In theory, while the slow client will skip snapshots as their own latency allows, the fast client may even flicker from too many snapshots coming through.

graph RL; C1[Fast Client] --> |Snapshot1, Snapshot2, Snapshot3| P[Projection]; C2[Slow Client] --> |Snapshot1, Snapshot3| P[Projection];

In practice, the cluster will only expose at most one snapshot per projection per second, as faster updates made our users cry out for a freeze button in the user interface. That’s what happens when your real-time system is too real-time for its own good.

The neat thing about this pattern is that there is nothing the slower or faster clients have to do to regulate exactly which snapshots they get. The immediate network latency itself acts as a natural regulator of which snapshots will reach them.

How Reactive Caching Works

Here’s some C#-ish client-side pseudo-code, where the target service returns a null snapshot on graceful timeout.

// the token must live outside the loop so it survives between requests
string token = null;
while (keepGoing)
{
    try
    {
        // request the latest snapshot - the first request provides a null token
        var snapshot = await service.GetNewSnapshotAsync(token);
        if (snapshot != null)
        {
            // keep the token for the next request
            token = snapshot.Token;

            // update own cache, fill a data grid, update a chart, etc
            await ApplySnapshotAsync(snapshot.Data);
        }
    }
    catch (Exception error)
    {
        logger.LogError(error, "Request failed!");
    }
}

The above is oversimplified. It goes without saying, don’t use while(true) style loops, they lead to the dark side of the Force, Luke.

The sample shows how to do this the right way, using non-reentrant timers.
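As a rough illustration of a loop that can be stopped cleanly, here is a minimal sketch. It is not the timer-based approach the sample uses. `ReactivePoller`, the `Snapshot` record and the delegate parameters are hypothetical names made up for this sketch; the service call is passed in as a delegate so the loop stays independent of any particular client technology.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical snapshot shape, for illustration only.
public sealed record Snapshot(string Token, string Data);

public static class ReactivePoller
{
    // getNewSnapshot is assumed to return null on graceful timeout,
    // as in the pseudo-code above.
    public static async Task RunAsync(
        Func<string?, Task<Snapshot?>> getNewSnapshot,
        Func<Snapshot, Task> apply,
        TimeSpan retryDelay,
        CancellationToken cancellation)
    {
        string? token = null; // first request carries a null token

        while (!cancellation.IsCancellationRequested)
        {
            try
            {
                var snapshot = await getNewSnapshot(token);
                if (snapshot is null) continue; // graceful timeout: ask again

                token = snapshot.Token;         // remember the version we now hold
                await apply(snapshot);
            }
            catch (Exception error)
            {
                Console.Error.WriteLine($"Request failed: {error.Message}");
                try
                {
                    // pause so a broken connection does not become a hot loop
                    await Task.Delay(retryDelay, cancellation);
                }
                catch (OperationCanceledException)
                {
                    return; // cancelled while waiting to retry
                }
            }
        }
    }
}
```

The next request only goes out once the previous snapshot has been applied, so a slow client naturally asks less often than a fast one.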

Note that the above is a client concept. Reactive Caching is a pattern, not a technology. In our setup we have implemented this pattern both within Orleans and outside of it, having a mix of both .NET Windows-based and Angular-based browser clients. Orleans just happens to be an excellent fit for this.

In fact, we have expanded the base reactive caching pattern to something we’re calling Reactive Replication - the on-demand, temporary replication of dynamic in-memory CQRS data projections across the Orleans cluster, using a well-defined hierarchy of short-lived beacon grains to minimize both data copying and the effect of in-cluster latency, while maximizing client responsiveness. This is meant to tie in with Orleans geo-distribution. But that’s a story for another day.

Here is the server-side C#-ish pseudo-code for Orleans:

[Reentrant]
public class MyProjectionGrain: Grain, IMyProjectionGrain
{
    // this happens server-side
    public Task ApplyDataAsync(Data data)
    {
        // make use of the incoming data to the projection and create a new snapshot
        var newSnapshot = new Snapshot(
            data: /* do something smart here */,
            token: /* create a new token */
        );

        // now make the new snapshot available
        snapshot = newSnapshot;
        completion.TrySetResult(snapshot);
        completion = new TaskCompletionSource<Snapshot>();

        // nothing to await here, so return an already completed task
        return Task.CompletedTask;
    }

    // we use this to fulfil requests immediately
    private Snapshot snapshot = Snapshot.Empty;

    // we use this to delay fulfilment
    private TaskCompletionSource<Snapshot> completion = new TaskCompletionSource<Snapshot>();

    // this is called by clients
    public Task<Snapshot> GetNewSnapshotAsync(string token)
    {
        // see if the client already has the same version as the server
        if (token == snapshot.Token)
        {
            // the client already has the latest snapshot
            // leave the request pending until we have new data to provide or we reach a graceful timeout
            // on graceful timeout we just return null
            return completion.Task.WithDefaultOnTimeout(null, TimeSpan.FromSeconds(10));
        }
        else
        {
            // the client has a different version so fulfil this request now
            // note how there is no data processing going on here - it is already done
            return Task.FromResult(snapshot);
        }
    }
}

Again, this is pseudo-code, and the sample shows how to do this the right way.
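The `WithDefaultOnTimeout` call in the pseudo-code is not part of the .NET base class library, so it has to come from a helper. The sketch below is one plausible way to write such an extension method, assuming nothing beyond `Task.WhenAny` and `Task.Delay`; the class name `TaskTimeoutExtensions` is made up here, and the helper in the actual sample may differ.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskTimeoutExtensions
{
    // Completes with the task's result, or with defaultValue if the
    // timeout elapses first. The underlying task is left running.
    public static async Task<T> WithDefaultOnTimeout<T>(
        this Task<T> task, T defaultValue, TimeSpan timeout)
    {
        using var cancellation = new CancellationTokenSource();
        var delay = Task.Delay(timeout, cancellation.Token);

        var winner = await Task.WhenAny(task, delay);
        if (winner == task)
        {
            cancellation.Cancel(); // release the timer early
            return await task;     // propagates the result or the exception
        }

        return defaultValue;       // graceful timeout
    }
}
```

Note that the timeout does not cancel the pending completion. Other callers waiting on the same task still get the next snapshot when it arrives.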

Let’s see how this works step-by-step:

  1. The client makes an initial request for a given data snapshot.
  2. The server returns the latest data snapshot, plus a token representing that snapshot version. This token can be anything, as long as it is unique per version of the snapshot. Guid.NewGuid() works well, as do forever increasing integers.
  3. The client handles the snapshot (e.g. updates its own cache, or paints a data grid, or updates a chart).
  4. After this (or even during), the client asks for a snapshot again, now passing the token it received.
  5. The server now looks at the token and decides whether the client already has the latest snapshot version or whether it needs another update.
    • If the tokens do not match, the server responds immediately with the latest snapshot, along with the new token.
    • If the tokens do match, the server will delay the response until a new snapshot is available or a graceful timeout is reached. As soon as a new snapshot version is created, the server fulfills that pending request.
  6. Whenever the request resolves, the client makes use of that snapshot and then requests a new snapshot again, providing the new token. The loop repeats until the client stops it.

This client-side loop of requesting-waiting-applying is what causes natural back-pressure in the system and lets both server and client adapt to individual network latency.

And yes, delaying the response is similar to the ages-old long-polling in HTTP. However, unlike HTTP, Orleans does not incur the cost of establishing a connection per request, and instead simulates the long-poll via messaging between grains or cluster clients.
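The token exchange in the steps above can be tried out without Orleans at all. The following is a minimal in-memory sketch in plain C#, with hypothetical names (`SnapshotSource`, `Publish`) and a lock standing in for the single-threaded guarantees a grain would give you. It leaves out the graceful timeout for brevity.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for a projection; no Orleans involved.
public sealed record Snapshot(long Token, string Data);

public sealed class SnapshotSource
{
    private readonly object gate = new();
    private Snapshot current = new(0, "empty");
    private TaskCompletionSource<Snapshot> next = NewCompletion();

    private static TaskCompletionSource<Snapshot> NewCompletion() =>
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // Server side: publish a new version under an ever-increasing token.
    public void Publish(string data)
    {
        TaskCompletionSource<Snapshot> waiting;
        Snapshot published;
        lock (gate)
        {
            published = current = new Snapshot(current.Token + 1, data);
            waiting = next;
            next = NewCompletion();
        }
        waiting.TrySetResult(published); // fulfil every pending request at once
    }

    // Client side: answer now if the caller's token is stale,
    // otherwise hand back a task that completes on the next Publish.
    public Task<Snapshot> GetNewSnapshotAsync(long? token)
    {
        lock (gate)
        {
            return token == current.Token
                ? next.Task
                : Task.FromResult(current);
        }
    }
}
```

A caller that comes back with token 1 after versions 2 and 3 have been published receives version 3 directly, which is the snapshot skipping described earlier. A caller that already holds the latest token gets a task that stays pending until the next `Publish`.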

What Reactive Caching Solves

Individual data streaming, reactive observables, or any other kind of push-only approach without the ability to conflate data based on the back-pressure of an individual consumer, cannot cater for significant differences in network latency on the consumer side.

Pushing all individual data items to each consumer results in the slowest consumers struggling to keep up. If the amount of data you’re sending to a slow consumer is chronically more than it can handle, the consumer can enter a perpetual state of delay in handling that data.

Exploiting high network bandwidth by pushing batches of items can help up to a point and is often enough on the server-side to scale-out streaming solutions. You may have no choice in this, if the consumer does require all individual items without exception.

However, when the consumer only cares about the latest dataset right now, reactive caching allows the consumer to skip interim snapshots as latency forces it to, while still adapting in real-time as the network quality changes.

In addition, some other problems just disappear with Reactive Caching. In a typical streaming scenario, you have to deal with subscriptions in some form. This forces you to think about:

  • To whom do I have to send the data?
  • Are they alive?
  • Have heartbeats failed?
  • When must clients re-sub?
  • When must the server un-sub them?
  • Who’s getting too much data?
  • Who needs a reset to cope with too much data?

We tend to delegate these concerns to an external queue provider. That works fine, at the expense of a network double-hop to an external and often shared system, incurring the cost of its own internal latency, and of course, facing the main challenge with streaming detailed above - data volume on the client-side.

With Reactive Caching, we allow clients to connect directly to the server-side, without intermediates. We don’t manage pub-sub models because there is nothing to manage. That’s because each single request is the subscription, is the heartbeat and is the back-pressure. If a client does not receive a response to every single request within the agreed graceful timeout, then it knows immediately something is wrong.
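To make the "each request is the heartbeat" idea concrete, here is a small client-side sketch. It assumes .NET 6 or later for `Task.WaitAsync`; the `HeartbeatCheck` class and the `networkAllowance` parameter are hypothetical names introduced for illustration. The client waits a little longer than the agreed graceful timeout and treats silence beyond that as a sign that something is wrong.

```csharp
using System;
using System.Threading.Tasks;

public static class HeartbeatCheck
{
    // Returns true if the request answered (with data or a graceful null)
    // inside the allowed window, false if the server stayed silent.
    // A request that itself fails with TimeoutException also counts as silent.
    public static async Task<bool> AnsweredInTimeAsync<T>(
        Task<T> request, TimeSpan gracefulTimeout, TimeSpan networkAllowance)
    {
        try
        {
            // Task.WaitAsync throws TimeoutException when the window elapses
            await request.WaitAsync(gracefulTimeout + networkAllowance);
            return true;
        }
        catch (TimeoutException)
        {
            return false;
        }
    }
}
```

With a 10 second graceful timeout on the server, an allowance of a few seconds for the round trip is a reasonable starting point, though the right value depends on the slowest link you need to support.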

Why Orleans

While the client-side implementation of this pattern is straightforward in any technology, the server-side implementation can be quite hard to accomplish without the help of an in-memory stateful system.

For the server-side to work, a number of things must be in the goldilocks zone:

  • MyProjectionGrain instances (or whatever equivalent) must be thread-safe (or you must take care of that yourself).
  • Instances must be long-lived - requests will keep hanging off them by design.
  • Instances must know how to recover themselves and establish initial state.
  • Instances must stay in-memory to minimize latency.
  • You must be able to scale while maintaining responsiveness. This means:
    • You must be able to create many instances across your cluster.
    • For each request, you must discover what node holds what, so the request can hang off the correct instance.
    • You must be able to create new instances on-demand as new requests for different projections come in.
    • You must be able to dispose of these instances when no longer required.
    • You must be able to replicate these instances as-needed (and not more than that) to cater for hot projections. Replicate too little and a node can get overwhelmed. Replicate too much and face memory issues.

The above is something stateless server-side technology by itself - such as ASP.NET - cannot provide. Distributed caching services can help to a point but become very expensive on the wallet early on.

Yet the above is something Orleans provides out-of-the-box, by virtue of being a virtual actor system. Implementing the server-side in Orleans means implementing one or more classes such as the pseudo-code above and letting the system do its thing. We also do not need expensive distributed caching providers - or caching at all for that matter. As an Actor System, Orleans is cache already. Cache we can program in plain C#.

To help you understand how simple this whole thing is, I’ve pushed a bare-bones sample to the Orleans repository. Just clone the whole thing, and follow the instructions in the readme file.

In our setup, we use both ASP.NET and Orleans in the same cluster, each one doing what they do best. Orleans handles the stateful in-memory projection stuff while ASP.NET handles the client-friendly protocols we provide to consumers.

I plan to update the sample, or create a new one, featuring an ASP.NET front-end and an example of the reactive replication pattern we’re ironing out. But that’s a story for another post.

Lessons Learned

Not all was sunshine and rainbows. We faced some challenges making this stuff work the first time. Here are some things to watch out for, if you’re going down this route.

#1: Beware Long-polling via HTTP

Long-polling on Orleans is very cheap. Long-polling on HTTP/1.1 is very expensive. HTTP/2 multiplexing requires goldilocks conditions to run, so it wasn’t even on our table. SignalR and WebSockets are there to hold your hand.

Why is this?

Orleans does not long-poll. Instead it simulates long-polling behaviour through message sending and receiving. An Orleans node or client does not need to establish a new connection on every single request. Most often, the connections are already established in the first place, and Orleans multiplexes messages back and forth through these connections. The cheapness of this underlying infrastructure is what allows long-polling to turn into reactive caching and so on.

HTTP/1.1 does long-poll. And it ain’t pretty. For every request, the browser must establish a connection and may decide to perform a DNS lookup beforehand. This makes the request lose valuable time even before the server knows about it.

In addition, browsers will impose a connection limit per domain. For Chrome this is six connections per domain. Each active long-poll counts towards this limit. If your application is issuing a lot of long-polls at the same time, then you will start seeing things not updating when they should. That’s not something you want your users to notice.

For .NET client applications, the default connection limit is two, but you can override it with the ServicePointManager. However, our load testing showed that abusing this will lead to random socket exhaustion. The HttpClientFactory mitigates this somewhat but one cannot rely on other teams to use specific tech to access standard REST APIs.
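On modern .NET (Core 2.1 or later) the per-server limit lives on `SocketsHttpHandler` rather than on `ServicePointManager`. As a sketch only, assuming a modern .NET client, a long-poll-friendly `HttpClient` might be configured like this. The `LongPollHttp` class, its parameters and the specific values are illustrative assumptions, not what we run in production.

```csharp
using System;
using System.Net.Http;

public static class LongPollHttp
{
    // Build one shared client and reuse it; creating a client per request
    // is a common cause of socket exhaustion.
    public static HttpClient CreateClient(int maxConcurrentPolls, TimeSpan gracefulTimeout)
    {
        var handler = new SocketsHttpHandler
        {
            // each pending long-poll holds one connection to the server
            MaxConnectionsPerServer = maxConcurrentPolls,
            // recycle pooled connections periodically (illustrative value)
            PooledConnectionLifetime = TimeSpan.FromMinutes(5)
        };

        return new HttpClient(handler)
        {
            // must outlast the server's graceful timeout (illustrative margin)
            Timeout = gracefulTimeout + TimeSpan.FromSeconds(5)
        };
    }
}
```

Raising the limit only moves the ceiling. Every concurrent long-poll still holds a connection open, which is why a multiplexing transport is the better long-term answer.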

Why do it then?

Offering a long-poll-capable RESTful interface makes it easy for consumer-side developers to test reactive caching behaviour early on using the Swagger UI, as a human being just clicking things and seeing results. It works great for proof-of-concept-ing new functionality, before committing to it. It’s also great for troubleshooting the server-side, while the debugger is attached.

What are the alternatives?

Once you’re serious, implement high-level SignalR or low-level WebSockets, or some other multiplexing-capable communication technology. Persistent connections are one of the basic building blocks of real-time applications.

#2: Reentrancy Will Catch You Off-Guard

The Orleans-side bits of this pattern require the use of reentrant grains. If the grains were not reentrant, a single reactive poll request would hang the grain until resolved.

However, reentrancy is a double-edged sword:

On one hand, a reentrant grain can handle multiple requests at the same time, while staying single-threaded. The grain just swaps requests whenever it encounters an await somewhere or it returns an unfulfilled task back to Orleans. This behaviour allows thousands of requests to “hang off” a single grain, waiting for a future response.

On the other hand, reentrant grains are prone to bugs from the unwary developer. I’ve lost count of the number of bugs I created through inattention. Once you know what to look out for in reentrant code, you’re good to go, but prepare to attach that debugger many times until you gain such foresight.
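One typical trap is reading state before an await and writing it back afterwards. The sketch below reproduces that in plain C#, with no Orleans involved; the `Counter` class and its method names are hypothetical. The second method is only safe under the single-threaded turns a grain provides, which plain C# does not guarantee.

```csharp
using System;
using System.Threading.Tasks;

// Plain C# stand-in for a reentrant grain, for illustration only.
public sealed class Counter
{
    public int Value { get; private set; }

    // Buggy under interleaving: the value read before the await
    // may be stale by the time it is written back.
    public async Task IncrementStaleAsync(Task somethingSlow)
    {
        var seen = Value;
        await somethingSlow; // another request can run here
        Value = seen + 1;    // overwrites whatever happened in between
    }

    // Safer inside a grain: touch the state only after the await,
    // within one uninterrupted turn.
    public async Task IncrementFreshAsync(Task somethingSlow)
    {
        await somethingSlow;
        Value = Value + 1;
    }
}
```

If two `IncrementStaleAsync` calls overlap on the same pending task, both read zero before either writes, and the counter ends at one instead of two. The general habit that helps is to treat every await as a point where the grain's state may have changed.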

#3: Monitor Your Memory And Throughput (Even More!)

At their core, Orleans grains are concurrent programmable cache units. Running an Orleans cluster is a bit like running a distributed cache cluster - but one that you can program, and shard, and integrate with all sorts of other systems, without having your data leaving memory unless you say so. Unlike a distributed cache cluster, Orleans is cheap on the network and cheap on the wallet. Like a distributed cache cluster, memory is still your number one resource.

Our CQRS-style projection model enables the on-demand creation of many, small projections of data that update in real-time on the server-side. We implement these projections as grains in Orleans. Each projection instance can satisfy requests for a particular shape and shard of data. It holds only enough data to paint the screen area it represents, e.g. a grid or a chart. Many clients can hang off a single projection. A single client can also hang off multiple projections.

This can lead to a very high number of these projection grains spawning on the cluster. While Orleans can handle millions of grains across a cluster, and each projection is small, things do pile up, both in memory and throughput.

We have therefore found it important to develop a load test that focuses on hammering the model with both:

  • Requests that require new projections all the time - to monitor memory growth.
  • Requests that attempt to saturate a single projection - to monitor throughput.

It was the feedback gathered by this load test that led us to the reactive replication pattern we are evaluating now.

Final Thoughts

Bringing Reactive Caching and CQRS together within Orleans is enabling us to create ground-breaking solutions in this industry at a fraction of the usual complexity and cost. This does not come without risk of failure and lots of head-scratching moments. Willingness to try new stuff is something one must convince paying customers of. It’s their money on the line. Disbelief is the main challenge to overcome when doing R&D.

When your application shows real-time data as you type filters to folk who are used to very slow, cumbersome systems, their initial reaction tends to be “ah I see, you’re caching stuff, when does it refresh?”

Well, yes, we’re caching… And also no, we’re not caching. There is no cache refresh time or expiry date or consistency schedule or whatnot. All that stuff does not apply anymore. The data itself is cache. We are real-time caching, if that makes any sense.

About Jorge Candeias

Jorge helps organizations build high-performing solutions on the Microsoft tech stack.

London, United Kingdom https://jorgecandeias.github.io