Chapter 10: Physical Planning and Execution Strategies

10.1 From Logical to Physical Plans

The transition from logical to physical planning marks a critical boundary in query processing. While logical plans describe what operations to perform using abstract relational algebra, physical plans specify how to execute those operations using concrete algorithms and data structures.

This chapter explores Cognica's physical planning system, which transforms optimized logical plans into executable physical plans by making three fundamental decisions:

  1. Access Path Selection: How to retrieve data (sequential scan, index scan, or point lookup)
  2. Algorithm Selection: Which algorithm to use for each operation (hash join vs. merge join, quicksort vs. external sort)
  3. Resource Allocation: How to distribute memory budgets and handle spill to disk

10.1.1 Physical Plan Properties

A physical plan differs from a logical plan in several important ways:

Property          Logical Plan        Physical Plan
----------------  ------------------  -----------------------
Abstraction       Relational algebra  Executable algorithms
Cost awareness    None                Full cost model
Memory awareness  None                Budget allocation
Ordering          Implicit            Explicit tracking
Parallelism       Unspecified         Execution model defined

Physical operators carry rich metadata that guides execution:

struct OperatorProperties {
  CardinalityEstimate cardinality {};      // Expected output rows
  std::vector<PhysicalOrdering> ordering {}; // Output sort order
  size_t memory_budget = 0;                // Allocated memory
  bool will_spill = false;                 // Spill prediction
  std::optional<SpillOptions> spill_options {};
  bool preserves_ordering = false;         // Ordering guarantee
};

10.1.2 Physical Planning Architecture

The physical planning process follows a systematic flow: the builder walks the optimized logical plan bottom-up, selects an access path or algorithm for each operator, and then allocates memory budgets across the resulting physical tree.

10.2 Physical Operator Types

Cognica defines a comprehensive set of physical operators, each representing a specific execution algorithm.

10.2.1 Operator Classification

Physical operators are classified by their function:

enum class PhysicalOpType : uint8_t {
  // === Scan Operators ===
  kSeqScan,           // Full collection sequential scan
  kIndexScan,         // B-tree index range scan with heap fetch
  kIndexOnlyScan,     // Covering index scan (no heap fetch)
  kPrimaryKeyLookup,  // Direct point lookup by primary key

  // === Filter Operators ===
  kFilter,            // Interpreted predicate evaluation
  kBytecodeFilter,    // CVM bytecode compiled filter

  // === Sort Operators ===
  kSort,              // In-memory quicksort
  kTopKSort,          // Heap-based top-K selection
  kExternalSort,      // External merge sort (disk spill)
  kIndexSort,         // Leverage index ordering

  // === Limit/Skip ===
  kLimit,             // Output row cap (constant or expression)

  // === Aggregate Operators ===
  kHashAggregate,     // Hash-based grouping
  kSortAggregate,     // Sort-based grouping
  kStreamAggregate,   // Streaming on sorted input

  // === Join Operators ===
  kHashJoin,          // Build/probe hash join
  kIndexNestedLoop,   // Index lookup on inner table
  kNestedLoopJoin,    // Simple nested loop
  kMergeJoin,         // Sort-merge join

  // === Set Operators ===
  kUnionAll,          // Concatenate without dedup
  kHashUnion,         // Union with hash deduplication

  // === Transform Operators ===
  kProject,           // Field selection/transformation
  kLiteral,           // Constant field injection

  // === Subquery and Table Function Operators ===
  kSubquery,          // Derived table (inline view)
  kTableFunc,         // Table-valued function (e.g., generate_series)
  kUnnest,            // Array unnesting to rows

  // === Search Operators ===
  kFullTextSearch,    // Full-text search
  kVectorSearch       // Vector similarity search
};

10.2.2 Operator Hierarchy

All physical operators inherit from a common PhysicalOperator base class.

10.2.3 Scan Operators

Scan operators provide the foundation for data access:

Sequential Scan (SeqScanOp):

  • Reads all documents in a collection
  • Optionally applies a pushed-down filter
  • Cost: O(n) where n is the collection size
  • Best when: High selectivity or no suitable index

Index Scan (IndexScanOp):

  • Uses B-tree index to locate matching documents
  • Fetches documents from heap storage
  • Cost: O(k log n + k) where k is the result size
  • Best when: Low selectivity with matching index

Index-Only Scan (IndexOnlyScanOp):

  • Reads data directly from index (covering index)
  • No heap fetch required
  • Cost: O(k log n)
  • Best when: Index contains all required columns

Primary Key Lookup (PrimaryKeyLookupOp):

  • Direct point lookup by primary key
  • Cost: O(1) per lookup
  • Best when: Equality predicate on primary key

10.2.4 Join Operators

Join operators combine data from multiple sources:

Hash Join (HashJoinOp):

Phase 1 (Build): Hash smaller relation into hash table
Phase 2 (Probe): Scan larger relation, probe hash table
  • Build cost: O(|R|)
  • Probe cost: O(|S|)
  • Memory: O(min(|R|, |S|))
  • Best when: No useful indexes, adequate memory
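The two phases can be sketched in a few lines. This is illustrative only — the `Row` type and `hash_join` function are stand-ins, not Cognica's actual HashJoinOp, which operates on documents, supports composite keys, and can spill:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// (join key, payload) stand-in for a document row.
using Row = std::pair<int, std::string>;

inline auto hash_join(const std::vector<Row>& build_side,
                      const std::vector<Row>& probe_side)
    -> std::vector<std::pair<std::string, std::string>> {
  // Phase 1 (Build): hash the smaller relation, keyed by join column.
  std::unordered_multimap<int, std::string> table;
  for (const auto& [key, payload] : build_side) {
    table.emplace(key, payload);
  }

  // Phase 2 (Probe): scan the larger relation, emit all matching pairs.
  std::vector<std::pair<std::string, std::string>> out;
  for (const auto& [key, payload] : probe_side) {
    auto [lo, hi] = table.equal_range(key);
    for (auto it = lo; it != hi; ++it) {
      out.emplace_back(it->second, payload);
    }
  }
  return out;
}
```

Note that the probe phase streams: only the build side must be held in memory, which is why the optimizer builds from the smaller input.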

Index Nested Loop (IndexNestedLoopOp):

For each row in outer:
    Look up matching rows via index on inner
  • Cost: O(|R| × log |S|)
  • Memory: O(1)
  • Best when: Small outer table, index on inner join key

Merge Join (MergeJoinOp):

Advance both sorted inputs in lockstep
Emit matching pairs
  • Cost: O(|R| + |S|) (assuming pre-sorted inputs)
  • Memory: O(1)
  • Best when: Both inputs already sorted on join key

Nested Loop Join (NestedLoopJoinOp):

For each row in outer:
    For each row in inner:
        If predicate matches, emit
  • Cost: O(|R| × |S|)
  • Memory: O(1)
  • Best when: Very small relations or fallback

10.2.5 Aggregate Operators

Aggregate operators compute grouped summaries:

Hash Aggregate (HashAggregateOp):

  • Groups rows using hash table keyed by group columns
  • Maintains aggregate state per group
  • Cost: O(n)
  • Memory: O(g) where g is the number of groups

Stream Aggregate (StreamAggregateOp):

  • Requires input sorted on group keys
  • Processes groups in single pass
  • Cost: O(n)
  • Memory: O(1) (single group state)
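Because the input arrives grouped, a stream aggregate only has to detect group boundaries. A minimal COUNT(*)-style sketch (the `stream_count` helper is hypothetical, not the actual StreamAggregateOp interface):

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// One-pass COUNT(*) over input already sorted by group key.
// State is a single (key, count) pair at any moment.
inline auto stream_count(const std::vector<std::string>& sorted_keys)
    -> std::vector<std::pair<std::string, int>> {
  std::vector<std::pair<std::string, int>> out;
  for (const auto& key : sorted_keys) {
    if (out.empty() || out.back().first != key) {
      out.emplace_back(key, 0);  // group boundary: start new group state
    }
    ++out.back().second;
  }
  return out;
}
```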

Sort Aggregate (SortAggregateOp):

  • Sorts input, then streams
  • Cost: O(n log n) for the sort + O(n) for aggregation
  • Memory: O(n) for the sort

10.2.6 Sort Operators

Sort operators order result sets:

In-Memory Sort (PhysicalSortOp):

  • QuickSort implementation
  • Cost: O(n log n)
  • Memory: O(n)
  • Best when: Data fits in memory

Top-K Sort (TopKSortOp):

  • Heap-based selection
  • Cost: O(n log k)
  • Memory: O(k)
  • Best when: Small LIMIT clause

External Sort (ExternalSortOp):

  • External merge sort with disk spill
  • Cost: O(nlogn)O(n \log n) comparisons + I/O
  • Memory: Configurable budget
  • Best when: Data exceeds memory

Index Sort (IndexSortOp):

  • Leverages index ordering
  • Cost: none (no sort is performed; rows stream out in index order)
  • Best when: Index provides required ordering

10.2.7 Subquery and Table Function Operators

Subquery (PhysicalSubquery):

The physical subquery operator wraps a complete physical plan that was produced from a derived table (FROM (SELECT ...) AS t). When the logical SubqueryOp carries column aliases (the AS t(col1, col2) syntax), the physical planner inserts a rename projection layer above the child plan. This rename layer is a PhysicalProject (or BatchProject in batch mode) whose target list maps each original output column to its alias name. This approach is more general than mutating column names on a specific child node type, because it works regardless of whether the child is a PhysicalValues, a sort, a join, or any other operator.

After inserting the optional rename layer, the subquery alias is propagated to the outermost physical node via set_output_alias(), which allows parent join operators to build CompositeRow objects with properly qualified field names.

Table Function (PhysicalTableFunc):

Table-valued functions such as generate_series(), json_each(), or graph query functions produce a set of rows from their arguments. The physical operator stores the function name, argument expressions, and declared output columns. At execution time, the operator evaluates its arguments, invokes the registered table function implementation, and produces rows through the standard iterator interface.

Table function operators participate in the transactional execution context. The ExecutionContext carries an optional DocumentDB handle and DocumentDBTransaction pointer so that write-capable table functions (such as Cypher mutation functions) can participate in the active SQL transaction instead of falling back to global database state.

Unnest (PhysicalUnnest):

The unnest operator expands an array-valued expression into a set of rows, one per element. It is the physical counterpart of UNNEST(array_expr) in the FROM clause.

10.2.8 Limit Operator

The PhysicalLimit operator supports both constant and expression-based limits. When the logical plan contains a constant LIMIT or OFFSET, the values are stored directly. When the value comes from a parameter or expression, the operator stores the unevaluated AST expression in limit_expr / offset_expr and evaluates it at open() time to produce an effective limit and offset. This allows parameterized queries like LIMIT $1 OFFSET $2 to work correctly without recompilation.
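A minimal sketch of this deferred evaluation, using a callable as a stand-in for the stored AST expression (the `LimitState` struct and `effective_limit` name are illustrative, not Cognica's API):

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>

// Either a constant limit is known at plan time, or an unevaluated
// "expression" (here: a callable) is resolved at open() time.
struct LimitState {
  std::optional<int64_t> limit;                        // constant LIMIT 10
  std::optional<std::function<int64_t()>> limit_expr;  // parameterized LIMIT $1

  auto effective_limit() const -> std::optional<int64_t> {
    if (limit) return limit;                 // constant path
    if (limit_expr) return (*limit_expr)();  // evaluate late, at open()
    return std::nullopt;                     // no limit at all
  }
};
```

Because the expression is only evaluated when the cursor opens, a prepared statement can be re-executed with different parameter bindings without recompiling the plan.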

10.3 Execution Models

Cognica implements a hybrid execution architecture supporting three execution models, each optimized for different scenarios.

10.3.1 Iterator (Volcano) Model

The primary execution model follows the classic Volcano iterator pattern: every operator exposes a pull-based cursor, and rows flow upward one at a time as each parent calls next() on its child.

Each physical operator implements the Cursor interface:

class Cursor {
public:
  virtual auto next() -> Document* = 0;
  virtual auto has_next() const -> bool = 0;
  virtual void close() = 0;
};

The to_cursor() method converts a physical operator to an executable cursor:

class PhysicalOperator {
public:
  virtual auto to_cursor(ExecutionContext& ctx) -> std::unique_ptr<Cursor> = 0;
};

Cursor Implementations:

Cursor Type       Purpose
----------------  --------------------------------
ScanCursor        Iterates over a collection
FilterCursor      Applies a predicate to its input
ProjectCursor     Transforms fields
SortCursor        Buffers and sorts its input
LimitCursor       Caps the output count
HashJoinOperator  Executes a hash join
UnionCursor       Combines multiple inputs

Advantages:

  • Simple, composable design
  • Low memory overhead for streaming operations
  • Natural pipelining of operators

Disadvantages:

  • High per-row overhead from virtual function calls
  • Poor cache utilization (row-at-a-time processing)
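A stripped-down pipeline makes the pull-based composition concrete. This sketch uses int rows and an optional-returning next() instead of Cognica's Document*-based Cursor interface:

```cpp
#include <cassert>
#include <memory>
#include <optional>
#include <vector>

// Minimal Volcano-style cursors over int "rows" (stand-ins for Document*).
struct IntCursor {
  virtual ~IntCursor() = default;
  virtual auto next() -> std::optional<int> = 0;  // nullopt == exhausted
};

// Leaf: iterates over an in-memory "collection".
struct VecScanCursor final : IntCursor {
  std::vector<int> rows;
  size_t pos = 0;
  explicit VecScanCursor(std::vector<int> r) : rows(std::move(r)) {}
  auto next() -> std::optional<int> override {
    if (pos >= rows.size()) return std::nullopt;
    return rows[pos++];
  }
};

// Filter: pulls from its child until the predicate (row > threshold) passes.
struct GtFilterCursor final : IntCursor {
  std::unique_ptr<IntCursor> child;
  int threshold;
  GtFilterCursor(std::unique_ptr<IntCursor> c, int t)
      : child(std::move(c)), threshold(t) {}
  auto next() -> std::optional<int> override {
    while (auto row = child->next()) {
      if (*row > threshold) return row;
    }
    return std::nullopt;  // child exhausted
  }
};
```

Each next() call on the filter triggers at least one virtual call into the scan — the per-row overhead that the bytecode and vectorized models below are designed to eliminate.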

10.3.2 CVM Bytecode Execution

For SQL queries, Cognica compiles physical plans to CVM (Cognica Virtual Machine) bytecode: the cursor tree is lowered to a flat instruction stream that a register-based interpreter executes directly.

Bytecode Generation Example:

For a simple filter query:

SELECT name FROM users WHERE age > 21

Generated bytecode:

CURSOR_OPEN slot_0, "users"
LABEL loop:
  CURSOR_NEXT R0, slot_0
  JUMP_NULL R0, done

  ; Filter: age > 21
  FIELD_GET R1, R0, "age"
  LOAD_CONST R2, 21
  CMP_GT R3, R1, R2
  JUMP_IF_FALSE R3, loop

  ; Project: name
  FIELD_GET R4, R0, "name"
  EMIT_ROW R4
  JUMP loop

LABEL done:
  CURSOR_CLOSE slot_0
  HALT

Lowering Result Structure:

struct PlanLoweringResult {
  bool success = false;
  std::unique_ptr<BytecodeModule> module;
  std::vector<std::string> output_columns;
  uint8_t cursor_slots_used = 0;
  uint8_t registers_used = 0;

  // Index queries for cursor initialization
  std::unordered_map<uint8_t, Document> index_queries;

  // Registered subqueries
  std::vector<std::unique_ptr<SubqueryExpr>> subqueries;

  // Optimization hints for external tables
  std::unordered_map<uint8_t, int64_t> limit_hints;
  std::unordered_map<uint8_t, int64_t> offset_hints;
};

Register Allocation:

The lowering phase manages a limited set of virtual registers:

class PlanLowering {
private:
  static constexpr size_t kMaxRegisters = 32;
  std::bitset<kMaxRegisters> register_in_use_;
  std::bitset<kMaxRegisters> register_reserved_;
  std::deque<uint8_t> allocation_order_;  // FIFO for eviction
  uint8_t next_cursor_slot_ = 0;

  auto allocate_register_() -> uint8_t;
  auto allocate_cursor_slot_() -> uint8_t;
  auto evict_register_() -> uint8_t;  // Spill to stack if needed
};
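The FIFO eviction policy can be illustrated with a standalone pool. This is a sketch only: the real PlanLowering also tracks reserved registers and emits spill-to-stack code when it evicts:

```cpp
#include <bitset>
#include <cassert>
#include <cstdint>
#include <deque>

// Hand out free registers; when all are busy, reuse the oldest allocation.
class RegisterPool {
public:
  static constexpr size_t kMaxRegisters = 32;

  auto allocate() -> uint8_t {
    for (uint8_t r = 0; r < kMaxRegisters; ++r) {
      if (!in_use_[r]) {
        in_use_[r] = true;
        order_.push_back(r);  // remember allocation order for FIFO eviction
        return r;
      }
    }
    return evict_();  // all registers busy
  }

  void release(uint8_t r) { in_use_[r] = false; }

private:
  auto evict_() -> uint8_t {
    uint8_t victim = order_.front();  // FIFO: oldest allocation first
    order_.pop_front();
    order_.push_back(victim);  // victim is immediately handed out again
    return victim;
  }

  std::bitset<kMaxRegisters> in_use_;
  std::deque<uint8_t> order_;
};
```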

Advantages:

  • Eliminates virtual function overhead
  • Computed-goto dispatch for fast interpretation
  • Amenable to JIT compilation

10.3.3 Vectorized Execution

For columnar data processing, Cognica supports vectorized execution using Apache Arrow:

class VectorizedContext {
  auto get_batch(uint8_t reg) -> ColumnBatch*;
  auto get_column(uint8_t batch_reg, size_t col_idx) -> ColumnData*;
};

class ColumnBatch {
  size_t row_count_;
  std::vector<ColumnData> columns_;
};

Vectorized operations process data in batches (typically 1024-4096 rows), enabling:

  • SIMD instruction utilization
  • Better cache locality
  • Reduced interpretation overhead
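The batch-at-a-time style can be illustrated with a selection-vector filter: evaluate the predicate over a whole column and record the indices of qualifying rows. A tight loop over a contiguous array like this is the shape compilers can auto-vectorize with SIMD (illustrative function, not part of the VectorizedContext API):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Batch filter: produce a selection vector of row indices where col > threshold.
inline auto filter_gt(const std::vector<int64_t>& column, int64_t threshold)
    -> std::vector<uint32_t> {
  std::vector<uint32_t> selection;
  selection.reserve(column.size());
  for (uint32_t i = 0; i < column.size(); ++i) {
    if (column[i] > threshold) selection.push_back(i);  // keep row index only
  }
  return selection;
}
```

Downstream operators then process only the selected indices, so no rows are physically copied during filtering.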

Vectorized Lowering Methods:

class PlanLowering {
  void lower_seq_scan_vectorized_(const SeqScanOp* op);
  void lower_filter_vectorized_(const FilterOp* op);
  void lower_project_vectorized_(const ProjectOp* op);
  void lower_batch_hash_join_(const HashJoinOp* op);
  void lower_batch_sort_(const SortOp* op);
};

10.4 Physical Plan Builder

The PhysicalPlanBuilder transforms logical plans into physical plans through a recursive descent process.

10.4.1 Builder Configuration

class PhysicalPlanBuilder final {
public:
  struct Config {
    std::string collection_name;
    const IndexDescriptor* index_desc = nullptr;
    const IndexStatisticsManager* stats_mgr = nullptr;
    const LSMCostModel* lsm_cost = nullptr;
    int64_t total_rows = 0;
    double avg_row_width = 512.0;
    size_t memory_budget = 256 * 1024 * 1024;  // 256 MB
    std::vector<std::string> output_fields {};
  };

  explicit PhysicalPlanBuilder(Config config);
  auto build(const LogicalPlan& logical_plan) -> PhysicalPlan;

private:
  auto build_(const LogicalOperator* op) -> PhysicalOpPtr;
  auto build_scan_(const ScanOp* scan) -> PhysicalOpPtr;
  auto build_filter_(const FilterOp* filter) -> PhysicalOpPtr;
  auto build_sort_(const SortOp* sort) -> PhysicalOpPtr;
  auto build_group_(const GroupOp* group) -> PhysicalOpPtr;
  auto build_join_(const JoinOp* join) -> PhysicalOpPtr;
};

10.4.2 Recursive Plan Construction

The builder processes the logical plan tree bottom-up:

auto PhysicalPlanBuilder::build_(const LogicalOperator* op) -> PhysicalOpPtr {
  switch (op->type()) {
    case LogicalOpType::kScan:
      return build_scan_(static_cast<const ScanOp*>(op));

    case LogicalOpType::kFilter:
      return build_filter_(static_cast<const FilterOp*>(op));

    case LogicalOpType::kSort:
      return build_sort_(static_cast<const SortOp*>(op));

    case LogicalOpType::kGroup:
      return build_group_(static_cast<const GroupOp*>(op));

    case LogicalOpType::kJoin:
      return build_join_(static_cast<const JoinOp*>(op));

    // ... other operator types
  }
}

10.4.3 Access Path Selection

For scan operators, the builder enumerates and costs all viable access paths:

auto PhysicalPlanBuilder::build_scan_(const ScanOp* scan) -> PhysicalOpPtr {
  auto enumerator = AccessPathEnumerator {
    .collection_name = config_.collection_name,
    .index_desc = config_.index_desc,
    .stats_mgr = config_.stats_mgr,
    .lsm_cost = config_.lsm_cost,
    .total_rows = config_.total_rows,
    .avg_row_width = config_.avg_row_width,
    .output_fields = config_.output_fields
  };

  // Get filter from parent if available
  auto filter = get_pushed_filter_();

  // Select lowest-cost access path
  auto best_path = enumerator.select_best(filter);
  return best_path.op;
}

10.4.4 Strategy Selection Integration

For blocking operators, the builder consults strategy selectors:

auto PhysicalPlanBuilder::build_sort_(const SortOp* sort) -> PhysicalOpPtr {
  // Build child first
  auto child = build_(sort->children()[0].get());
  auto child_props = child->properties();

  // Configure strategy selection
  auto sort_config = SortConfig {
    .input_rows = child_props.cardinality.rows,
    .row_width = child_props.cardinality.row_width,
    .sort_keys = sort->sort_keys(),
    .limit_hint = current_limit_hint_,
    .memory_budget = remaining_memory_budget_,
    .input_ordering = child_props.ordering,
    .available_indexes = get_available_indexes_()
  };

  // Select strategy
  auto strategy = SortStrategySelector::select(sort_config);

  // Create appropriate physical operator
  switch (strategy) {
    case SortStrategy::kNoSort:
      return child;  // Already sorted

    case SortStrategy::kTopKHeap:
      return std::make_unique<TopKSortOp>(
          *current_limit_hint_, sort->sort_keys(), std::move(child));

    case SortStrategy::kInMemorySort:
      return std::make_unique<PhysicalSortOp>(
          sort->sort_keys(), std::move(child));

    case SortStrategy::kExternalSort:
      return std::make_unique<ExternalSortOp>(
          sort->sort_keys(), std::move(child), spill_config_);

    case SortStrategy::kIndexScan:
      return create_index_sort_(sort->sort_keys());
  }
}

10.4.5 Scan Projection Pushdown

After the physical plan tree is constructed, the planner performs a top-down pass to propagate column references down to scan operators. The goal is to ensure that each scan reads only the columns that are actually needed by the operators above it, reducing I/O and memory consumption.

The set_scan_projections_() method walks the physical plan tree and accumulates the set of referenced columns at each operator by inspecting:

  • Filter operators: columns referenced in the predicate expression.
  • Project operators: columns referenced in the target list.
  • Join operators (Hash, NestedLoop, Merge): columns referenced in the join condition.
  • Sort operators: columns referenced in ORDER BY expressions.
  • Aggregate operators (Hash, Stream): columns in GROUP BY keys, aggregate function arguments, aggregate ordering clauses, and HAVING predicates.
  • DistinctOn operators: columns in the DISTINCT ON expression list.
  • Unnest operators: columns referenced in the unnest expression.
  • TableFunc operators: columns referenced in function arguments.

When the pass reaches a PhysicalSeqScan or PhysicalIndexScan, it sets the scan's projection list to the accumulated column set.

LATERAL Dependency Propagation: Lateral joins introduce a special challenge for projection pushdown. The right side of a lateral join may reference columns from the left side that do not appear in the join condition itself. To handle this, the planner uses a separate collect_plan_referenced_columns_() method that recursively collects all column references from the entire right subtree of a lateral nested-loop join. These columns are added to the needed-columns set before the pass descends into the left child, ensuring that the left scan includes all columns the right side depends on.

10.5 Access Path Enumeration

Access path enumeration generates all viable ways to read data from a table.

10.5.1 Access Path Structure

struct AccessPath {
  PhysicalOpPtr op;       // The physical operator
  optimizer::Cost cost;   // Estimated cost
  bool is_covering;       // True if index-only scan possible
};

10.5.2 Enumeration Process

The AccessPathEnumerator systematically considers each access method:

auto AccessPathEnumerator::enumerate(const ast::Expression* filter)
    -> std::vector<AccessPath> {

  auto paths = std::vector<AccessPath> {};

  // Sequential scan is always available (fallback)
  paths.push_back(create_seq_scan_());

  // Check for primary key equality condition
  if (is_pk_equality_(filter)) {
    paths.push_back(create_pk_lookup_(filter));
  }

  // Consider each available index
  for (const auto& index : indexes_) {
    // Extract bounds that match this index
    if (auto bounds = extract_index_bounds_(filter, index)) {
      // Standard index scan (with heap fetch)
      paths.push_back(create_index_scan_(index, *bounds));

      // Index-only scan (if covering)
      if (is_covering_index_(index)) {
        paths.push_back(create_index_only_scan_(index, *bounds));
      }
    }
  }

  // Sort by cost (cheapest first)
  std::sort(paths.begin(), paths.end(),
            [](const auto& a, const auto& b) {
              return a.cost.total() < b.cost.total();
            });

  return paths;
}

10.5.3 Index Bound Extraction

The enumerator analyzes filter predicates to extract index bounds:

auto AccessPathEnumerator::extract_index_bounds_(
    const ast::Expression* filter,
    const IndexInfo& index) -> std::optional<IndexBounds> {

  if (!filter) {
    return std::nullopt;
  }

  auto bounds = IndexBounds {};

  // Collect equality conditions on index columns
  auto equalities = collect_equality_fields_(filter);

  // Match against index prefix
  for (const auto& idx_col : index.columns) {
    if (auto it = equalities.find(idx_col); it != equalities.end()) {
      bounds.add_equality(idx_col, it->second);
    } else {
      break;  // Index prefix broken
    }
  }

  // Check for range condition on next column
  if (bounds.prefix_length() < index.columns.size()) {
    auto next_col = index.columns[bounds.prefix_length()];
    if (auto range = extract_range_(filter, next_col)) {
      bounds.set_range(*range);
    }
  }

  return bounds.is_useful() ? std::optional(bounds) : std::nullopt;
}

10.5.4 Cost Comparison

The cost model determines which access path wins:

Sequential Scan Cost:

C_{seq} = |R| \times (cpu_{tuple} + \frac{io_{page}}{tuples_{per\_page}})

Index Scan Cost:

C_{idx} = |R| \times sel \times (cpu_{tuple} + io_{random} + io_{heap})

Index-Only Scan Cost:

C_{idx\_only} = |R| \times sel \times (cpu_{tuple} + io_{random})

Primary Key Lookup Cost:

C_{pk} = k \times (cpu_{lookup} + io_{point})

Where k is the number of keys to look up.

Break-Even Analysis:

Index scan beats sequential scan when:

sel < \frac{cpu_{tuple} + io_{seq}}{cpu_{tuple} + io_{random} + io_{heap}}

With typical values:

sel < \frac{0.01 + 0.1}{0.01 + 4.0 + 0.5} \approx 2.4\%

For highly selective queries (matching roughly 2.4% of rows or fewer under these constants), index scans typically win.
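The break-even computation itself is a one-liner; with the example values above it reproduces the roughly 2.4% figure:

```cpp
#include <cassert>
#include <cmath>

// Selectivity below which an index scan is expected to beat a sequential
// scan, per the break-even inequality above.
inline auto index_break_even(double cpu_tuple, double io_seq,
                             double io_random, double io_heap) -> double {
  return (cpu_tuple + io_seq) / (cpu_tuple + io_random + io_heap);
}
```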

10.6 Strategy Selection

Strategy selectors choose the best algorithm for each operator type based on input characteristics, available resources, and physical properties.

10.6.1 Sort Strategy Selection

enum class SortStrategy {
  kNoSort,         // Input already sorted
  kTopKHeap,       // Heap-based for small LIMIT
  kInMemorySort,   // QuickSort in memory
  kExternalSort,   // External merge sort
  kIndexScan       // Use index ordering
};

Selection Logic:

The selector first returns kNoSort when the input ordering (or an available index) already satisfies the requested order. Otherwise it prefers a top-K heap when a small LIMIT is present, an in-memory sort when the estimated data size fits the memory budget, and an external merge sort when it does not.
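A compact sketch of this decision flow (field names and the LIMIT threshold are illustrative; the kIndexScan branch, which depends on index metadata, is omitted):

```cpp
#include <cassert>
#include <cstdint>
#include <optional>

enum class SortStrategy { kNoSort, kTopKHeap, kInMemorySort, kExternalSort };

// Simplified stand-in for SortConfig; not Cognica's actual layout.
struct SimpleSortConfig {
  int64_t input_rows = 0;
  double row_width = 0.0;
  bool input_already_sorted = false;
  std::optional<int64_t> limit_hint;
  size_t memory_budget = 0;
};

inline auto select_sort_strategy(const SimpleSortConfig& cfg) -> SortStrategy {
  if (cfg.input_already_sorted) return SortStrategy::kNoSort;
  // Small LIMIT relative to input: a bounded heap avoids a full sort.
  if (cfg.limit_hint && *cfg.limit_hint < cfg.input_rows / 10) {
    return SortStrategy::kTopKHeap;
  }
  auto data_size = static_cast<double>(cfg.input_rows) * cfg.row_width;
  if (data_size <= static_cast<double>(cfg.memory_budget)) {
    return SortStrategy::kInMemorySort;
  }
  return SortStrategy::kExternalSort;  // must spill sorted runs to disk
}
```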

Algorithm Complexity Comparison:

Strategy    Time        Space  I/O
----------  ----------  -----  -----------------------
TopK Heap   O(n log k)  O(k)   0
In-Memory   O(n log n)  O(n)   0
External    O(n log n)  O(B)   O((n/B) log_M (n/B))

Where B is the buffer size and M is the merge width.

10.6.2 Join Strategy Selection

enum class JoinStrategy {
  kHashJoin,          // Build/probe hash join
  kIndexNestedLoop,   // Index lookup on inner
  kMergeJoin,         // Sort-merge join
  kNestedLoop         // Simple nested loop
};

Selection Logic:

The selector prefers an index nested loop when the outer input is small and the inner join key is indexed, a merge join when both inputs are already sorted on the join key, and a hash join when the build side fits the memory budget; a plain nested loop remains the fallback for very small inputs or when no other strategy applies.

Threshold Constants:

static constexpr double kINLOuterThreshold = 1000.0;
static constexpr double kHashBuildCostPerRow = 1.0;
static constexpr double kHashProbeCostPerRow = 1.2;
static constexpr double kNestLoopCostPerRow = 10.0;
static constexpr double kSortMergeCostPerRow = 2.0;
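A sketch of how these thresholds might drive the decision (simplified: the real selector compares full cost-model estimates rather than boolean flags):

```cpp
#include <cassert>

enum class JoinStrategy { kHashJoin, kIndexNestedLoop, kMergeJoin, kNestedLoop };

// Illustrative decision flow; 1000.0 mirrors kINLOuterThreshold above.
inline auto select_join_strategy(double outer_rows, bool inner_has_index,
                                 bool both_sorted_on_key,
                                 bool hash_fits_in_memory) -> JoinStrategy {
  // Small outer with an index on the inner join key: index nested loop.
  if (inner_has_index && outer_rows <= 1000.0) {
    return JoinStrategy::kIndexNestedLoop;
  }
  if (both_sorted_on_key) return JoinStrategy::kMergeJoin;  // sort is free
  if (hash_fits_in_memory) return JoinStrategy::kHashJoin;
  return JoinStrategy::kNestedLoop;  // last-resort fallback
}
```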

10.6.3 Aggregate Strategy Selection

enum class AggregateStrategy {
  kStreamAggregate,   // Input sorted on group keys
  kHashAggregate,     // Hash-based grouping
  kSortAggregate      // Sort then stream
};

Selection Logic:

auto AggregateStrategySelector::select(const AggConfig& config) -> AggregateStrategy {
  // If input already sorted on group keys, stream is optimal
  if (is_sorted_on_(config.input_ordering, config.group_keys)) {
    return AggregateStrategy::kStreamAggregate;
  }

  // Estimate hash table memory requirement
  auto estimated_groups = config.estimated_groups.value_or(
      config.input_rows * kDefaultGroupReduction);
  auto hash_memory = estimated_groups * kBytesPerGroupState;

  // Hash aggregate if fits in memory
  if (hash_memory <= config.memory_budget) {
    return AggregateStrategy::kHashAggregate;
  }

  // Sort-aggregate for large group counts
  return AggregateStrategy::kSortAggregate;
}

Memory Constants:

static constexpr size_t kBytesPerGroupState = 256;  // Per-group state
static constexpr double kDefaultGroupReduction = 0.1;  // Groups ≈ 10% of input rows

10.7 Memory Management

Memory management is critical for physical execution. Cognica implements sophisticated memory budgeting with automatic spill handling.

10.7.1 Memory Budget Allocation

The MemoryBudgetAllocator distributes memory across operators:

struct OperatorMemoryBudget {
  size_t allocated = 0;
  bool will_spill = false;
  double spill_fraction = 0.0;
  SpillOptions spill_options {};
};

class MemoryBudgetAllocator final {
public:
  auto allocate(const ParsedPipeline& pipeline,
                const std::vector<CardinalityEstimate>& estimates)
      -> std::vector<OperatorMemoryBudget>;

private:
  size_t total_budget_;
  std::string spill_directory_;
};

Allocation Strategy:

  1. Identify Blocking Operators: Sort, Join, Aggregate require memory
  2. Calculate Requirements: Estimate memory per operator
  3. Weighted Distribution: Allocate proportionally with priority weighting
  4. Apply Constraints: Enforce minimum (16 MB) and maximum bounds
  5. Configure Spill: Set up spill options for operators exceeding budget

Priority Weights:

static constexpr double kSortPriority = 1.5;   // Sort is memory-hungry
static constexpr double kGroupPriority = 1.2;  // Groups vary widely
static constexpr double kJoinPriority = 1.0;   // Base priority
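A proportional split using these weights, with the 16 MB floor from step 4, might look like the following (a sketch only; the real allocator also applies maximum bounds and configures spill for over-budget operators):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Split a total budget across operators proportionally to their priority
// weights, enforcing a 16 MB per-operator minimum.
inline auto split_budget(size_t total, const std::vector<double>& weights)
    -> std::vector<size_t> {
  constexpr size_t kMinBudget = 16 * 1024 * 1024;
  double weight_sum = 0.0;
  for (double w : weights) weight_sum += w;

  std::vector<size_t> out;
  for (double w : weights) {
    auto share =
        static_cast<size_t>(static_cast<double>(total) * (w / weight_sum));
    out.push_back(share < kMinBudget ? kMinBudget : share);  // enforce floor
  }
  return out;
}
```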

10.7.2 Spill Configuration

When operators cannot fit data in memory, they spill to disk:

struct SpillConfig {
  size_t memory_limit = 256 * 1024 * 1024;  // 256 MB
  size_t spill_batch_size = 10000;          // Rows per spill batch
  int32_t max_merge_width = 16;             // Merge fan-in
  std::filesystem::path temp_directory {};
  CompressionType compression = CompressionType::kNone;
};

10.7.3 Spillable Data Structures

Cognica implements spill-aware versions of key data structures:

SpillableSortBuffer:

  • Implements external merge sort
  • Writes sorted runs to disk when memory exceeded
  • Merges runs during final output phase

SpillableHashTable:

  • Uses Grace Hash Join partitioning
  • Partitions data by hash of key
  • Spills partitions independently
  • Reloads partitions on demand during probe

SpillableAggTable:

  • Hash aggregate with partition spill
  • Maintains partial aggregates in memory
  • Spills partitions when full
  • Merges partitions during finalization

10.7.4 Memory Tracking

Each operator tracks its memory usage:

class MemoryTracker {
public:
  void allocate(size_t bytes);
  void deallocate(size_t bytes);
  auto current_usage() const -> size_t;
  auto peak_usage() const -> size_t;
  auto budget() const -> size_t;
  auto should_spill() const -> bool;

private:
  std::atomic<size_t> current_ = 0;
  std::atomic<size_t> peak_ = 0;
  size_t budget_ = 0;
};

10.8 Cost Estimation

The cost model guides all physical planning decisions.

10.8.1 Cost Structure

struct Cost {
  double cpu_cost = 0.0;     // CPU cycles estimate
  double io_cost = 0.0;      // I/O operations
  double memory_cost = 0.0;  // Memory pressure
  double spill_cost = 0.0;   // Disk spill overhead

  auto total() const -> double {
    return cpu_cost +
           io_cost * kIOCostWeight +
           memory_cost * kMemoryCostWeight +
           spill_cost * kSpillCostWeight;
  }
};

Weight Constants:

static constexpr double kIOCostWeight = 10.0;      // I/O is expensive
static constexpr double kMemoryCostWeight = 0.1;   // Memory is pressure
static constexpr double kSpillCostWeight = 5.0;    // Spill is costly

The weights reflect modern hardware characteristics:

  • Disk I/O is 10x more expensive than CPU operations
  • Memory pressure translates to cache misses
  • Spill incurs both I/O and CPU overhead

10.8.2 Cost Formulas

Sequential Scan:

C_{seq} = n \times (cpu_{tuple} + \frac{io_{page}}{rows_{per\_page}})

Index Scan:

C_{idx} = k \times (cpu_{lookup} + io_{random} + io_{heap})

Where k = n \times selectivity.

Hash Join:

C_{hash} = |build| \times cpu_{hash} \times k_{build} + |probe| \times cpu_{hash} \times k_{probe}

Sort (In-Memory):

C_{sort} = n \times \log_2(n) \times cpu_{cmp}

Sort (External):

C_{ext} = C_{sort} + \frac{2 \times n \times width}{block} \times io_{spill} \times \lceil \log_M(runs) \rceil

Hash Aggregate:

C_{agg} = n \times cpu_{hash} + g \times cpu_{agg}

Where g is the number of groups.

10.8.3 Cost Constants

// CPU costs
static constexpr double kCPUTupleProcessing = 0.01;
static constexpr double kCPUComparison = 0.0001;
static constexpr double kCPUHashOp = 0.0005;

// I/O costs
static constexpr double kSeqPageCost = 1.0;
static constexpr double kRandomPageCost = 4.0;
static constexpr double kHeapAccessCost = 0.5;
static constexpr double kSpillIOCost = 0.01;  // Per byte

// Memory constants
static constexpr double kHashTableOverhead = 1.5;
static constexpr double kCompressionFactor = 0.5;  // LZ4 typical
static constexpr size_t kMergeWidth = 16;
static constexpr size_t kDefaultBlockSize = 8192;

10.8.4 Spill Prediction

The cost estimator predicts when operations will exceed memory:

auto CostEstimator::will_spill(int64_t rows,
                               double row_width,
                               size_t memory_budget) const -> bool {
  auto data_size = static_cast<double>(rows) * row_width;
  return data_size > static_cast<double>(memory_budget);
}

When spill is predicted, additional I/O cost is added:

C_{spill} = \frac{data\_size}{block\_size} \times io_{spill} \times \frac{1}{compression}

10.9 Physical Ordering

Physical operators track output ordering properties, enabling sort elimination and merge join selection.

10.9.1 Ordering Representation

struct PhysicalOrdering {
  std::string column;
  SortOrder order;  // kAscending or kDescending
};

An operator's output ordering is a list of PhysicalOrdering entries representing the sort guarantee.

10.9.2 Ordering Propagation

Different operators affect ordering differently:

Operator         Ordering Effect
---------------  -------------------------
SeqScan          None (arbitrary order)
IndexScan        Index key order
Sort             Produces specified order
Filter           Preserves input order
Project          Preserves input order
HashJoin         Destroys order
MergeJoin        Preserves outer order
HashAggregate    Destroys order
StreamAggregate  Preserves group key order

10.9.3 Ordering Satisfaction

The optimizer checks if an operator's output satisfies a required ordering:

auto OperatorProperties::ordering_satisfies(
    const std::vector<PhysicalOrdering>& required) const -> bool {

  if (required.size() > ordering.size()) {
    return false;
  }

  for (size_t i = 0; i < required.size(); ++i) {
    if (ordering[i].column != required[i].column ||
        ordering[i].order != required[i].order) {
      return false;
    }
  }

  return true;
}

This enables sort elimination when input is already sorted.
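For instance, a scan emitting (a ASC, b ASC) satisfies a requirement of (a ASC) because the requirement is a prefix of the guarantee. A standalone version of the prefix check:

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class SortOrder { kAscending, kDescending };

struct PhysicalOrdering {
  std::string column;
  SortOrder order;
};

// Free-function restatement of ordering_satisfies() above: the required
// ordering must be a prefix of the actual ordering, column and direction.
inline auto ordering_satisfies(const std::vector<PhysicalOrdering>& actual,
                               const std::vector<PhysicalOrdering>& required)
    -> bool {
  if (required.size() > actual.size()) return false;
  for (size_t i = 0; i < required.size(); ++i) {
    if (actual[i].column != required[i].column ||
        actual[i].order != required[i].order) {
      return false;
    }
  }
  return true;
}
```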

10.10 Explain Plans

Physical plans can be explained for debugging and optimization analysis.

10.10.1 Explain Output

Each physical operator implements an explain() method:

auto HashJoinOp::explain() const -> std::string {
  auto ss = std::ostringstream {};
  ss << "HashJoin [" << join_type_to_string(join_type_) << "]";
  ss << " on (" << condition_to_string(join_condition_) << ")";
  ss << " build_from=" << (build_from_right_ ? "right" : "left");
  ss << " rows=" << properties_.cardinality.rows;
  ss << " cost=" << cost_.total();
  if (properties_.will_spill) {
    ss << " [SPILL]";
  }
  return ss.str();
}

10.10.2 Plan Tree Visualization

HashJoin [INNER] on (orders.customer_id = customers.id)
  build_from=right rows=50000 cost=12500.5
  |-- SeqScan(orders) rows=100000 cost=1000.0
  |-- IndexScan(customers) using pk_customers rows=10000 cost=500.0
      range: [1, 10000]

10.10.3 Execution Statistics

After execution, operators report actual statistics:

struct ExecutionStats {
  int64_t rows_processed = 0;
  int64_t rows_output = 0;
  int64_t bytes_read = 0;
  int64_t bytes_written = 0;
  int64_t execution_time_us = 0;
  int64_t memory_peak_bytes = 0;
  int64_t spill_bytes = 0;
};

Comparing estimated vs. actual statistics helps identify cardinality estimation errors.
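One common way to quantify such errors is the q-error, the larger of the over- and under-estimation ratios. The sketch below is a standard metric offered for illustration, not necessarily what Cognica reports.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Q-error for comparing estimated vs. actual row counts: 1.0 means a
// perfect estimate; larger values mean larger error in either
// direction. Counts are clamped to 1 to avoid division by zero.
double q_error(int64_t estimated, int64_t actual) {
  double e = static_cast<double>(std::max<int64_t>(estimated, 1));
  double a = static_cast<double>(std::max<int64_t>(actual, 1));
  return std::max(e / a, a / e);
}
```

A q-error well above 1.0 on a join input is a strong hint that the planner may have picked the wrong join strategy or build side.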

10.11 CVM Integration

Physical plans are lowered to CVM bytecode for efficient execution. However, certain plan shapes are not yet supported by the CVM lowering path and fall back to the Volcano (iterator) executor automatically.

CVM Fallback Conditions: The executor checks should_use_cvm_for_plan() before attempting CVM lowering. Even when the CVM is otherwise enabled, the executor falls back to the Volcano path when the physical plan contains table-valued function or UNNEST operators inside an outer (correlated) context. The CVM subquery execution path does not correctly preserve standalone table-function row production when an outer context is installed, which manifests as a single NULL row for shapes like ARRAY(SELECT ... FROM unnest(...)). The fallback check walks the physical plan tree looking for kTableFunc or kUnnest nodes and, if any are found while an outer context is active, routes the query to the iterator executor.
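The fallback check described above can be sketched as a recursive tree walk. The node kinds and field names here are illustrative, not Cognica's actual plan representation.

```cpp
#include <cassert>
#include <memory>
#include <vector>

enum class PhysicalOpKind { kSeqScan, kFilter, kHashJoin, kTableFunc, kUnnest };

struct PhysicalOp {
  PhysicalOpKind kind;
  std::vector<std::unique_ptr<PhysicalOp>> children;
};

// Walk the physical plan tree and report whether any table-function
// or UNNEST operator appears anywhere in it.
bool contains_table_func_or_unnest(const PhysicalOp& op) {
  if (op.kind == PhysicalOpKind::kTableFunc ||
      op.kind == PhysicalOpKind::kUnnest) {
    return true;
  }
  for (const auto& child : op.children) {
    if (contains_table_func_or_unnest(*child)) {
      return true;
    }
  }
  return false;
}

// CVM lowering is used only when no such node exists while an outer
// (correlated) context is active.
bool should_use_cvm_for_plan(const PhysicalOp& root, bool has_outer_context) {
  return !(has_outer_context && contains_table_func_or_unnest(root));
}
```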

10.11.1 Lowering Pipeline

[Diagram: the lowering pipeline from physical plan to CVM bytecode]

10.11.2 Operator Lowering

Each physical operator has a corresponding lowering method:

class PlanLowering {
  void lower_seq_scan_(const SeqScanOp* op);
  void lower_index_scan_(const IndexScanOp* op);
  void lower_filter_(const PhysicalFilterOp* op);
  void lower_hash_join_(const HashJoinOp* op);
  void lower_sort_(const PhysicalSortOp* op);
  void lower_hash_aggregate_(const HashAggregateOp* op);
  void lower_project_(const ProjectOp* op);
  void lower_limit_(const LimitOp* op);
};

10.11.3 Cursor Slot Management

Scan operators use cursor slots to manage iteration state:

// Allocate slot
auto slot = allocate_cursor_slot_();

// Open cursor
emit_(Opcode::CURSOR_OPEN, slot, collection_name);

// Iteration loop
emit_label_(loop_label);
emit_(Opcode::CURSOR_NEXT, result_reg, slot);
emit_(Opcode::JUMP_NULL, result_reg, done_label);
// ... process row ...
emit_(Opcode::JUMP, loop_label);

// Close cursor
emit_label_(done_label);
emit_(Opcode::CURSOR_CLOSE, slot);
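A minimal sketch of what allocate_cursor_slot_() might look like, assuming a simple free-list scheme (the real PlanLowering bookkeeping may differ): slots released when a cursor is closed are recycled before new slots are handed out.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Illustrative cursor slot allocator: reuses released slots first,
// otherwise hands out the next fresh slot index.
class CursorSlots {
 public:
  std::size_t allocate() {
    if (!free_.empty()) {
      std::size_t slot = free_.back();
      free_.pop_back();
      return slot;
    }
    return next_++;
  }

  void release(std::size_t slot) { free_.push_back(slot); }

 private:
  std::size_t next_ = 0;
  std::vector<std::size_t> free_;
};
```

Recycling keeps the slot space dense, which matters when slots index into a fixed-size cursor table in the VM.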

10.11.4 Join Lowering

Hash join lowering generates build and probe phases:

void PlanLowering::lower_hash_join_(const HashJoinOp* op) {
  // Lower build side
  lower_(op->build_child());

  // Build phase: populate hash table
  emit_(Opcode::HASH_BUILD_START, hash_table_reg);
  emit_label_(build_loop);
  emit_(Opcode::CURSOR_NEXT, build_row, build_cursor);
  emit_(Opcode::JUMP_NULL, build_row, build_done);
  emit_(Opcode::HASH_BUILD_INSERT, hash_table_reg, build_row, key_reg);
  emit_(Opcode::JUMP, build_loop);
  emit_label_(build_done);

  // Lower probe side
  lower_(op->probe_child());

  // Probe phase: scan and look up
  emit_label_(probe_loop);
  emit_(Opcode::CURSOR_NEXT, probe_row, probe_cursor);
  emit_(Opcode::JUMP_NULL, probe_row, probe_done);
  emit_(Opcode::HASH_PROBE, match_reg, hash_table_reg, probe_row, key_reg);
  emit_(Opcode::JUMP_NULL, match_reg, probe_loop);

  // Emit matched row
  emit_(Opcode::ROW_COMBINE, result_reg, probe_row, match_reg);
  emit_(Opcode::EMIT_ROW, result_reg);
  emit_(Opcode::JUMP, probe_loop);

  emit_label_(probe_done);
  emit_(Opcode::HASH_TABLE_FREE, hash_table_reg);
}

10.11.5 Nested CTE Discovery

Before physical planning begins, the executor performs a recursive discovery pass over the AST to find and materialize CTEs defined in nested subqueries. A top-level query may contain derived tables or join subqueries that themselves define WITH clauses. If these nested CTEs are not discovered and materialized before the main plan is built, references to them will fail during execution.

The prepare_nested_subquery_ctes_() method walks the AST recursively:

  1. For each SelectStmt, it inspects from_subqueries and joins for nested subqueries and descends into them.
  2. When a nested statement contains a WITH clause, the method builds a logical plan for that statement, executes its CTEs, and registers the materialized results.
  3. The CTE name scope is saved and restored around each nested discovery to prevent name collisions between sibling subqueries.
  4. SetOperationStmt nodes are handled similarly, recursing into both left_query and right_query.

This discovery runs before optimization and physical planning for all three execution paths (Volcano, CVM, and Acero), ensuring that CTE materialization is complete regardless of which execution model is selected.
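The recursive walk above can be sketched structurally as follows. The AST node shape and names here are illustrative stand-ins, not Cognica's actual AST; the sketch only records discovered CTE names rather than materializing results.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for a SELECT node: the CTE names its own WITH
// clause defines, plus nested subqueries in FROM and in joins.
struct SelectStmt {
  std::vector<std::string> with_cte_names;
  std::vector<std::unique_ptr<SelectStmt>> from_subqueries;
  std::vector<std::unique_ptr<SelectStmt>> join_subqueries;
};

// Recursively collect CTE names from nested subqueries so they can be
// materialized before the main plan is built.
void discover_nested_ctes(const SelectStmt& stmt,
                          std::vector<std::string>& materialized) {
  // A WITH clause at this level is recorded (in the real system,
  // planned and executed) before descending further.
  for (const auto& name : stmt.with_cte_names) {
    materialized.push_back(name);
  }
  // Descend into derived tables and join subqueries.
  for (const auto& sub : stmt.from_subqueries) {
    discover_nested_ctes(*sub, materialized);
  }
  for (const auto& sub : stmt.join_subqueries) {
    discover_nested_ctes(*sub, materialized);
  }
}
```

The real implementation additionally saves and restores the CTE name scope around each descent, so sibling subqueries can define CTEs with the same name without colliding.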

10.12 Summary

Cognica's physical planning system transforms logical query plans into efficient executable plans through several key mechanisms:

  1. Access Path Enumeration: Systematically evaluates sequential scans, index scans, index-only scans, and point lookups to find the cheapest data access method.

  2. Strategy Selection: Chooses optimal algorithms for sort (heap, quicksort, external), join (hash, merge, nested loop), and aggregate (hash, stream, sort) operations based on input characteristics and available resources.

  3. Hybrid Execution Model: Supports iterator-based execution for simplicity, CVM bytecode for performance, and vectorized execution for columnar workloads. The executor automatically falls back from CVM to the Volcano iterator model when the plan contains table functions or UNNEST operators inside a correlated outer context.

  4. Memory-Aware Planning: Distributes memory budgets across operators, predicts spill behavior, and configures spill options proactively.

  5. Physical Property Tracking: Maintains ordering and cardinality information to enable optimizations like sort elimination and merge join selection.

  6. Cost-Based Decisions: Uses a calibrated cost model considering CPU, I/O, memory, and spill costs to guide all planning decisions.

  7. Scan Projection Pushdown: Propagates needed-column sets top-down through the physical plan tree, with special handling for LATERAL join dependencies, so that scan operators read only the columns required by downstream operators.

  8. Nested CTE Discovery: Recursively walks the AST before planning to discover and materialize CTEs defined in nested subqueries, ensuring they are available regardless of nesting depth.

The physical planning phase bridges the gap between declarative query specification and efficient execution, making decisions that can impact query performance by orders of magnitude.

Copyright (c) 2023-2026 Cognica, Inc.