Planet-scale event sourcing with Azure Cosmos DB

Planet-scale event sourcing with Azure Cosmos DB

Event sourcing is an application design principle that I find incredibly powerful and extremely well-suited to the requirements that we routinely have to fulfill when building medium- to large-scale systems.

In a nutshell, event sourcing says that updates to your application domain should not be directly applied to the domain state; instead, those updates should be materialized as events describing the intended changes and written to an event store. It should also be possible to somehow subscribe to the stream of events received and persisted by the event store in order to apply them to some external state - usually called view - that's more practical to query than the event stream.

principles of event sourcing

This approach brings a lot of benefits. First, this event store becomes your canonical source of truth that describes the updates applied to your domain in an unbiased form. You can apply (or rather project in event sourcing lingo) the events to any number of different views, depending on the queries that your system has to implement: from a relational database to a search engine to a data lake. Also, the very nature of the event log gives you invaluable auditing capabilities. You can literally go back in time and incrementally replay all the changes applied to your domain, something that's just impossible if you overwrite your domain state every time.

For a deeper dive into the motivations behind this design principle, I highly recommend to watch this talk by Greg Young.

Event sourcing becomes even more powerful if you combine it with Domain-Driven Design where each aggregate manages its own local state and has its own dedicated event stream within the event store.

event sourcing and domain-driven design

This way of modeling your domain allows for greater scalability as each aggregate can be considered as an independent and isolated (micro?) service, with all the benefits this can carry in terms of development, deployment and operations.

The challenges of global event sourcing

The software ecosystem around the implementation of event sourcing techniques has become pretty mature over the past 5 years or so:

  • Specialized data stores that meet the particular requirements of an event store (like exposing events as streams that one can subscribe to) have emerged; see Event Store or Apache Kafka
  • Framework and libraries that provide building blocks for event-sourced applications are available on most platforms; see EventFlow on .NET or Axon Framework on Java

But deploying and operating an event-sourced system on a large and possibly global scale (i.e. with no assumed limit on the number of events and spread over multiple datacenters across the world) presents very specific challenges. Not only do you have to make sure that your system can sustain high throughputs of events, but you also have to make that happen reliably over different, replicated locations.

In the rest of this article, I will explore how Azure Cosmos DB can serve as a distributed event store and how it yields unique benefits in such a use-case.

Funnily, I've realized that one of the very first blog posts I've ever written was on the very topic of multi-datacenter event sourcing! It's rather fascinating to witness how new products and platforms enable complex scenarios to be implemented with ever-increasing levels of simplicity, performance and reliability.

Use the change feed to subscribe to events

As mentioned above, an event store should expose new events being logged as streams that one can subscribe to. Although it is possible to poll for these changes, it's much more efficient to have the new events being pushed to whatever process that needs to react on them (typically, projecting the events into views).

Cosmos DB exposes a change feed that streams the changes happening to each of your database's containers; so every time an event is written, the container's change feed pushes the new document to anyone listening. There are many ways to consume this stream:

The Cosmos DB change feed doesn't only stream new events in real-time, but can also replay the sequence of logged events from either a particular point in time or the very beginning (i.e. the creation of your container). This is necessary if you've updated the way you are projecting your events to a view or added a new view to your system, which requires to feed all past events through your updated or new projections.

Note that it's currently not possible to apply filters to the change feed, so every subscriber receives all the events being logged to the container. This means that if you've applied some Domain-Driven Design to your data model and have different sets of projections for each aggregate type, you would have to implement some basic event routing to dispatch the events received from the change feed to the appropriate projections.

Limitless scaling of event storage

In most projects, starting with an initial system design that can accommodate massive scale is arguably some kind of premature optimization. You will most probably have to cope with moderate traffic at the beginning... and no traffic at all if the product you're working on doesn't take off (been there, done that!). If your product does meet success, then you can incrementally update your architecture to make it more scalable, as long as your initial design provided for modularity and extensibility.

That story is a bit different with event sourcing. In an event-sourced system, that ability to incrementally improve your overall scalability is actually made possible by the event store, as it lets you replace or add new views by replaying all historical events whenever required. This means that the event store really becomes the cornerstone of your data orchestration strategy, which in turn makes it challenging to replace or upgrade without significantly impacting your operations.

