July 23, 2024
Consistent hashing is a fundamental technique in System Design that addresses the challenges of scalability, load balancing, and fault tolerance in distributed systems. This article explores what consistent hashing is, why it's necessary, and how it benefits modern distributed systems, using Java examples to illustrate its implementation.
To understand Consistent Hashing, let's first look at a simple hashing scenario:
Example: Imagine we have a system with 4 servers and 6 data items. A straightforward approach to distributing those data items across our 4 servers is to use a hash function and the modulo operator.
To assign our data item to a server we might use the following function:
```
server_number = hash(data_item) % number_of_servers
```
data_item | hash(data_item) | number_of_servers | server_number
---|---|---|---
1 | 892 | 4 | 0
2 | 765 | 4 | 1
3 | 2056 | 4 | 0
4 | 871 | 4 | 3
5 | 6907 | 4 | 3
6 | 6 | 4 | 2
As we can see, this method distributes our data evenly across all of our servers. To find which server a data item lives on, we simply hash the data_item and take the result modulo the number of servers.

E.g., if I want to know which server my data_item 5 lies on, I first hash it, which gives me 6907, then take the result modulo the total number of servers, 4, which gives me 3. This means my data_item is located on server 3.
This works fine until we need to remove or add a server. When that happens, most of our data items will need to be remapped, causing a significant redistribution of data.
Let's say one of our servers needs to be removed. That will affect the function we used earlier:
data_item | hash(data_item) | number_of_servers | server_number
---|---|---|---
1 | 892 | 3 | 1
2 | 765 | 3 | 0
3 | 2056 | 3 | 1
4 | 871 | 3 | 1
5 | 6907 | 3 | 1
6 | 6 | 3 | 0
Almost every data item has to be remapped (in our case, all six end up on a different server). This is a huge challenge when the volume of data is large.
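To see this concretely, here is a minimal sketch, assuming the hash values from the tables above, that counts how many items have to move when we go from 4 servers to 3:

```java
public class ModuloRemapDemo {
    public static void main(String[] args) {
        // {data_item, hash(data_item)} pairs taken from the tables above
        int[][] items = {{1, 892}, {2, 765}, {3, 2056}, {4, 871}, {5, 6907}, {6, 6}};
        int moved = 0;
        for (int[] item : items) {
            int before = item[1] % 4; // 4 servers
            int after = item[1] % 3;  // one server removed
            if (before != after) moved++;
            System.out.printf("item %d: server %d -> server %d%n", item[0], before, after);
        }
        System.out.printf("%d of %d items had to move%n", moved, items.length);
    }
}
```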
Let's look at how consistent hashing solves this problem. But before that, we'll need to understand a bit of technical jargon. Unfortunately, this can't be skipped.
The core idea of consistent hashing is to use a circular hash space (often referred to as a hash ring). Both the data and the nodes (servers) are hashed onto this ring. The ring's hash values range from 0 to 2³² - 1 (say we are using Java's built-in hashCode method, which produces a 32-bit value). Note: hashCode can also return negative values, but we are omitting that for simplicity.
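As an aside, if you want strictly non-negative ring positions, one option (not used in the code later in this article, which simply keeps negative positions on the ring) is to reinterpret the 32-bit hash as an unsigned value:

```java
public class UnsignedHashDemo {
    public static void main(String[] args) {
        // reinterpret a possibly negative 32-bit hashCode as an unsigned value in [0, 2^32 - 1]
        long position = Integer.toUnsignedLong("East-Mumbai".hashCode());
        System.out.println(position);
    }
}
```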
Each node (server) is assigned a position on the hash ring based on its hash value. This is typically done by hashing the node's identifier (e.g., its IP address or some sort of serverId).
Each piece of data is also hashed to a position on the ring. The data is assigned to the first node it encounters while moving clockwise around the ring.
Let's take an example to understand this better.
Consistent Hashing
In the above figure, we have 2 servers, denoted S1 and S2. Server S1 has serverId: North-Virginia and a hash value of 100. This hash could be generated by any hash function, but for the sake of simplicity we assume it to be 100. S2 has serverId: East-Mumbai and a hash of 500.
Now, these servers or nodes are placed on our hash ring whose starting value is 0 and the ending value is 2³² - 1.
Now, how do we decide which data goes to which node? Our data placement principle tells us. Take the data item with key 1: we hash it and find that its keyHash is 25. We move clockwise from that point until we find a server; the first server we encounter is S1, so we save the data on that node. Similarly, say our data key is 4: we hash it and find the value to be 128. We move clockwise until we find a server, which is S2, so we save the data on S2.
Let's say we add one more server with serverId: Eastern-Europe, hash its value, and find it to be 980. What will happen in that case?
Node Addition (S3)
Notice that the data item with key 6, whose hash is 824, previously resided on S1 (moving clockwise from 824 wraps around the end of the ring to S1). But now, according to the data placement policy, moving clockwise from 824 we find S3, not S1. So data item 6 now lives on S3.

Did you notice something? Adding a server didn't remap all the data the way it did in the very first example.

That's the power of consistent hashing. Here, only the data with hash values between S2 and S3 needs to move from S1 to S3. All other data stays exactly where it is.
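Here is a minimal sketch of this placement policy using a TreeMap as the ring, assuming the hash values from the figures (100, 500, and 980 for the servers; 25, 128, and 824 for the keys):

```java
import java.util.Map;
import java.util.TreeMap;

public class RingLookupDemo {
    // first server at or after the key's position; if there is none, wrap around to the first server
    static String lookup(TreeMap<Long, String> ring, long keyHash) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(keyHash);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ring = new TreeMap<>();
        ring.put(100L, "S1"); // North-Virginia
        ring.put(500L, "S2"); // East-Mumbai

        System.out.println(lookup(ring, 824)); // S1 -- wraps around the ring
        ring.put(980L, "S3"); // Eastern-Europe joins
        System.out.println(lookup(ring, 824)); // S3 -- only this range moves
        System.out.println(lookup(ring, 25));  // S1 -- unaffected
        System.out.println(lookup(ring, 128)); // S2 -- unaffected
    }
}
```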
One drawback of this approach is that when a new node joins the system, it needs to "steal" its key ranges from other nodes. The nodes handing key ranges off to the new node have to scan their local persistence stores to retrieve the appropriate set of data items. Performing such a scan on a production node is tricky: scans are highly resource-intensive operations and need to run in the background without affecting customer-facing performance, which forces us to run the bootstrapping task at the lowest priority. This significantly slows down bootstrapping, and during a busy shopping season, when the nodes are handling millions of requests a day, bootstrapping can take a day to complete. Second, when a node joins or leaves the system, the key ranges handled by many nodes change, and the Merkle trees (used by systems such as Amazon's Dynamo to detect replica inconsistencies) for the new ranges need to be recalculated, which is a non-trivial operation to perform on a production system.

Finally, there is no easy way to take a snapshot of the entire key space due to the randomness in the key ranges, which makes archival complicated.
Let's take a scenario where an existing server, say S2, crashes. How will our data be remapped? Again, we follow our data placement policy: keep moving clockwise until we find an available server. In this case, data items 3 and 4 will be remapped to server S3. Only the data between S1 and S2 needs to be remapped to S3, so the data movement is limited.
Node Crash
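Reusing the same TreeMap sketch (again with the hash values assumed from the figures), the crash looks like this:

```java
import java.util.Map;
import java.util.TreeMap;

public class NodeCrashDemo {
    static String lookup(TreeMap<Long, String> ring, long keyHash) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(keyHash);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        TreeMap<Long, String> ring = new TreeMap<>();
        ring.put(100L, "S1");
        ring.put(500L, "S2");
        ring.put(980L, "S3");

        ring.remove(500L); // S2 crashes
        // keys that lived on S2 fall through to the next server clockwise: S3
        System.out.println(lookup(ring, 128)); // S3 -- data item 4 is remapped
        System.out.println(lookup(ring, 25));  // S1 -- untouched
    }
}
```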
You might have noticed that we placed the servers so that every partition (the distance between adjacent servers) is the same size, but in practice it is impossible to keep all partitions on the ring the same size; it all depends on the servers' hashes. The next server S4 might hash to a position very close to S1.

We also saw that when we removed S2, the partition of S3 became twice as big as that of S1. How can we address this issue? The answer lies in virtual nodes.
You might ask what virtual nodes are. To put it simply, a virtual node is still a real node; by tweaking the server hash, we can place the same server at several different positions on the hash ring. Think of virtual nodes as multiple instances of your physical node.

Virtual nodes (or v-nodes) are an extension of the consistent hashing technique that further improves the distribution of data across nodes in a distributed system.

Let's understand this via an example. Say the servers S1, S2, and S3 (physical nodes) each have two virtual nodes.
Virtual Nodes
In the above figure, each physical node has two virtual nodes. S1_1 and S1_2 are virtual nodes of S1. Notice how evenly the data space is now distributed among the different physical nodes. In practice, the number of virtual nodes is quite large, sometimes in the thousands.

To find which server a key (or data item) is stored on, we go clockwise from the key's position and take the first virtual node encountered on the ring. Take data item 9 as an example: going clockwise from its position, the nearest virtual node is S3_2, so the data is stored on the S3 physical node. Note: without virtual nodes, this data would have been stored on S2.
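Here is a minimal sketch of virtual nodes; deriving each position by hashing `serverId + "#" + i` is just one assumed scheme for spreading a physical server across the ring:

```java
import java.util.Map;
import java.util.TreeMap;

public class VirtualNodeDemo {
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    // place each physical server at `vnodes` positions on the ring by hashing
    // a different suffix per virtual node (the "#i" suffix scheme is an assumption)
    void addServer(String serverId, int vnodes) {
        for (int i = 0; i < vnodes; i++) {
            ring.put((serverId + "#" + i).hashCode(), serverId);
        }
    }

    // clockwise lookup: the virtual node we land on maps back to its physical server
    String lookup(String key) {
        Map.Entry<Integer, String> entry = ring.ceilingEntry(key.hashCode());
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        VirtualNodeDemo demo = new VirtualNodeDemo();
        demo.addServer("S1", 2);
        demo.addServer("S2", 2);
        demo.addServer("S3", 2);
        System.out.println("data-9 is stored on " + demo.lookup("data-9"));
    }
}
```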
Replication in consistent hashing is a technique used to improve fault tolerance and data availability in distributed systems. By replicating data across multiple nodes, the system ensures that even if one or more nodes fail, the data remains accessible from other nodes.
Primary and Replica Nodes: When a piece of data is hashed to a position on the hash ring, it is assigned to a primary node. Additionally, the data is also replicated to a specified number of replica nodes that are placed sequentially after the primary node in a clockwise direction.
Replication Factor: The replication factor (N) determines the number of copies of each piece of data. For example, if N = 3, there will be one primary node and two replica nodes for each piece of data. The replicas are chosen so that they don't land on virtual nodes of the same physical node, but on virtual nodes of different physical nodes. So if I want to replicate data whose primary is S1, I will put the copies on a virtual node of S2 and a virtual node of S3. Thus S1 is my primary node, while S2 and S3 hold the replicated data.
One of the benefits of replication is that it makes the whole system fault-tolerant. If a primary node fails, the data can still be accessed from one of its replicas, ensuring the system remains operational. Also, replication increases the availability of data, making it possible for multiple nodes to serve read requests simultaneously, thereby improving read performance. By distributing read requests across multiple replicas, the system can balance the load more effectively, preventing any single node from becoming a bottleneck.
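Conceptually, picking the replicas is just "keep walking clockwise". Here is a minimal sketch of that walk (the full implementation, with actual storage, appears later in this article); the helper name replicaPositions is my own:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class ReplicaSelectionSketch {
    // collect the primary's ring position plus (n - 1) clockwise successors,
    // wrapping around the ring; assumes one ring entry per server
    static List<Integer> replicaPositions(NavigableMap<Integer, String> ring, int keyHash, int n) {
        List<Integer> positions = new ArrayList<>();
        Integer pos = ring.ceilingKey(keyHash);
        if (pos == null) pos = ring.firstKey(); // wrap around
        for (int i = 0; i < n && i < ring.size(); i++) { // cap at ring size to avoid duplicates
            positions.add(pos);
            Integer next = ring.higherKey(pos);
            pos = (next != null) ? next : ring.firstKey();
        }
        return positions;
    }

    public static void main(String[] args) {
        NavigableMap<Integer, String> ring = new TreeMap<>();
        ring.put(100, "S1");
        ring.put(500, "S2");
        ring.put(980, "S3");
        System.out.println(replicaPositions(ring, 128, 2)); // [500, 980] -> primary S2, replica S3
    }
}
```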
First, let's create a Server class. This class will hold the data; we'll use a HashMap for storage. Note: everything is done in memory. The Server class will also have an ID called serverId.
```java
import java.util.HashMap;
import java.util.Map;

public class Server {
    private final Map<String, String> map;
    private final String serverId;

    public Server(String serverId) {
        this.map = new HashMap<>();
        this.serverId = serverId;
    }

    public void put(String key, String value) {
        map.put(key, value);
    }

    public String get(String key) {
        return map.getOrDefault(key, "");
    }

    public String getServerId() {
        // each server is assigned a server id;
        // we hash this server id to get the position of our node on the consistent hash ring
        return serverId;
    }

    @Override
    public int hashCode() {
        // derive the ring position from the serverId so it is stable across runs and matches toString();
        // without this override, Object.hashCode() would give an arbitrary identity hash
        return serverId.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Server && serverId.equals(((Server) o).serverId);
    }

    @Override
    public String toString() {
        return String.format("server-id: %s, server-hash: %s", serverId, serverId.hashCode());
    }
}
```
Now we'll create a ConsistentHashing class. This class implements the hash ring as a NavigableMap. Why NavigableMap? Because it keeps the hash values sorted in increasing order and lets us traverse from any point (via tailMap), which is exactly what we want.

We expose the methods addServer(), removeServer(), put(), and get(). The code is heavily documented so it can be followed easily.

When we call the put() method, we first hash the key using .hashCode() and find the nearest server in the clockwise direction; once found, we fetch that server instance and save the data to that server only.

Similarly, when we want to get a key, we figure out which server it lies on by navigating the hash ring and, once found, return the value.
```java
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;

public class ConsistentHashing {
    private final NavigableMap<Integer, Server> hashRing;

    public ConsistentHashing() {
        this.hashRing = new TreeMap<>();
    }

    public void addServer(Server server) {
        // find the hash of the server; this is its position on the ring
        Integer serverHash = server.hashCode();
        System.out.printf("Adding server: %s. Server Hash: %s\n", server.getServerId(), serverHash);
        hashRing.put(serverHash, server);
    }

    public void removeServer(Server server) {
        Integer serverHash = server.hashCode();
        hashRing.remove(serverHash);
    }

    public void put(String key, String value) {
        // what happens when we add data?
        // first we hash the key itself to get a number,
        // then we search clockwise from that number until we hit the first server hash;
        // once found, we take that server and store the key there
        Integer hashOfKey = key.hashCode();
        Integer nearestServerHash = hashOfFirstServerFoundClockWise(hashOfKey);
        Server server = hashRing.get(nearestServerHash);

        System.out.printf("Saving the key: %s on server: %s. Hash of the key: %s. Nearest Server Hash: %s.\n", key, server.getServerId(), hashOfKey, nearestServerHash);
        server.put(key, value);
    }

    public String get(String key) {
        Integer hashOfKey = key.hashCode();
        Integer nearestServerHash = hashOfFirstServerFoundClockWise(hashOfKey);
        Server server = hashRing.get(nearestServerHash);
        String value = server.get(key);
        if (null == value || value.isEmpty()) {
            System.out.printf("Unable to find the key: %s on server: %s\n", key, server.getServerId());
            return "";
        }
        System.out.printf("Found the key: %s on server: %s\n", key, server.getServerId());
        return value;
    }

    private Integer hashOfFirstServerFoundClockWise(Integer hashOfKey) {
        // tailMap holds every ring position >= hashOfKey (inclusive);
        // if it is empty we have gone past the last server and wrap around to the first one
        SortedMap<Integer, Server> tailMap = hashRing.tailMap(hashOfKey);
        return tailMap.isEmpty() ? hashRing.firstKey() : tailMap.firstKey();
    }

    // main method: this is where we initialise our consistent hashing class
    public static void main(String[] args) {
        // we'll initialise a distributed system with 5 servers all around the globe
        System.out.println("------------------------");
        System.out.println("---CONSISTENT HASHING---");
        System.out.println("------------------------");
        Server s1 = new Server("North-Virginia");
        Server s2 = new Server("East-Mumbai");
        Server s3 = new Server("China");
        Server s4 = new Server("Eastern-Europe");
        Server s5 = new Server("Tokyo-Japan");

        ConsistentHashing consistentHashing = new ConsistentHashing();
        // add these servers to the hash ring
        consistentHashing.addServer(s1);
        consistentHashing.addServer(s2);
        consistentHashing.addServer(s3);
        consistentHashing.addServer(s4);
        consistentHashing.addServer(s5);

        System.out.println("------------------------");
        System.out.println("Server Hashes: ");
        System.out.println("------------------------");
        System.out.println(consistentHashing.hashRing.keySet());
        System.out.println("------------------------\n");
        System.out.println("------------------------");
        System.out.println("Saving the Keys");
        System.out.println("------------------------");
        consistentHashing.put("user-id-mathew", "mob+14241");
        consistentHashing.put("user-id-john", "mob+12524");
        consistentHashing.put("user-id-michelle", "mob+51512");
        consistentHashing.put("user-id-trump", "mob+068432");
        consistentHashing.put("user-id-kishan", "mob+015437");
        consistentHashing.put("user-id-sonu", "mob+014678");
        System.out.println("------------------------\n");
        System.out.println("------------------------");
        System.out.println("Fetching the Keys");
        System.out.println("------------------------");
        System.out.println("value: " + consistentHashing.get("user-id-mathew"));
        System.out.println("value: " + consistentHashing.get("user-id-john"));
        System.out.println("value: " + consistentHashing.get("user-id-michelle"));
        System.out.println("value: " + consistentHashing.get("user-id-trump"));
        System.out.println("value: " + consistentHashing.get("user-id-sonu"));

        // since there is no replication, all of the data corresponding to a range of keys lives on one node;
        // if that node goes down, all of its data is wiped
        // e.g. let's remove the node that contains most of our data, the server called China
        System.out.println("------------------------\n");
        System.out.println("-------------------------");
        System.out.println("--China Server Crashes--");
        System.out.println("------------------------");
        consistentHashing.removeServer(s3);
        System.out.println("Fetching the Keys with One server down");
        System.out.println("------------------------");
        System.out.println("value: " + consistentHashing.get("user-id-mathew"));
        System.out.println("value: " + consistentHashing.get("user-id-john"));
        System.out.println("value: " + consistentHashing.get("user-id-michelle"));
        System.out.println("value: " + consistentHashing.get("user-id-trump"));
        System.out.println("value: " + consistentHashing.get("user-id-sonu"));
        System.out.println("------------------------\n");
        // notice john and sonu were put on the China server, whose hash was the greatest of all;
        // after it is removed, moving clockwise wraps around to the very first server, which is Tokyo-Japan
    }
}
```
Here's an implementation of consistent hashing with replication brought into the picture.
```java
import java.util.*;

/**
 * A basic implementation of consistent hashing with replication.
 * It has methods to add a server and to remove a server.
 * Each server has its own in-memory store for the keys.
 * We expose simple get and put methods to save data to, and retrieve data from, a set of nodes.
 * Everything here is in memory, but think of the servers as ones deployed on different machines.
 */
public class ConsistentHashingWithReplication {

    private final int replicationFactor;
    private final NavigableMap<Integer, Server> serverMap;

    public ConsistentHashingWithReplication(int replicationFactor) {
        this.replicationFactor = replicationFactor;
        this.serverMap = new TreeMap<>();
    }

    public void addServer(Server server) {
        // find the hash of the server; we are simply using hashCode here
        Integer serverHash = server.hashCode();
        System.out.printf("Adding server: %s. Server Hash: %s\n", server.getServerId(), serverHash);
        serverMap.put(serverHash, server);
    }

    public void removeServer(Server server) {
        Integer serverHash = server.hashCode();
        serverMap.remove(serverHash);
    }

    public void put(String key, String value) {
        // what happens when we add data?
        // first we hash the key itself to get a number,
        // then we search clockwise from that number until we hit the first server hash.
        // since replication is enabled, we keep walking clockwise to (replicationFactor - 1) more nodes
        // and store the data there as well, so that if the main server crashes we keep serving the data
        Integer hashOfKey = key.hashCode();
        List<Integer> nearestNServerHashes = hashOfNServerFoundClockWise(hashOfKey);
        System.out.println("------------------------");
        nearestNServerHashes.forEach(serverHash -> {
            Server server = serverMap.get(serverHash);
            System.out.printf("Saving the key: %s on server: %s. Hash of the key: %s. Nearest Server Hash: %s.\n", key, server.getServerId(), key.hashCode(), serverHash);
            server.put(key, value);
        });
        System.out.println("------------------------");
    }

    public String get(String key) {
        Integer hashOfKey = key.hashCode();
        List<Integer> nearestNServerHashes = hashOfNServerFoundClockWise(hashOfKey);

        // try the primary first, then each replica in clockwise order
        for (Integer serverHash : nearestNServerHashes) {
            Server server = serverMap.get(serverHash);
            String value = server.get(key);
            if (null != value && !value.isEmpty()) {
                System.out.printf("Found the key: %s on server: %s, server hash: %s \n", key, server.getServerId(), serverHash);
                return value;
            }
        }
        System.out.printf("Unable to find the key: %s on any server\n", key);
        return "";
    }

    private List<Integer> hashOfNServerFoundClockWise(Integer hashOfKey) {
        List<Integer> hashes = new ArrayList<>();
        SortedMap<Integer, Server> tailMap = serverMap.tailMap(hashOfKey);
        Integer firstServerHash = tailMap.isEmpty() ? serverMap.firstKey() : tailMap.firstKey();
        hashes.add(firstServerHash);

        Integer currentKey = firstServerHash;

        // note: if replicationFactor exceeds the number of servers, positions will repeat
        for (int i = 1; i < replicationFactor; ++i) {
            Map.Entry<Integer, Server> nextEntry = serverMap.higherEntry(currentKey);
            if (nextEntry != null) {
                currentKey = nextEntry.getKey();
            } else {
                // wrap around the ring
                currentKey = serverMap.firstKey();
            }
            hashes.add(currentKey);
        }
        return hashes;
    }

    public static void main(String[] args) {
        // we'll initialise a distributed system with 5 servers all around the globe
        System.out.println("------------------------");
        System.out.println("---CONSISTENT HASHING---");
        System.out.println("------------------------");
        Server s1 = new Server("North-Virginia");
        Server s2 = new Server("East-Mumbai");
        Server s3 = new Server("China");
        Server s4 = new Server("Eastern-Europe");
        Server s5 = new Server("Tokyo-Japan");

        ConsistentHashingWithReplication consistentHashing = new ConsistentHashingWithReplication(2);
        // add these servers to the hash ring
        consistentHashing.addServer(s1);
        consistentHashing.addServer(s2);
        consistentHashing.addServer(s3);
        consistentHashing.addServer(s4);
        consistentHashing.addServer(s5);
        System.out.println("------------------------");
        System.out.println("Server Hashes: ");
        System.out.println("------------------------");
        System.out.println(consistentHashing.serverMap.keySet());
        System.out.println("------------------------\n");
        System.out.println("------------------------");
        System.out.println("Saving the Keys (N = 2)");
        System.out.println("------------------------");
        consistentHashing.put("user-id-mathew", "mob+14241");
        consistentHashing.put("user-id-john", "mob+12524");
        consistentHashing.put("user-id-michelle", "mob+51512");
        consistentHashing.put("user-id-trump", "mob+068432");
        consistentHashing.put("user-id-sonu", "mob+014678");
        System.out.println("------------------------\n");
        System.out.println("------------------------");
        System.out.println("Fetching the Keys");
        System.out.println("------------------------");
        System.out.println("value: " + consistentHashing.get("user-id-mathew"));
        System.out.println("value: " + consistentHashing.get("user-id-john"));
        System.out.println("value: " + consistentHashing.get("user-id-michelle"));
        System.out.println("value: " + consistentHashing.get("user-id-trump"));
        System.out.println("value: " + consistentHashing.get("user-id-sonu"));

        // what happens if the China server crashes? since the replication factor is 2, we can still get the data:
        // everything present on China was also replicated to the next server clockwise,
        // so our application will still be able to serve it from there
        System.out.println("------------------------\n");
        System.out.println("-------------------------");
        System.out.println("--China Server Crashes--");
        System.out.println("------------------------");
        consistentHashing.removeServer(s3);
        System.out.println("Fetching the Keys:");
        System.out.println("------------------------");
        System.out.println("value: " + consistentHashing.get("user-id-mathew"));
        System.out.println("value: " + consistentHashing.get("user-id-john"));
        System.out.println("value: " + consistentHashing.get("user-id-michelle"));
        System.out.println("value: " + consistentHashing.get("user-id-trump"));
        System.out.println("value: " + consistentHashing.get("user-id-sonu"));
        System.out.println("------------------------\n");

        // what if North Virginia also goes down?
        // in that case, data whose only copies lived on China and North Virginia is lost,
        // but data that was replicated onward (e.g. to Japan) can still be served
        consistentHashing.removeServer(s1);
        System.out.println("-------------------------");
        System.out.println("--North Virginia Server Crashes--");
        System.out.println("------------------------");
        System.out.println("Fetching the Keys:");
        System.out.println("------------------------");
        System.out.println("value: " + consistentHashing.get("user-id-mathew"));
        System.out.println("value: " + consistentHashing.get("user-id-john"));
        System.out.println("value: " + consistentHashing.get("user-id-michelle"));
        System.out.println("value: " + consistentHashing.get("user-id-trump"));
        System.out.println("value: " + consistentHashing.get("user-id-sonu"));
    }
}
```
Consistent hashing is a fundamental technique in system design that addresses the challenges of scalability, load balancing, and fault tolerance in distributed systems. By minimizing data movement and ensuring even distribution, it provides an efficient and reliable way to manage resources in a dynamic environment. With its widespread applications in distributed caching, CDNs, and databases, consistent hashing remains a cornerstone of modern system design.