Denormalizing your data with Azure Functions and Cosmos DB's change feed

In this article, we're going to explore denormalization, a common data processing pattern, and how it can easily be implemented using Azure Functions to process the change feed of Cosmos DB containers.

A scalable aggregation technique

Let's start by exploring what denormalization is and why it is required in some situations.

Data stores that achieve true horizontal scalability do so by partitioning, i.e. splitting your data over a cluster of servers. In Cosmos DB, this means that not only will the different containers of your database (i.e. collections/tables/graphs depending on the API you're using) be spread across different machines, but even the containers themselves may be split over multiple machines (unless they are "fixed capacity" ones).

Because all your data is not on a single machine, aggregation capabilities are limited. It's not that cross-container aggregations would not be possible, but as they would require costly synchronization between multiple servers (your containers may be spread over dozens or hundreds of machines), the performance of such queries would be greatly affected. And because Cosmos DB is all about performance, it doesn't expose such a capability.

I really consider this the kind of constraint that forces us to adopt sane data modeling and processing practices. It reminds me of a recent project I worked on that was built on a database with a very rich and flexible query language, allowing us to perform literally any kind of aggregation and projection across the entire dataset. In retrospect, I think that this flexibility harmed us more than it helped, at least in some cases. It was so easy to respond to new product requests by amending existing queries, squeezing in more joins and projections, that we eventually ended up with monster queries that were difficult to maintain. Stricter constraints would have forced us to adopt more scalable methods, like denormalization.

Our use-case

So let's consider the simple scenario of a blogging platform where we would store Users and Posts. Posts would reference their authors with an authorId field. Let's also imagine that these two types of entities live in separate containers, because posts would also contain views, comments and other statistics, and so require more write throughput than users.
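
For illustration, here's roughly what the source documents could look like. These shapes are just an assumption for this example (only authorId is prescribed by the scenario; the views counter stands in for the post statistics mentioned above):

{
  "id": "post-id",
  "authorId": "user-id",
  "content": "...",
  "publicationTime": "2018-05-20T12:00:00Z",
  "views": 0
}

{
  "id": "user-id",
  "username": "author-username",
  "pictureUrl": "https://..."
}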

Now we would probably serve a "feed" of the most recently published posts, and this feed would embed the author directly in each post, in the following form:

{
  "id": "post-id",
  "content": "...",
  "author" : {
    "username": "author-username",
    "pictureUrl": "https://..."
  },
  "publicationTime": "2018-05-20T12:00:00Z"
}

With a relational database, this would be achieved by setting the authorId of posts as a foreign key, then querying both Posts and Users tables with a JOIN to construct this feed in real-time. But that's not an option here; instead, we will denormalize our data by maintaining this feed in a separate, dedicated container that's ready to be queried. This container will effectively contain a copy of the data present in the Posts and Users containers, with the authors already aggregated in the posts.

Note that this technique is anything but new. Relational databases like SQL Server or PostgreSQL can expose materialized views that precompute and cache expensive queries. Setting up a denormalized container with Cosmos DB takes a bit more work than creating such a view, though, as we need to gather and join the data ourselves.

Implementation

In order to denormalize our feed, we would need to perform the following actions:

  • whenever a post is created or updated, add its author to the post and upsert it in the Feed container
  • whenever a user is updated, update all posts authored by this user in the Feed container to replace their author with the updated user

At this point you may think that adding these "hooks" inside your application's code would be cumbersome and error-prone. The good news is, they can (and should!) be implemented outside of your application by leveraging Cosmos DB's change feed. Each Cosmos DB container exposes an API that streams the changes happening to that container; so every time a document is added or updated, the container's change feed pushes the new document to anyone listening.

There are a couple of ways to listen to a change feed. The Cosmos DB SQL SDK can query it directly, and there's also a powerful Change Feed Processor library that makes it easy to distribute the processing of a change feed over multiple workers. My personal favorite is to use Azure Functions, as they provide a built-in Cosmos DB trigger. With this trigger, your function gets called every time there's new data available from the change feed of the container you've specified.

So we're going to implement two Azure Functions using the Cosmos DB trigger: one watching the Posts change feed and another watching the Users change feed.

Two Azure Functions listening to the change feeds of the Users and Posts containers to update the Feed container

Let's start with the first one:

[FunctionName("PostsTrigger")]
public static async Task Run([CosmosDBTrigger(
  databaseName: "denorm",
  collectionName: "posts",
  ConnectionStringSetting = "CosmosDbConnectionString",
  LeaseCollectionName = "leases")]IReadOnlyList<Document> documents)
{
  if (documents != null && documents.Count > 0)
  {
    foreach (var post in documents)
    {
      // fetch author
      var authorId = post.GetPropertyValue<string>("authorId");
      var author = await _documentClient.ReadDocumentAsync(
        UriFactory.CreateDocumentUri("denorm", "users", authorId));

      // set author to post and upsert it
      post.SetPropertyValue("author", author.Resource);
      await _documentClient.UpsertDocumentAsync(
        UriFactory.CreateDocumentCollectionUri("denorm", "feed"), 
        post);
    }
  }
}
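
Note that _documentClient above refers to a shared DocumentClient instance that isn't shown in the snippet. Here's a minimal sketch of how such a client could be initialized; the "CosmosDbEndpoint" and "CosmosDbKey" setting names are assumptions of this example:

// Shared DocumentClient, reused across function invocations.
// The app setting names below are assumed for this sketch.
private static readonly DocumentClient _documentClient = new DocumentClient(
  new Uri(Environment.GetEnvironmentVariable("CosmosDbEndpoint")),
  Environment.GetEnvironmentVariable("CosmosDbKey"));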

Our second function will have to update the author field of all posts authored by the user that has been updated. Since that means updating multiple documents in the Feed container, we will implement that update server-side with a stored procedure. Here's the code:

function updateUser(user) {
  var collection = getContext().getCollection();

  var accept = collection.queryDocuments(
    collection.getSelfLink(),
    `SELECT * FROM feed f WHERE f.authorId = '${user.id}'`,
    function (err, feed) {
      if (err) throw err;

      for (var i = 0; i < feed.length; i++) {
        var post = feed[i];
        post.author = user;

        var accept2 = collection.upsertDocument(
          collection.getSelfLink(),
          post);
        if (!accept2)
          throw new Error('upsert not accepted');
      }
    });

  if (!accept)
    throw new Error('query not accepted');
}
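
Before it can be called, this stored procedure must be registered on the Feed container. One way to do that, assuming the script above is saved as updateUser.js, is a one-off call like this:

// Register the stored procedure on the "feed" container.
// The file name and this one-off registration step are assumptions of this sketch.
await _documentClient.CreateStoredProcedureAsync(
  UriFactory.CreateDocumentCollectionUri("denorm", "feed"),
  new StoredProcedure
  {
    Id = "updateUser",
    Body = File.ReadAllText("updateUser.js")
  });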

And the corresponding function that will call that stored procedure every time a user is updated:

[FunctionName("UsersTrigger")]
public static async Task Run([CosmosDBTrigger(
  databaseName: "denorm",
  collectionName: "users",
  ConnectionStringSetting = "CosmosDbConnectionString",
  LeaseCollectionName = "leases")]IReadOnlyList<Document> documents)
{
  if (documents != null && documents.Count > 0)
  {
    foreach (var user in documents)
    {
      await _documentClient.ExecuteStoredProcedureAsync<string>(
        UriFactory.CreateStoredProcedureUri(
          "denorm", "feed", "updateUser"),
        user);
    }
  }
}

Denormalization on auto-pilot

Nothing else is needed for the feed data to automatically reflect any update in posts or users. And because the Feed container is just a normal Cosmos DB container, every feed document is fully indexed by default so you can issue queries that efficiently filter by authorId or sort by publicationTime.
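
For example, fetching the 20 most recent posts from the Feed container could look like the sketch below; the page size and the cross-partition option are assumptions of this example:

// Query the most recent feed documents, newest first.
var recentPosts = _documentClient.CreateDocumentQuery<Document>(
  UriFactory.CreateDocumentCollectionUri("denorm", "feed"),
  "SELECT TOP 20 * FROM feed f ORDER BY f.publicationTime DESC",
  new FeedOptions { EnableCrossPartitionQuery = true })
  .ToList();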

This simple example showed how to perform a basic aggregation between 2 containers, but more elaborate operations can be implemented in your denormalization pipeline, like adding new fields that are combinations of existing fields (for more efficient filtering or different sorting).
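
As a purely hypothetical illustration, the PostsTrigger shown earlier could compute such a combined field inside its loop, before upserting the document into the Feed container:

// Hypothetical extra field combining existing values,
// e.g. to support a different sort order on the feed.
var username = author.Resource.GetPropertyValue<string>("username");
var publicationTime = post.GetPropertyValue<string>("publicationTime");
post.SetPropertyValue("authorAndTime", $"{username}_{publicationTime}");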

Azure Functions really feel like the best way to host such a pipeline because:

  • they're completely independent of your application code and can be considered a standalone "extension" to your Cosmos DB database,
  • if you're using Functions on a Consumption plan, you only pay when your functions are running, and they scale out automatically when required.