That's a use-case where Cosmos DB's elastic scalabity really shines. By following basic - but essential - data modeling techniques, it's entirely possible to design an event store that can accommodate literally any kind of scale and any volume of events.

Cosmos DB achieves this by partitioning the data it stores. Each document (or event in our case) is assigned to a logical partition and each logical partition is stored in a physical partition (which roughly translates to a server instance). The shuffling of logical partitions across physical partitions is transparently managed by Cosmos DB, re-balancing your data across different servers and adding more servers to your usage without any downtime. So what is left for us to take care of is the assignment of events to logical partitions, and that's probably the most important bit to understand in order to design a scalable data model over Cosmos DB.

When creating a Cosmos DB container, we have to specify a partition key, which is a field that the database will look for in our documents to decide which logical partition it belongs to. So all documents with the same partition key value will end up in the same logical partition. Now what's also crucial to understand is that a good partitioning strategy ensures an even distribution of documents across logical partitions and an even distribution of requests across those partitions. In particular:

  • write requests should hit a wide range of different logical partitions to avoid "hot" partitions that would become write bottlenecks,
  • most read requests should ideally hit one and only one logical partition, so Cosmos DB can directly target the physical partition (i.e. the server) that has all the required data without the need to aggregate results across multiple servers.

In practice, finding a partitioning strategy that ticks all these boxes is not always a trivial exercise, but in the case of an event store data model it happens to be ridiculously simple! By using a concatenation of the event's aggregate type and aggregate ID (like user-123 for example), you make sure that all the events related to a single aggregate end up in the same logical partition. As a consequence:

  • write requests will hit different partitions,
  • the most frequent read request, which is retrieving all events for a specific aggregate in order to hydrate its state, will hit a single partition.

As it is currently not possible to specify a partition key that's composed of multiple fields, we have to add a composite property to our events that is the actual concatenation of aggregate type and aggregate ID, and use that field as the partition key. This technique is also described here.

Note that there is a hard limit of 10 GB of storage per logical partition, but I guess it's pretty safe to consider that 10 GB should be more than enough to store all events related to one aggregate (otherwise the way you've modeled your domain into aggregates may rather be flawed).

Guaranteed performance

In an event-sourced system, the event store is the backbone of your data processing pipeline and as such, must ensure strong guarantees in terms of:

  • availability; it's hardly possible to run in any kind of degraded state if the event store is down, as you can't accept any write request and may serve stale reads if you've stopped projecting events to your views,
  • latency; low response times mean faster writes and faster consistency of the views,
  • throughput; limiting the event store's throughput directly limits the overall throughput of your application.

Cosmos DB provides financially backed Service Level Agreements covering each of those metrics and more. Also, it gives the unique ability to elastically scale its throughput to accommodate your application's performance requirements; you can adjust each of your containers' throughput from hundreds to millions of requests per second by dragging a cursor or issuing an API call.

Going global in a click

We usually think about scalability in terms of request throughput or data size, but applications that have a global user base also need to eventually expand the geographical footprint of their infrastructure. By multiplying their points of presence across the globe, they ensure minimal latency and optimal user experience wherever their users are.

Geo-scaling stateless compute has become pretty straightforward as you just have to deploy your code to different datacenters. It's much more complicated when it comes to data! You obviously want your data store to be as close to the compute as possible, otherwise the benefits of geo-scaling would be greatly limited. This means that your data has to be replicated across every point of presence so each server has direct, low-latency access to the data.

One of the biggest advantages of event sourcing is that your event log is your single-source of truth. Because all the views contain data that is derived from the event log, they can be discarded and rebuilt at will without any fear of data loss. So geo-scaling an event-sourced system only requires the geo-replication of the event store, with all derived views maintained locally.

global replication of the event store

The only problem is, replicating data is usually hard and replicating it globally is even harder. You have to make sure that your data remains consistent across all replicas, minimize the latency of the replication process, guarantee the overall availability of your cluster... These are responsibilities that you want to hand over to a solution that has a proven track record of robustness and reliability.

