Chapter 31: Memory Management

Memory management stands as one of the most critical aspects of database engine design, directly impacting performance, stability, and scalability. A database engine must carefully orchestrate memory across multiple subsystems—storage layer caches, query execution buffers, operator state, and temporary results—while avoiding memory exhaustion that could crash the server or corrupt data. This chapter examines Cognica's comprehensive memory management architecture, from lock-free memory pools to cost-based spill decisions.

31.1 Memory Architecture Overview

Cognica's memory architecture spans multiple layers, each with distinct allocation patterns and lifecycle characteristics:

[Diagram: memory architecture layers]

The memory hierarchy serves different access patterns:

  1. Hot Data: Block cache and row cache for frequently accessed data
  2. Execution State: Query operators, aggregation tables, sort buffers
  3. Temporary Results: Spill files, intermediate materialization
  4. Long-term Allocation: Schema metadata, connection state

Memory consumption follows the formula:

M_{total} = M_{cache} + M_{execution} + M_{temp} + M_{metadata}

where each component requires careful budgeting to prevent memory pressure.

31.2 Lock-Free Memory Pool

High-performance query execution demands efficient memory allocation without lock contention. Cognica implements a sophisticated lock-free memory pool that minimizes allocation overhead in multi-threaded environments.

31.2.1 Size Category Design

The memory pool organizes allocations into size categories based on powers of two:

class MemoryPool final {
private:
  struct BlockHeader {
    size_t category_index;

    explicit BlockHeader(size_t index) : category_index(index) {}
  };

public:
  explicit MemoryPool(size_t initial_blocks_per_category = 1024,
                      size_t alignment = alignof(std::max_align_t))
      : alignment_(alignment), min_block_size_(16), max_block_size_(8192),
        num_categories_(0),
        per_thread_capacity_(
            initial_blocks_per_category > 0
                ? std::max(1uz, initial_blocks_per_category
                                    / std::thread::hardware_concurrency())
                : 64) {
    // Determine number of categories and initialize them
    auto current_size = min_block_size_;
    while (current_size <= max_block_size_) {
      auto actual_block_size
          = std::max({current_size, alignment_, sizeof(BlockHeader)});
      if (actual_block_size % alignment_ != 0) {
        actual_block_size = (actual_block_size / alignment_ + 1) * alignment_;
      }
      categories_.emplace_back(actual_block_size);
      num_categories_++;
      if (current_size > max_block_size_ / 2) {
        break;
      }
      current_size <<= 1;
    }

    // Initial expansion for all categories
    for (auto i = 0uz; i < num_categories_; ++i) {
      expand_global_pool_(i, initial_blocks_per_category);
    }
  }
};

The category structure provides O(1) allocation by mapping request sizes to appropriate buckets:

\text{category\_index} = \lceil \log_2(\text{size}) \rceil - \lceil \log_2(\text{min\_block\_size}) \rceil

31.2.2 Lock-Free Free List

Each size category maintains an atomic free list using compare-and-swap operations:

class AtomicFreeList final {
public:
  void push(void* block) noexcept {
    if (!block) {
      return;
    }

    void* old_head = head_.load(std::memory_order_relaxed);
    do {
      // Store next pointer at block's beginning
      *static_cast<void**>(block) = old_head;
    } while (!head_.compare_exchange_weak(
        old_head, block, std::memory_order_release,
        std::memory_order_relaxed));
  }

  bool try_pop(void** result) noexcept {
    void* old_head = head_.load(std::memory_order_relaxed);
    do {
      if (!old_head) {
        return false;
      }
      *result = old_head;
    } while (!head_.compare_exchange_weak(
        old_head, *static_cast<void**>(old_head),
        std::memory_order_acquire, std::memory_order_relaxed));

    return true;
  }

private:
  std::atomic<void*> head_{nullptr};
};

The lock-free design ensures that multiple threads can allocate and deallocate concurrently without mutex contention. The memory ordering guarantees:

  • Release on push: Ensures block initialization is visible before the block becomes available
  • Acquire on pop: Ensures we see all writes to the block before using it
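A compact, single-threaded exercise of the same Treiber-stack pattern (a standalone sketch, not the Cognica class) shows the LIFO behavior and the CAS loop structure:

```cpp
#include <atomic>
#include <cassert>

// Compact free list using the same compare-and-swap pattern as
// AtomicFreeList above: each free block's first word stores the next pointer.
struct FreeList {
  std::atomic<void*> head{nullptr};

  void push(void* block) noexcept {
    void* old = head.load(std::memory_order_relaxed);
    do {
      *static_cast<void**>(block) = old;  // link block to current head
    } while (!head.compare_exchange_weak(old, block,
                                         std::memory_order_release,
                                         std::memory_order_relaxed));
  }

  void* try_pop() noexcept {
    void* old = head.load(std::memory_order_relaxed);
    do {
      if (!old) {
        return nullptr;  // list is empty
      }
    } while (!head.compare_exchange_weak(
        old, *static_cast<void**>(old),
        std::memory_order_acquire, std::memory_order_relaxed));
    return old;
  }
};
```

Pushing blocks `a` then `b` and popping returns `b` first: the free list is a stack, which also tends to hand back cache-warm blocks.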

31.2.3 Thread-Local Caching

To further reduce contention on global free lists, each thread maintains local caches:

struct ThreadCache final {
  std::vector<std::vector<void*>> free_blocks_by_category;
  MemoryPool* pool;

  explicit ThreadCache(MemoryPool* p, size_t num_categories)
      : free_blocks_by_category(num_categories), pool(p) {
    for (auto& category_list : free_blocks_by_category) {
      category_list.reserve(64);
    }
  }

  ~ThreadCache() {
    if (pool) {
      for (auto i = 0uz; i < free_blocks_by_category.size(); ++i) {
        if (!free_blocks_by_category[i].empty()) {
          pool->return_category_blocks_(i, free_blocks_by_category[i]);
        }
      }
    }
    pool = nullptr;
  }
};

The allocation strategy follows a three-tier hierarchy:

[Diagram: three-tier allocation hierarchy]

When a thread's local cache for a category grows too large, excess blocks are returned to the global pool:

void balance_thread_local_cache_(size_t category_index) {
  auto& cache = get_thread_cache_();
  auto& category_cache = cache.free_blocks_by_category[category_index];

  if (category_cache.size() > per_thread_capacity_ * 2
      && per_thread_capacity_ > 0) {
    auto excess_count = category_cache.size() - per_thread_capacity_;
    auto& category = categories_[category_index];

    for (auto i = 0uz; i < excess_count; ++i) {
      auto* block_to_move = category_cache.back();
      category_cache.pop_back();
      category.global_free_list.push(block_to_move);
    }
  }
}

31.3 Cache Eviction Policies

Cognica implements two complementary cache eviction policies: LRU (Least Recently Used) for temporal locality and LFU (Least Frequently Used) for frequency-based patterns.

31.3.1 LRU Cache Implementation

The LRU cache uses a hash map combined with a doubly-linked list for O(1) operations:

template <typename KeyType, typename ValueType,
          typename KeyHash = std::hash<KeyType>,
          typename KeyPred = std::equal_to<KeyType>>
class LRUCache {
public:
  using KeyValueTuple = std::tuple<KeyType, ValueType>;
  using ListType = std::list<KeyValueTuple>;
  using MapType = std::unordered_map<KeyType, typename ListType::iterator,
                                     KeyHash, KeyPred>;
  using Iterator = typename MapType::iterator;

  void put(const KeyType& key, ValueType&& value) {
    auto it = map_.find(key);
    if (it == std::end(map_)) {
      if (size() >= capacity_) {
        evict_();
      }

      list_.emplace_front(std::forward_as_tuple(key, std::move(value)));
      map_[key] = list_.begin();
    } else {
      std::get<1>(*it->second) = std::move(value);
      relocate_to_front_(it);
    }
  }

  auto get(const KeyType& key) const -> std::optional<ValueType> {
    auto it = map_.find(key);
    if (it == std::end(map_)) {
      return std::nullopt;
    }
    return get_value_and_relocate_(it);
  }

private:
  void evict_() {
    map_.erase(std::get<0>(list_.back()));
    list_.pop_back();
  }

  void relocate_to_front_(Iterator it) const {
    if (it->second == list_.begin()) {
      return;
    }
    list_.splice(list_.begin(), list_, it->second);
    map_[it->first] = list_.begin();
  }

private:
  mutable MapType map_{};
  mutable ListType list_{};
  size_t capacity_{};
};

The splice operation is critical for O(1) relocation—it moves a list node without memory allocation.
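A minimal int-keyed sketch of the same map-plus-list layout (illustrative only, not the Cognica template) demonstrates the eviction order:

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <optional>
#include <unordered_map>

// Minimal LRU cache: hash map for O(1) lookup, list ordered by recency.
class MiniLRU {
public:
  explicit MiniLRU(std::size_t capacity) : capacity_(capacity) {}

  void put(int key, int value) {
    auto it = map_.find(key);
    if (it != map_.end()) {
      it->second->second = value;
      list_.splice(list_.begin(), list_, it->second);  // O(1) move-to-front
      return;
    }
    if (map_.size() >= capacity_) {
      map_.erase(list_.back().first);  // evict least recently used
      list_.pop_back();
    }
    list_.emplace_front(key, value);
    map_[key] = list_.begin();
  }

  std::optional<int> get(int key) {
    auto it = map_.find(key);
    if (it == map_.end()) {
      return std::nullopt;
    }
    list_.splice(list_.begin(), list_, it->second);  // refresh recency
    return it->second->second;
  }

private:
  std::size_t capacity_;
  std::list<std::pair<int, int>> list_;
  std::unordered_map<int, std::list<std::pair<int, int>>::iterator> map_;
};
```

After `put(1)`, `put(2)`, and a `get(1)` that refreshes key 1, inserting a third entry into a capacity-2 cache evicts key 2, not key 1.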

31.3.2 LFU Cache Implementation

The LFU cache tracks access frequency using a multimap ordered by frequency:

template <typename KeyType, typename ValueType,
          typename KeyHash = std::hash<KeyType>,
          typename KeyPred = std::equal_to<KeyType>>
class LFUCache {
public:
  using KeyValueTuple = std::tuple<KeyType, ValueType>;
  using StorageType = std::multimap<size_t, KeyValueTuple>;
  using MapType = std::unordered_map<KeyType, typename StorageType::iterator,
                                     KeyHash, KeyPred>;
  using Iterator = typename MapType::iterator;

  void put(const KeyType& key, ValueType&& value) {
    if (auto it = map_.find(key); it != std::end(map_)) {
      // Key already present: overwrite the value, keep its frequency
      std::get<1>(it->second->second) = std::move(value);
      return;
    }

    if (size() >= capacity_) {
      evict_();
    }

    // Insert with frequency 1
    map_[key] = storage_.emplace_hint(
        std::begin(storage_), 1,
        std::forward_as_tuple(key, std::move(value)));
  }

  auto get(const KeyType& key) const -> std::optional<ValueType> {
    auto it = map_.find(key);
    if (it == std::end(map_)) {
      return std::nullopt;
    }
    return get_value_and_update_freq_(it);
  }

private:
  auto get_value_and_update_freq_(Iterator it) const
      -> std::optional<ValueType> {
    const auto& key = it->first;
    auto storage_it = it->second;

    // Capture frequency and payload before erasing; touching the iterator
    // after erase() would be undefined behavior
    auto frequency = storage_it->first;
    auto entry = std::move(storage_it->second);
    storage_.erase(storage_it);

    // Reinsert with incremented frequency
    auto inserted = storage_.emplace_hint(std::end(storage_), frequency + 1,
                                          std::move(entry));
    map_[key] = inserted;

    return std::get<1>(inserted->second);
  }

  void evict_() {
    // First element has lowest frequency
    auto it = std::begin(storage_);
    const auto& key = std::get<0>(it->second);
    map_.erase(key);
    storage_.erase(it);
  }

private:
  mutable MapType map_;
  mutable StorageType storage_;
  size_t capacity_;
};

The LFU design is inspired by the constant-time algorithm described by Shah et al.: the frequency-ordered multimap keeps the least frequently accessed item at the front so eviction is immediate, though frequency updates here cost O(log n) rather than the O(1) of the original frequency-list scheme.

31.4 Storage Engine Memory Configuration

RocksDB's memory configuration significantly impacts Cognica's overall memory footprint. The storage engine initializes multiple cache tiers:

31.4.1 Block Cache Configuration

The block cache stores decompressed data blocks from SST files:

constexpr size_t kBlockCacheCapacity = 5_GB;
constexpr int32_t kBlockCacheShardBits = 6;

auto cache_options = rdb::HyperClockCacheOptions{
    kBlockCacheCapacity,  0,
    kBlockCacheShardBits, db_options.block_cache.strict_capacity_limit,
    nullptr,              rdb::kDefaultCacheMetadataChargePolicy,
};

if (db_options.secondary_cache.enabled) {
  auto secondary_cache_options = rdb::CompressedSecondaryCacheOptions{
      db_options.secondary_cache.cache_capacity,
      db_options.secondary_cache.cache_shard_bits,
      db_options.secondary_cache.strict_capacity_limit,
      0.5,
  };
  secondary_cache_options.enable_custom_split_merge
      = db_options.secondary_cache.enable_custom_split_merge;
  cache_options.secondary_cache
      = secondary_cache_options.MakeSharedSecondaryCache();
}

auto block_cache = cache_options.MakeSharedCache();
table_options.block_cache = std::move(block_cache);

Configuration options from model.hpp:

struct BlockCacheOptions {
  bool enabled = true;
  uint64_t cache_capacity = 2_GB;
  int32_t cache_shard_bits = 3;  // 2^3 = 8 shards
  bool strict_capacity_limit = false;
};

struct SecondaryCacheOptions {
  bool enabled = false;
  uint64_t cache_capacity = 2_GB;
  int32_t cache_shard_bits = 3;
  bool strict_capacity_limit = false;
  bool enable_custom_split_merge = false;
};

The HyperClockCache provides better concurrent performance than the LRU cache through clock-based eviction with sharding. The shard count is 2^{\text{shard\_bits}}, distributing lock contention across shards.

31.4.2 Write Buffer Configuration

Write buffers (MemTables) hold recently written data before flushing to SST files:

struct StorageOptions {
  size_t write_buffer_size = 256_MB;
  int32_t min_write_buffer_number_to_merge = 1;
  int32_t max_write_buffer_number = 16;
  size_t arena_block_size = 16_MB;
  size_t max_total_wal_size = 4_GB;
};

The total write buffer memory is bounded by:

M_{write\_buffer} \leq \text{write\_buffer\_size} \times \text{max\_write\_buffer\_number}

With the default settings, the bound is 256 MB × 16 = 4 GB.

31.4.3 Table Options for Memory Efficiency

Block-based table options control memory usage for index and filter blocks:

struct BlockBasedTableOptions {
  uint64_t block_size = 32_KB;
  bool cache_index_and_filter_blocks = true;
  bool cache_index_and_filter_blocks_with_high_priority = true;
  bool partition_filters = true;
  bool decouple_partitioned_filters = true;
  uint64_t metadata_block_size = 4_KB;
  bool optimize_filters_for_memory = true;
};

Partitioned filters reduce memory spikes during filter loading by caching only the needed partitions rather than the entire filter.

31.5 Query Execution Memory Budget

Query execution requires careful memory allocation across operators. Cognica's memory budget allocator distributes available memory based on cardinality estimates and operator priorities.

31.5.1 Cardinality Estimation

The cardinality estimator predicts row counts through pipeline stages:

struct CardinalityEstimate {
  double rows = 0.0;
  double row_width = 512.0;  // Average bytes per row

  auto memory_estimate() const -> size_t {
    if (rows <= 0.0) {
      return 0;
    }
    return static_cast<size_t>(rows * row_width);
  }
};

class CardinalityEstimator {
public:
  auto propagate_sort(const CardinalityEstimate& input) const
      -> CardinalityEstimate {
    return input;  // Sort preserves cardinality
  }

  auto propagate_group(const CardinalityEstimate& input,
                       const std::vector<std::string>& group_keys) const
      -> CardinalityEstimate {
    auto groups = estimate_group_count(group_keys);
    return CardinalityEstimate{static_cast<double>(groups), input.row_width};
  }

  auto propagate_limit(const CardinalityEstimate& input, int64_t limit) const
      -> CardinalityEstimate {
    return CardinalityEstimate{
        std::min(input.rows, static_cast<double>(limit)), input.row_width};
  }

private:
  static constexpr double kDefaultEqualitySelectivity = 0.01;  // 1%
  static constexpr double kDefaultRangeSelectivity = 0.30;     // 30%
  static constexpr double kDefaultJoinSelectivity = 0.1;       // 10%
};

Selectivity estimation uses statistics when available:

\text{sel}_{equality} = \frac{1}{\text{NDV}(\text{column})}

For range predicates:

\text{sel}_{range} = \frac{\text{high} - \text{low}}{\text{max} - \text{min}}
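Both formulas can be sketched directly; the fallback constants mirror the defaults above, and the statistics inputs (NDV, column min/max) are assumed to come from the catalog:

```cpp
#include <cassert>

// Sketch of the selectivity formulas above. The fallbacks mirror the
// chapter's default constants (1% for equality, 30% for range).
double equality_selectivity(double ndv) {
  return ndv > 0.0 ? 1.0 / ndv : 0.01;  // no statistics: 1% default
}

double range_selectivity(double low, double high, double col_min,
                         double col_max) {
  if (col_max <= col_min) {
    return 0.30;  // no usable statistics: 30% default
  }
  return (high - low) / (col_max - col_min);
}
```

For a column with 100 distinct values, an equality predicate selects 1% of rows; a range covering a quarter of the column's domain selects 25%.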

31.5.2 Memory Budget Allocation

The MemoryBudgetAllocator distributes memory across pipeline stages:

struct OperatorMemoryBudget {
  size_t allocated = 0;
  bool will_spill = false;
  double spill_fraction = 0.0;
  spill::SpillOptions spill_options{};

  auto is_sufficient() const -> bool {
    return !will_spill;
  }
};

struct StageMemoryRequirement {
  PipelineStageType stage_type = PipelineStageType::kSort;
  size_t stage_index = 0;
  size_t estimated_memory = 0;
  bool is_blocking = false;
  double priority = 1.0;
};

class MemoryBudgetAllocator final {
public:
  explicit MemoryBudgetAllocator(size_t total_budget);

  auto allocate(const ParsedPipeline& pipeline,
                const std::vector<CardinalityEstimate>& estimates)
      -> std::vector<OperatorMemoryBudget>;

private:
  static constexpr double kSortPriority = 1.5;
  static constexpr double kGroupPriority = 1.2;
  static constexpr double kJoinPriority = 1.0;

  size_t total_budget_;
  size_t min_operator_budget_ = 16 * 1024 * 1024;  // 16MB minimum
};

The allocation algorithm:

  1. Analyze requirements: Estimate memory needs for each blocking stage
  2. Priority weighting: Apply stage-type priorities (sort > group > join)
  3. Proportional distribution: Allocate budget proportionally to weighted requirements
  4. Minimum guarantees: Ensure each operator receives at least min_operator_budget_
  5. Spill configuration: Set up spill options when budget is insufficient

The allocation formula:

\text{allocated}_i = \max\left(\text{min\_budget}, \frac{w_i \cdot r_i}{\sum_j w_j \cdot r_j} \cdot B_{total}\right)

where w_i is the priority weight and r_i is the estimated memory requirement of stage i.
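Steps 2 through 4 of the algorithm reduce to the proportional formula above, sketched here standalone (the real allocator also derives spill options per stage):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Proportional budget split with a per-operator minimum, following the
// allocation formula above. Weights and requirements are illustrative.
std::vector<std::size_t> allocate_budget(
    const std::vector<double>& weights,
    const std::vector<std::size_t>& requirements,
    std::size_t total_budget, std::size_t min_budget) {
  double denom = 0.0;
  for (std::size_t i = 0; i < weights.size(); ++i) {
    denom += weights[i] * static_cast<double>(requirements[i]);
  }
  std::vector<std::size_t> out(weights.size(), min_budget);
  if (denom <= 0.0) {
    return out;  // nothing to weight; everyone gets the minimum
  }
  for (std::size_t i = 0; i < weights.size(); ++i) {
    double share = weights[i] * static_cast<double>(requirements[i]) / denom;
    out[i] = std::max<std::size_t>(
        min_budget,
        static_cast<std::size_t>(share * static_cast<double>(total_budget)));
  }
  return out;
}
```

With equal weights and requirements of 300 and 100 bytes against a budget of 1000, the stages receive 750 and 250; a stage with zero estimated need still gets the minimum guarantee.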

31.6 Disk Spill Framework

When memory is insufficient for in-memory execution, Cognica spills intermediate results to disk. The spill framework provides unified infrastructure for sort, join, and aggregation operations.

31.6.1 Spill Options and Configuration

The spill system is configured through SpillOptions:

struct SpillOptions {
  // Memory management
  uint64_t memory_limit = 256 * 1024 * 1024;  // 256MB default
  uint64_t spill_batch_size = 10000;

  // Sort-specific
  int32_t max_merge_width = 16;
  bool enable_parallel_sort = true;

  // Hash join-specific
  int32_t num_partitions = 64;
  int32_t max_recursion_depth = 4;

  // I/O configuration
  size_t read_buffer_size = 64 * 1024;   // 64KB
  size_t write_buffer_size = 64 * 1024;  // 64KB

  // Compression and encryption
  CompressionType compression = CompressionType::kLZ4;
  std::optional<EncryptionOptions> encryption;

  // Temp file configuration
  std::filesystem::path temp_directory{};
  bool delete_temp_files_on_close = true;
};

31.6.2 Spill File Format

Spill files use a self-describing header format:

[Diagram: spill file layout]

The header structure:

struct SpillFileHeader {
  uint32_t magic = kSpillFileMagic;  // "CSPL"
  uint8_t version = kSpillFileVersion;
  uint8_t flags = 0;
  uint16_t reserved = 0;

  void set_compression(CompressionType type) {
    flags = (flags & ~kFlagCompressionMask) | static_cast<uint8_t>(type);
  }

  auto has_encryption() const -> bool {
    return (flags & kFlagEncryptionBit) != 0;
  }
};

static_assert(sizeof(SpillFileHeader) == 8);
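The flag packing can be illustrated in isolation; the mask values below are assumptions for the sketch (low three bits for the compression type, one bit for encryption), since the real kFlag* constants live alongside the header:

```cpp
#include <cassert>
#include <cstdint>

// Illustrative flag layout, not the actual Cognica constants: low 3 bits
// hold the compression type, bit 3 marks encryption.
constexpr uint8_t kFlagCompressionMask = 0x07;
constexpr uint8_t kFlagEncryptionBit = 0x08;

struct Header {
  uint8_t flags = 0;

  void set_compression(uint8_t type) {
    // Clear only the compression bits, preserving the encryption flag
    flags = (flags & ~kFlagCompressionMask) | (type & kFlagCompressionMask);
  }
  uint8_t compression() const { return flags & kFlagCompressionMask; }
  void set_encryption() { flags |= kFlagEncryptionBit; }
  bool has_encryption() const { return (flags & kFlagEncryptionBit) != 0; }
};
```

The masking matters: changing the compression type must not clobber the encryption bit, and vice versa.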

31.6.3 Spill Streams

The spill I/O layer provides buffered, compressed, and optionally encrypted streams:

class SpillOutputStream final {
public:
  SpillOutputStream(const std::filesystem::path& path,
                    CompressionType compression,
                    const std::optional<EncryptionOptions>& encryption,
                    size_t buffer_size = 64 * 1024);

  void write_record(const void* data, size_t size);
  void write_record(std::string_view data);
  void write_raw(const void* data, size_t size);
  void flush();
  void close();

  auto record_count() const -> uint64_t;
};

class SpillInputStream final {
public:
  SpillInputStream(const std::filesystem::path& path,
                   CompressionType compression,
                   const std::optional<EncryptionOptions>& encryption,
                   size_t buffer_size = 64 * 1024);

  auto read_record() -> std::optional<std::string>;
  auto read_raw(void* buffer, size_t size) -> size_t;
  void reset();
  bool eof() const;
};

Records are length-prefixed for efficient streaming:

[Diagram: length-prefixed record layout]
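A standalone sketch of the framing, assuming a native-endian u32 length prefix followed by the payload (illustrative, not the stream classes above):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <string_view>
#include <vector>

// Append one length-prefixed record: u32 byte count, then the payload.
void write_record(std::vector<uint8_t>& buf, std::string_view data) {
  uint32_t len = static_cast<uint32_t>(data.size());
  const auto* p = reinterpret_cast<const uint8_t*>(&len);
  buf.insert(buf.end(), p, p + sizeof(len));
  buf.insert(buf.end(), data.begin(), data.end());
}

// Read the record at `offset`, advancing `offset` past it.
std::string read_record(const std::vector<uint8_t>& buf, std::size_t& offset) {
  uint32_t len = 0;
  std::memcpy(&len, buf.data() + offset, sizeof(len));
  offset += sizeof(len);
  std::string out(reinterpret_cast<const char*>(buf.data() + offset), len);
  offset += len;
  return out;
}
```

Because each record carries its own length, a reader can stream records sequentially without any delimiter scanning.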

31.6.4 Temporary File Management

The TempFileManager provides thread-safe temporary file creation and cleanup:

class TempFileManager final {
public:
  explicit TempFileManager(const std::filesystem::path& base_directory = {},
                           const std::string& prefix = "spill");
  ~TempFileManager();

  auto create_temp_path() -> std::filesystem::path;
  auto create_temp_path(const std::string& suffix) -> std::filesystem::path;

  void remove_file(const std::filesystem::path& path);
  void cleanup_all();

  auto file_count() const -> size_t;

private:
  std::filesystem::path base_directory_;
  std::string prefix_;
  uint64_t manager_id_;
  std::atomic<uint32_t> file_counter_{0};

  mutable std::mutex mutex_;
  std::vector<std::filesystem::path> tracked_files_;

  static std::atomic<uint64_t> manager_id_counter_;
};

The manager ensures automatic cleanup on destruction, preventing temp file leaks.

31.7 External Sort Implementation

External sort handles ORDER BY operations that exceed memory limits using a k-way merge sort algorithm.

31.7.1 External Sort Architecture

[Diagram: external sort data flow]

The external sorter implementation:

class ExternalSorter final {
public:
  using ThreeWayComparator = std::function<int32_t(
      const Document&, const Document&)>;

  ExternalSorter(const ExternalSortConfig& config,
                 ThreeWayComparator three_way_comparator);

  void add(Document&& doc) {
    buffer_.push_back({std::move(doc), next_insertion_order_++});
    buffer_memory_ += estimate_document_size_(buffer_.back().doc);

    if (buffer_memory_ >= config_.memory_limit) {
      spill_to_disk_();
    }
  }

  void finalize() {
    if (!runs_.empty()) {
      // External sort path: create merge cursor
      spill_to_disk_();  // Spill remaining buffer
      merge_cursor_ = std::make_unique<MergeCursor>(
          std::move(runs_), three_way_comparator_);
    } else {
      // In-memory sort path (stable: equal keys keep insertion order)
      std::stable_sort(buffer_.begin(), buffer_.end(),
                       [this](const auto& a, const auto& b) {
                         return three_way_comparator_(a.doc, b.doc) < 0;
                       });
      in_memory_it_ = buffer_.begin();
      in_memory_mode_ = true;
    }
    finalized_ = true;
  }

  auto next() -> std::optional<Document> {
    if (in_memory_mode_) {
      if (in_memory_it_ == buffer_.end()) {
        return std::nullopt;
      }
      return std::move((in_memory_it_++)->doc);
    }
    return merge_cursor_->next();
  }

private:
  void spill_to_disk_() {
    // Sort buffer in memory
    std::stable_sort(buffer_.begin(), buffer_.end(),
        [this](const auto& a, const auto& b) {
          auto cmp = three_way_comparator_(a.doc, b.doc);
          if (cmp != 0) return cmp < 0;
          return a.insertion_order < b.insertion_order;  // Stable
        });

    // Write to sorted run file
    auto path = temp_file_manager_->create_temp_path();
    auto writer = SortedRunWriter{path, config_.compression,
                                   config_.encryption};
    for (const auto& entry : buffer_) {
      writer.write(entry.doc);
    }
    runs_.push_back(writer.finalize());

    buffer_.clear();
    buffer_memory_ = 0;
  }

private:
  std::vector<SortEntry> buffer_;
  std::vector<SortEntry>::iterator in_memory_it_;
  bool in_memory_mode_ = false;
  bool finalized_ = false;
  size_t buffer_memory_ = 0;
  uint64_t next_insertion_order_ = 0;
  std::vector<SortedRun> runs_;
  std::unique_ptr<MergeCursor> merge_cursor_;
};

31.7.2 K-Way Merge with Min-Heap

The merge cursor uses a min-heap for efficient k-way merging:

class MergeCursor final {
public:
  MergeCursor(std::vector<SortedRun>&& runs,
              ThreeWayComparator three_way_comparator)
      : runs_(std::move(runs)),
        three_way_comparator_(std::move(three_way_comparator)) {}

  auto next() -> std::optional<Document> {
    if (!initialized_) {
      initialize_heap_();
      initialized_ = true;
    }

    if (heap_.empty()) {
      return std::nullopt;
    }

    // Pop minimum element
    std::pop_heap(heap_.begin(), heap_.end(), heap_compare_);
    auto result = std::move(heap_.back().doc);
    auto run_idx = heap_.back().run_index;
    heap_.pop_back();

    // Refill from same run
    if (auto doc = runs_[run_idx].next()) {
      heap_.push_back({run_idx, std::move(*doc)});
      std::push_heap(heap_.begin(), heap_.end(), heap_compare_);
    }

    return result;
  }

private:
  void initialize_heap_() {
    for (size_t i = 0; i < runs_.size(); ++i) {
      if (auto doc = runs_[i].next()) {
        heap_.push_back({i, std::move(*doc)});
      }
    }
    std::make_heap(heap_.begin(), heap_.end(), heap_compare_);
  }

private:
  struct RunEntry {
    size_t run_index;
    Document doc;
  };

  std::vector<SortedRun> runs_;
  ThreeWayComparator three_way_comparator_;
  std::vector<RunEntry> heap_;
  bool initialized_ = false;
  // std::make_heap builds a max-heap, so invert the comparison to keep the
  // smallest document at the heap front (min-heap behavior).
  std::function<bool(const RunEntry&, const RunEntry&)> heap_compare_ =
      [this](const RunEntry& a, const RunEntry& b) {
        return three_way_comparator_(a.doc, b.doc) > 0;
      };
};

The number of merge passes for N bytes of data with memory M and merge width k is:

\text{passes} = \left\lceil \log_k \left( \frac{N}{M} \right) \right\rceil

With k = 16 and typical settings, most sorts complete in one or two passes.
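The pass count follows directly from the formula; a standalone sketch with an explicit fits-in-memory guard:

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// passes = ceil(log_k(N / M)): each pass merges up to k runs into one.
int32_t merge_passes(double data_bytes, double memory_bytes, double k) {
  if (data_bytes <= memory_bytes) {
    return 0;  // fits in memory, no merge phase at all
  }
  double runs = std::ceil(data_bytes / memory_bytes);
  return static_cast<int32_t>(std::ceil(std::log(runs) / std::log(k)));
}
```

Ten runs merge in a single 16-way pass; a hundred runs need two passes (16-way merges produce seven intermediate runs, then one final merge).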

31.8 Grace Hash Join

For joins that exceed memory, Cognica implements the Grace Hash Join algorithm with recursive partitioning.

31.8.1 Spillable Hash Table

class SpillableHashTable final {
public:
  SpillableHashTable(const SpillConfig& config, size_t num_partitions = 64);

  void insert(const VMValue& key, db::document::Document* doc) {
    auto partition_idx = compute_partition_(key);
    auto hash = compute_hash_(key);
    auto& partition = partitions_[partition_idx];

    partition.entries.emplace(hash, *doc);
    partition.memory_estimate += SpillConfig::kEstimatedDocSize;
    total_memory_ += SpillConfig::kEstimatedDocSize;

    // Check for spill
    if (total_memory_ > config_.memory_limit) {
      // Find largest in-memory partition and spill it
      size_t max_partition = 0;
      size_t max_size = 0;
      for (size_t i = 0; i < num_partitions_; ++i) {
        if (!partitions_[i].spilled &&
            partitions_[i].memory_estimate > max_size) {
          max_partition = i;
          max_size = partitions_[i].memory_estimate;
        }
      }
      spill_partition_(max_partition);
    }
  }

  auto probe(const VMValue& key) -> std::vector<db::document::Document*> {
    auto partition_idx = compute_partition_(key);
    auto hash = compute_hash_(key);
    auto& partition = partitions_[partition_idx];

    if (partition.spilled) {
      load_partition_(partition_idx);
    }

    auto range = partition.entries.equal_range(hash);
    std::vector<db::document::Document*> results;
    for (auto it = range.first; it != range.second; ++it) {
      results.push_back(&it->second);
    }
    return results;
  }

private:
  struct Partition {
    std::unordered_multimap<int64_t, Document> entries;
    size_t memory_estimate = 0;
    bool spilled = false;
    std::filesystem::path spill_path;
  };

  auto compute_partition_(const VMValue& key) const -> size_t {
    return static_cast<size_t>(compute_hash_(key)) % num_partitions_;
  }

private:
  std::vector<Partition> partitions_;
  size_t total_memory_ = 0;
};

31.8.2 Grace Hash Join Algorithm

The Grace Hash Join proceeds in phases:

  1. Build Phase: Partition build-side rows by hash, spilling overflow partitions
  2. Probe Phase: For each probe row, look up matching partition
  3. Recursive Join: If a partition is still too large, recursively partition

[Diagram: Grace hash join phases]

The partition count is chosen based on expected data size:

\text{partitions} = \min\left(64, \max\left(4, \frac{\text{data\_size}}{\text{memory\_budget}}\right)\right)
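A direct sketch of the heuristic (standalone; integer division stands in for the ratio):

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Clamp the data-to-budget ratio into [4, 64] partitions.
std::size_t choose_partitions(std::size_t data_size,
                              std::size_t memory_budget) {
  std::size_t ratio = memory_budget > 0 ? data_size / memory_budget : 64;
  return std::min<std::size_t>(64, std::max<std::size_t>(4, ratio));
}
```

The lower bound keeps partitioning overhead worthwhile even for small inputs; the upper bound caps the number of open spill files.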

31.9 Spillable Aggregation

GROUP BY operations with many distinct groups require spillable aggregation tables.

31.9.1 Spillable Aggregation Table

class SpillableAggTable final {
public:
  SpillableAggTable(const SpillConfig& config,
                    const std::vector<AggregateFunction>& functions,
                    size_t num_partitions = 64);

  void accumulate(const VMValue& group_key, const VMValue& value,
                  size_t agg_index) {
    auto* group = get_or_create_group_(group_key);
    group->states[agg_index]->accumulate(value);
  }

  void finalize() {
    for (auto i = 0uz; i < partitions_.size(); ++i) {
      auto& partition = partitions_[i];
      if (partition.spilled) {
        load_partition_(i);
      }
      for (auto& [hash, groups] : partition.groups) {
        for (auto& group : groups) {
          for (auto& state : group.states) {
            state->finalize();
          }
        }
      }
    }
    finalized_ = true;
  }

  auto iter_next() -> std::optional<std::pair<VMValue, std::vector<VMValue>>> {
    // Iterate through partitions and groups
    // Load spilled partitions on demand
  }

private:
  struct GroupEntry {
    VMValue key;
    std::string owned_key_string;
    std::vector<std::unique_ptr<AggregateState>> states;
  };

  struct Partition {
    std::unordered_map<int64_t, std::vector<GroupEntry>> groups;
    size_t memory_estimate = 0;
    bool spilled = false;
  };

private:
  std::vector<AggregateFunction> functions_;
  std::vector<Partition> partitions_;
};

The aggregation table partitions groups by key hash, enabling partial spilling while keeping hot groups in memory.

31.10 Cost-Based Spill Decisions

The spill decision framework uses cost analysis to choose optimal execution strategies.

31.10.1 Spill Decision Framework

enum class SpillStrategy : uint8_t {
  kInMemory = 0,
  kExternalSort = 1,
  kGraceHashJoin = 2,
  kHashPartitionAgg = 3,
};

struct SpillDecision {
  bool will_spill = false;
  SpillStrategy strategy = SpillStrategy::kInMemory;
  size_t recommended_memory = 0;
  double estimated_spill_cost = 0.0;
  int32_t num_partitions = 0;
  int32_t merge_passes = 0;
};

class SpillDecisionFramework final {
public:
  auto decide(PipelineStageType stage_type,
              const CardinalityEstimate& input_estimate,
              size_t available_memory) const -> SpillDecision {
    switch (stage_type) {
      case PipelineStageType::kSort:
        return decide_sort_(input_estimate, available_memory);
      case PipelineStageType::kGroup:
        return decide_group_(input_estimate, available_memory);
      case PipelineStageType::kJoin:
        return decide_join_(input_estimate, available_memory);
      // ...
    }
  }

private:
  auto decide_sort_(const CardinalityEstimate& estimate, size_t memory) const
      -> SpillDecision {
    auto required = estimate.memory_estimate();
    auto threshold = static_cast<size_t>(memory * spill_threshold_);

    if (required <= threshold) {
      return SpillDecision{false, SpillStrategy::kInMemory, required, 0.0};
    }

    auto passes = estimate_merge_passes_(required, memory, max_merge_width_);
    auto cost = estimate_external_sort_cost_(required, passes);

    return SpillDecision{
        true, SpillStrategy::kExternalSort,
        memory, cost, 0, passes};
  }

  auto estimate_merge_passes_(size_t data_size, size_t memory, int32_t k) const
      -> int32_t {
    // passes = ceil(log_k(data_size / memory)); no merge needed if it fits
    if (memory == 0 || data_size <= memory) {
      return 0;
    }
    auto runs = static_cast<double>(data_size) / static_cast<double>(memory);
    return static_cast<int32_t>(std::ceil(std::log(runs) / std::log(k)));
  }

private:
  double spill_threshold_ = 0.8;
  int32_t max_merge_width_ = 16;

  static constexpr double kDiskReadCostPerByte = 1e-6;
  static constexpr double kDiskWriteCostPerByte = 1.5e-6;
};

31.10.2 Cost Model

The cost model estimates I/O overhead for different spill strategies:

External Sort Cost:

C_{sort} = 2 \cdot N \cdot p \cdot (C_{read} + C_{write})

where N is the data size, p is the number of merge passes, and C_{read}, C_{write} are the per-byte I/O costs.

Grace Hash Join Cost:

C_{join} = 2 \cdot (|R| + |S|) \cdot (C_{read} + C_{write}) \cdot (1 + \text{overhead})

where |R| and |S| are the relation sizes and the overhead term accounts for partitioning.

Hash Aggregation Cost:

C_{agg} = |\text{input}| \cdot C_{write} + |\text{output}| \cdot (C_{read} + C_{write})

The framework chooses in-memory execution when:

M_{required} \leq \alpha \cdot M_{available}

where \alpha = 0.8 is the spill threshold.
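The threshold check and the external-sort cost formula can be sketched together, reusing the chapter's per-byte constants:

```cpp
#include <cassert>

// Per-byte I/O cost constants from the framework above.
constexpr double kDiskReadCostPerByte = 1e-6;
constexpr double kDiskWriteCostPerByte = 1.5e-6;

// C_sort = 2 * N * p * (C_read + C_write): every pass reads and writes the
// full data set, and the factor of two covers run generation plus merging.
double external_sort_cost(double bytes, int passes) {
  return 2.0 * bytes * passes
      * (kDiskReadCostPerByte + kDiskWriteCostPerByte);
}

// In-memory execution is chosen when the requirement stays under the
// alpha = 0.8 spill threshold of available memory.
bool should_stay_in_memory(double required, double available,
                           double alpha = 0.8) {
  return required <= alpha * available;
}
```

A 70 MB requirement against 100 MB of available memory stays in memory; 90 MB crosses the 80% threshold and triggers a spill plan.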

31.11 CVM Spillable Operators

The CVM execution engine integrates spillable buffers for memory-intensive operations.

31.11.1 Spillable Sort Buffer

class SpillableSortBuffer final {
public:
  SpillableSortBuffer(const SpillConfig& config, ThreeWayComparator comparator);

  void add(const db::document::Document& doc);
  void add(db::document::Document&& doc);
  void finalize();

  auto next() -> db::document::Document*;
  bool has_next() const;

  auto memory_used() const -> size_t;
  auto spill_count() const -> size_t;
  bool has_spilled() const;

private:
  SpillConfig config_;
  ThreeWayComparator comparator_;
  std::unique_ptr<db::document::ExternalSorter> sorter_;
  std::optional<db::document::Document> current_doc_;
  bool finalized_ = false;
};

31.11.2 Spillable Window Buffer

Window functions require buffering all partition rows:

class SpillableWindowBuffer final {
public:
  explicit SpillableWindowBuffer(const SpillConfig& config);

  void add(const db::document::Document& doc) {
    rows_.push_back(doc);
    total_memory_ += SpillConfig::kEstimatedDocSize;
    check_memory_and_spill_();
  }

  void compute(uint16_t spec_index, SpillableWindowComputeFunc callback) {
    if (rows_spilled_) {
      load_rows_from_disk_();
    }

    std::vector<Document*> row_ptrs;
    for (auto& row : rows_) {
      row_ptrs.push_back(&row);
    }

    results_ = callback(row_ptrs, spec_index);
    computed_ = true;

    check_memory_and_spill_();
  }

  auto next() -> db::document::Document* {
    if (current_index_ >= rows_.size()) {
      return nullptr;
    }
    return &rows_[current_index_++];
  }

  auto get_current_results() const -> const WindowResultMap* {
    if (current_index_ == 0 || current_index_ > results_.size()) {
      return nullptr;
    }
    return &results_[current_index_ - 1];
  }

private:
  std::vector<db::document::Document> rows_;
  std::vector<WindowResultMap> results_;
  bool rows_spilled_ = false;
  bool results_spilled_ = false;
  size_t total_memory_ = 0;
};

31.11.3 Set Operation Buffer

UNION, INTERSECT, and EXCEPT operations use spillable set buffers:

enum class SetOpType : uint8_t {
  kUnion = 0,
  kIntersect = 1,
  kExcept = 2
};

class SpillableSetOpBuffer final {
public:
  SpillableSetOpBuffer(const SpillConfig& config, SetOpType type, bool all,
                       size_t num_partitions = 64);

  void add(const db::document::Document& doc, uint8_t source);
  void finalize();
  auto next() -> db::document::Document*;

private:
  struct HashPartition {
    // Hash -> (count, source_mask)
    std::unordered_map<uint64_t, std::pair<int64_t, uint8_t>> hash_info;
    size_t memory_estimate = 0;
    bool spilled = false;
  };

private:
  SetOpType type_;
  bool all_;
  std::vector<HashPartition> hash_partitions_;
  std::vector<ResultPartition> result_partitions_;
};

31.12 Memory Monitoring and Diagnostics

Effective memory management requires visibility into usage patterns.

31.12.1 Configuration Options

The configuration system exposes memory settings at multiple levels:

struct DocumentDBOptimizerOptions {
  bool enabled = true;
  uint64_t memory_budget = 256_MB;
  uint32_t max_optimization_passes = 10;
  bool enable_filter_pushdown = true;
  bool enable_topk_optimization = true;
  bool enable_index_selection = true;
  bool enable_cost_based_join = true;
  std::string spill_directory{};
};

struct DocumentDBCVMOptions {
  bool enabled = true;
  uint64_t memory_limit = 256_MB;
  double spill_threshold = 0.8;
  std::string temp_directory{};
  size_t cache_max_entries = 1024;
};

31.12.2 Storage Statistics

RocksDB statistics provide insight into storage memory:

struct StorageOptions {
  bool dump_malloc_stats = true;
  bool report_bg_io_stats = true;
  bool dump_storage_stats = true;
};

When enabled, the database periodically logs:

  • Block cache hit/miss rates
  • Write buffer utilization
  • Compaction I/O statistics
  • Memory allocator statistics (via jemalloc/tcmalloc)

31.13 Summary

This chapter examined Cognica's comprehensive memory management architecture:

  1. Lock-Free Memory Pool: Thread-local caching with atomic free lists eliminates allocation contention, enabling efficient memory allocation in multi-threaded query execution.

  2. Cache Eviction Policies: LRU and LFU caches provide O(1) operations for different access patterns—temporal locality versus frequency-based access.

  3. Storage Engine Memory: Careful configuration of block cache, row cache, and write buffers balances read performance against memory consumption.

  4. Query Memory Budgeting: The MemoryBudgetAllocator distributes available memory across pipeline operators based on cardinality estimates and operator priorities.

  5. Disk Spill Framework: External sort, Grace Hash Join, and hash-partitioned aggregation handle workloads that exceed memory limits.

  6. Cost-Based Decisions: The SpillDecisionFramework uses I/O cost models to choose optimal execution strategies.

  7. CVM Integration: Spillable buffers for sort, aggregation, window functions, and set operations provide memory-safe execution in the CVM.

The key insight is that effective memory management is not about avoiding memory pressure—it's about gracefully handling it. By implementing cost-based spill decisions and efficient disk I/O, Cognica maintains query throughput even when working sets exceed available memory.

The memory architecture follows a layered design:

  • Fast path: Thread-local allocation and caching for common operations
  • Shared path: Lock-free global pools for thread coordination
  • Overflow path: Disk spill for memory-intensive operations

This layered approach ensures that the common case is fast while the uncommon case (memory overflow) is handled correctly rather than catastrophically.

Copyright (c) 2023-2026 Cognica, Inc.