Chapter 10: Physical Planning and Execution Strategies
10.1 From Logical to Physical Plans
The transition from logical to physical planning marks a critical boundary in query processing. While logical plans describe what operations to perform using abstract relational algebra, physical plans specify how to execute those operations using concrete algorithms and data structures.
This chapter explores Cognica's physical planning system, which transforms optimized logical plans into executable physical plans by making three fundamental decisions:
- Access Path Selection: How to retrieve data (sequential scan, index scan, or point lookup)
- Algorithm Selection: Which algorithm to use for each operation (hash join vs. merge join, quicksort vs. external sort)
- Resource Allocation: How to distribute memory budgets and handle spill to disk
10.1.1 Physical Plan Properties
A physical plan differs from a logical plan in several important ways:
| Property | Logical Plan | Physical Plan |
|---|---|---|
| Abstraction | Relational algebra | Executable algorithms |
| Cost awareness | None | Full cost model |
| Memory awareness | None | Budget allocation |
| Ordering | Implicit | Explicit tracking |
| Parallelism | Unspecified | Execution model defined |
Physical operators carry rich metadata that guides execution:
struct OperatorProperties {
CardinalityEstimate cardinality {}; // Expected output rows
std::vector<PhysicalOrdering> ordering {}; // Output sort order
size_t memory_budget = 0; // Allocated memory
bool will_spill = false; // Spill prediction
std::optional<SpillOptions> spill_options {};
bool preserves_ordering = false; // Ordering guarantee
};
10.1.2 Physical Planning Architecture
The physical planning process follows a systematic flow: the builder walks the optimized logical plan bottom-up, chooses access paths and operator strategies using the cost model, and then allocates memory budgets to the resulting operators.
10.2 Physical Operator Types
Cognica defines a comprehensive set of physical operators, each representing a specific execution algorithm.
10.2.1 Operator Classification
Physical operators are classified by their function:
enum class PhysicalOpType : uint8_t {
// === Scan Operators ===
kSeqScan, // Full collection sequential scan
kIndexScan, // B-tree index range scan with heap fetch
kIndexOnlyScan, // Covering index scan (no heap fetch)
kPrimaryKeyLookup, // Direct point lookup by primary key
// === Filter Operators ===
kFilter, // Interpreted predicate evaluation
kBytecodeFilter, // CVM bytecode compiled filter
// === Sort Operators ===
kSort, // In-memory quicksort
kTopKSort, // Heap-based top-K selection
kExternalSort, // External merge sort (disk spill)
kIndexSort, // Leverage index ordering
// === Limit/Skip ===
kLimit, // Output row cap (constant or expression)
// === Aggregate Operators ===
kHashAggregate, // Hash-based grouping
kSortAggregate, // Sort-based grouping
kStreamAggregate, // Streaming on sorted input
// === Join Operators ===
kHashJoin, // Build/probe hash join
kIndexNestedLoop, // Index lookup on inner table
kNestedLoopJoin, // Simple nested loop
kMergeJoin, // Sort-merge join
// === Set Operators ===
kUnionAll, // Concatenate without dedup
kHashUnion, // Union with hash deduplication
// === Transform Operators ===
kProject, // Field selection/transformation
kLiteral, // Constant field injection
// === Subquery and Table Function Operators ===
kSubquery, // Derived table (inline view)
kTableFunc, // Table-valued function (e.g., generate_series)
kUnnest, // Array unnesting to rows
// === Search Operators ===
kFullTextSearch, // Full-text search
kVectorSearch // Vector similarity search
};
10.2.2 Operator Hierarchy
All physical operators inherit from a common base class.
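The exact interface is not reproduced in this chapter; the following is a minimal, hypothetical sketch of what such a base class looks like (the simplified property struct and member names are assumptions, not Cognica's real API):

```cpp
#include <memory>
#include <vector>

// Simplified stand-in for the richer OperatorProperties shown earlier.
struct OperatorPropertiesSketch {
    long long rows = 0;              // cardinality estimate
    bool preserves_ordering = false; // ordering guarantee
};

class PhysicalOperator {
 public:
    virtual ~PhysicalOperator() = default;

    // Every operator reports estimated output properties for planning.
    auto properties() const -> const OperatorPropertiesSketch& { return props_; }

    // Child operators form the physical plan tree.
    auto children() const
        -> const std::vector<std::unique_ptr<PhysicalOperator>>& {
        return children_;
    }

 protected:
    OperatorPropertiesSketch props_ {};
    std::vector<std::unique_ptr<PhysicalOperator>> children_ {};
};
```

The key design point is that planning-relevant metadata (cardinality, ordering) lives on the operator itself, so parent operators can be chosen based on their children's properties.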
10.2.3 Scan Operators
Scan operators provide the foundation for data access:
Sequential Scan (SeqScanOp):
- Reads all documents in a collection
- Optionally applies a pushed-down filter
- Cost: O(N), where N is the collection size
- Best when: The predicate matches a large fraction of rows, or no suitable index exists
Index Scan (IndexScanOp):
- Uses B-tree index to locate matching documents
- Fetches documents from heap storage
- Cost: O(log N + K), where K is the result size
- Best when: Highly selective predicate with a matching index
Index-Only Scan (IndexOnlyScanOp):
- Reads data directly from index (covering index)
- No heap fetch required
- Cost: O(log N + K), with no heap-fetch component
- Best when: Index contains all required columns
Primary Key Lookup (PrimaryKeyLookupOp):
- Direct point lookup by primary key
- Cost: O(log N) per lookup
- Best when: Equality predicate on primary key
10.2.4 Join Operators
Join operators combine data from multiple sources:
Hash Join (HashJoinOp):
Phase 1 (Build): Hash smaller relation into hash table
Phase 2 (Probe): Scan larger relation, probe hash table
- Build cost: O(|R|) over the smaller (build) relation R
- Probe cost: O(|S|) over the larger (probe) relation S
- Memory: O(|R|) for the hash table
- Best when: No useful indexes, adequate memory
Index Nested Loop (IndexNestedLoopOp):
For each row in outer:
Look up matching rows via index on inner
- Cost: O(|outer| * log |inner|)
- Memory: O(1)
- Best when: Small outer table, index on inner join key
Merge Join (MergeJoinOp):
Advance both sorted inputs in lockstep
Emit matching pairs
- Cost: O(|R| + |S|) (assuming pre-sorted)
- Memory: O(1)
- Best when: Both inputs already sorted on join key
Nested Loop Join (NestedLoopJoinOp):
For each row in outer:
For each row in inner:
If predicate matches, emit
- Cost: O(|outer| * |inner|)
- Memory: O(1)
- Best when: Very small relations or fallback
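As a rough numeric illustration of these trade-offs, the following sketch uses toy per-row cost weights (loosely echoing the threshold constants in Section 10.6.2; these are illustrative assumptions, not Cognica's calibrated model):

```cpp
#include <cmath>

// Toy per-row cost weights; illustrative assumptions only.
constexpr double kBuildPerRow = 1.0;   // hash-table insert
constexpr double kProbePerRow = 1.2;   // hash-table probe
constexpr double kLoopPerRow  = 10.0;  // nested-loop predicate check

// Hash join: build the smaller side, then probe with the larger side.
inline double hash_join_cost(double build_rows, double probe_rows) {
    return build_rows * kBuildPerRow + probe_rows * kProbePerRow;
}

// Index nested loop: one ~log-time index lookup per outer row.
inline double index_nl_cost(double outer_rows, double inner_rows) {
    return outer_rows * std::log2(inner_rows);
}

// Plain nested loop: every (outer, inner) pair is evaluated.
inline double nested_loop_cost(double outer_rows, double inner_rows) {
    return outer_rows * inner_rows * kLoopPerRow;
}
```

With a 1,000-row outer and a 1,000,000-row indexed inner, the index nested loop (~20,000 units) beats the hash join (~1.2M units), matching the "small outer table, index on inner join key" guidance above; the plain nested loop is orders of magnitude worse than either.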
10.2.5 Aggregate Operators
Aggregate operators compute grouped summaries:
Hash Aggregate (HashAggregateOp):
- Groups rows using hash table keyed by group columns
- Maintains aggregate state per group
- Cost: O(N)
- Memory: O(G), where G is the number of groups
Stream Aggregate (StreamAggregateOp):
- Requires input sorted on group keys
- Processes groups in single pass
- Cost: O(N)
- Memory: O(1) (single group state)
Sort Aggregate (SortAggregateOp):
- Sorts input, then streams
- Cost: O(N log N) for sort + O(N) for aggregation
- Memory: O(N) for the sort
10.2.6 Sort Operators
Sort operators order result sets:
In-Memory Sort (PhysicalSortOp):
- QuickSort implementation
- Cost: O(N log N)
- Memory: O(N)
- Best when: Data fits in memory
Top-K Sort (TopKSortOp):
- Heap-based selection
- Cost: O(N log K), where K is the limit
- Memory: O(K)
- Best when: Small LIMIT clause
External Sort (ExternalSortOp):
- External merge sort with disk spill
- Cost: O(N log N) comparisons + O(N) I/O per merge pass
- Memory: Configurable budget
- Best when: Data exceeds memory
Index Sort (IndexSortOp):
- Leverages index ordering
- Cost: O(N) to read rows in index order (no actual sort)
- Best when: Index provides required ordering
10.2.7 Subquery and Table Function Operators
Subquery (PhysicalSubquery):
The physical subquery operator wraps a complete physical plan that was produced from a derived table (FROM (SELECT ...) AS t). When the logical SubqueryOp carries column aliases (the AS t(col1, col2) syntax), the physical planner inserts a rename projection layer above the child plan. This rename layer is a PhysicalProject (or BatchProject in batch mode) whose target list maps each original output column to its alias name. This approach is more general than mutating column names on a specific child node type, because it works regardless of whether the child is a PhysicalValues, a sort, a join, or any other operator.
After inserting the optional rename layer, the subquery alias is propagated to the outermost physical node via set_output_alias(), which allows parent join operators to build CompositeRow objects with properly qualified field names.
Table Function (PhysicalTableFunc):
Table-valued functions such as generate_series(), json_each(), or graph query functions produce a set of rows from their arguments. The physical operator stores the function name, argument expressions, and declared output columns. At execution time, the operator evaluates its arguments, invokes the registered table function implementation, and produces rows through the standard iterator interface.
Table function operators participate in the transactional execution context. The ExecutionContext carries an optional DocumentDB handle and DocumentDBTransaction pointer so that write-capable table functions (such as Cypher mutation functions) can participate in the active SQL transaction instead of falling back to global database state.
Unnest (PhysicalUnnest):
The unnest operator expands an array-valued expression into a set of rows, one per element. It is the physical counterpart of UNNEST(array_expr) in the FROM clause.
10.2.8 Limit Operator
The PhysicalLimit operator supports both constant and expression-based limits. When the logical plan contains a constant LIMIT or OFFSET, the values are stored directly. When the value comes from a parameter or expression, the operator stores the unevaluated AST expression in limit_expr / offset_expr and evaluates it at open() time to produce an effective limit and offset. This allows parameterized queries like LIMIT $1 OFFSET $2 to work correctly without recompilation.
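A minimal sketch of this open()-time resolution, using a callable as a hypothetical stand-in for the stored AST expression:

```cpp
#include <functional>
#include <optional>

// Hypothetical stand-in: a limit expression modeled as a callable that
// yields the bound parameter's value when evaluated at open() time.
using LimitExpr = std::function<long long()>;

struct PhysicalLimitSketch {
    std::optional<long long> constant_limit;  // LIMIT was a literal
    std::optional<LimitExpr> limit_expr;      // LIMIT was $1 / an expression
    long long effective_limit = -1;           // resolved in open()

    // Resolve the effective limit exactly once, when the operator opens.
    void open() {
        if (constant_limit) {
            effective_limit = *constant_limit;
        } else if (limit_expr) {
            effective_limit = (*limit_expr)();  // evaluate bound parameter
        }
    }
};
```

Deferring evaluation to open() is what lets the same compiled plan serve different parameter bindings: only the effective limit changes between executions, not the plan shape.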
10.3 Execution Models
Cognica implements a hybrid execution architecture supporting three execution models, each optimized for different scenarios.
10.3.1 Iterator (Volcano) Model
The primary execution model follows the classic Volcano iterator pattern:
Each physical operator implements the Cursor interface:
class Cursor {
public:
virtual auto next() -> Document* = 0;
virtual auto has_next() const -> bool = 0;
virtual void close() = 0;
};
The to_cursor() method converts a physical operator to an executable cursor:
class PhysicalOperator {
public:
virtual auto to_cursor(ExecutionContext& ctx) -> std::unique_ptr<Cursor> = 0;
};
Cursor Implementations:
| Cursor Type | Purpose |
|---|---|
| ScanCursor | Iterates over collection |
| FilterCursor | Applies predicate to input |
| ProjectCursor | Transforms fields |
| SortCursor | Buffers and sorts input |
| LimitCursor | Caps output count |
| HashJoinOperator | Executes hash join |
| UnionCursor | Combines multiple inputs |
Advantages:
- Simple, composable design
- Low memory overhead for streaming operations
- Natural pipelining of operators
Disadvantages:
- High per-row overhead from virtual function calls
- Poor cache utilization (row-at-a-time processing)
10.3.2 CVM Bytecode Execution
For SQL queries, Cognica compiles physical plans to CVM (Cognica Virtual Machine) bytecode:
Bytecode Generation Example:
For a simple filter query:
SELECT name FROM users WHERE age > 21
Generated bytecode:
CURSOR_OPEN slot_0, "users"
LABEL loop:
CURSOR_NEXT R0, slot_0
JUMP_NULL R0, done
; Filter: age > 21
FIELD_GET R1, R0, "age"
LOAD_CONST R2, 21
CMP_GT R3, R1, R2
JUMP_IF_FALSE R3, loop
; Project: name
FIELD_GET R4, R0, "name"
EMIT_ROW R4
JUMP loop
LABEL done:
CURSOR_CLOSE slot_0
HALT
Lowering Result Structure:
struct PlanLoweringResult {
bool success = false;
std::unique_ptr<BytecodeModule> module;
std::vector<std::string> output_columns;
uint8_t cursor_slots_used = 0;
uint8_t registers_used = 0;
// Index queries for cursor initialization
std::unordered_map<uint8_t, Document> index_queries;
// Registered subqueries
std::vector<std::unique_ptr<SubqueryExpr>> subqueries;
// Optimization hints for external tables
std::unordered_map<uint8_t, int64_t> limit_hints;
std::unordered_map<uint8_t, int64_t> offset_hints;
};
Register Allocation:
The lowering phase manages a limited set of virtual registers:
class PlanLowering {
private:
static constexpr size_t kMaxRegisters = 32;
std::bitset<kMaxRegisters> register_in_use_;
std::bitset<kMaxRegisters> register_reserved_;
std::deque<uint8_t> allocation_order_; // FIFO for eviction
uint8_t next_cursor_slot_ = 0;
auto allocate_register_() -> uint8_t;
auto allocate_cursor_slot_() -> uint8_t;
auto evict_register_() -> uint8_t; // Spill to stack if needed
};
Advantages:
- Eliminates virtual function overhead
- Computed-goto dispatch for fast interpretation
- Amenable to JIT compilation
10.3.3 Vectorized Execution
For columnar data processing, Cognica supports vectorized execution using Apache Arrow:
class VectorizedContext {
auto get_batch(uint8_t reg) -> ColumnBatch*;
auto get_column(uint8_t batch_reg, size_t col_idx) -> ColumnData*;
};
class ColumnBatch {
size_t row_count_;
std::vector<ColumnData> columns_;
};
Vectorized operations process data in batches (typically 1024-4096 rows), enabling:
- SIMD instruction utilization
- Better cache locality
- Reduced interpretation overhead
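To illustrate why batching helps, here is a small sketch of a batch-at-a-time filter over a plain integer column (not Cognica's Arrow-based ColumnBatch): the inner loop is tight and branch-light, so compilers can auto-vectorize it, and any dispatch overhead is paid once per batch rather than once per row.

```cpp
#include <cstdint>
#include <vector>

// Batch filter sketch: produce a selection vector of row indices whose
// value exceeds a threshold.
inline auto filter_gt(const std::vector<int64_t>& column,
                      int64_t threshold) -> std::vector<uint32_t> {
    std::vector<uint32_t> selection;
    selection.reserve(column.size());
    for (uint32_t i = 0; i < column.size(); ++i) {
        // Tight, data-parallel loop: SIMD-friendly and cache-friendly.
        if (column[i] > threshold) {
            selection.push_back(i);
        }
    }
    return selection;
}
```

Downstream operators then consume the selection vector rather than materializing a filtered copy of every column, which is the standard trick for keeping vectorized pipelines cheap.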
Vectorized Lowering Methods:
class PlanLowering {
void lower_seq_scan_vectorized_(const SeqScanOp* op);
void lower_filter_vectorized_(const FilterOp* op);
void lower_project_vectorized_(const ProjectOp* op);
void lower_batch_hash_join_(const HashJoinOp* op);
void lower_batch_sort_(const SortOp* op);
};
10.4 Physical Plan Builder
The PhysicalPlanBuilder transforms logical plans into physical plans through a recursive descent process.
10.4.1 Builder Configuration
class PhysicalPlanBuilder final {
public:
struct Config {
std::string collection_name;
const IndexDescriptor* index_desc = nullptr;
const IndexStatisticsManager* stats_mgr = nullptr;
const LSMCostModel* lsm_cost = nullptr;
int64_t total_rows = 0;
double avg_row_width = 512.0;
size_t memory_budget = 256 * 1024 * 1024; // 256 MB
std::vector<std::string> output_fields {};
};
explicit PhysicalPlanBuilder(Config config);
auto build(const LogicalPlan& logical_plan) -> PhysicalPlan;
private:
auto build_(const LogicalOperator* op) -> PhysicalOpPtr;
auto build_scan_(const ScanOp* scan) -> PhysicalOpPtr;
auto build_filter_(const FilterOp* filter) -> PhysicalOpPtr;
auto build_sort_(const SortOp* sort) -> PhysicalOpPtr;
auto build_group_(const GroupOp* group) -> PhysicalOpPtr;
auto build_join_(const JoinOp* join) -> PhysicalOpPtr;
};
10.4.2 Recursive Plan Construction
The builder processes the logical plan tree bottom-up:
auto PhysicalPlanBuilder::build_(const LogicalOperator* op) -> PhysicalOpPtr {
switch (op->type()) {
case LogicalOpType::kScan:
return build_scan_(static_cast<const ScanOp*>(op));
case LogicalOpType::kFilter:
return build_filter_(static_cast<const FilterOp*>(op));
case LogicalOpType::kSort:
return build_sort_(static_cast<const SortOp*>(op));
case LogicalOpType::kGroup:
return build_group_(static_cast<const GroupOp*>(op));
case LogicalOpType::kJoin:
return build_join_(static_cast<const JoinOp*>(op));
// ... other operator types
}
}
10.4.3 Access Path Selection
For scan operators, the builder enumerates and costs all viable access paths:
auto PhysicalPlanBuilder::build_scan_(const ScanOp* scan) -> PhysicalOpPtr {
auto enumerator = AccessPathEnumerator {
.collection_name = config_.collection_name,
.index_desc = config_.index_desc,
.stats_mgr = config_.stats_mgr,
.lsm_cost = config_.lsm_cost,
.total_rows = config_.total_rows,
.avg_row_width = config_.avg_row_width,
.output_fields = config_.output_fields
};
// Get filter from parent if available
auto filter = get_pushed_filter_();
// Select lowest-cost access path
auto best_path = enumerator.select_best(filter);
return best_path.op;
}
10.4.4 Strategy Selection Integration
For blocking operators, the builder consults strategy selectors:
auto PhysicalPlanBuilder::build_sort_(const SortOp* sort) -> PhysicalOpPtr {
// Build child first
auto child = build_(sort->children()[0].get());
auto child_props = child->properties();
// Configure strategy selection
auto sort_config = SortConfig {
.input_rows = child_props.cardinality.rows,
.row_width = child_props.cardinality.row_width,
.sort_keys = sort->sort_keys(),
.limit_hint = current_limit_hint_,
.memory_budget = remaining_memory_budget_,
.input_ordering = child_props.ordering,
.available_indexes = get_available_indexes_()
};
// Select strategy
auto strategy = SortStrategySelector::select(sort_config);
// Create appropriate physical operator
switch (strategy) {
case SortStrategy::kNoSort:
return child; // Already sorted
case SortStrategy::kTopKHeap:
return std::make_unique<TopKSortOp>(
*current_limit_hint_, sort->sort_keys(), std::move(child));
case SortStrategy::kInMemorySort:
return std::make_unique<PhysicalSortOp>(
sort->sort_keys(), std::move(child));
case SortStrategy::kExternalSort:
return std::make_unique<ExternalSortOp>(
sort->sort_keys(), std::move(child), spill_config_);
case SortStrategy::kIndexScan:
return create_index_sort_(sort->sort_keys());
}
}
10.4.5 Scan Projection Pushdown
After the physical plan tree is constructed, the planner performs a top-down pass to propagate column references down to scan operators. The goal is to ensure that each scan reads only the columns that are actually needed by the operators above it, reducing I/O and memory consumption.
The set_scan_projections_() method walks the physical plan tree and accumulates the set of referenced columns at each operator by inspecting:
- Filter operators: columns referenced in the predicate expression.
- Project operators: columns referenced in the target list.
- Join operators (Hash, NestedLoop, Merge): columns referenced in the join condition.
- Sort operators: columns referenced in ORDER BY expressions.
- Aggregate operators (Hash, Stream): columns in GROUP BY keys, aggregate function arguments, aggregate ordering clauses, and HAVING predicates.
- DistinctOn operators: columns in the DISTINCT ON expression list.
- Unnest operators: columns referenced in the unnest expression.
- TableFunc operators: columns referenced in function arguments.
When the pass reaches a PhysicalSeqScan or PhysicalIndexScan, it sets the scan's projection list to the accumulated column set.
LATERAL Dependency Propagation: Lateral joins introduce a special challenge for projection pushdown. The right side of a lateral join may reference columns from the left side that do not appear in the join condition itself. To handle this, the planner uses a separate collect_plan_referenced_columns_() method that recursively collects all column references from the entire right subtree of a lateral nested-loop join. These columns are added to the needed-columns set before the pass descends into the left child, ensuring that the left scan includes all columns the right side depends on.
10.5 Access Path Enumeration
Access path enumeration generates all viable ways to read data from a table.
10.5.1 Access Path Structure
struct AccessPath {
PhysicalOpPtr op; // The physical operator
optimizer::Cost cost; // Estimated cost
bool is_covering; // True if index-only scan possible
};
10.5.2 Enumeration Process
The AccessPathEnumerator systematically considers each access method:
auto AccessPathEnumerator::enumerate(const ast::Expression* filter)
-> std::vector<AccessPath> {
auto paths = std::vector<AccessPath> {};
// Sequential scan is always available (fallback)
paths.push_back(create_seq_scan_());
// Check for primary key equality condition
if (is_pk_equality_(filter)) {
paths.push_back(create_pk_lookup_(filter));
}
// Consider each available index
for (const auto& index : indexes_) {
// Extract bounds that match this index
if (auto bounds = extract_index_bounds_(filter, index)) {
// Standard index scan (with heap fetch)
paths.push_back(create_index_scan_(index, *bounds));
// Index-only scan (if covering)
if (is_covering_index_(index)) {
paths.push_back(create_index_only_scan_(index, *bounds));
}
}
}
// Sort by cost (cheapest first)
std::sort(paths.begin(), paths.end(),
[](const auto& a, const auto& b) {
return a.cost.total() < b.cost.total();
});
return paths;
}
10.5.3 Index Bound Extraction
The enumerator analyzes filter predicates to extract index bounds:
auto AccessPathEnumerator::extract_index_bounds_(
const ast::Expression* filter,
const IndexInfo& index) -> std::optional<IndexBounds> {
if (!filter) {
return std::nullopt;
}
auto bounds = IndexBounds {};
// Collect equality conditions on index columns
auto equalities = collect_equality_fields_(filter);
// Match against index prefix
for (const auto& idx_col : index.columns) {
if (auto it = equalities.find(idx_col); it != equalities.end()) {
bounds.add_equality(idx_col, it->second);
} else {
break; // Index prefix broken
}
}
// Check for range condition on next column
if (bounds.prefix_length() < index.columns.size()) {
auto next_col = index.columns[bounds.prefix_length()];
if (auto range = extract_range_(filter, next_col)) {
bounds.set_range(*range);
}
}
return bounds.is_useful() ? std::optional(bounds) : std::nullopt;
}
10.5.4 Cost Comparison
The cost model determines which access path wins:
Sequential Scan Cost: C_seq = pages * kSeqPageCost + N * kCPUTupleProcessing
Index Scan Cost: C_index = log2(N) * kCPUComparison + K * (kRandomPageCost + kHeapAccessCost)
Index-Only Scan Cost: C_index_only = log2(N) * kCPUComparison + K * kSeqPageCost
Primary Key Lookup Cost: C_pk = K * kRandomPageCost
Where K is the number of keys to look up.
Break-Even Analysis:
Index scan beats sequential scan when K * kRandomPageCost < pages * kSeqPageCost, i.e. when the selectivity K/N falls below kSeqPageCost / (rows_per_page * kRandomPageCost).
With typical values (kSeqPageCost = 1.0, kRandomPageCost = 4.0, roughly 10 rows per page), the threshold is 1 / (10 * 4) = 2.5%.
For highly selective queries (< ~2.5% of rows), index scans typically win.
10.6 Strategy Selection
Strategy selectors choose the best algorithm for each operator type based on input characteristics, available resources, and physical properties.
10.6.1 Sort Strategy Selection
enum class SortStrategy {
kNoSort, // Input already sorted
kTopKHeap, // Heap-based for small LIMIT
kInMemorySort, // QuickSort in memory
kExternalSort, // External merge sort
kIndexScan // Use index ordering
};
Selection Logic:
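The real selection logic is not reproduced here; a simplified sketch of the decision order, using hypothetical config fields, is:

```cpp
#include <cstddef>
#include <optional>

enum class SortStrategy { kNoSort, kTopKHeap, kInMemorySort, kExternalSort };

// Hypothetical, simplified config; field names are assumptions.
struct SortConfigSketch {
    bool input_already_sorted = false;    // child ordering satisfies keys?
    std::optional<long long> limit_hint;  // LIMIT pushed into the sort
    long long input_rows = 0;
    double row_width = 0.0;
    std::size_t memory_budget = 0;
};

// Decision order: cheapest strategies are checked first.
inline auto select_sort(const SortConfigSketch& c) -> SortStrategy {
    if (c.input_already_sorted) return SortStrategy::kNoSort;
    // Small LIMIT: a bounded heap avoids sorting the full input.
    if (c.limit_hint &&
        static_cast<double>(*c.limit_hint) * c.row_width <
            static_cast<double>(c.memory_budget) / 16.0) {
        return SortStrategy::kTopKHeap;
    }
    // Full sort: in memory if the data fits, otherwise spill to disk.
    double data_size = static_cast<double>(c.input_rows) * c.row_width;
    return data_size <= static_cast<double>(c.memory_budget)
               ? SortStrategy::kInMemorySort
               : SortStrategy::kExternalSort;
}
```

The ordering of the checks matters: sort elimination and top-K are strictly cheaper than any full sort, so they are tried before the in-memory/external split.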
Algorithm Complexity Comparison:
| Strategy | Time | Space | I/O |
|---|---|---|---|
| TopK Heap | O(N log K) | O(K) | 0 |
| In-Memory | O(N log N) | O(N) | 0 |
| External | O(N log N) | O(B) | O((N/B) * log_F(N/B)) |
Where B is the in-memory buffer size and F is the merge width (fan-in).
10.6.2 Join Strategy Selection
enum class JoinStrategy {
kHashJoin, // Build/probe hash join
kIndexNestedLoop, // Index lookup on inner
kMergeJoin, // Sort-merge join
kNestedLoop // Simple nested loop
};
Selection Logic:
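A simplified sketch of the decision logic (hypothetical config fields; the outer-row threshold echoes kINLOuterThreshold below):

```cpp
enum class JoinStrategy { kHashJoin, kIndexNestedLoop, kMergeJoin, kNestedLoop };

// Hypothetical, simplified config; field names are assumptions.
struct JoinConfigSketch {
    double outer_rows = 0.0;
    double inner_rows = 0.0;
    bool inner_has_index = false;     // index available on inner join key
    bool both_sorted_on_key = false;  // merge join precondition
    bool hash_fits_memory = true;     // build side fits the budget
};

inline auto select_join(const JoinConfigSketch& c) -> JoinStrategy {
    // Pre-sorted inputs make merge join essentially free.
    if (c.both_sorted_on_key) return JoinStrategy::kMergeJoin;
    // Small outer + index on inner: point lookups beat building a table.
    if (c.inner_has_index && c.outer_rows <= 1000.0) {
        return JoinStrategy::kIndexNestedLoop;
    }
    // Hash join is the general-purpose workhorse when memory allows.
    if (c.hash_fits_memory) return JoinStrategy::kHashJoin;
    // Last-resort fallback.
    return JoinStrategy::kNestedLoop;
}
```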
Threshold Constants:
static constexpr double kINLOuterThreshold = 1000.0;
static constexpr double kHashBuildCostPerRow = 1.0;
static constexpr double kHashProbeCostPerRow = 1.2;
static constexpr double kNestLoopCostPerRow = 10.0;
static constexpr double kSortMergeCostPerRow = 2.0;
10.6.3 Aggregate Strategy Selection
enum class AggregateStrategy {
kStreamAggregate, // Input sorted on group keys
kHashAggregate, // Hash-based grouping
kSortAggregate // Sort then stream
};
Selection Logic:
auto AggregateStrategySelector::select(const AggConfig& config) -> AggregateStrategy {
// If input already sorted on group keys, stream is optimal
if (is_sorted_on_(config.input_ordering, config.group_keys)) {
return AggregateStrategy::kStreamAggregate;
}
// Estimate hash table memory requirement
auto estimated_groups = config.estimated_groups.value_or(
config.input_rows * kDefaultGroupReduction);
auto hash_memory = estimated_groups * kBytesPerGroupState;
// Hash aggregate if fits in memory
if (hash_memory <= config.memory_budget) {
return AggregateStrategy::kHashAggregate;
}
// Sort-aggregate for large group counts
return AggregateStrategy::kSortAggregate;
}
Memory Constants:
static constexpr size_t kBytesPerGroupState = 256; // Per-group state
static constexpr double kDefaultGroupReduction = 0.1; // 10% reduction
10.7 Memory Management
Memory management is critical for physical execution. Cognica implements sophisticated memory budgeting with automatic spill handling.
10.7.1 Memory Budget Allocation
The MemoryBudgetAllocator distributes memory across operators:
struct OperatorMemoryBudget {
size_t allocated = 0;
bool will_spill = false;
double spill_fraction = 0.0;
SpillOptions spill_options {};
};
class MemoryBudgetAllocator final {
public:
auto allocate(const ParsedPipeline& pipeline,
const std::vector<CardinalityEstimate>& estimates)
-> std::vector<OperatorMemoryBudget>;
private:
size_t total_budget_;
std::string spill_directory_;
};
Allocation Strategy:
- Identify Blocking Operators: Sort, Join, Aggregate require memory
- Calculate Requirements: Estimate memory per operator
- Weighted Distribution: Allocate proportionally with priority weighting
- Apply Constraints: Enforce minimum (16 MB) and maximum bounds
- Configure Spill: Set up spill options for operators exceeding budget
Priority Weights:
static constexpr double kSortPriority = 1.5; // Sort is memory-hungry
static constexpr double kGroupPriority = 1.2; // Groups vary widely
static constexpr double kJoinPriority = 1.0; // Base priority
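As a worked illustration of weighted distribution (toy code, not the real allocator):

```cpp
#include <cstddef>
#include <vector>

// Distribute a total budget across operators proportionally to their
// priority weights; illustrative sketch only (no min/max clamping).
inline auto distribute(std::size_t total, const std::vector<double>& weights)
    -> std::vector<std::size_t> {
    double sum = 0.0;
    for (double w : weights) sum += w;
    std::vector<std::size_t> out;
    out.reserve(weights.size());
    for (double w : weights) {
        // Each operator receives its weight's share of the total.
        out.push_back(static_cast<std::size_t>(total * (w / sum)));
    }
    return out;
}
```

With a sort (1.5), a group (1.2), and a join (1.0) competing for one budget, the sort receives 1.5/3.7 ≈ 40% of it; the real allocator then clamps each share to its minimum (16 MB) and maximum bounds.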
10.7.2 Spill Configuration
When operators cannot fit data in memory, they spill to disk:
struct SpillConfig {
size_t memory_limit = 256 * 1024 * 1024; // 256 MB
size_t spill_batch_size = 10000; // Rows per spill batch
int32_t max_merge_width = 16; // Merge fan-in
std::filesystem::path temp_directory {};
CompressionType compression = CompressionType::kNone;
};
10.7.3 Spillable Data Structures
Cognica implements spill-aware versions of key data structures:
SpillableSortBuffer:
- Implements external merge sort
- Writes sorted runs to disk when memory exceeded
- Merges runs during final output phase
SpillableHashTable:
- Uses Grace Hash Join partitioning
- Partitions data by hash of key
- Spills partitions independently
- Reloads partitions on demand during probe
SpillableAggTable:
- Hash aggregate with partition spill
- Maintains partial aggregates in memory
- Spills partitions when full
- Merges partitions during finalization
10.7.4 Memory Tracking
Each operator tracks its memory usage:
class MemoryTracker {
public:
void allocate(size_t bytes);
void deallocate(size_t bytes);
auto current_usage() const -> size_t;
auto peak_usage() const -> size_t;
auto budget() const -> size_t;
auto should_spill() const -> bool;
private:
std::atomic<size_t> current_ = 0;
std::atomic<size_t> peak_ = 0;
size_t budget_ = 0;
};
10.8 Cost Estimation
The cost model guides all physical planning decisions.
10.8.1 Cost Structure
struct Cost {
double cpu_cost = 0.0; // CPU cycles estimate
double io_cost = 0.0; // I/O operations
double memory_cost = 0.0; // Memory pressure
double spill_cost = 0.0; // Disk spill overhead
auto total() const -> double {
return cpu_cost +
io_cost * kIOCostWeight +
memory_cost * kMemoryCostWeight +
spill_cost * kSpillCostWeight;
}
};
Weight Constants:
static constexpr double kIOCostWeight = 10.0; // I/O is expensive
static constexpr double kMemoryCostWeight = 0.1; // Memory is pressure
static constexpr double kSpillCostWeight = 5.0; // Spill is costly
The weights reflect modern hardware characteristics:
- Disk I/O is 10x more expensive than CPU operations
- Memory pressure translates to cache misses
- Spill incurs both I/O and CPU overhead
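For example, an operator estimated at 100 CPU units and 50 I/O operations totals 100 + 50 * 10 = 600 weighted cost units. A compilable restatement of the weighted sum (weights inlined for brevity):

```cpp
// Weighted cost total, mirroring Cost::total() with the weights inlined.
struct CostSketch {
    double cpu = 0.0, io = 0.0, mem = 0.0, spill = 0.0;
    auto total() const -> double {
        return cpu + io * 10.0 + mem * 0.1 + spill * 5.0;
    }
};
```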
10.8.2 Cost Formulas
Sequential Scan: cost = pages * kSeqPageCost + N * kCPUTupleProcessing
Index Scan: cost = log2(N) * kCPUComparison + sel * N * (kRandomPageCost + kHeapAccessCost)
Where sel is the predicate selectivity.
Hash Join: cost = (|build| + |probe|) * kCPUHashOp + output_rows * kCPUTupleProcessing
Sort (In-Memory): cost = N * log2(N) * kCPUComparison
Sort (External): cost = N * log2(N) * kCPUComparison + 2 * passes * bytes * kSpillIOCost
Hash Aggregate: cost = N * kCPUHashOp + G * kCPUTupleProcessing
Where G is the number of groups.
10.8.3 Cost Constants
// CPU costs
static constexpr double kCPUTupleProcessing = 0.01;
static constexpr double kCPUComparison = 0.0001;
static constexpr double kCPUHashOp = 0.0005;
// I/O costs
static constexpr double kSeqPageCost = 1.0;
static constexpr double kRandomPageCost = 4.0;
static constexpr double kHeapAccessCost = 0.5;
static constexpr double kSpillIOCost = 0.01; // Per byte
// Memory constants
static constexpr double kHashTableOverhead = 1.5;
static constexpr double kCompressionFactor = 0.5; // LZ4 typical
static constexpr size_t kMergeWidth = 16;
static constexpr size_t kDefaultBlockSize = 8192;
10.8.4 Spill Prediction
The cost estimator predicts when operations will exceed memory:
auto CostEstimator::will_spill(int64_t rows,
double row_width,
size_t memory_budget) const -> bool {
auto data_size = static_cast<double>(rows) * row_width;
return data_size > static_cast<double>(memory_budget);
}
When spill is predicted, additional I/O cost is added: spill_cost = 2 * data_size * kSpillIOCost, covering one write of the data to disk and one read back during the merge phase.
10.9 Physical Ordering
Physical operators track output ordering properties, enabling sort elimination and merge join selection.
10.9.1 Ordering Representation
struct PhysicalOrdering {
std::string column;
SortOrder order; // kAscending or kDescending
};
An operator's output ordering is a list of PhysicalOrdering entries representing the sort guarantee.
10.9.2 Ordering Propagation
Different operators affect ordering differently:
| Operator | Ordering Effect |
|---|---|
| SeqScan | None (arbitrary order) |
| IndexScan | Index key order |
| Sort | Produces specified order |
| Filter | Preserves input order |
| Project | Preserves input order |
| HashJoin | Destroys order |
| MergeJoin | Preserves outer order |
| HashAggregate | Destroys order |
| StreamAggregate | Preserves group key order |
10.9.3 Ordering Satisfaction
The optimizer checks if an operator's output satisfies a required ordering:
auto OperatorProperties::ordering_satisfies(
const std::vector<PhysicalOrdering>& required) const -> bool {
if (required.size() > ordering.size()) {
return false;
}
for (size_t i = 0; i < required.size(); ++i) {
if (ordering[i].column != required[i].column ||
ordering[i].order != required[i].order) {
return false;
}
}
return true;
}
This enables sort elimination when input is already sorted.
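For example, output sorted on (a ASC, b DESC) satisfies a requirement of (a ASC) but not of (b DESC) alone, because satisfaction is prefix-based. A self-contained restatement of the same check:

```cpp
#include <string>
#include <vector>

enum class SortOrder { kAscending, kDescending };

struct Ordering { std::string column; SortOrder order; };

// Prefix-based check: the required ordering must match the leading
// entries of the provided ordering exactly, column and direction.
inline auto satisfies(const std::vector<Ordering>& provided,
                      const std::vector<Ordering>& required) -> bool {
    if (required.size() > provided.size()) return false;
    for (std::size_t i = 0; i < required.size(); ++i) {
        if (provided[i].column != required[i].column ||
            provided[i].order != required[i].order) {
            return false;
        }
    }
    return true;
}
```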
10.10 Explain Plans
Physical plans can be explained for debugging and optimization analysis.
10.10.1 Explain Output
Each physical operator implements an explain() method:
auto HashJoinOp::explain() const -> std::string {
auto ss = std::ostringstream {};
ss << "HashJoin [" << join_type_to_string(join_type_) << "]";
ss << " on (" << condition_to_string(join_condition_) << ")";
ss << " build_from=" << (build_from_right_ ? "right" : "left");
ss << " rows=" << properties_.cardinality.rows;
ss << " cost=" << cost_.total();
if (properties_.will_spill) {
ss << " [SPILL]";
}
return ss.str();
}
10.10.2 Plan Tree Visualization
HashJoin [INNER] on (orders.customer_id = customers.id)
build_from=right rows=50000 cost=12500.5
|-- SeqScan(orders) rows=100000 cost=1000.0
|-- IndexScan(customers) using pk_customers rows=10000 cost=500.0
range: [1, 10000]
10.10.3 Execution Statistics
After execution, operators report actual statistics:
struct ExecutionStats {
int64_t rows_processed = 0;
int64_t rows_output = 0;
int64_t bytes_read = 0;
int64_t bytes_written = 0;
int64_t execution_time_us = 0;
int64_t memory_peak_bytes = 0;
int64_t spill_bytes = 0;
};
Comparing estimated vs. actual statistics helps identify cardinality estimation errors.
10.11 CVM Integration
Physical plans are lowered to CVM bytecode for efficient execution. However, certain plan shapes are not yet supported by the CVM lowering path and fall back to the Volcano (iterator) executor automatically.
CVM Fallback Conditions: The executor checks should_use_cvm_for_plan() before attempting CVM lowering. Even when the CVM is otherwise enabled, the executor falls back to the Volcano path when the physical plan contains table-valued function or UNNEST operators inside an outer (correlated) context. The CVM subquery execution path does not correctly preserve standalone table-function row production when an outer context is installed, which manifests as a single NULL row for shapes like ARRAY(SELECT ... FROM unnest(...)). The fallback check walks the physical plan tree looking for kTableFunc or kUnnest nodes and, if any are found while an outer context is active, routes the query to the iterator executor.
10.11.1 Lowering Pipeline
The lowering pipeline walks the physical plan tree, dispatches each operator to its lowering method, and accumulates the emitted bytecode, cursor slots, and register assignments into a PlanLoweringResult.
10.11.2 Operator Lowering
Each physical operator has a corresponding lowering method:
class PlanLowering {
void lower_seq_scan_(const SeqScanOp* op);
void lower_index_scan_(const IndexScanOp* op);
void lower_filter_(const PhysicalFilterOp* op);
void lower_hash_join_(const HashJoinOp* op);
void lower_sort_(const PhysicalSortOp* op);
void lower_hash_aggregate_(const HashAggregateOp* op);
void lower_project_(const ProjectOp* op);
void lower_limit_(const LimitOp* op);
};
10.11.3 Cursor Slot Management
Scan operators use cursor slots to manage iteration state:
// Allocate slot
auto slot = allocate_cursor_slot_();
// Open cursor
emit_(Opcode::CURSOR_OPEN, slot, collection_name);
// Iteration loop
emit_label_(loop_label);
emit_(Opcode::CURSOR_NEXT, result_reg, slot);
emit_(Opcode::JUMP_NULL, result_reg, done_label);
// ... process row ...
emit_(Opcode::JUMP, loop_label);
// Close cursor
emit_label_(done_label);
emit_(Opcode::CURSOR_CLOSE, slot);
10.11.4 Join Lowering
Hash join lowering generates build and probe phases:
void PlanLowering::lower_hash_join_(const HashJoinOp* op) {
// Lower build side
lower_(op->build_child());
// Build phase: populate hash table
emit_(Opcode::HASH_BUILD_START, hash_table_reg);
emit_label_(build_loop);
emit_(Opcode::CURSOR_NEXT, build_row, build_cursor);
emit_(Opcode::JUMP_NULL, build_row, build_done);
emit_(Opcode::HASH_BUILD_INSERT, hash_table_reg, build_row, key_reg);
emit_(Opcode::JUMP, build_loop);
emit_label_(build_done);
// Lower probe side
lower_(op->probe_child());
// Probe phase: scan and look up
emit_label_(probe_loop);
emit_(Opcode::CURSOR_NEXT, probe_row, probe_cursor);
emit_(Opcode::JUMP_NULL, probe_row, probe_done);
emit_(Opcode::HASH_PROBE, match_reg, hash_table_reg, probe_row, key_reg);
emit_(Opcode::JUMP_NULL, match_reg, probe_loop);
// Emit matched row
emit_(Opcode::ROW_COMBINE, result_reg, probe_row, match_reg);
emit_(Opcode::EMIT_ROW, result_reg);
emit_(Opcode::JUMP, probe_loop);
emit_label_(probe_done);
emit_(Opcode::HASH_TABLE_FREE, hash_table_reg);
}
10.11.5 Nested CTE Discovery
Before physical planning begins, the executor performs a recursive discovery pass over the AST to find and materialize CTEs defined in nested subqueries. A top-level query may contain derived tables or join subqueries that themselves define WITH clauses. If these nested CTEs are not discovered and materialized before the main plan is built, references to them will fail during execution.
The prepare_nested_subquery_ctes_() method walks the AST recursively:
- For each SelectStmt, it inspects from_subqueries and joins for nested subqueries and descends into them.
- When a nested statement contains a WITH clause, the method builds a logical plan for that statement, executes its CTEs, and registers the materialized results.
- The CTE name scope is saved and restored around each nested discovery to prevent name collisions between sibling subqueries.
- SetOperationStmt nodes are handled similarly, recursing into both left_query and right_query.
This discovery runs before optimization and physical planning for all three execution paths (Volcano, CVM, and Acero), ensuring that CTE materialization is complete regardless of which execution model is selected.
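A structural sketch of the recursive walk, with hypothetical node types standing in for the real AST (scope save/restore elided):

```cpp
#include <memory>
#include <vector>

// Hypothetical AST stand-ins for illustration only.
struct Stmt {
    bool has_with_clause = false;
    std::vector<std::unique_ptr<Stmt>> nested;  // from_subqueries + joins
};

// Depth-first collection of statements that define WITH clauses, so their
// CTEs can be materialized before the main plan is built.
inline void discover_ctes(const Stmt& stmt, std::vector<const Stmt*>& found) {
    if (stmt.has_with_clause) {
        found.push_back(&stmt);  // this statement's CTEs need materialization
    }
    for (const auto& child : stmt.nested) {
        discover_ctes(*child, found);  // descend into nested subqueries
    }
}
```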
10.12 Summary
Cognica's physical planning system transforms logical query plans into efficient executable plans through several key mechanisms:
- Access Path Enumeration: Systematically evaluates sequential scans, index scans, index-only scans, and point lookups to find the cheapest data access method.
- Strategy Selection: Chooses optimal algorithms for sort (heap, quicksort, external), join (hash, merge, nested loop), and aggregate (hash, stream, sort) operations based on input characteristics and available resources.
- Hybrid Execution Model: Supports iterator-based execution for simplicity, CVM bytecode for performance, and vectorized execution for columnar workloads. The executor automatically falls back from CVM to the Volcano iterator model when the plan contains table functions or UNNEST operators inside a correlated outer context.
- Memory-Aware Planning: Distributes memory budgets across operators, predicts spill behavior, and configures spill options proactively.
- Physical Property Tracking: Maintains ordering and cardinality information to enable optimizations like sort elimination and merge join selection.
- Cost-Based Decisions: Uses a calibrated cost model considering CPU, I/O, memory, and spill costs to guide all planning decisions.
- Scan Projection Pushdown: Propagates needed-column sets top-down through the physical plan tree, with special handling for LATERAL join dependencies, so that scan operators read only the columns required by downstream operators.
- Nested CTE Discovery: Recursively walks the AST before planning to discover and materialize CTEs defined in nested subqueries, ensuring they are available regardless of nesting depth.
The physical planning phase bridges the gap between declarative query specification and efficient execution, making decisions that can impact query performance by orders of magnitude.