
Consistent Hashing in System Design: An In-Depth Guide

Consistent hashing is a distributed hashing technique that aims to evenly distribute data across a set of nodes (servers) and minimize the impact of node additions or removals.

July 23, 2024

Photo by Google DeepMind on Unsplash

Consistent hashing is a fundamental technique in System Design that addresses the challenges of scalability, load balancing, and fault tolerance in distributed systems. This article explores what consistent hashing is, why it's necessary, and how it benefits modern distributed systems, using Java examples to illustrate its implementation.

What is Consistent Hashing?

It is a distributed hashing technique that provides a way to distribute data across multiple servers efficiently. It does so in a way that minimizes the reorganization of data when nodes are added or removed.

To understand Consistent Hashing, let's first look at a simple hashing scenario:

Example: Imagine we have a system with 4 servers and 6 data items. A straightforward approach to distribute those data items across our 4 servers is to use a hash function and the modulo operator.

To assign our data item to a server we might use the following function:

server_number = hash(data_item) % number_of_servers
data_item | hash(data_item) | number_of_servers | server_number
1         | 892             | 4                 | 0
2         | 765             | 4                 | 1
3         | 2056            | 4                 | 0
4         | 871             | 4                 | 3
5         | 6907            | 4                 | 3
6         | 12              | 4                 | 0

As we can see, this method distributes our data items across our servers. If we want to know which server a given data item lives on, we simply pass the data_item to the hash function, take the result modulo the number of servers, and that gives us the server number.

E.g., if I want to know which server my data_item 5 lies on, I first hash it, which gives me 6907; I then take it modulo the total number of servers, 4, which gives me 3. This means my data_item is located on server 3.

This works fine until we need to remove or add a server. When that happens, most of our data items will need to be remapped, causing a significant redistribution of data.

Let's say one of our servers needs to be removed. That changes the divisor in the function we used earlier:

data_item | hash(data_item) | number_of_servers | server_number
1         | 892             | 3                 | 1
2         | 765             | 3                 | 0
3         | 2056            | 3                 | 1
4         | 871             | 3                 | 1
5         | 6907            | 3                 | 1
6         | 12              | 3                 | 0

In our case we have to redistribute data on almost every server. This is a huge challenge when the volume of data is large.
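
To see the scale of this remapping concretely, here is a minimal, self-contained sketch (purely illustrative; it reuses the hash values from the tables above instead of a real hash function) that assigns the six items to 4 servers and then to 3, and counts how many of them move:

public class ModuloRemapDemo {
    public static void main(String[] args) {
        // hash values of data items 1..6, taken from the tables above
        int[] hashes = {892, 765, 2056, 871, 6907, 12};

        int moved = 0;
        for (int i = 0; i < hashes.length; i++) {
            int before = hashes[i] % 4; // with 4 servers
            int after = hashes[i] % 3;  // after one server is removed
            if (before != after) moved++;
            System.out.printf("item %d: server %d -> server %d%n", i + 1, before, after);
        }
        // prints "items remapped: 5 out of 6"
        System.out.println("items remapped: " + moved + " out of " + hashes.length);
    }
}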

Let's look at how consistent hashing solves this problem, but before that we'll need to understand a bit of terminology. Unfortunately, this can't be skipped.

Core Principles of Consistent Hashing

Hash Ring

The core idea of consistent hashing is to use a circular hash space (often referred to as a hash ring). Both the data and the nodes (servers) are hashed onto this ring. The ring's hash values range from 0 to 2³² - 1 (let's say we are using Java's built-in hashCode() method). Note: Java's hashCode() can also return negative values, but we are omitting that detail for simplicity.
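
If you do want strictly non-negative positions on such a ring, one simple option (shown here purely as an illustrative assumption; the implementation later in this article just uses the signed hashCode() values directly) is to reinterpret the 32-bit hash as an unsigned value:

public class RingPositionDemo {
    public static void main(String[] args) {
        // Reinterpret the signed 32-bit hashCode as an unsigned value in [0, 2^32 - 1].
        String serverId = "North-Virginia"; // example server id from this article
        long ringPosition = Integer.toUnsignedLong(serverId.hashCode());
        System.out.println(serverId + " -> ring position " + ringPosition);
    }
}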

Node Placement

Each node (server) is assigned a position on the hash ring based on its hash value. This is typically done by hashing the node's identifier (e.g., its IP address or some sort of serverId).

Data Placement

Each piece of data is also hashed to a position on the ring. The data is assigned to the first node it encounters while moving clockwise around the ring.

Example in Detail

Let's take an example to understand this better.

Consistent Hashing

In the above figure, we have 2 servers denoted by S1 and S2. Server S1 has serverId: North-Virginia and a hash value of 100. This hash can be generated using any hash function, but for the sake of simplicity we are assuming it to be 100. S2 has serverId: East-Mumbai and a hash of 500.

Now, these servers or nodes are placed on our hash ring whose starting value is 0 and the ending value is 2³² - 1.

Now, how do we decide which data will go to which node? Our data placement principle will help us. We first take a data item, say the one with key 1. We hash it and find that its keyHash is 25. We then move clockwise from that point until we find a server; the first server we encounter is S1, so we store the data on that node. Similarly, say our data key is 4; we hash it and find the value to be 128. We move clockwise until we find a server, which is S2, so we store the data on S2.
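
This clockwise search maps naturally onto a sorted map. The sketch below is illustrative only (the positions 100, 500, 25, and 128 are the assumed values from the figure): TreeMap's ceilingEntry finds the first server at or after a key's hash, and firstEntry handles the wrap-around past the end of the ring.

import java.util.Map;
import java.util.TreeMap;

public class ClockwiseLookupDemo {
    public static void main(String[] args) {
        // Server positions on the ring, as assumed in the figure above.
        TreeMap<Long, String> ring = new TreeMap<>();
        ring.put(100L, "S1 (North-Virginia)");
        ring.put(500L, "S2 (East-Mumbai)");

        long[] keyHashes = {25L, 128L}; // hashes of data keys 1 and 4 from the figure
        for (long keyHash : keyHashes) {
            // first server clockwise; wrap around to the first server on the ring if none is found
            Map.Entry<Long, String> owner = ring.ceilingEntry(keyHash);
            if (owner == null) owner = ring.firstEntry();
            System.out.println("key hash " + keyHash + " -> " + owner.getValue());
        }
    }
}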

What if we want to add one more server, say S3?

Let's say we added one more server with serverId: Eastern-Europe, hashed its identifier, and found the value to be 980. What will happen in that case?

Node Addition (S3)

Notice that the data with key 6, whose hash is 824, earlier resided on S1. But now, according to the data placement policy, if we move clockwise from it until we find a server, we encounter S3 and not S1. So in this case we'll store data item 6 on S3.

Did you notice something? The addition of a server didn't result in the remapping of all the data, as it did in the very first example.

That's the power of consistent hashing. Here, only the data whose hash value falls between S2 and S3 needs to move from S1 to S3; all other data stays where it is.

One drawback of this approach is that when a new node joins the system, it needs to "steal" its key ranges from the other nodes. The nodes handing those key ranges off to the new node have to scan their local persistence store to retrieve the appropriate set of data items.

Note that performing such a scan on a production node is tricky, as scans are highly resource-intensive operations and need to be executed in the background without affecting customer performance. This requires us to run the bootstrapping task at the lowest priority, which significantly slows the bootstrapping process; during a busy shopping season, when the nodes are handling millions of requests a day, bootstrapping can take a day to complete. Second, when a node joins or leaves the system, the key ranges handled by many nodes change, and the Merkle trees for the new ranges need to be recalculated, which is a non-trivial operation to perform on a production system.

Finally, there is no easy way to take a snapshot of the entire key space due to the randomness in the key ranges, and this makes archival complicated.

What if an Existing Server crashes?

Let's take a scenario where an existing server crashes. How will our data be remapped? In this case, we again follow our data placement policy: we keep moving in a clockwise direction until we find an available server. Here, data items 3 and 4 will be remapped to server S3. Only the data present between S1 and S2 needs to be remapped to S3, so only a limited amount of data moves.

Node Crash

You might have noticed that we placed the servers so that each one owns a partition of the same size, i.e., the distance between the servers is equal. In practice, it is impossible to keep partitions of the same size on the ring for all servers; it all depends on the hashes of the servers. The next server, S4, may hash to a position very close to S1.

We also saw that when we removed S2, the partition of S3 became twice as big as that of S1. How can we address this issue? The answer lies in virtual nodes.

Virtual Nodes

You might ask what virtual nodes are. To put it simply, a virtual node is still a real node; by tweaking the server's hash we can place the same server at several different positions on the hash ring. Think of them as multiple instances of your physical node.

Virtual nodes (or v-nodes) are an extension of the consistent hashing technique that further improves the distribution of data across nodes in a distributed system.

Let's understand this via an example. Say each of the servers S1, S2, and S3 (physical nodes) has two virtual nodes.

Virtual Nodes

In the above figure, each physical node has two virtual nodes. S1_1 and S1_2 are virtual nodes of S1. Notice how evenly the data space is distributed among the different physical nodes. In practice, the number of virtual nodes is quite large, sometimes in the thousands.

To find which server a key (or data item) is stored on, we go clockwise from the key's location and find the first virtual node encountered on the ring. Take data item 9 as an example: going clockwise from its location, the nearest virtual node is S3_2, so the data will be stored on the S3 physical node. Note: without the introduction of virtual nodes, the data would have been stored on S2.
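
The implementation later in this article places each server at a single position, but extending it to virtual nodes is straightforward. The sketch below is a hypothetical, minimal version (the count of 3 virtual nodes per server and the serverId + "#" + i naming scheme are assumptions, not something the later code uses): each physical server is hashed several times and inserted at several positions on the ring.

import java.util.Map;
import java.util.TreeMap;

public class VirtualNodeDemo {
    private final TreeMap<Integer, String> ring = new TreeMap<>();
    private final int virtualNodesPerServer;

    public VirtualNodeDemo(int virtualNodesPerServer) {
        this.virtualNodesPerServer = virtualNodesPerServer;
    }

    public void addServer(String serverId) {
        // place the same physical server at several ring positions by hashing "serverId#i"
        for (int i = 0; i < virtualNodesPerServer; i++) {
            ring.put((serverId + "#" + i).hashCode(), serverId);
        }
    }

    public String serverFor(String key) {
        // first virtual node clockwise from the key's hash; wrap around if needed
        Map.Entry<Integer, String> entry = ring.ceilingEntry(key.hashCode());
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        VirtualNodeDemo demo = new VirtualNodeDemo(3);
        demo.addServer("North-Virginia");
        demo.addServer("East-Mumbai");
        demo.addServer("Eastern-Europe");
        System.out.println("user-id-kishan -> " + demo.serverFor("user-id-kishan"));
    }
}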

Important: As the number of virtual nodes increases, the distribution of keys becomes more balanced.

But what will happen if the physical node itself crashes? All the data present on that node will be lost. How do we deal with that?

Replication

Replication in consistent hashing is a technique used to improve fault tolerance and data availability in distributed systems. By replicating data across multiple nodes, the system ensures that even if one or more nodes fail, the data remains accessible from other nodes.

How Replication Works in Consistent Hashing

Primary and Replica Nodes: When a piece of data is hashed to a position on the hash ring, it is assigned to a primary node. Additionally, the data is replicated to a specified number of replica nodes placed sequentially after the primary node in the clockwise direction.

Replication Factor: The replication factor (N) determines the number of copies of each piece of data. For example, if N = 3, there is one primary node and two replica nodes for each piece of data. The replicas are chosen so that they do not land on a virtual node of the same physical server, but on virtual nodes of other physical servers. So, if I want to replicate the data owned by S1, I put copies on one of S2's virtual nodes and one of S3's virtual nodes. Thus, S1 is my primary node, whereas S2 and S3 hold the replicated data.
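
When virtual nodes and replication are combined, the clockwise walk has to skip virtual nodes belonging to a physical server that has already been chosen, so that the N copies really land on N distinct machines. The ConsistentHashingWithReplication class later in this article does not use virtual nodes, so the sketch below is a hypothetical extension (the ring here simply maps positions to physical server ids):

import java.util.*;

public class ReplicaSelectionDemo {
    // Walk clockwise from keyHash and collect the first `replicationFactor`
    // DISTINCT physical servers, skipping further virtual nodes of servers already picked.
    static List<String> replicasFor(TreeMap<Integer, String> ring, int keyHash, int replicationFactor) {
        int distinctServers = new HashSet<>(ring.values()).size();
        Set<String> chosen = new LinkedHashSet<>();
        Map.Entry<Integer, String> entry = ring.ceilingEntry(keyHash);
        if (entry == null) entry = ring.firstEntry(); // wrap around the ring
        while (chosen.size() < Math.min(replicationFactor, distinctServers)) {
            chosen.add(entry.getValue()); // the Set silently ignores duplicates of the same physical server
            entry = ring.higherEntry(entry.getKey());
            if (entry == null) entry = ring.firstEntry(); // wrap around the ring
        }
        return new ArrayList<>(chosen);
    }

    public static void main(String[] args) {
        TreeMap<Integer, String> ring = new TreeMap<>();
        // two virtual nodes per physical server, at made-up positions
        ring.put(100, "S1"); ring.put(700, "S1");
        ring.put(300, "S2"); ring.put(900, "S2");
        ring.put(500, "S3"); ring.put(1100, "S3");
        System.out.println(replicasFor(ring, 650, 2)); // prints [S1, S2]
    }
}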

One of the benefits of replication is that it makes the whole system fault-tolerant. If a primary node fails, the data can still be accessed from one of its replicas, ensuring the system remains operational. Also, replication increases the availability of data, making it possible for multiple nodes to serve read requests simultaneously, thereby improving read performance. By distributing read requests across multiple replicas, the system can balance the load more effectively, preventing any single node from becoming a bottleneck.

Implementation in Java

First, let's create a Server class. This class will hold the data; we'll use a HashMap to store it. Note: everything here is done in memory. The Server class will also have an ID called serverId.

import java.util.HashMap;
import java.util.Map;

public class Server {
    private final Map<String, String> map;
    private final String serverId;

    public Server(String serverId) {
        this.map = new HashMap<>();
        this.serverId = serverId;
    }

    public void put(String key, String value) {
        map.put(key, value);
    }

    public String get(String key) {
        return map.getOrDefault(key, "");
    }

    public String getServerId() {
        // each server is assigned a server id; we hash this id to get
        // the position of the node on the consistent hash ring
        return serverId;
    }

    @Override
    public int hashCode() {
        // hash the server by its id so that its ring position is deterministic
        // and matches the hash printed by toString()
        return serverId.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Server && serverId.equals(((Server) o).serverId);
    }

    @Override
    public String toString() {
        return String.format("server-id: %s, server-hash: %s", serverId, serverId.hashCode());
    }
}

Now we'll create a ConsistentHashing class. This class implements the hash ring as a NavigableMap. Why a NavigableMap? Because it keeps the hash values sorted in increasing order and is easy to traverse, which is exactly what we want.

We expose several methods: addServer(), removeServer(), put(), and get(). The code is heavily commented so that it can be followed easily.

When we call the put() method, we first hash the key using .hashCode() and find the nearest server in the clockwise direction; once found, we fetch that server instance and save the data only on that server.

Similarly, when we want to get a key, we figure out which server it lies on by navigating the hash ring, and once found we return the value.

import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHashing {
    private final NavigableMap<Integer, Server> hashRing;

    public ConsistentHashing() {
        this.hashRing = new TreeMap<>();
    }

    public void addServer(Server server) {
        // find the hash of the server (Server.hashCode() hashes the serverId)
        Integer serverHash = server.hashCode();
        System.out.printf("Adding server: %s. Server Hash: %s\n", server.getServerId(), serverHash);
        hashRing.put(serverHash, server);
    }

    public void removeServer(Server server) {
        Integer serverHash = server.hashCode();
        hashRing.remove(serverHash);
    }

    public void put(String key, String value) {
        // when we add data to this consistent hash ring, what happens first?
        // first we hash the key itself to get a number
        // then we search clockwise from that number until we hit the first server hash
        // once found, we take that server and store the key there
        Integer hashOfKey = key.hashCode();
        Integer nearestServerHash = hashOfFirstServerFoundClockWise(hashOfKey);
        // once we have the hash, fetch the server instance
        Server server = hashRing.get(nearestServerHash);

        System.out.printf("Saving the key: %s on server: %s. Hash of the key: %s. Nearest Server Hash: %s.\n", key, server.getServerId(), key.hashCode(), nearestServerHash);
        server.put(key, value);
    }

    public String get(String key) {
        Integer hashOfKey = key.hashCode();
        Integer nearestServerHash = hashOfFirstServerFoundClockWise(hashOfKey);
        // once we have the hash, fetch the server instance
        Server server = hashRing.get(nearestServerHash);
        String value = server.get(key);
        if (null == value || value.isEmpty()) {
            System.out.printf("Unable to find the key: %s on server: %s\n", key, server.getServerId());
            return "";
        }
        System.out.printf("Found the key: %s on server: %s\n", key, server.getServerId());
        return value;
    }

    private Integer hashOfFirstServerFoundClockWise(Integer hashOfKey) {
        if (!hashRing.containsKey(hashOfKey)) {
            SortedMap<Integer, Server> tailMap = hashRing.tailMap(hashOfKey);
            return tailMap.isEmpty() ? hashRing.firstKey() : tailMap.firstKey();
        }
        return hashOfKey;
    }

    // main method, this is where we initialise our consistent hashing class
    public static void main(String[] args) {
        // we'll initialise a distributed system with 5 servers all around the globe
        System.out.println("------------------------");
        System.out.println("---CONSISTENT HASHING---");
        System.out.println("------------------------");
        Server s1 = new Server("North-Virginia");
        Server s2 = new Server("East-Mumbai");
        Server s3 = new Server("China");
        Server s4 = new Server("Eastern-Europe");
        Server s5 = new Server("Tokyo-Japan");

        ConsistentHashing consistentHashing = new ConsistentHashing();
        // add these servers to the hash ring
        consistentHashing.addServer(s1);
        consistentHashing.addServer(s2);
        consistentHashing.addServer(s3);
        consistentHashing.addServer(s4);
        consistentHashing.addServer(s5);

        System.out.println("------------------------");
        System.out.println("Server Hashes: ");
        System.out.println("------------------------");
        System.out.println(consistentHashing.hashRing.keySet());
        System.out.println("------------------------\n");
        System.out.println("------------------------");
        System.out.println("Saving the Keys");
        System.out.println("------------------------");
        consistentHashing.put("user-id-mathew", "mob+14241");
        consistentHashing.put("user-id-john", "mob+12524");
        consistentHashing.put("user-id-michelle", "mob+51512");
        consistentHashing.put("user-id-trump", "mob+068432");
        consistentHashing.put("user-id-kishan", "mob+015437");
        consistentHashing.put("user-id-sonu", "mob+014678");
        System.out.println("------------------------\n");
        System.out.println("------------------------");
        System.out.println("Fetching the Keys");
        System.out.println("------------------------");
        System.out.println("value: " + consistentHashing.get("user-id-mathew"));
        System.out.println("value: " + consistentHashing.get("user-id-john"));
        System.out.println("value: " + consistentHashing.get("user-id-michelle"));
        System.out.println("value: " + consistentHashing.get("user-id-trump"));
        System.out.println("value: " + consistentHashing.get("user-id-sonu"));

        // since there is no replication, all the data corresponding to a range of keys lives on one node;
        // if that node goes down, all of that data is wiped
        // e.g. let's remove the node that contains most of our data, i.e. the server called China
        System.out.println("------------------------\n");
        System.out.println("-------------------------");
        System.out.println("--China Server Crashes--");
        System.out.println("------------------------");
        consistentHashing.removeServer(s3);
        System.out.println("Fetching the Keys with One server down");
        System.out.println("------------------------");
        System.out.println("value: " + consistentHashing.get("user-id-mathew"));
        System.out.println("value: " + consistentHashing.get("user-id-john"));
        System.out.println("value: " + consistentHashing.get("user-id-michelle"));
        System.out.println("value: " + consistentHashing.get("user-id-trump"));
        System.out.println("value: " + consistentHashing.get("user-id-sonu"));
        System.out.println("------------------------\n");
        // notice john and sonu were put on the China server, whose hash was the greatest of all;
        // after it is removed, moving clockwise wraps around to the very first server, which is Tokyo-Japan
    }
}

Here's an implementation of consistent hashing with replication taken into account.

import java.util.*;

/**
 * This class shows a basic implementation of consistent hashing with replication.
 * It has methods to add a server and to remove a server.
 * Each server has its own cache to store the keys.
 * We expose simple get and put methods that save data to and retrieve data from a list of nodes.
 * Here, everything is in memory; think of the servers as if they were deployed on different machines, though.
 */
public class ConsistentHashingWithReplication {

    private final int replicationFactor;
    private final NavigableMap<Integer, Server> serverMap;

    public ConsistentHashingWithReplication(int replicationFactor) {
        this.replicationFactor = replicationFactor;
        this.serverMap = new TreeMap<>();
    }

    public void addServer(Server server) {
        // find the hash of the server (Server.hashCode() hashes the serverId)
        Integer serverHash = server.hashCode();
        System.out.printf("Adding server: %s. Server Hash: %s\n", server.getServerId(), serverHash);
        serverMap.put(serverHash, server);
    }

    public void removeServer(Server server) {
        Integer serverHash = server.hashCode();
        serverMap.remove(serverHash);
    }

    public void put(String key, String value) {
        // when we add data to this consistent hash ring, what happens first?
        // first we hash the key itself to get a number
        // then we search clockwise from that number until we hit the first server hash
        // once found, we take that server and store the key there
        // since replication is enabled, we keep searching for replicationFactor - 1 more nodes
        // in the clockwise direction and store the data there as well,
        // so that if the primary server crashes we can keep serving the data
        Integer hashOfKey = key.hashCode();
        List<Integer> nearestNServerHashes = hashOfNServerFoundClockWise(hashOfKey);
        System.out.println("------------------------");
        nearestNServerHashes.forEach(serverHash -> {
            // once we have the hash, fetch the server instance
            Server server = serverMap.get(serverHash);
            System.out.printf("Saving the key: %s on server: %s. Hash of the key: %s. Nearest Server Hash: %s.\n", key, server.getServerId(), key.hashCode(), serverHash);
            server.put(key, value);
        });
        System.out.println("------------------------");
    }

    public String get(String key) {
        Integer hashOfKey = key.hashCode();
        List<Integer> nearestNServerHashes = hashOfNServerFoundClockWise(hashOfKey);

        for (Integer serverHash : nearestNServerHashes) {
            // once we have the hash, fetch the server instance
            Server server = serverMap.get(serverHash);
            String value = server.get(key);
            if (null != value && !value.isEmpty()) {
                System.out.printf("Found the key: %s on server: %s, server hash: %s\n", key, server.getServerId(), serverHash);
                return value;
            }
        }
        System.out.printf("Unable to find the key: %s on any server\n", key);
        return "";
    }

    private List<Integer> hashOfNServerFoundClockWise(Integer hashOfKey) {
        List<Integer> hashes = new ArrayList<>();
        SortedMap<Integer, Server> tailMap = serverMap.tailMap(hashOfKey);
        Integer firstServerHash = tailMap.isEmpty() ? serverMap.firstKey() : tailMap.firstKey();
        hashes.add(firstServerHash);

        Integer currentKey = firstServerHash;

        for (int i = 1; i < replicationFactor; ++i) {
            Map.Entry<Integer, Server> nextEntry = serverMap.higherEntry(currentKey);
            if (nextEntry != null) {
                currentKey = nextEntry.getKey();
            } else {
                // wrap around the ring
                currentKey = serverMap.firstKey();
            }
            hashes.add(currentKey);
        }
        return hashes;
    }

    public static void main(String[] args) {
        // we'll initialise a distributed system with 5 servers all around the globe
        System.out.println("------------------------");
        System.out.println("---CONSISTENT HASHING---");
        System.out.println("------------------------");
        Server s1 = new Server("North-Virginia");
        Server s2 = new Server("East-Mumbai");
        Server s3 = new Server("China");
        Server s4 = new Server("Eastern-Europe");
        Server s5 = new Server("Tokyo-Japan");

        ConsistentHashingWithReplication consistentHashing = new ConsistentHashingWithReplication(2);
        // add these servers to the hash ring
        consistentHashing.addServer(s1);
        consistentHashing.addServer(s2);
        consistentHashing.addServer(s3);
        consistentHashing.addServer(s4);
        consistentHashing.addServer(s5);

        System.out.println("------------------------");
        System.out.println("Server Hashes: ");
        System.out.println("------------------------");
        System.out.println(consistentHashing.serverMap.keySet());
        System.out.println("------------------------\n");
        System.out.println("------------------------");
        System.out.println("Saving the Keys (N = 2)");
        System.out.println("------------------------");
        consistentHashing.put("user-id-mathew", "mob+14241");
        consistentHashing.put("user-id-john", "mob+12524");
        consistentHashing.put("user-id-michelle", "mob+51512");
        consistentHashing.put("user-id-trump", "mob+068432");
        consistentHashing.put("user-id-sonu", "mob+014678");
        System.out.println("------------------------\n");
        System.out.println("------------------------");
        System.out.println("Fetching the Keys");
        System.out.println("------------------------");
        System.out.println("value: " + consistentHashing.get("user-id-mathew"));
        System.out.println("value: " + consistentHashing.get("user-id-john"));
        System.out.println("value: " + consistentHashing.get("user-id-michelle"));
        System.out.println("value: " + consistentHashing.get("user-id-trump"));
        System.out.println("value: " + consistentHashing.get("user-id-sonu"));

        // What happens if the China server crashes? Since the replication factor is 2, we can still get the data:
        // all the data present on China is replicated to North-Virginia as well, so if the server in China crashes
        // our application will still be able to serve the data from North-Virginia
        System.out.println("------------------------\n");
        System.out.println("-------------------------");
        System.out.println("--China Server Crashes--");
        System.out.println("------------------------");
        consistentHashing.removeServer(s3);
        System.out.println("Fetching the Keys:");
        System.out.println("------------------------");
        System.out.println("value: " + consistentHashing.get("user-id-mathew"));
        System.out.println("value: " + consistentHashing.get("user-id-john"));
        System.out.println("value: " + consistentHashing.get("user-id-michelle"));
        System.out.println("value: " + consistentHashing.get("user-id-trump"));
        System.out.println("value: " + consistentHashing.get("user-id-sonu"));
        System.out.println("------------------------\n");

        // What if North-Virginia also goes down?
        // In that case, the data belonging to the China server is wiped out,
        // but the data that was primarily owned by North-Virginia was replicated to Tokyo-Japan
        consistentHashing.removeServer(s1);
        System.out.println("-------------------------");
        System.out.println("--North Virginia Server Crashes--");
        System.out.println("------------------------");
        System.out.println("Fetching the Keys:");
        System.out.println("------------------------");
        System.out.println("value: " + consistentHashing.get("user-id-mathew"));
        System.out.println("value: " + consistentHashing.get("user-id-john"));
        System.out.println("value: " + consistentHashing.get("user-id-michelle"));
        System.out.println("value: " + consistentHashing.get("user-id-trump"));
        System.out.println("value: " + consistentHashing.get("user-id-sonu"));
    }
}

Conclusion

Consistent hashing is a fundamental technique in system design that addresses the challenges of scalability, load balancing, and fault tolerance in distributed systems. By minimizing data movement and ensuring even distribution, it provides an efficient and reliable way to manage resources in a dynamic environment. With its widespread applications in distributed caching, CDNs, and databases, consistent hashing remains a cornerstone of modern system design.

.   .   .


© 2024 Kishan Kumar. All rights reserved.