- Autoscaling stateless systems is a well-understood field at this point. Yet autoscaling stateful systems suffers from far fewer common standards and practices, especially in the public domain.
- Synchronization of data on new nodes is a big challenge when scaling up a stateful system as it takes time and resources from the cluster, and the complexity only increases as the size of the data increases.
- Consensus algorithms like Raft are commonly used in stateful systems to select a new leader when the previous one crashes but are rarely smart enough to understand which node would deliver the best performance to the cluster.
- There are differences between scaling reads and writes in a stateful system that prevent the adoption of a single strategy for all; adopting special strategies for each scenario improves the chances of successful automation.
- Understanding when scaling or descaling the system should happen is vital, as there are substantial differences between scaling up a system that is slowly degrading in performance versus defending a system that is under a sudden and temporary significant increase in demand, such as when under a DoS attack.
Considering the trend in software engineering toward segregation and the ever-growing need for scalability, a common challenge arose: autoscaling stateful systems (databases being the most common) became complex and, at times, unfeasible. That has led many companies to over-provision such systems so that, based on expected loads, the systems can cope with the highest expected demands. This, of course, brings problems: over-provisioning resources is costly, and it does not guarantee reliability, as sudden surges in demand or a DoS attack can easily exceed the expected loads. This article digs deeper into the challenges faced when attempting to auto-scale stateful systems and proposes an opinionated design solution that addresses many of those challenges through a mix of existing and novel approaches.
Recapitulating a Little
If we look at how software engineering evolved historically, we see a few significant milestones in terms of building software and the restrictions and expectations of users.
Suppose we took a concise historical tour of software engineering. We would start with the mainframes and their centralized approach built around large servers, then pass through desktop applications and the Client-Server advent. We would then move into the web applications revolution and its multiple phases, from large monoliths to modern microservices.
In all that history, we would see clear trends for segregation. Vertical segregation is where we divide systems by concerns (or context), generally having the database(s) separated from our applications and in many cases having our UI separated from our business or service layer. And there is horizontal segregation, where the systems can be scaled out by provisioning more nodes to support rising demands, even automatically, with the help of tools like an orchestrator such as Kubernetes.
This hunger for segregation led to the inception of architectural approaches like the shared-nothing architecture, in which an application is built not to hold state itself, becoming what we know as a stateless application and making it a lot simpler to scale out. That sounds like an awesome solution, but soon engineers realized that there is seldom a “truly stateless” application—one that doesn’t hold state at all.
What happens instead is that parts of the application (usually the services or microservices) are built stateless. However, they still rely on stateful systems such as databases to hold state on their behalf. This is the central theme of this article. I will discuss this common challenge in software engineering: how to efficiently auto-scale stateful systems in modern applications.
Targeted Use Cases
This article does not target stateful systems that hold state in web servers.
It provides a foundational design upon which software engineers can potentially build their own databases, explicitly addressing the concerns of getting a storage system that works in a single node and turning it into a distributed system with opinionated autoscaling capabilities. For example: Imagine a microservice architecture for online ordering, like the diagram in Figure 1.
Figure 1: Online shopping example use case
Let’s say there are requirements that lead to the conclusion that RocksDB is a better fit than Redis as a key-value storage engine for your project. The trouble is that RocksDB is only a storage engine—as is, it can only be deployed on one node. Let’s assume your requirements are for a new system that will have a global outreach and require significant extensibility. This article is a good start on how you can go about transforming a single-node storage engine like RocksDB into a distributed and auto-scalable application.
Note that RocksDB is just an example here. It could be any other storage engine or tool like Apache Lucene for text indexation or just in-memory storage without any engine, for that matter. This design pattern is generic and can apply to any storage engine, language, and data type and structure. Also, the designs shared here could be used to give autoscaling capabilities to any of the databases listed (Mongo, Redis, Postgres).
Stateful System Definition
A stateful system is a system where state must be handled. In modern web applications, the job of holding and managing state is typically done by a database, but it can also be a web server, for instance, when user sessions are stored in the web server’s memory.
In websites, a typical example of state to be managed is a user’s shopping cart (see Figure 1). The cart has to be saved between HTTP requests so that when the user finishes shopping and proceeds to checkout and payment, the cart is in the correct state with the right products and amounts. The cart information has to be stored somewhere, and that somewhere is a stateful system. In our use case example in Figure 1, the Mongo DB cluster, Redis cluster, and Postgres cluster are the stateful systems.
When we start thinking about autoscaling stateful systems, the main concerns that come to mind are “When should we scale? What should be the trigger? How should this scale take place? How will we move the data? How will the nodes achieve consensus?”
Here are the main problems I will cover in this article:
Every distributed system that holds state has to agree on the next valid state of the cluster. This is a well-researched area that we can draw from: take, as examples, Ark, Raft, and BDR, the consensus algorithms used by MongoDB, Redis, and Postgres, respectively. These are the databases picked for our online ordering example presented in Figure 1.
The need for consensus arises in software applications every time a cluster must agree on the next value of a stored record, with database implementations being the most notable use cases. In this article, I will propose something new to make Raft consensus smarter when selecting a new leader.
In stateful systems, even though there is an increase in the offerings of systems that can auto-scale (managed instances of databases in cloud providers, for example), in practice, we see a struggle within companies to implement such scenarios, which appears to be caused by:
- Lack of transparency: When we look at mainstream cloud providers like AWS and Azure, it is easy enough to configure autoscaling, but it is not disclosed how exactly the autoscaling will happen. Knowing exactly how it works is vital for critical scenarios with large datasets; questions like “When is the data moved? What is the strategy?” should be clearly answered in the product offerings.
- Absence of public autoscaling patterns: no publicly available patterns currently exist on how to auto-scale stateful systems.
Data migration lag
A common way of scaling a system is adding new nodes to the cluster. However, when we talk about stateful clusters, time is needed for a new node to synchronize all the data already held by the other nodes—in some cases, this amount of data can be massive.
Let’s take our online shopping example (Figure 1). If we talk about large organizations that operate in many geographical areas, the number of records can reach billions. At this scale, a clear and efficient approach to synchronizing and moving data is paramount.
Fast vs. slow demand increase
There are two instances where more capacity is required:
- A steady, medium- to long-term gradual increase in demand. In our ordering example, this would be the number of consumers growing consistently over a period of time.
- A sudden surge in demand that may not be predictable and may risk the service becoming unavailable, which happens when the system is under a DoS attack, for instance.
More publicly available design patterns are needed for the problems listed above. It is not a good idea to simply trust that the cloud provider’s approach will work; even if it does, you may find yourself locked into that particular provider, which is not ideal.
My vision is a public proposal for a generic, replicable, opinionated approach for autoscaling stateful systems aiming to automatically scale up (vertical) and scale out (horizontal) from a single node up to hundreds or thousands of nodes in a single cluster with minimum configuration and interference of the operator. The solutions presented in this article are theoretical at this stage and require implementation and testing.
The designs are not bound to any specific data types; in other words, you can use the same solutions to handle JSON objects, serialized data, streams, blobs, or other types of data.
The Writer writes and the Reader reads!
The cluster leader responsible for writing new states only does the write operations—it does not perform reads. Read replicas, on the other hand, do all the read operations and never do writes.
Proxy as part of the cluster
You must have a proxy implementation that serves neither reads nor writes as part of the stateful cluster; this enables the cluster to use the proxy as a node that also slowly synchronizes the data of the cluster, eventually becoming ready to serve read or write requests if required.
Trigger autoscaling by average response time
Most autoscaling approaches used by cloud providers rely on CPU and memory thresholds, but that is not the best way to deliver a good client experience. Even though the resources may be under stress at certain times, it does not necessarily mean that the user is feeling it on the other end; the system may be using 99% of CPU and still be delivering requests in good time.
Using average response time as the primary trigger changes the decision-making on when to scale, taking the client’s perspective of the system’s performance.
A priori sharding labeling
Labeling each object/record stored with a shard ID avoids the costs of doing it when the pressure is higher on the system so that whenever you need to start sharding, the labels are already set, and no intervention is required.
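As a minimal sketch of this idea (the bucket count of 1000 comes from the sharding section later in this article; `label_record` is an illustrative name, not part of any existing API):

```python
import random

NUM_BUCKETS = 1000  # total bucket IDs; value taken from the sharding section

def label_record(record: dict) -> dict:
    """Assign a random bucket ID (1..NUM_BUCKETS) at write time, so that
    when sharding becomes necessary, shard boundaries can be drawn over
    bucket ranges with no relabeling while the system is under pressure."""
    record["bucket_id"] = random.randint(1, NUM_BUCKETS)
    return record
```

Because the IDs are uniformly random, at scale each bucket holds a similar number of records, which keeps the eventual shards balanced.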
Here we go… The designs
In this proposal for autoscaling stateful systems, there are three different actors, each with specific responsibilities in the cluster. It is worth pointing out that each actor does not necessarily need to run in its own process or node. This makes it possible to run the whole system in a single node, which raises concerns for production purposes but is paramount for testing, POC, or even some MVP setups.
Without further ado, let’s look at each actor and its responsibilities.
The Writer (or leader) is the actor responsible for taking care of the write operations. It writes the new state in its own storage and is responsible for replicating the data to the Read Replica actor(s). There is only one Writer per shard (I will elaborate more on sharding later).
The Writer is the leader of a consensus, and all the write operations are executed through it. No read operations are executed through the Writer.
The Read Replica is the actor that serves all the read requests. It contains a replica of the data from the Writer (leader), except when both Writer and Reader are running in a single node/process, in which case they can share the same storage.
When the consensus protocol elects a new leader, each Read Replica is responsible for opening a multiplex pipe of communication with the leader that remains open until one of the nodes dies or the connection is broken by a network partition. This is important to speed up communication between the actors/nodes by avoiding the overhead of opening and closing connections.
The Load Manager actor serves as a gateway and load balancer, sending write requests to the leader and read requests to the replicas. It is also a back pressure mechanism: it can accept thousands of inbound connections while keeping the number of parallel threads against the target (Read Replica or Writer) limited to a configurable number, therefore keeping the pressure on these actors controlled. This is inspired by the Apache Tomcat NIO Connector. It is vital for defending the cluster against sudden increases in load or DoS attacks, in which case the pressure is absorbed by the Load Manager, keeping the read and write actors safe and receiving a steady flow of requests.
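The back pressure idea can be sketched as a queue that absorbs inbound bursts while a semaphore caps how many requests are forwarded to the target actor in parallel (class and parameter names here are illustrative, not from an existing implementation):

```python
import queue
import threading

class BackPressureForwarder:
    """Accepts an unbounded number of inbound requests but forwards at
    most `max_parallel` of them to the target actor at any one time."""

    def __init__(self, target, max_parallel: int = 8):
        self.target = target          # callable: the Writer or Read Replica handler
        self.inbound = queue.Queue()  # absorbs bursts (e.g., a DoS spike)
        self.sem = threading.Semaphore(max_parallel)

    def submit(self, request) -> None:
        """Inbound side: always accepts, never blocks the client."""
        self.inbound.put(request)

    def worker(self) -> None:
        """Outbound side: a steady, bounded flow toward the target."""
        while True:
            req = self.inbound.get()
            with self.sem:            # cap on parallel calls to the target
                self.target(req)
            self.inbound.task_done()
```

During a surge, the queue grows while the read and write actors continue to see only `max_parallel` concurrent requests.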
The Load Manager is also responsible for routing write requests to the correct shard and sending query requests to each shard when aggregation is required. It also aggregates and sorts the results before returning them to the client, reducing the amount of work required from the Read Replica. The Load Manager addresses these concerns in the same manner as a database proxy would, with the difference that it is part of the cluster and not an external added component in this case.
There can be more than one instance of the Load Manager. Every time the Load Manager instance(s) reach a configurable threshold of CPU and/or memory, a new Load Manager is provisioned, generating a cluster of Load Managers that should have their own independent consensus mechanism.
The basic interaction between the actors proposed in this design pattern is expressed in the diagram presented in Figure 2:
Figure 2: Basic design of the proposed solution
The fact that Raft only has one leader at a given time is very important for the strategies I will describe here.
I propose modifying the Raft protocol to increase the overall performance of the cluster by making Raft aware of node differences and selecting the “bigger” node available as a leader. This is especially important when talking about autoscaling write operations, as we only have one leader at a given time. The most obvious way to increase its capabilities is to provision a “bigger” leader and therefore trigger a new election. Raft then needs to be able to identify and elect the new “bigger” node as leader.
An alternative approach is to modify Raft to be able to receive a “switch to” instruction which would cause the cluster to switch the leader to the specified “bigger” node.
The latter approach is preferred, as it would be a smaller change to the protocol and would decouple the task of switching the leader from the switch logic.
Bigger in this context is related to CPU, memory, storage technology (SSD), or other resources—it all depends on the purpose of the stateful cluster. If the cluster is intended to serve complex calculations, bigger probably means more CPU, but if it serves requests, it might mean more memory and better storage technology.
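Under the assumption that each node advertises its specification, the “switch to” decision could be sketched like this (the scoring weights and all names are hypothetical; standard Raft has no notion of node size):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class NodeSpec:
    node_id: str
    cpu_cores: int
    memory_gb: int
    ssd: bool

def capability_score(n: NodeSpec, cpu_w: float = 1.0,
                     mem_w: float = 1.0, ssd_w: float = 4.0) -> float:
    """What counts as 'bigger' depends on the cluster's purpose, so the
    weights are configuration knobs (the values here are arbitrary)."""
    return cpu_w * n.cpu_cores + mem_w * n.memory_gb + (ssd_w if n.ssd else 0.0)

def pick_switch_target(nodes: list, current_leader: str) -> Optional[str]:
    """Return the node a 'switch to' instruction should name, or None if
    the current leader is already the biggest node available."""
    best = max(nodes, key=capability_score)
    return best.node_id if best.node_id != current_leader else None
```

The switch logic stays outside the consensus protocol: Raft only needs to honor the resulting “switch to” instruction.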
The phases of scalability that I explain next are named Mach* in an allusion to the speed term used to describe objects as fast as or faster than the speed of sound. In this article, each Mach stage effectively indicates the number of nodes in the cluster.
*Mach terminology is only used in this article—it is not an industry naming convention.
Configurable Scaling Triggers
It is important to understand when is the right time to auto-scale/auto-descale. For instance, it is a bad idea to attempt to scale when the system is under a lot of pressure, and that is why the back pressure offered by the Load Manager actor is so significant.
I will focus on scenarios where the increase in demand happens gradually over time. For that, there are two essential types of configurations that can be used to trigger autoscaling.
In both scenarios, it is the responsibility of the Load Manager to recognize that a trigger should happen and emit a notification to the operator. (Note: the operator may be a human or a software system, preferably the latter.)
The configurable triggers are:
1. By average response time threshold
One of the jobs of the Load Manager is to monitor the average response time of requests. When the average response time of requests reaches a threshold, a trigger for scale is issued.
Example of scaling UP configuration: 3 seconds/request on average in the last 60 min.
Example of scaling DOWN configuration: under 1 second/request on average in the last 60 min.
2. By timeout threshold
The timeout threshold is a percentage of requests that may time out within a given period of time before an auto-scale signal is issued.
Example of scaling UP configuration: > 1% of requests timed out in the last 5 min.
Example of scaling DOWN configuration: Not recommended for timeout threshold as no level of timeouts is advisable to be safe for a downgrade in the cluster.
Refer back to Figure 1, and assume that we have replaced Redis with our auto-scalable RocksDB. The new RocksDB auto-scalable cluster would scale up and down based on these thresholds being breached without any interference of the human operator/admin.
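A sketch of how the two triggers might be expressed as configuration and evaluated by the Load Manager (field names and the scale-down threshold are illustrative; the scale-up values mirror the examples above):

```python
from dataclasses import dataclass

@dataclass
class ScalingTriggers:
    up_avg_response_s: float = 3.0    # scale UP: >= 3 s/request avg (last 60 min)
    down_avg_response_s: float = 0.5  # scale DOWN: illustrative value
    up_timeout_pct: float = 1.0       # scale UP: > 1% timeouts (last 5 min)

def scaling_signal(avg_response_s: float, timeout_pct: float,
                   cfg: ScalingTriggers) -> str:
    """Either trigger alone is enough to scale up; scaling down uses only
    the response-time trigger, since no timeout level is safe for it."""
    if avg_response_s >= cfg.up_avg_response_s or timeout_pct > cfg.up_timeout_pct:
        return "scale_up"
    if avg_response_s <= cfg.down_avg_response_s and timeout_pct == 0.0:
        return "scale_down"
    return "hold"
```

The Load Manager would evaluate this periodically over its rolling request statistics and notify the operator when the signal changes.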
Notes before continuing reading:
- The following examples for each Mach stage focus on increasing read capabilities, which will indirectly increase write capabilities as per the read/write workload segregation described above. A dedicated section after Mach IV covers scaling write capacity specifically.
- A “Node” in this text means a participant in the cluster and an individual process running—it does not necessarily indicate different hardware.
- The total number of replicas to be provisioned before starting to create shards has to be configurable.
- The cluster can start in any desired setup, with Mach IV being the minimum setup recommended for production purposes.
- All three actors’ (Writer, Read Replica, and Load Manager) implementations are modular and always deployed in the nodes. When a node is labeled as Writer, for example, it means that only the Writer module is enabled on it, but it still contains the disabled modules of Read Replica and Load Manager, which is what allows nodes to switch responsibilities if needed.
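The modular-node note above can be sketched as a node that ships with all three modules but enables exactly one at a time (names are illustrative):

```python
from enum import Enum

class Role(Enum):
    WRITER = "writer"
    READ_REPLICA = "read_replica"
    LOAD_MANAGER = "load_manager"

class Node:
    """Every node carries all three actor modules; exactly one is enabled,
    which lets nodes switch responsibilities without a redeploy."""

    def __init__(self, node_id: str, role: Role):
        self.node_id = node_id
        self.modules = {r: (r is role) for r in Role}  # enabled flags

    @property
    def role(self) -> Role:
        return next(r for r, enabled in self.modules.items() if enabled)

    def switch_role(self, new_role: Role) -> None:
        """E.g., a Load Manager node being promoted to Read Replica."""
        for r in self.modules:
            self.modules[r] = (r is new_role)
```

This is what makes the Mach transitions cheap: promoting a node is a flag flip plus data synchronization, not a new deployment.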
This is the initial state of the system/cluster, where all its actors are active in a single node. It is indicated for small use cases or testing pipeline scenarios and looks like the diagram in Figure 3.
Figure 3: Single node deployment—Mach I.
In Mach I, all components are deployed as a single process in a single node. This single node is responsible for managing all the read and write requests.
Use case: Mostly recommended for testing scenarios—not ideal for production.
Consensus and replication
At Mach I, no consensus or replication is required, as the components communicate in memory, module to module.
In Mach II, the cluster has two nodes deployed—the second node in a scale-out is always a Load Manager. That ensures back pressure protection for the node responding to requests and allows the new node to gradually synchronize the data.
The topology is represented in the diagram in Figure 4.
Figure 4: Two-node deployment—Mach II.
Consensus and replication
At Mach II, consensus is not required, as it is impossible to establish Raft consensus with fewer than three nodes.
Replication happens from the Read Replica module deployed on Node 1 to the Read Replica module that also runs in Node 2 alongside the Load Manager. The point to be noted is that the Read Replica in Node 2 (the Load Manager node) does not serve requests; this design decision ensures there is always a node “nearly” synchronized, which can enter operation as an extra Read Replica or a leader node extremely fast, as I will explain in Mach III.
Use case: Can be used in scenarios where reliability is less important and low operational cost is a priority.
Mach III indicates that an extra node has been added to the cluster, which now has three nodes in total.
The new node will always enter the cluster as a new Load Manager, the clients will be redirected to the new Load Manager, and the Load Manager provisioned in Mach II takes the role of a Read Replica.
The diagram in Figure 5 represents the Mach III scenario.
Figure 5: Three-node deployment—Mach III.
Consensus and replication
No consensus is required yet because, although there are three nodes, only Nodes 1 and 2 can actively serve requests.
Use case: Already offers good performance by separating read and write operations into different nodes. However, due to the lack of a second Read Replica node, a node failure would force the Writer to start serving read requests until a new node is provisioned.
Leader crashes: The cluster returns to Mach II topology with Node 2 assuming the write and read operations until a new node is added to the cluster.
Node 2 (Read Replica) goes down: The Leader/Writer starts serving read requests until a new node is added back to the cluster.
Node 3 (Load Manager) goes down: Node 2 will start operating as Load Manager and no longer as Read Replica, and Node 1 will perform the write and read operations.
In all three scenarios, a signal is sent to the operator requesting the provisioning of a new node to replace the fallen one, and the new node always enters the cluster as a Load Manager.
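The three Mach III failure scenarios can be summarized as a small lookup of recovery actions (a sketch; the role names and plan format are illustrative):

```python
def mach3_recovery(crashed_role: str) -> dict:
    """Recovery plan for each Mach III failure; in every case a new node
    is requested and it enters the cluster as a Load Manager."""
    plans = {
        # Leader crashes: back to Mach II, Node 2 serves reads and writes.
        "writer": {"node2": "writer+read_replica"},
        # Read Replica crashes: the leader also serves reads meanwhile.
        "read_replica": {"node1": "writer+read_replica"},
        # Load Manager crashes: Node 2 takes over, Node 1 does both.
        "load_manager": {"node2": "load_manager", "node1": "writer+read_replica"},
    }
    plan = dict(plans[crashed_role])
    plan["signal_operator"] = "provision_new_load_manager_node"
    return plan
```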
At this stage, there are four nodes in the cluster, and there is a second Read Replica. The deployment will look like the diagram in Figure 6:
Figure 6: Four-node deployment—Mach IV.
Use case: Minimum setup indicated for production workloads, good performance, and good response in case of a failing node.
Consensus and replication
At Mach IV, consensus is introduced, but no election is held initially. Node 1 remains the leader and centralizes the write operations so as not to waste time switching to a new leader. It is vital that the Raft implementation is extended to support this arrangement. It is also crucial that if the leader goes down, Node 2 or 3 becomes the new leader, reverting to the Mach III topology. The Load Manager is responsible for making such a decision.
Leader crashes: The Raft protocol can’t hold an election with only two nodes remaining in the cluster, so the Load Manager will randomly pick a new leader between Nodes 2 and 3, which both have a Read Replica running.
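Since a two-node cluster cannot reach a Raft majority election, the Load Manager’s emergency pick could be as simple as (sketch):

```python
import random

def pick_emergency_leader(replica_ids: list) -> str:
    """With only two Read Replicas left after the leader crashes, Raft
    cannot elect; the Load Manager picks one at random to lead."""
    if not replica_ids:
        raise ValueError("no replicas available to promote")
    return random.choice(replica_ids)
```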
From Mach IV, the resiliency strategy for Replicas and Load Manager nodes is the same as Mach III, and new replica nodes can be added to the cluster to a configurable max number before shards are created.
Mach V, VI, and so on…
New Read Replicas keep being added to the cluster until a configured maximum number, at which point sharding starts to take place. Note that adding a Replica always means adding the new node as a Load Manager, which takes the place of the previous Load Manager. The previous Load Manager then joins the Raft consensus and starts serving read requests.
Use case: As the cluster grows larger, it becomes more reliable since the failure of one node is not as dramatic as in smaller setups.
Leader crashes: Raft elects a new leader among the Read Replicas available, and the new leader communicates to the Load Manager of its election.
Read Replica crashes: A new node is requested to be added to the cluster.
Load Manager crashes: The latest Read Replica added to the cluster assumes the Load Manager responsibilities, no longer serving read requests itself until a new node is provisioned, and as always, it enters the cluster as a Load Manager.
Read Intensive vs. Write Intensive Scenarios
Until Mach IV, the autoscaling ignores the characteristics of the load in favor of reaching what is considered the minimum replication setup for a reliable system. The system will continue to auto-scale, but now differently. It will now consider whether the usual load is Read Intensive (80% or more reads), Write Intensive (80% or more write operations), or balanced (all other scenarios). This may not account for exceptional use cases, but remember that this is an opinionated pattern and that, if necessary, it can be adapted for special circumstances. The target here is to address most of the use cases—not all.
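The 80% thresholds above can be captured in a small classifier (a sketch; in practice the counts would come from the Load Manager’s request statistics):

```python
def classify_workload(reads: int, writes: int) -> str:
    """80%+ reads -> read-intensive; 80%+ writes -> write-intensive;
    anything else is balanced (thresholds taken from the text)."""
    total = reads + writes
    if total == 0:
        return "balanced"
    if reads / total >= 0.8:
        return "read_intensive"
    if writes / total >= 0.8:
        return "write_intensive"
    return "balanced"
```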
Scaling Read Intensive scenarios
This requires the simplest strategy: new nodes keep being added in the same manner as Mach I through Mach VII, which represents seven nodes (1 Load Manager, 1 Leader/Writer, and 5 Read Replicas). The cluster will then start using shard labels (more on them soon) to create shards of the existing data and divide the load of incoming requests between the newly created shards.
Before it operates in two shards, a new node is added to the cluster to support the topology introduced in Figure 7, with a minimum of eight nodes.
Use case: Large-scale scenarios with ever-growing demands.
The diagram in Figure 7 represents a setup with two shards, each shard containing one leader and two Read Replicas, plus an extra replica in a Load Manager node.
Figure 7: Sharding topology example for two shards.
This can be further scaled into a third or fourth shard when enough nodes are provisioned in each shard. For example, if Shard 2 scales to seven nodes, the next step is to add a new node and divide it into two shards.
Each Load Manager holds a replica of only one shard; that is the shard for which the Load Manager can assume the Read Replica or Writer role if needed. To be able to fulfill requests from clients, however, it needs to write to and read from all shards.
Scaling Write Intensive scenarios
For scenarios where write operations are the focus or where noticeable degradation is observed on write operations, there are two ways to scale the writing capacity of the cluster:
1. Provision of a bigger leader
The first approach is to provision a bigger node (memory, and/or CPU, and/or read/write storage speed) than the current leader. This node will initially be provisioned as the Load Manager (as always) and will remain the Load Manager until it synchronizes with the current leader, exactly like in the Mach transitions explained earlier. Once synchronized, it starts a new election and becomes the new leader of the Raft consensus, switching places with the old leader. Each node in the cluster has a specification level labeled from 1 to 5, where 1 is the lowest level of specification (smaller resources) and 5 is the highest (bigger resources).
Once scaling vertically has reached its limit (node level 5), sharding begins. A new leader will be provisioned as Read Replica that will first synchronize only the shard it will take over from the current leader and then will become the leader itself only for that shard of the data. The level of the new node added (1 to 5) has to be configurable.
This is a different trigger for sharding than the Read Intensive scenario described before. Read Intensive sharding is triggered based on horizontal scale (number of nodes). For Write Intensive scenarios, sharding is triggered based on vertical scale; in other words, the size of the leader reached the maximum possible (5).
It is also possible to specify that when the new shard is provisioned, both leaders are of a certain level, for instance, level 3. This is important because two level 3s make a “level 6” Writer capable cluster, but this should also be configurable. In this scenario, the previous level 5 Writer node would be replaced by two level 3 nodes, one for each shard.
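The write-scaling policy (vertical growth until level 5, then sharding with configurable leader levels) can be sketched as:

```python
MAX_LEVEL = 5  # specification levels 1..5, as described in the text

def next_write_scaling_step(leader_level: int,
                            new_shard_leader_level: int = 3) -> dict:
    """Scale the leader vertically until level 5, then split into two
    shards whose leader levels are configurable (e.g., two level 3s
    jointly exceed one level 5's write capacity)."""
    if leader_level < MAX_LEVEL:
        return {"action": "provision_bigger_leader",
                "level": leader_level + 1}
    return {"action": "create_shard",
            "leader_levels": [new_shard_leader_level, new_shard_leader_level]}
```

The step-by-one level increase here is an assumption; the article only requires that the level of the new node be configurable.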
A priori labeling for the sharding strategy
The problem: Figuring out how to separate information into shards efficiently is no easy task, especially if the data is large and complex.
A simple solution: Every time a record/object is stored in the cluster, a bucket ID in a range from 1 to 1000 is assigned to it. This bucket ID is random and guarantees that, at a large scale, each bucket will have a similar number of objects assigned to it, balancing the shards.
When sharding becomes required, bucket ID labels are used to decide which objects will belong in each shard. For instance, for two shards, the first shard will have objects allocated in the buckets from 1 to 500 inclusive, and the second shard will have objects allocated in the buckets 501 to 1000 inclusive, considering that the total number of buckets was defined to be 1000.
This bucket-splitting process is repeated every time a new shard is required. This means that, for this example, the maximum number of shards is 1000, which is probably unrealistically high for most scenarios.
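The bucket-to-shard split can be computed directly from the shard count (a sketch; with 1000 buckets and 2 shards this reproduces the 1–500 / 501–1000 split above):

```python
def shard_ranges(num_shards: int, num_buckets: int = 1000) -> list:
    """Split bucket IDs 1..num_buckets into contiguous, near-equal
    ranges, one per shard (e.g., 2 shards -> 1..500 and 501..1000)."""
    base, extra = divmod(num_buckets, num_shards)
    ranges, start = [], 1
    for i in range(num_shards):
        size = base + (1 if i < extra else 0)
        ranges.append(range(start, start + size))  # covers start..start+size-1
        start += size
    return ranges

def shard_for_bucket(bucket_id: int, ranges: list) -> int:
    """Route a labeled record to the index of the shard owning its bucket."""
    return next(i for i, r in enumerate(ranges) if bucket_id in r)
```

The Load Manager could use `shard_for_bucket` to route writes; re-sharding only moves the records whose bucket range changed hands.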
Conclusion and Future Work
By no means do I believe this article addresses all the nuances of autoscaling a stateful system. Instead, it offers a template and a pattern based on many techniques I have used separately during my career, now combined into a single standard that can serve as a foundation for stateful autonomous scalability implementations. These designs may not be the best fit for scenarios where the majority of the read operations have to be executed against many or all shards. In such scenarios, a better shard bucket definition is advisable to attempt to have all the data needed in a single shard or in as few shards as possible.