MIT 6.5840 Distributed Systems
MIT 6.5840 (formerly 6.824) is the gold-standard graduate course on distributed systems, taught by Prof. Robert Morris. Every lecture pairs theory from a seminal paper with implementation work in the labs. The 5-lab sequence builds a complete sharded, fault-tolerant key-value database from scratch.
Why do it in Rust? The official labs are Go-only. But since the lab specs are language-agnostic, you can implement them in Rust — which gives you Rust's async/ownership model on top of distributed systems concepts. Every major storage system (TiKV, FoundationDB, CockroachDB) uses these exact ideas.
Time Estimates per Lab
The 5 Labs
What You Need Before Starting
You do not need prior distributed systems experience. But you do need solid programming fundamentals and comfort with Rust's core model. Here is exactly what to learn first if you are missing any of these.
Rust Prerequisites
You will be writing concurrent, async Rust from Lab 1 onward. Make sure you are comfortable with these concepts before starting:
| Concept | Why You Need It | Where to Learn |
|---|---|---|
| Ownership & borrowing | Every shared data structure uses Arc<Mutex<T>> | Rust Book Ch 4 |
| Enums & pattern matching | RPC messages, task types, and state machines are all enums | Rust Book Ch 6 |
| Traits & trait objects | You define Persister, MrApp, and other interfaces as traits | Rust Book Ch 10 |
| Smart pointers (Arc, Box) | All shared state across tasks uses Arc | Rust Book Ch 15 |
| Concurrency (Mutex, threads) | Every lab is inherently concurrent | Rust Book Ch 16 |
| async/await & Tokio basics | All network I/O and timers use Tokio | Tokio Tutorial |
| Serde serialization | Every RPC argument and log entry is serialized with bincode/serde | serde.rs |
Not sure if you're ready? Try this test: write a Tokio program with two tasks sharing an Arc<Mutex<HashMap<String, String>>>, where one task inserts values and the other reads them with a 100ms delay. If you can do that comfortably, you are ready for Lab 1.
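If you want a warm-up for that exercise, here is the same sharing pattern with plain std threads (a sketch with illustrative names; swap thread::spawn for tokio::spawn and the sleep for tokio::time::sleep to make it the actual async version):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

// One task inserts into a shared map; another polls it with a delay.
// This Arc<Mutex<...>> shape is exactly what the labs use everywhere.
fn run_demo() -> usize {
    let map = Arc::new(Mutex::new(HashMap::<String, String>::new()));

    let writer = {
        let map = Arc::clone(&map);
        thread::spawn(move || {
            for i in 0..5 {
                map.lock().unwrap().insert(format!("k{i}"), format!("v{i}"));
            }
        })
    };

    let reader = {
        let map = Arc::clone(&map);
        thread::spawn(move || {
            // Poll with a delay until all inserts are visible.
            for _ in 0..100 {
                if map.lock().unwrap().len() == 5 { break; }
                thread::sleep(Duration::from_millis(10));
            }
            map.lock().unwrap().len()
        })
    };

    writer.join().unwrap();
    reader.join().unwrap()
}
```

Note the lock is taken briefly inside the loop, never held across the sleep — the same discipline the Raft labs demand around await points.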
CS Prerequisites
- Basic networking — understand that clients send requests, servers send responses, and messages can be lost or delayed. You do not need socket programming experience; the fake RPC layer handles transport.
- Threads vs processes — understand that threads share memory and processes do not. Raft peers are separate processes communicating over the network.
- Hash maps, sorting, file I/O — Lab 1 (MapReduce) relies heavily on all three.
Recommended (Not Required) Pre-Reading
- Designing Data-Intensive Applications by Martin Kleppmann — Chapters 5 (Replication), 6 (Partitioning), 7 (Transactions), and 9 (Consistency & Consensus) cover the exact concepts in these labs. The single best distributed systems book for practitioners.
- Aphyr's Distributed Systems Class Outline — a concise reading list of core topics with links.
Distributed Systems Fundamentals
Before touching any code, internalize these ideas. Every lecture and every lab traces back to them. If you get stuck later, come back here.
Why Is Distributed Systems Hard?
Three fundamental problems make distributed systems different from single-machine programming:
| Problem | What It Means | Lab Impact |
|---|---|---|
| Unreliable network | Messages can be lost, duplicated, delayed, or arrive out of order. There is no way to distinguish a slow server from a dead one. | Labs 1–5: all RPCs can fail. You must design for retries and idempotency everywhere. |
| Partial failure | Some nodes crash while others keep running. A crashed node might restart with old state. | Lab 3C: Raft must persist state so a restarted node recovers. Lab 4: KV service survives minority failures. |
| No global clock | Nodes cannot agree on "what time is it." You cannot order events by wall-clock timestamps. | Lab 3: Raft uses terms (logical clocks) instead of timestamps to order events and detect stale leaders. |
Failure Models
Distributed systems papers define how nodes can fail. The labs use the crash-recovery model:
- Crash-stop — a node crashes and never comes back. Simple but unrealistic.
- Crash-recovery — a node crashes but can restart and rejoin with its persisted state. This is the model Raft uses. Labs 3C and 3D test this extensively.
- Byzantine — a node can lie, send conflicting messages, or act maliciously. Raft does NOT handle this. Bitcoin (Lecture 19) does.
The CAP Theorem (One Paragraph)
During a network partition, a distributed system must choose between Consistency (every read returns the latest write) and Availability (every request gets a response). Raft chooses consistency: if a leader cannot reach a majority, it stops accepting writes rather than risk divergent state. Dynamo (Lecture 17) chooses availability: it accepts conflicting writes and reconciles them later. Understanding this trade-off is the single most important intuition in distributed systems.
Consistency Models — The Ladder
From weakest to strongest. The labs require linearizability, the strongest practical guarantee:
| Model | Guarantee | Used By |
|---|---|---|
| Eventual consistency | All replicas converge "eventually" — you might read stale data | Dynamo, DNS, Cassandra |
| Causal consistency | Causally related operations are seen in order; concurrent operations may differ | COPS (Lecture 17) |
| Sequential consistency | All operations appear in some global order consistent with each client's program order | ZooKeeper writes |
| Linearizability | Every operation appears to execute atomically at some point between its invocation and response. The strongest guarantee. | Labs 2–5, Raft-based KV stores, Spanner |
Linearizability in plain English: If client A's Put("x","1") returns before client B calls Get("x"), then B must see "1". Operations cannot appear to go backwards in time. Your KV servers in Labs 2, 4, and 5 must satisfy this.
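That rule for a single register can be checked mechanically. Below is a hedged sketch (illustrative names, not a full linearizability checker): a Get must return either the latest Put that finished before it began, or the value of some Put concurrent with it. This is a necessary condition only.

```rust
// Operation records with invocation/response timestamps.
struct Put { start: u64, end: u64, value: &'static str }
struct Get { start: u64, end: u64, value: &'static str }

// Necessary condition for linearizability of one register:
// the Get's value is either the most recently completed Put's value,
// or the value of a Put overlapping the Get in time.
fn get_is_plausible(puts: &[Put], g: &Get) -> bool {
    // Latest Put that completed strictly before the Get began.
    let latest_done = puts.iter()
        .filter(|p| p.end < g.start)
        .max_by_key(|p| p.end);
    if let Some(p) = latest_done {
        if p.value == g.value { return true; }
    }
    // Otherwise the Get may return a value from a concurrent Put.
    puts.iter().any(|p| p.start <= g.end && p.end >= g.start && p.value == g.value)
}
```

Reading an older, already-overwritten value fails this check — that is exactly "going backwards in time".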
The Replicated State Machine — The Core Idea
This single concept ties Labs 3, 4, and 5 together. Memorize it:
Client            Leader               Follower A        Follower B
  |                 |                      |                 |
  |-- Put(x,1) ---->|                      |                 |
  |                 |--- AppendEntries --->|                 |
  |                 |--- AppendEntries ---------------------->|
  |                 |                      |                 |
  |                 |<-------- ACK --------|                 |
  |                 |<------------------- ACK ---------------|
  |                 |                      |                 |
  |                 |  (majority replied: entry is committed)|
  |                 |                      |                 |
  |                 |  each node now applies data["x"] = "1" |
  |<----- OK -------|                      |                 |
Key insight: If every node starts with the same initial state and applies the same commands in the same order, they all end up in the same state. Raft's job is to make all nodes agree on that order. The KV server (Lab 4) is the state machine. Raft (Lab 3) is the log that ensures identical ordering.
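The key insight is small enough to demonstrate directly: two replicas that start empty and apply the same commands in the same order end in identical states (a minimal sketch; the Cmd type is illustrative):

```rust
use std::collections::HashMap;

// A toy command log for a KV state machine.
enum Cmd {
    Put(&'static str, &'static str),
    Append(&'static str, &'static str),
}

// Deterministic apply: same log in, same state out — on every replica.
fn apply_all(log: &[Cmd]) -> HashMap<String, String> {
    let mut state = HashMap::new();
    for cmd in log {
        match cmd {
            Cmd::Put(k, v) => { state.insert(k.to_string(), v.to_string()); }
            Cmd::Append(k, v) => { state.entry(k.to_string()).or_default().push_str(v); }
        }
    }
    state
}
```

Raft never looks inside the commands; it only guarantees every replica sees the same log argument.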
Key Terms Glossary
| Term | Definition |
|---|---|
| term | A logical clock in Raft. Each election increments the term. A term has at most one leader. Stale terms are rejected. |
| commit_index | The highest log entry known to be replicated on a majority. Safe to apply to the state machine. |
| last_applied | The highest log entry already applied to the state machine. Always <= commit_index. |
| leader / follower / candidate | The three Raft node roles. Followers listen. Candidates run elections. The leader coordinates everything. |
| quorum / majority | More than half the nodes. In a 5-node cluster, quorum = 3. All Raft decisions require a quorum. |
| idempotent | An operation that produces the same result whether applied once or multiple times. Essential for retry safety. |
| exactly-once semantics | Each client operation is applied exactly once, even if the RPC is retried. Implemented via client ID + sequence number deduplication. |
| next_index[i] | Leader-only: the next log index to send to peer i. Initialized to leader's last log index + 1 on election. |
| match_index[i] | Leader-only: the highest log index known to be replicated on peer i. Used to advance commit_index. |
Recommended 12-Week Study Plan
This schedule assumes 10–15 hours per week. Adjust based on your Rust experience. The key rule: watch lectures and read the paper before starting the corresponding lab.
Lab 3 (Raft) will take longer than you think. Plan for 3–4 weeks even if you are experienced. Most bugs are intermittent timing issues that only appear after hundreds of test runs. Budget extra time.
Where to Actually Start
Here is exactly what to do on Day 1, step by step. Most people get confused because the official repo is Go-only. You'll use it for the lab specifications only, and write everything in Rust.
Go to pdos.csail.mit.edu/6.824. This is the live course page with lab specs, lecture notes, and papers. Click "Schedule" and read it top to bottom to understand the structure.
You won't run Go code, but the repo contains lab spec HTML pages and test expectations you must match.
git clone git://g.csail.mit.edu/6.5840-golabs-2026 mit6824-spec
cd mit6824-spec/src
ls
# mr/ → Lab 1 skeleton + test scripts
# kvsrv/ → Lab 2 skeleton
# raft/ → Lab 3 skeleton (raft.go, test_test.go)
# kvraft/ → Lab 4 skeleton
# shardkv/ → Lab 5 skeleton
# shardctrler/ → Lab 5 shard controller
Read the *_test.go files — they describe exactly what your Rust code must do. Also read the lab HTML pages at pdos.csail.mit.edu/6.824/labs/lab-mr.html etc.
mkdir mit6824-rust
cd mit6824-rust
# Create all member crates (the workspace-root Cargo.toml that ties them
# together comes later, in "Project Setup & Workspace Layout")
cargo new --lib common
cargo new --lib lab1-mapreduce
cargo new --lib lab2-kvsrv
cargo new --lib lab3-raft
cargo new --lib lab4-kvraft
cargo new --lib lab5-shardkv
The 2020 Robert Morris lectures on YouTube are the best. Watch Lec 1 (Introduction + MapReduce) and read the MapReduce paper before starting Lab 1. The lab will make much more sense.
For each lab: (1) read the official HTML lab page fully, (2) read the Go skeleton code to understand what interfaces to implement, (3) write your Rust equivalent, (4) write tests that mirror the Go test file. Repeat until tests pass 100+ times in a row.
Complete YouTube Lecture Map
All 20 lectures are free on YouTube from the 2020 Spring semester (Robert Morris). This is the most complete and polished version. Watch lectures before the corresponding lab, not after. Each lecture is tagged with its lab relevance.
Main playlist (2020): youtube.com/playlist?list=PLrw6a1wE39_tb2fErI4-WkMbsvGQk9_UB — 20 lectures, ~1.5 hours each.
Lab numbering note: The 2020 course had a different lab numbering (Lab 2 = Raft). The modern course (2024–2026) uses Lab 3 = Raft. This guide uses the modern numbering throughout. The lecture content is identical.
Required for Lab 1 (MapReduce)
Context for Replication (Watch Before Labs 2–3)
Required for Lab 3 (Raft) — Most Important Lectures
Essential bonus: Also watch Diego Ongaro's 2014 Raft talk — the Raft author explains the subtle parts (election restriction, commit safety) far better than any blog post. Watch it before starting Lab 3.
Required for Lab 4 (Fault-Tolerant KV)
Required for Lab 5 (Sharded KV)
Enrichment (Watch After Labs or During Week 12)
Project Setup & Workspace Layout
# Cargo.toml (workspace root)
[workspace]
members = [
"common", # shared types: ApplyMsg, RPC, Persister
"lab1-mapreduce",
"lab2-kvsrv",
"lab3-raft",
"lab4-kvraft",
"lab5-shardkv",
]
resolver = "2"
[workspace.dependencies]
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
bincode = "1"
rand = "0.8"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
The common Crate — What Goes in It
This is your shared foundation used by every other lab. It replaces Go's labrpc and labgob packages.
// common/src/lib.rs
pub mod labrpc; // fake network for testing
pub mod persister; // in-memory or file-backed durable storage
pub mod apply_msg; // ApplyMsg sent from Raft to KV state machine
pub mod errors; // KvError, RaftError enums
ApplyMsg — The Raft → App Channel
This is how Raft tells the application layer that a log entry has been committed or a snapshot has arrived. Every lab from 3 onward uses this type.
// common/src/apply_msg.rs
#[derive(Debug, Clone)]
pub enum ApplyMsg {
/// A committed log entry. The KV server applies this to its state machine.
Command {
valid: bool,
command: Vec<u8>, // bincode-encoded Op
command_index: u64,
command_term: u64,
},
/// A snapshot installed by Raft (from InstallSnapshot RPC).
Snapshot {
valid: bool,
snapshot: Vec<u8>, // raw snapshot bytes
snapshot_term: u64,
snapshot_index: u64,
},
}
Persister — Durable State
Raft needs to save current_term, voted_for, and log to stable storage. The Persister trait abstracts over an in-memory store (for tests) or a real file (for production).
// common/src/persister.rs
use std::sync::Mutex;
pub trait Persister: Send + Sync {
fn read_raft_state(&self) -> Vec<u8>;
fn save_raft_state(&self, state: Vec<u8>);
fn read_snapshot(&self) -> Vec<u8>;
fn save_state_and_snapshot(&self, state: Vec<u8>, snapshot: Vec<u8>);
fn raft_state_size(&self) -> usize;
}
/// In-memory persister for tests
#[derive(Default)]
pub struct MemPersister {
inner: Mutex<MemPersisterInner>,
}
#[derive(Default)]
struct MemPersisterInner {
raft_state: Vec<u8>,
snapshot: Vec<u8>,
}
impl Persister for MemPersister {
fn read_raft_state(&self) -> Vec<u8> {
self.inner.lock().unwrap().raft_state.clone()
}
fn save_raft_state(&self, state: Vec<u8>) {
self.inner.lock().unwrap().raft_state = state;
}
fn read_snapshot(&self) -> Vec<u8> {
self.inner.lock().unwrap().snapshot.clone()
}
fn save_state_and_snapshot(&self, state: Vec<u8>, snapshot: Vec<u8>) {
let mut inner = self.inner.lock().unwrap();
inner.raft_state = state;
inner.snapshot = snapshot;
}
fn raft_state_size(&self) -> usize {
self.inner.lock().unwrap().raft_state.len()
}
}
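A quick sanity check of the persist/restart round trip, with the persister pared down to the raft-state methods and a hand-rolled encoding standing in for bincode (MiniPersister, encode, and decode are illustrative names, not the lab interface):

```rust
use std::sync::Mutex;

// Pared-down in-memory persister: save opaque bytes, read them back.
#[derive(Default)]
struct MiniPersister { raft_state: Mutex<Vec<u8>> }

impl MiniPersister {
    fn save_raft_state(&self, state: Vec<u8>) { *self.raft_state.lock().unwrap() = state; }
    fn read_raft_state(&self) -> Vec<u8> { self.raft_state.lock().unwrap().clone() }
}

// Hand-rolled encoding of (current_term, voted_for); the labs use bincode,
// but the round-trip idea is identical: persist before replying, decode on restart.
fn encode(current_term: u64, voted_for: Option<u8>) -> Vec<u8> {
    let mut out = current_term.to_le_bytes().to_vec();
    out.push(voted_for.map_or(0xFF, |v| v)); // 0xFF marks "voted for nobody"
    out
}

fn decode(bytes: &[u8]) -> (u64, Option<u8>) {
    let term = u64::from_le_bytes(bytes[..8].try_into().unwrap());
    let vf = if bytes[8] == 0xFF { None } else { Some(bytes[8]) };
    (term, vf)
}
```

The tests simulate a crash by throwing away all in-memory state and reconstructing it from the persister alone.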
Building the Fake Network (labrpc Replacement)
The Go labs use a custom labrpc package that can drop messages, delay them, and simulate partitions. You need to build this in Rust. This is the most important piece of test infrastructure.
// common/src/labrpc.rs
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;
use rand::Rng;
/// One RPC in-flight
pub struct Envelope {
pub method: String,
pub args: Vec<u8>,
pub reply_tx: oneshot::Sender<Option<Vec<u8>>>,
}
struct NetworkConfig {
reliable: bool,
long_delays: bool,
connected: Vec<bool>, // per-peer connectivity
}
pub struct Network {
config: Arc<Mutex<NetworkConfig>>,
endpoints: Arc<Mutex<HashMap<usize, mpsc::Sender<Envelope>>>>,
}
impl Network {
pub fn new(n: usize) -> Self {
Network {
config: Arc::new(Mutex::new(NetworkConfig {
reliable: true,
long_delays: false,
connected: vec![true; n],
})),
endpoints: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn set_reliable(&self, v: bool) { self.config.lock().unwrap().reliable = v; }
pub fn disconnect(&self, peer: usize) { self.config.lock().unwrap().connected[peer] = false; }
pub fn connect(&self, peer: usize) { self.config.lock().unwrap().connected[peer] = true; }
/// Call an RPC, respecting network config (drops, delays)
pub async fn call(&self, to: usize, method: &str, args: Vec<u8>) -> Option<Vec<u8>> {
let (reliable, connected, long_delays) = {
let c = self.config.lock().unwrap();
(c.reliable, c.connected[to], c.long_delays)
};
if !connected { return None; }
// Simulate unreliable network: ~10% drop
if !reliable && rand::thread_rng().gen_bool(0.1) { return None; }
// Random short delay on unreliable network
if !reliable {
let ms = rand::thread_rng().gen_range(0..27);
tokio::time::sleep(Duration::from_millis(ms)).await;
}
// Long delay scenario (simulates slow network)
if long_delays && rand::thread_rng().gen_bool(0.1) {
tokio::time::sleep(Duration::from_millis(500)).await;
}
let tx_opt = self.endpoints.lock().unwrap().get(&to).cloned();
if let Some(tx) = tx_opt {
let (reply_tx, reply_rx) = oneshot::channel();
let env = Envelope { method: method.to_string(), args, reply_tx };
let _ = tx.send(env).await;
reply_rx.await.ok().flatten()
} else { None }
}
}
Two RPC strategies: Use the channel-based fake network above for all labs (easiest, fastest tests). Alternatively, use tonic/gRPC over real TCP for production realism. Channel-based is strongly recommended — it lets you simulate partitions and drops in milliseconds without OS-level tricks.
Lab 1: MapReduce
Read before starting: MapReduce paper (2004) · Watch Lectures 1–2 · Read lab-mr.html
Mental Model
Imagine you need to count every word in a library of 10,000 books. One person would take forever. Instead, you hire a coordinator (head librarian) and 10 workers (assistants).
- Map phase: The coordinator gives each worker a book. Each worker produces a list of (word, 1) pairs, sorted into buckets by first letter.
- Reduce phase: The coordinator assigns each bucket to a worker. Each worker sums up all the counts for each word in their bucket.
- Fault tolerance: If a worker disappears for 10 seconds, the coordinator re-assigns their task to someone else. Workers must write results atomically (temp file then rename) so partial output is never visible.
That is the entire lab. The coordinator is a state machine tracking task status. Workers are a loop: ask for a task, do it, report back, repeat.
Architecture
One Coordinator process manages task state. N Worker processes run in parallel, polling the coordinator for tasks. In real systems workers run on different machines; here they're different processes on the same machine talking via Unix socket or TCP.
// Task types
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum Task {
Map { id: usize, filename: String, n_reduce: usize },
Reduce { id: usize, n_map: usize },
Wait, // All tasks in-progress, try again soon
Done, // All tasks complete, worker may exit
}
// Coordinator task tracking
#[derive(Debug, Clone, PartialEq)]
enum TaskStatus {
Idle,
InProgress { assigned_at: std::time::Instant },
Completed,
}
pub struct Coordinator {
n_reduce: usize,
map_tasks: Vec<TaskStatus>,
reduce_tasks: Vec<TaskStatus>,
input_files: Vec<String>,
phase: Phase,
}
#[derive(PartialEq)]
enum Phase { Map, Reduce, Done }
- request_task() on the Coordinator: scan map tasks first. Assign idle ones, or re-assign InProgress ones older than 10 seconds. Once all map tasks are Completed, transition to the Reduce phase. Return Task::Wait if all remaining tasks are InProgress but not yet done.
- Map task (worker side): read the input file and call the user's map_fn(filename, contents) → Vec<(String, String)>. Partition the key-value pairs into buckets using hash(key) % n_reduce. Write mr-{map_id}-{bucket} files as JSON lines, using atomic rename (write to a temp file, then rename).
- Reduce task (worker side): read all mr-*-{reduce_id} files. Merge, sort by key, and call reduce_fn(key, values) → String for each unique key. Write the final output to mr-out-{reduce_id}, also via atomic rename.
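The atomic-rename trick both task types rely on fits in a few lines (a sketch; the .tmp suffix and write_atomic name are illustrative, and the rename is only atomic when the temp file lives on the same filesystem as the destination):

```rust
use std::fs::{self, File};
use std::io::Write;

// Write the full contents to a temp file, flush it, then rename into place.
// A crash mid-write leaves only the temp file behind; readers never observe
// a partially written output file.
fn write_atomic(final_path: &str, contents: &[u8]) -> std::io::Result<()> {
    let tmp_path = format!("{final_path}.tmp");
    let mut f = File::create(&tmp_path)?;
    f.write_all(contents)?;
    f.sync_all()?;                       // force bytes to disk before the rename
    fs::rename(&tmp_path, final_path)?;  // atomic on POSIX within one filesystem
    Ok(())
}
```

This is why a re-assigned task that races its "dead" predecessor is harmless: whichever rename lands last wins, and both wrote identical content.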
Define a trait for user-provided map/reduce functions. Implement a WordCount app for testing.
// User-provided map/reduce functions as a trait object
pub trait MrApp: Send + Sync {
fn map(&self, filename: &str, contents: &str) -> Vec<(String, String)>;
fn reduce(&self, key: &str, values: &[String]) -> String;
}
// Word count implementation
pub struct WordCount;
impl MrApp for WordCount {
fn map(&self, _: &str, contents: &str) -> Vec<(String, String)> {
contents.split_whitespace()
.map(|w| (w.to_lowercase(), "1".to_string()))
.collect()
}
fn reduce(&self, _: &str, values: &[String]) -> String {
values.len().to_string()
}
}
// Hash key to reduce bucket
fn ihash(key: &str) -> usize {
use std::hash::{Hash, Hasher, DefaultHasher};
let mut h = DefaultHasher::new();
key.hash(&mut h);
(0x7fff_ffff & h.finish()) as usize
}
You're done with Lab 1 when: Your word-count, indexer, map-parallelism, reduce-parallelism, job-count, early-exit, and crash recovery tests all pass. Run them 50+ times in a row; MapReduce bugs are mostly deterministic, so far fewer iterations are needed here than for the Raft labs.
Lab 2: Single-Machine KV Server
Read before starting: Watch Lectures 3–4 · Read lab-kvsrv1.html
This lab teaches exactly-once semantics over an unreliable network without replication. It's the foundation for Lab 4's deduplication logic.
Mental Model
You order food via a delivery app. You tap "Place Order." The screen spins... and times out. Did the order go through? You tap again. Now you might have two orders. This is the exactly-once problem over an unreliable network.
The solution: every order has a unique ID. The restaurant checks: "Did I already process order #42?" If yes, return the cached result without doing it again. If no, process it and remember that you did.
Client Server
| |
|--- Put(x,1) [seq=5] -------->| Server applies Put(x,1)
| | Server caches: client_42, seq=5, reply="ok"
| (reply lost!) |
| |
|--- Put(x,1) [seq=5] -------->| Server sees: seq=5 already processed!
| | Returns cached "ok" (does NOT re-apply)
|<-------- "ok" ----------------|
| |
Key invariant: The client increments its sequence number only after receiving a successful reply. The server stores the last-processed sequence number per client. This pattern is reused in Labs 4 and 5.
The Problem
Client sends Put("x","1"). Network drops the reply. Client retries. Without deduplication, the value is set twice (or an Append runs twice). The server must be idempotent: applying the same operation twice must produce the same result as applying it once.
use std::collections::HashMap;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum Op {
Get { key: String },
Put { key: String, value: String },
Append { key: String, value: String },
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Request {
pub client_id: u64,
pub seq_num: u64, // monotonically increasing per client
pub op: Op,
}
pub struct KvServer {
data: HashMap<String, String>,
/// client_id → (last_processed_seq, cached_reply)
dedup: HashMap<u64, (u64, String)>,
}
impl KvServer {
pub fn apply(&mut self, req: Request) -> String {
// Check dedup table. A delayed duplicate of an OLDER request can still
// arrive (the client only advances seq_num after a success), so stale
// requests must be answered harmlessly, never panicked on.
if let Some((last_seq, reply)) = self.dedup.get(&req.client_id) {
if req.seq_num <= *last_seq { return reply.clone(); }
}
let result = match &req.op {
Op::Get { key } => self.data.get(key).cloned().unwrap_or_default(),
Op::Put { key, value } => { self.data.insert(key.clone(), value.clone()); String::new() },
Op::Append { key, value } => {
let old = self.data.entry(key.clone()).or_default();
old.push_str(value);
old.clone() // return value after append
}
};
self.dedup.insert(req.client_id, (req.seq_num, result.clone()));
result
}
}
// Client-side: increment seq_num only after receiving reply
pub struct Clerk {
client_id: u64,
seq_num: u64,
server_addr: String,
}
impl Clerk {
pub async fn put(&mut self, key: String, value: String) {
loop { // retry until we get a reply
let req = Request { client_id: self.client_id, seq_num: self.seq_num,
op: Op::Put { key: key.clone(), value: value.clone() } };
if let Ok(_) = self.rpc_call(req).await {
self.seq_num += 1; // only advance after successful reply
return;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
}
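To convince yourself the dedup table really prevents double-applies, the server side condenses to a few testable lines (Append only; a sketch of the same pattern, not the full lab interface):

```rust
use std::collections::HashMap;

// Condensed KV server with per-client dedup state.
#[derive(Default)]
struct Server {
    data: HashMap<String, String>,
    dedup: HashMap<u64, (u64, String)>, // client_id → (last_seq, cached_reply)
}

impl Server {
    fn append(&mut self, client: u64, seq: u64, key: &str, val: &str) -> String {
        if let Some((last, reply)) = self.dedup.get(&client) {
            // Duplicate or stale retry: answer from cache, do NOT re-apply.
            if seq <= *last { return reply.clone(); }
        }
        let s = self.data.entry(key.to_string()).or_default();
        s.push_str(val);
        let reply = s.clone();
        self.dedup.insert(client, (seq, reply.clone()));
        reply
    }
}
```

The property under test is the one the lab checker enforces: retrying seq=N returns the original reply and leaves the data untouched.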
You're done with Lab 2 when: Concurrent Put/Append/Get operations with an unreliable network pass 100+ times. Verify that no duplicate applies occur — add a counter in your test that detects if an Append was applied twice.
Lab 3: Raft Consensus Algorithm
Read before starting: Raft extended paper (Ongaro 2014) — read Figure 2 as your implementation spec. Watch Lectures 6–7. Read the Students' Guide to Raft. Re-read Figure 2. Implement it exactly.
Mental Model
Raft is a protocol for getting N servers to agree on the same sequence of commands, even when some servers crash and restart. It works like a meeting with a chairperson:
- One leader proposes all decisions. Everyone else (followers) just listens and records.
- If the leader disappears, the group holds an election to pick a new one. Whoever has the most complete notes wins.
- A decision is committed only when a majority has recorded it. Even if some servers crash, the committed decisions survive.
- Each meeting has a term number. If someone shows up claiming to be leader from an old term, they are ignored.
             times out,              receives majority
             starts election         of votes
          ┌───────────────────┐   ┌─────────────────────┐
          │                   ▼   │                     ▼
    ┌───────────┐      ┌────────────────┐      ┌────────────────┐
 ──>│ FOLLOWER  │      │   CANDIDATE    │      │     LEADER     │
    └───────────┘      └────────────────┘      └────────────────┘
          ▲                    │                       │
          │    discovers       │     discovers         │
          │    higher term     │     higher term       │
          ├────────────────────┘                       │
          └────────────────────────────────────────────┘
The four sub-labs map to four capabilities: 3A = elections (picking a leader), 3B = log replication (the leader distributing decisions), 3C = persistence (surviving crashes), 3D = log compaction (not running out of memory).
This is the hardest lab. Most bugs are off-by-one log index errors or wrong term comparisons. Plan for 3–4 weeks. Run every test 500+ times before declaring it done. Raft bugs are intermittent — passing once means nothing.
Core State Structure
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
#[derive(Debug, Clone, PartialEq)]
pub enum Role { Follower, Candidate, Leader }
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LogEntry {
pub term: u64,
pub command: Vec<u8>,
}
pub struct RaftState {
// ── Persistent (must survive crash) ──────────────────
pub current_term: u64,
pub voted_for: Option<usize>,
pub log: Vec<LogEntry>, // index 0 is dummy (term=0)
// ── Volatile ─────────────────────────────────────────
pub commit_index: u64,
pub last_applied: u64,
pub role: Role,
// ── Leader only (reset on election) ──────────────────
pub next_index: Vec<u64>, // next log index to send to each peer
pub match_index: Vec<u64>, // highest log index known replicated on peer
// ── Snapshot state (Lab 3D) ───────────────────────────
pub last_snap_index: u64,
pub last_snap_term: u64,
}
pub struct Raft {
me: usize,
n_peers: usize,
state: Arc<Mutex<RaftState>>,
persister: Arc<dyn Persister>,
apply_tx: mpsc::Sender<ApplyMsg>,
network: Arc<Network>,
}
Log Indexing with Snapshots — The #1 Source of Off-by-One Bugs
Before Lab 3D (snapshots), the log Vec index equals the Raft log index (with a dummy at position 0). After snapshots, the start of the Vec no longer corresponds to log index 0. You have discarded the first N entries. This means every log access must account for the offset:
Raft log indices: 1 2 3 4 5 6 7 8 9 10
[a] [b] [c] [d] [e] [f] [g] [h] [i] [j]
└── snapshotted ──┘ └── in self.log ──┘
last_snap_index = 5
self.log = [f, g, h, i, j] (Vec indices 0..=4)
To convert Raft index → Vec index:
vec_idx = raft_index - last_snap_index - 1
To convert Vec index → Raft index:
raft_index = vec_idx + last_snap_index + 1
Last raft index = self.log.len() as u64 + last_snap_index
Apply this conversion everywhere you access self.log[...]. The three most common bugs:
- Forgetting to subtract last_snap_index when indexing into the log Vec
- Forgetting to add last_snap_index when computing the last log index
- Accessing self.log[0] and expecting it to be log index 1 (after a snapshot, it is log index last_snap_index + 1)
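One way to make those bugs structurally impossible is to centralize the conversions in helper methods and never inline the arithmetic at call sites (a sketch storing only entry terms; names are illustrative):

```rust
// Retained log suffix after compaction: entries before last_snap_index
// have been discarded into a snapshot.
struct Log {
    last_snap_index: u64,
    entries: Vec<u64>, // term of each retained entry
}

impl Log {
    // Raft index → position in the retained Vec.
    fn vec_idx(&self, raft_index: u64) -> usize {
        (raft_index - self.last_snap_index - 1) as usize
    }
    // Position in the retained Vec → Raft index.
    fn raft_idx(&self, vec_idx: usize) -> u64 {
        vec_idx as u64 + self.last_snap_index + 1
    }
    // Index of the last entry in the full (logical) log.
    fn last_index(&self) -> u64 {
        self.entries.len() as u64 + self.last_snap_index
    }
    // Term lookup that routes through the one conversion function.
    fn term_at(&self, raft_index: u64) -> u64 {
        self.entries[self.vec_idx(raft_index)]
    }
}
```

With this shape, a wrong offset shows up in exactly one place instead of being scattered across every AppendEntries and snapshot path.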
Lab 3A: Leader Election
RequestVote RPC arguments:
- term: candidate's current term
- candidate_id: candidate requesting the vote
- last_log_index: index of candidate's last log entry
- last_log_term: term of candidate's last log entry
Receiver rules (Figure 2):
- Reply false if term < currentTerm; if term > currentTerm, set currentTerm = term and convert to follower
- Grant the vote if votedFor is null or candidateId, AND the candidate's log is at least as up-to-date as the receiver's log
- "At least as up-to-date": compare last log terms first; if equal, compare length (§5.4.1)
// RequestVote RPC types
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct RequestVoteArgs {
pub term: u64,
pub candidate_id: usize,
pub last_log_index: u64,
pub last_log_term: u64,
}
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct RequestVoteReply {
pub term: u64,
pub vote_granted: bool,
}
impl Raft {
// Handler called by receiver
pub fn handle_request_vote(&self, args: RequestVoteArgs) -> RequestVoteReply {
let mut state = self.state.lock().unwrap();
// If higher term seen, update and convert to follower
if args.term > state.current_term {
state.current_term = args.term;
state.voted_for = None;
state.role = Role::Follower;
}
if args.term < state.current_term {
return RequestVoteReply { term: state.current_term, vote_granted: false };
}
// Check if we already voted for someone else this term
let can_vote = state.voted_for.is_none()
|| state.voted_for == Some(args.candidate_id);
// Check candidate log is at least as up-to-date
let my_last_term = state.log.last().map_or(state.last_snap_term, |e| e.term);
let my_last_index = state.log.len() as u64 + state.last_snap_index;
let up_to_date = args.last_log_term > my_last_term
|| (args.last_log_term == my_last_term && args.last_log_index >= my_last_index);
if can_vote && up_to_date {
state.voted_for = Some(args.candidate_id);
self.persist(&state); // MUST persist before replying
// reset the election timer here: granting a vote counts as hearing from a viable candidate
RequestVoteReply { term: state.current_term, vote_granted: true }
} else {
RequestVoteReply { term: state.current_term, vote_granted: false }
}
}
// Kick off election (called when election timer fires)
pub async fn start_election(raft: Arc<Mutex<RaftState>>, me: usize, network: Arc<Network>, n: usize) {
let (term, args) = {
let mut s = raft.lock().unwrap();
s.current_term += 1;
s.role = Role::Candidate;
s.voted_for = Some(me);
// persist() here
(s.current_term, RequestVoteArgs {
term: s.current_term, candidate_id: me,
last_log_index: s.log.len() as u64 + s.last_snap_index,
last_log_term: s.log.last().map_or(s.last_snap_term, |e| e.term),
})
}; // drop lock here — never hold across await
let mut votes = 1usize; // vote for self
let mut set = tokio::task::JoinSet::new();
for peer in 0..n {
if peer == me { continue; }
let (net, a) = (network.clone(), args.clone());
set.spawn(async move {
let bytes = bincode::serialize(&a).unwrap();
net.call(peer, "RequestVote", bytes).await
});
}
while let Some(res) = set.join_next().await {
// Skip dropped RPCs (None) rather than ending the tally early
let Some(bytes) = res.ok().flatten() else { continue; };
let reply: RequestVoteReply = bincode::deserialize(&bytes).unwrap();
let mut s = raft.lock().unwrap();
if reply.term > s.current_term {
s.current_term = reply.term; s.role = Role::Follower; return;
}
if s.role != Role::Candidate || s.current_term != term { return; }
if reply.vote_granted { votes += 1; }
if votes > n / 2 { // majority!
s.role = Role::Leader;
// Initialize leader state, send heartbeats
break;
}
}
}
}
Leader Heartbeat Loop (Critical — Must Implement)
The leader must send periodic empty AppendEntries (heartbeats) to all peers to prevent them from starting elections. Without this, followers time out and the cluster falls apart. This runs as a background task for the entire time a node is leader.
async fn heartbeat_loop(
raft: Arc<Mutex<RaftState>>,
me: usize,
n_peers: usize,
network: Arc<Network>,
) {
let mut interval = tokio::time::interval(
Duration::from_millis(100) // must be < election timeout (150–300ms)
);
loop {
interval.tick().await;
let (term, role, entries_per_peer) = {
let s = raft.lock().unwrap();
if s.role != Role::Leader { return; }
// Build AppendEntries args for each peer
let mut per_peer = Vec::new();
for peer in 0..n_peers {
if peer == me { continue; }
let next = s.next_index[peer];
let prev_idx = next - 1;
let prev_term = if prev_idx == s.last_snap_index {
s.last_snap_term
} else if prev_idx > s.last_snap_index {
s.log[(prev_idx - s.last_snap_index - 1) as usize].term
} else { 0 };
// Include new entries if any
let start = (next - s.last_snap_index - 1) as usize;
let entries = s.log[start..(start + 100).min(s.log.len())].to_vec(); // cap batch size without needing std::cmp::min
per_peer.push((peer, AppendEntriesArgs {
term: s.current_term, leader_id: me,
prev_log_index: prev_idx, prev_log_term: prev_term,
entries, leader_commit: s.commit_index,
}));
}
(s.current_term, s.role.clone(), per_peer)
}; // lock dropped — never hold across await
for (peer, args) in entries_per_peer {
let (net, raft2) = (network.clone(), raft.clone());
tokio::spawn(async move {
let bytes = bincode::serialize(&args).unwrap();
if let Some(reply_bytes) = net.call(peer, "AppendEntries", bytes).await {
let reply: AppendEntriesReply = bincode::deserialize(&reply_bytes).unwrap();
// Handle reply: update next_index/match_index, advance commit
// (see handle_ae_failure and maybe_advance_commit in 3B)
}
});
}
}
}
You're done with Lab 3A when: test_initial_election, test_reelection, and test_many_elections pass 500+ times. Leaders should be elected within a few seconds. If elections take more than 5s, your timer randomization range is wrong.
Lab 3B: Log Replication
AppendEntries arguments:

- `term`, `leader_id`, `leader_commit`
- `prev_log_index` — index of log entry immediately preceding new ones
- `prev_log_term` — term of prev_log_index entry
- `entries[]` — log entries to store (empty = heartbeat)

Receiver implementation:

1. Reply false if `term < currentTerm`
2. Reply false if log doesn't contain entry at `prev_log_index` with `prev_log_term`
3. If existing entry conflicts (same index, different term), delete it and all following entries
4. Append any new entries not already in the log
5. If `leaderCommit > commitIndex`, set `commitIndex = min(leaderCommit, last new entry index)`
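The receiver steps above can be sketched as one pure function. This is a minimal sketch using the state shapes assumed throughout this guide (`log` holds only post-snapshot entries, so absolute index `i` lives at vec slot `i - last_snap_index - 1`); serde derives, locking, `persist()`, and the election-timer reset are omitted, and the reply already carries the fast-backup fields discussed in the next section:

```rust
#[derive(Clone)]
pub struct LogEntry { pub term: u64, pub command: Vec<u8> }

#[derive(Default)]
pub struct RaftState {
    pub current_term: u64,
    pub log: Vec<LogEntry>,        // entries after the snapshot only
    pub last_snap_index: u64,
    pub last_snap_term: u64,
    pub commit_index: u64,
}

pub struct AppendEntriesArgs {
    pub term: u64,
    pub prev_log_index: u64,
    pub prev_log_term: u64,
    pub entries: Vec<LogEntry>,
    pub leader_commit: u64,
}

pub struct AppendEntriesReply {
    pub term: u64,
    pub success: bool,
    pub conflict_term: Option<u64>, // fast-backup fields (see below)
    pub conflict_index: u64,
}

// Term of the entry at absolute index `idx`, if we still have it.
fn term_at(s: &RaftState, idx: u64) -> Option<u64> {
    if idx == s.last_snap_index { return Some(s.last_snap_term); }
    s.log.get(idx.checked_sub(s.last_snap_index + 1)? as usize).map(|e| e.term)
}

pub fn handle_append_entries(s: &mut RaftState, args: &AppendEntriesArgs) -> AppendEntriesReply {
    let mut reply = AppendEntriesReply {
        term: s.current_term, success: false, conflict_term: None, conflict_index: 0,
    };
    // Rule 1: reject a stale leader
    if args.term < s.current_term { return reply; }
    if args.term > s.current_term { s.current_term = args.term; reply.term = args.term; }

    // Rule 2: the prev entry must exist and match
    let last_index = s.last_snap_index + s.log.len() as u64;
    if args.prev_log_index > last_index {
        reply.conflict_index = last_index; // log too short: leader backs up to our end
        return reply;
    }
    match term_at(s, args.prev_log_index) {
        Some(t) if t != args.prev_log_term => {
            // fast backup: report the conflicting term and its first index
            let mut first = args.prev_log_index;
            while first > s.last_snap_index + 1 && term_at(s, first - 1) == Some(t) {
                first -= 1;
            }
            reply.conflict_term = Some(t);
            reply.conflict_index = first;
            return reply;
        }
        None => { reply.conflict_index = s.last_snap_index; return reply; }
        _ => {}
    }

    // Rules 3 + 4: delete conflicting suffix, append what's new
    for (i, e) in args.entries.iter().enumerate() {
        let off = (args.prev_log_index + i as u64 - s.last_snap_index) as usize;
        if off < s.log.len() {
            if s.log[off].term != e.term {
                s.log.truncate(off);   // conflict: drop it and everything after
                s.log.push(e.clone());
            }
        } else {
            s.log.push(e.clone());
        }
    }

    // Rule 5: advance commit_index
    if args.leader_commit > s.commit_index {
        let last_new = args.prev_log_index + args.entries.len() as u64;
        s.commit_index = args.leader_commit.min(last_new);
    }
    reply.success = true;
    reply
}
```

A handler written this way is idempotent, which matters because the tests re-deliver RPCs.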
Fast Backup Optimization (Critical for 3B)
When a follower rejects AppendEntries, the naive approach decrements next_index[i] by 1 each time — O(N) round trips. The paper's optimization (§5.3 bottom of page 7) allows backing up by an entire term at once:
// Reply includes extra info for fast backup
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct AppendEntriesReply {
pub term: u64,
pub success: bool,
// Fast backup fields (only meaningful on failure)
pub conflict_term: Option<u64>, // term in the conflicting entry
pub conflict_index: u64, // first index in conflict_term
}
// Leader: on AppendEntries rejection, adjust next_index
fn handle_ae_failure(state: &mut RaftState, peer: usize, reply: &AppendEntriesReply) {
match reply.conflict_term {
None => {
// Follower log is too short — jump to its last entry
state.next_index[peer] = reply.conflict_index + 1;
}
Some(ct) => {
// Find last entry in leader log with conflict_term
let last_in_ct = state.log.iter().rposition(|e| e.term == ct);
match last_in_ct {
// vec slot idx is absolute index last_snap_index + idx + 1; next is one past it
Some(idx) => state.next_index[peer] = state.last_snap_index + idx as u64 + 2,
None => state.next_index[peer] = reply.conflict_index,
}
}
}
state.next_index[peer] = state.next_index[peer].max(state.last_snap_index + 1);
}
// Commit index advancement (leader only)
// RULE: Only advance commit on entries from CURRENT term (Figure 8)
fn maybe_advance_commit(state: &mut RaftState, n: usize) {
let base = state.last_snap_index;
for n_idx in (state.commit_index + 1..=state.log.len() as u64 + base).rev() {
let log_idx = (n_idx - base - 1) as usize;
// Only commit entries from current term!
if state.log[log_idx].term != state.current_term { break; }
let replicated = state.match_index.iter().filter(|&&m| m >= n_idx).count() + 1;
if replicated > n / 2 {
state.commit_index = n_idx;
break;
}
}
}
Applier Loop — Sending Committed Entries to the State Machine
This is the glue between Raft and the application (KV server in Lab 4). A background task watches for commit_index > last_applied and sends committed entries via apply_tx. Without this, nothing actually gets applied.
async fn applier_loop(
raft: Arc<Mutex<RaftState>>,
apply_tx: mpsc::Sender<ApplyMsg>,
snap_index: Arc<std::sync::atomic::AtomicU64>,
) {
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
let msgs = {
let mut s = raft.lock().unwrap();
let mut msgs = Vec::new();
while s.last_applied < s.commit_index {
s.last_applied += 1;
let idx = s.last_applied;
let log_offset = (idx - s.last_snap_index - 1) as usize;
if log_offset >= s.last_applied as usize && log_offset >= s.log.len() { s.last_applied -= 1; break; } // undo the increment: entry not visible yet
let entry = &s.log[log_offset];
msgs.push(ApplyMsg::Command {
valid: true,
command: entry.command.clone(),
command_index: idx,
command_term: entry.term,
});
}
msgs
}; // lock dropped before sending
// Send outside lock — apply_tx.send() may block
for msg in msgs {
let _ = apply_tx.send(msg).await;
}
}
}
Never send on apply_tx while holding the Mutex. The receiver (KV server) may need to call back into Raft (e.g., to take a snapshot), causing a deadlock. Collect messages under the lock, drop it, then send.
You're done with Lab 3B when: All basic agreement, fail-agree, concurrent-starts, rejoin, backup, and count tests pass 500+ times. The backup test is especially important — it verifies fast log backtracking works.
Lab 3C: Persistence
Raft must survive crashes. Three fields must be saved to the Persister before replying to any RPC and before sending any RPC request: current_term, voted_for, log.
#[derive(serde::Serialize, serde::Deserialize)]
struct PersistData {
current_term: u64,
voted_for: Option<usize>,
log: Vec<LogEntry>,
last_snap_index: u64,
last_snap_term: u64,
}
impl Raft {
fn persist(&self, s: &RaftState) {
let data = PersistData {
current_term: s.current_term,
voted_for: s.voted_for,
log: s.log.clone(),
last_snap_index: s.last_snap_index,
last_snap_term: s.last_snap_term,
};
let bytes = bincode::serialize(&data).unwrap();
self.persister.save_raft_state(bytes);
}
fn restore(&self, s: &mut RaftState) {
let bytes = self.persister.read_raft_state();
if bytes.is_empty() { return; }
if let Ok(data) = bincode::deserialize::<PersistData>(&bytes) {
s.current_term = data.current_term;
s.voted_for = data.voted_for;
s.log = data.log;
s.last_snap_index = data.last_snap_index;
s.last_snap_term = data.last_snap_term;
s.last_applied = data.last_snap_index;
s.commit_index = data.last_snap_index;
}
}
}
// CALL persist() at EVERY place you modify these fields:
// - current_term (any term update)
// - voted_for (when granting a vote)
// - log (when appending entries)
//
// Where to call persist():
// ✅ handle_request_vote (before reply when granting vote)
// ✅ handle_append_entries (before reply, after modifying log)
// ✅ start() — when leader appends to its own log
// ✅ Any code that changes current_term
Lab 3D: Log Compaction (Snapshots)
The log grows forever without compaction. The application takes a snapshot of its state and tells Raft to discard log entries before a certain index. Lagging followers receive snapshots via InstallSnapshot RPC.
// Application (KvServer) calls this when it wants to snapshot
impl Raft {
pub fn snapshot(&self, snap_index: u64, snap_data: Vec<u8>) {
let mut s = self.state.lock().unwrap();
if snap_index <= s.last_snap_index { return; }
let offset = (snap_index - s.last_snap_index) as usize;
if offset > s.log.len() { return; }
s.last_snap_term = s.log[offset - 1].term;
s.last_snap_index = snap_index;
s.log.drain(..offset); // discard compacted entries
let raft_state = bincode::serialize(&PersistData {
current_term: s.current_term,
voted_for: s.voted_for,
log: s.log.clone(),
last_snap_index: s.last_snap_index,
last_snap_term: s.last_snap_term,
}).unwrap();
drop(s);
self.persister.save_state_and_snapshot(raft_state, snap_data);
}
}
// InstallSnapshot RPC args
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct InstallSnapshotArgs {
pub term: u64,
pub leader_id: usize,
pub last_included_index: u64,
pub last_included_term: u64,
pub data: Vec<u8>, // entire snapshot
}
// Receiver: install snapshot, send ApplyMsg::Snapshot to app
pub fn handle_install_snapshot(&self, args: InstallSnapshotArgs) -> (u64,) {
let mut s = self.state.lock().unwrap();
if args.term < s.current_term { return (s.current_term,); }
if args.term > s.current_term {
s.current_term = args.term; s.role = Role::Follower; s.voted_for = None;
}
if args.last_included_index <= s.last_snap_index { return (s.current_term,); }
// Discard entire log if snapshot covers it, or trim
let snap_end = (args.last_included_index - s.last_snap_index) as usize;
if snap_end <= s.log.len() && s.log[snap_end-1].term == args.last_included_term {
s.log.drain(..snap_end);
} else { s.log.clear(); }
s.last_snap_index = args.last_included_index;
s.last_snap_term = args.last_included_term;
s.commit_index = s.commit_index.max(args.last_included_index);
s.last_applied = s.last_applied.max(args.last_included_index);
let snap_data = args.data.clone();
let cur_term = s.current_term;
drop(s);
// Notify app to apply snapshot. NOTE: try_send drops the message if the
// channel is full; real code should hand the message to the applier task
// (or use blocking_send) so the snapshot is never silently lost.
let _ = self.apply_tx.try_send(ApplyMsg::Snapshot {
valid: true, snapshot: snap_data,
snapshot_index: args.last_included_index,
snapshot_term: args.last_included_term,
});
(cur_term,)
}
You're done with Lab 3C+3D when: All persistence tests (crash, restart, re-elect, continue) and snapshot tests (install-snapshot, snapshot-size) pass 500+ times. Pay special attention to the Figure 8 unreliable test — it catches commit safety bugs.
Lab 4: Fault-Tolerant KV (Raft-backed)
Read before starting: Watch Lectures 8–9 · Read lab-kvraft1.html
Mental Model
You now combine Labs 2 and 3 into one system. Every client operation flows through this pipeline:
Client KV Server (Leader) Raft Layer KV Servers (All)
| | | |
|--- Put(x,1) --------->| | |
| |--- raft.start(cmd) ---->| |
| | |-- AppendEntries --->|
| | |<-- majority ACK ----|
| | | |
| |<-- ApplyMsg(cmd,idx) ----|-- ApplyMsg -------->|
| | | |
| | apply to state machine | apply to state machine
| | data["x"] = "1" | data["x"] = "1" |
| | check dedup table | |
| | wake pending client | |
|<------ "ok" ----------| | |
Key insight: The KV server never modifies its state directly. It submits every operation to Raft, waits for Raft to commit it, then applies it from the apply_ch. This ensures all replicas apply operations in the same order. The dedup table from Lab 2 prevents duplicate applies across leader changes.
Plug your Lab 3 Raft into a KV state machine. Every Put/Get/Append goes through Raft consensus before being applied. The cluster survives minority node failures.
use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};
pub struct KvServer {
raft: Raft,
me: usize,
data: HashMap<String, String>,
dedup: HashMap<u64, (u64, String)>, // client_id → (seq, reply)
// log_index → waker waiting for that index to be applied
pending: HashMap<u64, oneshot::Sender<Result<String, KvError>>>,
max_raft_state: Option<usize>, // trigger snapshot at this size
last_applied: u64,
}
#[derive(Debug)]
pub enum KvError { WrongLeader, Timeout, Internal }
impl KvServer {
// RPC handler — called by client
pub async fn handle_op(server: Arc<Mutex<KvServer>>, req: Request)
-> Result<String, KvError>
{
// Submit to Raft
let (index, rx) = {
let mut s = server.lock().unwrap();
let cmd = bincode::serialize(&req).unwrap();
match s.raft.start(cmd) {
Err(_) => return Err(KvError::WrongLeader),
Ok((idx, _term)) => {
let (tx, rx) = oneshot::channel();
s.pending.insert(idx, tx);
(idx, rx)
}
}
};
// Wait for Raft to apply, with timeout (also remove the pending entry
// on timeout, or the map leaks senders)
tokio::time::timeout(
std::time::Duration::from_millis(500),
rx
).await
.map_err(|_| KvError::Timeout)?
.map_err(|_| KvError::Internal)?
}
}
// Background applier — consumes apply_ch from Raft
async fn applier(server: Arc<Mutex<KvServer>>, mut rx: mpsc::Receiver<ApplyMsg>) {
while let Some(msg) = rx.recv().await {
let mut s = server.lock().unwrap();
match msg {
ApplyMsg::Command { command_index, command, .. } => {
if command_index <= s.last_applied { continue; } // stale
s.last_applied = command_index;
let req: Request = bincode::deserialize(&command).unwrap();
// Check duplicate. Look up first, then mutate: holding the borrow from
// dedup.get() across the &mut call to apply_and_cache() will not compile.
let cached = s.dedup.get(&req.client_id)
.filter(|(last_seq, _)| req.seq_num <= *last_seq) // stale or repeated op
.map(|(_, reply)| reply.clone());
let result = match cached {
Some(reply) => reply,
None => s.apply_and_cache(&req),
};
// Wake waiting client handler
if let Some(tx) = s.pending.remove(&command_index) {
let _ = tx.send(Ok(result));
}
// Lab 4B: trigger snapshot if log is too large
if let Some(max_sz) = s.max_raft_state {
if s.raft.persister_size() > max_sz {
let snap = s.encode_snapshot();
s.raft.snapshot(command_index, snap);
}
}
}
ApplyMsg::Snapshot { snapshot, snapshot_index, .. } => {
s.install_snapshot(snapshot, snapshot_index);
}
}
}
}
Leader redirect: If raft.start() returns NotLeader, return WrongLeader to the client. The client tries every server in round-robin order until it finds the current leader. Cache the index of the last known leader for faster retries.
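That client-side retry loop can be sketched with the RPC abstracted as a closure so the shape is easy to see and test. `Resp`, `Clerk`, and `leader_hint` are illustrative names, not the lab's API, and real code would make the call async:

```rust
enum Resp { Ok(String), WrongLeader }

struct Clerk { leader_hint: usize, n_servers: usize }

impl Clerk {
    fn get(&mut self, key: &str, mut call: impl FnMut(usize, &str) -> Resp) -> String {
        loop {
            // Try every server once, starting with the cached leader
            for off in 0..self.n_servers {
                let i = (self.leader_hint + off) % self.n_servers;
                if let Resp::Ok(v) = call(i, key) {
                    self.leader_hint = i; // remember the leader for next time
                    return v;
                }
            }
            // All said WrongLeader: an election may be in progress, so retry.
            // (Real code sleeps briefly here before looping again.)
        }
    }
}
```

Because Get/Put/Append carry a client_id + seq_num, retrying the same request against multiple servers is safe: the dedup table filters the duplicates.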
You're done with Lab 4 when: All basic, concurrent, unreliable, snapshot, and linearizability tests pass 200+ times. Lab 4B snapshot tests verify that Raft state size stays bounded after many operations. If it grows unboundedly, your snapshot trigger is not firing.
Lab 5: Sharded Key/Value Service
Read before starting: Watch Lectures 12–13, 16 · Read lab-shard.html
Mental Model
One Raft group (Lab 4) cannot handle all the data — it is limited by the leader's throughput. The solution: shard the data. Split all keys into 10 buckets (shards) using a hash function. Each bucket is owned by a different Raft group with its own leader, so aggregate throughput scales roughly 10x.
But who decides which group owns which shard? A dedicated shard controller (itself a Raft group) manages a sequence of configurations. When groups join or leave, the controller rebalances shards, and groups must migrate shard data to their new owners.
Client Shard Controller (Raft group)
| |
| key2shard("x") = 3 | Config 7: shard 3 → Group 101
| | shard 7 → Group 102
v | ...
Group 101 (Raft) |
┌──────────────────┐ |
│ Shard 0: {...} │ |
│ Shard 3: {...} ◄─┼──── client sends Put(x,1) here
│ Shard 6: {...} │
└──────────────────┘
Group 102 (Raft)
┌──────────────────┐
│ Shard 1: {...} │
│ Shard 7: {...} │
└──────────────────┘
The hard part is migration. When the controller reassigns shard 3 from Group 101 to Group 102, Group 102 must pull the shard data from Group 101 before it can serve requests for those keys. During migration, requests for shard 3 return "wrong group" and clients retry.
The full distributed database. Keys are divided into NShards = 10 buckets. Multiple replica groups (each running Raft from Lab 3) own subsets of shards. A dedicated shard controller determines the mapping.
Part 5A: Shard Controller
pub const NSHARDS: usize = 10;
#[derive(Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct Config {
pub num: u64, // config number (starts at 1)
pub shards: [u64; NSHARDS], // shards[i] = gid owning shard i (0 = unassigned)
pub groups: HashMap<u64, Vec<String>>, // gid → server list
}
pub enum CtlOp {
Join { gids: HashMap<u64, Vec<String>> }, // add new groups
Leave { gids: Vec<u64> }, // remove groups
Move { shard: usize, gid: u64 }, // manually assign shard
Query { num: i64 }, // -1 = latest config
}
// Rebalance: after join/leave, ensure shards are equally distributed
fn rebalance(shards: &mut [u64; NSHARDS], groups: &HashMap<u64, Vec<String>>) {
if groups.is_empty() { shards.fill(0); return; }
let n = groups.len();
let base = NSHARDS / n;
let extras = NSHARDS % n;
// Sort group ids for determinism
let mut gids: Vec<u64> = groups.keys().cloned().collect();
gids.sort();
let target: HashMap<u64, usize> = gids.iter().enumerate()
.map(|(i, &g)| (g, base + if i < extras { 1 } else { 0 })).collect();
// Move shards from over-loaded to under-loaded groups greedily
// (full greedy rebalance algorithm)
}
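The greedy step elided in the comment above could look like the following. A self-contained sketch, deterministic because gids are sorted and freed shards are handed out in index order; it keeps shards where they are whenever possible so the controller moves as few shards as the tests require:

```rust
use std::collections::HashMap;

const NSHARDS: usize = 10;

fn rebalance(shards: &mut [u64; NSHARDS], groups: &HashMap<u64, Vec<String>>) {
    if groups.is_empty() { shards.fill(0); return; }
    let n = groups.len();
    let (base, extras) = (NSHARDS / n, NSHARDS % n);
    // Sort group ids for determinism across replicas
    let mut gids: Vec<u64> = groups.keys().cloned().collect();
    gids.sort();
    // Target count per gid: base, plus one extra for the first `extras` gids
    let target: HashMap<u64, usize> = gids.iter().enumerate()
        .map(|(i, &g)| (g, base + usize::from(i < extras))).collect();
    // First pass: keep shards in place up to each owner's target;
    // everything else (unassigned or owned by a departed/overfull group) is freed
    let mut count: HashMap<u64, usize> = gids.iter().map(|&g| (g, 0)).collect();
    let mut free: Vec<usize> = Vec::new();
    for (i, g) in shards.iter().enumerate() {
        match count.get_mut(g) {
            Some(c) if *c < target[g] => *c += 1, // shard stays put
            _ => free.push(i),                    // must be reassigned
        }
    }
    // Second pass: hand freed shards to under-target groups in sorted order
    let mut free = free.into_iter();
    for &g in &gids {
        while count[&g] < target[&g] {
            match free.next() {
                Some(i) => { shards[i] = g; *count.get_mut(&g).unwrap() += 1; }
                None => return,
            }
        }
    }
}
```

The "keep shards where they are" pass is what makes the minimal-transfers test pass: only the surplus moves.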
Part 5B–5C: Shard State Machine
Each shard has its own lifecycle status. This is the key data structure for migration:
// Per-shard status in the ShardKV server
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum ShardStatus {
Serving, // This group owns and is actively serving this shard
Pulling, // We need this shard — we're pulling data from old owner
BePulling, // Old owner: waiting for new owner to pull our data
GCing, // New owner confirmed — safe to delete our copy
}
pub struct Shard {
pub data: HashMap<String, String>,
pub status: ShardStatus,
}
pub struct ShardKvServer {
raft: Raft,
me: usize,
gid: u64,
shards: [Shard; NSHARDS],
config: Config, // current config
last_config: Config, // previous config (needed for pulling)
dedup: HashMap<u64, (u64, String)>,
}
The Four Background Tasks
The ShardKV server runs four background tasks alongside the Raft applier. All four are leader-only (check raft.is_leader() before acting):
1. Config poller: query the shard controller for config current_config.num + 1. If found, submit it as a Raft log entry. Only advance one config at a time. Never skip from config 5 to config 7 — you must process config 6 first, because shard migration depends on knowing both the old and new owner.
2. Shard puller: for all shards in Pulling state, contact the old owner group (from last_config). Send a ShardMigration RPC to get the shard data + dedup table, and submit the received data as a Raft log entry on this group. Only pull when all shards in Pulling state can be fetched.
3. Garbage collector: for shards in GCing state, notify the old owner they can delete their copy. The old owner moves the shard from BePulling to deleted (via a Raft log entry). On success, submit a GC-complete Raft entry to move our shard from GCing to Serving.
4. Request gate (in the RPC handler): check whether this group owns the shard for the key in the current config, and only serve from shards in Serving status. If the status is Pulling, BePulling, or GCing, return ErrWrongGroup; the client retries with a refreshed config.
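One concrete piece of the puller: grouping Pulling shards by their owner in the previous config, so a single ShardMigration RPC per old group suffices. This is a sketch; `shards_to_pull` is an assumed helper name, not part of the lab's API:

```rust
use std::collections::HashMap;

#[derive(Clone, PartialEq)]
pub enum ShardStatus { Serving, Pulling, BePulling, GCing }

/// Map old-owner gid -> shard ids we must pull from it.
/// `last_config_shards` is last_config.shards (the previous ownership map).
fn shards_to_pull(
    statuses: &[ShardStatus],
    last_config_shards: &[u64],
) -> HashMap<u64, Vec<usize>> {
    let mut by_gid: HashMap<u64, Vec<usize>> = HashMap::new();
    for (shard, st) in statuses.iter().enumerate() {
        if *st == ShardStatus::Pulling {
            by_gid.entry(last_config_shards[shard]).or_default().push(shard);
        }
    }
    by_gid
}
```

The puller then spawns one RPC per entry in the map; batching by group keeps the number of in-flight migrations small.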
Shard Migration State Machine
Each shard transitions through these states during a config change. This is the most confusing part of Lab 5 — memorize this lifecycle:
OLD OWNER (Group A): NEW OWNER (Group B):
Config N: Shard 3 = Serving Config N: Shard 3 = (not owned)
| |
config N+1 applied config N+1 applied
| |
v v
Shard 3 = BePulling Shard 3 = Pulling
| |
| ◄──── PullShard RPC ──────────── |
| ────── shard data + dedup ─────► |
| |
| apply MigrationData
| |
v v
Shard 3 = BePulling Shard 3 = GCing
| |
| ◄──── GC notification ───────── |
| |
apply GC apply GCDone
| |
v v
Shard 3 = (deleted) Shard 3 = Serving
Critical rule: Do NOT poll for config N+2 until ALL shards are in Serving state under config N+1. If you advance configs while migration is in progress, you will lose shard data. This is the #1 bug in Lab 5.
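The critical rule reduces to a one-line gate in the config poller. A minimal sketch using the ShardStatus enum from 5B (the function name is illustrative):

```rust
#[derive(Clone, PartialEq)]
pub enum ShardStatus { Serving, Pulling, BePulling, GCing }

/// The config poller may fetch config current.num + 1 only when every
/// shard has settled under the current config.
fn ready_for_next_config(statuses: &[ShardStatus]) -> bool {
    statuses.iter().all(|st| *st == ShardStatus::Serving)
}
```

Call this before querying the controller; if it returns false, skip the poll tick and let migration finish.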
Part 5B–5D: ShardKV Internal Operations
Unlike Lab 4 where all Raft entries are client ops, Lab 5 has four types of operations going through the Raft log:
// All operations submitted to Raft in the ShardKV server
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub enum ShardOp {
/// Normal client operation (Put/Get/Append)
Client {
client_id: u64,
seq_num: u64,
key: String,
op: Op,
},
/// New configuration detected — update shard ownership
ConfigChange {
config: Config,
},
/// Shard data pulled from old owner — install it
MigrationData {
config_num: u64,
shard_id: usize,
data: HashMap<String, String>,
dedup: HashMap<u64, (u64, String)>,
},
/// Old owner confirmed deletion — mark shard as fully serving
ShardGC {
config_num: u64,
shard_id: usize,
},
}
Client Clerk for Sharded KV
The client must determine which group to contact for each key:
fn key2shard(key: &str) -> usize {
// ihash: any deterministic string hash (e.g., FNV); every client and
// server must use the same one so they agree on shard placement
let h = ihash(key);
h % NSHARDS
}
pub struct ShardClerk {
sm_clerk: Clerk, // clerk for shard controller
config: Config, // cached latest config
client_id: u64,
seq_num: u64,
// Per-group: index of last known leader
leaders: HashMap<u64, usize>,
}
impl ShardClerk {
pub async fn get(&mut self, key: String) -> String {
loop {
let shard = key2shard(&key);
let gid = self.config.shards[shard];
if gid == 0 {
// No group owns this shard — refresh config
self.config = self.sm_clerk.query(-1).await;
continue;
}
// Clone so the borrow of self.config ends before the &mut self calls below
let servers = self.config.groups[&gid].clone();
let leader = self.leaders.get(&gid).copied().unwrap_or(0);
// Try each server in the group, starting with cached leader
for offset in 0..servers.len() {
let idx = (leader + offset) % servers.len();
match self.rpc_get(&servers[idx], &key).await {
Ok(val) => {
self.leaders.insert(gid, idx);
return val;
}
Err(KvError::WrongGroup) => break, // refresh config
Err(_) => continue, // try next server
}
}
// Refresh config from shard controller and retry
tokio::time::sleep(Duration::from_millis(100)).await;
self.config = self.sm_clerk.query(-1).await;
}
}
}
Dedup tables must migrate with shard data. When shard 3 moves from Group A to Group B, the dedup entries for clients that wrote to shard 3 must also move. Otherwise, duplicate detection breaks after migration.
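A hedged sketch of that merge on the receiving group; `merge_dedup` is an assumed helper name, and the table shape (client_id → (seq, last reply)) follows the dedup map used throughout this guide. Keep whichever entry has the higher seq, so a client's newest state always wins:

```rust
use std::collections::HashMap;

fn merge_dedup(
    local: &mut HashMap<u64, (u64, String)>,
    incoming: HashMap<u64, (u64, String)>,
) {
    for (client, (seq, reply)) in incoming {
        let newer = match local.get(&client) {
            Some((cur_seq, _)) => seq > *cur_seq, // local entry already newer?
            None => true,                          // unknown client: take it
        };
        if newer { local.insert(client, (seq, reply)); }
    }
}
```

Run this inside the apply of a MigrationData entry so every replica in the group merges identically.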
You're done with Lab 5 when: All shard controller tests (join, leave, move, multi, minimal-transfers), sharded KV tests (basic, concurrent, unreliable), migration tests, and GC/challenge tests pass 200+ times. The unreliable + migration tests are the hardest — they combine network failures with shard movement. If you pass those, you have built a real distributed database.
Essential Resources (Bookmark These)
These are the most valuable supplementary resources for the labs, ranked by importance. Many students wish they had found these earlier.
Quick Links
| Resource | URL | When to Use |
|---|---|---|
| Course home page | pdos.csail.mit.edu/6.824 | Lab specs, papers, schedule |
| 2020 YouTube playlist | YouTube playlist | All 20 lectures |
| Raft paper (extended) | (linked from the schedule page) | Lab 3 spec |
| MapReduce paper | (linked from the schedule page) | Lab 1 background |
| Tokio tutorial | tokio.rs/tokio/tutorial | Async Rust primer |
| Diego Ongaro's Raft talk | YouTube | Best Raft explanation, watch before Lab 3 |
Key Rust Patterns for These Labs
Go → Rust Concurrency Mapping
| Go Pattern | Rust / Tokio Equivalent |
|---|---|
| go func() { ... }() | tokio::spawn(async { ... }) |
| time.Sleep(100ms) | tokio::time::sleep(Duration::from_millis(100)).await |
| sync.Mutex | Arc<std::sync::Mutex<T>> (use std, not tokio::sync) |
| sync.WaitGroup | tokio::task::JoinSet |
| make(chan T, N) | tokio::sync::mpsc::channel(N) |
| select {} | tokio::select! {} |
| sync.Cond + Wait() | tokio::sync::Notify |
| time.NewTimer() | tokio::time::sleep_until(Instant) or interval() |
| atomic.AddInt64 | std::sync::atomic::AtomicU64 |
Election Timer Pattern
// Each peer has a reset-able election timer
use tokio::sync::Notify;
use tokio::time::{Duration, sleep};
use rand::Rng;
async fn election_timer_loop(notify: Arc<Notify>, raft: Arc<Mutex<RaftState>>) {
loop {
let timeout = Duration::from_millis(rand::thread_rng().gen_range(150..300));
tokio::select! {
_ = sleep(timeout) => {
// Timeout fired — start election if follower/candidate
let role = raft.lock().unwrap().role.clone();
if role != Role::Leader {
tokio::spawn(start_election(raft.clone()));
}
}
_ = notify.notified() => {
// Heartbeat received — reset timer (loop again)
}
}
}
}
// In AppendEntries handler, call:
// self.election_notify.notify_one();
Papers to Read (In Order)
| Paper | Year | Lab | Priority |
|---|---|---|---|
| MapReduce (Dean & Ghemawat) | 2004 | Lab 1 | 🔴 Required |
| The Google File System | 2003 | Context | 🟡 Read it |
| Raft Extended Paper (Ongaro) | 2014 | Lab 3 | 🔴 Required × 3 |
| ZooKeeper (Hunt et al.) | 2010 | Context | 🟡 Read it |
| Spanner (Corbett et al.) | 2012 | Context/Lab4 | 🟠 Recommended |
| Dynamo (DeCandia et al.) | 2007 | Lab 5 context | 🟠 Recommended |
| COPS (Lloyd et al.) | 2011 | Lec 17 | 🟡 Read it |
| FaRM (Dragojević et al.) | 2014 | Lec 14 | ⚪ Optional |
| CRAQ (Terrace & Freedman) | 2009 | Lec 9 | 🟡 Read it |
| Aurora (Verbitski et al.) | 2017 | Lec 10 | ⚪ Optional |
All papers are linked from the schedule page: pdos.csail.mit.edu/6.824/schedule.html. Each lecture lists the paper to read beforehand.
Debugging Distributed Systems in Rust
Structured Logging
// Add to each crate's main or test setup
fn init_tracing() {
let _ = tracing_subscriber::fmt()
.with_env_filter("lab3_raft=debug,lab4_kvraft=info")
.with_thread_ids(true)
.with_target(true)
.try_init();
}
// ALWAYS log: term, role, peer index, event name
tracing::debug!(
term = s.current_term,
role = ?s.role,
me = self.me,
from = args.candidate_id,
granted = vote_granted,
"RequestVote"
);
Stress Test Script
Never consider a Raft implementation correct until it passes 500 consecutive test runs. Use this script:
#!/bin/bash
# stress_test.sh — runs a test N times, stops on first failure
# Usage: ./stress_test.sh 500 lab3_raft test_initial_election
COUNT=$1; PKG=$2; TEST=$3
for i in $(seq 1 "$COUNT"); do
if ! cargo test --package "$PKG" "$TEST" -- --nocapture 2>&1 | tail -n 1 | grep -q ok; then
echo "❌ FAILED on run $i"
exit 1
fi
echo "✅ Run $i/$COUNT passed"
done
echo "🎉 All $COUNT runs passed"
Common Bugs Table
| Bug | Symptom | Fix |
|---|---|---|
| Hold Mutex across .await | Compiler error: MutexGuard is not Send | Drop guard before every .await point |
| Election timer not reset on AppendEntries | Multiple leaders in same term | Call notify.notify_one() in ALL valid AE handlers, not just successful ones |
| Committing old-term entries directly | Stale reads after leader change (Figure 8) | log[N].term == currentTerm check before advancing commitIndex |
| Not persisting before RPC reply | Vote for two candidates same term after crash | Call persist() before any return in RequestVote / AppendEntries handlers |
| next_index underflow | Index panic on subtraction | next_index[i] = max(1, next_index[i] - 1) |
| last_applied not reset after snapshot | Re-applies committed entries | Set last_applied = snapshot_index in InstallSnapshot handler |
| KvServer applies duplicate ops after snapshot | Data corruption on dedup table | Include dedup table in snapshot data; restore it on snapshot load |
| Lab 5: processing config N+1 before N migration done | Missing shard data | Only poll next config when all shards are in Serving state |
Async Rust Pitfalls in These Labs
1. std::sync::Mutex vs tokio::sync::Mutex
Use std::sync::Mutex for Raft state (never held across await). Use tokio::sync::Mutex only if you absolutely must hold across await — but try to avoid that entirely.
// ❌ WRONG — the future is not Send, so this fails to compile the
// moment you try to tokio::spawn it
async fn bad(state: Arc<std::sync::Mutex<RaftState>>) {
let _guard = state.lock().unwrap();
some_async_fn().await; // guard is still alive across this await point
}
// ✅ CORRECT — drop guard before await
async fn good(state: Arc<std::sync::Mutex<RaftState>>) {
let value = {
let guard = state.lock().unwrap();
guard.some_field // copy/clone what you need
}; // guard dropped here
some_async_fn(value).await;
}
2. tokio::spawn requires 'static + Send
All data passed into tokio::spawn must be 'static (owned, not borrowed) and Send. Clone Arcs before spawning, don't pass references.
// ❌ WRONG — cannot borrow across spawn
let raft_ref = &raft;
tokio::spawn(async move { raft_ref.something(); });
// ✅ CORRECT — clone the Arc
let raft2 = raft.clone();
tokio::spawn(async move { raft2.something(); });
3. select! cancellation
When a tokio::select! branch wins, the other branches' futures are dropped. If those futures were in the middle of work (e.g., sending an RPC), that work is cancelled. Design your futures to be cancel-safe, or use JoinSet instead.
tokio::select! {
_ = sleep(timeout) => { /* timer fired */ }
result = rpc_future => {
// rpc_future may have sent the RPC but not gotten reply
// — this is fine for Raft since RPCs are idempotent
}
}
4. Deadlock: locking the same Mutex twice
In Raft, it's easy to call a method that locks the state mutex when you already hold it. Use a pattern where the lock is acquired only at the start of each RPC handler, work is done, then released.
// Avoid calling self.some_method() while holding self.state.lock()
// if some_method() also locks self.state
// Pattern: extract a free function that takes &mut RaftState
fn advance_commit_index(s: &mut RaftState, n_peers: usize) { /* ... */ }
// Call it with the lock held, but it never re-locks
let mut s = self.state.lock().unwrap();
advance_commit_index(&mut s, self.n_peers);
drop(s);
Final advice: Struggle genuinely with each lab before looking anything up. The distributed systems intuition you build while hunting an intermittent Raft bug for two days is irreplaceable. TiKV, etcd, CockroachDB, and Solana's consensus layer are all built on these exact ideas — you're not just doing exercises, you're learning the foundation of modern distributed infrastructure.