Raft [Part - 2]. Let's examine the node responsibilities.

Raft Internals

To understand the need for a consensus algorithm like Raft, we need to understand what it’s like to build a distributed fault-tolerant system. Imagine building a distributed data store and let’s write down the basic requirements.

  • We provide a value to the store and it should persist the data for later retrieval.
  • The state needs to be correct, i.e., the data we provided should match the data we receive upon retrieval.
  • The data store should be resilient to common failures, i.e., we should be able to retrieve the data even when the system has lost a few nodes.
  • The system should be capable of self-healing, i.e., the data store should be able to get back online once failures are rectified.

With Raft we can design such a system, and we’ll build a basic working version of such a data store over this series. We’ll tackle each major part of the Raft algorithm in turn to understand the system. Raft describes state machine replication as one of its core responsibilities.

A state machine here simply means the data store in our requirements. If we start with the same data (empty or otherwise) on multiple nodes and apply the same operations in the same order on all of them, every node ends up in the same state. Raft ensures these operations are performed on all the nodes in the system. To make that possible, it requires that all data updates go through the Leader node, which replicates them to all other nodes. In distributed systems, we assume things will break down over time and strive to build resilient, self-healing systems with no single point of failure. With Raft, if the Leader is unavailable, any node in the system can take over the role of Leader as long as it gets elected by a majority of the nodes. This process is called Leader Election.
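To make the replication idea concrete, here is a minimal sketch of a key-value state machine. The operation format (`"set"` / `"delete"` tuples) and the `apply` and `replay` helpers are assumptions made up for this illustration; they are not part of Raft itself or of the implementation built later in this post.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical operation format: (kind, key, value), where kind is "set" or "delete".
Operation = Tuple[str, str, Optional[int]]


def apply(state: Dict[str, int], op: Operation) -> None:
    """Apply a single operation to the state machine in place."""
    kind, key, value = op
    if kind == "set":
        if value is None:
            raise ValueError("a 'set' operation needs a value")
        state[key] = value
    elif kind == "delete":
        state.pop(key, None)
    else:
        raise ValueError(f"unknown operation kind: {kind}")


def replay(log: List[Operation]) -> Dict[str, int]:
    """Start from an empty state and apply every operation in log order."""
    state: Dict[str, int] = {}
    for op in log:
        apply(state, op)
    return state


log: List[Operation] = [
    ("set", "x", 1),
    ("set", "y", 2),
    ("delete", "x", None),
    ("set", "y", 3),
]

# Two "nodes" that replay the same log end up in the same state.
node_a = replay(log)
node_b = replay(log)
print(node_a == node_b)
```

The order of operations matters: swapping the last two entries of a log can change the final state, which is why all updates are funnelled through a single Leader that fixes the order.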

Leader Election

Let’s model a Node.

from dataclasses import dataclass, field
from enum import Enum

# Placeholder for the TCP server that receives RPCs from other nodes.
class TCPServer:
    pass

# Placeholder for the TCP client used to send RPCs to other nodes.
class TCPClient:
    pass

class RaftStates(str, Enum):
    FOLLOWER = "FOLLOWER"
    CANDIDATE = "CANDIDATE"
    LEADER = "LEADER"

@dataclass
class RaftState:
    current_term: int = 0
    state: RaftStates = RaftStates.FOLLOWER

@dataclass
class Raft:
    raft_state: RaftState
    local_id: str
    local_address: str
    server: TCPServer

All the nodes in the system are associated with an election term, and Raft ensures there’s at most one Leader for a given term. A Node starts in the role of Follower. It expects to hear from the Leader at regular intervals; these messages are called heartbeats, and each node waits for them with a timeout that includes random jitter. If the Follower doesn’t receive a heartbeat within its timeout, it promotes itself to the role of Candidate. The Candidate increments its term, votes for itself, and reaches out to all the other nodes asking for votes. Each node can vote for only one node (including itself) per term and rejects a vote request if it has already voted in that term. If the Candidate gets a majority of the votes, it promotes itself to Leader.
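The two voting rules above (one vote per node per term, and a strict majority to win) can be sketched in isolation. `VoteBook` and `has_majority` are hypothetical names used only for this illustration, and `cluster_size` is assumed to count every node, including the Candidate itself.

```python
from typing import Dict


def has_majority(votes_in_favor: int, cluster_size: int) -> bool:
    """True when the votes form a strict majority of the whole cluster."""
    return votes_in_favor > cluster_size // 2


class VoteBook:
    """Remembers which candidate this node voted for in each term."""

    def __init__(self) -> None:
        self._voted_for: Dict[int, str] = {}

    def grant(self, term: int, candidate_id: str) -> bool:
        """Grant the vote unless a different candidate already has it for this term."""
        previous = self._voted_for.get(term)
        if previous is None:
            self._voted_for[term] = candidate_id
            return True
        # A repeated request from the same candidate is answered the same way.
        return previous == candidate_id


book = VoteBook()
first = book.grant(term=1, candidate_id="node-a")
second = book.grant(term=1, candidate_id="node-b")
next_term = book.grant(term=2, candidate_id="node-b")
print(first, second, next_term)
```

With three nodes a Candidate needs two votes (its own plus one more); with four nodes it needs three, so an even split of two against two elects nobody.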

Follower

Let’s implement the Follower loop and the Leader heartbeat listener timeout.

from datetime import datetime, timedelta
from queue import Queue
from random import randint
from threading import Event, Lock, Thread
from time import sleep
from typing import Optional

def create_random_timeout(timeout: int) -> Event:
    """Return an Event that is set after a random delay between
    `timeout` and 2 * `timeout` milliseconds."""
    ev = Event()

    def _timer():
        _timeout = randint(timeout, timeout * 2) / 1000
        sleep(_timeout)
        ev.set()

    # Daemon thread, so a pending timer never blocks interpreter shutdown.
    th = Thread(target=_timer, daemon=True)
    th.start()
    return ev

class RaftRPC(str, Enum):
    APPEND_ENTRIES = "AppendEntries"

@dataclass
class Raft:
    # ...other fields
    received_rpcs: Queue
    leader_id: Optional[str]
    leader_address: Optional[str]
    leader_lock: Lock
    leader_last_contact_lock: Lock
    # Dataclass fields with defaults must come after fields without defaults.
    heartbeat_timeout: int = 150  # timeout for leader heartbeat in milliseconds
    leader_last_contact: Optional[datetime] = None

    def set_state(self, state: RaftStates):
        # Any state change forgets the current leader.
        self.set_leader(None, None)
        self.raft_state.state = state

    def set_leader(self, leader_id: Optional[str], leader_address: Optional[str]):
        with self.leader_lock:
            self.leader_id = leader_id
            self.leader_address = leader_address

    def set_last_contact(self):
        with self.leader_last_contact_lock:
            self.leader_last_contact = datetime.utcnow()

    def process_rpc_append_entries(self, rpc_identity, rpc):
        # For now a heartbeat only refreshes the last-contact timestamp.
        self.set_last_contact()

    def process_rpc(self, rpc):
        kind = rpc["msg"]["kind"]
        if kind == RaftRPC.APPEND_ENTRIES:
            self.process_rpc_append_entries(rpc["identity"], rpc["msg"])
        else:
            pass

    def run_follower(self):
        heartbeat_timeout = create_random_timeout(self.heartbeat_timeout)
        while self.raft_state.state == RaftStates.FOLLOWER:
            if not self.received_rpcs.empty():
                request_rpc = self.received_rpcs.get()
                self.process_rpc(request_rpc)

            if heartbeat_timeout.is_set():
                with self.leader_last_contact_lock:
                    last_contact = self.leader_last_contact

                heard_recently = (
                    last_contact is not None
                    and datetime.utcnow() - last_contact
                    <= timedelta(milliseconds=self.heartbeat_timeout)
                )
                if heard_recently:
                    # The leader is alive: arm a fresh timer and keep following.
                    heartbeat_timeout = create_random_timeout(self.heartbeat_timeout)
                else:
                    # No (recent) heartbeat: start an election.
                    # set_state also clears the known leader.
                    self.set_state(RaftStates.CANDIDATE)

Once the Node assumes the role of Candidate, it starts requesting votes from all the members of the cluster. If it does not get a majority of the votes in its election term, the election times out. This election timeout is significantly larger than the heartbeat timeout. If the vote is split (no candidate reaches a majority) during an election term, the node whose timeout fires next becomes a Candidate and asks for votes for a new term greater than the current election term, and the current Candidates reply to that request favorably. If the Candidate gets a reply with a term greater than its current election term, it demotes itself to Follower.
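The role changes described so far can be summarised as a small transition table. This is only a sketch for orientation: `Role`, `Trigger`, and `next_role` are hypothetical names invented for this illustration and are not used by the implementation in this post.

```python
from enum import Enum
from typing import Dict, Tuple


class Role(str, Enum):
    FOLLOWER = "FOLLOWER"
    CANDIDATE = "CANDIDATE"
    LEADER = "LEADER"


class Trigger(str, Enum):
    HEARTBEAT_TIMEOUT = "heartbeat_timeout"  # no heartbeat from the Leader in time
    WON_MAJORITY = "won_majority"            # a majority of the nodes granted their vote
    ELECTION_TIMEOUT = "election_timeout"    # election ended without a majority
    HIGHER_TERM_SEEN = "higher_term_seen"    # a reply or request carried a greater term


TRANSITIONS: Dict[Tuple[Role, Trigger], Role] = {
    (Role.FOLLOWER, Trigger.HEARTBEAT_TIMEOUT): Role.CANDIDATE,
    (Role.CANDIDATE, Trigger.WON_MAJORITY): Role.LEADER,
    (Role.CANDIDATE, Trigger.ELECTION_TIMEOUT): Role.CANDIDATE,  # retry with a new term
    (Role.CANDIDATE, Trigger.HIGHER_TERM_SEEN): Role.FOLLOWER,
    (Role.LEADER, Trigger.HIGHER_TERM_SEEN): Role.FOLLOWER,
}


def next_role(role: Role, trigger: Trigger) -> Role:
    """Return the role after `trigger`; combinations not listed leave the role unchanged."""
    return TRANSITIONS.get((role, trigger), role)


role = Role.FOLLOWER
for trigger in (Trigger.HEARTBEAT_TIMEOUT, Trigger.WON_MAJORITY, Trigger.HIGHER_TERM_SEEN):
    role = next_role(role, trigger)
print(role.value)
```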

Candidate

Let’s implement the Candidate loop and the Election timeout.

import functools
import traceback
from queue import Queue
from threading import Thread
from typing import Callable

def add_result_to_queue(fn: Callable, queue: Queue, *args, **kwargs):
    """Run `fn` and put its result on `queue`; log any failure instead of raising."""
    try:
        result = fn(*args, **kwargs)
        queue.put_nowait(result)
    except Exception:
        traceback.print_exc()

@dataclass
class Raft:
    # ... other methods and fields

    def request_vote(self, term: int, member_id: str, member_address: str):
        host, port = member_address.split(":", maxsplit=1)
        client = TCPClient(host=host, port=int(port))
        # Default reply, used when the member does not answer in time.
        reply = {
            "granted": False,
            "term": term,
            "member_id": member_id,
            "member_address": member_address,
        }

        try:
            resp = client.send(
                {
                    "kind": RaftRPC.REQUEST_VOTE,
                    "member_id": self.local_id,
                    "member_address": self.local_address,
                    "term": term,
                },
                timeout=100,
            )

            if resp:
                reply["term"] = resp.get("term") or term
                reply["granted"] = resp.get("granted") or False
        finally:
            # Close the connection even if the send fails.
            client.close()

        return reply

    def start_election(self, term: int) -> Queue:
        # Ask every member for its vote in parallel; replies arrive on the queue.
        queue = Queue()
        for member in self.members:
            partial_fn = functools.partial(
                add_result_to_queue,
                self.request_vote,
                queue,
            )
            th = Thread(
                target=partial_fn,
                args=(
                    term,
                    str(member["member_id"]),
                    str(member["member_address"]),
                ),
            )
            th.start()

        return queue

    def run_candidate(self):
        term = self.raft_state.current_term + 1
        self.set_current_term(term)
        votes = self.start_election(term)
        # The Candidate starts with one vote: its own.
        votes_in_favor, votes_needed = 1, len(self.members) // 2
        election_timeout = create_random_timeout(self.election_timeout)

        while self.raft_state.state == RaftStates.CANDIDATE:
            if not self.received_rpcs.empty():
                request_rpc = self.received_rpcs.get()
                self.process_rpc(request_rpc)

            if not votes.empty():
                vote = votes.get()
                if vote["term"] > term:
                    # Someone is ahead of us: adopt their term and step down.
                    self.set_current_term(vote["term"])
                    self.set_state(RaftStates.FOLLOWER)
                    return

                if vote["granted"]:
                    votes_in_favor += 1

                if votes_in_favor > votes_needed:
                    # set_state clears the known leader, so record ourselves afterwards.
                    self.set_state(RaftStates.LEADER)
                    self.set_leader(self.local_id, self.local_address)
                    return

            if election_timeout.is_set():
                print("Election timeout reached, restarting election")
                return

To handle this RequestVote RPC call from the Candidate, let’s add a handler.

import traceback
from dataclasses import dataclass, field
from typing import Dict, Optional

class RaftRPC(str, Enum):
    REQUEST_VOTE = "RequestVote"
    APPEND_ENTRIES = "AppendEntries"

@dataclass
class RaftState:
    current_term: int
    last_voted_term: Optional[int] = None
    # Maps a term to the candidate this node voted for in that term.
    votes: Dict[int, str] = field(default_factory=dict)
    state: RaftStates = RaftStates.FOLLOWER

@dataclass
class Raft:
    # ... other members and methods
    def get_voted_for_candidate(self, term: int) -> Optional[str]:
        return self.raft_state.votes.get(term)

    def set_voted_for_candidate(self, term: int, candidate: str) -> None:
        last_voted_term = self.raft_state.last_voted_term
        if last_voted_term is None or term > last_voted_term:
            self.raft_state.last_voted_term = term

        # Re-recording the same vote is harmless (duplicate request);
        # voting for a second candidate in the same term is an error.
        existing = self.raft_state.votes.get(term)
        if existing is not None and existing != candidate:
            raise Exception(f"Duplicate candidate for term [{term}]")

        self.raft_state.votes[term] = candidate

    def process_rpc(self, rpc):
        try:
            rpc_identity = rpc["identity"]
            rpc_msg = rpc["msg"]
            kind = rpc_msg.pop("kind")
            if kind == RaftRPC.APPEND_ENTRIES:
                self.process_rpc_append_entries(rpc_identity, rpc_msg)
            elif kind == RaftRPC.REQUEST_VOTE:
                self.process_rpc_vote_for(rpc_identity, rpc_msg)
            else:
                self.server.reply(rpc_identity, None)
        except Exception:
            traceback.print_exc()

    def process_rpc_vote_for(self, rpc_identity, rpc):
        candidate_id = rpc["member_id"]
        candidate_term = rpc["term"]
        leader_id = self.leader_id
        current_term = self.raft_state.current_term

        resp = {
            "member_id": self.local_id,
            "member_address": self.local_address,
            "term": current_term,
            "granted": True,
        }

        if leader_id is not None and leader_id != candidate_id:
            # We still know of a leader, so we refuse to vote for someone else.
            resp["granted"] = False
            print(f"We already have a leader: {self.leader_id}")

        elif candidate_term < current_term:
            # The candidate is behind us.
            resp["granted"] = False

        else:
            if candidate_term > current_term:
                # The candidate is ahead of us: adopt its term as a Follower.
                self.set_state(RaftStates.FOLLOWER)
                self.set_current_term(candidate_term)
                resp["term"] = candidate_term

            last_voted_term = self.raft_state.last_voted_term

            if last_voted_term is not None and last_voted_term == candidate_term:
                last_voted_for_candidate = self.get_voted_for_candidate(last_voted_term)
                if candidate_id != last_voted_for_candidate:
                    print(
                        f"Rejecting vote from {candidate_id}. "
                        f"We already voted for {last_voted_for_candidate} in term {candidate_term}"
                    )
                    resp["granted"] = False
                else:
                    print(
                        f"Received duplicate vote from {candidate_id} in term {candidate_term}"
                    )

        if resp["granted"]:
            print(f"Candidate {candidate_id} is our leader for term {candidate_term}")
            self.set_voted_for_candidate(candidate_term, candidate_id)
            self.set_last_contact()

        self.server.reply(rpc_identity, resp)

Leader

Once the Candidate node transitions to Leader, it is responsible for sending out heartbeats in the form of AppendEntries RPCs, which will later carry additional replication info. Let’s implement the Leader loop now.

from threading import Event, Thread
from time import sleep

class Raft:
    # ... other members and methods
    def replicate(self, step_down: Event, member_id: str, member_address: str):
        host, port = member_address.split(":", maxsplit=1)
        client = TCPClient(host=host, port=int(port))
        # Keep sending heartbeats to this member until the leader steps down.
        while not step_down.is_set():
            req = {
                "kind": RaftRPC.APPEND_ENTRIES,
                "member_id": self.local_id,
                "member_address": self.local_address,
                "term": self.raft_state.current_term,
            }
            resp = client.send(req, timeout=100)
            if resp is None:
                # Member unreachable: back off briefly before retrying.
                sleep(0.125)
            else:
                if resp["term"] > req["term"]:
                    # A member is in a newer term, so we are no longer the leader.
                    step_down.set()
                    break
        client.close()

    def start_replication(self, step_down: Event):
        threads = []
        print(f"Starting replication for term {self.raft_state.current_term}")
        for member in self.members:
            th = Thread(
                target=self.replicate,
                args=(step_down, member["member_id"], member["member_address"]),
            )
            threads.append(th)
        for th in threads:
            th.start()

    def run_leader(self):
        step_down = Event()
        self.start_replication(step_down)

        while self.raft_state.state == RaftStates.LEADER:
            if not self.received_rpcs.empty():
                request_rpc = self.received_rpcs.get()
                self.process_rpc(request_rpc)

            if step_down.is_set():
                self.set_state(RaftStates.FOLLOWER)

        # If an incoming RPC changed our state, make sure the
        # replication threads stop as well.
        step_down.set()

Tying up everything

We include a start method to start the Raft instance.

from concurrent import futures

class Raft:
    # ... other members and methods
    def start(self):
        print("Starting Raft")
        print(self)
        self.server.listen()
        self.set_state(RaftStates.FOLLOWER)
        self.set_current_term(0)
        # `executor` rather than `exec`, to avoid shadowing the builtin.
        with futures.ThreadPoolExecutor(max_workers=10) as executor:
            executor.submit(self.run)
            executor.submit(self.run_store)
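The `run` method submitted above is not shown in this post. Since each role loop returns when the node leaves that role (or when an election times out), one plausible shape for it is a dispatcher that keeps calling the loop matching the current state. The sketch below is an assumption, not the actual implementation: `MiniRaft`, its `shutdown` event, and the stubbed role loops are hypothetical and exist only so the example runs on its own.

```python
from enum import Enum
from threading import Event
from typing import List


class RaftStates(str, Enum):
    FOLLOWER = "FOLLOWER"
    CANDIDATE = "CANDIDATE"
    LEADER = "LEADER"


class MiniRaft:
    """Stub node: each role loop records its visit and moves to the next role."""

    def __init__(self) -> None:
        self.state = RaftStates.FOLLOWER
        self.shutdown = Event()  # hypothetical stop signal
        self.visited: List[str] = []

    def run_follower(self) -> None:
        self.visited.append(self.state.value)
        self.state = RaftStates.CANDIDATE  # pretend the heartbeat timed out

    def run_candidate(self) -> None:
        self.visited.append(self.state.value)
        self.state = RaftStates.LEADER  # pretend we won the election

    def run_leader(self) -> None:
        self.visited.append(self.state.value)
        self.shutdown.set()  # end the demo once we have led

    def run(self) -> None:
        handlers = {
            RaftStates.FOLLOWER: self.run_follower,
            RaftStates.CANDIDATE: self.run_candidate,
            RaftStates.LEADER: self.run_leader,
        }
        # Re-enter the loop for whatever role the node is in now.
        while not self.shutdown.is_set():
            handlers[self.state]()


node = MiniRaft()
node.run()
print(node.visited)
```

A dispatcher like this also explains why `run_candidate` can simply return on an election timeout: the state is still CANDIDATE, so the next iteration starts a fresh election with a higher term.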

With this, we have tackled the first promise of a Raft implementation: Leader Election. We’ll tackle Log Replication and State Safety in future posts. We’ll also write our state machine implementation and expose it over an HTTP API later in this series. We still haven’t talked about membership and discovery.

NOTE

This won’t serve as a production-ready Raft implementation and I am not trying to make it so.