Chapter 26: Raft Consensus Protocol

26.1 The Distributed Consensus Problem

26.1.1 Fundamental Challenges

Distributed databases face a fundamental tension between availability and consistency. When multiple nodes must agree on a shared state, network partitions, node failures, and message delays create scenarios where naive approaches produce incorrect results.

Consider a simple replicated key-value store with three nodes. A client writes x = 5 to node A, while simultaneously another client writes x = 7 to node B. Without coordination:

  • Node A believes x = 5
  • Node B believes x = 7
  • Node C may have either value, or neither

This split-brain scenario violates the fundamental guarantee users expect: that a database returns consistent results regardless of which node handles their query.

26.1.2 The FLP Impossibility Result

Fischer, Lynch, and Paterson proved in 1985 that no deterministic consensus protocol can guarantee termination in an asynchronous system where even a single node may fail. This FLP impossibility result establishes a fundamental limit:

\text{Consensus} \land \text{Async Network} \land \text{Fault Tolerance} \Rightarrow \text{Non-Termination Possible}

Practical systems circumvent FLP by introducing timing assumptions. Raft assumes a partially synchronous model where message delays are bounded most of the time, allowing progress during stable periods while maintaining safety always.

26.1.3 Safety vs. Liveness

Consensus protocols distinguish two properties:

Safety: Nothing bad happens. The system never returns inconsistent results.

Liveness: Something good eventually happens. The system eventually makes progress.

Raft prioritizes safety unconditionally—a Raft cluster will never acknowledge conflicting values for the same log position. Liveness depends on timing: if a majority of nodes can communicate within bounded time, the cluster elects a leader and makes progress.

26.1.4 The Replicated State Machine Model

Raft implements consensus through replicated state machines. Each node maintains:

  1. Log: An ordered sequence of commands
  2. State Machine: Deterministic function from commands to state
  3. Committed Index: Position up to which log entries are durable

The key invariant: if all nodes apply the same commands in the same order, they reach identical states:

\forall i, j : \text{Log}_i[1..k] = \text{Log}_j[1..k] \Rightarrow \text{State}_i = \text{State}_j
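The invariant is easy to demonstrate with a toy state machine. The sketch below is illustrative only (not Cognica code): a deterministic key-value state machine whose state is a pure function of the applied log prefix, so any two replicas that replay the same prefix converge to the same state.

```cpp
// Toy illustration (not Cognica code): replicas that apply the same
// command log in the same order always reach identical states.
#include <map>
#include <string>
#include <vector>

struct Command {
  std::string key;
  int value;
};

// Deterministic state machine: state depends only on the applied commands.
struct KvStateMachine {
  std::map<std::string, int> state;

  void apply(const Command& cmd) { state[cmd.key] = cmd.value; }
};

// Replay the first k entries of a log; any replica doing this gets the
// same state, which is exactly the invariant stated above.
std::map<std::string, int> replay(const std::vector<Command>& log, size_t k) {
  KvStateMachine sm;
  for (size_t i = 0; i < k && i < log.size(); ++i) sm.apply(log[i]);
  return sm.state;
}
```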

26.2 Raft Algorithm Fundamentals

26.2.1 Node Roles

Every Raft node operates in exactly one of three roles:

Leader: Handles all client requests, replicates log entries to followers, sends periodic heartbeats. At most one leader exists per term.

Follower: A passive participant that responds to leader RPCs. Followers redirect client requests to the current leader.

Candidate: Transitional role during elections. A follower becomes a candidate when it suspects the leader has failed.

(Diagram: follower, candidate, and leader role transitions)

26.2.2 Terms

Raft divides time into terms, monotonically increasing integers that act as logical clocks:

\text{term}_t < \text{term}_{t+1}

Each term begins with an election. If the election succeeds, the winning candidate serves as leader for the remainder of the term. If the election fails (split vote), a new term begins immediately.

Terms enable distributed leader detection: any node that receives a message with a higher term immediately steps down to follower and adopts the new term. This mechanism ensures stale leaders cannot cause inconsistency.
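This step-down rule can be sketched in a few lines (types and names here are hypothetical, not NuRaft's API): on every incoming message, a node compares the message's term against its own before doing anything else.

```cpp
// Illustrative sketch of the universal Raft term rule: a higher term in
// any message forces the receiver to adopt that term and become a follower.
#include <cstdint>

enum class Role { kLeader, kCandidate, kFollower };

struct NodeState {
  uint64_t current_term = 0;
  Role role = Role::kFollower;
};

// Called for every incoming RPC, before the message is otherwise processed.
void observe_term(NodeState& node, uint64_t message_term) {
  if (message_term > node.current_term) {
    node.current_term = message_term;  // adopt the newer term
    node.role = Role::kFollower;       // stale leaders/candidates step down
  }
}
```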

26.2.3 Leader Election

When a follower's election timeout expires without receiving a heartbeat, it transitions to candidate:

  1. Increment current term
  2. Vote for itself
  3. Reset election timer
  4. Send RequestVote RPCs to all other nodes

A candidate wins the election if it receives votes from a majority of nodes. The voting rules ensure at most one winner per term:

Vote Granting Rules:

  • Each node votes for at most one candidate per term
  • A node grants its vote only if the candidate's log is at least as up-to-date as its own

The "up-to-date" comparison uses lexicographic ordering on (lastLogTerm, lastLogIndex):

\text{UpToDate}(A, B) \Leftrightarrow (\text{term}_A > \text{term}_B) \lor ((\text{term}_A = \text{term}_B) \land (\text{index}_A \geq \text{index}_B))
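The comparison translates directly into code. A minimal sketch (the struct and function names are illustrative):

```cpp
// The vote-granting "up-to-date" rule: compare (lastLogTerm, lastLogIndex)
// lexicographically.
#include <cstdint>

struct LogPosition {
  uint64_t last_term;
  uint64_t last_index;
};

// True when candidate A's log is at least as up-to-date as voter B's,
// i.e. the voter may grant its vote to A.
bool up_to_date(const LogPosition& a, const LogPosition& b) {
  if (a.last_term != b.last_term) return a.last_term > b.last_term;
  return a.last_index >= b.last_index;
}
```

Note that a higher last term wins even against a longer log: log length only breaks ties within the same term.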

26.2.4 Log Replication

Once elected, the leader handles all client requests:

  1. Append command to local log
  2. Send AppendEntries RPC to each follower
  3. Wait for majority acknowledgment
  4. Commit entry (advance commit index)
  5. Apply to state machine
  6. Respond to client

The AppendEntries RPC includes:

  • prevLogIndex: Index of log entry immediately preceding new entries
  • prevLogTerm: Term of prevLogIndex entry
  • entries[]: Log entries to append
  • leaderCommit: Leader's commit index

Followers perform a consistency check: they accept entries only if their log contains an entry at prevLogIndex with term prevLogTerm. This check, combined with the Log Matching Property, ensures logs remain consistent:

Log Matching Property: If two logs contain an entry with the same index and term, then:

  1. They store the same command
  2. All preceding entries are identical
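The follower's consistency check can be sketched as follows (simplified and illustrative — real implementations, including NuRaft, must also truncate conflicting suffixes before appending):

```cpp
// Follower-side AppendEntries consistency check: accept new entries only
// if the local log has an entry at prev_log_index with term prev_log_term.
#include <cstdint>
#include <vector>

struct Entry {
  uint64_t term;
  // command payload omitted for brevity
};

// Log indices are 1-based; prev_log_index == 0 means "appending from the
// very start of the log", which always passes.
bool consistency_check(const std::vector<Entry>& log,
                       uint64_t prev_log_index, uint64_t prev_log_term) {
  if (prev_log_index == 0) return true;           // empty prefix
  if (prev_log_index > log.size()) return false;  // gap: entry missing
  return log[prev_log_index - 1].term == prev_log_term;
}
```

Combined with the Log Matching Property, a successful check at prev_log_index guarantees the entire prefix up to that index is identical on leader and follower.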

26.2.5 Safety Proof Sketch

Raft's safety rests on two invariants:

Election Safety: At most one leader per term.

  • Proof: Majority voting with single vote per term guarantees uniqueness.

Leader Completeness: If a log entry is committed in term t, it appears in the logs of all leaders for terms > t.

  • Proof: Committed entries exist on a majority. Election requires majority votes. Vote restriction ensures new leaders have all committed entries.

Together, these invariants ensure committed entries are never lost or overwritten.

26.3 Cognica's NuRaft Integration

26.3.1 Architecture Overview

Cognica integrates the NuRaft library for Raft consensus, adding database-specific components:

(Diagram: Cognica's NuRaft integration architecture)

The ReplicationManager (src/cognica/replication/core/manager.hpp) orchestrates all components:

class ReplicationManager final {
public:
  ReplicationManager(Options options,
                     std::shared_ptr<TransactionApplier> applier);

  auto start() -> ReplicationStatus;
  auto stop() -> ReplicationStatus;

  auto append_transaction(Transaction* txn) -> ReplicationStatus;
  auto wait_for_commit(uint64_t log_idx) -> ReplicationStatus;

  auto is_leader() const -> bool;
  auto get_leader_id() const -> int32_t;

private:
  std::unique_ptr<nuraft::raft_server> raft_server_;
  std::unique_ptr<RaftLogStore> log_store_;
  std::unique_ptr<RaftStateManager> state_mgr_;
  std::unique_ptr<ReplicationStateMachine> state_machine_;

  std::thread writer_thread_;
  std::queue<CommitRequest> commit_queue_;
};

26.3.2 Configuration Parameters

Cognica exposes Raft parameters through its configuration system (src/cognica/replication/config/options.hpp):

Parameter            Default   Description
heartbeat_interval   1000 ms   Leader heartbeat frequency
election_timeout     5000 ms   Lower bound for election timeout
snapshot_distance    100       Entries between snapshots
log_sync_batch_size  10        Entries per sync batch

The election timeout uses randomization to prevent split votes:

T_{\text{election}} \sim \text{Uniform}[T_{\text{lower}}, 2 \cdot T_{\text{lower}}]

With default settings, election timeouts range from 5 to 10 seconds, while heartbeats occur every second—ensuring followers receive multiple heartbeats per election period.
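Drawing the randomized timeout is a one-liner with the standard library. A minimal sketch, assuming the default 5000 ms lower bound from the table above (the function name is illustrative):

```cpp
// Randomized election timeout: T ~ Uniform[T_lower, 2 * T_lower].
// Randomization desynchronizes nodes so split votes become unlikely.
#include <chrono>
#include <random>

std::chrono::milliseconds random_election_timeout(
    std::chrono::milliseconds lower, std::mt19937_64& rng) {
  std::uniform_int_distribution<int64_t> dist(lower.count(),
                                              2 * lower.count());
  return std::chrono::milliseconds(dist(rng));
}
```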

26.3.3 ASIO-Based Networking

Cognica uses Boost.ASIO for asynchronous network I/O, configured with a 4-thread pool:

void initialize_networking_() {
  asio_service_ = std::make_shared<nuraft::asio_service>(
      nuraft::asio_service_options{
          .thread_pool_size_ = 4,
          .enable_ssl_ = options_.tls.enable_ssl,
          .server_cert_ = options_.tls.server_cert_file,
          .server_key_ = options_.tls.server_key_file,
          .ca_cert_ = options_.tls.ca_cert_file
      });

  rpc_listener_ = asio_service_->create_rpc_listener(
      options_.listen_port,
      options_.logger);
}

The async I/O model allows a single thread to handle many concurrent connections, essential for clusters with numerous nodes.

26.4 Persistent Log Storage

26.4.1 Segmented Log Architecture

Cognica's RaftLogStore (src/cognica/replication/core/raft_log_store.hpp) implements persistent log storage using a segmented file structure:

{db_path}/raft/{node_id}/logs/
  metadata.bin       (16 bytes: start_idx, last_idx)
  segment_index.bin  (entry location index)
  segment_0000000001.log
  segment_0000000002.log
  ...

Each segment contains sequential log entries up to a size threshold:

class RaftLogStore : public nuraft::log_store {
public:
  static constexpr size_t kSegmentSizeThreshold = 64 * 1024 * 1024;  // 64 MB

  auto append(nuraft::ptr<nuraft::log_entry>& entry) -> uint64_t override;
  auto write_at(uint64_t index, nuraft::ptr<nuraft::log_entry>& entry) -> void override;
  auto log_entries(uint64_t start, uint64_t end) -> nuraft::ptr<std::vector<...>> override;
  auto compact(uint64_t last_log_idx) -> bool override;

private:
  auto rotate_segment_if_needed_() -> void;
  auto locate_entry_(uint64_t index) -> EntryLocation;

  std::map<uint64_t, SegmentInfo> segments_;
  std::map<uint64_t, EntryLocation> entry_index_;
  uint64_t start_idx_, last_idx_;
};

26.4.2 Entry Location Tracking

For efficient random access, the log maintains an in-memory index mapping log indices to physical locations:

struct EntryLocation {
  uint64_t segment_id;
  uint64_t offset;
  uint32_t size;
};

The index makes entry lookup an O(\log n) probe into the in-memory map — independent of segment file sizes, with no file scans — which is critical for:

  • Consistency checks (reading prevLogIndex entry)
  • Follower catch-up (reading arbitrary ranges)
  • Snapshot transfer (packing log segments)

26.4.3 Durability Guarantees

Log durability follows a strict protocol:

  1. Write entry to temporary file
  2. Call fsync() on file descriptor
  3. Atomically rename to final location
  4. Sync directory metadata
auto RaftLogStore::persist_entry_(const LogEntry& entry) -> Status {
  auto temp_path = segment_path_ + ".tmp";
  auto fd = open(temp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);

  write(fd, entry.data(), entry.size());
  fsync(fd);  // Ensure data reaches disk
  close(fd);

  rename(temp_path.c_str(), segment_path_.c_str());

  // Sync directory to persist rename
  auto dir_fd = open(log_dir_.c_str(), O_RDONLY | O_DIRECTORY);
  fsync(dir_fd);
  close(dir_fd);

  return Status::OK();
}

This protocol survives power failures: either the old file exists (write failed) or the new file exists (write succeeded). No intermediate states are observable after recovery.

26.4.4 Log Compaction

Logs grow without bound, so compaction becomes necessary. Cognica supports two compaction strategies:

Prefix Truncation: After snapshot creation, entries before the snapshot index can be deleted:

auto RaftLogStore::compact(uint64_t last_log_idx) -> bool {
  // Remove segments entirely contained before last_log_idx
  for (auto it = segments_.begin(); it != segments_.end(); ) {
    if (it->second.last_index < last_log_idx) {
      remove_segment_(it->first);
      it = segments_.erase(it);
    } else {
      ++it;
    }
  }
  start_idx_ = last_log_idx + 1;
  persist_metadata_();
  return true;
}

Segment Rotation: When a segment exceeds the size threshold, a new segment begins:

auto RaftLogStore::rotate_segment_if_needed_() -> void {
  if (current_segment_size_ >= kSegmentSizeThreshold) {
    close_current_segment_();
    current_segment_id_++;
    create_new_segment_();
    current_segment_size_ = 0;
  }
}

26.5 State Machine and Transaction Application

26.5.1 The Commit Interface

Cognica's state machine (src/cognica/replication/core/state_machine.hpp) implements NuRaft's state machine interface:

class ReplicationStateMachine : public nuraft::state_machine {
public:
  auto commit(uint64_t log_idx, nuraft::buffer& data)
      -> nuraft::ptr<nuraft::buffer> override;

  auto pre_commit(uint64_t log_idx, nuraft::buffer& data)
      -> nuraft::ptr<nuraft::buffer> override;

  auto rollback(uint64_t log_idx, nuraft::buffer& data) -> void override;

  auto last_commit_index() -> uint64_t override;

private:
  std::shared_ptr<TransactionApplier> applier_;
  std::atomic<uint64_t> last_committed_idx_{0};
};

The commit() method executes when Raft determines an entry is safely replicated:

auto ReplicationStateMachine::commit(uint64_t log_idx, nuraft::buffer& data)
    -> nuraft::ptr<nuraft::buffer> {
  // Deserialize log entry
  auto entry = TransactionLogEntry::deserialize(data);

  // Apply to database
  auto status = applier_->apply_log_entry(entry);
  if (!status.ok()) {
    LOG_ERROR("Failed to apply log entry {}: {}", log_idx, status.message());
  }

  // Update commit index with release semantics
  last_committed_idx_.store(log_idx, std::memory_order_release);

  return nuraft::buffer::alloc(0);  // Success
}

26.5.2 Transaction Log Entries

Each log entry encapsulates a complete transaction (src/cognica/replication/transaction/log.hpp):

struct TransactionLogEntry {
  uint64_t sequence_number;     // Monotonic sequence
  uint64_t term;                // Raft term (fencing token)
  TransactionType type;         // WRITE, DELETE, etc.
  std::vector<Operation> ops;   // Individual operations

  auto serialize() const -> nuraft::ptr<nuraft::buffer>;
  static auto deserialize(nuraft::buffer& buf) -> TransactionLogEntry;
};

struct Operation {
  OperationType type;
  std::string collection;
  std::string key;
  std::string value;  // Empty for deletes
};

The term field acts as a fencing token, preventing stale leaders from applying outdated transactions (discussed in Section 26.8).

26.5.3 The Transaction Applier

The TransactionApplier (src/cognica/replication/transaction/applier.hpp) converts log entries to RocksDB operations:

class TransactionApplier {
public:
  auto apply_log_entry(const TransactionLogEntry& entry) -> ReplicationStatus;

  auto last_applied_sequence() const -> uint64_t;
  auto set_leader_term(uint64_t term) -> void;

private:
  auto validate_sequence_(uint64_t seq) -> ReplicationStatus;
  auto validate_term_(uint64_t term) -> ReplicationStatus;
  auto convert_to_write_batch_(const TransactionLogEntry& entry) -> WriteBatch;

  std::shared_ptr<rocksdb::DB> db_;
  std::atomic<uint64_t> last_applied_sequence_{0};
  std::atomic<uint64_t> current_leader_term_{0};
};

Application follows a strict sequence:

auto TransactionApplier::apply_log_entry(const TransactionLogEntry& entry)
    -> ReplicationStatus {
  // 1. Validate sequence ordering
  auto status = validate_sequence_(entry.sequence_number);
  if (!status.ok()) {
    return status;
  }

  // 2. Validate term (fencing)
  status = validate_term_(entry.term);
  if (!status.ok()) {
    return status;
  }

  // 3. Convert to RocksDB WriteBatch
  auto batch = convert_to_write_batch_(entry);

  // 4. Apply atomically
  auto write_options = rocksdb::WriteOptions();
  write_options.sync = true;
  auto rdb_status = db_->Write(write_options, &batch);

  if (!rdb_status.ok()) {
    return ReplicationStatus::StorageError(rdb_status.ToString());
  }

  // 5. Update last applied sequence
  last_applied_sequence_.store(entry.sequence_number,
                               std::memory_order_release);

  return ReplicationStatus::OK();
}

26.6 Leader Election Implementation

26.6.1 Election Timeout Management

Cognica configures election timeouts through NuRaft parameters:

void ReplicationManager::configure_raft_params_() {
  raft_params_.election_timeout_lower_bound_ =
      options_.election_timeout.count();
  raft_params_.election_timeout_upper_bound_ =
      options_.election_timeout.count() * 2;
  raft_params_.heart_beat_interval_ =
      options_.heartbeat_interval.count();
}

The randomized timeout prevents synchronized elections that could cause repeated split votes. With 5-10 second timeouts and 1 second heartbeats, the probability of split votes is:

P(\text{split}) \approx \left(\frac{T_{\text{heartbeat}}}{T_{\text{election}}}\right)^{n-1}

For a 3-node cluster: P(\text{split}) \approx (0.1)^2 = 0.01.
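This estimate is easy to sanity-check numerically. The sketch below is illustrative arithmetic only, not a rigorous probabilistic model (the function name is ours):

```cpp
// Back-of-envelope split-vote estimate:
// P(split) ~= (T_heartbeat / T_election)^(n - 1).
#include <cmath>

double split_vote_probability(double heartbeat_ms, double election_ms,
                              int cluster_size) {
  return std::pow(heartbeat_ms / election_ms, cluster_size - 1);
}
```

With the defaults (1 s heartbeat, 10 s upper election timeout), larger clusters drive the estimate down rapidly: each additional node multiplies it by the heartbeat-to-timeout ratio.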

26.6.2 State Transitions

Cognica tracks node roles independently of NuRaft for application-level logic (src/cognica/replication/core/state.hpp):

class ReplicationState {
public:
  enum class Role { kPrimary, kSecondary, kRecovering };

  auto transition(Role from, Role to) -> bool;
  auto current_role() const -> Role;

  auto is_primary() const -> bool;
  auto is_secondary() const -> bool;
  auto is_recovering() const -> bool;

  using StateChangeCallback = std::function<void(Role, Role)>;
  auto set_callback(StateChangeCallback cb) -> void;

private:
  std::atomic<Role> role_{Role::kSecondary};
  std::mutex mutex_;
  StateChangeCallback callback_;
};

Valid transitions form a state machine:

auto ReplicationState::transition(Role from, Role to) -> bool {
  static const std::set<std::pair<Role, Role>> valid_transitions = {
      {Role::kSecondary, Role::kPrimary},     // Won election
      {Role::kSecondary, Role::kRecovering},  // Starting sync
      {Role::kPrimary, Role::kSecondary},     // Lost leadership
      {Role::kPrimary, Role::kRecovering},    // Resync needed
      {Role::kRecovering, Role::kSecondary},  // Sync completed
  };

  if (valid_transitions.count({from, to}) == 0) {
    return false;
  }

  std::lock_guard lock(mutex_);
  if (role_.load() != from) {
    return false;  // State changed concurrently
  }

  role_.store(to);

  // Execute callback outside lock
  if (callback_) {
    callback_(from, to);
  }

  return true;
}

26.6.3 Becoming Leader

When NuRaft notifies Cognica of leadership acquisition:

void ReplicationManager::on_become_leader_() {
  LOG_INFO("Node {} became leader for term {}",
           current_node_->id(), raft_server_->get_term());

  // Update role state
  state_.transition(ReplicationState::Role::kSecondary,
                    ReplicationState::Role::kPrimary);

  // Start replicator for async secondary updates
  replicator_->start_as_primary();

  // Update metrics
  metrics_.leader_elections.fetch_add(1, std::memory_order_relaxed);

  // Audit log
  audit_logger_->log_event(AuditEvent::LeaderElected{
      .node_id = current_node_->id(),
      .term = raft_server_->get_term()
  });

  // Notify application callbacks
  if (leader_callback_) {
    leader_callback_(current_node_->id());
  }
}

26.7 Snapshot Mechanism

26.7.1 RocksDB Checkpoint Integration

Cognica leverages RocksDB's checkpoint feature for snapshots, providing a consistent point-in-time view without blocking writes:

auto ReplicationStateMachine::create_snapshot(
    nuraft::snapshot& s,
    nuraft::async_result<bool>::handler_type& when_done) -> void {

  auto log_idx = s.get_last_log_idx();
  auto snapshot_dir = snapshot_path_ / fmt::format("snapshot_{}", log_idx);

  // Create RocksDB checkpoint
  rocksdb::Checkpoint* checkpoint = nullptr;
  auto status = rocksdb::Checkpoint::Create(db_.get(), &checkpoint);
  if (!status.ok()) {
    when_done(false, nuraft::cmd_result_code::FAILED);
    return;
  }

  status = checkpoint->CreateCheckpoint(snapshot_dir.string());
  delete checkpoint;

  if (!status.ok()) {
    when_done(false, nuraft::cmd_result_code::FAILED);
    return;
  }

  // Track snapshot
  last_snapshot_ = std::make_shared<SnapshotInfo>(log_idx, snapshot_dir);

  // Cleanup old snapshots (keep 3)
  cleanup_old_snapshots_(3);

  when_done(true, nuraft::cmd_result_code::OK);
}

26.7.2 Snapshot Directory Structure

{db_path}/raft/{node_id}/snapshots/
  snapshot_1000/
    MANIFEST-000001
    000001.sst
    000002.sst
    ...
    OPTIONS-000001
  snapshot_2000/
    ...

Each snapshot contains a complete RocksDB checkpoint, enabling followers to bootstrap from scratch.

26.7.3 Chunk-Based Transfer

For large snapshots, Cognica transfers data in 1 MB chunks:

auto ReplicationStateMachine::read_logical_snp_obj(
    nuraft::snapshot& s,
    void*& user_ctx,
    uint64_t obj_id,
    nuraft::ptr<nuraft::buffer>& data_out,
    bool& is_last_obj) -> int {

  static constexpr size_t kChunkSize = 1 * 1024 * 1024;  // 1 MB

  auto* ctx = static_cast<SnapshotReadContext*>(user_ctx);
  if (!ctx) {
    ctx = new SnapshotReadContext(s.get_last_log_idx());
    user_ctx = ctx;
  }

  // Read next chunk
  std::vector<char> buffer(kChunkSize);
  auto bytes_read = ctx->read_chunk(buffer.data(), kChunkSize);

  data_out = nuraft::buffer::alloc(bytes_read);
  data_out->put_raw(buffer.data(), bytes_read);

  is_last_obj = ctx->is_complete();

  return 0;
}

26.7.4 Snapshot Application

Followers apply received snapshots:

auto ReplicationStateMachine::apply_snapshot(nuraft::snapshot& s) -> bool {
  auto log_idx = s.get_last_log_idx();
  auto snapshot_dir = snapshot_path_ / fmt::format("snapshot_{}", log_idx);

  // Verify snapshot files exist
  if (!std::filesystem::exists(snapshot_dir)) {
    LOG_ERROR("Snapshot directory does not exist: {}", snapshot_dir);
    return false;
  }

  // Extract last sequence number from snapshot metadata
  auto last_seq = extract_sequence_from_snapshot_(snapshot_dir);

  // Reset applier state
  applier_->reset(last_seq);

  // Update commit index
  last_committed_idx_.store(log_idx, std::memory_order_release);

  // Track as current snapshot
  last_snapshot_ = std::make_shared<SnapshotInfo>(log_idx, snapshot_dir);

  LOG_INFO("Applied snapshot at index {} with sequence {}", log_idx, last_seq);

  return true;
}

26.8 Term-Based Fencing

26.8.1 The Split-Brain Problem

Network partitions can create scenarios where a stale leader continues operating:

Time 0: [A=Leader, B, C] - Cluster healthy
Time 1: Network partition isolates A
Time 2: B elected new leader (term 2)
Time 3: A still believes it's leader (term 1)

Without protection, both A and B could accept writes, causing divergent states.

26.8.2 Fencing Tokens

Cognica embeds the Raft term in every log entry as a fencing token:

struct TransactionLogEntry {
  uint64_t term;  // Fencing token
  // ...
};

During application, the applier validates the term:

auto TransactionApplier::validate_term_(uint64_t entry_term) -> ReplicationStatus {
  auto current_term = current_leader_term_.load(std::memory_order_acquire);

  if (entry_term < current_term) {
    return ReplicationStatus::StaleTerm(
        fmt::format("Entry term {} < current term {}", entry_term, current_term));
  }

  if (entry_term > current_term) {
    // New leader with higher term
    current_leader_term_.store(entry_term, std::memory_order_release);
  }

  return ReplicationStatus::OK();
}

26.8.3 Preventing Stale Writes

The fencing mechanism ensures:

  1. Monotonic terms: Each leader has a strictly higher term than predecessors
  2. Term validation: Entries from old terms are rejected
  3. Automatic recovery: New leader's higher term invalidates stale entries

When the partitioned leader A reconnects:

Time 4: A discovers term 2 > term 1
Time 5: A steps down to follower
Time 6: A discards uncommitted entries from term 1
Time 7: A receives log from B (new leader)

26.9 Two-Phase Commit Integration

26.9.1 The Coordination Challenge

Cognica must coordinate Raft consensus with RocksDB transactions. Naive approaches fail:

Problem 1: Commit before Raft

// WRONG: Data visible before replication
txn->Commit();
raft->append(txn);  // Crash here = data lost on secondaries

Problem 2: Raft before Commit

// WRONG: Sequence number unknown until commit
raft->append(txn);  // What sequence number?
txn->Commit();      // RocksDB assigns sequence

26.9.2 Cognica's Two-Phase Protocol

Cognica uses RocksDB's two-phase commit (2PC) feature:

Phase 1: PREPARE
  - Write transaction to WAL
  - Data not yet visible
  - Survives crash

Phase 2: COMMIT
  - Make data visible
  - Assign sequence number

The full replication flow:

auto ReplicationManager::replicate_transaction(Transaction* txn)
    -> ReplicationStatus {
  // Step 1: Prepare (durable but invisible)
  auto status = txn->Prepare();
  if (!status.ok()) {
    return ReplicationStatus::PrepareError(status.ToString());
  }

  // Step 2: Assign sequence and append to Raft
  uint64_t log_idx;
  status = append_transaction_to_raft_(txn, &log_idx);
  if (!status.ok()) {
    txn->Rollback();
    return status;
  }

  // Step 3: Wait for Raft commit (majority acknowledgment)
  status = wait_for_raft_commit_(log_idx, options_.commit_timeout);
  if (!status.ok()) {
    write_abort_record_(txn);
    txn->Rollback();
    return status;
  }

  // Step 4: Commit locally (make visible)
  status = txn->Commit();
  if (!status.ok()) {
    write_abort_record_(txn);
    return ReplicationStatus::CommitError(status.ToString());
  }

  // Step 5: Async replication to secondaries
  replicator_->replicate_transaction(txn, txn->sequence_number());

  return ReplicationStatus::OK();
}

26.9.3 Sequence Number Management

A critical challenge: RocksDB assigns sequence numbers at commit time, but Raft needs the sequence number before commit (for ordering). Cognica solves this with a centralized sequence generator:

class SequenceGenerator {
public:
  auto next_sequence() -> uint64_t {
    return next_sequence_.fetch_add(1, std::memory_order_acq_rel);
  }

  auto set_sequence(uint64_t seq) -> void {
    next_sequence_.store(seq, std::memory_order_release);
  }

private:
  std::atomic<uint64_t> next_sequence_{1};
};

The replication manager assigns sequences before Raft append:

auto ReplicationManager::append_transaction_to_raft_(
    Transaction* txn,
    uint64_t* out_log_idx) -> ReplicationStatus {

  // Assign sequence BEFORE Raft append
  auto seq_no = sequence_generator_->next_sequence();
  txn->set_assigned_sequence(seq_no);

  // Get current term for fencing
  auto current_term = raft_server_->get_term();

  // Build log entry
  auto entry = TransactionLogEntry{
      .sequence_number = seq_no,
      .term = current_term,
      .ops = txn->operations()
  };

  // Append to Raft
  auto result = raft_server_->append_entries({entry.serialize()});
  if (result->get_result_code() != nuraft::cmd_result_code::OK) {
    return ReplicationStatus::RaftError(result->get_result_str());
  }

  *out_log_idx = raft_server_->get_last_log_idx();
  return ReplicationStatus::OK();
}

26.9.4 Waiting for Raft Commit

After appending, the leader waits for majority acknowledgment:

auto ReplicationManager::wait_for_raft_commit_(
    uint64_t log_idx,
    std::chrono::milliseconds timeout) -> ReplicationStatus {

  auto deadline = std::chrono::steady_clock::now() + timeout;

  while (std::chrono::steady_clock::now() < deadline) {
    auto committed_idx = raft_server_->get_committed_log_idx();
    if (committed_idx >= log_idx) {
      return ReplicationStatus::OK();
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }

  return ReplicationStatus::Timeout(
      fmt::format("Timed out waiting for log index {} (committed: {})",
                  log_idx, raft_server_->get_committed_log_idx()));
}

The 1 ms polling interval balances responsiveness against CPU usage. Under normal operation, Raft commits within a few milliseconds.

26.10 Crash Recovery

26.10.1 Recovery Scenarios

Crashes can occur at any point in the two-phase protocol. Cognica handles each scenario:

Scenario 1: Crash after PREPARE, before Raft LOG

  • RocksDB finds prepared transaction on restart
  • No Raft log entry exists
  • Action: Rollback (transaction never replicated)

Scenario 2: Crash after Raft LOG, before local COMMIT

  • Raft log entry exists with assigned sequence
  • RocksDB has prepared transaction
  • Action: Abort and write ABORT record (secondaries skip this sequence)

Scenario 3: Crash after local COMMIT, before async replication

  • Transaction committed locally
  • Secondaries may not have received it
  • Action: Raft consensus handles recovery (entry already committed)

26.10.2 Recovery Implementation

auto ReplicationManager::recover_uncommitted_transactions_() -> void {
  // Get all prepared transactions from RocksDB
  std::vector<Transaction*> prepared;
  db_->GetAllPreparedTransactions(&prepared);

  for (auto* txn : prepared) {
    auto seq = txn->assigned_sequence();

    // Check if sequence exists in Raft log
    auto log_entry = log_store_->entry_at(find_by_sequence_(seq));

    if (!log_entry) {
      // Scenario 1: Not in Raft log, rollback
      LOG_INFO("Rolling back prepared transaction with seq {}", seq);
      txn->Rollback();
    } else if (!is_committed_in_raft_(log_entry->index())) {
      // Scenario 2: In log but not committed, abort
      LOG_INFO("Aborting uncommitted transaction with seq {}", seq);
      write_abort_record_(seq);
      txn->Rollback();
    } else {
      // Scenario 3: Committed in Raft, commit locally
      LOG_INFO("Completing committed transaction with seq {}", seq);
      txn->Commit();
    }
  }
}

26.10.3 Abort Records

When a transaction cannot complete, Cognica writes an abort record to the Raft log:

auto ReplicationManager::write_abort_record_(uint64_t sequence) -> void {
  auto entry = TransactionLogEntry{
      .sequence_number = sequence,
      .term = raft_server_->get_term(),
      .type = TransactionType::ABORT
  };

  raft_server_->append_entries({entry.serialize()});
}

Secondary nodes process abort records by skipping the sequence:

auto TransactionApplier::apply_log_entry(const TransactionLogEntry& entry)
    -> ReplicationStatus {
  if (entry.type == TransactionType::ABORT) {
    // Skip this sequence number
    advance_sequence_past_(entry.sequence_number);
    return ReplicationStatus::OK();
  }
  // Normal application...
}

26.11 Gap Detection and Synchronization

26.11.1 Sequence Gaps

Network issues or node failures can cause sequence gaps on secondary nodes:

Primary:   [1, 2, 3, 4, 5, 6, 7]
Secondary: [1, 2, 3, _, _, 6, 7]  // Gap at 4, 5

26.11.2 Gap Detection

The TransactionApplier detects gaps during application:

auto TransactionApplier::validate_sequence_(uint64_t incoming_seq)
    -> ReplicationStatus {
  auto expected = last_applied_sequence_.load() + 1;

  if (incoming_seq == expected) {
    return ReplicationStatus::OK();
  }

  if (incoming_seq < expected) {
    // Duplicate, already applied
    return ReplicationStatus::Duplicate();
  }

  // Gap detected
  auto gap = incoming_seq - expected;

  if (gap <= kSmallGapThreshold) {
    // Small gap, can recover from log
    return ReplicationStatus::LogSyncRequired(expected, incoming_seq);
  }

  if (gap <= kLargeGapThreshold) {
    // Medium gap, try log sync first
    return ReplicationStatus::LogSyncRequired(expected, incoming_seq);
  }

  // Large gap, need snapshot
  return ReplicationStatus::SnapshotSyncRequired();
}

26.11.3 Recovery Strategies

Gap size determines recovery strategy:

Gap Size   Strategy   Mechanism
1-10       Log Sync   Fetch missing entries from leader
11-100     Log Sync   Batch fetch with verification
> 100      Snapshot   Full state transfer

void ReplicationManager::handle_sync_required_(const ReplicationStatus& status) {
  if (status.is_log_sync_required()) {
    auto [start, end] = status.gap_range();
    request_log_entries_(leader_id_, start, end);
  } else if (status.is_snapshot_sync_required()) {
    state_.transition(ReplicationState::Role::kSecondary,
                      ReplicationState::Role::kRecovering);
    request_snapshot_(leader_id_);
  }
}

26.12 Cluster Membership

26.12.1 Configuration Changes

Raft supports dynamic cluster membership through configuration changes. Cognica uses NuRaft's joint consensus approach:

auto ReplicationManager::add_node(const NodeInfo& node)
    -> ReplicationStatus {
  if (!is_leader()) {
    return ReplicationStatus::NotLeader();
  }

  // Create server configuration
  auto srv_config = nuraft::srv_config(
      node.id,
      /* dc_id */ 0,
      node.endpoint,
      /* aux */ "",
      /* learner */ false);

  auto result = raft_server_->add_srv(srv_config);
  if (result->get_result_code() != nuraft::cmd_result_code::OK) {
    return ReplicationStatus::ConfigError(result->get_result_str());
  }

  return ReplicationStatus::OK();
}

26.12.2 Safe Membership Changes

Membership changes follow a two-phase protocol:

  1. Joint consensus: Both old and new configurations active
  2. New configuration: Only new configuration active

This ensures no split-brain during transitions:

\text{Majority}_{\text{old}} \cap \text{Majority}_{\text{new}} \neq \emptyset
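
The intersection property can be checked mechanically. The hypothetical helper below (brute-force, illustration only) enumerates every majority of an old and a new configuration and tests whether any pair is disjoint:

```cpp
#include <algorithm>
#include <cstddef>
#include <iterator>
#include <set>
#include <string>
#include <vector>

// Brute-force illustration (not production code): does every majority
// of `older` share at least one node with every majority of `newer`?
// Feasible only for small clusters.
bool majorities_intersect(const std::set<std::string>& older,
                          const std::set<std::string>& newer) {
  auto majorities = [](const std::set<std::string>& nodes) {
    std::vector<std::set<std::string>> result;
    std::vector<std::string> v(nodes.begin(), nodes.end());
    const std::size_t need = v.size() / 2 + 1;
    // Enumerate all subsets, keeping those of exactly majority size.
    for (unsigned mask = 0; mask < (1u << v.size()); ++mask) {
      std::set<std::string> subset;
      for (std::size_t i = 0; i < v.size(); ++i) {
        if (mask & (1u << i)) subset.insert(v[i]);
      }
      if (subset.size() == need) result.push_back(subset);
    }
    return result;
  };

  for (const auto& a : majorities(older)) {
    for (const auto& b : majorities(newer)) {
      std::set<std::string> common;
      std::set_intersection(a.begin(), a.end(), b.begin(), b.end(),
                            std::inserter(common, common.begin()));
      if (common.empty()) {
        return false;  // These two majorities could decide independently
      }
    }
  }
  return true;
}
```

Adding a single server always preserves the overlap, but jumping from three nodes directly to five does not: {A, B} is a majority of the old configuration while {C, D, E} is a majority of the new one, and they share nothing. That window is exactly what the joint configuration closes.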

26.12.3 Learner Nodes

Cognica supports learner (non-voting) nodes for:

  • Geographic read replicas
  • Backup nodes
  • Staged rollouts

Adding a learner mirrors add_node, but flags the server as non-voting:

auto ReplicationManager::add_learner(const NodeInfo& node)
    -> ReplicationStatus {
  auto srv_config = nuraft::srv_config(
      node.id,
      /* dc_id */ 0,
      node.endpoint,
      /* aux */ "",
      /* learner */ true);  // Non-voting

  return add_server_(srv_config);
}

Learners receive log entries but don't participate in elections or quorum calculations.
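
Because learners are excluded from quorum, adding any number of them leaves commit requirements unchanged. A minimal sketch (the types and names are assumed for illustration):

```cpp
#include <cstddef>
#include <vector>

struct Member {
  int id;
  bool learner;
};

// Quorum counts voting members only; learners never affect it.
std::size_t quorum_size(const std::vector<Member>& members) {
  std::size_t voters = 0;
  for (const auto& m : members) {
    if (!m.learner) ++voters;
  }
  return voters / 2 + 1;
}
```

With three voters and two learners, quorum stays at two: a learner can fall arbitrarily far behind without stalling commits.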

26.13 Performance Considerations

26.13.1 Latency Breakdown

A typical write operation's latency:

| Phase       | Typical Latency | Description             |
|-------------|-----------------|-------------------------|
| Prepare     | 0.1-0.5 ms      | RocksDB WAL write       |
| Raft Append | 0.1-0.2 ms      | Local log write         |
| Replication | 1-10 ms         | Network RTT to majority |
| Commit      | 0.1-0.3 ms      | Make visible            |
| Total       | 1.3-11 ms       | End-to-end              |

Network latency dominates in distributed settings.

26.13.2 Throughput Optimization

Cognica employs several techniques for high throughput:

Batching: Multiple transactions share a single Raft round-trip:

class TransactionBatcher {
public:
  void add(Transaction* txn) {
    std::lock_guard lock(mutex_);
    pending_.push_back(txn);
    if (pending_.size() >= max_batch_size_ ||
        deadline_reached_()) {
      flush_();
    }
  }

private:
  bool deadline_reached_() const;  // True once max_delay_ has elapsed

  void flush_() {
    auto batch = std::move(pending_);
    pending_.clear();  // restore a known-empty state after the move
    // Single Raft append for entire batch
    raft_->append_batch(batch);
  }

  std::vector<Transaction*> pending_;
  std::mutex mutex_;
  size_t max_batch_size_ = 100;
  std::chrono::milliseconds max_delay_{10};
};

Pipelining: Overlapping Raft phases:

Transaction 1: [Prepare][Append][Wait.......][Commit]
Transaction 2:          [Prepare][Append][Wait.......][Commit]
Transaction 3:                   [Prepare][Append][Wait.......][Commit]
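
The payoff can be estimated with a back-of-the-envelope model: sequential commits are bounded by end-to-end commit latency, while pipelined commits are bounded by the local append interval. The struct and its figures are illustrative, drawn from the latency table in 26.13.1:

```cpp
// Illustrative model: without pipelining each transaction waits for
// the full commit latency; with pipelining a new append can start as
// soon as the previous local append finishes.
struct PipelineModel {
  double commit_latency_ms;   // full Prepare..Commit latency
  double append_interval_ms;  // time between successive local appends

  double sequential_tps() const { return 1000.0 / commit_latency_ms; }
  double pipelined_tps() const { return 1000.0 / append_interval_ms; }
};
```

With a 5 ms commit latency and a 0.2 ms append interval, pipelining lifts the per-stream ceiling from 200 to roughly 5,000 transactions per second, before batching multiplies it further.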

Async Replication: Secondary updates don't block primary:

void Replicator::replicate_transaction(Transaction* txn, uint64_t seq) {
  // Fire and forget: the primary doesn't wait for secondary acks.
  // The sequence number travels with the batch so secondaries can
  // run the gap detection from Section 26.11.2.
  thread_pool_->enqueue([this, txn, seq] {
    for (auto& node : secondary_nodes_) {
      send_batch_(node, seq, {txn});
    }
  });
}
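
Fire-and-forget sends mean the primary must track how far each secondary trails. A hypothetical lag tracker (not the actual Cognica class) that could feed a metric such as replication_lag_entries:

```cpp
#include <algorithm>
#include <cstdint>
#include <unordered_map>

// Hypothetical lag tracker: records the highest sequence each
// secondary has acknowledged and reports the worst-case lag behind
// the primary's latest sequence.
class LagTracker {
 public:
  void record_ack(int node_id, std::uint64_t seq) {
    auto& acked = acked_[node_id];
    acked = std::max(acked, seq);  // acks may arrive out of order
  }

  std::uint64_t max_lag(std::uint64_t primary_seq) const {
    std::uint64_t lag = 0;
    for (const auto& [node, acked] : acked_) {
      lag = std::max(lag, primary_seq - std::min(acked, primary_seq));
    }
    return lag;
  }

 private:
  std::unordered_map<int, std::uint64_t> acked_;
};
```

A lag that grows without bound signals a secondary that should fall back to the log-sync or snapshot recovery paths of Section 26.11.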

26.13.3 Disk I/O Optimization

Raft's durability requirements demand careful I/O management:

Group Commit: Batch multiple entries before fsync:

void RaftLogStore::flush_pending_() {
  // Write all pending entries
  for (auto& entry : pending_entries_) {
    write_entry_(entry);
  }

  // Single fsync for all entries
  fsync(current_segment_fd_);

  pending_entries_.clear();
}

Direct I/O: Bypass OS page cache for predictable latency:

int open_segment_(const std::string& path) {
  return open(path.c_str(),
              O_RDWR | O_CREAT | O_DIRECT,
              0644);
}
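
O_DIRECT comes with alignment constraints: on Linux, user buffers, file offsets, and transfer sizes generally must be multiples of the device's logical block size. A sketch of allocating a suitably aligned buffer, assuming 4 KiB covers the device:

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdlib>

// O_DIRECT transfers require the user buffer (as well as file offsets
// and lengths) to be aligned to the device's logical block size;
// 4096 bytes is a safe choice on most modern devices.
// Returns nullptr on allocation failure.
void* alloc_direct_io_buffer(std::size_t size,
                             std::size_t alignment = 4096) {
  void* buf = nullptr;
  if (posix_memalign(&buf, alignment, size) != 0) {
    return nullptr;
  }
  return buf;
}
```

Misaligned O_DIRECT requests fail with EINVAL on most filesystems, so segment writers typically pad entries up to the block boundary.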

26.14 Monitoring and Observability

26.14.1 Key Metrics

Cognica exposes Raft metrics for monitoring:

struct RaftMetrics {
  std::atomic<uint64_t> leader_elections{0};
  std::atomic<uint64_t> append_entries_sent{0};
  std::atomic<uint64_t> append_entries_received{0};
  std::atomic<uint64_t> commits{0};
  std::atomic<uint64_t> snapshots_created{0};
  std::atomic<uint64_t> snapshots_applied{0};

  std::atomic<uint64_t> commit_latency_us{0};
  std::atomic<uint64_t> replication_lag_entries{0};
};
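
A typical update path looks like the following minimal sketch; the struct is a stand-in for just the RaftMetrics fields it touches:

```cpp
#include <atomic>
#include <cstdint>

// Minimal stand-in for the RaftMetrics fields used below.
struct CommitMetrics {
  std::atomic<std::uint64_t> commits{0};
  std::atomic<std::uint64_t> commit_latency_us{0};
};

// Counters use relaxed ordering: metrics tolerate slight staleness
// and synchronize no other data. The latency field is treated here
// as a last-observed gauge rather than a running sum.
void record_commit(CommitMetrics& m, std::uint64_t latency_us) {
  m.commits.fetch_add(1, std::memory_order_relaxed);
  m.commit_latency_us.store(latency_us, std::memory_order_relaxed);
}
```

Relaxed atomics keep the metrics path off the commit critical path; a scraper reading slightly stale values is harmless.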

26.14.2 Health Checks

Regular health assessment:

auto ReplicationManager::health_status() const -> HealthStatus {
  HealthStatus status;

  status.is_leader = is_leader();
  status.current_term = raft_server_->get_term();
  status.commit_index = raft_server_->get_committed_log_idx();
  status.last_log_index = log_store_->last_entry()->get_idx();
  status.cluster_size = raft_server_->get_srv_config_all().size();

  // Check for concerning conditions
  status.warnings = {};

  auto lag = status.last_log_index - status.commit_index;
  if (lag > 100) {
    status.warnings.push_back("High commit lag: " + std::to_string(lag));
  }

  if (!is_leader() && !received_heartbeat_recently_()) {
    status.warnings.push_back("No recent heartbeat from leader");
  }

  return status;
}

26.14.3 Audit Logging

Security-relevant events are logged for compliance:

class AuditLogger {
public:
  void log_event(const AuditEvent& event) {
    auto json = serialize_event_(event);
    auto checksum = compute_checksum_(json);

    writer_.write({
        .timestamp = std::chrono::system_clock::now(),
        .event = json,
        .checksum = checksum
    });
  }

private:
  AsyncWriter writer_;
};
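
compute_checksum_ can be any stable hash over the serialized event. As an illustration only (not the actual Cognica implementation), a 64-bit FNV-1a:

```cpp
#include <cstdint>
#include <string_view>

// Illustrative checksum: 64-bit FNV-1a over the serialized event.
// Simple, dependency-free, and stable across platforms.
std::uint64_t fnv1a64(std::string_view data) {
  std::uint64_t hash = 14695981039346656037ULL;  // FNV offset basis
  for (unsigned char c : data) {
    hash ^= c;
    hash *= 1099511628211ULL;  // FNV prime
  }
  return hash;
}
```

Note that FNV-1a only detects accidental corruption; tamper evidence for compliance would require a keyed construction such as an HMAC.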

Audit events include:

  • Leader elections
  • Configuration changes
  • Authentication attempts
  • Authorization decisions
  • Replication errors

26.15 Summary

Cognica's Raft implementation provides strong consistency guarantees for distributed database operations:

  1. Safety: Committed transactions are never lost, even under failures
  2. Linearizability: Operations appear atomic and ordered
  3. Availability: Cluster tolerates minority node failures
  4. Recovery: Automatic failover and state synchronization

The integration with RocksDB's two-phase commit, combined with term-based fencing and sequence management, ensures correctness across the complex interaction between local storage and distributed consensus.

Key implementation highlights:

  • NuRaft foundation with custom log storage and state machine
  • Segmented persistent log with 64 MB segments and crash-safe writes
  • RocksDB checkpoint-based snapshots for efficient state transfer
  • Centralized sequence generation solving the prepare-before-commit ordering problem
  • Comprehensive crash recovery handling all failure scenarios

The result is a production-grade distributed storage system that maintains ACID guarantees across multiple nodes while achieving sub-10ms commit latencies under normal operation.

Copyright (c) 2023-2026 Cognica, Inc.