Chapter 27: Transaction Processing
27.1 Transaction Fundamentals
27.1.1 The ACID Properties
Database transactions provide four fundamental guarantees, collectively known as ACID:
Atomicity: A transaction executes completely or not at all. If any operation fails, all preceding operations are rolled back, leaving the database unchanged. Atomicity transforms a sequence of operations into a single logical unit.
Consistency: Transactions transform the database from one valid state to another. All integrity constraints—uniqueness, foreign keys, check constraints—hold before and after the transaction. Violations abort the transaction.
Isolation: Concurrent transactions execute as if serialized. Each transaction sees a consistent snapshot of the database, unaffected by concurrent modifications. The isolation level determines exactly which anomalies are permitted.
Durability: Once a transaction commits, its effects persist despite subsequent failures. The database writes committed data to stable storage before acknowledging the commit.
27.1.2 Isolation Levels
SQL defines four standard isolation levels, each permitting progressively fewer anomalies:
| Isolation Level | Dirty Reads | Non-Repeatable Reads | Phantom Reads |
|---|---|---|---|
| Read Uncommitted | Possible | Possible | Possible |
| Read Committed | Prevented | Possible | Possible |
| Repeatable Read | Prevented | Prevented | Possible |
| Serializable | Prevented | Prevented | Prevented |
Dirty Read: Transaction T1 reads data written by T2 before T2 commits. If T2 aborts, T1 has read data that never existed.
Non-Repeatable Read: T1 reads a row, T2 modifies or deletes that row and commits, T1 re-reads and gets a different result.
Phantom Read: T1 reads rows matching a predicate, T2 inserts new rows matching the predicate and commits, T1 re-executes the query and sees additional rows.
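These anomalies can be made concrete with a toy model. The sketch below (Store and SnapshotTxn are illustrative types, not Cognica's API) contrasts a Read Committed reader, which consults the live store on every read and therefore observes a concurrent committed update, with a snapshot-based reader, which copies the state at transaction start and is immune to non-repeatable reads:

```cpp
#include <map>
#include <string>

using Store = std::map<std::string, int>;

// Read Committed: every read consults the live store, so a concurrent
// committed update is visible on the next read.
int read_committed(const Store& live, const std::string& key) {
  return live.at(key);
}

// Repeatable Read / snapshot reads: all reads go to a copy of the store
// taken at transaction start, so re-reads always agree.
struct SnapshotTxn {
  Store snapshot;
  explicit SnapshotTxn(const Store& live) : snapshot(live) {}
  int read(const std::string& key) const { return snapshot.at(key); }
};
```

Running two reads around a concurrent update shows the non-repeatable read under the first model and a stable result under the second.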
27.1.3 Snapshot Isolation
Cognica implements Snapshot Isolation (SI), a consistency model stronger than Read Committed but weaker than Serializable. SI is incomparable to Repeatable Read: it prevents phantom reads, yet permits write skew, which Serializable forbids:
Under snapshot isolation:
- Each transaction reads from a consistent snapshot taken at transaction start
- Writes are buffered until commit
- At commit, the system checks for write-write conflicts
- If two concurrent transactions modify the same row, the second to commit fails
Snapshot isolation prevents dirty reads, non-repeatable reads, and most phantom reads. It permits write skew, where two transactions read overlapping data and make disjoint updates that together violate an invariant:
Initial: x = y = 0, invariant: x + y >= 0
T1: read x (0), read y (0), write x = -1
T2: read x (0), read y (0), write y = -1
Both commit successfully
Final: x = -1, y = -1, invariant violated!
For most OLTP workloads, snapshot isolation provides excellent consistency with superior concurrency compared to serializable isolation.
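The write-skew example above can be reproduced with a minimal model of the commit-time check. Under snapshot isolation, validation compares only write sets (first-committer-wins); read sets are never examined, which is exactly what lets two transactions with disjoint writes both commit. The function below is an illustrative sketch, not Cognica's validator:

```cpp
#include <set>
#include <string>

// First-committer-wins under snapshot isolation: a transaction may commit
// only if no already-committed concurrent transaction wrote an overlapping
// key. Read sets are ignored, which is what lets write skew through.
bool si_can_commit(const std::set<std::string>& my_writes,
                   const std::set<std::string>& committed_concurrent_writes) {
  for (const auto& key : my_writes) {
    if (committed_concurrent_writes.count(key) != 0) {
      return false;  // write-write conflict: second committer aborts
    }
  }
  return true;
}
```

With T1 writing {x} and T2 writing {y}, both pass validation even though together they violate x + y >= 0; only an overlapping write would be rejected.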
27.2 Transaction Architecture
27.2.1 Component Overview
Cognica's transaction system spans multiple layers: the abstract Transaction interface, the concrete implementations over RocksDB, the write-ahead and transaction logs, and the Raft-based replication machinery described later in this chapter.
27.2.2 The Transaction Interface
The abstract Transaction interface (src/cognica/db/txn/transaction.hpp) defines the contract for all transaction implementations:
class Transaction {
public:
virtual ~Transaction() = default;
// Write operations
virtual auto put(const Slice& key, const Slice& value) -> Status = 0;
virtual auto put(ColumnFamilyHandle* cf, const Slice& key,
const Slice& value) -> Status = 0;
virtual auto merge(const Slice& key, const Slice& value) -> Status = 0;
virtual auto remove(const Slice& key) -> Status = 0;
// Read operations
virtual auto get(const Slice& key, std::string* value) -> Status = 0;
virtual auto get(const ReadOptions& options, const Slice& key,
PinnableSlice* value) -> Status = 0;
virtual auto multi_get(const std::vector<Slice>& keys,
std::vector<std::string>* values) -> std::vector<Status> = 0;
// Lifecycle
virtual auto commit() -> Status = 0;
virtual auto rollback() -> Status = 0;
// Two-phase commit
virtual auto set_name(const std::string& name) -> Status = 0;
virtual auto prepare() -> Status = 0;
// Savepoints
virtual auto set_save_point() -> void = 0;
virtual auto rollback_to_save_point() -> Status = 0;
virtual auto pop_save_point() -> Status = 0;
// Properties
virtual auto can_read_own_write() const -> bool = 0;
virtual auto get_write_batch() -> WriteBatch* = 0;
};
27.2.3 Transaction Implementations
Cognica provides several transaction implementations for different use cases:
SimpleTransaction: The primary implementation wrapping RocksDB's transaction API. Provides full ACID guarantees with snapshot isolation.
class SimpleTransaction final : public Transaction {
public:
SimpleTransaction(rocksdb::TransactionDB* db,
const rocksdb::WriteOptions& write_options,
const rocksdb::TransactionOptions& txn_options);
auto can_read_own_write() const -> bool override { return true; }
private:
std::unique_ptr<rocksdb::Transaction> txn_;
rocksdb::TransactionDB* db_;
};
PrefixedTransaction: Wraps another transaction with automatic key prefixing for keyspace isolation:
class PrefixedTransaction final : public Transaction {
public:
PrefixedTransaction(std::unique_ptr<Transaction> inner,
std::string prefix);
auto put(const Slice& key, const Slice& value) -> Status override {
auto prefixed_key = prefix_ + key.ToString();
return inner_->put(prefixed_key, value);
}
private:
std::unique_ptr<Transaction> inner_;
std::string prefix_;
};
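The class above shows only the write path. For keyspace isolation to hold, every operation — get, remove, and iterators as well as put — must apply the same prefix, or reads would miss the transaction's own writes. A minimal in-memory sketch (Store and PrefixedView are illustrative, not the Cognica API):

```cpp
#include <map>
#include <string>
#include <optional>

using Store = std::map<std::string, std::string>;

// Keyspace isolation by prefixing: reads and writes both go through the
// same prefix, so two views over one store never see each other's keys.
struct PrefixedView {
  Store& inner;
  std::string prefix;

  void put(const std::string& key, const std::string& value) {
    inner[prefix + key] = value;
  }
  std::optional<std::string> get(const std::string& key) const {
    auto it = inner.find(prefix + key);
    if (it == inner.end()) return std::nullopt;
    return it->second;
  }
};
```

Two views with different prefixes share one physical store but observe disjoint keyspaces.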
NullTransaction: A no-op implementation that rejects all writes, used for read-only queries:
class NullTransaction final : public Transaction {
public:
auto put(const Slice&, const Slice&) -> Status override {
return Status::NotSupported("NullTransaction is read-only");
}
auto commit() -> Status override {
return Status::OK(); // No-op
}
};
27.3 Transaction Lifecycle
27.3.1 Begin Phase
Transaction creation captures a consistent snapshot of the database:
auto TransactionDB::begin_transaction(const WriteOptions& write_opts,
const TransactionOptions& txn_opts)
-> std::unique_ptr<Transaction> {
// SimpleTransaction calls db_->BeginTransaction() internally; when
// txn_opts.set_snapshot is set, the snapshot is taken at creation time
return std::make_unique<SimpleTransaction>(db_, write_opts, txn_opts);
}
The snapshot captures the database state at a specific sequence number; all subsequent reads see only data committed before this sequence.
27.3.2 Operations Phase
During execution, writes accumulate in a WriteBatch while reads query the snapshot:
Write Operations:
auto SimpleTransaction::put(const Slice& key, const Slice& value) -> Status {
// Buffer in WriteBatch (no disk I/O yet)
return txn_->Put(key, value);
}
auto SimpleTransaction::remove(const Slice& key) -> Status {
return txn_->Delete(key);
}
auto SimpleTransaction::merge(const Slice& key, const Slice& value) -> Status {
return txn_->Merge(key, value);
}
Read Operations:
auto SimpleTransaction::get(const Slice& key, std::string* value) -> Status {
ReadOptions read_opts;
read_opts.snapshot = txn_->GetSnapshot();
// First check WriteBatch for uncommitted writes
// Then query database snapshot
return txn_->Get(read_opts, key, value);
}
The WriteBatchWithIndex provides read-your-own-writes semantics: uncommitted writes are visible to subsequent reads within the same transaction.
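The read path described above — check the transaction's own batch first, then fall back to the snapshot — can be sketched in a few lines. BufferedTxn below is an illustrative model, not the Cognica implementation; note that deletions must be buffered as tombstones so a removed key does not reappear from the snapshot:

```cpp
#include <map>
#include <string>
#include <optional>

// Read-your-own-writes: reads consult the buffered batch first, then the
// snapshot. A nullopt entry in the batch is a tombstone for a deletion.
struct BufferedTxn {
  std::map<std::string, std::optional<std::string>> batch;  // nullopt = delete
  const std::map<std::string, std::string>& snapshot;

  explicit BufferedTxn(const std::map<std::string, std::string>& snap)
      : snapshot(snap) {}

  void put(const std::string& k, const std::string& v) { batch[k] = v; }
  void remove(const std::string& k) { batch[k] = std::nullopt; }

  std::optional<std::string> get(const std::string& k) const {
    if (auto it = batch.find(k); it != batch.end()) {
      return it->second;               // own write (or tombstone)
    }
    if (auto it = snapshot.find(k); it != snapshot.end()) {
      return it->second;               // committed data from the snapshot
    }
    return std::nullopt;
  }
};
```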
27.3.3 Commit Phase
Commit validates the transaction and atomically applies all buffered writes:
auto SimpleTransaction::commit() -> Status {
// Notify observers (replication hook)
if (observer_) {
auto status = observer_->before_commit(this);
if (!status.ok()) {
return status;
}
}
// Atomic commit to RocksDB
auto status = txn_->Commit();
if (status.ok()) {
// Capture sequence number assigned by RocksDB
auto seq = db_->GetLatestSequenceNumber();
// Notify observers of successful commit
if (observer_) {
observer_->on_commit(seq, *txn_->GetWriteBatch()->GetWriteBatch(), this);
}
} else {
// Notify observers of failure
if (observer_) {
observer_->on_commit_failed(this);
}
}
return status;
}
27.3.4 Rollback
Rollback discards all buffered writes without modifying the database:
auto SimpleTransaction::rollback() -> Status {
return txn_->Rollback();
}
Since writes never reach the database until commit, rollback is instantaneous—it simply discards the WriteBatch.
27.4 Concurrency Control
27.4.1 Optimistic vs. Pessimistic Concurrency
Cognica employs optimistic concurrency control (OCC):
Optimistic Approach:
- Transactions execute without acquiring locks
- At commit, system validates for conflicts
- If conflict detected, transaction aborts
- Application retries with new snapshot
Pessimistic Approach (alternative):
- Transactions acquire locks before accessing data
- Locks held until commit or rollback
- Conflicts impossible but deadlocks possible
- Lower concurrency due to lock contention
OCC excels when conflicts are rare—the common case for most workloads. Under high contention, pessimistic locking may perform better by avoiding repeated retries.
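Because conflicts surface only at commit, OCC applications wrap transactions in a retry loop. The helper below is a hedged sketch (TxnResult and the callback shape are assumptions, not Cognica's API); bounding the attempt count guards against livelock under sustained contention:

```cpp
#include <functional>

// Optimistic retry loop: run the transaction body; if commit reports a
// conflict, retry against a fresh snapshot, up to max_retries times.
enum class TxnResult { kCommitted, kConflict, kError };

TxnResult run_with_retries(const std::function<TxnResult()>& attempt,
                           int max_retries) {
  TxnResult result = TxnResult::kError;
  for (int i = 0; i <= max_retries; ++i) {
    result = attempt();            // begin txn, do work, try to commit
    if (result != TxnResult::kConflict) {
      return result;               // committed, or a non-retryable error
    }
    // kConflict: loop again — the next attempt sees a fresh snapshot
  }
  return result;                   // still conflicting after max_retries
}
```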
27.4.2 Write-Write Conflict Detection
RocksDB's TransactionDB detects write-write conflicts at write time: each put attempts to lock the key, and a key already held by another transaction yields Status::Busy. The WriteBatchWithIndex records the transaction's own writes. A conceptual sketch:
// Illustrative pseudocode, not RocksDB's actual implementation
class WriteBatchWithIndex {
// Index of keys in this batch
std::map<std::string, WriteEntry> index_;
public:
auto Put(const Slice& key, const Slice& value) -> Status {
// Check for concurrent modification
if (is_key_locked_by_other_transaction(key)) {
return Status::Busy("Write conflict");
}
index_[key.ToString()] = WriteEntry{kPut, value};
return Status::OK();
}
};
When two transactions modify the same key:
T1: begin
T2: begin
T1: put("x", "1") // Succeeds, T1 "locks" key x
T2: put("x", "2") // Returns Status::Busy (conflict detected)
T1: commit // Succeeds
T2: rollback // T2 must abort and retry
27.4.3 MVCC and Sequence Numbers
Multi-Version Concurrency Control enables snapshot isolation without blocking readers:
Version Chain:
Key "x": [v3@seq=100] -> [v2@seq=50] -> [v1@seq=10]
|
newest version
Each value carries a sequence number indicating when it was committed. Reads select the appropriate version:
Example:
Snapshot at seq=75 reading key "x":
- v3@seq=100: invisible (100 > 75)
- v2@seq=50: visible! (50 <= 75)
- Result: v2
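The visibility rule above is a linear walk over the version chain: return the first (newest) version whose sequence number does not exceed the reader's snapshot sequence. A minimal sketch, with Version as an illustrative type:

```cpp
#include <vector>
#include <cstdint>
#include <optional>
#include <string>

// MVCC read: walk the version chain (newest first) and return the first
// version whose sequence number is <= the reader's snapshot sequence.
struct Version {
  uint64_t seq;        // sequence number at which this version committed
  std::string value;
};

std::optional<std::string> read_at_snapshot(
    const std::vector<Version>& chain_newest_first, uint64_t snapshot_seq) {
  for (const auto& v : chain_newest_first) {
    if (v.seq <= snapshot_seq) {
      return v.value;  // newest version visible to this snapshot
    }
  }
  return std::nullopt; // key did not exist at the snapshot
}
```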
27.4.4 Deadlock Freedom
Optimistic concurrency control is inherently deadlock-free:
- No locks held during execution
- Conflicts detected at commit time
- Losing transaction aborts immediately
- No circular wait possible
This contrasts with pessimistic systems where deadlock detection and resolution add complexity.
27.5 Serializable Snapshot Isolation (SSI)
27.5.1 Beyond Snapshot Isolation
Snapshot Isolation (SI) provides strong consistency for most workloads, but it does not prevent all anomalies. The classic write skew anomaly occurs when two concurrent transactions each read a value, make a decision based on it, and write to different keys — producing a state that neither transaction would have allowed if it had seen the other's write:
T1: read(x) = 10, read(y) = 10 -- constraint: x + y >= 10
T2: read(x) = 10, read(y) = 10 -- constraint: x + y >= 10
T1: write(x = 0) -- believes y = 10, so x + y = 10 >= 10
T2: write(y = 0) -- believes x = 10, so x + y = 10 >= 10
-- Result: x = 0, y = 0, violating x + y >= 10
Under SI, both transactions commit successfully because they wrote to different keys — no write-write conflict exists. Serializable Snapshot Isolation (SSI) detects these anomalies by tracking read-write dependencies between concurrent transactions.
27.5.2 The Dangerous Structure
SSI is based on the observation (Cahill et al., 2008) that every non-serializable execution under SI contains a dangerous structure — a pattern of read-write anti-dependencies (rw-conflicts) involving three transactions:
T_in --rw--> T_pivot --rw--> T_out
where:
- T_in read data that T_pivot later wrote (outgoing rw-edge from T_in)
- T_pivot read data that T_out later wrote (outgoing rw-edge from T_pivot)
- T_pivot is the pivot — it has both an incoming and outgoing rw-edge
When the pivot transaction has both in_conflict and out_conflict flags set, one transaction in the structure must be aborted to ensure serializability.
27.5.3 SIREAD Locks
SSI tracks reads using SIREAD locks — non-blocking read markers that record which documents a SERIALIZABLE transaction has read:
class SSIManager {
public:
// Record a read on a specific document
auto record_read(int64_t txn_id,
const std::string& collection,
const std::string& doc_id) -> void;
// Check if a write conflicts with existing reads
auto check_write(int64_t txn_id,
const std::string& collection,
const std::string& doc_id) -> Status;
// Final validation before commit
auto pre_commit(int64_t txn_id) -> Status;
private:
std::shared_mutex mutex_;
std::atomic<int64_t> next_txn_id_{1};
std::unordered_map<int64_t, TransactionState> transactions_;
};
SIREAD locks are fundamentally different from traditional locks:
- They do not block other transactions
- They are not released at commit time — they persist until all concurrent transactions have completed
- They serve as evidence of reads for conflict detection
27.5.4 Conflict Detection
When a transaction writes to a document, check_write() examines all active SSI transactions for SIREAD locks on that document:
auto SSIManager::check_write(int64_t txn_id,
const std::string& collection,
const std::string& doc_id) -> Status {
std::unique_lock lock{mutex_};
auto& writer = transactions_[txn_id];
writer.read_only = false;
writer.write_locks[collection].insert(doc_id);
// Check all other transactions for SIREAD locks
for (auto& [other_id, other_state] : transactions_) {
if (other_id == txn_id || other_state.aborted) {
continue;
}
bool has_read_lock =
other_state.promoted_collections.contains(collection) ||
other_state.read_locks[collection].contains(doc_id);
if (has_read_lock) {
// Create rw-conflict: other_state read, we're writing
create_rw_conflict_(other_id, txn_id);
}
}
// Check if this transaction is now a pivot
if (writer.in_conflict && writer.out_conflict && !writer.committed) {
writer.aborted = true;
return Status::Aborted(
"could not serialize access due to read/write "
"dependencies among transactions");
}
return Status::OK();
}
27.5.5 Lock Promotion
Per-document SIREAD locks consume memory proportional to the number of documents read. When a transaction accumulates more than 256 document-level locks in a single collection, they are promoted to a collection-level lock:
static constexpr size_t kLockPromotionThreshold = 256;
// After recording a read:
if (state.read_locks[collection].size() > kLockPromotionThreshold) {
state.promoted_collections.insert(collection);
state.read_locks.erase(collection);
}
After promotion, any write to the collection by another transaction creates a rw-conflict — a conservative but memory-efficient approach. This trades precision for bounded memory usage, following the same strategy used by PostgreSQL's SSI implementation (Ports & Grittner, 2012).
27.5.6 Read-Only Transaction Fast Path
Read-only transactions can never be the pivot of a dangerous structure: since they write nothing, no other transaction can read data they wrote, so the in_conflict flag is never set. SSI exploits this:
auto SSIManager::pre_commit(int64_t txn_id) -> Status {
std::shared_lock lock{mutex_};
auto& state = transactions_[txn_id];
// Read-only transactions are always safe
if (state.read_only) {
return Status::OK();
}
// Check for dangerous structure
if (state.in_conflict && state.out_conflict) {
return Status::Aborted(...);
}
return Status::OK();
}
This optimization is significant — read-heavy SERIALIZABLE workloads pay almost no SSI overhead.
27.5.7 DEFERRABLE Mode
For SERIALIZABLE READ ONLY DEFERRABLE transactions, SSI tracking is entirely disabled. The transaction waits for a "safe snapshot" where no concurrent writers could produce a conflict:
if (isolation_level == IsolationLevel::kSerializable
&& read_only && deferrable && ssi_txn_id_ != 0) {
ssi_manager->on_abort(ssi_txn_id_);
ssi_txn_id_ = 0; // Disable SSI tracking entirely
}
This is useful for long-running read-only analytics queries that need SERIALIZABLE guarantees without contributing to conflict tracking overhead.
27.5.8 State Summarization and Cleanup
After a transaction commits, its per-document locks are summarized to collection-level to reduce memory:
auto SSIManager::on_commit(int64_t txn_id, uint64_t commit_seq) -> void {
auto& state = transactions_[txn_id];
state.committed = true;
state.commit_seq = commit_seq;
// Summarize: per-document -> collection-level
for (const auto& [collection, doc_ids] : state.read_locks) {
state.promoted_collections.insert(collection);
}
state.read_locks.clear();
for (const auto& [collection, doc_ids] : state.write_locks) {
state.written_collections.insert(collection);
}
state.write_locks.clear();
state.summarized = true;
// Cleanup old transactions
cleanup_finished_transactions_();
}
The cleanup routine removes committed transactions whose commit_seq is older than the minimum snapshot_seq of all active transactions — they can no longer conflict with any running transaction.
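That cleanup rule can be sketched directly. The code below is an illustrative model (TxnInfo and cleanup_finished are hypothetical names): compute the minimum snapshot_seq over active transactions, then discard any committed transaction whose commit_seq is older than that bound:

```cpp
#include <map>
#include <cstdint>
#include <algorithm>
#include <limits>

// A committed transaction can be discarded once its commit_seq is older
// than the minimum snapshot_seq of all still-active transactions.
struct TxnInfo {
  bool committed = false;
  uint64_t commit_seq = 0;    // valid when committed
  uint64_t snapshot_seq = 0;  // valid while active
};

void cleanup_finished(std::map<int64_t, TxnInfo>& txns) {
  uint64_t min_active_snapshot = std::numeric_limits<uint64_t>::max();
  for (const auto& [id, t] : txns) {
    if (!t.committed) {
      min_active_snapshot = std::min(min_active_snapshot, t.snapshot_seq);
    }
  }
  for (auto it = txns.begin(); it != txns.end();) {
    if (it->second.committed && it->second.commit_seq < min_active_snapshot) {
      it = txns.erase(it);  // no active txn can still conflict with it
    } else {
      ++it;
    }
  }
}
```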
27.5.9 Error Handling
SSI aborts produce SQLSTATE 40001 (serialization_failure), the standard PostgreSQL error code for serialization conflicts. Applications should retry the entire transaction:
BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- ... operations ...
COMMIT;
-- If error 40001: retry the entire transaction from BEGIN
The SSI implementation guarantees that at least one transaction in a dangerous structure will succeed — the abort target is always an uncommitted transaction that can be safely retried.
27.6 Write-Ahead Logging
27.6.1 WAL Architecture
Cognica maintains two write-ahead logs:
- RocksDB WAL: Internal to RocksDB, records all mutations for crash recovery
- Transaction Log: Cognica's replication log, records complete transactions
The Transaction Log (src/cognica/replication/transaction/log.hpp) uses Protocol Buffers for serialization:
message TransactionLogEntry {
uint64 sequence = 1; // Unique identifier
string source_node = 2; // Originating node
int64 timestamp_ms = 3; // Unix timestamp
repeated LogOperation ops = 4; // Operations
uint64 term = 5; // Raft term (fencing)
}
message LogOperation {
OperationType type = 1; // Put, Delete, Merge, etc.
bytes key = 2;
bytes value = 3;
}
enum OperationType {
OP_PUT = 0;
OP_DELETE = 1;
OP_MERGE = 2;
OP_COMMIT = 3;
OP_ROLLBACK = 4;
OP_ABORT = 5;
}
27.6.2 Log Entry States
Each log entry progresses through states:
enum class PendingState : uint8_t {
kPending = 0, // Written to log, not yet committed
kCommitted = 1, // Successfully committed
kAborted = 2 // Commit failed, rolled back
};
State transitions are one-way: every entry begins as kPending and reaches exactly one terminal state, kCommitted on successful commit or kAborted on failure.
27.6.3 Log Writer Implementation
The TransactionLogWriter (src/cognica/replication/transaction/log_writer.hpp) handles durable logging:
class TransactionLogWriter {
public:
// Assign sequence and write pending entry
auto prepare_entry_and_log_pending(Transaction* txn, SequenceNumber& out_seq)
-> ReplicationStatus {
// Atomically assign sequence
out_seq = ++latest_sequence_;
// Build log entry
auto entry = TransactionLogEntry{
.sequence = out_seq,
.source_node = node_id_,
.timestamp_ms = current_time_ms(),
.ops = extract_operations(txn),
.state = PendingState::kPending
};
// Write to log file
return write_entry_(entry);
}
// Mark entry as committed
auto finalize_as_committed(SequenceNumber seq) -> ReplicationStatus {
return update_state_(seq, PendingState::kCommitted);
}
// Mark entry as aborted
auto finalize_as_aborted(SequenceNumber seq) -> ReplicationStatus {
return update_state_(seq, PendingState::kAborted);
}
private:
std::atomic<SequenceNumber> latest_sequence_{0};
std::string current_log_path_;
static constexpr size_t kMaxLogSize = 64 * 1024 * 1024; // 64 MB
};
27.6.4 Log Rotation
When a log file exceeds the size threshold, the writer rotates to a new file:
auto TransactionLogWriter::rotate_log_if_needed_() -> ReplicationStatus {
if (current_log_size_ < kMaxLogSize) {
return ReplicationStatus::OK();
}
// Close current log
fsync(current_fd_);
close(current_fd_);
// Create new log file
auto new_path = fmt::format("{}/txn_{}.log", log_dir_,
format_timestamp(current_time()));
current_fd_ = open(new_path.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644);
current_log_path_ = new_path;
current_log_size_ = 0;
// Update manifest
return update_manifest_();
}
The manifest tracks sequence ranges per log file, enabling efficient lookups during recovery:
manifest.json:
{
"logs": [
{"file": "txn_20240101120000.log", "start_seq": 1, "end_seq": 10000},
{"file": "txn_20240101130000.log", "start_seq": 10001, "end_seq": 20000},
...
]
}
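Because each log file covers a contiguous, sorted sequence range, recovery can locate the file holding any sequence with a simple range scan (or binary search) over the manifest. A sketch, with LogFileEntry as an illustrative type mirroring the JSON above:

```cpp
#include <vector>
#include <string>
#include <cstdint>
#include <optional>

// Manifest lookup: each log file covers an inclusive [start_seq, end_seq]
// range; find the file containing a given sequence number.
struct LogFileEntry {
  std::string file;
  uint64_t start_seq;
  uint64_t end_seq;  // inclusive
};

std::optional<std::string> find_log_for_sequence(
    const std::vector<LogFileEntry>& manifest, uint64_t seq) {
  for (const auto& e : manifest) {
    if (seq >= e.start_seq && seq <= e.end_seq) {
      return e.file;
    }
  }
  return std::nullopt;  // sequence not covered by any retained log
}
```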
27.7 Two-Phase Commit
27.7.1 The Coordination Problem
Distributed transactions must coordinate between local storage and remote replicas. Naive approaches fail:
Problem: Commit before replication
txn->commit(); // Data visible locally
replicate(txn); // Crash here = data lost on replicas
Problem: Replicate before commit
auto seq = replicate(txn); // Replicas have the data
txn->commit(); // Crash here = sequence gap on replicas
27.7.2 RocksDB Two-Phase Commit
RocksDB provides built-in 2PC support. In the PREPARE phase, writes become durable (in WAL) but invisible:
auto SimpleTransaction::prepare() -> Status {
// Writes go to WAL, not yet visible
return txn_->Prepare();
}
Prepared transactions survive crashes. On recovery, the database presents them for explicit commit or rollback:
void recover_prepared_transactions() {
std::vector<rocksdb::Transaction*> prepared;
db_->GetAllPreparedTransactions(&prepared);
for (auto* txn : prepared) {
// Application decides: commit or rollback
if (should_commit(txn)) {
txn->Commit();
} else {
txn->Rollback();
}
}
}
27.7.3 The Replication Protocol
Cognica's two-phase commit integrates RocksDB 2PC with Raft consensus:
Phase 1: PREPARE
1. Call txn->Prepare()
- Writes become durable in RocksDB WAL
- Data remains invisible
2. Assign replication sequence number
3. Set transaction name = sequence number
4. Write PENDING entry to transaction log
5. Append to Raft log
Phase 2: COMMIT
6. Wait for Raft consensus (majority acknowledgment)
7. Call txn->Commit()
- Data becomes visible
8. Write COMMITTED entry to transaction log
9. Return success to client
On Failure:
- Write ABORT entry to transaction log
- Call txn->Rollback()
- Return error to client
27.7.4 Implementation
The ReplicationManager orchestrates the two-phase protocol:
auto ReplicationManager::replicate_transaction(Transaction* txn)
-> CommitResult {
// Phase 1: Prepare
auto status = txn->prepare();
if (!status.ok()) {
return CommitResult::PrepareError(status);
}
// Assign sequence number
SequenceNumber seq;
status = log_writer_->prepare_entry_and_log_pending(txn, seq);
if (!status.ok()) {
txn->rollback();
return CommitResult::LogError(status);
}
// Set transaction name for recovery
txn->set_name(std::to_string(seq));
// Append to Raft
uint64_t log_idx;
status = append_to_raft_(txn, seq, &log_idx);
if (!status.ok()) {
log_writer_->finalize_as_aborted(seq);
txn->rollback();
return CommitResult::RaftError(status);
}
// Phase 2: Wait for consensus
status = wait_for_raft_commit_(log_idx, commit_timeout_);
if (!status.ok()) {
log_writer_->finalize_as_aborted(seq);
txn->rollback();
return CommitResult::TimeoutError(status);
}
// Commit locally
status = txn->commit();
if (!status.ok()) {
log_writer_->finalize_as_aborted(seq);
return CommitResult::CommitError(status);
}
// Finalize as committed
log_writer_->finalize_as_committed(seq);
return CommitResult::Success(seq);
}
27.8 Crash Recovery
27.8.1 Recovery Scenarios
Crashes can occur at any point in the two-phase protocol. Each scenario requires specific handling:
Scenario 1: Crash after PREPARE, before LOG
- RocksDB has prepared transaction in WAL
- No entry in transaction log
- Recovery: Rollback (transaction never announced)
Scenario 2: Crash after LOG, before RAFT COMMIT
- Transaction log has PENDING entry
- Raft may or may not have the entry
- Recovery: Abort and write ABORT record
Scenario 3: Crash after RAFT COMMIT, before local COMMIT
- Transaction log has PENDING entry
- Raft has committed entry
- Recovery: Complete the commit
Scenario 4: Crash after local COMMIT, before LOG finalization
- Data is committed in RocksDB
- Transaction log may show PENDING
- Recovery: Finalize as COMMITTED
27.8.2 Recovery Implementation
void ReplicationManager::recover_() {
// Step 1: Handle RocksDB prepared transactions
std::vector<rocksdb::Transaction*> prepared;
db_->GetAllPreparedTransactions(&prepared);
for (auto* txn : prepared) {
auto name = txn->GetName();
if (name.empty()) {
// Scenario 1: No sequence assigned, just rollback
txn->Rollback();
continue;
}
auto seq = std::stoull(name);
// Check transaction log state
auto state = log_reader_->get_state(seq);
if (state == PendingState::kCommitted) {
// Scenario 4: Already committed in log, complete it
txn->Commit();
} else if (is_committed_in_raft_(seq)) {
// Scenario 3: Raft committed, complete locally
txn->Commit();
log_writer_->finalize_as_committed(seq);
} else {
// Scenario 2: Not committed anywhere, abort
txn->Rollback();
log_writer_->finalize_as_aborted(seq);
}
}
// Step 2: Handle unfinalized log entries
auto unfinalized = log_reader_->find_unfinalized_sequences();
for (auto [seq, entry] : unfinalized) {
if (is_committed_in_raft_(seq)) {
// Data exists in Raft, must have committed locally
log_writer_->finalize_as_committed(seq);
} else {
// Never reached consensus, abort
log_writer_->finalize_as_aborted(seq);
}
}
// Step 3: Initialize sequence counter
latest_sequence_ = log_reader_->get_latest_sequence();
}
27.8.3 Abort Records
When a transaction cannot complete, an ABORT record ensures replicas skip the sequence:
auto ReplicationManager::write_abort_record_(SequenceNumber seq) -> void {
auto entry = TransactionLogEntry{
.sequence = seq,
.source_node = node_id_,
.timestamp_ms = current_time_ms(),
.ops = {}, // Empty
.state = PendingState::kAborted
};
log_writer_->write_entry_(entry);
raft_server_->append_entries({serialize(entry)});
}
Replicas process abort records by advancing their sequence counter without applying operations:
auto TransactionApplier::apply_log_entry(const TransactionLogEntry& entry)
-> ReplicationStatus {
if (entry.state == PendingState::kAborted) {
// Skip this sequence, just update counter
last_applied_sequence_.store(entry.sequence, std::memory_order_release);
return ReplicationStatus::OK();
}
// Normal application...
}
27.9 Savepoints
27.9.1 Partial Rollback
Savepoints enable rolling back part of a transaction without aborting entirely:
auto txn = db->begin_transaction(write_opts, txn_opts);
txn->put("a", "1");
txn->set_save_point(); // Mark position A
txn->put("b", "2");
txn->set_save_point(); // Mark position B
txn->put("c", "3");
txn->rollback_to_save_point(); // Undo "c"; savepoint B is consumed
txn->pop_save_point(); // Discard savepoint A
// Transaction now has: a=1, b=2
txn->commit();
27.9.2 Implementation
RocksDB maintains a stack of savepoints, each capturing the WriteBatch state:
class SimpleTransaction : public Transaction {
public:
auto set_save_point() -> void override {
txn_->SetSavePoint();
}
auto rollback_to_save_point() -> Status override {
return txn_->RollbackToSavePoint();
}
auto pop_save_point() -> Status override {
return txn_->PopSavePoint();
}
};
Internally, each savepoint records:
- WriteBatch size at savepoint time
- Index entries added since last savepoint
Rolling back truncates the WriteBatch and removes index entries.
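The mechanism above amounts to a stack of batch sizes: setting a savepoint records the current batch length, and rolling back truncates to the most recent mark (consuming it). A minimal model, omitting the key-index bookkeeping the real WriteBatch also rolls back:

```cpp
#include <vector>
#include <string>
#include <utility>

// Savepoints as a stack of WriteBatch sizes: set_save_point() records the
// current batch length; rollback truncates back to the latest mark.
struct MiniBatch {
  std::vector<std::pair<std::string, std::string>> ops;
  std::vector<size_t> savepoints;

  void put(const std::string& k, const std::string& v) {
    ops.emplace_back(k, v);
  }
  void set_save_point() { savepoints.push_back(ops.size()); }

  bool rollback_to_save_point() {
    if (savepoints.empty()) return false;
    ops.resize(savepoints.back());  // discard ops added since the mark
    savepoints.pop_back();          // the mark is consumed by rollback
    return true;
  }

  bool pop_save_point() {           // discard the mark, keep the ops
    if (savepoints.empty()) return false;
    savepoints.pop_back();
    return true;
  }
};
```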
27.9.3 Use Cases
Error Handling:
txn->set_save_point();
try {
perform_risky_operation(txn);
} catch (const std::exception& e) {
txn->rollback_to_save_point();
perform_fallback_operation(txn);
}
txn->pop_save_point();
txn->commit();
Nested Operations:
void outer_operation(Transaction* txn) {
txn->put("outer", "data");
txn->set_save_point();
auto status = inner_operation(txn);
if (!status.ok()) {
txn->rollback_to_save_point();
}
txn->pop_save_point();
}
27.10 The Commit Observer Pattern
27.10.1 Decoupling Storage and Replication
Cognica uses the Observer pattern to decouple transaction lifecycle from replication:
class CommitObserver {
public:
virtual ~CommitObserver() = default;
// Called before commit, can reject transaction
virtual auto before_commit(Transaction* txn) -> Status = 0;
// Called after successful commit with sequence number
virtual auto on_commit(SequenceNumber seq,
const WriteBatch& batch,
Transaction* txn) -> void = 0;
// Called if commit fails
virtual auto on_commit_failed(Transaction* txn) -> void = 0;
};
27.10.2 Replication Observer
The ReplicationCommitObserver integrates transactions with the replication layer:
class ReplicationCommitObserver final : public CommitObserver {
public:
explicit ReplicationCommitObserver(ReplicationManager* manager)
: manager_(manager) {}
auto before_commit(Transaction* txn) -> Status override {
// Validate transaction can be replicated
if (!manager_->is_leader()) {
return Status::NotSupported("Cannot commit on follower");
}
return Status::OK();
}
auto on_commit(SequenceNumber seq,
const WriteBatch& batch,
Transaction* txn) -> void override {
// Trigger async replication to followers
manager_->replicate_async(seq, batch);
}
auto on_commit_failed(Transaction* txn) -> void override {
// Log failure for monitoring
metrics_->commit_failures.fetch_add(1, std::memory_order_relaxed);
}
private:
ReplicationManager* manager_;
Metrics* metrics_;
};
27.10.3 Observer Chain
Multiple observers can be chained for different concerns:
class CompositeObserver final : public CommitObserver {
public:
void add_observer(std::unique_ptr<CommitObserver> observer) {
observers_.push_back(std::move(observer));
}
auto before_commit(Transaction* txn) -> Status override {
for (auto& obs : observers_) {
auto status = obs->before_commit(txn);
if (!status.ok()) {
return status; // First rejection wins
}
}
return Status::OK();
}
auto on_commit(SequenceNumber seq,
const WriteBatch& batch,
Transaction* txn) -> void override {
for (auto& obs : observers_) {
obs->on_commit(seq, batch, txn);
}
}
private:
std::vector<std::unique_ptr<CommitObserver>> observers_;
};
// Usage
auto composite = std::make_unique<CompositeObserver>();
composite->add_observer(std::make_unique<ReplicationCommitObserver>(mgr));
composite->add_observer(std::make_unique<AuditCommitObserver>(audit_log));
composite->add_observer(std::make_unique<MetricsCommitObserver>(metrics));
27.11 Transaction Application on Replicas
27.11.1 The Transaction Applier
Replica nodes apply committed transactions using the TransactionApplier (src/cognica/replication/transaction/applier.hpp):
class TransactionApplier {
public:
auto apply_log_entry(const TransactionLogEntry& entry) -> ReplicationStatus;
auto last_applied_sequence() const -> SequenceNumber {
return last_applied_sequence_.load(std::memory_order_acquire);
}
auto has_sequence_gap(SequenceNumber incoming) const -> bool {
auto expected = last_applied_sequence_.load() + 1;
return incoming > expected;
}
auto set_leader_term(uint64_t term) -> void {
current_leader_term_.store(term, std::memory_order_release);
}
private:
auto validate_sequence_(SequenceNumber seq) -> ReplicationStatus;
auto validate_term_(uint64_t term) -> ReplicationStatus;
auto convert_to_write_batch_(const TransactionLogEntry& entry) -> WriteBatch;
std::shared_ptr<rocksdb::DB> db_;
std::atomic<SequenceNumber> last_applied_sequence_{0};
std::atomic<uint64_t> current_leader_term_{0};
};
27.11.2 Application Validation Pipeline
Each incoming entry passes through validation:
auto TransactionApplier::apply_log_entry(const TransactionLogEntry& entry)
-> ReplicationStatus {
// Step 1: Validate Raft term (split-brain prevention)
auto status = validate_term_(entry.term);
if (!status.ok()) {
return status;
}
// Step 2: Validate sequence ordering
status = validate_sequence_(entry.sequence);
if (!status.ok()) {
return status;
}
// Step 3: Handle abort entries
if (entry.state == PendingState::kAborted) {
last_applied_sequence_.store(entry.sequence, std::memory_order_release);
return ReplicationStatus::OK();
}
// Step 4: Convert to WriteBatch
auto batch = convert_to_write_batch_(entry);
// Step 5: Apply atomically
rocksdb::WriteOptions write_opts;
write_opts.sync = true;
auto rdb_status = db_->Write(write_opts, &batch);
if (!rdb_status.ok()) {
return ReplicationStatus::StorageError(rdb_status.ToString());
}
// Step 6: Update sequence counter
last_applied_sequence_.store(entry.sequence, std::memory_order_release);
return ReplicationStatus::OK();
}
27.11.3 Term Validation (Fencing)
The Raft term prevents stale leaders from applying outdated transactions:
auto TransactionApplier::validate_term_(uint64_t entry_term) -> ReplicationStatus {
auto current = current_leader_term_.load(std::memory_order_acquire);
if (entry_term < current) {
// Reject entries from old leaders
return ReplicationStatus::StaleTerm(
fmt::format("Entry term {} < current term {}", entry_term, current));
}
if (entry_term > current) {
// New leader, update our term
current_leader_term_.store(entry_term, std::memory_order_release);
}
return ReplicationStatus::OK();
}
27.11.4 Sequence Validation
Sequence validation detects gaps and duplicates:
auto TransactionApplier::validate_sequence_(SequenceNumber seq) -> ReplicationStatus {
auto last = last_applied_sequence_.load(std::memory_order_acquire);
if (seq <= last) {
// Already applied (idempotent)
return ReplicationStatus::Duplicate();
}
if (seq > last + 1) {
// Gap detected
auto gap = seq - last - 1;
if (gap <= kSmallGapThreshold) {
return ReplicationStatus::LogSyncRequired(last + 1, seq);
} else {
return ReplicationStatus::SnapshotSyncRequired();
}
}
return ReplicationStatus::OK(); // Expected next sequence
}
27.12 Gap Detection and Recovery
27.12.1 Gap Sources
Sequence gaps occur when replicas miss transactions:
- Network partitions: Messages lost in transit
- Node restarts: Follower misses entries during downtime
- Leader failover: New leader has different commit history
27.12.2 Recovery Strategies
Gap size determines recovery strategy:
static constexpr size_t kSmallGapThreshold = 10;
static constexpr size_t kLargeGapThreshold = 100;
| Gap Size | Strategy | Mechanism |
|---|---|---|
| 1-10 | Log Sync | Fetch missing entries from leader |
| 11-100 | Batch Sync | Fetch entries in batches |
| > 100 | Snapshot | Full state transfer |
27.12.3 Log Sync
For small gaps, the replica requests missing log entries:
void TransactionReplicator::request_log_sync_(SequenceNumber start,
SequenceNumber end) {
auto request = SyncRequest{
.type = SyncType::LOG,
.start_sequence = start,
.end_sequence = end
};
send_to_leader_(serialize(request));
}
The leader responds with the requested entries:
void TransactionReplicator::handle_sync_request_(const SyncRequest& req) {
std::vector<TransactionLogEntry> entries;
log_reader_->read_range(req.start_sequence, req.end_sequence, &entries);
auto response = SyncResponse{
.entries = std::move(entries)
};
send_to_requester_(serialize(response));
}
27.12.4 Snapshot Sync
For large gaps, full state transfer is more efficient:
void TransactionReplicator::request_snapshot_sync_() {
state_.transition(Role::kSecondary, Role::kRecovering);
auto request = SyncRequest{
.type = SyncType::SNAPSHOT
};
send_to_leader_(serialize(request));
}
The leader sends a RocksDB checkpoint:
void TransactionReplicator::handle_snapshot_request_() {
// Create checkpoint
auto snapshot_path = create_checkpoint_();
// Stream checkpoint files
for (const auto& file : list_files(snapshot_path)) {
stream_file_to_requester_(file);
}
}
27.13 CommitResult and Error Handling
27.13.1 Distinguishing Commit Outcomes
The CommitResult type captures the nuanced outcomes of distributed commits:
class CommitResult {
public:
enum class Status {
kSuccess, // Fully committed and replicated
kCommittedLocally, // Committed but replication pending
kPrepareError, // Failed during prepare
kLogError, // Failed to write transaction log
kRaftError, // Failed to append to Raft
kTimeoutError, // Raft consensus timed out
kCommitError, // Local commit failed
kConflictError // Write-write conflict
};
auto is_committed() const -> bool {
return status_ == Status::kSuccess ||
status_ == Status::kCommittedLocally;
}
auto is_replicated() const -> bool {
return status_ == Status::kSuccess;
}
auto is_fully_successful() const -> bool {
return status_ == Status::kSuccess;
}
auto status() const -> Status {
return status_;
}
auto sequence() const -> std::optional<SequenceNumber> {
return sequence_;
}
auto error_message() const -> std::string_view {
return error_message_;
}
private:
Status status_;
std::optional<SequenceNumber> sequence_;
std::string error_message_;
};
27.13.2 Application Error Handling
Applications must handle different outcomes appropriately:
void handle_write(Request& req, Response& resp) {
auto txn = db_->begin_transaction();
// Perform operations
txn->put(req.key(), req.value());
// Commit with replication
auto result = replication_manager_->replicate_transaction(txn.get());
if (result.is_fully_successful()) {
resp.set_status(StatusCode::OK);
resp.set_sequence(result.sequence().value());
} else if (result.is_committed()) {
// Data saved locally but replication pending
resp.set_status(StatusCode::ACCEPTED);
resp.set_message("Committed locally, replication pending");
} else if (result.status() == CommitResult::Status::kConflictError) {
// Write conflict, client should retry
resp.set_status(StatusCode::CONFLICT);
resp.set_message("Write conflict, please retry");
} else {
// Other errors
resp.set_status(StatusCode::ERROR);
resp.set_message(result.error_message());
}
}
27.14 Performance Optimization
27.14.1 Write Batching
Multiple operations within a transaction share a single disk I/O:
txn->put("key1", "value1"); // Buffered
txn->put("key2", "value2"); // Buffered
txn->put("key3", "value3"); // Buffered
txn->commit(); // Single disk write
The WriteBatch accumulates operations in memory, then applies them atomically in a single write.
27.14.2 Group Commit
Multiple concurrent transactions can share a single fsync:
class GroupCommitManager {
public:
auto commit(Transaction* txn) -> Status {
std::unique_lock lock(mutex_);
// Add to pending group
pending_.push_back(txn);
// Wait for group commit
auto batch_id = current_batch_id_;
cv_.wait(lock, [&] {
// Strict '>' is required: committed_batch_id_ counts finished batches,
// so batch `batch_id` is durable only once committed_batch_id_ > batch_id.
// With '>=', a caller could return before any batch has been committed.
return committed_batch_id_ > batch_id;
});
return pending_status_[txn];
}
private:
void commit_thread_() {
while (running_) {
std::vector<Transaction*> batch;
{
std::unique_lock lock(mutex_);
cv_.wait_for(lock, max_delay_, [&] {
return pending_.size() >= min_batch_size_;
});
if (pending_.empty()) {
continue;  // Timed out with nothing to commit
}
std::swap(batch, pending_);
current_batch_id_++;
}
// Single fsync for entire batch
commit_batch_(batch);
{
std::lock_guard lock(mutex_);
committed_batch_id_++;
}
cv_.notify_all();
}
}
// Members implied by the methods above
std::vector<Transaction*> pending_;
std::unordered_map<Transaction*, Status> pending_status_;
std::mutex mutex_;
std::condition_variable cv_;
uint64_t current_batch_id_{0};
uint64_t committed_batch_id_{0};
size_t min_batch_size_;
std::chrono::milliseconds max_delay_;
std::atomic<bool> running_{true};
};
Group commit amortizes fsync latency across multiple transactions, trading a small queuing delay for substantially higher write throughput.
27.14.3 Pipelining
Overlapping phases of different transactions maximizes throughput:
Transaction 1: [Prepare][Raft...........][Commit]
Transaction 2:          [Prepare][Raft...........][Commit]
Transaction 3:                   [Prepare][Raft...........][Commit]
The ReplicationManager processes transactions concurrently:
auto ReplicationManager::replicate_transaction_async(Transaction* txn)
-> std::future<CommitResult> {
return thread_pool_->enqueue([this, txn] {
return replicate_transaction(txn);
});
}
27.14.4 Read Optimization
Read-only transactions skip commit overhead:
auto Database::execute_read_only(const Query& query) -> Result {
// Use snapshot directly, no transaction needed
auto snapshot = db_->GetSnapshot();
ReadOptions opts;
opts.snapshot = snapshot;
auto result = execute_with_options_(query, opts);
db_->ReleaseSnapshot(snapshot);
return result;
}
27.15 Monitoring and Metrics
27.15.1 Key Metrics
struct TransactionMetrics {
// Counters
std::atomic<uint64_t> transactions_started{0};
std::atomic<uint64_t> transactions_committed{0};
std::atomic<uint64_t> transactions_aborted{0};
std::atomic<uint64_t> transactions_conflicted{0};
// Latency histograms
Histogram commit_latency_us;
Histogram prepare_latency_us;
Histogram raft_latency_us;
// Gauges
std::atomic<uint64_t> active_transactions{0};
std::atomic<uint64_t> pending_replication{0};
};
27.15.2 Health Indicators
struct TransactionHealth {
// Replication lag
SequenceNumber leader_sequence;
SequenceNumber follower_sequence;
uint64_t lag() const { return leader_sequence - follower_sequence; }
// Counters behind the rates below
uint64_t conflicts;
uint64_t total_commits;
uint64_t committed;
uint64_t aborted;
// Conflict rate (guarded against division by zero)
double conflict_rate() const {
return total_commits == 0 ? 0.0
: static_cast<double>(conflicts) / total_commits;
}
// Commit success rate
double success_rate() const {
auto total = committed + aborted;
return total == 0 ? 1.0 : static_cast<double>(committed) / total;
}
};
27.15.3 Alerting Thresholds
| Metric | Warning | Critical |
|---|---|---|
| Replication lag | > 100 entries | > 1000 entries |
| Conflict rate | > 1% | > 10% |
| Commit latency p99 | > 100ms | > 1s |
| Active transactions | > 1000 | > 10000 |
27.16 Summary
Cognica's transaction processing system provides ACID guarantees in a distributed environment:
- Snapshot Isolation: Strong consistency without serialization overhead for READ COMMITTED and REPEATABLE READ isolation levels
- Serializable Snapshot Isolation: True SERIALIZABLE isolation via rw-conflict tracking, dangerous structure detection, and SIREAD locks — preventing write skew anomalies that SI alone cannot detect
- Optimistic Concurrency: High throughput under low contention; SSI adds zero overhead for non-SERIALIZABLE transactions
- Two-Phase Commit: Coordination between local storage and Raft consensus
- Comprehensive Recovery: Correct behavior across all failure scenarios
- Observer Pattern: Clean separation of concerns between storage and replication
Key implementation highlights:
- RocksDB Foundation: Leverages RocksDB's mature transaction API
- SSI with Lock Promotion: Per-document SIREAD locks promote to collection-level after 256 documents, bounding memory while maintaining conflict detection
- Read-Only Fast Path: SERIALIZABLE read-only transactions skip conflict tracking entirely
- DEFERRABLE Mode: Long-running analytics queries disable SSI tracking for zero overhead
- Sequence-Based Ordering: Globally unique sequences enable gap detection and recovery
- Term Fencing: Prevents split-brain corruption from stale leaders
- Savepoint Support: Enables partial rollback for complex operations
- Group Commit: Amortizes fsync latency across concurrent transactions
The transaction system forms the foundation for Cognica's consistency guarantees, ensuring that data remains correct even under concurrent access, network partitions, and node failures.