Raft Internals
To understand the need for a consensus algorithm like Raft, we first need to understand what it’s like to build a distributed, fault-tolerant system. Imagine building a distributed data store; let’s write down its basic requirements.
- We provide a value to the store and it should persist the data for later retrieval.
- The state needs to be correct, i.e., the data we receive upon retrieval should match the data we provided.
- The data store should be resilient to common failures, i.e., we should be able to retrieve the data even when the system has lost a few nodes.
- The system should be capable of self-healing, i.e., the data store should be able to get back online once failures are rectified.
With Raft we can design such a system, and we’ll create a basic working version of such a data store in this series. We’ll tackle each major part of the Raft algorithm in turn. Raft describes one of its core responsibilities as state machine replication.
A state machine here simply means the data store in our requirements. If we start with the same data (empty or otherwise) on multiple nodes and apply the same operations in the same order on all of them, every node will end up in the same state. Raft ensures these operations are performed on all the nodes in the system. To make this possible, it requires that all data updates are performed on the Leader node and then replicated to all other nodes. In distributed systems, we assume things will break down over time and strive to develop resilient, self-healing systems with no single point of failure. With Raft, if the Leader is unavailable, any node in the system can take the role of Leader as long as it gets elected by a majority of the nodes. This process is called Leader Election.
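The replication idea can be illustrated with a minimal sketch. The `KeyValueStore` class, its `apply` method and the operation tuples are hypothetical names used only for illustration; they are not part of the implementation built in this series. The sketch assumes the state machine is deterministic, so replaying one log on several empty stores gives identical results.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

# An operation is (command, key, value); value is None for deletes.
Operation = Tuple[str, str, Optional[str]]


@dataclass
class KeyValueStore:
    """A tiny deterministic state machine (hypothetical, for illustration)."""

    data: Dict[str, str] = field(default_factory=dict)

    def apply(self, op: Operation) -> None:
        command, key, value = op
        if command == "set" and value is not None:
            self.data[key] = value
        elif command == "delete":
            self.data.pop(key, None)


def replay(log: List[Operation]) -> KeyValueStore:
    """Build a fresh store and apply every operation in log order."""
    store = KeyValueStore()
    for op in log:
        store.apply(op)
    return store


log: List[Operation] = [
    ("set", "x", "1"),
    ("set", "y", "2"),
    ("delete", "x", None),
    ("set", "y", "3"),
]

# Three "nodes" that start empty and apply the same log in the same order.
nodes = [replay(log) for _ in range(3)]
```

All three stores end up holding the same data, which is the property Raft relies on when it replicates operations from the Leader.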
Leader Election
Let’s model a Node.
from dataclasses import dataclass, field
from enum import Enum

class TCPServer:
    pass

class TCPClient:
    pass

class RaftStates(str, Enum):
    FOLLOWER = "FOLLOWER"
    CANDIDATE = "CANDIDATE"
    LEADER = "LEADER"

@dataclass
class RaftState:
    current_term: int = 0
    state: RaftStates = RaftStates.FOLLOWER

@dataclass
class Raft:
    raft_state: RaftState
    local_id: str
    local_address: str
    server: TCPServer
Every node in the system is associated with an election term, and Raft ensures there’s at most one Leader for a given term. A node starts in the role of Follower. It expects to hear from the Leader at regular intervals through messages called heartbeats, and it waits for them with a timeout that carries random jitter on each node. If the Follower doesn’t receive these heartbeats in time, it promotes itself to the role of Candidate. The Candidate increments its term, votes for itself, and reaches out to all the other nodes asking for votes. Each node can vote for only one node (including itself) per term and rejects a vote request if it has already voted in that term. If the Candidate gets a majority of the votes, it promotes itself to Leader.
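The one-vote-per-term rule can be sketched in isolation before we wire it into RPC handling. The `Voter` class and `run_election` function below are hypothetical helpers for illustration only; they ignore networking, timeouts and term comparison and model just the bookkeeping.

```python
from typing import Dict, List


class Voter:
    """Tracks which candidate this node voted for in each term (hypothetical helper)."""

    def __init__(self, node_id: str) -> None:
        self.node_id = node_id
        self.voted_for: Dict[int, str] = {}

    def request_vote(self, term: int, candidate_id: str) -> bool:
        # setdefault records the first candidate seen for a term and
        # returns the recorded one on every later request in that term.
        recorded = self.voted_for.setdefault(term, candidate_id)
        return recorded == candidate_id


def run_election(candidate: Voter, peers: List[Voter], term: int) -> int:
    """Return the number of votes the candidate collects, including its own."""
    votes = 1 if candidate.request_vote(term, candidate.node_id) else 0
    votes += sum(peer.request_vote(term, candidate.node_id) for peer in peers)
    return votes


nodes = [Voter(f"n{i}") for i in range(1, 6)]  # a five-node cluster
n1, n2 = nodes[0], nodes[1]

# n1 times out first and stands for term 1.
n1_votes = run_election(n1, [n for n in nodes if n is not n1], term=1)

# n2 tries to stand in the same term, but every node (n2 included) already voted.
n2_votes = run_election(n2, [n for n in nodes if n is not n2], term=1)
```

Here `n1` collects all five votes for term 1, while `n2` collects none in that term because each node has already spent its vote. In a later term the votes are available again.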
Follower
Let’s implement the Follower loop and the Leader heartbeat listener timeout.
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from queue import Queue
from random import randint
from threading import Event, Lock, Thread
from time import sleep
from typing import Optional

def create_random_timeout(timeout: int) -> Event:
    # Returns an Event that is set after a random delay between
    # `timeout` and `2 * timeout` milliseconds.
    ev = Event()

    def _timer():
        _timeout = randint(timeout, timeout * 2) / 1000
        sleep(_timeout)
        ev.set()

    th = Thread(target=_timer)
    th.start()
    return ev

class RaftRPC(str, Enum):
    APPEND_ENTRIES = "AppendEntries"

@dataclass
class Raft:
    # ...other fields
    received_rpcs: Queue
    leader_id: Optional[str]
    leader_address: Optional[str]
    leader_lock: Lock
    # Dataclass fields without defaults must come before fields with defaults.
    leader_last_contact_lock: Lock
    heartbeat_timeout: int = 150  # timeout for leader heartbeat in milliseconds
    leader_last_contact: Optional[datetime] = None

    def set_state(self, state: RaftStates):
        # Any role change invalidates the leader we knew about.
        self.set_leader(None, None)
        self.raft_state.state = state

    def set_leader(self, leader_id: Optional[str], leader_address: Optional[str]):
        with self.leader_lock:
            self.leader_id = leader_id
            self.leader_address = leader_address

    def set_last_contact(self):
        with self.leader_last_contact_lock:
            self.leader_last_contact = datetime.utcnow()

    def process_rpc_append_entries(self, rpc_identity, rpc):
        # For now a heartbeat only refreshes the last-contact timestamp.
        self.set_last_contact()

    def process_rpc(self, rpc):
        kind = rpc["msg"]["kind"]
        if kind == RaftRPC.APPEND_ENTRIES:
            self.process_rpc_append_entries(rpc["identity"], rpc["msg"])
        else:
            pass

    def run_follower(self):
        heartbeat_timeout = create_random_timeout(self.heartbeat_timeout)
        while self.raft_state.state == RaftStates.FOLLOWER:
            if not self.received_rpcs.empty():
                request_rpc = self.received_rpcs.get()
                self.process_rpc(request_rpc)

            if heartbeat_timeout.is_set():
                if self.leader_last_contact:
                    time_since_last_contact = (
                        datetime.utcnow() - self.leader_last_contact
                    )
                    if time_since_last_contact > timedelta(
                        milliseconds=self.heartbeat_timeout
                    ):
                        # The leader has been silent for too long.
                        self.set_leader(None, None)
                        self.set_state(RaftStates.CANDIDATE)
                    else:
                        # We heard from the leader recently; restart the timer.
                        heartbeat_timeout = create_random_timeout(self.heartbeat_timeout)
                else:
                    # We never heard from any leader.
                    self.set_leader(None, None)
                    self.set_state(RaftStates.CANDIDATE)
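As a design note, the same randomized timeout can be expressed without spawning a thread per timer. The sketch below swaps in a different technique, a deadline on the monotonic clock, and is not what the Follower loop above uses. `random_deadline` and `expired` are hypothetical names.

```python
import random
import time


def random_deadline(timeout_ms: int) -> float:
    """Return a time.monotonic() deadline between timeout_ms and 2 * timeout_ms from now."""
    delay = random.randint(timeout_ms, timeout_ms * 2) / 1000
    return time.monotonic() + delay


def expired(deadline: float) -> bool:
    """True once the monotonic clock has reached the deadline."""
    return time.monotonic() >= deadline


# A loop would call expired(deadline) on each pass and pick a
# fresh deadline whenever a heartbeat arrives.
deadline = random_deadline(150)
```

The trade-off is that the loop has to poll the clock itself, but no background threads are left sleeping after a timer is replaced.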
Once the node assumes the role of Candidate, it starts requesting votes from all the members of the cluster. If it does not get a majority of the votes in its election term, it times out. This election timeout is significantly larger than the heartbeat timeout. If there is a split vote (two or more candidates receive an equal number of votes) during an election term, a Follower other than those candidates eventually becomes a Candidate and asks for votes in a new term greater than the current election term, and the current Candidates respond favorably to the new request. If the Candidate gets a reply with a term greater than its current election term, it demotes itself to Follower.
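The majority arithmetic behind a won election and a split vote can be sketched on its own. `majority` and `election_winner` are hypothetical helpers, and the sketch assumes `cluster_size` counts every node in the cluster, including the candidates themselves.

```python
from typing import Dict, Optional


def majority(cluster_size: int) -> int:
    """Smallest number of votes that is more than half of the cluster."""
    return cluster_size // 2 + 1


def election_winner(tally: Dict[str, int], cluster_size: int) -> Optional[str]:
    """Return the candidate that reached a majority, or None on a split vote."""
    needed = majority(cluster_size)
    for candidate, votes in tally.items():
        if votes >= needed:
            return candidate
    return None


# Five nodes, two candidates: one reaches the 3 votes it needs.
clear_result = election_winner({"n1": 3, "n2": 2}, cluster_size=5)

# Four nodes, two candidates with two votes each: nobody reaches 3.
split_result = election_winner({"n1": 2, "n2": 2}, cluster_size=4)
```

When the function returns `None`, no Leader exists for that term and the cluster has to wait for an election in a higher term.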
Candidate
Let’s implement the Candidate loop and the Election timeout.
import functools
import traceback
from queue import Queue
from threading import Thread
from typing import Callable

def add_result_to_queue(fn: Callable, queue: Queue, *args, **kwargs):
    # Runs fn and puts its result on the queue; errors are logged, not raised.
    try:
        result = fn(*args, **kwargs)
        queue.put_nowait(result)
    except Exception:
        traceback.print_exc()

@dataclass
class Raft:
    # ... other methods and fields

    def request_vote(self, term: int, member_id: str, member_address: str):
        host, port = member_address.split(":", maxsplit=1)
        client = TCPClient(host=host, port=int(port))
        # Default reply, used when the member does not answer in time.
        reply = {
            "granted": False,
            "term": term,
            "member_id": member_id,
            "member_address": member_address,
        }

        resp = client.send(
            {
                "kind": RaftRPC.REQUEST_VOTE,
                "member_id": self.local_id,
                "member_address": self.local_address,
                "term": term,
            },
            timeout=100,
        )

        if resp:
            reply["term"] = resp.get("term") or term
            reply["granted"] = resp.get("granted") or False

        client.close()
        return reply

    def start_election(self, term: int) -> Queue:
        # Ask every member for its vote in parallel; replies land on the queue.
        queue = Queue()
        for member in self.members:
            partial_fn = functools.partial(
                add_result_to_queue,
                self.request_vote,
                queue,
            )
            th = Thread(
                target=partial_fn,
                args=(
                    term,
                    str(member["member_id"]),
                    str(member["member_address"]),
                ),
            )
            th.start()

        return queue

    def run_candidate(self):
        term = self.raft_state.current_term + 1
        self.set_current_term(term)
        votes = self.start_election(term)
        # We start with our own vote.
        votes_in_favor, votes_needed = 1, len(self.members) // 2
        election_timeout = create_random_timeout(self.election_timeout)

        while self.raft_state.state == RaftStates.CANDIDATE:
            if not self.received_rpcs.empty():
                request_rpc = self.received_rpcs.get()
                self.process_rpc(request_rpc)

            if not votes.empty():
                vote = votes.get()
                if vote["term"] > term:
                    # Someone is ahead of us; step back to Follower.
                    self.set_current_term(vote["term"])
                    self.set_state(RaftStates.FOLLOWER)
                    return

                if vote["granted"]:
                    votes_in_favor += 1

                if votes_in_favor > votes_needed:
                    self.set_leader(self.local_id, self.local_address)
                    self.set_state(RaftStates.LEADER)
                    return

            if election_timeout.is_set():
                print("Election timeout reached, restarting election")
                return
To handle this RequestVote RPC call from the Candidate, let’s add a handler.
import traceback
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional

class RaftRPC(str, Enum):
    REQUEST_VOTE = "RequestVote"
    APPEND_ENTRIES = "AppendEntries"

@dataclass
class RaftState:
    current_term: int
    last_voted_term: Optional[int] = None
    votes: Dict[int, str] = field(default_factory=dict)  # term -> candidate we voted for
    state: RaftStates = RaftStates.FOLLOWER

@dataclass
class Raft:
    # ... other members and methods
    def get_voted_for_candidate(self, term: int) -> Optional[str]:
        return self.raft_state.votes.get(term)

    def set_voted_for_candidate(self, term: int, candidate: str) -> None:
        last_voted_term = self.raft_state.last_voted_term
        if last_voted_term is None or term > last_voted_term:
            self.raft_state.last_voted_term = term

        # A repeated request from the candidate we already voted for is fine;
        # only a different candidate in the same term is an error.
        existing = self.raft_state.votes.get(term)
        if existing is not None and existing != candidate:
            raise Exception(f"Duplicate candidate for term [{term}]")

        self.raft_state.votes[term] = candidate

    def process_rpc(self, rpc):
        try:
            rpc_identity = rpc["identity"]
            rpc_msg = rpc["msg"]
            kind = rpc_msg.pop("kind")
            if kind == RaftRPC.APPEND_ENTRIES:
                self.process_rpc_append_entries(rpc_identity, rpc_msg)
            elif kind == RaftRPC.REQUEST_VOTE:
                self.process_rpc_vote_for(rpc_identity, rpc_msg)
            else:
                self.server.reply(rpc_identity, None)
        except Exception:
            traceback.print_exc()

    def process_rpc_vote_for(self, rpc_identity, rpc):
        candidate_id = rpc["member_id"]
        candidate_term = rpc["term"]
        leader_id = self.leader_id
        current_term = self.raft_state.current_term

        resp = {
            "member_id": self.local_id,
            "member_address": self.local_address,
            "term": current_term,
            "granted": True,
        }

        if leader_id is not None and leader_id != candidate_id:
            resp["granted"] = False
            print(f"We already have a leader: {self.leader_id}")

        elif candidate_term < current_term:
            # The candidate is behind us; reject.
            resp["granted"] = False

        else:
            if candidate_term > current_term:
                # The candidate is ahead of us; adopt its term.
                self.set_state(RaftStates.FOLLOWER)
                self.set_current_term(candidate_term)
                resp["term"] = candidate_term

            last_voted_term = self.raft_state.last_voted_term

            if last_voted_term is not None and last_voted_term == candidate_term:
                last_voted_for_candidate = self.get_voted_for_candidate(last_voted_term)
                if candidate_id != last_voted_for_candidate:
                    print(
                        f"Rejecting vote from {candidate_id}. "
                        f"We already voted for {last_voted_for_candidate} in term {candidate_term}"
                    )
                    resp["granted"] = False
                else:
                    print(
                        f"Received duplicate vote from {candidate_id} in term {candidate_term}"
                    )

        if resp["granted"]:
            print(f"Candidate {candidate_id} is our leader for term {candidate_term}")
            self.set_voted_for_candidate(candidate_term, candidate_id)
            self.set_last_contact()

        self.server.reply(rpc_identity, resp)
Leader
Once the Candidate transitions to Leader, it is responsible for sending out heartbeats in the form of AppendEntries RPCs, which will later carry additional replication info. Let’s implement the Leader loop now.
from threading import Event, Thread
from time import sleep

class Raft:
    # ... other members and methods
    def replicate(self, step_down: Event, member_id: int, member_address: str):
        host, port = member_address.split(":")
        client = TCPClient(host, int(port))
        while not step_down.is_set():
            req = {
                "kind": RaftRPC.APPEND_ENTRIES,
                "member_id": self.local_id,
                "member_address": self.local_address,
                "term": self.raft_state.current_term,
            }
            resp = client.send(req, timeout=100)
            if resp is None:
                # No reply; back off briefly before retrying.
                sleep(0.125)
            else:
                if resp["term"] > req["term"]:
                    # A member is in a newer term; ask the leader loop to step down.
                    step_down.set()
                    break
        client.close()

    def start_replication(self, step_down: Event):
        threads = []
        print(f"Starting replication for term {self.raft_state.current_term}")
        for member in self.members:
            th = Thread(
                target=self.replicate,
                args=(step_down, member["member_id"], member["member_address"]),
            )
            threads.append(th)
        for th in threads:
            th.start()

    def run_leader(self):
        step_down = Event()
        self.start_replication(step_down)

        while self.raft_state.state == RaftStates.LEADER:
            if not self.received_rpcs.empty():
                request_rpc = self.received_rpcs.get()
                self.process_rpc(request_rpc)

            if step_down.is_set():
                self.set_state(RaftStates.FOLLOWER)

        # Stop the replication threads if we left the Leader role another way,
        # e.g. after handling an RPC that carried a higher term.
        step_down.set()
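The step-down signalling can be tried out without any networking. In the sketch below, `heartbeat_worker` is a hypothetical stand-in for `replicate`: instead of talking to a member, it reads the terms a member would have replied with from an iterable. The shared `Event` is what lets one worker stop all the others.

```python
from itertools import repeat
from threading import Event, Thread
from typing import Iterable


def heartbeat_worker(
    step_down: Event, current_term: int, reply_terms: Iterable[int]
) -> None:
    """Consume simulated reply terms until told to stop or a higher term shows up."""
    for reply_term in reply_terms:
        if step_down.is_set():
            return
        if reply_term > current_term:
            step_down.set()  # signal every other worker and the leader loop
            return
        step_down.wait(0.01)  # pause between heartbeats; wakes early if set


step_down = Event()
workers = [
    # This member eventually replies with term 3, which is newer than term 1.
    Thread(target=heartbeat_worker, args=(step_down, 1, [1, 1, 3])),
    # This member always replies with term 1, so it only stops via the Event.
    Thread(target=heartbeat_worker, args=(step_down, 1, repeat(1))),
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join(timeout=2)
```

Using `step_down.wait(...)` rather than `sleep(...)` for the pause means a worker reacts to the signal immediately instead of finishing its sleep first.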
Tying up everything
We add a start method to start the Raft instance.
from concurrent import futures

class Raft:
    # ... other
    def start(self):
        print("Starting Raft")
        print(self)
        self.server.listen()
        # Every node boots as a Follower in term 0.
        self.set_state(RaftStates.FOLLOWER)
        self.set_current_term(0)
        # Named `executor` to avoid shadowing the built-in `exec`.
        with futures.ThreadPoolExecutor(max_workers=10) as executor:
            executor.submit(self.run)
            executor.submit(self.run_store)
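The `run` method submitted above is not shown in this post. One plausible shape for it, offered purely as an assumption, is a dispatcher that calls the loop matching the current role and picks the next one whenever that loop returns. The `Node` class below is a hypothetical stand-in whose role loops change state immediately so the sketch can run on its own.

```python
from enum import Enum
from typing import Callable, Dict, List


class RaftStates(str, Enum):
    FOLLOWER = "FOLLOWER"
    CANDIDATE = "CANDIDATE"
    LEADER = "LEADER"


class Node:
    """Stand-in for the Raft class; each run_* method returns after one pass."""

    def __init__(self) -> None:
        self.state = RaftStates.FOLLOWER
        self.visited: List[RaftStates] = []

    def run_follower(self) -> None:
        self.visited.append(self.state)
        self.state = RaftStates.CANDIDATE  # pretend the heartbeat timed out

    def run_candidate(self) -> None:
        self.visited.append(self.state)
        self.state = RaftStates.LEADER  # pretend we won the election

    def run_leader(self) -> None:
        self.visited.append(self.state)

    def run(self, max_steps: int) -> None:
        """Dispatch to the loop that matches the current role."""
        handlers: Dict[RaftStates, Callable[[], None]] = {
            RaftStates.FOLLOWER: self.run_follower,
            RaftStates.CANDIDATE: self.run_candidate,
            RaftStates.LEADER: self.run_leader,
        }
        for _ in range(max_steps):
            handlers[self.state]()


node = Node()
node.run(max_steps=3)
```

A real dispatcher would loop until shutdown rather than for a fixed number of steps; `max_steps` exists only to keep the example finite.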
With this, we have tackled the first promise of a Raft implementation, Leader Election. We’ll tackle Log Replication and State Safety in the future. We’ll also write our state machine implementation and expose it over HTTP API later on in this series. We still haven’t talked about membership and discovery.
NOTE
This won’t serve as a production-ready Raft implementation and I am not trying to make it so.