Chapter 26: Raft Consensus Protocol
26.1 The Distributed Consensus Problem
26.1.1 Fundamental Challenges
Distributed databases face a fundamental tension between availability and consistency. When multiple nodes must agree on a shared state, network partitions, node failures, and message delays create scenarios where naive approaches produce incorrect results.
Consider a simple replicated key-value store with three nodes. A client writes x = 5 to node A, while simultaneously another client writes x = 7 to node B. Without coordination:
- Node A believes x = 5
- Node B believes x = 7
- Node C may have either value, or neither
This split-brain scenario violates the fundamental guarantee users expect: that a database returns consistent results regardless of which node handles their query.
26.1.2 The FLP Impossibility Result
Fischer, Lynch, and Paterson proved in 1985 that no deterministic consensus protocol can guarantee termination in an asynchronous system where even a single node may fail. This FLP impossibility result establishes a fundamental limit: under full asynchrony, no protocol can guarantee both safety and termination.
Practical systems circumvent FLP by introducing timing assumptions. Raft assumes a partially synchronous model where message delays are bounded most of the time, allowing progress during stable periods while maintaining safety always.
26.1.3 Safety vs. Liveness
Consensus protocols distinguish two properties:
Safety: Nothing bad happens. The system never returns inconsistent results.
Liveness: Something good eventually happens. The system eventually makes progress.
Raft prioritizes safety unconditionally—a Raft cluster will never acknowledge conflicting values for the same log position. Liveness depends on timing: if a majority of nodes can communicate within bounded time, the cluster elects a leader and makes progress.
26.1.4 The Replicated State Machine Model
Raft implements consensus through replicated state machines. Each node maintains:
- Log: An ordered sequence of commands
- State Machine: Deterministic function from commands to state
- Committed Index: Position up to which entries are known to be safely replicated and may be applied
The key invariant: if all nodes apply the same commands in the same order, they reach identical states.
26.2 Raft Algorithm Fundamentals
26.2.1 Node Roles
Every Raft node operates in exactly one of three roles:
Leader: Handles all client requests, replicates log entries to followers, sends periodic heartbeats. At most one leader exists per term.
Follower: Passive participants that respond to leader RPCs. Followers redirect client requests to the current leader.
Candidate: Transitional role during elections. A follower becomes a candidate when it suspects the leader has failed.
26.2.2 Terms
Raft divides time into terms, monotonically increasing integers that act as logical clocks.
Each term begins with an election. If the election succeeds, the winning candidate serves as leader for the remainder of the term. If the election fails (split vote), a new term begins immediately.
Terms enable distributed leader detection: any node that receives a message with a higher term immediately steps down to follower and adopts the new term. This mechanism ensures stale leaders cannot cause inconsistency.
26.2.3 Leader Election
When a follower's election timeout expires without receiving a heartbeat, it transitions to candidate:
- Increment current term
- Vote for itself
- Reset election timer
- Send RequestVote RPCs to all other nodes
A candidate wins the election if it receives votes from a majority of nodes. The voting rules ensure at most one winner per term:
Vote Granting Rules:
- Each node votes for at most one candidate per term
- A node grants its vote only if the candidate's log is at least as up-to-date as its own
The "up-to-date" comparison uses lexicographic ordering on (lastLogTerm, lastLogIndex): a candidate's log is at least as up-to-date if its last entry has a higher term, or the same term and an index at least as large.
26.2.4 Log Replication
Once elected, the leader handles all client requests:
- Append command to local log
- Send AppendEntries RPC to each follower
- Wait for majority acknowledgment
- Commit entry (advance commit index)
- Apply to state machine
- Respond to client
The AppendEntries RPC includes:
- prevLogIndex: Index of log entry immediately preceding new entries
- prevLogTerm: Term of prevLogIndex entry
- entries[]: Log entries to append
- leaderCommit: Leader's commit index
Followers perform a consistency check: they accept entries only if their log contains an entry at prevLogIndex with term prevLogTerm. This check, combined with the Log Matching Property, ensures logs remain consistent:
Log Matching Property: If two logs contain an entry with the same index and term, then:
- They store the same command
- All preceding entries are identical
26.2.5 Safety Proof Sketch
Raft's safety rests on two invariants:
Election Safety: At most one leader per term.
- Proof: Majority voting with single vote per term guarantees uniqueness.
Leader Completeness: If a log entry is committed in term T, it appears in the logs of all leaders for terms greater than T.
- Proof: Committed entries exist on a majority. Election requires majority votes. Vote restriction ensures new leaders have all committed entries.
Together, these invariants ensure committed entries are never lost or overwritten.
26.3 Cognica's NuRaft Integration
26.3.1 Architecture Overview
Cognica integrates the NuRaft library for Raft consensus, adding database-specific components.
The ReplicationManager (src/cognica/replication/core/manager.hpp) orchestrates all components:
class ReplicationManager final {
public:
ReplicationManager(Options options,
std::shared_ptr<TransactionApplier> applier);
auto start() -> ReplicationStatus;
auto stop() -> ReplicationStatus;
auto append_transaction(Transaction* txn) -> ReplicationStatus;
auto wait_for_commit(uint64_t log_idx) -> ReplicationStatus;
auto is_leader() const -> bool;
auto get_leader_id() const -> int32_t;
private:
std::unique_ptr<nuraft::raft_server> raft_server_;
std::unique_ptr<RaftLogStore> log_store_;
std::unique_ptr<RaftStateManager> state_mgr_;
std::unique_ptr<ReplicationStateMachine> state_machine_;
std::thread writer_thread_;
std::queue<CommitRequest> commit_queue_;
};
26.3.2 Configuration Parameters
Cognica exposes Raft parameters through its configuration system (src/cognica/replication/config/options.hpp):
| Parameter | Default | Description |
|---|---|---|
| heartbeat_interval | 1000 ms | Leader heartbeat frequency |
| election_timeout | 5000 ms | Lower bound for election timeout |
| snapshot_distance | 100 | Entries between snapshots |
| log_sync_batch_size | 10 | Entries per sync batch |
The election timeout is randomized within [election_timeout, 2 × election_timeout] to prevent split votes.
With default settings, election timeouts range from 5 to 10 seconds, while heartbeats occur every second—ensuring followers receive multiple heartbeats per election period.
26.3.3 ASIO-Based Networking
Cognica uses Boost.ASIO for asynchronous network I/O, configured with a 4-thread pool:
void initialize_networking_() {
asio_service_ = std::make_shared<nuraft::asio_service>(
nuraft::asio_service_options{
.thread_pool_size_ = 4,
.enable_ssl_ = options_.tls.enable_ssl,
.server_cert_ = options_.tls.server_cert_file,
.server_key_ = options_.tls.server_key_file,
.ca_cert_ = options_.tls.ca_cert_file
});
rpc_listener_ = asio_service_->create_rpc_listener(
options_.listen_port,
options_.logger);
}
The async I/O model allows a single thread to handle many concurrent connections, essential for clusters with numerous nodes.
26.4 Persistent Log Storage
26.4.1 Segmented Log Architecture
Cognica's RaftLogStore (src/cognica/replication/core/raft_log_store.hpp) implements persistent log storage using a segmented file structure:
{db_path}/raft/{node_id}/logs/
metadata.bin (16 bytes: start_idx, last_idx)
segment_index.bin (entry location index)
segment_0000000001.log
segment_0000000002.log
...
Each segment contains sequential log entries up to a size threshold:
class RaftLogStore : public nuraft::log_store {
public:
static constexpr size_t kSegmentSizeThreshold = 64 * 1024 * 1024; // 64 MB
auto append(nuraft::ptr<nuraft::log_entry>& entry) -> uint64_t override;
auto write_at(uint64_t index, nuraft::ptr<nuraft::log_entry>& entry) -> void override;
auto log_entries(uint64_t start, uint64_t end) -> nuraft::ptr<std::vector<...>> override;
auto compact(uint64_t last_log_idx) -> bool override;
private:
auto rotate_segment_if_needed_() -> void;
auto locate_entry_(uint64_t index) -> EntryLocation;
std::map<uint64_t, SegmentInfo> segments_;
std::map<uint64_t, EntryLocation> entry_index_;
uint64_t start_idx_, last_idx_;
};
26.4.2 Entry Location Tracking
For efficient random access, the log maintains an in-memory index mapping log indices to physical locations:
struct EntryLocation {
uint64_t segment_id;
uint64_t offset;
uint32_t size;
};
The index enables entry lookup regardless of log size, critical for:
- Consistency checks (reading the prevLogIndex entry)
- Follower catch-up (reading arbitrary ranges)
- Snapshot transfer (packing log segments)
26.4.3 Durability Guarantees
Log durability follows a strict protocol:
- Write entry to temporary file
- Call fsync() on file descriptor
- Atomically rename to final location
- Sync directory metadata
auto RaftLogStore::persist_entry_(const LogEntry& entry) -> Status {
auto temp_path = segment_path_ + ".tmp";
auto fd = open(temp_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
write(fd, entry.data(), entry.size());
fsync(fd); // Ensure data reaches disk
close(fd);
rename(temp_path.c_str(), segment_path_.c_str());
// Sync directory to persist rename
auto dir_fd = open(log_dir_.c_str(), O_RDONLY | O_DIRECTORY);
fsync(dir_fd);
close(dir_fd);
return Status::OK();
}
This protocol survives power failures: either the old file exists (write failed) or the new file exists (write succeeded). No intermediate states are observable after recovery.
26.4.4 Log Compaction
As logs grow unboundedly, compaction becomes necessary. Cognica supports two compaction strategies:
Prefix Truncation: After snapshot creation, entries before the snapshot index can be deleted:
auto RaftLogStore::compact(uint64_t last_log_idx) -> bool {
  // Remove segments whose entries all fall at or before last_log_idx
  for (auto it = segments_.begin(); it != segments_.end(); ) {
    if (it->second.last_index <= last_log_idx) {
remove_segment_(it->first);
it = segments_.erase(it);
} else {
++it;
}
}
start_idx_ = last_log_idx + 1;
persist_metadata_();
return true;
}
Segment Rotation: When a segment exceeds the size threshold, a new segment begins:
auto RaftLogStore::rotate_segment_if_needed_() -> void {
if (current_segment_size_ >= kSegmentSizeThreshold) {
close_current_segment_();
current_segment_id_++;
create_new_segment_();
current_segment_size_ = 0;
}
}
26.5 State Machine and Transaction Application
26.5.1 The Commit Interface
Cognica's state machine (src/cognica/replication/core/state_machine.hpp) implements NuRaft's state machine interface:
class ReplicationStateMachine : public nuraft::state_machine {
public:
auto commit(uint64_t log_idx, nuraft::buffer& data)
-> nuraft::ptr<nuraft::buffer> override;
auto pre_commit(uint64_t log_idx, nuraft::buffer& data)
-> nuraft::ptr<nuraft::buffer> override;
auto rollback(uint64_t log_idx, nuraft::buffer& data) -> void override;
auto last_commit_index() -> uint64_t override;
private:
std::shared_ptr<TransactionApplier> applier_;
std::atomic<uint64_t> last_committed_idx_{0};
};
The commit() method executes when Raft determines an entry is safely replicated:
auto ReplicationStateMachine::commit(uint64_t log_idx, nuraft::buffer& data)
-> nuraft::ptr<nuraft::buffer> {
// Deserialize log entry
auto entry = TransactionLogEntry::deserialize(data);
// Apply to database
auto status = applier_->apply_log_entry(entry);
if (!status.ok()) {
LOG_ERROR("Failed to apply log entry {}: {}", log_idx, status.message());
}
// Update commit index with release semantics
last_committed_idx_.store(log_idx, std::memory_order_release);
  return nullptr; // Success; no result payload needed
}
26.5.2 Transaction Log Entries
Each log entry encapsulates a complete transaction (src/cognica/replication/transaction/log.hpp):
struct TransactionLogEntry {
uint64_t sequence_number; // Monotonic sequence
uint64_t term; // Raft term (fencing token)
TransactionType type; // WRITE, DELETE, etc.
std::vector<Operation> ops; // Individual operations
auto serialize() const -> nuraft::ptr<nuraft::buffer>;
static auto deserialize(nuraft::buffer& buf) -> TransactionLogEntry;
};
struct Operation {
OperationType type;
std::string collection;
std::string key;
std::string value; // Empty for deletes
};
The term field acts as a fencing token, preventing stale leaders from applying outdated transactions (discussed in Section 26.8).
26.5.3 The Transaction Applier
The TransactionApplier (src/cognica/replication/transaction/applier.hpp) converts log entries to RocksDB operations:
class TransactionApplier {
public:
auto apply_log_entry(const TransactionLogEntry& entry) -> ReplicationStatus;
auto last_applied_sequence() const -> uint64_t;
auto set_leader_term(uint64_t term) -> void;
private:
auto validate_sequence_(uint64_t seq) -> ReplicationStatus;
auto validate_term_(uint64_t term) -> ReplicationStatus;
auto convert_to_write_batch_(const TransactionLogEntry& entry) -> WriteBatch;
std::shared_ptr<rocksdb::DB> db_;
std::atomic<uint64_t> last_applied_sequence_{0};
std::atomic<uint64_t> current_leader_term_{0};
};
Application follows a strict sequence:
auto TransactionApplier::apply_log_entry(const TransactionLogEntry& entry)
-> ReplicationStatus {
// 1. Validate sequence ordering
auto status = validate_sequence_(entry.sequence_number);
if (!status.ok()) {
return status;
}
// 2. Validate term (fencing)
status = validate_term_(entry.term);
if (!status.ok()) {
return status;
}
// 3. Convert to RocksDB WriteBatch
auto batch = convert_to_write_batch_(entry);
// 4. Apply atomically
auto write_options = rocksdb::WriteOptions();
write_options.sync = true;
auto rdb_status = db_->Write(write_options, &batch);
if (!rdb_status.ok()) {
return ReplicationStatus::StorageError(rdb_status.ToString());
}
// 5. Update last applied sequence
last_applied_sequence_.store(entry.sequence_number,
std::memory_order_release);
return ReplicationStatus::OK();
}
26.6 Leader Election Implementation
26.6.1 Election Timeout Management
Cognica configures election timeouts through NuRaft parameters:
void ReplicationManager::configure_raft_params_() {
raft_params_.election_timeout_lower_bound_ =
options_.election_timeout.count();
raft_params_.election_timeout_upper_bound_ =
options_.election_timeout.count() * 2;
raft_params_.heart_beat_interval_ =
options_.heartbeat_interval.count();
}
The randomized timeout prevents synchronized elections that could cause repeated split votes. With 5-10 second timeouts and 1 second heartbeats, two followers rarely time out within the same round-trip window, so even in a 3-node cluster split votes are improbable, and any split vote is resolved by a fresh randomized timeout.
26.6.2 State Transitions
Cognica tracks node roles independently of NuRaft for application-level logic (src/cognica/replication/core/state.hpp):
class ReplicationState {
public:
enum class Role { kPrimary, kSecondary, kRecovering };
auto transition(Role from, Role to) -> bool;
auto current_role() const -> Role;
auto is_primary() const -> bool;
auto is_secondary() const -> bool;
auto is_recovering() const -> bool;
using StateChangeCallback = std::function<void(Role, Role)>;
auto set_callback(StateChangeCallback cb) -> void;
private:
std::atomic<Role> role_{Role::kSecondary};
std::mutex mutex_;
StateChangeCallback callback_;
};
Valid transitions form a state machine:
auto ReplicationState::transition(Role from, Role to) -> bool {
static const std::set<std::pair<Role, Role>> valid_transitions = {
{Role::kSecondary, Role::kPrimary}, // Won election
{Role::kSecondary, Role::kRecovering}, // Starting sync
{Role::kPrimary, Role::kSecondary}, // Lost leadership
{Role::kPrimary, Role::kRecovering}, // Resync needed
{Role::kRecovering, Role::kSecondary}, // Sync completed
};
if (valid_transitions.count({from, to}) == 0) {
return false;
}
  {
    std::lock_guard lock(mutex_);
    if (role_.load() != from) {
      return false; // State changed concurrently
    }
    role_.store(to);
  } // Release lock before invoking the callback
  // Execute callback outside lock
  if (callback_) {
    callback_(from, to);
  }
  return true;
}
26.6.3 Becoming Leader
When NuRaft notifies Cognica of leadership acquisition:
void ReplicationManager::on_become_leader_() {
LOG_INFO("Node {} became leader for term {}",
current_node_->id(), raft_server_->get_term());
// Update role state
state_.transition(ReplicationState::Role::kSecondary,
ReplicationState::Role::kPrimary);
// Start replicator for async secondary updates
replicator_->start_as_primary();
// Update metrics
metrics_.leader_elections.fetch_add(1, std::memory_order_relaxed);
// Audit log
audit_logger_->log_event(AuditEvent::LeaderElected{
.node_id = current_node_->id(),
.term = raft_server_->get_term()
});
// Notify application callbacks
if (leader_callback_) {
leader_callback_(current_node_->id());
}
}
26.7 Snapshot Mechanism
26.7.1 RocksDB Checkpoint Integration
Cognica leverages RocksDB's checkpoint feature for snapshots, providing a consistent point-in-time view without blocking writes:
auto ReplicationStateMachine::create_snapshot(
nuraft::snapshot& s,
nuraft::async_result<bool>::handler_type& when_done) -> void {
auto log_idx = s.get_last_log_idx();
auto snapshot_dir = snapshot_path_ / fmt::format("snapshot_{}", log_idx);
// Create RocksDB checkpoint
rocksdb::Checkpoint* checkpoint = nullptr;
auto status = rocksdb::Checkpoint::Create(db_.get(), &checkpoint);
if (!status.ok()) {
when_done(false, nuraft::cmd_result_code::FAILED);
return;
}
status = checkpoint->CreateCheckpoint(snapshot_dir.string());
delete checkpoint;
if (!status.ok()) {
when_done(false, nuraft::cmd_result_code::FAILED);
return;
}
// Track snapshot
last_snapshot_ = std::make_shared<SnapshotInfo>(log_idx, snapshot_dir);
// Cleanup old snapshots (keep 3)
cleanup_old_snapshots_(3);
when_done(true, nuraft::cmd_result_code::OK);
}
26.7.2 Snapshot Directory Structure
{db_path}/raft/{node_id}/snapshots/
snapshot_1000/
MANIFEST-000001
000001.sst
000002.sst
...
OPTIONS-000001
snapshot_2000/
...
Each snapshot contains a complete RocksDB checkpoint, enabling followers to bootstrap from scratch.
26.7.3 Chunk-Based Transfer
For large snapshots, Cognica transfers data in 1 MB chunks:
auto ReplicationStateMachine::read_logical_snp_obj(
nuraft::snapshot& s,
void*& user_ctx,
uint64_t obj_id,
nuraft::ptr<nuraft::buffer>& data_out,
bool& is_last_obj) -> int {
static constexpr size_t kChunkSize = 1 * 1024 * 1024; // 1 MB
auto* ctx = static_cast<SnapshotReadContext*>(user_ctx);
if (!ctx) {
ctx = new SnapshotReadContext(s.get_last_log_idx());
user_ctx = ctx;
}
// Read next chunk
std::vector<char> buffer(kChunkSize);
auto bytes_read = ctx->read_chunk(buffer.data(), kChunkSize);
data_out = nuraft::buffer::alloc(bytes_read);
data_out->put_raw(buffer.data(), bytes_read);
is_last_obj = ctx->is_complete();
return 0;
}
26.7.4 Snapshot Application
Followers apply received snapshots:
auto ReplicationStateMachine::apply_snapshot(nuraft::snapshot& s) -> bool {
auto log_idx = s.get_last_log_idx();
auto snapshot_dir = snapshot_path_ / fmt::format("snapshot_{}", log_idx);
// Verify snapshot files exist
if (!std::filesystem::exists(snapshot_dir)) {
  LOG_ERROR("Snapshot directory does not exist: {}", snapshot_dir.string());
return false;
}
// Extract last sequence number from snapshot metadata
auto last_seq = extract_sequence_from_snapshot_(snapshot_dir);
// Reset applier state
applier_->reset(last_seq);
// Update commit index
last_committed_idx_.store(log_idx, std::memory_order_release);
// Track as current snapshot
last_snapshot_ = std::make_shared<SnapshotInfo>(log_idx, snapshot_dir);
LOG_INFO("Applied snapshot at index {} with sequence {}", log_idx, last_seq);
return true;
}
26.8 Term-Based Fencing
26.8.1 The Split-Brain Problem
Network partitions can create scenarios where a stale leader continues operating:
Time 0: [A=Leader, B, C] - Cluster healthy
Time 1: Network partition isolates A
Time 2: B elected new leader (term 2)
Time 3: A still believes it's leader (term 1)
Without protection, both A and B could accept writes, causing divergent states.
26.8.2 Fencing Tokens
Cognica embeds the Raft term in every log entry as a fencing token:
struct TransactionLogEntry {
uint64_t term; // Fencing token
// ...
};
During application, the applier validates the term:
auto TransactionApplier::validate_term_(uint64_t entry_term) -> ReplicationStatus {
auto current_term = current_leader_term_.load(std::memory_order_acquire);
if (entry_term < current_term) {
return ReplicationStatus::StaleTerm(
fmt::format("Entry term {} < current term {}", entry_term, current_term));
}
if (entry_term > current_term) {
// New leader with higher term
current_leader_term_.store(entry_term, std::memory_order_release);
}
return ReplicationStatus::OK();
}
26.8.3 Preventing Stale Writes
The fencing mechanism ensures:
- Monotonic terms: Each leader has a strictly higher term than predecessors
- Term validation: Entries from old terms are rejected
- Automatic recovery: New leader's higher term invalidates stale entries
When the partitioned leader A reconnects:
Time 4: A discovers term 2 > term 1
Time 5: A steps down to follower
Time 6: A discards uncommitted entries from term 1
Time 7: A receives log from B (new leader)
26.9 Two-Phase Commit Integration
26.9.1 The Coordination Challenge
Cognica must coordinate Raft consensus with RocksDB transactions. Naive approaches fail:
Problem 1: Commit before Raft
// WRONG: Data visible before replication
txn->Commit();
raft->append(txn); // Crash here = data lost on secondaries
Problem 2: Raft before Commit
// WRONG: Sequence number unknown until commit
raft->append(txn); // What sequence number?
txn->Commit(); // RocksDB assigns sequence
26.9.2 Cognica's Two-Phase Protocol
Cognica uses RocksDB's two-phase commit (2PC) feature:
Phase 1: PREPARE
- Write transaction to WAL
- Data not yet visible
- Survives crash
Phase 2: COMMIT
- Make data visible
- Assign sequence number
The full replication flow:
auto ReplicationManager::replicate_transaction(Transaction* txn)
-> ReplicationStatus {
// Step 1: Prepare (durable but invisible)
auto status = txn->Prepare();
if (!status.ok()) {
return ReplicationStatus::PrepareError(status.ToString());
}
// Step 2: Assign sequence and append to Raft
uint64_t log_idx;
status = append_transaction_to_raft_(txn, &log_idx);
if (!status.ok()) {
txn->Rollback();
return status;
}
// Step 3: Wait for Raft commit (majority acknowledgment)
status = wait_for_raft_commit_(log_idx, options_.commit_timeout);
if (!status.ok()) {
write_abort_record_(txn);
txn->Rollback();
return status;
}
// Step 4: Commit locally (make visible)
status = txn->Commit();
if (!status.ok()) {
write_abort_record_(txn);
return ReplicationStatus::CommitError(status.ToString());
}
// Step 5: Async replication to secondaries
replicator_->replicate_transaction(txn, txn->sequence_number());
return ReplicationStatus::OK();
}
26.9.3 Sequence Number Management
A critical challenge: RocksDB assigns sequence numbers at commit time, but Raft needs the sequence number before commit (for ordering). Cognica solves this with a centralized sequence generator:
class SequenceGenerator {
public:
auto next_sequence() -> uint64_t {
return next_sequence_.fetch_add(1, std::memory_order_acq_rel);
}
auto set_sequence(uint64_t seq) -> void {
next_sequence_.store(seq, std::memory_order_release);
}
private:
std::atomic<uint64_t> next_sequence_{1};
};
The replication manager assigns sequences before Raft append:
auto ReplicationManager::append_transaction_to_raft_(
Transaction* txn,
uint64_t* out_log_idx) -> ReplicationStatus {
// Assign sequence BEFORE Raft append
auto seq_no = sequence_generator_->next_sequence();
txn->set_assigned_sequence(seq_no);
// Get current term for fencing
auto current_term = raft_server_->get_term();
// Build log entry
auto entry = TransactionLogEntry{
.sequence_number = seq_no,
.term = current_term,
.ops = txn->operations()
};
// Append to Raft
auto result = raft_server_->append_entries({entry.serialize()});
if (result->get_result_code() != nuraft::cmd_result_code::OK) {
return ReplicationStatus::RaftError(result->get_result_str());
}
*out_log_idx = raft_server_->get_last_log_idx();
return ReplicationStatus::OK();
}
26.9.4 Waiting for Raft Commit
After appending, the leader waits for majority acknowledgment:
auto ReplicationManager::wait_for_raft_commit_(
uint64_t log_idx,
std::chrono::milliseconds timeout) -> ReplicationStatus {
auto deadline = std::chrono::steady_clock::now() + timeout;
while (std::chrono::steady_clock::now() < deadline) {
auto committed_idx = raft_server_->get_committed_log_idx();
if (committed_idx >= log_idx) {
return ReplicationStatus::OK();
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
return ReplicationStatus::Timeout(
fmt::format("Timed out waiting for log index {} (committed: {})",
log_idx, raft_server_->get_committed_log_idx()));
}
The 1 ms polling interval balances responsiveness against CPU usage. Under normal operation, Raft commits within a few milliseconds.
26.10 Crash Recovery
26.10.1 Recovery Scenarios
Crashes can occur at any point in the two-phase protocol. Cognica handles each scenario:
Scenario 1: Crash after PREPARE, before Raft LOG
- RocksDB finds prepared transaction on restart
- No Raft log entry exists
- Action: Rollback (transaction never replicated)
Scenario 2: Crash after Raft LOG, before local COMMIT
- Raft log entry exists with assigned sequence
- RocksDB has prepared transaction
- Action: Abort and write ABORT record (secondaries skip this sequence)
Scenario 3: Crash after local COMMIT, before async replication
- Transaction committed locally
- Secondaries may not have received it
- Action: Raft consensus handles recovery (entry already committed)
26.10.2 Recovery Implementation
auto ReplicationManager::recover_uncommitted_transactions_() -> void {
// Get all prepared transactions from RocksDB
std::vector<Transaction*> prepared;
db_->GetAllPreparedTransactions(&prepared);
for (auto* txn : prepared) {
auto seq = txn->assigned_sequence();
// Check if sequence exists in Raft log
auto log_entry = log_store_->entry_at(find_by_sequence_(seq));
if (!log_entry) {
// Scenario 1: Not in Raft log, rollback
LOG_INFO("Rolling back prepared transaction with seq {}", seq);
txn->Rollback();
} else if (!is_committed_in_raft_(log_entry->index())) {
// Scenario 2: In log but not committed, abort
LOG_INFO("Aborting uncommitted transaction with seq {}", seq);
write_abort_record_(seq);
txn->Rollback();
} else {
// Scenario 3: Committed in Raft, commit locally
LOG_INFO("Completing committed transaction with seq {}", seq);
txn->Commit();
}
}
}
26.10.3 Abort Records
When a transaction cannot complete, Cognica writes an abort record to the Raft log:
auto ReplicationManager::write_abort_record_(uint64_t sequence) -> void {
auto entry = TransactionLogEntry{
.sequence_number = sequence,
.term = raft_server_->get_term(),
.type = TransactionType::ABORT
};
raft_server_->append_entries({entry.serialize()});
}
Secondary nodes process abort records by skipping the sequence:
auto TransactionApplier::apply_log_entry(const TransactionLogEntry& entry)
-> ReplicationStatus {
if (entry.type == TransactionType::ABORT) {
// Skip this sequence number
advance_sequence_past_(entry.sequence_number);
return ReplicationStatus::OK();
}
// Normal application...
}
26.11 Gap Detection and Synchronization
26.11.1 Sequence Gaps
Network issues or node failures can cause sequence gaps on secondary nodes:
Primary: [1, 2, 3, 4, 5, 6, 7]
Secondary: [1, 2, 3, _, _, 6, 7] // Gap at 4, 5
26.11.2 Gap Detection
The TransactionApplier detects gaps during application:
auto TransactionApplier::validate_sequence_(uint64_t incoming_seq)
-> ReplicationStatus {
auto expected = last_applied_sequence_.load() + 1;
if (incoming_seq == expected) {
return ReplicationStatus::OK();
}
if (incoming_seq < expected) {
// Duplicate, already applied
return ReplicationStatus::Duplicate();
}
  // Gap detected
  auto gap = incoming_seq - expected;
  if (gap <= kLargeGapThreshold) {
    // Small or medium gap: recover missing entries from the leader's log
    return ReplicationStatus::LogSyncRequired(expected, incoming_seq);
  }
  // Large gap, need snapshot
  return ReplicationStatus::SnapshotSyncRequired();
}
26.11.3 Recovery Strategies
Gap size determines recovery strategy:
| Gap Size | Strategy | Mechanism |
|---|---|---|
| 1-10 | Log Sync | Fetch missing entries from leader |
| 11-100 | Log Sync | Batch fetch with verification |
| > 100 | Snapshot | Full state transfer |
void ReplicationManager::handle_sync_required_(const ReplicationStatus& status) {
if (status.is_log_sync_required()) {
auto [start, end] = status.gap_range();
request_log_entries_(leader_id_, start, end);
} else if (status.is_snapshot_sync_required()) {
state_.transition(ReplicationState::Role::kSecondary,
ReplicationState::Role::kRecovering);
request_snapshot_(leader_id_);
}
}
26.12 Cluster Membership
26.12.1 Configuration Changes
Raft supports dynamic cluster membership through configuration changes. Cognica uses NuRaft's joint consensus approach:
auto ReplicationManager::add_node(const NodeInfo& node)
-> ReplicationStatus {
if (!is_leader()) {
return ReplicationStatus::NotLeader();
}
// Create server configuration
auto srv_config = nuraft::srv_config(
node.id,
node.endpoint,
/* learner */ false);
auto result = raft_server_->add_srv(srv_config);
if (result->get_result_code() != nuraft::cmd_result_code::OK) {
return ReplicationStatus::ConfigError(result->get_result_str());
}
return ReplicationStatus::OK();
}
26.12.2 Safe Membership Changes
Membership changes follow a two-phase protocol:
- Joint consensus: Both old and new configurations active
- New configuration: Only new configuration active
This ensures no split-brain during transitions.
26.12.3 Learner Nodes
Cognica supports learner (non-voting) nodes for:
- Geographic read replicas
- Backup nodes
- Staged rollouts
auto ReplicationManager::add_learner(const NodeInfo& node)
-> ReplicationStatus {
auto srv_config = nuraft::srv_config(
node.id,
node.endpoint,
/* learner */ true); // Non-voting
return add_server_(srv_config);
}
Learners receive log entries but don't participate in elections or quorum calculations.
26.13 Performance Considerations
26.13.1 Latency Breakdown
A typical write operation's latency:
| Phase | Typical Latency | Description |
|---|---|---|
| Prepare | 0.1-0.5 ms | RocksDB WAL write |
| Raft Append | 0.1-0.2 ms | Local log write |
| Replication | 1-10 ms | Network RTT to majority |
| Commit | 0.1-0.3 ms | Make visible |
| Total | 1.5-11 ms | End-to-end |
Network latency dominates in distributed settings.
26.13.2 Throughput Optimization
Cognica employs several techniques for high throughput:
Batching: Multiple transactions share a single Raft round-trip:
class TransactionBatcher {
public:
void add(Transaction* txn) {
std::lock_guard lock(mutex_);
pending_.push_back(txn);
if (pending_.size() >= max_batch_size_ ||
deadline_reached_()) {
flush_();
}
}
private:
void flush_() {
auto batch = std::move(pending_);
// Single Raft append for entire batch
raft_->append_batch(batch);
}
size_t max_batch_size_ = 100;
std::chrono::milliseconds max_delay_{10};
};
Pipelining: Overlapping Raft phases:
Transaction 1: [Prepare][Append][Wait.......][Commit]
Transaction 2: [Prepare][Append][Wait.......][Commit]
Transaction 3: [Prepare][Append][Wait.......][Commit]
Async Replication: Secondary updates don't block primary:
void Replicator::replicate_transaction(Transaction* txn, uint64_t seq) {
// Fire and forget
thread_pool_->enqueue([=] {
for (auto& node : secondary_nodes_) {
send_batch_(node, {txn});
}
});
}
26.13.3 Disk I/O Optimization
Raft's durability requirements demand careful I/O management:
Group Commit: Batch multiple entries before fsync:
void RaftLogStore::flush_pending_() {
// Write all pending entries
for (auto& entry : pending_entries_) {
write_entry_(entry);
}
// Single fsync for all entries
fsync(current_segment_fd_);
pending_entries_.clear();
}
Direct I/O: Bypass OS page cache for predictable latency:
int open_segment_(const std::string& path) {
return open(path.c_str(),
O_RDWR | O_CREAT | O_DIRECT,
0644);
}
26.14 Monitoring and Observability
26.14.1 Key Metrics
Cognica exposes Raft metrics for monitoring:
struct RaftMetrics {
std::atomic<uint64_t> leader_elections{0};
std::atomic<uint64_t> append_entries_sent{0};
std::atomic<uint64_t> append_entries_received{0};
std::atomic<uint64_t> commits{0};
std::atomic<uint64_t> snapshots_created{0};
std::atomic<uint64_t> snapshots_applied{0};
std::atomic<uint64_t> commit_latency_us{0};
std::atomic<uint64_t> replication_lag_entries{0};
};
26.14.2 Health Checks
Regular health assessment:
auto ReplicationManager::health_status() const -> HealthStatus {
HealthStatus status;
status.is_leader = is_leader();
status.current_term = raft_server_->get_term();
status.commit_index = raft_server_->get_committed_log_idx();
status.last_log_index = log_store_->last_entry()->get_idx();
status.cluster_size = raft_server_->get_srv_config_all().size();
// Check for concerning conditions
status.warnings = {};
auto lag = status.last_log_index - status.commit_index;
if (lag > 100) {
status.warnings.push_back("High commit lag: " + std::to_string(lag));
}
if (!is_leader() && !received_heartbeat_recently_()) {
status.warnings.push_back("No recent heartbeat from leader");
}
return status;
}
26.14.3 Audit Logging
Security-relevant events are logged for compliance:
class AuditLogger {
public:
void log_event(const AuditEvent& event) {
auto json = serialize_event_(event);
auto checksum = compute_checksum_(json);
writer_.write({
.timestamp = std::chrono::system_clock::now(),
.event = json,
.checksum = checksum
});
}
private:
AsyncWriter writer_;
};
Audit events include:
- Leader elections
- Configuration changes
- Authentication attempts
- Authorization decisions
- Replication errors
26.15 Summary
Cognica's Raft implementation provides strong consistency guarantees for distributed database operations:
- Safety: Committed transactions are never lost, even under failures
- Linearizability: Operations appear atomic and ordered
- Availability: Cluster tolerates minority node failures
- Recovery: Automatic failover and state synchronization
The integration with RocksDB's two-phase commit, combined with term-based fencing and sequence management, ensures correctness across the complex interaction between local storage and distributed consensus.
Key implementation highlights:
- NuRaft foundation with custom log storage and state machine
- Segmented persistent log with 64 MB segments and crash-safe writes
- RocksDB checkpoint-based snapshots for efficient state transfer
- Centralized sequence generation solving the prepare-before-commit ordering problem
- Comprehensive crash recovery handling all failure scenarios
The result is a production-grade distributed storage system that maintains ACID guarantees across multiple nodes while achieving sub-10ms commit latencies under normal operation.