Cosmos DB takes the pain out of global replication by providing a real (and currently unmatched) turnkey experience. Replicating a container is simply achieved by selecting which regions you want your container to be replicated to and hitting "Save" from the Azure portal:

adding regions to cosmos db replication

This can also be done programmatically as the same level of configuration is available from the Cosmos DB REST Resource Provider.

Cosmos DB makes the geo-replicated nature of your event store completely transparent to your application, as its multi-homing API seamlessly directs your requests to the nearest available region. This means that you can add new points of presence by simply replicating your event store to a new region and deploy your app there, without any change in your application code.

Ensure event log consistency with unique keys

As easy as it is to expand our global footprint, we have to keep in mind that adding more points of presence to our application also increases the level of concurrency that we have to deal with at the event log level. Read concurrency isn't a problem really as the partitioning strategy we've detailed above ensures an ideal distribution of events across logical partitions that map to individual aggregates. Write concurrency, however, requires more attention.

Although it is possible to control the write access to the event store from a single aggregate in a single region (I would personally do that with an actor framework like Orleans or Akka.Net to implement aggregates as single-threaded singletons), you can't realistically practice the same control with aggregates spread over multiple regions without drastically limiting your scalability. In other words, you may have multiple instances of the same aggregate, living simultaneously in different regions and issuing conflicting events concurrently.

The trick here is to rely on the event sequence number. Each event stream (that is, related to a single aggregate) is composed of a series of events with monotonically increasing sequence numbers; every time an event is added to the stream, we take the last event's sequence number and increment it. So each and every event can be uniquely identified by the combination of its aggregate type, aggregate ID and sequence number. All we have to do then is to declare a unique key composed of these 3 properties so any attempt to create an event that matches those properties on an existing event will fail with some explicit error. Getting this error would mean that an other instance of the aggregate has concurrently updated the event stream, so we should refresh our aggregate's state and try again.

Note that unique keys are only scoped to logical partitions, meaning that you can have duplicate keys across different partitions. That isn't an issue at all in our case as we have already partitioned our events by aggregate type and aggregate ID; another sign that Cosmos DB's features and constraints fit the event store use-case really well!

Maximum write availability

The replication method described so far lets you add read regions to your database, effectively increasing your read availability across the globe. By default, this setup still has a single write region, which means that read requests will be routed to the nearest read regions by the multi-homing API, but every write request will hit the same, single write region. While this may be adequate for read-heavy applications, you may also want to improve the write latency of your event store; two different approaches are possible here.

Trigger a manual failover to change the write region

The first option is suitable for global applications that see most of their traffic during business hours, or most generally at day time. Here you want to maximize your write availability around midday in each of the regions where your application is deployed. This technique is sometimes nicknamed "follow-the-sun", as you slide your resources across the globe over a 24 hour period.

This can be achieved by triggering a manual failover of your write region. This feature was initially developed to let businesses test their resilience to issues and outages, but it's also a convenient way to change the location of your write region on a schedule. Note that the failover operation is executed without any downtime or data loss, so it really is a safe way to perform this operation. If you're interested in learning more about this technique, check this great blog post by Cloud Developer Advocate Anthony Chu.

Leverage the new multi-master capability

If your data access patterns are not as predictable and you need to ensure the best write availability anywhere and anytime, then your application may be a good candidate for the new multi-master support that was announced at the Build 2018 conference. Just as its name implies, this feature lets you have more than one region accepting writes within your global deployment. Cosmos DB ensures that every event written in every write region gets replicated to all other regions.

Multi-master support is still in private preview at the time of this writing and I didn't have the opportunity to test it yet, so I'm not able to go into further details for now. Notably, I'm not sure how the unique key constraint we described above is enforced over multiple write regions - I would assume that any violation of the constraint ends up as a conflict that would have to be resolved. I will update this section as soon as I've clarified this part.

Ready to take on the world!

If you want to start prototyping an event store based on Cosmos DB, you can use the local emulator available for Windows or try Cosmos DB for free for a limited time.

Whether you have already implemented event sourcing solutions on Cosmos DB or any other data store, or plan to design such a solution and seek some advice, please feel free to share your feedback in the comments below!