MIT 6.5840 · formerly 6.824 · Complete Self-Study Guide

Distributed Systems
Labs in Rust

A complete field guide — from Day 1 setup through all 5 lab sequences. Covers every concept from the YouTube lectures, mapped directly to Rust implementations. No Go required.

Course  pdos.csail.mit.edu/6.824
Labs  5 sequences · 12 sub-parts
Language  Rust 2021 + Tokio
Total time  ~10–14 weeks

MIT 6.5840 Distributed Systems

MIT 6.5840 (formerly 6.824) is the gold-standard graduate course on distributed systems, taught by Prof. Robert Morris. Every lecture pairs theory from a seminal paper with implementation work in the labs. The 5-lab sequence builds a complete sharded, fault-tolerant key-value database from scratch.

Why do it in Rust? The official labs are Go-only. But since the lab specs are language-agnostic, you can implement them in Rust — which gives you Rust's async/ownership model on top of distributed systems concepts. Every major storage system (TiKV, FoundationDB, CockroachDB) uses these exact ideas.

Time Estimates per Lab

LAB 1  MapReduce     1–2 wks
LAB 2  KV Server     1 wk
LAB 3  Raft          3–4 wks
LAB 4  KV+Raft       1–2 wks
LAB 5  Sharded KV    2–3 wks

The 5 Labs

LAB 1
MapReduce
★★☆☆ Moderate · ~1–2 weeks
Distributed MapReduce coordinator + N workers. Workers ask coordinator for tasks via RPC, execute map/reduce functions, write intermediate files. Coordinator detects worker failures and re-assigns tasks after 10 seconds.
RPC over Unix/TCP · Coordinator state machine · Crash recovery · Atomic file rename
LAB 2
Key/Value Server (single machine)
★★☆☆ Moderate · ~1 week
Single-server KV store (Put/Append/Get) that must handle duplicate RPCs over an unreliable network. Clients retry on timeout — server must deduplicate by client ID + monotonic sequence number.
Linearizability · Exactly-once semantics · Client retry · Dedup table
LAB 3
Raft Consensus Algorithm
★★★★ Very Hard · ~3–4 weeks
Full implementation of Raft split into 4 sub-parts. The hardest lab in the course. Leader election → log replication → crash persistence → log compaction with snapshots. Implement Figure 2 of the paper exactly.
3A: Leader Election · 3B: Log Replication · 3C: Persistence · 3D: Log Compaction
LAB 4
Fault-Tolerant KV (Raft-backed)
★★★☆ Hard · ~1–2 weeks
Replicated KV service on top of your Lab 3 Raft. KV state machine runs on top of Raft consensus. Handles leader changes, deduplication across nodes, and snapshot-based log trimming.
4A: Without snapshots · 4B: With snapshots · Leader redirect · Snapshot trigger
LAB 5
Sharded Key/Value Service
★★★★ Very Hard · ~2–3 weeks
Shard the KV database across multiple Raft replica groups. A dedicated shard controller manages config. Implements live shard migration with full consistency guarantees and garbage collection.
5A: Shard Controller · 5B: Sharded KV · 5C: Shard Migration · 5D: GC + Challenge

What You Need Before Starting

You do not need prior distributed systems experience. But you do need solid programming fundamentals and comfort with Rust's core model. Here is exactly what to learn first if you are missing any of these.

Rust Prerequisites

You will be writing concurrent, async Rust from Lab 1 onward. Make sure you are comfortable with these concepts before starting:

  • Ownership & borrowing — every shared data structure uses Arc<Mutex<T>>. Learn: Rust Book Ch 4
  • Enums & pattern matching — RPC messages, task types, and state machines are all enums. Learn: Rust Book Ch 6
  • Traits & trait objects — you define Persister, MrApp, and other interfaces as traits. Learn: Rust Book Ch 10
  • Smart pointers (Arc, Box) — all shared state across tasks uses Arc. Learn: Rust Book Ch 15
  • Concurrency (Mutex, threads) — every lab is inherently concurrent. Learn: Rust Book Ch 16
  • async/await & Tokio basics — all network I/O and timers use Tokio. Learn: Tokio Tutorial
  • Serde serialization — every RPC argument and log entry is serialized with bincode/serde. Learn: serde.rs

Not sure if you're ready? Try this test: write a Tokio program with two tasks sharing an Arc<Mutex<HashMap<String, String>>>, where one task inserts values and the other reads them with a 100ms delay. If you can do that comfortably, you are ready for Lab 1.
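If you want to see the shape of that test, here is a minimal std-threads analogue (the async version swaps thread::spawn for tokio::spawn and std's Mutex for tokio's, but the Arc<Mutex<HashMap>> sharing pattern is identical); the key names and the count of ten entries are illustrative:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// One thread inserts ten entries; the other polls every 100ms until it
/// sees them all, then returns the final map size.
fn readiness_demo() -> usize {
    let map: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));

    let writer_map = Arc::clone(&map);
    let writer = thread::spawn(move || {
        for i in 0..10 {
            writer_map.lock().unwrap().insert(format!("k{i}"), format!("v{i}"));
        }
    });

    let reader_map = Arc::clone(&map);
    let reader = thread::spawn(move || loop {
        thread::sleep(Duration::from_millis(100));
        let n = reader_map.lock().unwrap().len();
        if n == 10 {
            return n;
        }
    });

    writer.join().unwrap();
    reader.join().unwrap()
}
```

Calling readiness_demo() returns 10 once both threads finish. If the tokio version of this (two spawned tasks, tokio::time::sleep) feels routine to you, Lab 1 will hold no surprises.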

CS Prerequisites

  • Basic networking — understand that clients send requests, servers send responses, and messages can be lost or delayed. You do not need socket programming experience; the fake RPC layer handles transport.
  • Threads vs processes — understand that threads share memory and processes do not. Raft peers are separate processes communicating over the network.
  • Hash maps, sorting, file I/O — Lab 1 (MapReduce) relies heavily on all three.

Recommended (Not Required) Pre-Reading

Distributed Systems Fundamentals

Before touching any code, internalize these ideas. Every lecture and every lab traces back to them. If you get stuck later, come back here.

Why Is Distributed Systems Hard?

Three fundamental problems make distributed systems different from single-machine programming:

  • Unreliable network — messages can be lost, duplicated, delayed, or arrive out of order; there is no way to distinguish a slow server from a dead one. Labs 1–5: all RPCs can fail, so you must design for retries and idempotency everywhere.
  • Partial failure — some nodes crash while others keep running, and a crashed node might restart with old state. Lab 3C: Raft must persist state so a restarted node recovers. Lab 4: the KV service survives minority failures.
  • No global clock — nodes cannot agree on "what time is it," so you cannot order events by wall-clock timestamps. Lab 3: Raft uses terms (logical clocks) instead of timestamps to order events and detect stale leaders.

Failure Models

Distributed systems papers define how nodes can fail. The labs use the crash-recovery model:

  • Crash-stop — a node crashes and never comes back. Simple but unrealistic.
  • Crash-recovery — a node crashes but can restart and rejoin with its persisted state. This is the model Raft uses. Labs 3C and 3D test this extensively.
  • Byzantine — a node can lie, send conflicting messages, or act maliciously. Raft does NOT handle this. Bitcoin (Lecture 19) does.

The CAP Theorem (One Paragraph)

During a network partition, a distributed system must choose between Consistency (every read returns the latest write) and Availability (every request gets a response). Raft chooses consistency: if a leader cannot reach a majority, it stops accepting writes rather than risk divergent state. Dynamo (Lecture 17) chooses availability: it accepts conflicting writes and reconciles them later. Understanding this trade-off is the single most important intuition in distributed systems.

Consistency Models — The Ladder

From weakest to strongest. The labs require linearizability, the strongest practical guarantee:

  • Eventual consistency — all replicas converge "eventually"; you might read stale data. Used by Dynamo, DNS, Cassandra.
  • Causal consistency — causally related operations are seen in order; concurrent operations may differ. Used by COPS (Lecture 17).
  • Sequential consistency — all operations appear in some global order consistent with each client's program order. Used by ZooKeeper writes.
  • Linearizability — every operation appears to execute atomically at some point between its invocation and response; the strongest guarantee. Used by Labs 2–5, Raft-based KV stores, Spanner.

Linearizability in plain English: If client A's Put("x","1") returns before client B calls Get("x"), then B must see "1". Operations cannot appear to go backwards in time. Your KV servers in Labs 2, 4, and 5 must satisfy this.

The Replicated State Machine — The Core Idea

This single concept ties Labs 3, 4, and 5 together. Memorize it:

Replicated State Machine Architecture
  Client           Leader             Follower A          Follower B
    |                 |                    |                    |
    |--- Put(x,1) -->|                    |                    |
    |                 |-- AppendEntries -->|                    |
    |                 |-- AppendEntries ---|---------->         |
    |                 |                    |                    |
    |                 |<--- ACK -----------|                    |
    |                 |<--- ACK --------------------------------|
    |                 |                    |                    |
    |                 | (majority replied — entry is committed) |
    |                 |                    |                    |
    |                 | Apply: data["x"] = "1"                  |
    |                 |        data["x"] = "1"                  |
    |                 |                   data["x"] = "1"       |
    |<-- OK ---------|                    |                    |

Key insight: If every node starts with the same initial state and applies the same commands in the same order, they all end up in the same state. Raft's job is to make all nodes agree on that order. The KV server (Lab 4) is the state machine. Raft (Lab 3) is the log that ensures identical ordering.
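A toy demonstration of the insight (not the lab API): two replicas that start empty and replay the same command log in the same order always end in identical state.

```rust
use std::collections::HashMap;

enum Cmd {
    Put(String, String),
    Append(String, String),
}

// Apply one command; deterministic, so identical logs yield identical state.
fn apply(state: &mut HashMap<String, String>, cmd: &Cmd) {
    match cmd {
        Cmd::Put(k, v) => {
            state.insert(k.clone(), v.clone());
        }
        Cmd::Append(k, v) => state.entry(k.clone()).or_default().push_str(v),
    }
}

/// Replay a command log from an empty state, as every replica does.
fn replay(log: &[Cmd]) -> HashMap<String, String> {
    let mut state = HashMap::new();
    for cmd in log {
        apply(&mut state, cmd);
    }
    state
}
```

Replaying the same log twice (replay(&log) == replay(&log)) always yields equal maps, which is why Raft only has to agree on log order, never on state directly.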

Key Terms Glossary

  • term — a logical clock in Raft. Each election increments the term. A term has at most one leader. Stale terms are rejected.
  • commit_index — the highest log entry known to be replicated on a majority. Safe to apply to the state machine.
  • last_applied — the highest log entry already applied to the state machine. Always <= commit_index.
  • leader / follower / candidate — the three Raft node roles. Followers listen. Candidates run elections. The leader coordinates everything.
  • quorum / majority — more than half the nodes. In a 5-node cluster, quorum = 3. All Raft decisions require a quorum.
  • idempotent — an operation that produces the same result whether applied once or multiple times. Essential for retry safety.
  • exactly-once semantics — each client operation is applied exactly once, even if the RPC is retried. Implemented via client ID + sequence number deduplication.
  • next_index[i] — leader-only: the next log index to send to peer i. Initialized to leader's last log index + 1 on election.
  • match_index[i] — leader-only: the highest log index known to be replicated on peer i. Used to advance commit_index.
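As a concrete sketch of the last two glossary entries, here is one way a leader could advance commit_index from match_index (simplified: it assumes match_index includes the leader's own last log index, and entry_terms[i] holds the term of log entry i, with slot 0 unused):

```rust
/// Return the new commit_index: the highest index replicated on a majority
/// whose entry is from current_term (the commit rule in Figure 2 of the
/// Raft paper). Terms are monotonic in the log, so one term check suffices.
fn advance_commit(
    match_index: &[u64],
    entry_terms: &[u64],
    current_term: u64,
    commit_index: u64,
) -> u64 {
    let mut sorted = match_index.to_vec();
    sorted.sort_unstable();
    // The value at this position is replicated on a majority of the cluster.
    let n = sorted[(sorted.len() - 1) / 2];
    if n > commit_index && entry_terms[n as usize] == current_term {
        n
    } else {
        commit_index
    }
}
```

For a 5-node cluster with match_index [5, 5, 3, 2, 1] and entries 3..=5 from the current term, this advances commit_index to 3; a majority sitting on an old-term entry does not commit it, which is the subtle safety rule of §5.4.2.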

Recommended 12-Week Study Plan

This schedule assumes 10–15 hours per week. Adjust based on your Rust experience. The key rule: watch lectures and read the paper before starting the corresponding lab.

Weeks 1–2
Foundations + Lab 1 (MapReduce)
Watch Lectures 1–4 (Intro, RPC/Threads, GFS, Primary-Backup). Read the MapReduce paper. Set up your Rust workspace. Implement and test Lab 1.
Week 3
Exactly-Once Semantics + Lab 2 (KV Server)
Watch Lecture 5 (Go, Threads, and Raft — translate the Go patterns to Tokio). Implement Lab 2. This is a short lab but the dedup pattern is reused in Labs 4 and 5.
Weeks 4–5
Raft Part 1 + Lab 3A & 3B
Read the Raft Extended paper (all of it). Watch Lectures 6–7 (Raft Parts 1 & 2). Watch Diego Ongaro's 2014 talk. Implement leader election (3A), then log replication (3B). Run tests 500+ times each.
Week 6
Raft Part 2 + Lab 3C & 3D
Implement persistence (3C) and log compaction with snapshots (3D). Read the Students' Guide to Raft. Stress test everything.
Weeks 7–8
Replicated KV + Lab 4
Watch Lectures 8–11 (ZooKeeper, CRAQ, Aurora, Frangipani). Implement Lab 4A (KV without snapshots) then 4B (with snapshots). The applier pattern from Raft connects here.
Weeks 9–11
Sharding + Lab 5
Watch Lectures 12–17 (Distributed Transactions, Spanner, FaRM, Spark, Memcached, COPS). Implement Lab 5A (shard controller), 5B (sharded KV), 5C (shard migration), 5D (GC).
Week 12
Enrichment + Review
Watch Lectures 18–20 (Certificate Transparency, Bitcoin, Blockstack). Re-run all test suites. Read TiKV or etcd source code to see production Raft. Celebrate — you built a sharded, fault-tolerant database from scratch.

Lab 3 (Raft) will take longer than you think. Plan for 3–4 weeks even if you are experienced. Most bugs are intermittent timing issues that only appear after hundreds of test runs. Budget extra time.

Where to Actually Start

Here is exactly what to do on Day 1, step by step. Most people get confused because the official repo is Go-only. You'll use it for the lab specifications only, and write everything in Rust.

Bookmark the official course page

Go to pdos.csail.mit.edu/6.824. This is the live course page with lab specs, lecture notes, and papers. Click "Schedule" and read it top to bottom to understand the structure.

Clone the official Go repo — for specs only

You won't run Go code, but the repo contains lab spec HTML pages and test expectations you must match.

git clone git://g.csail.mit.edu/6.5840-golabs-2026 mit6824-spec
cd mit6824-spec/src
ls
# mr/       → Lab 1 skeleton + test scripts
# kvsrv/    → Lab 2 skeleton
# raft/     → Lab 3 skeleton (raft.go, test_test.go)
# kvraft/   → Lab 4 skeleton
# shardkv/  → Lab 5 skeleton
# shardctrler/ → Lab 5 shard controller

Read the *_test.go files — they describe exactly what your Rust code must do. Also read the lab HTML pages at pdos.csail.mit.edu/6.824/labs/lab-mr.html etc.

Create your Rust workspace
cargo new --lib mit6824-rust
cd mit6824-rust

# Replace the generated Cargo.toml with the workspace manifest shown in
# "Project Setup & Workspace Layout" below, then create the member crates:
cargo new --lib common
cargo new --lib lab1-mapreduce
cargo new --lib lab2-kvsrv
cargo new --lib lab3-raft
cargo new --lib lab4-kvraft
cargo new --lib lab5-shardkv

Watch Lecture 1 before writing any code

The 2020 Robert Morris lectures on YouTube are the best. Watch Lec 1 (Introduction + MapReduce) and read the MapReduce paper before starting Lab 1. The lab will make much more sense.

Read a lab spec, then implement, then test

For each lab: (1) read the official HTML lab page fully, (2) read the Go skeleton code to understand what interfaces to implement, (3) write your Rust equivalent, (4) write tests that mirror the Go test file. Repeat until tests pass 100+ times in a row.

Complete YouTube Lecture Map

All 20 lectures are free on YouTube from the 2020 Spring semester (Robert Morris). This is the most complete and polished version. Watch lectures before the corresponding lab, not after. Each lecture is tagged with its lab relevance.


Main playlist (2020): youtube.com/playlist?list=PLrw6a1wE39_tb2fErI4-WkMbsvGQk9_UB — 20 lectures, ~1.5 hours each.


Lab numbering note: The 2020 course had a different lab numbering (Lab 2 = Raft). The modern course (2024–2026) uses Lab 3 = Raft. This guide uses the modern numbering throughout. The lecture content is identical.

Required for Lab 1 (MapReduce)

Context for Replication (Watch Before Labs 2–3)

Required for Lab 3 (Raft) — Most Important Lectures


Essential bonus: Also watch Diego Ongaro's 2014 Raft talk — the Raft author explains the subtle parts (election restriction, commit safety) far better than any blog post. Watch it before starting Lab 3.

Required for Lab 4 (Fault-Tolerant KV)

Required for Lab 5 (Sharded KV)

Enrichment (Watch After Labs or During Week 12)

Project Setup & Workspace Layout

# Cargo.toml (workspace root)
[workspace]
members = [
    "common",          # shared types: ApplyMsg, RPC, Persister
    "lab1-mapreduce",
    "lab2-kvsrv",
    "lab3-raft",
    "lab4-kvraft",
    "lab5-shardkv",
]
resolver = "2"

[workspace.dependencies]
tokio      = { version = "1", features = ["full"] }
serde      = { version = "1", features = ["derive"] }
serde_json = "1"
bincode    = "1"
rand       = "0.8"
tracing    = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

The common Crate — What Goes in It

This is your shared foundation used by every other lab. It replaces Go's labrpc and labgob packages.

// common/src/lib.rs
pub mod labrpc;    // fake network for testing
pub mod persister; // in-memory or file-backed durable storage
pub mod apply_msg; // ApplyMsg sent from Raft to KV state machine
pub mod errors;    // KvError, RaftError enums

ApplyMsg — The Raft → App Channel

This is how Raft tells the application layer that a log entry has been committed or a snapshot has arrived. Every lab from 3 onward uses this type.

// common/src/apply_msg.rs

#[derive(Debug, Clone)]
pub enum ApplyMsg {
    /// A committed log entry. The KV server applies this to its state machine.
    Command {
        valid:        bool,
        command:      Vec<u8>,   // bincode-encoded Op
        command_index: u64,
        command_term:  u64,
    },
    /// A snapshot installed by Raft (from InstallSnapshot RPC).
    Snapshot {
        valid:          bool,
        snapshot:       Vec<u8>, // raw snapshot bytes
        snapshot_term:  u64,
        snapshot_index: u64,
    },
}

Persister — Durable State

Raft needs to save current_term, voted_for, and log to stable storage. The Persister trait abstracts over an in-memory store (for tests) or a real file (for production).

// common/src/persister.rs
use std::sync::Mutex;

pub trait Persister: Send + Sync {
    fn read_raft_state(&self) -> Vec<u8>;
    fn save_raft_state(&self, state: Vec<u8>);
    fn read_snapshot(&self) -> Vec<u8>;
    fn save_state_and_snapshot(&self, state: Vec<u8>, snapshot: Vec<u8>);
    fn raft_state_size(&self) -> usize;
}

/// In-memory persister for tests
#[derive(Default)]
pub struct MemPersister {
    inner: Mutex<MemPersisterInner>,
}

#[derive(Default)]
struct MemPersisterInner {
    raft_state: Vec<u8>,
    snapshot:   Vec<u8>,
}

impl Persister for MemPersister {
    fn read_raft_state(&self) -> Vec<u8> {
        self.inner.lock().unwrap().raft_state.clone()
    }
    fn save_raft_state(&self, state: Vec<u8>) {
        self.inner.lock().unwrap().raft_state = state;
    }
    fn read_snapshot(&self) -> Vec<u8> {
        self.inner.lock().unwrap().snapshot.clone()
    }
    fn save_state_and_snapshot(&self, state: Vec<u8>, snapshot: Vec<u8>) {
        let mut inner = self.inner.lock().unwrap();
        inner.raft_state = state;
        inner.snapshot   = snapshot;
    }
    fn raft_state_size(&self) -> usize {
        self.inner.lock().unwrap().raft_state.len()
    }
}

Building the Fake Network (labrpc Replacement)

The Go labs use a custom labrpc package that can drop messages, delay them, and simulate partitions. You need to build this in Rust. This is the most important piece of test infrastructure.

// common/src/labrpc.rs
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;
use rand::Rng;

/// One RPC in-flight
pub struct Envelope {
    pub method:   String,
    pub args:     Vec<u8>,
    pub reply_tx: oneshot::Sender<Option<Vec<u8>>>,
}

struct NetworkConfig {
    reliable:    bool,
    long_delays: bool,
    connected:   Vec<bool>,   // per-peer connectivity
}

pub struct Network {
    config:    Arc<Mutex<NetworkConfig>>,
    endpoints: Arc<Mutex<HashMap<usize, mpsc::Sender<Envelope>>>>,
}

impl Network {
    pub fn new(n: usize) -> Self {
        Network {
            config: Arc::new(Mutex::new(NetworkConfig {
                reliable:    true,
                long_delays: false,
                connected:   vec![true; n],
            })),
            endpoints: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    pub fn set_reliable(&self, v: bool) { self.config.lock().unwrap().reliable = v; }
    pub fn disconnect(&self, peer: usize) { self.config.lock().unwrap().connected[peer] = false; }
    pub fn connect(&self, peer: usize)    { self.config.lock().unwrap().connected[peer] = true; }

    /// Register a server's inbox so call() can route RPCs to it
    pub fn register(&self, id: usize, tx: mpsc::Sender<Envelope>) {
        self.endpoints.lock().unwrap().insert(id, tx);
    }

    /// Call an RPC, respecting network config (drops, delays)
    pub async fn call(&self, to: usize, method: &str, args: Vec<u8>) -> Option<Vec<u8>> {
        let (reliable, connected, long_delays) = {
            let c = self.config.lock().unwrap();
            (c.reliable, c.connected[to], c.long_delays)
        };

        if !connected { return None; }

        // Simulate unreliable network: ~10% drop
        if !reliable && rand::thread_rng().gen_bool(0.1) { return None; }

        // Random short delay on unreliable network
        if !reliable {
            let ms = rand::thread_rng().gen_range(0..27);
            tokio::time::sleep(Duration::from_millis(ms)).await;
        }

        // Long delay scenario (simulates slow network)
        if long_delays && rand::thread_rng().gen_bool(0.1) {
            tokio::time::sleep(Duration::from_millis(500)).await;
        }

        let tx_opt = self.endpoints.lock().unwrap().get(&to).cloned();
        if let Some(tx) = tx_opt {
            let (reply_tx, reply_rx) = oneshot::channel();
            let env = Envelope { method: method.to_string(), args, reply_tx };
            let _ = tx.send(env).await;
            reply_rx.await.ok().flatten()
        } else { None }
    }
}

Two RPC strategies: Use the channel-based fake network above for all labs (easiest, fastest tests). Alternatively, use tonic/gRPC over real TCP for production realism. Channel-based is strongly recommended — it lets you simulate partitions and drops in milliseconds without OS-level tricks.

Lab 1: MapReduce

Read before starting: MapReduce paper (2004) · Watch Lectures 1–2 · Read lab-mr.html

Mental Model

Imagine you need to count every word in a library of 10,000 books. One person would take forever. Instead, you hire a coordinator (head librarian) and 10 workers (assistants).

  • Map phase: The coordinator gives each worker a book. Each worker produces a list of (word, 1) pairs, sorted into buckets by first letter.
  • Reduce phase: The coordinator assigns each bucket to a worker. Each worker sums up all the counts for each word in their bucket.
  • Fault tolerance: If a worker disappears for 10 seconds, the coordinator re-assigns their task to someone else. Workers must write results atomically (temp file then rename) so partial output is never visible.

That is the entire lab. The coordinator is a state machine tracking task status. Workers are a loop: ask for a task, do it, report back, repeat.
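The atomic-rename trick from the fault-tolerance bullet can be sketched with std::fs alone (path and contents here are illustrative); because rename is atomic within one filesystem, readers either see the complete file or no file at all:

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

/// Write `contents` to `final_path` atomically: write a temp file in the
/// same directory, flush it, then rename it into place.
fn atomic_write(final_path: &Path, contents: &[u8]) -> std::io::Result<()> {
    let tmp_path = final_path.with_extension("tmp");
    let mut tmp = fs::File::create(&tmp_path)?;
    tmp.write_all(contents)?;
    tmp.sync_all()?; // flush to disk before the rename makes it visible
    fs::rename(&tmp_path, final_path)
}
```

A worker that crashes mid-write leaves only a stray .tmp file behind; the re-assigned worker's rename then publishes a complete replacement. This is exactly why the coordinator can blindly re-run tasks.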

Architecture

One Coordinator process manages task state. N Worker processes run in parallel, polling the coordinator for tasks. In real systems workers run on different machines; here they're different processes on the same machine talking via Unix socket or TCP.

// Task types
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum Task {
    Map    { id: usize, filename: String, n_reduce: usize },
    Reduce { id: usize, n_map: usize },
    Wait,   // All tasks in-progress, try again soon
    Done,   // All tasks complete, worker may exit
}

// Coordinator task tracking
#[derive(Debug, Clone, PartialEq)]
enum TaskStatus {
    Idle,
    InProgress { assigned_at: std::time::Instant },
    Completed,
}

pub struct Coordinator {
    n_reduce:      usize,
    map_tasks:     Vec<TaskStatus>,
    reduce_tasks:  Vec<TaskStatus>,
    input_files:   Vec<String>,
    phase:         Phase,
}

#[derive(PartialEq)]
enum Phase { Map, Reduce, Done }
Implement request_task() on the Coordinator

Scan map tasks first. Assign idle ones or re-assign InProgress ones older than 10 seconds. If all map tasks are Done, transition to Reduce phase. Return Task::Wait if all are InProgress but not done yet.

Worker Map loop

Read the input file, call the user's map_fn(filename, contents) → Vec<(String, String)>. Partition key-value pairs into N buckets using hash(key) % n_reduce. Write mr-{map_id}-{bucket} files as JSON lines. Use atomic rename (write to temp file, then rename).
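The bucket-partitioning step might look like this (a self-contained sketch; the hash function matches the ihash helper shown further down):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

fn ihash(key: &str) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (0x7fff_ffff & h.finish()) as usize
}

/// Split map output into n_reduce buckets by hashed key, so every
/// occurrence of a key lands in the same reduce task's bucket.
fn partition(pairs: Vec<(String, String)>, n_reduce: usize) -> Vec<Vec<(String, String)>> {
    let mut buckets: Vec<Vec<(String, String)>> = vec![Vec::new(); n_reduce];
    for (k, v) in pairs {
        let b = ihash(&k) % n_reduce;
        buckets[b].push((k, v));
    }
    buckets
}
```

The invariant that matters: every worker hashes the same key to the same bucket, so reduce task r can find all of a key's values by reading mr-*-r.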

Worker Reduce loop

Read all mr-*-{reduce_id} files. Merge, sort by key, call reduce_fn(key, values) → String for each unique key. Write final output to mr-out-{reduce_id}. Also atomic rename.
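The merge, sort, and group step can be sketched as (function and argument names are illustrative):

```rust
/// Sort intermediate pairs by key, then call `reduce_fn` once per unique
/// key with all of that key's values, in key order.
fn group_and_reduce(
    mut pairs: Vec<(String, String)>,
    reduce_fn: impl Fn(&str, &[String]) -> String,
) -> Vec<(String, String)> {
    pairs.sort(); // brings equal keys together
    let mut out = Vec::new();
    let mut i = 0;
    while i < pairs.len() {
        // Find the end of this key's run of pairs.
        let mut j = i + 1;
        while j < pairs.len() && pairs[j].0 == pairs[i].0 {
            j += 1;
        }
        let values: Vec<String> = pairs[i..j].iter().map(|(_, v)| v.clone()).collect();
        out.push((pairs[i].0.clone(), reduce_fn(&pairs[i].0, &values)));
        i = j;
    }
    out
}
```

With the WordCount reducer (|_, vs| vs.len().to_string()), this turns [("b","1"), ("a","1"), ("a","1")] into [("a","2"), ("b","1")].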

Implement the MrApp trait

Define a trait for user-provided map/reduce functions. Implement a WordCount app for testing.

// User-provided map/reduce functions as a trait object
pub trait MrApp: Send + Sync {
    fn map(&self, filename: &str, contents: &str) -> Vec<(String, String)>;
    fn reduce(&self, key: &str, values: &[String]) -> String;
}

// Word count implementation
pub struct WordCount;
impl MrApp for WordCount {
    fn map(&self, _: &str, contents: &str) -> Vec<(String, String)> {
        contents.split_whitespace()
            .map(|w| (w.to_lowercase(), "1".to_string()))
            .collect()
    }
    fn reduce(&self, _: &str, values: &[String]) -> String {
        values.len().to_string()
    }
}

// Hash key to reduce bucket
fn ihash(key: &str) -> usize {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (0x7fff_ffff & h.finish()) as usize
}

You're done with Lab 1 when: Your word-count, indexer, map-parallelism, reduce-parallelism, job-count, early-exit, and crash recovery tests all pass. Run them 50+ times in a row — far fewer iterations than Raft will need, because MapReduce bugs are usually deterministic.

Lab 2: Single-Machine KV Server

Read before starting: Watch Lectures 3–4 · Read lab-kvsrv1.html

This lab teaches exactly-once semantics over an unreliable network without replication. It's the foundation for Lab 4's deduplication logic.

Mental Model

You order food via a delivery app. You tap "Place Order." The screen spins... and times out. Did the order go through? You tap again. Now you might have two orders. This is the exactly-once problem over an unreliable network.

The solution: every order has a unique ID. The restaurant checks: "Did I already process order #42?" If yes, return the cached result without doing it again. If no, process it and remember that you did.

What happens when the network eats your reply
  Client                          Server
    |                                |
    |--- Put(x,1) [seq=5] -------->|  Server applies Put(x,1)
    |                                |  Server caches: client_42, seq=5, reply="ok"
    |         (reply lost!)          |
    |                                |
    |--- Put(x,1) [seq=5] -------->|  Server sees: seq=5 already processed!
    |                                |  Returns cached "ok" (does NOT re-apply)
    |<-------- "ok" ----------------|
    |                                |

Key invariant: The client increments its sequence number only after receiving a successful reply. The server stores the last-processed sequence number per client. This pattern is reused in Labs 4 and 5.

The Problem

Client sends Put("x","1"). Network drops the reply. Client retries. Without deduplication, the value is set twice (or an Append runs twice). The server must be idempotent: applying the same operation twice must produce the same result as applying it once.

use std::collections::HashMap;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum Op {
    Get    { key: String },
    Put    { key: String, value: String },
    Append { key: String, value: String },
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Request {
    pub client_id: u64,
    pub seq_num:   u64,   // monotonically increasing per client
    pub op:        Op,
}

pub struct KvServer {
    data:  HashMap<String, String>,
    /// client_id → (last_processed_seq, cached_reply)
    dedup: HashMap<u64, (u64, String)>,
}

impl KvServer {
    pub fn apply(&mut self, req: Request) -> String {
        // Check dedup table
        if let Some((last_seq, reply)) = self.dedup.get(&req.client_id) {
            if req.seq_num == *last_seq { return reply.clone(); }
            // A delayed duplicate of an older request can still arrive over an
            // unreliable network; the client has already moved on, so don't
            // re-apply it and don't panic — any reply is fine.
            if req.seq_num < *last_seq  { return String::new(); }
        }
        }

        let result = match &req.op {
            Op::Get    { key }       => self.data.get(key).cloned().unwrap_or_default(),
            Op::Put    { key, value } => { self.data.insert(key.clone(), value.clone()); String::new() },
            Op::Append { key, value } => {
                let entry = self.data.entry(key.clone()).or_default();
                let old = entry.clone();  // Append returns the value *before* the append
                entry.push_str(value);
                old
            }
        };

        self.dedup.insert(req.client_id, (req.seq_num, result.clone()));
        result
    }
}

// Client-side: increment seq_num only after receiving reply
pub struct Clerk {
    client_id:   u64,
    seq_num:     u64,
    server_addr: String,
}

impl Clerk {
    pub async fn put(&mut self, key: String, value: String) {
        loop {  // retry until we get a reply
            let req = Request { client_id: self.client_id, seq_num: self.seq_num,
                                  op: Op::Put { key: key.clone(), value: value.clone() } };
            // rpc_call = your transport layer (e.g. the fake labrpc network)
            if self.rpc_call(req).await.is_ok() {
                self.seq_num += 1;  // only advance after successful reply
                return;
            }
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        }
    }
}

You're done with Lab 2 when: Concurrent Put/Append/Get operations with an unreliable network pass 100+ times. Verify that no duplicate applies occur — add a counter in your test that detects if an Append was applied twice.

Lab 3: Raft Consensus Algorithm

Read before starting: Raft extended paper (Ongaro 2014) — read Figure 2 as your implementation spec. Watch Lectures 6–7. Read the Students' Guide to Raft. Re-read Figure 2. Implement it exactly.

Mental Model

Raft is a protocol for getting N servers to agree on the same sequence of commands, even when some servers crash and restart. It works like a meeting with a chairperson:

  • One leader proposes all decisions. Everyone else (followers) just listens and records.
  • If the leader disappears, the group holds an election to pick a new one. Whoever has the most complete notes wins.
  • A decision is committed only when a majority has recorded it. Even if some servers crash, the committed decisions survive.
  • Each meeting has a term number. If someone shows up claiming to be leader from an old term, they are ignored.
Raft Node State Machine
                    times out,             receives majority
                    starts election         of votes
            ┌───────────────────┐    ┌─────────────────────┐
            │                   v    │                      v
        ┌───────────┐    ┌────────────────┐    ┌────────────────┐
   ──>  │ FOLLOWER  │    │   CANDIDATE    │    │     LEADER     │
        └───────────┘    └────────────────┘    └────────────────┘
            ^                   │                      │
            │   discovers       │  discovers           │
            │   higher term     │  higher term         │
            └───────────────────┘──────────────────────┘

The four sub-labs map to four capabilities: 3A = elections (picking a leader), 3B = log replication (the leader distributing decisions), 3C = persistence (surviving crashes), 3D = log compaction (not running out of memory).


This is the hardest lab. Most bugs are off-by-one log index errors or wrong term comparisons. Plan for 3–4 weeks. Run every test 500+ times before declaring it done. Raft bugs are intermittent — passing once means nothing.

Core State Structure

use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
// Persister, ApplyMsg, and Network come from the common crate defined above
use common::{apply_msg::ApplyMsg, labrpc::Network, persister::Persister};

#[derive(Debug, Clone, PartialEq)]
pub enum Role { Follower, Candidate, Leader }

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LogEntry {
    pub term:    u64,
    pub command: Vec<u8>,
}

pub struct RaftState {
    // ── Persistent (must survive crash) ──────────────────
    pub current_term: u64,
    pub voted_for:    Option<usize>,
    pub log:          Vec<LogEntry>,  // index 0 is dummy (term=0)

    // ── Volatile ─────────────────────────────────────────
    pub commit_index:  u64,
    pub last_applied:  u64,
    pub role:          Role,

    // ── Leader only (reset on election) ──────────────────
    pub next_index:    Vec<u64>,  // next log index to send to each peer
    pub match_index:   Vec<u64>,  // highest log index known replicated on peer

    // ── Snapshot state (Lab 3D) ───────────────────────────
    pub last_snap_index: u64,
    pub last_snap_term:  u64,
}

pub struct Raft {
    me:        usize,
    n_peers:   usize,
    state:     Arc<Mutex<RaftState>>,
    persister: Arc<dyn Persister>,
    apply_tx:  mpsc::Sender<ApplyMsg>,
    network:   Arc<Network>,
}

Log Indexing with Snapshots — The #1 Source of Off-by-One Bugs

Before Lab 3D (snapshots), the log Vec index equals the Raft log index (with a dummy at position 0). After snapshots, the start of the Vec no longer corresponds to log index 0. You have discarded the first N entries. This means every log access must account for the offset:

Log Indexing With Snapshots
  Raft log indices:  1   2   3   4   5   6   7   8   9   10
                    [a] [b] [c] [d] [e] [f] [g] [h] [i] [j]
                     └── snapshotted ──┘  └── in self.log ──┘

  last_snap_index = 5
  self.log = [f, g, h, i, j]   (Vec indices 0..4)

  To convert Raft index → Vec index:
    vec_idx = raft_index - last_snap_index - 1

  To convert Vec index → Raft index:
    raft_index = vec_idx + last_snap_index + 1

  Last raft index = self.log.len() as u64 + last_snap_index

Apply this conversion everywhere you access self.log[...]. The three most common bugs:

  • Forgetting to subtract last_snap_index when indexing into the log Vec
  • Forgetting to add last_snap_index when computing the last log index
  • Accessing self.log[0] and expecting it to be log index 1 (after a snapshot, it is log index last_snap_index + 1)
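
The conversions above are worth wrapping in helper methods so the arithmetic lives in exactly one place. A minimal runnable sketch, using a pared-down RaftState with only the fields the conversion needs:

```rust
// Pared-down state: just enough to exercise the index arithmetic.
struct LogEntry { term: u64 }

struct RaftState {
    last_snap_index: u64,
    log: Vec<LogEntry>,
}

impl RaftState {
    // Raft log index -> position in self.log (caller ensures raft_index > last_snap_index)
    fn vec_idx(&self, raft_index: u64) -> usize {
        (raft_index - self.last_snap_index - 1) as usize
    }
    // Position in self.log -> Raft log index
    fn raft_idx(&self, vec_idx: usize) -> u64 {
        vec_idx as u64 + self.last_snap_index + 1
    }
    // Raft index of the last entry, snapshot-aware
    fn last_index(&self) -> u64 {
        self.log.len() as u64 + self.last_snap_index
    }
}

fn main() {
    // Snapshot covers Raft indices 1..=5; the Vec holds Raft indices 6..=10
    let s = RaftState {
        last_snap_index: 5,
        log: (6..=10).map(|i| LogEntry { term: i }).collect(),
    };
    assert_eq!(s.vec_idx(6), 0);       // first retained entry
    assert_eq!(s.raft_idx(4), 10);     // last Vec slot
    assert_eq!(s.last_index(), 10);
    println!("ok");
}
```

Route every log access through helpers like these and the three bugs above become impossible by construction.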

Lab 3A: Leader Election

📄 Figure 2 — RequestVote RPC (implement exactly as written)
Arguments sent by candidate
  • term — candidate's current term
  • candidate_id — candidate requesting vote
  • last_log_index — index of candidate's last log entry
  • last_log_term — term of candidate's last log entry
Receiver rules (vote granting)
  • Reply false if term < currentTerm
  • Grant vote if votedFor is null or candidateId, AND candidate's log is at least as up-to-date as receiver's log
  • "At least as up-to-date": compare last log terms first; if equal, the higher last log index wins (§5.4.1)
All servers: if RPC request/response contains term > currentTerm
  • Set currentTerm = term, convert to follower
// RequestVote RPC types
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct RequestVoteArgs {
    pub term:           u64,
    pub candidate_id:   usize,
    pub last_log_index: u64,
    pub last_log_term:  u64,
}

#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct RequestVoteReply {
    pub term:         u64,
    pub vote_granted: bool,
}

impl Raft {
    // Handler called by receiver
    pub fn handle_request_vote(&self, args: RequestVoteArgs) -> RequestVoteReply {
        let mut state = self.state.lock().unwrap();

        // If higher term seen, update and convert to follower
        if args.term > state.current_term {
            state.current_term = args.term;
            state.voted_for    = None;
            state.role         = Role::Follower;
        }

        if args.term < state.current_term {
            return RequestVoteReply { term: state.current_term, vote_granted: false };
        }

        // Check if we already voted for someone else this term
        let can_vote = state.voted_for.is_none()
            || state.voted_for == Some(args.candidate_id);

        // Check candidate log is at least as up-to-date
        let my_last_term  = state.log.last().map_or(state.last_snap_term, |e| e.term);
        let my_last_index = state.log.len() as u64 + state.last_snap_index;
        let up_to_date = args.last_log_term > my_last_term
            || (args.last_log_term == my_last_term && args.last_log_index >= my_last_index);

        if can_vote && up_to_date {
            state.voted_for = Some(args.candidate_id);
            self.persist(&state);       // MUST persist before replying
            // Reset the election timer here on granting a vote (timer plumbing not shown)
            RequestVoteReply { term: state.current_term, vote_granted: true }
        } else {
            RequestVoteReply { term: state.current_term, vote_granted: false }
        }
    }

    // Kick off election (called when election timer fires)
    pub async fn start_election(raft: Arc<Mutex<RaftState>>, me: usize, network: Arc<Network>, n: usize) {
        let (term, args) = {
            let mut s = raft.lock().unwrap();
            s.current_term += 1;
            s.role          = Role::Candidate;
            s.voted_for     = Some(me);
            // persist() here
            (s.current_term, RequestVoteArgs {
                term: s.current_term, candidate_id: me,
                last_log_index: s.log.len() as u64 + s.last_snap_index,
                last_log_term:  s.log.last().map_or(s.last_snap_term, |e| e.term),
            })
        }; // drop lock here — never hold across await

        let mut votes = 1usize; // vote for self
        let mut set = tokio::task::JoinSet::new();

        for peer in 0..n {
            if peer == me { continue; }
            let (net, a) = (network.clone(), args.clone());
            set.spawn(async move {
                let bytes = bincode::serialize(&a).unwrap();
                net.call(peer, "RequestVote", bytes).await
            });
        }

        while let Some(res) = set.join_next().await {
            // Skip peers whose RPC failed or timed out instead of aborting the tally
            let Ok(Some(bytes)) = res else { continue; };
            let reply: RequestVoteReply = bincode::deserialize(&bytes).unwrap();
            let mut s = raft.lock().unwrap();
            if reply.term > s.current_term {
                s.current_term = reply.term;
                s.role = Role::Follower;
                // persist() here
                return;
            }
            if s.role != Role::Candidate || s.current_term != term { return; }
            if reply.vote_granted { votes += 1; }
            if votes > n / 2 { // majority!
                s.role = Role::Leader;
                // Initialize next_index/match_index, start heartbeat loop
                break;
            }
        }
    }
}

Leader Heartbeat Loop (Critical — Must Implement)

The leader must send periodic empty AppendEntries (heartbeats) to all peers to prevent them from starting elections. Without this, followers time out and the cluster falls apart. This runs as a background task for the entire time a node is leader.

use std::time::Duration;

async fn heartbeat_loop(
    raft: Arc<Mutex<RaftState>>,
    me: usize,
    n_peers: usize,
    network: Arc<Network>,
) {
    // Heartbeat interval must be well below the election timeout (150–300ms)
    let mut interval = tokio::time::interval(Duration::from_millis(100));
    loop {
        interval.tick().await;

        // Build AppendEntries args for each peer while holding the lock
        let entries_per_peer = {
            let s = raft.lock().unwrap();
            if s.role != Role::Leader { return; }
            let mut per_peer = Vec::new();
            for peer in 0..n_peers {
                if peer == me { continue; }
                let next = s.next_index[peer];
                let prev_idx = next - 1;
                let prev_term = if prev_idx == s.last_snap_index {
                    s.last_snap_term
                } else if prev_idx > s.last_snap_index {
                    s.log[(prev_idx - s.last_snap_index - 1) as usize].term
                } else {
                    // Peer is behind our snapshot: send InstallSnapshot instead (Lab 3D)
                    continue;
                };
                // Include up to 100 new entries, if any
                let start = (next - s.last_snap_index - 1) as usize;
                let entries = s.log[start..(start + 100).min(s.log.len())].to_vec();
                per_peer.push((peer, AppendEntriesArgs {
                    term: s.current_term, leader_id: me,
                    prev_log_index: prev_idx, prev_log_term: prev_term,
                    entries, leader_commit: s.commit_index,
                }));
            }
            per_peer
        }; // lock dropped — never hold across await

        for (peer, args) in entries_per_peer {
            let (net, raft2) = (network.clone(), raft.clone());
            tokio::spawn(async move {
                let bytes = bincode::serialize(&args).unwrap();
                if let Some(reply_bytes) = net.call(peer, "AppendEntries", bytes).await {
                    let reply: AppendEntriesReply = bincode::deserialize(&reply_bytes).unwrap();
                    // Handle reply: update next_index/match_index, advance commit
                    // (see handle_ae_failure and maybe_advance_commit in 3B)
                }
            });
        }
    }
}

You're done with Lab 3A when: test_initial_election, test_reelection, and test_many_elections pass 500+ times. Leaders should be elected within a few seconds. If elections take more than 5s, your timer randomization range is wrong.
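
A quick sanity check on that randomization: draw a fresh timeout from a window like 150–300ms (the range suggested above) on every reset, so two followers rarely fire together. A dependency-free sketch that uses the clock's sub-second nanos as a cheap entropy source; real code would use the rand crate instead:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Draw a fresh timeout in [150ms, 300ms) without external crates.
// In real code, prefer rand::thread_rng().gen_range(150..300).
fn random_election_timeout() -> Duration {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .subsec_nanos() as u64;
    Duration::from_millis(150 + nanos % 150)
}

fn main() {
    for _ in 0..100 {
        let t = random_election_timeout();
        assert!(t >= Duration::from_millis(150));
        assert!(t < Duration::from_millis(300));
    }
    println!("ok");
}
```

The key property is that the timeout is re-drawn on every reset; reusing one random value per node defeats the purpose.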

Lab 3B: Log Replication

📄 Figure 2 — AppendEntries RPC (implement exactly as written)
Arguments sent by leader
  • term, leader_id, leader_commit
  • prev_log_index — index of log entry immediately preceding new ones
  • prev_log_term — term of prev_log_index entry
  • entries[] — log entries to store (empty = heartbeat)
Receiver rules
  • Reply false if term < currentTerm
  • Reply false if log doesn't contain entry at prev_log_index with prev_log_term
  • If existing entry conflicts (same index, different term), delete it and all following entries
  • Append any new entries not already in the log
  • If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, last new entry index)

Fast Backup Optimization (Critical for 3B)

When a follower rejects AppendEntries, the naive approach decrements next_index[i] by 1 each time — O(N) round trips. The paper's optimization (§5.3 bottom of page 7) allows backing up by an entire term at once:

// Reply includes extra info for fast backup
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct AppendEntriesReply {
    pub term:    u64,
    pub success: bool,
    // Fast backup fields (only meaningful on failure)
    pub conflict_term:  Option<u64>,   // term in the conflicting entry
    pub conflict_index: u64,            // first index in conflict_term
}

// Leader: on AppendEntries rejection, adjust next_index
fn handle_ae_failure(state: &mut RaftState, peer: usize, reply: &AppendEntriesReply) {
    match reply.conflict_term {
        None => {
            // Follower log is too short — jump to its last entry
            state.next_index[peer] = reply.conflict_index + 1;
        }
        Some(ct) => {
            // Find last entry in leader log with conflict_term
            let last_in_ct = state.log.iter().rposition(|e| e.term == ct);
            match last_in_ct {
                // Vec index → Raft index needs the snapshot offset; +1 more to step past it
                Some(idx) => state.next_index[peer] = idx as u64 + state.last_snap_index + 2,
                None      => state.next_index[peer] = reply.conflict_index,
            }
        }
    }
    state.next_index[peer] = state.next_index[peer].max(state.last_snap_index + 1);
}

// Commit index advancement (leader only)
// RULE: Only advance commit on entries from CURRENT term (Figure 8)
fn maybe_advance_commit(state: &mut RaftState, n: usize) {
    let base = state.last_snap_index;
    for n_idx in (state.commit_index + 1..=state.log.len() as u64 + base).rev() {
        let log_idx = (n_idx - base - 1) as usize;
        // Only commit entries from current term!
        if state.log[log_idx].term != state.current_term { break; }
        // The +1 counts the leader itself (assumes match_index[me] is never updated)
        let replicated = state.match_index.iter().filter(|&&m| m >= n_idx).count() + 1;
        if replicated > n / 2 {
            state.commit_index = n_idx;
            break;
        }
    }
}
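
The leader-side logic above consumes conflict_term/conflict_index, but the follower has to produce them when it rejects AppendEntries. A runnable sketch of that side, with a simplified 1-based log and no snapshot offset (the helper name conflict_info is mine, not from the lab spec):

```rust
#[derive(Clone)]
struct LogEntry { term: u64 }

// Follower: the prev_log check failed; compute the fast-backup hints.
// log[0] holds Raft index 1 (snapshots ignored in this sketch).
fn conflict_info(log: &[LogEntry], prev_log_index: u64) -> (Option<u64>, u64) {
    if prev_log_index as usize > log.len() {
        // Log too short: no conflicting term, report our last index
        return (None, log.len() as u64);
    }
    // Entry exists but its term mismatches: report that term and
    // the FIRST index holding it, so the leader can skip the whole term
    let ct = log[prev_log_index as usize - 1].term;
    let mut first = prev_log_index;
    while first > 1 && log[first as usize - 2].term == ct {
        first -= 1;
    }
    (Some(ct), first)
}

fn main() {
    let log: Vec<LogEntry> = [1, 1, 2, 2, 2].iter().map(|&t| LogEntry { term: t }).collect();
    // Leader probes index 5; follower's term there is 2, which first appears at index 3
    assert_eq!(conflict_info(&log, 5), (Some(2), 3));
    // Leader probes index 7, beyond our log of length 5
    assert_eq!(conflict_info(&log, 7), (None, 5));
    println!("ok");
}
```

Note how the two cases line up with handle_ae_failure above: `None` makes the leader jump to conflict_index + 1, `Some(ct)` lets it skip the entire conflicting term.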

Applier Loop — Sending Committed Entries to the State Machine

This is the glue between Raft and the application (KV server in Lab 4). A background task watches for commit_index > last_applied and sends committed entries via apply_tx. Without this, nothing actually gets applied.

async fn applier_loop(
    raft: Arc<Mutex<RaftState>>,
    apply_tx: mpsc::Sender<ApplyMsg>,
) {
    loop {
        tokio::time::sleep(Duration::from_millis(10)).await;

        let msgs = {
            let mut s = raft.lock().unwrap();
            let mut msgs = Vec::new();

            while s.last_applied < s.commit_index {
                let idx = s.last_applied + 1;
                // A concurrent snapshot may already cover idx; skip ahead
                if idx <= s.last_snap_index {
                    s.last_applied = s.last_snap_index;
                    continue;
                }
                let log_offset = (idx - s.last_snap_index - 1) as usize;
                if log_offset >= s.log.len() { break; }
                let entry = s.log[log_offset].clone();
                s.last_applied = idx;  // advance only once the entry is in hand
                msgs.push(ApplyMsg::Command {
                    valid: true,
                    command: entry.command,
                    command_index: idx,
                    command_term: entry.term,
                });
            }
            msgs
        }; // lock dropped before sending

        // Send outside lock — apply_tx.send() may block
        for msg in msgs {
            let _ = apply_tx.send(msg).await;
        }
    }
}

Never send on apply_tx while holding the Mutex. The receiver (KV server) may need to call back into Raft (e.g., to take a snapshot), causing a deadlock. Collect messages under the lock, drop it, then send.
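
The rule above in miniature, using std primitives (drain_and_send is an illustrative name, not part of the lab API): take what you need under the lock, release the guard, then send, so the receiver is free to grab the same lock.

```rust
use std::sync::{mpsc, Arc, Mutex};

// Pattern: collect under the lock, drop the guard, THEN send.
fn drain_and_send(buf: &Arc<Mutex<Vec<u64>>>, tx: &mpsc::Sender<u64>) {
    let msgs: Vec<u64> = {
        let mut b = buf.lock().unwrap();
        b.drain(..).collect()
    }; // guard dropped here, before any send
    for m in msgs {
        let _ = tx.send(m); // receiver may re-lock buf without deadlocking
    }
}

fn main() {
    let buf = Arc::new(Mutex::new(vec![1, 2, 3]));
    let (tx, rx) = mpsc::channel();
    drain_and_send(&buf, &tx);
    drop(tx);
    // Consumer takes the same lock per message: safe, since we no longer hold it
    let sum: u64 = rx.iter().map(|m| { let _g = buf.lock().unwrap(); m }).sum();
    assert_eq!(sum, 6);
    println!("ok");
}
```

If drain_and_send sent while still holding the guard and the consumer blocked on the lock, both sides would wait forever; the scoped block makes that impossible.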

You're done with Lab 3B when: All basic agreement, fail-agree, concurrent-starts, rejoin, backup, and count tests pass 500+ times. The backup test is especially important — it verifies fast log backtracking works.

Lab 3C: Persistence

Raft must survive crashes. Three fields must be saved to the Persister before replying to any RPC and before sending any RPC request: current_term, voted_for, log.

#[derive(serde::Serialize, serde::Deserialize)]
struct PersistData {
    current_term: u64,
    voted_for:    Option<usize>,
    log:          Vec<LogEntry>,
    last_snap_index: u64,
    last_snap_term:  u64,
}

impl Raft {
    fn persist(&self, s: &RaftState) {
        let data = PersistData {
            current_term:    s.current_term,
            voted_for:       s.voted_for,
            log:             s.log.clone(),
            last_snap_index: s.last_snap_index,
            last_snap_term:  s.last_snap_term,
        };
        let bytes = bincode::serialize(&data).unwrap();
        self.persister.save_raft_state(bytes);
    }

    fn restore(&self, s: &mut RaftState) {
        let bytes = self.persister.read_raft_state();
        if bytes.is_empty() { return; }
        if let Ok(data) = bincode::deserialize::<PersistData>(&bytes) {
            s.current_term    = data.current_term;
            s.voted_for       = data.voted_for;
            s.log             = data.log;
            s.last_snap_index = data.last_snap_index;
            s.last_snap_term  = data.last_snap_term;
            s.last_applied    = data.last_snap_index;
            s.commit_index    = data.last_snap_index;
        }
    }
}

// CALL persist() at EVERY place you modify these fields:
// - current_term  (any term update)
// - voted_for     (when granting a vote)
// - log           (when appending entries)
// 
// Where to call persist():
// ✅ handle_request_vote (before reply when granting vote)
// ✅ handle_append_entries (before reply, after modifying log)
// ✅ start() — when leader appends to its own log
// ✅ Any code that changes current_term

Lab 3D: Log Compaction (Snapshots)

The log grows forever without compaction. The application takes a snapshot of its state and tells Raft to discard log entries before a certain index. Lagging followers receive snapshots via InstallSnapshot RPC.

// Application (KvServer) calls this when it wants to snapshot
impl Raft {
    pub fn snapshot(&self, snap_index: u64, snap_data: Vec<u8>) {
        let mut s = self.state.lock().unwrap();
        if snap_index <= s.last_snap_index { return; }

        let offset = (snap_index - s.last_snap_index) as usize;
        if offset > s.log.len() { return; }

        s.last_snap_term  = s.log[offset - 1].term;
        s.last_snap_index = snap_index;
        s.log.drain(..offset);  // discard compacted entries

        let raft_state = bincode::serialize(&PersistData {
            current_term:    s.current_term,
            voted_for:       s.voted_for,
            log:             s.log.clone(),
            last_snap_index: s.last_snap_index,
            last_snap_term:  s.last_snap_term,
        }).unwrap();
        drop(s);
        self.persister.save_state_and_snapshot(raft_state, snap_data);
    }
}

// InstallSnapshot RPC args
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct InstallSnapshotArgs {
    pub term:               u64,
    pub leader_id:          usize,
    pub last_included_index: u64,
    pub last_included_term:  u64,
    pub data:               Vec<u8>,  // entire snapshot
}

// Receiver: install snapshot, send ApplyMsg::Snapshot to app
impl Raft {
    pub fn handle_install_snapshot(&self, args: InstallSnapshotArgs) -> u64 {
        let mut s = self.state.lock().unwrap();
        if args.term < s.current_term { return s.current_term; }
        if args.term > s.current_term {
            s.current_term = args.term; s.role = Role::Follower; s.voted_for = None;
        }
        if args.last_included_index <= s.last_snap_index { return s.current_term; }

        // Keep any log suffix beyond the snapshot if it matches; otherwise discard all
        let snap_end = (args.last_included_index - s.last_snap_index) as usize;
        if snap_end <= s.log.len() && s.log[snap_end - 1].term == args.last_included_term {
            s.log.drain(..snap_end);
        } else {
            s.log.clear();
        }

        s.last_snap_index = args.last_included_index;
        s.last_snap_term  = args.last_included_term;
        s.commit_index    = s.commit_index.max(args.last_included_index);
        s.last_applied    = s.last_applied.max(args.last_included_index);

        let cur_term = s.current_term;
        drop(s);  // release the lock before notifying the app

        // Notify app to apply snapshot
        let _ = self.apply_tx.try_send(ApplyMsg::Snapshot {
            valid: true,
            snapshot: args.data,
            snapshot_index: args.last_included_index,
            snapshot_term:  args.last_included_term,
        });
        cur_term
    }
}

You're done with Lab 3C+3D when: All persistence tests (crash, restart, re-elect, continue) and snapshot tests (install-snapshot, snapshot-size) pass 500+ times. Pay special attention to the Figure 8 unreliable test — it catches commit safety bugs.

Lab 4: Fault-Tolerant KV (Raft-backed)

Read before starting: Watch Lectures 8–9 · Read lab-kvraft1.html

Mental Model

You now combine Labs 2 and 3 into one system. Every client operation flows through this pipeline:

Lab 4 Request Flow
  Client                KV Server (Leader)           Raft Layer           KV Servers (All)
    |                        |                          |                      |
    |--- Put(x,1) --------->|                          |                      |
    |                        |--- raft.start(cmd) ---->|                      |
    |                        |                          |-- AppendEntries --->|
    |                        |                          |<-- majority ACK ----|
    |                        |                          |                      |
    |                        |<-- ApplyMsg(cmd,idx) ----|-- ApplyMsg -------->|
    |                        |                          |                      |
    |                        | apply to state machine   |  apply to state machine
    |                        | data["x"] = "1"          |  data["x"] = "1"    |
    |                        | check dedup table        |                      |
    |                        | wake pending client      |                      |
    |<------ "ok" ----------|                          |                      |

Key insight: The KV server never modifies its state directly. It submits every operation to Raft, waits for Raft to commit it, then applies it from the apply_ch. This ensures all replicas apply operations in the same order. The dedup table from Lab 2 prevents duplicate applies across leader changes.

Plug your Lab 3 Raft into a KV state machine. Every Put/Get/Append goes through Raft consensus before being applied. The cluster survives minority node failures.

use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};

pub struct KvServer {
    raft:         Raft,
    me:           usize,
    data:         HashMap<String, String>,
    dedup:        HashMap<u64, (u64, String)>, // client_id → (seq, reply)
    // log_index → waker waiting for that index to be applied
    pending:      HashMap<u64, oneshot::Sender<Result<String, KvError>>>,
    max_raft_state: Option<usize>,  // trigger snapshot at this size
    last_applied: u64,
}

#[derive(Debug)]
pub enum KvError { WrongLeader, Timeout, Internal }

impl KvServer {
    // RPC handler — called by client
    pub async fn handle_op(server: Arc<Mutex<KvServer>>, req: Request)
        -> Result<String, KvError>
    {
        // Submit to Raft
        let (index, rx) = {
            let mut s = server.lock().unwrap();
            let cmd = bincode::serialize(&req).unwrap();
            match s.raft.start(cmd) {
                Err(_) => return Err(KvError::WrongLeader),
                Ok((idx, _term)) => {
                    let (tx, rx) = oneshot::channel();
                    s.pending.insert(idx, tx);
                    (idx, rx)
                }
            }
        };

        // Wait for Raft to apply, with timeout
        tokio::time::timeout(
            std::time::Duration::from_millis(500),
            rx
        ).await
            .map_err(|_| KvError::Timeout)?
            .map_err(|_| KvError::Internal)?
    }
}

// Background applier — consumes apply_ch from Raft
async fn applier(server: Arc<Mutex<KvServer>>, mut rx: mpsc::Receiver<ApplyMsg>) {
    while let Some(msg) = rx.recv().await {
        let mut s = server.lock().unwrap();
        match msg {
            ApplyMsg::Command { command_index, command, .. } => {
                if command_index <= s.last_applied { continue; } // stale
                s.last_applied = command_index;

                let req: Request = bincode::deserialize(&command).unwrap();

                // Check duplicate (end the dedup borrow before mutating state)
                let cached = match s.dedup.get(&req.client_id) {
                    Some((last_seq, reply)) if *last_seq == req.seq_num => Some(reply.clone()),
                    _ => None,
                };
                let result = match cached {
                    Some(reply) => reply,
                    None        => s.apply_and_cache(&req),
                };

                // Wake waiting client handler
                if let Some(tx) = s.pending.remove(&command_index) {
                    let _ = tx.send(Ok(result));
                }

                // Lab 4B: trigger snapshot if log is too large
                if let Some(max_sz) = s.max_raft_state {
                    if s.raft.persister_size() > max_sz {
                        let snap = s.encode_snapshot();
                        s.raft.snapshot(command_index, snap);
                    }
                }
            }
            ApplyMsg::Snapshot { snapshot, snapshot_index, .. } => {
                s.install_snapshot(snapshot, snapshot_index);
            }
        }
    }
}

Leader redirect: If raft.start() returns NotLeader, return WrongLeader to the client. The client tries every server in a round-robin until it finds the current leader. Cache the last known leader index for faster retries.
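
That retry order can be sketched as a pure function (try_order is a hypothetical helper, not part of the lab API): start at the cached leader and wrap around the server list.

```rust
// Servers to try, starting at the cached leader and wrapping around
fn try_order(cached_leader: usize, n_servers: usize) -> Vec<usize> {
    (0..n_servers).map(|off| (cached_leader + off) % n_servers).collect()
}

fn main() {
    // Cached leader 3 of 5: try 3 first, then wrap
    assert_eq!(try_order(3, 5), vec![3, 4, 0, 1, 2]);
    assert_eq!(try_order(0, 3), vec![0, 1, 2]);
    println!("ok");
}
```

On a successful reply, update the cache to the index that answered; on WrongLeader, just move to the next index in this order.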

You're done with Lab 4 when: All basic, concurrent, unreliable, snapshot, and linearizability tests pass 200+ times. Lab 4B snapshot tests verify that Raft state size stays bounded after many operations. If it grows unboundedly, your snapshot trigger is not firing.

Lab 5: Sharded Key/Value Service

Read before starting: Watch Lectures 12–13, 16 · Read lab-shard.html

Mental Model

One Raft group (Lab 4) cannot handle all the data — it is limited by the leader's throughput. The solution: shard the data. Split all keys into 10 buckets (shards) using a hash function. Each bucket is owned by a different Raft group. Now you have 10x the throughput.

But who decides which group owns which shard? A dedicated shard controller (itself a Raft group) manages a sequence of configurations. When groups join or leave, the controller rebalances shards, and groups must migrate shard data to their new owners.

Lab 5 Architecture
  Client                 Shard Controller (Raft group)
    |                           |
    | key2shard("x") = 3       | Config 7: shard 3 → Group 101
    |                           |           shard 7 → Group 102
    v                           |           ...
  Group 101 (Raft)              |
  ┌──────────────────┐          |
  │ Shard 0: {...}   │          |
  │ Shard 3: {...} ◄─┼──── client sends Put(x,1) here
  │ Shard 6: {...}   │
  └──────────────────┘
  Group 102 (Raft)
  ┌──────────────────┐
  │ Shard 1: {...}   │
  │ Shard 7: {...}   │
  └──────────────────┘

The hard part is migration. When the controller reassigns shard 3 from Group 101 to Group 102, Group 102 must pull the shard data from Group 101 before it can serve requests for those keys. During migration, requests for shard 3 return "wrong group" and clients retry.

The full distributed database. Keys are divided into NShards = 10 buckets. Multiple replica groups (each running Raft from Lab 3) own subsets of shards. A dedicated shard controller determines the mapping.

Part 5A: Shard Controller

pub const NSHARDS: usize = 10;

#[derive(Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct Config {
    pub num:    u64,                                  // config number (starts at 1)
    pub shards: [u64; NSHARDS],                       // shards[i] = gid owning shard i (0 = unassigned)
    pub groups: HashMap<u64, Vec<String>>,            // gid → server list
}

pub enum CtlOp {
    Join  { gids: HashMap<u64, Vec<String>> },  // add new groups
    Leave { gids: Vec<u64> },                   // remove groups
    Move  { shard: usize, gid: u64 },           // manually assign shard
    Query { num: i64 },                         // -1 = latest config
}

// Rebalance: after join/leave, ensure shards are equally distributed
fn rebalance(shards: &mut [u64; NSHARDS], groups: &HashMap<u64, Vec<String>>) {
    if groups.is_empty() { shards.fill(0); return; }
    let n = groups.len();
    let (base, extras) = (NSHARDS / n, NSHARDS % n);
    // Sort group ids for determinism: every replica must compute the same layout
    let mut gids: Vec<u64> = groups.keys().cloned().collect();
    gids.sort();
    let target: HashMap<u64, usize> = gids.iter().enumerate()
        .map(|(i, &g)| (g, base + if i < extras { 1 } else { 0 })).collect();
    // Count ownership, zeroing shards whose owner has left
    let mut owned: HashMap<u64, usize> = gids.iter().map(|&g| (g, 0)).collect();
    for s in shards.iter_mut() {
        match owned.get_mut(&*s) { Some(c) => *c += 1, None => *s = 0 }
    }
    // Greedily move unassigned/excess shards to groups below their target
    for i in 0..NSHARDS {
        let g = shards[i];
        if g != 0 && owned[&g] <= target[&g] { continue; }
        if g != 0 { *owned.get_mut(&g).unwrap() -= 1; }
        let dest = *gids.iter().find(|&&c| owned[&c] < target[&c]).unwrap();
        shards[i] = dest;
        *owned.get_mut(&dest).unwrap() += 1;
    }
}

Part 5B–5C: Shard State Machine

Each shard has its own lifecycle status. This is the key data structure for migration:

// Per-shard status in the ShardKV server
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum ShardStatus {
    Serving,     // This group owns and is actively serving this shard
    Pulling,     // We need this shard — we're pulling data from old owner
    BePulling,   // Old owner: waiting for new owner to pull our data
    GCing,       // New owner confirmed — safe to delete our copy
}

pub struct Shard {
    pub data:   HashMap<String, String>,
    pub status: ShardStatus,
}

pub struct ShardKvServer {
    raft:        Raft,
    me:          usize,
    gid:         u64,
    shards:      [Shard; NSHARDS],
    config:      Config,        // current config
    last_config: Config,        // previous config (needed for pulling)
    dedup:       HashMap<u64, (u64, String)>,
}

The Four Background Tasks

The ShardKV server runs four tasks alongside the Raft applier. The three periodic tasks are leader-only (check raft.is_leader() before acting); request routing runs on every RPC:

Config poller (~100ms)

Query the shard controller for config current_config.num + 1. If found, submit it as a Raft log entry. Only advance one config at a time. Never skip from config 5 to config 7 — you must process config 6 first, because shard migration depends on knowing both the old and new owner.

Shard migration puller (~100ms)

For each shard in Pulling state, contact the old owner group (from last_config) with a ShardMigration RPC to fetch the shard data + dedup table, then submit the received data as a Raft log entry on this group. Each Pulling shard can be fetched and applied independently.

GC notifier (~100ms)

For shards in GCing state, notify the old owner they can delete their copy. The old owner moves shard from BePulling to deleted (via Raft log entry). On success, submit a GC-complete Raft entry to move our shard from GCing to Serving.

Request routing (every client RPC)

Check if this group owns the shard for that key in the current config. Only serve from shards in Serving status. If status is Pulling, BePulling, or GCing, return ErrWrongGroup. The client retries with a refreshed config.
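
That routing rule condenses to one predicate over the config and shard table. A self-contained sketch (should_serve and its argument shape are illustrative, not the lab's API):

```rust
#[derive(PartialEq)]
enum ShardStatus { Serving, Pulling, BePulling, GCing }

// Serve a key only if this group owns its shard in the current config
// AND the shard's migration status is Serving.
fn should_serve(owner_gid: u64, my_gid: u64, status: &ShardStatus) -> bool {
    owner_gid == my_gid && *status == ShardStatus::Serving
}

fn main() {
    assert!(should_serve(101, 101, &ShardStatus::Serving));
    assert!(!should_serve(101, 101, &ShardStatus::Pulling));  // mid-migration: ErrWrongGroup
    assert!(!should_serve(102, 101, &ShardStatus::Serving));  // another group owns it
    println!("ok");
}
```

Run this check both when the RPC arrives and again when the command comes out of the apply channel; ownership can change between the two moments.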

Shard Migration State Machine

Each shard transitions through these states during a config change. This is the most confusing part of Lab 5 — memorize this lifecycle:

Shard Status Transitions (Config N → Config N+1)
  OLD OWNER (Group A):                    NEW OWNER (Group B):

  Config N: Shard 3 = Serving             Config N: Shard 3 = (not owned)
                    |                                     |
       config N+1 applied                    config N+1 applied
                    |                                     |
                    v                                     v
            Shard 3 = BePulling              Shard 3 = Pulling
                    |                                     |
                    |    ◄──── PullShard RPC ────────────  |
                    |    ────── shard data + dedup ─────►  |
                    |                                     |
                    |                            apply MigrationData
                    |                                     |
                    v                                     v
            Shard 3 = BePulling              Shard 3 = GCing
                    |                                     |
                    |    ◄──── GC notification ─────────   |
                    |                                     |
               apply GC                             apply GCDone
                    |                                     |
                    v                                     v
            Shard 3 = (deleted)              Shard 3 = Serving

Critical rule: Do NOT poll for config N+2 until ALL shards are in Serving state under config N+1. If you advance configs while migration is in progress, you will lose shard data. This is the #1 bug in Lab 5.
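
One cheap guard enforces this rule: gate the config poller on every shard being quiescent. A sketch under the ShardStatus model above (can_advance is an illustrative name):

```rust
#[derive(PartialEq)]
enum ShardStatus { Serving, Pulling, BePulling, GCing }

// The poller may fetch config N+2 only once every shard has settled under N+1
fn can_advance(shards: &[ShardStatus]) -> bool {
    shards.iter().all(|s| *s == ShardStatus::Serving)
}

fn main() {
    use ShardStatus::*;
    assert!(can_advance(&[Serving, Serving, Serving]));
    assert!(!can_advance(&[Serving, Pulling, Serving]));  // migration still in flight
    assert!(!can_advance(&[Serving, GCing, Serving]));    // GC handshake unfinished
    println!("ok");
}
```

Note that a shard with no owner under the current config also counts as Serving for this check; only in-flight migration states block the poller.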

Part 5B–5D: ShardKV Internal Operations

Unlike Lab 4 where all Raft entries are client ops, Lab 5 has four types of operations going through the Raft log:

// All operations submitted to Raft in the ShardKV server
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub enum ShardOp {
    /// Normal client operation (Put/Get/Append)
    Client {
        client_id: u64,
        seq_num:   u64,
        key:       String,
        op:        Op,
    },
    /// New configuration detected — update shard ownership
    ConfigChange {
        config: Config,
    },
    /// Shard data pulled from old owner — install it
    MigrationData {
        config_num: u64,
        shard_id:   usize,
        data:       HashMap<String, String>,
        dedup:      HashMap<u64, (u64, String)>,
    },
    /// Old owner confirmed deletion — mark shard as fully serving
    ShardGC {
        config_num: u64,
        shard_id:   usize,
    },
}

Client Clerk for Sharded KV

The client must determine which group to contact for each key:

fn key2shard(key: &str) -> usize {
    // ihash = any deterministic hash shared by clients and servers (e.g. FNV-1a from Lab 1)
    let h = ihash(key);
    h % NSHARDS
}

pub struct ShardClerk {
    sm_clerk:  Clerk,         // clerk for shard controller
    config:    Config,        // cached latest config
    client_id: u64,
    seq_num:   u64,
    // Per-group: index of last known leader
    leaders:   HashMap<u64, usize>,
}

impl ShardClerk {
    pub async fn get(&mut self, key: String) -> String {
        loop {
            let shard = key2shard(&key);
            let gid = self.config.shards[shard];

            if gid == 0 {
                // No group owns this shard — refresh config
                self.config = self.sm_clerk.query(-1).await;
                continue;
            }

            // Clone so the borrow of self.config doesn't block the &mut self calls below
            let servers = self.config.groups[&gid].clone();
            let leader = self.leaders.get(&gid).copied().unwrap_or(0);

            // Try each server in the group, starting with cached leader
            for offset in 0..servers.len() {
                let idx = (leader + offset) % servers.len();
                match self.rpc_get(&servers[idx], &key).await {
                    Ok(val) => {
                        self.leaders.insert(gid, idx);
                        return val;
                    }
                    Err(KvError::WrongGroup) => break, // refresh config
                    Err(_) => continue,                // try next server
                }
            }

            // Refresh config from shard controller and retry
            tokio::time::sleep(Duration::from_millis(100)).await;
            self.config = self.sm_clerk.query(-1).await;
        }
    }
}

Dedup tables must migrate with shard data. When shard 3 moves from Group A to Group B, the dedup entries for clients that wrote to shard 3 must also move. Otherwise, duplicate detection breaks after migration.
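A minimal sketch of that merge, assuming the dedup shape from the `MigrationData` variant above (`client_id → (seq_num, last reply)`):

```rust
use std::collections::HashMap;

// Merge incoming dedup entries when installing migrated shard data.
// Keep whichever entry has the higher sequence number per client, so a
// retry that races with the migration is still detected as a duplicate.
fn merge_dedup(
    dedup: &mut HashMap<u64, (u64, String)>,
    incoming: HashMap<u64, (u64, String)>,
) {
    for (client, (seq, reply)) in incoming {
        match dedup.get(&client) {
            Some((cur, _)) if *cur >= seq => {} // our entry is at least as new
            _ => {
                dedup.insert(client, (seq, reply));
            }
        }
    }
}
```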


You're done with Lab 5 when: All shard controller tests (join, leave, move, multi, minimal-transfers), sharded KV tests (basic, concurrent, unreliable), migration tests, and GC/challenge tests pass 200+ times. The unreliable + migration tests are the hardest — they combine network failures with shard movement. If you pass those, you have built a real distributed database.

Essential Resources (Bookmark These)

These are the most valuable supplementary resources for the labs, ranked by importance. Many students wish they had found these earlier.

MUST READ
Students' Guide to Raft — Jon Gjengset
The single most important supplementary resource for Lab 3. Written by a former 6.824 TA, it covers every common Raft bug: election timer issues, commit safety violations, off-by-one log indexing, and when to persist. Read this before starting Lab 3 and re-read it after every failed test run.
MUST USE
An animated visualization of Raft nodes communicating. Play with it for 30 minutes before writing any Lab 3 code. Click nodes to crash them, watch elections happen, see log replication in real time. It makes the paper concrete.
Print Figure 2. Tape it to your wall. It is the complete specification for your Raft implementation. Every field, every rule, every RPC argument. If your code disagrees with Figure 2, your code is wrong. This is not a suggestion — it is the literal implementation spec.
Official course advice on debugging, locking discipline, and code structure. Covers how to insert logging, how to think about lock ordering, and how to form hypotheses when tests fail. Applicable to the Rust implementation too.
A recent (2026) walkthrough of building Raft in Rust. Covers the specific Rust/Tokio patterns, async pitfalls, and serialization choices. Useful as a sanity check on your architecture.

Quick Links

Resource | URL | When to Use
Course home page | pdos.csail.mit.edu/6.824 | Lab specs, papers, schedule
2020 YouTube playlist | YouTube playlist | All 20 lectures
Raft paper (extended) | PDF | Lab 3 spec
MapReduce paper | PDF | Lab 1 background
Tokio tutorial | tokio.rs/tokio/tutorial | Async Rust primer
Diego Ongaro's Raft talk | YouTube | Best Raft explanation; watch before Lab 3

Key Rust Patterns for These Labs

Go → Rust Concurrency Mapping

Go Pattern | Rust / Tokio Equivalent
go func() { ... }() | tokio::spawn(async { ... })
time.Sleep(100ms) | tokio::time::sleep(Duration::from_millis(100)).await
sync.Mutex | Arc<std::sync::Mutex<T>> (use std, not tokio::sync)
sync.WaitGroup | tokio::task::JoinSet
make(chan T, N) | tokio::sync::mpsc::channel(N)
select {} | tokio::select! {}
sync.Cond + Wait() | tokio::sync::Notify
time.NewTimer() | tokio::time::sleep_until(Instant) or interval()
atomic.AddInt64 | std::sync::atomic::AtomicU64

Election Timer Pattern

// Each peer has a resettable election timer
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;
use tokio::time::{Duration, sleep};
use rand::Rng;

async fn election_timer_loop(notify: Arc<Notify>, raft: Arc<Mutex<RaftState>>) {
    loop {
        let timeout = Duration::from_millis(rand::thread_rng().gen_range(150..300));

        tokio::select! {
            _ = sleep(timeout) => {
                // Timeout fired — start election if follower/candidate
                let role = raft.lock().unwrap().role.clone();
                if role != Role::Leader {
                    tokio::spawn(start_election(raft.clone()));
                }
            }
            _ = notify.notified() => {
                // Heartbeat received — reset timer (loop again)
            }
        }
    }
}

// In AppendEntries handler, call:
// self.election_notify.notify_one();

Papers to Read (In Order)

Paper | Year | Lab | Priority
MapReduce (Dean & Ghemawat) | 2004 | Lab 1 | 🔴 Required
The Google File System | 2003 | Context | 🟡 Read it
Raft Extended Paper (Ongaro) | 2014 | Lab 3 | 🔴 Required × 3
ZooKeeper (Hunt et al.) | 2010 | Context | 🟡 Read it
Spanner (Corbett et al.) | 2012 | Context / Lab 4 | 🟠 Recommended
Dynamo (DeCandia et al.) | 2007 | Lab 5 context | 🟠 Recommended
COPS (Lloyd et al.) | 2011 | Lec 17 | 🟡 Read it
FaRM (Dragojević et al.) | 2014 | Lec 14 | ⚪ Optional
CRAQ (Terrace & Freedman) | 2009 | Lec 9 | 🟡 Read it
Aurora (Verbitski et al.) | 2017 | Lec 10 | ⚪ Optional

All papers are linked from the schedule page: pdos.csail.mit.edu/6.824/schedule.html. Each lecture lists the paper to read beforehand.

Debugging Distributed Systems in Rust

Structured Logging

// Add to each crate's main or test setup
fn init_tracing() {
    let _ = tracing_subscriber::fmt()
        .with_env_filter("lab3_raft=debug,lab4_kvraft=info")
        .with_thread_ids(true)
        .with_target(true)
        .try_init();
}

// ALWAYS log: term, role, peer index, event name
tracing::debug!(
    term = s.current_term,
    role = ?s.role,
    me = self.me,
    from = args.candidate_id,
    granted = vote_granted,
    "RequestVote"
);

Stress Test Script

Never consider a Raft implementation correct until it passes 500 consecutive test runs. Use this script:

#!/bin/bash
# stress_test.sh — runs a test N times, stops on first failure
# Usage: ./stress_test.sh 500 lab3_raft test_initial_election

COUNT=$1; PKG=$2; TEST=$3

for i in $(seq 1 "$COUNT"); do
    if ! cargo test --package "$PKG" "$TEST" -- --nocapture 2>&1 | tail -n 1 | grep -q ok; then
        echo "❌ FAILED on run $i"
        exit 1
    fi
    echo "✅ Run $i/$COUNT passed"
done
echo "🎉 All $COUNT runs passed"

Common Bugs Table

Bug | Symptom | Fix
Hold Mutex across .await | Compiler error: MutexGuard is not Send | Drop the guard before every .await point
Election timer not reset on AppendEntries | Multiple leaders in the same term | Call notify.notify_one() in ALL valid AE handlers, not just successful ones
Committing old-term entries directly | Stale reads after leader change (Figure 8) | Check log[N].term == currentTerm before advancing commitIndex
Not persisting before RPC reply | Votes for two candidates in the same term after a crash | Call persist() before any return in RequestVote / AppendEntries handlers
next_index underflow | Index panic on subtraction | next_index[i] = max(1, next_index[i] - 1)
last_applied not reset after snapshot | Re-applies committed entries | Set last_applied = snapshot_index in the InstallSnapshot handler
KvServer applies duplicate ops after snapshot | Data corruption in the dedup table | Include the dedup table in snapshot data; restore it on snapshot load
Lab 5: processing config N+1 before config N migration is done | Missing shard data | Only poll the next config when all shards are in Serving state
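The commit-rule fix from the table (the Figure 8 case) is worth spelling out in code. This is a sketch with illustrative field names: a leader may only advance commit_index to an index replicated on a majority AND whose entry is from its own term (Raft §5.4.2).

```rust
struct LogEntry {
    term: u64,
}

// Sketch of the safe commit rule; field names are illustrative.
fn advance_commit_index(
    log: &[LogEntry],        // log[0] is a dummy sentinel at index 0
    match_index: &[usize],   // highest replicated index per peer, incl. self
    current_term: u64,
    commit_index: &mut usize,
) {
    for n in (*commit_index + 1..log.len()).rev() {
        let replicas = match_index.iter().filter(|&&m| m >= n).count();
        // Majority replicated AND entry is from the leader's own term.
        if 2 * replicas > match_index.len() && log[n].term == current_term {
            *commit_index = n;
            break;
        }
    }
}
```

Dropping the `log[n].term == current_term` check is exactly the Figure 8 bug: an old-term entry on a majority can still be overwritten, so counting replicas alone is not enough.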

Async Rust Pitfalls in These Labs

1. std::sync::Mutex vs tokio::sync::Mutex

Use std::sync::Mutex for Raft state (never held across await). Use tokio::sync::Mutex only if you absolutely must hold across await — but try to avoid that entirely.

// ❌ WRONG — the future is !Send, so this fails to compile as soon as
// it is handed to tokio::spawn
async fn bad(state: Arc<std::sync::Mutex<RaftState>>) {
    let guard = state.lock().unwrap();
    some_async_fn().await; // guard is still alive across the await point
}

// ✅ CORRECT — drop guard before await
async fn good(state: Arc<std::sync::Mutex<RaftState>>) {
    let value = {
        let guard = state.lock().unwrap();
        guard.some_field  // copy/clone what you need
    }; // guard dropped here
    some_async_fn(value).await;
}

2. tokio::spawn requires 'static + Send

All data passed into tokio::spawn must be 'static (owned, not borrowed) and Send. Clone Arcs before spawning, don't pass references.

// ❌ WRONG — cannot borrow across spawn
let raft_ref = &raft;
tokio::spawn(async move { raft_ref.something(); });

// ✅ CORRECT — clone the Arc
let raft2 = raft.clone();
tokio::spawn(async move { raft2.something(); });

3. select! cancellation

When a tokio::select! branch wins, the other branches' futures are dropped. If those futures were in the middle of work (e.g., sending an RPC), that work is cancelled. Design your futures to be cancel-safe, or use JoinSet instead.

tokio::select! {
    _ = sleep(timeout) => { /* timer fired */ }
    result = rpc_future => {
        // rpc_future may have sent the RPC but not gotten reply
        // — this is fine for Raft since RPCs are idempotent
    }
}

4. Deadlock: locking the same Mutex twice

In Raft, it's easy to call a method that locks the state mutex when you already hold it. Use a pattern where the lock is acquired only at the start of each RPC handler, work is done, then released.

// Avoid calling self.some_method() while holding self.state.lock()
// if some_method() also locks self.state

// Pattern: extract a free function that takes &mut RaftState
fn advance_commit_index(s: &mut RaftState, n_peers: usize) { /* ... */ }

// Call it with the lock held, but it never re-locks
let mut s = self.state.lock().unwrap();
advance_commit_index(&mut s, self.n_peers);
drop(s);

Final advice: Struggle genuinely with each lab before looking anything up. The distributed systems intuition you build while hunting an intermittent Raft bug for two days is irreplaceable. TiKV, etcd, CockroachDB, and Solana's consensus layer are all built on these exact ideas — you're not just doing exercises, you're learning the foundation of modern distributed infrastructure.