Designing a multi-region distributed system and things to take into account

Irdeto
Aug 13, 2021

Existing system

As part of our Digital Rights Management (DRM) solution, we have designed a system that generates a key for encryption and then stores the encrypted content in the operator’s content delivery network. To play such content, users obtain a license that contains a key for decrypting the content and policies that define the quality of the content (HD, UHD), set geo-restrictions and time limits, etc. The current system is designed using the CQRS pattern; our previous blog post, “From a Monolithic to a Highly Responsive Distributed System”, explains how we arrived at this architecture.

Existing CQRS system
  • Command: This side is responsible for taking data entered by users through the UI and data coming from user devices. The data is stored in PostgreSQL and also sent to Kafka so that other interested services can build their own state from these messages.
  • Query: When a user device tries to play protected content, the DRM client sends a license request to the server. The DRM service gets the required policies and keys, evaluates them, and issues the license if the evaluation passes.

Encountered issues

Though we were quite happy with the architecture, issues with latency and system availability emerged once the service needed to serve viewers around the world.

Latency

When the solution is deployed in one region, viewers from other regions may experience extra latency because of the physical location of the servers.

Let’s see what happens when a viewer from Sydney tries to request a playback from the DRM server in Ireland.

Existing system handling requests from multiple regions

The shortest distance between Ireland and Sydney is 17,289 km, which means that even at the speed of light any request from Sydney takes at least about 57 milliseconds to reach its destination machine in Ireland. As we use HTTP, a handshake of three such one-way trips is needed before the connection is established.

HTTP handshake round trips

So, it will take 3 * 57 = 171 milliseconds to establish a connection, and another 57 milliseconds to send the actual request, and still another 57 milliseconds to receive the response.

The total time for a single request from Sydney to the server in Ireland is http_connection_time(171) + request_time(57) + server_processing_time + response_time(57).

If we use a persistent connection, the average time per request is 114 + server_processing_time (10 to 20 milliseconds in our case). Obviously, this added latency can lead to a bad viewing experience.
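
The arithmetic above can be written out as a small sketch. It only restates the article's numbers (57 milliseconds per one-way trip, three trips for the handshake); the function names are made up for illustration.

```python
ONE_WAY_MS = 57  # Sydney -> Ireland one-way time used in the article


def new_connection_ms(server_processing_ms: float) -> float:
    """Handshake (3 trips) + request + server processing + response."""
    handshake = 3 * ONE_WAY_MS
    return handshake + ONE_WAY_MS + server_processing_ms + ONE_WAY_MS


def persistent_connection_ms(server_processing_ms: float) -> float:
    """Connection already open: request + server processing + response."""
    return 2 * ONE_WAY_MS + server_processing_ms


if __name__ == "__main__":
    for processing in (10, 20):
        print(processing,
              new_connection_ms(processing),
              persistent_connection_ms(processing))
```

With 10 milliseconds of server processing this gives 295 milliseconds on a fresh connection and 124 milliseconds on a persistent one, so the network dominates the processing time in both cases.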

Availability

The solution is deployed in AWS and relies on AWS services like RDS and Redis, so the SLA of the system depends on the SLAs of all these services. A failure in any of them results in downtime and reduces the availability of the whole system.

Some of our customers were asking for very challenging SLAs like 99.99% and 99.999%, which is practically impossible for a solution that relies on Amazon services like RDS and Redis, whose own availability is lower than what our customers are asking for.

So we needed to design a system that addresses the two issues mentioned above.
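
To see why stacked dependencies cap the achievable SLA, here is a rough sketch. It assumes failures are independent, and the availability figures passed in are illustrative placeholders, not the actual SLAs of any AWS service.

```python
from math import prod

MINUTES_PER_YEAR = 365 * 24 * 60


def serial_availability(dependencies: list[float]) -> float:
    """A system that needs ALL its dependencies is up only when all are up."""
    return prod(dependencies)


def redundant_availability(region_availability: float, regions: int) -> float:
    """With independent regions, the system is down only if ALL are down."""
    return 1 - (1 - region_availability) ** regions


def downtime_minutes_per_year(availability: float) -> float:
    return (1 - availability) * MINUTES_PER_YEAR


if __name__ == "__main__":
    # Illustrative numbers only: two dependencies in a single region.
    single_region = serial_availability([0.9995, 0.999])
    print(single_region, downtime_minutes_per_year(single_region))
    print(redundant_availability(single_region, regions=2))
```

Under these assumptions a single region can never be more available than its weakest dependency, while a second independent region shrinks the combined downtime sharply. Real regions are not perfectly independent, so treat the second figure as an upper bound.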

Solution

The obvious choice was a system that can be deployed in multiple regions and uses latency-based routing for user requests.

We were already using the CQRS pattern, where any change in a service publishes a message to a Kafka broker and all other interested services update their state by consuming these messages. We decided to extend the same mechanism to distribute the data across multiple regions using Kafka.

So we decided to use MirrorMaker to replicate the data from the Kafka cluster of region-1 to the Kafka cluster of region-2 and vice versa, which resulted in the following design.

Multi-region design and data replication

Any request to any region sends a message to the local Kafka cluster, and MirrorMaker replicates the same message to the other region (there will be another article explaining why we chose MirrorMaker). Each service has a consumer that consumes the data and populates its own data store.

When region-2 goes down, all requests are forwarded to region-1 and all messages are stored in Kafka there. Once region-2 comes back, MirrorMaker resumes replicating the data from region-1 to region-2 from where it left off.

So data will be eventually consistent and there will be no data loss in failure scenarios.

Things to be paid attention to

Message order

Messages produced in one region can arrive in another region out of order.

Example: the sequence of operations in region-1 is:

  • Update entity
  • Delete entity

Yet, in region-2, if we consume/get the delete message first and the update message second, this will lead to an inconsistent/invalid state.

To avoid this, Kafka has a concept called “Message Key”: if some messages need to be consumed in a specific order, you can send them to Kafka using the same message key.

How it works: Kafka has multiple topics and each topic is made up of partitions. Messages within a partition are always ordered (like a queue), and messages with the same key end up in the same partition, so they are consumed in the order they were sent.
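
The key-to-partition idea can be illustrated without a broker. This is a simplified simulation, not Kafka's implementation: Kafka's default partitioner hashes the key with murmur2, while this sketch swaps in CRC32 from the standard library, and the entity keys are made up.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash: Kafka's default partitioner uses murmur2, not CRC32.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(messages, num_partitions):
    """Append each (key, payload) to the partition chosen by its key."""
    partitions = {p: [] for p in range(num_partitions)}
    for key, payload in messages:
        partitions[partition_for(key, num_partitions)].append((key, payload))
    return partitions


if __name__ == "__main__":
    messages = [
        ("entity-42", "update"),
        ("entity-7", "update"),
        ("entity-42", "delete"),
    ]
    for partition, items in route(messages, num_partitions=6).items():
        print(partition, items)
```

Because both messages for entity-42 hash to the same partition, the update always sits ahead of the delete there, whatever happens to messages for other keys.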

The order of messages coming from a single region can be solved with the message key, but what if we have 3 regions and messages from region-1 and region-2 can arrive in region-3 out of order?

Example:

  • Update entity sent to region-1
  • Delete entity sent to region-2

As these 2 actions happen in 2 different regions, we cannot guarantee the order of the messages in region-3, but we still need to make sure all 3 regions are consistent.

For this, we use time-based resolution: we run all our services in UTC and we assume the clocks of our regions are in sync. Every message sent to a Kafka topic carries its creation time in the body. While processing a message in the destination region, we compare the message creation time with the last modified time of the record in that region to decide whether to apply the change or ignore the message.

So, for the above example, say the update entity message is created at 10:09:00.300 and the delete entity message at 10:09:00.350. These messages arrive out of order, so region-3 gets the delete entity message first. We process it and store it in our database with the modification time 10:09:00.350. When the update entity message arrives, we check the last modification time of the record in the database of this region. We can see that this message is from the past and, therefore, we don’t change the state, as we already have the latest state.
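
A minimal sketch of this check, assuming an in-memory dict in place of the real database and hypothetical `Message` and `Record` types. It keeps a deleted entity as a marker with its modification time, so that a late update cannot bring it back; messages with an equal timestamp are simply ignored here, since ties are the subject of the conflict handling section.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Optional


@dataclass
class Message:
    entity_id: str
    operation: str                 # "update" or "delete"
    created_at: datetime           # UTC creation time from the message body
    payload: Optional[dict] = None


@dataclass
class Record:
    modified_at: datetime
    deleted: bool
    payload: Optional[dict]


def apply_message(store: Dict[str, Record], msg: Message) -> bool:
    """Apply msg only if it is newer than what this region already holds."""
    current = store.get(msg.entity_id)
    if current is not None and msg.created_at <= current.modified_at:
        return False  # message is from the past: keep the current state
    store[msg.entity_id] = Record(
        modified_at=msg.created_at,
        deleted=(msg.operation == "delete"),
        payload=msg.payload,
    )
    return True


if __name__ == "__main__":
    def at(ms: int) -> datetime:
        return datetime(2021, 8, 13, 10, 9, 0, ms * 1000, tzinfo=timezone.utc)

    store: Dict[str, Record] = {}
    print(apply_message(store, Message("e1", "delete", at(350))))
    print(apply_message(store, Message("e1", "update", at(300), {"name": "x"})))
```

In the run above the delete is applied and the older update is ignored, matching the example in the text.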

Conflict handling

There will be cases where we can have parallel changes made on the same entity in 2 different regions at the same time, resulting in 2 messages in region-3 for the same entity at the same time.

If they had different times, we could use the above timestamp-based approach to have the latest state.

But in this case, region-1 and region-2 send messages for the same entity created at the same time. The one from region-1 arrives in region-3 first and we process it. When the message from region-2 for the same entity arrives, we check the creation time to decide whether to process or ignore it, but the message creation time and the record modification time are the same, so we need another rule to decide which state to persist.

For this case, we use an approach called rank-based selection. Each region runs with a unique rank number, and when messages created at the same time conflict, we use the rank of the originating region to reach a consistent state across all regions. In our case, the message from the region with the higher rank is treated as the latest state.

In our case, region-1 has rank 1, region-2 has rank 2 and region-3 has rank 3. According to these ranks, region-3 takes the message coming from region-2 as the latest state, and region-1 also takes the message from region-2 as the latest state.
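
One compact way to express timestamp-then-rank selection is to compare (creation time, region rank) pairs. This is a sketch under the assumption that each message carries the rank of the region that produced it; the `Version` type and function names are hypothetical.

```python
from typing import Iterable, NamedTuple, Optional


class Version(NamedTuple):
    created_at_ms: int  # message creation time, epoch milliseconds (UTC)
    region_rank: int    # unique rank of the region that produced the message


def should_apply(incoming: Version, stored: Optional[Version]) -> bool:
    # Tuples compare field by field: the later timestamp wins, and on a
    # timestamp tie the higher region rank wins.
    return stored is None or incoming > stored


def resolve(versions: Iterable[Version]) -> Optional[Version]:
    """Replay messages in arrival order and return the surviving version."""
    stored: Optional[Version] = None
    for version in versions:
        if should_apply(version, stored):
            stored = version
    return stored


if __name__ == "__main__":
    from_region_1 = Version(created_at_ms=1_000, region_rank=1)
    from_region_2 = Version(created_at_ms=1_000, region_rank=2)
    print(resolve([from_region_1, from_region_2]))
    print(resolve([from_region_2, from_region_1]))
```

Both arrival orders end with the region-2 message, which is what makes every region converge on the same state.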

Failure handling while consuming the messages

When we consume messages from Kafka, message processing might fail for multiple reasons. As far as we could see, all the reasons fall roughly into two categories:

  1. The failures that are automatically recovered by retrying
  2. The failures requiring manual correction

Message retry and error-topic

Automatically recoverable failures

Failures like “the message can’t be processed because the entity it depends on has not yet arrived in this region” can be recovered automatically by retrying.

We use multiple retry topics, each with its own consumer, so that each can consume at its own speed (by configuring the consumer fetch time). Once we have retried 2 times (using 2 retry topics) and processing still fails, we send the message to the error-topic, where somebody needs to take manual action and resubmit it to the main topic after the correction.

Manual correction

There will be failures like an “invalid” message or other failures that retrying will not solve; these messages are sent directly to the error-topic. They require manual correction and are resubmitted to the main topic afterwards.

Every message in the error-topic creates an alert in the monitoring system that must be resolved immediately.
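
The routing decision between the main topic, the two retry topics and the error-topic can be sketched as a pure function. The topic names and the two exception classes are hypothetical, and the sketch leaves out the actual Kafka producing and consuming.

```python
from typing import Callable, Optional

# Hypothetical topic names for one message stream.
MAIN = "entity-events"
RETRY_1 = "entity-events-retry-1"
RETRY_2 = "entity-events-retry-2"
ERROR = "entity-events-error"

# Where a message goes next when a retryable failure happens on each topic.
NEXT_ON_RETRYABLE = {MAIN: RETRY_1, RETRY_1: RETRY_2, RETRY_2: ERROR}


class RetryableError(Exception):
    """For example: the entity this message depends on has not arrived yet."""


class InvalidMessageError(Exception):
    """Retrying cannot help; the message needs manual correction."""


def next_topic(current_topic: str,
               handler: Callable[[dict], None],
               message: dict) -> Optional[str]:
    """Process message; return the topic to forward it to, or None on success."""
    try:
        handler(message)
        return None
    except RetryableError:
        return NEXT_ON_RETRYABLE[current_topic]
    except Exception:
        # Anything a retry will not fix goes straight to the error-topic.
        return ERROR
```

A consumer on each topic would call `next_topic` with its own topic name and publish the message to the returned topic, if any. After the failure on the second retry topic the message lands on the error-topic, which is where the alert is raised.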

Tenants (customers) choose regions and GDPR

We have a multi-tenant system that spans multiple regions. At the same time, there are use cases where some tenants don’t want their data to be replicated in region-3 and region-4 or moved outside a particular region.

For these scenarios, we use a topic-per-tenant approach and we dynamically reconfigure MirrorMaker, using a sidecar, to define which topic (tenant) is replicated to which region.

Besides, some tenants want to add new regions. For this purpose, we keep all the messages stored for a tenant, migrate them to the new region, and reconfigure MirrorMaker to replicate this tenant’s data to the new region as well.
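
As an illustration of the topic-per-tenant idea, the sketch below derives, for every pair of regions, the list of tenant topics that may be replicated between them. The tenant names and the `tenant-<name>` topic naming are assumptions, and the rendered lines are only loosely modeled on the `source->target.topics` style of MirrorMaker 2 properties, so check the exact keys against your MirrorMaker version.

```python
from itertools import permutations
from typing import Dict, List, Set, Tuple

Flow = Tuple[str, str]  # (source region, target region)


def replication_flows(tenant_regions: Dict[str, Set[str]]) -> Dict[Flow, List[str]]:
    """Map each source->target flow to the tenant topics allowed on it."""
    flows: Dict[Flow, List[str]] = {}
    for tenant, regions in tenant_regions.items():
        # A tenant's topic is mirrored only between regions it has chosen.
        for source, target in permutations(sorted(regions), 2):
            flows.setdefault((source, target), []).append(f"tenant-{tenant}")
    return {flow: sorted(topics) for flow, topics in flows.items()}


def to_properties(flows: Dict[Flow, List[str]]) -> str:
    lines = []
    for (source, target), topics in sorted(flows.items()):
        lines.append(f"{source}->{target}.enabled = true")
        lines.append(f"{source}->{target}.topics = {','.join(topics)}")
    return "\n".join(lines)


if __name__ == "__main__":
    chosen = {
        "a": {"region-1", "region-2"},              # must stay out of region-3
        "b": {"region-1", "region-2", "region-3"},
    }
    print(to_properties(replication_flows(chosen)))
```

Adding a region for a tenant then amounts to adding it to that tenant's set and regenerating the configuration, which is the kind of change the sidecar would push to MirrorMaker.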

Running MirrorMaker effectively

One more problem we had was running MirrorMaker with a production-ready configuration.

MirrorMaker uses Kafka connectors to do 2-way replication. Connectors run low-level consumers and producers to copy the data from one Kafka cluster to another.

Since MirrorMaker copies data from a cluster in one region to a cluster in another region, network failures are likely. Besides, MirrorMaker might consume messages quickly but produce them slowly, so its internal producer buffers fill up and the memory use of the MirrorMaker application grows.

We tuned the low-level consumer and producer configurations and timeouts to keep MirrorMaker’s memory use bounded and the application stable.

Backward compatibility of messages

The event schema changes along the way. Without care, old events may not be consumable by the latest consumer, or the latest events by an old consumer, because they are not compatible. Also, any change to the message schema in region-1 affects region-2 consumers. So, we always need to think about how consumers in all regions will behave with any change in the message schema.

Any change to a message always needs to be released in multiple steps, each chosen so that it does not break backward compatibility.
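
One common way to make such stepwise releases safe is a tolerant consumer: it ignores fields it does not know and supplies defaults for fields that older producers do not send. The sketch below assumes JSON-encoded events and hypothetical field names; it is one possible technique, not necessarily the one used in our services.

```python
import json


def parse_entity_event(raw: bytes) -> dict:
    """Read only the fields this consumer understands."""
    data = json.loads(raw)
    return {
        # Fields present since the first schema version (hypothetical names).
        "entity_id": data["entity_id"],
        "operation": data["operation"],
        "created_at": data["created_at"],
        # Hypothetical newer field: absent in old events, so default it.
        "priority": data.get("priority", "normal"),
        # Any other field from a newer producer is silently ignored.
    }


if __name__ == "__main__":
    old_event = json.dumps({
        "entity_id": "e1",
        "operation": "update",
        "created_at": "2021-08-13T10:09:00.300Z",
    }).encode()
    print(parse_entity_event(old_event))
```

With consumers like this deployed in every region first, a producer in any region can start sending the new field later without breaking the others.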

Conclusion

With the new multi-region design, we achieved

  1. Lower latency: Users in each region are served by servers in the same region. As these servers are physically close to the users, requests reach them quickly and responses come back faster, resulting in a better user experience.
  2. High availability: Since the system is deployed in multiple regions, if any region goes down we can forward user requests to another region until the failed region recovers, resulting in higher availability of the system.

Written by: Iranna Patil and Barbaros Alp
