Chapter 29: External Table Integration

Modern data architectures increasingly rely on data residing outside the primary database system. Data lakes, cloud storage, operational databases, and analytical engines each contain valuable data that applications need to query alongside local tables. This chapter examines Cognica's external table integration system, which provides unified SQL access to heterogeneous data sources through a Foreign Data Wrapper (FDW) abstraction inspired by PostgreSQL's design.

29.1 The Foreign Data Wrapper Paradigm

The SQL/MED (Management of External Data) standard introduced the concept of foreign tables—tables that appear in the database catalog but whose data resides in external systems. PostgreSQL's implementation of this standard through Foreign Data Wrappers established a pattern that Cognica extends with modern columnar formats and distributed query capabilities.

29.1.1 Architectural Goals

Cognica's external table integration addresses several architectural challenges:

Transparent SQL Access: External data should be queryable using standard SQL syntax without requiring application code changes. A query joining local and external tables should work identically to queries involving only local tables.

Predicate Pushdown: Filter conditions should be pushed to the external system whenever possible. Filtering a Parquet file containing billions of rows at the storage layer avoids transferring unnecessary data.

Columnar Efficiency: Modern analytical workloads benefit from columnar storage. External table access should preserve columnar execution through the entire pipeline rather than materializing data row by row.

Connection Pooling: Database connections to external systems are expensive. Connection pooling amortizes connection establishment costs across multiple queries.

29.1.2 Source Type Taxonomy

Cognica supports three categories of external data sources, each with distinct access patterns:

\text{SourceType} = \begin{cases} \text{kFile} & \text{Arrow Dataset API (Parquet, CSV, ORC, IPC)} \\ \text{kDuckDB} & \text{DuckDB engine (PostgreSQL, MySQL, Delta Lake)} \\ \text{kFlightSQL} & \text{Arrow Flight SQL (Cognica, ClickHouse, DataFusion)} \end{cases}

The file-based sources leverage Apache Arrow's Dataset API for zero-copy columnar access. DuckDB-backed sources utilize DuckDB's extensive connector ecosystem. Flight SQL sources communicate with Arrow-native analytical systems using the Flight protocol.

29.2 Virtual Table Definition Model

The ExternalVirtualTableDef structure captures all metadata required to access an external table:

struct ExternalVirtualTableDef {
  std::string table_name;
  std::string schema_name = "public";
  std::string source_path;
  ast::CopyFormat format = ast::CopyFormat::kAuto;
  ast::CopyPartitioning partitioning = ast::CopyPartitioning::kNone;

  // Schema (inferred or explicit)
  std::shared_ptr<arrow::Schema> schema;
  bool schema_is_explicit = false;

  // Partition columns (for PARTITION BY clause)
  std::vector<std::string> partition_columns;

  // Format-specific options
  std::unordered_map<std::string, std::string> options;

  // Schema refresh configuration
  SchemaRefreshMode schema_refresh_mode = SchemaRefreshMode::kAuto;
  int64_t schema_refresh_interval_sec = 60;

  // Source type discriminator
  VirtualTableSourceType source_type = VirtualTableSourceType::kFile;

  // DuckDB-specific fields
  std::optional<DuckDBSourceType> duckdb_source_type;
  std::string connection_string;
  std::string source_table;
  std::vector<std::string> required_extensions;
  bool read_only = true;

  // Flight SQL-specific fields
  std::optional<FlightSQLSourceType> flight_sql_source_type;
  std::string flight_endpoint;
  std::string flight_catalog;
  std::string flight_schema;
  std::string flight_table;
  std::string flight_auth_token;
  int32_t flight_timeout_ms = 30000;
};

29.2.1 Schema Management

External data sources present a schema management challenge: the schema may change without the database's knowledge. Cognica addresses this through configurable refresh strategies:

Automatic Refresh: When schema_refresh_mode is kAuto, the system periodically checks whether the source schema has changed. For file sources, this involves comparing file modification timestamps against the last refresh time.

Manual Refresh: Production systems may prefer explicit control over schema changes. When schema_refresh_mode is kManual, schema updates require an explicit ALTER VIRTUAL TABLE REFRESH command.

The refresh interval determines how frequently the system checks for schema changes:

\text{needs\_refresh} = \begin{cases} \text{false} & \text{if mode} = \text{kManual} \\ \text{elapsed\_seconds} \geq \text{refresh\_interval} & \text{if mode} = \text{kAuto} \end{cases}
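
The refresh predicate can be sketched directly. SchemaRefreshMode mirrors the definition from the virtual table structure; the free-function form is illustrative, not Cognica's actual API:

```cpp
#include <cstdint>

// Illustrative sketch of the schema-refresh decision described above.
enum class SchemaRefreshMode { kAuto, kManual };

bool needs_refresh(SchemaRefreshMode mode, int64_t elapsed_seconds,
                   int64_t refresh_interval_sec) {
  if (mode == SchemaRefreshMode::kManual) {
    return false;  // only an explicit ALTER VIRTUAL TABLE REFRESH applies
  }
  return elapsed_seconds >= refresh_interval_sec;
}
```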

29.2.2 Hive-Style Partitioning

Many data lake formats use Hive-style partitioning where partition values are encoded in directory paths:

s3://data-lake/events/
  year=2024/
    month=01/
      data-001.parquet
      data-002.parquet
    month=02/
      data-003.parquet
  year=2025/
    month=01/
      data-004.parquet

The partition columns (year, month) become virtual columns that can be used in filter predicates. When a query filters on partition columns, the Dataset API prunes entire directory subtrees without reading any files.
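
Extracting partition key/value pairs from a path can be sketched as follows. In the real pipeline Arrow's HivePartitioning performs this discovery; the helper below is a simplified stand-in:

```cpp
#include <map>
#include <string>

// Sketch: collect Hive-style "key=value" path segments into a map.
// Segments without '=' (directories, file names) are ignored.
std::map<std::string, std::string> parse_hive_partitions(
    const std::string& path) {
  std::map<std::string, std::string> partitions;
  size_t start = 0;
  while (start < path.size()) {
    size_t end = path.find('/', start);
    if (end == std::string::npos) end = path.size();
    std::string segment = path.substr(start, end - start);
    size_t eq = segment.find('=');
    if (eq != std::string::npos) {
      partitions[segment.substr(0, eq)] = segment.substr(eq + 1);
    }
    start = end + 1;
  }
  return partitions;
}
```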

29.3 The Decorator Pattern for Cursor Providers

Cognica implements external table access using the Decorator design pattern. Each source type has a dedicated cursor provider that intercepts requests for its tables and delegates other requests to the wrapped provider:

[Diagram: cursor provider decorator chain]

29.3.1 Provider Chain Configuration

The cursor provider chain is assembled during session initialization:

// Start with native table provider
auto native_provider = std::make_unique<NativeCursorProvider>(db);

// Wrap with file-based external table provider
auto file_provider = std::make_unique<ExternalTableCursorProvider>(
    vtable_manager, native_provider.get());

// Wrap with DuckDB provider
auto duckdb_provider = std::make_unique<DuckDBCursorProvider>(
    vtable_manager, duckdb_manager, file_provider.get());

// Wrap with Flight SQL provider
auto flight_provider = std::make_unique<FlightSQLCursorProvider>(
    vtable_manager, flight_manager, duckdb_provider.get());

Each provider in the chain follows the same pattern:

  1. Check if the requested table matches this provider's type
  2. If yes, create the appropriate cursor
  3. If no, delegate to the wrapped provider
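
The check-or-delegate pattern can be shown in miniature. The Cursor and CursorProvider interfaces below are simplified stand-ins for Cognica's actual types:

```cpp
#include <memory>
#include <string>

struct Cursor { virtual ~Cursor() = default; };

struct CursorProvider {
  virtual ~CursorProvider() = default;
  virtual std::unique_ptr<Cursor> open(const std::string& table) = 0;
};

// Decorator: handles one table type, delegates everything else.
class DecoratingProvider : public CursorProvider {
 public:
  DecoratingProvider(std::string handled_table, CursorProvider* next)
      : handled_table_(std::move(handled_table)), next_(next) {}

  std::unique_ptr<Cursor> open(const std::string& table) override {
    if (table == handled_table_) {
      return std::make_unique<Cursor>();  // 2. create the appropriate cursor
    }
    return next_ ? next_->open(table) : nullptr;  // 3. delegate to wrapped
  }

 private:
  std::string handled_table_;
  CursorProvider* next_;  // wrapped provider (non-owning)
};
```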

29.3.2 Table Name Resolution

Table names may include schema qualifiers (public.events) that must be parsed:

auto parse_table_name_(const std::string& table_name)
    -> std::pair<std::string, std::string> {
  auto dot_pos = table_name.find('.');
  if (dot_pos != std::string::npos) {
    return {table_name.substr(0, dot_pos),
            table_name.substr(dot_pos + 1)};
  }
  return {"public", table_name};
}

The virtual table manager maintains an in-memory map keyed by fully-qualified names, with persistence to a system collection for recovery after restart.

29.4 Arrow Dataset API Integration

For file-based external tables, Cognica leverages Apache Arrow's Dataset API, which provides a unified interface for reading columnar data from various formats.

29.4.1 Supported File Formats

The Dataset API supports multiple columnar formats:

Format       Extension           Characteristics
Parquet      .parquet            Columnar, compressed, statistics
Arrow IPC    .arrow, .feather    Columnar, zero-copy
ORC          .orc                Columnar, compressed, Hive native
CSV          .csv                Row-oriented, text
JSON         .json, .ndjson      Semi-structured, line-delimited

Parquet is the most common format due to its combination of columnar layout, efficient compression, and rich metadata including min/max statistics per row group.

29.4.2 Dataset Discovery

Dataset discovery involves scanning the source path and building a dataset object:

auto create_dataset_(const ExternalVirtualTableDef& def)
    -> std::shared_ptr<arrow::dataset::Dataset> {

  // Determine file format
  auto format = create_format_(def.format, def.options);

  // Create filesystem; FileSystemFromUri also yields the path within it
  std::string internal_path;
  auto fs = arrow::fs::FileSystemFromUri(def.source_path, &internal_path)
                .ValueOrDie();

  // Discover files under the source path
  arrow::fs::FileSelector selector;
  selector.base_dir = internal_path;
  selector.recursive = true;

  arrow::dataset::FileSystemFactoryOptions factory_options;
  factory_options.partition_base_dir = internal_path;

  if (def.partitioning != ast::CopyPartitioning::kNone) {
    factory_options.partitioning =
        arrow::dataset::HivePartitioning::MakeFactory();
  }

  auto factory = arrow::dataset::FileSystemDatasetFactory::Make(
      fs, selector, format, factory_options).ValueOrDie();

  return factory->Finish().ValueOrDie();
}

29.4.3 The ExternalTableCursor

The ExternalTableCursor streams data from external files using Arrow's batch-oriented API:

class ExternalTableCursor final : public db::document::Cursor {
public:
  explicit ExternalTableCursor(
      const ExternalVirtualTableDef& table_def,
      std::vector<std::string> columns = {},
      std::shared_ptr<arrow::compute::Expression> filter = nullptr);

  // With limit/offset hints for early termination
  ExternalTableCursor(
      const ExternalVirtualTableDef& table_def,
      std::vector<std::string> columns,
      std::shared_ptr<arrow::compute::Expression> filter,
      std::optional<int64_t> limit_hint,
      std::optional<int64_t> offset_hint);

private:
  // Arrow dataset and scanner
  std::shared_ptr<arrow::dataset::Dataset> dataset_;
  std::shared_ptr<arrow::dataset::Scanner> scanner_;
  std::shared_ptr<arrow::RecordBatchReader> reader_;

  // Current batch and position
  std::shared_ptr<arrow::RecordBatch> current_batch_;
  int64_t row_index_ = 0;

  // Current document (converted from batch row)
  db::document::Document current_doc_;
};

The cursor initializes a scanner with optional filter and projection pushdown:

auto initialize_scanner_() -> db::Status {
  arrow::dataset::ScannerBuilder builder(dataset_);

  // Apply projection pushdown
  if (!columns_.empty()) {
    builder.Project(columns_);
  }

  // Apply filter pushdown
  if (filter_) {
    builder.Filter(*filter_);
  }

  // Configure batch size for streaming
  builder.BatchSize(kDefaultBatchSize);

  scanner_ = builder.Finish().ValueOrDie();
  reader_ = scanner_->ToRecordBatchReader().ValueOrDie();
  return db::Status::OK();
}

29.5 Predicate Pushdown with ArrowFilterConverter

Converting SQL WHERE clauses to Arrow compute expressions enables predicate pushdown to the storage layer. The ArrowFilterConverter handles this translation.

29.5.1 Expression Mapping

SQL expressions map to Arrow compute expressions:

SQL Expression     Arrow Compute Function
a = b              equal(a, b)
a <> b             not_equal(a, b)
a < b              less(a, b)
a <= b             less_equal(a, b)
a > b              greater(a, b)
a >= b             greater_equal(a, b)
a AND b            and_(a, b)
a OR b             or_(a, b)
NOT a              invert(a)
a IS NULL          is_null(a)
a IS NOT NULL      is_valid(a)
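
The operator mapping reduces to a lookup table. A minimal sketch (the function name and unsupported-operator handling are illustrative; in the converter, a miss falls back to literal(true)):

```cpp
#include <optional>
#include <string>
#include <unordered_map>

// Map a SQL binary operator to its Arrow compute function name, or
// nullopt for operators the converter does not push down.
std::optional<std::string> arrow_function_for(const std::string& sql_op) {
  static const std::unordered_map<std::string, std::string> kMap = {
      {"=", "equal"},   {"<>", "not_equal"},
      {"<", "less"},    {"<=", "less_equal"},
      {">", "greater"}, {">=", "greater_equal"},
      {"AND", "and_"},  {"OR", "or_"},
  };
  auto it = kMap.find(sql_op);
  if (it == kMap.end()) return std::nullopt;
  return it->second;
}
```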

29.5.2 Type Coercion

A common challenge arises when comparing string constants against numeric columns, particularly with Hive partition columns:

-- year column is inferred as INT64 from directory names
SELECT * FROM events WHERE year = '2024';

The converter performs automatic type coercion when schema information is available:

auto coerce_constant_to_column_type_(
    const std::string& column_name,
    arrow::compute::Expression constant_expr)
    -> arrow::compute::Expression {

  if (!schema_) {
    return constant_expr;
  }

  auto field = schema_->GetFieldByName(column_name);
  if (!field) {
    return constant_expr;
  }

  // Try to parse string as numeric
  if (constant_expr.literal() &&
      constant_expr.literal()->is_scalar()) {
    auto& scalar = *constant_expr.literal()->scalar();
    if (scalar.type->id() == arrow::Type::STRING) {
      auto str =
          static_cast<arrow::StringScalar&>(scalar).value->ToString();
      auto coerced =
          try_parse_string_as_numeric_(str, *field->type());
      if (coerced) {
        return *coerced;
      }
    }
  }

  return constant_expr;
}
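
The numeric-parsing step referenced above (try_parse_string_as_numeric_) can be sketched without Arrow types. This simplified version returns an integer or double when the whole string parses as one, which is the behavior the coercion path needs for partition values like '2024':

```cpp
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <string>
#include <variant>

// Sketch: reinterpret a string constant as int64 or double so it can be
// compared against a numeric column. Partial parses ("12abc") are rejected.
std::optional<std::variant<int64_t, double>> try_parse_numeric(
    const std::string& s) {
  try {
    size_t pos = 0;
    int64_t i = std::stoll(s, &pos);
    if (pos == s.size()) return i;  // entire string is an integer
    double d = std::stod(s, &pos);
    if (pos == s.size()) return d;  // entire string is a double
  } catch (const std::exception&) {
    // fall through: not numeric
  }
  return std::nullopt;
}
```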

29.5.3 Safe Fallback Semantics

Unsupported expressions return literal(true), which disables pushdown for that predicate while maintaining correctness:

auto convert_expr_(const ast::Expr* expr) -> arrow::compute::Expression {
  if (!expr) {
    return arrow::compute::literal(true);
  }

  switch (expr->type()) {
    case ast::ExprType::kBinaryExpr:
      return convert_binary_expr_(static_cast<const ast::BinaryExpr*>(expr));
    case ast::ExprType::kColumnRef:
      return convert_column_ref_(static_cast<const ast::ColumnRef*>(expr));
    // ... other cases
    default:
      // Unsupported: disable pushdown, filter post-scan
      return arrow::compute::literal(true);
  }
}

This approach ensures that any SQL predicate can be expressed, with pushdown optimizations applied where possible.

29.6 Vectorized Execution with Arrow Acero

When filter and limit pushdown combine, Cognica uses Arrow Acero for fully vectorized execution without row-by-row document conversion overhead.

29.6.1 The Acero Execution Engine

Arrow Acero is a streaming query execution engine that processes data in columnar batches:

class AceroExecutor final {
public:
  explicit AceroExecutor(std::shared_ptr<AceroTableProvider> table_provider);

  // Execute LogicalPlan and return Arrow Table
  auto execute(const planner::LogicalPlan* plan)
      -> arrow::Result<std::shared_ptr<arrow::Table>>;

  // Execute with streaming results
  using BatchCallback =
      std::function<arrow::Status(std::shared_ptr<arrow::RecordBatch>)>;
  auto execute_streaming(const planner::LogicalPlan* plan,
                         BatchCallback callback) -> arrow::Status;

  // Execute via Substrait intermediate representation
  auto execute_substrait(const arrow::Buffer& substrait_plan)
      -> arrow::Result<std::shared_ptr<arrow::Table>>;
};

29.6.2 Acero Plan Construction

The execution plan is constructed as a tree of Acero nodes:

auto create_acero_plan_(const ExternalVirtualTableDef& def,
                        const ast::Expr* filter,
                        std::optional<int64_t> limit)
    -> arrow::acero::Declaration {

  // Source: Dataset scan
  auto scan_options = std::make_shared<arrow::dataset::ScanOptions>();
  if (filter) {
    scan_options->filter = convert_filter_(filter);
  }

  arrow::acero::Declaration plan{"scan",
      arrow::dataset::ScanNodeOptions{dataset_, scan_options}};

  // Add FetchNode for limit
  if (limit) {
    plan = arrow::acero::Declaration::Sequence({
        std::move(plan),
        {"fetch", arrow::acero::FetchNodeOptions{0, *limit}}
    });
  }

  return plan;
}

29.6.3 Memory-Efficient Streaming

Acero processes data in a streaming fashion with bounded memory usage:

\text{Memory}_{\text{Acero}} = O(\text{batch\_size}) \quad \text{vs.} \quad \text{Memory}_{\text{Materialize}} = O(\text{result\_size})

The StreamingSinkConsumer enables callback-based result processing:

class StreamingSinkConsumer : public arrow::acero::SinkNodeConsumer {
public:
  explicit StreamingSinkConsumer(BatchCallback callback)
      : callback_(std::move(callback)) {}

  auto Consume(arrow::ExecBatch batch) -> arrow::Status override {
    ARROW_ASSIGN_OR_RAISE(auto record_batch, batch.ToRecordBatch(schema_));
    return callback_(record_batch);
  }

private:
  BatchCallback callback_;
  std::shared_ptr<arrow::Schema> schema_;
};

29.7 DuckDB Connector Integration

DuckDB provides Cognica with access to a rich ecosystem of data sources through its extension system.

29.7.1 Supported DuckDB Sources

Source Type    Extension     Use Cases
PostgreSQL     postgres      Operational databases
MySQL          mysql         Legacy systems
SQLite         (built-in)    Local databases
Delta Lake     delta         Data lake tables
Iceberg        iceberg       Data lake tables

29.7.2 Connection Management

The DuckDBManager maintains a connection pool for efficient resource utilization:

class DuckDBManager final {
public:
  explicit DuckDBManager(const DuckDBConfig& config = DuckDBConfig{});

  // Connection pool management
  auto acquire_connection() -> DuckDBConnection;
  void release_connection(DuckDBConnection conn);

  // Query execution
  auto execute_query(const std::string& sql)
      -> std::pair<DuckDBResult, db::Status>;

  // Extension management
  auto load_extension(const std::string& extension_name) -> db::Status;

private:
  DuckDBConfig config_;
  duckdb_database db_;
  std::vector<duckdb_connection> connection_pool_;
  std::unordered_set<std::string> loaded_extensions_;
};

The configuration includes memory limits and cloud storage credentials:

struct DuckDBConfig {
  int64_t memory_limit_bytes = 1024 * 1024 * 1024;  // 1GB
  int32_t max_connections = 4;
  std::string temp_directory;

  // Cloud storage configurations
  S3Config s3;
  AzureConfig azure;
  GCSConfig gcs;
};

29.7.3 Query Translation

The DuckDBCursorProvider generates native DuckDB SQL from the virtual table definition and filter expression:

auto generate_query_(const ExternalVirtualTableDef& def,
                     const ast::Expr* filter,
                     const std::vector<std::string>& columns,
                     std::optional<int64_t> limit,
                     std::optional<int64_t> offset) const
    -> std::expected<std::string, db::Status> {

  std::ostringstream sql;

  // SELECT clause with projection pushdown
  sql << "SELECT ";
  if (columns.empty()) {
    sql << "*";
  } else {
    for (size_t i = 0; i < columns.size(); ++i) {
      if (i > 0) sql << ", ";
      sql << quote_identifier_(columns[i]);
    }
  }

  // FROM clause with source-specific table reference
  sql << " FROM " << generate_from_clause_(def);

  // WHERE clause with predicate pushdown
  if (filter) {
    sql << " WHERE " << generate_where_clause_(filter);
  }

  // LIMIT/OFFSET pushdown
  if (limit) {
    sql << " LIMIT " << *limit;
  }
  if (offset) {
    sql << " OFFSET " << *offset;
  }

  return sql.str();
}

29.7.4 DuckDB Cursor Implementation

The DuckDBCursor streams results using DuckDB's chunk-based API:

class DuckDBCursor final : public db::document::Cursor {
public:
  explicit DuckDBCursor(DuckDBResult result);

private:
  // Fetch the next chunk from the result
  auto fetch_next_chunk_() -> bool {
    current_chunk_ = duckdb_fetch_chunk(result_.get());
    if (!current_chunk_) {
      exhausted_ = true;
      return false;
    }
    chunk_size_ = duckdb_data_chunk_get_size(current_chunk_);
    chunk_row_index_ = 0;
    ++total_chunks_read_;
    return true;
  }

  // Convert current chunk row to document
  void update_current_document_() {
    current_doc_.clear();
    auto col_count = duckdb_column_count(&result_.get());
    for (idx_t col = 0; col < col_count; ++col) {
      auto name = duckdb_column_name(&result_.get(), col);
      auto type = duckdb_column_type(&result_.get(), col);
      auto vector = duckdb_data_chunk_get_vector(current_chunk_, col);
      current_doc_.set(name, convert_vector_value_(vector, type,
                                                    chunk_row_index_));
    }
  }

private:
  DuckDBResult result_;
  duckdb_data_chunk current_chunk_;
  idx_t chunk_row_index_;
  idx_t chunk_size_;
};

29.8 Arrow Flight SQL Connector

Flight SQL enables Cognica to query remote Arrow-native databases using the high-performance Flight protocol.

29.8.1 Supported Flight SQL Systems

System                 Configuration    Use Cases
Cognica                kCognica         Distributed Cognica clusters
ClickHouse             kClickHouse      Real-time analytics
DataFusion/Ballista    kDataFusion      Distributed query processing
Dremio                 kDremio          Data lake analytics
Generic                kGeneric         Any Flight SQL server

29.8.2 Client Manager Architecture

The FlightSQLClientManager handles connection pooling, authentication, and health checking:

class FlightSQLClientManager final {
public:
  explicit FlightSQLClientManager(const FlightSQLClientConfig& config);

  // Client acquisition (connection pooled)
  auto acquire_client(const FlightSQLEndpointConfig& endpoint_config)
      -> std::expected<arrow::flight::sql::FlightSqlClient*, db::Status>;

  void release_client(const std::string& endpoint,
                      arrow::flight::sql::FlightSqlClient* client);

  // Query execution with client lifecycle management
  auto execute_query_with_client(
      const FlightSQLEndpointConfig& endpoint_config,
      const std::string& sql)
      -> std::expected<QueryResult, db::Status>;

  // Schema discovery
  auto get_table_schema(const FlightSQLEndpointConfig& endpoint_config,
                        const std::string& catalog,
                        const std::string& schema,
                        const std::string& table)
      -> std::expected<std::shared_ptr<arrow::Schema>, db::Status>;

private:
  // Connection pool: endpoint -> list of clients
  std::unordered_map<std::string, std::deque<PooledClient>> connection_pools_;
  std::unordered_map<std::string, EndpointStatus> endpoint_status_;
};

29.8.3 Endpoint Configuration

Each Flight SQL endpoint requires authentication and TLS configuration:

struct FlightSQLEndpointConfig {
  std::string name;
  std::string endpoint;  // host:port or tls://host:port
  FlightSQLSourceType source_type;

  // Authentication
  std::string auth_token;  // Bearer token
  std::string username;    // Basic auth alternative
  std::string password;

  // TLS configuration
  bool tls_enabled = false;
  std::string tls_root_certs;    // Custom CA certificates
  std::string tls_private_key;   // Client private key (mTLS)
  std::string tls_cert_chain;    // Client certificate chain

  // Timeouts
  int32_t connect_timeout_ms = 5000;
  int32_t query_timeout_ms = 30000;
  int32_t max_retries = 3;
};

29.8.4 Flight SQL Cursor

The FlightSQLCursor streams results from the Flight server:

class FlightSQLCursor final : public db::document::Cursor {
public:
  // Create with reader and client lifecycle management
  FlightSQLCursor(std::unique_ptr<arrow::flight::FlightStreamReader> reader,
                  FlightSQLClientManager* manager,
                  const FlightSQLEndpointConfig& endpoint_config,
                  arrow::flight::sql::FlightSqlClient* acquired_client);

  // Deferred execution: FlightInfo stored, stream initialized on first access
  FlightSQLCursor(FlightSQLClientManager* manager,
                  const FlightSQLEndpointConfig& endpoint_config,
                  std::unique_ptr<arrow::flight::FlightInfo> info);

private:
  // Fetch the next record batch from the stream
  auto fetch_next_batch_() -> bool {
    auto status = reader_->Next(&current_batch_);
    if (!status.ok()) {
      status_ = db::Status::IOError(status.ToString());
      return false;
    }
    if (!current_batch_) {
      exhausted_ = true;
      return false;
    }
    batch_size_ = current_batch_->num_rows();
    batch_row_index_ = 0;
    ++total_batches_read_;
    return true;
  }

private:
  std::unique_ptr<arrow::flight::FlightStreamReader> reader_;
  FlightSQLClientManager* manager_;
  arrow::flight::sql::FlightSqlClient* acquired_client_;
  std::shared_ptr<arrow::RecordBatch> current_batch_;
};

29.9 Write Operations

External tables support INSERT, UPDATE, and DELETE operations through the ExternalTableWriter class.

29.9.1 Write Semantics by Format

Different file formats have different write characteristics:

Format       Append      Update     Delete
Parquet      New file    Rewrite    Rewrite
Arrow IPC    New file    Rewrite    Rewrite
ORC          New file    Rewrite    Rewrite
CSV          New file    Rewrite    Rewrite

Parquet, Arrow IPC, and ORC are immutable formats. UPDATE and DELETE operations require reading the entire dataset, applying modifications in memory, and rewriting the affected files.

29.9.2 INSERT Operations

INSERT creates a new file with a timestamp-based filename for uniqueness:

auto insert(const std::vector<db::document::Document>& docs)
    -> WriteOperationResult {

  // Generate unique output path
  auto output_path = generate_output_path_();

  // Convert documents to Arrow RecordBatch
  auto batch = documents_to_batch_(docs, table_def_.schema);

  // Write based on format
  auto status = write_batch_to_file_(batch, output_path);

  return WriteOperationResult{
      .success = status.ok(),
      .rows_affected = static_cast<int64_t>(docs.size()),
      .output_path = output_path
  };
}
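
The generate_output_path_ helper can be sketched as below. The timestamp-plus-counter naming scheme is an assumption for illustration, not Cognica's actual convention:

```cpp
#include <chrono>
#include <string>

// Sketch: build a unique output filename from a microsecond timestamp
// plus a process-local counter (the counter disambiguates calls that
// land in the same microsecond).
std::string generate_output_path(const std::string& base_dir,
                                 const std::string& extension) {
  static int counter = 0;
  auto now = std::chrono::system_clock::now().time_since_epoch();
  auto micros =
      std::chrono::duration_cast<std::chrono::microseconds>(now).count();
  return base_dir + "/insert-" + std::to_string(micros) + "-" +
         std::to_string(counter++) + extension;
}
```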

29.9.3 Streaming UPDATE and DELETE

For large datasets, UPDATE and DELETE use streaming batch processing to avoid memory exhaustion:

auto update(const ast::Expr* predicate,
            const db::document::Document& updates)
    -> WriteOperationResult {

  // Create streaming scanner
  auto scanner = create_scanner_();

  // Create streaming writer for temporary output
  auto temp_path = generate_temp_path_();
  auto writer = create_streaming_writer_(temp_path, table_def_.schema);

  int64_t updated_count = 0;

  // Process batches in streaming fashion
  for (auto batch : scanner) {
    auto processed_batch =
        process_batch_for_update_(batch, predicate, updates, updated_count);
    writer->write_batch(processed_batch);
  }

  writer->close();

  // Atomic replacement of original files
  replace_source_files_(temp_path);

  return WriteOperationResult{
      .success = true,
      .rows_affected = updated_count
  };
}

The process_batch_for_update_ function applies updates to matching rows within each batch:

auto process_batch_for_update_(
    const std::shared_ptr<arrow::RecordBatch>& batch,
    const ast::Expr* predicate,
    const db::document::Document& updates,
    int64_t& updated_count)
    -> std::shared_ptr<arrow::RecordBatch> {

  // Convert batch rows to documents for predicate evaluation
  std::vector<db::document::Document> output_docs;
  output_docs.reserve(batch->num_rows());

  for (int64_t i = 0; i < batch->num_rows(); ++i) {
    auto doc = batch_row_to_document_(batch, i);

    if (matches_predicate_(doc, predicate)) {
      apply_updates_(doc, updates);
      ++updated_count;
    }

    output_docs.push_back(std::move(doc));
  }

  return documents_to_batch_(output_docs, batch->schema());
}

29.9.4 StreamingBatchWriter Interface

The streaming writer interface enables incremental output for any supported format:

class StreamingBatchWriter {
public:
  virtual ~StreamingBatchWriter() = default;

  // Write a single batch to the output file
  virtual auto write_batch(const std::shared_ptr<arrow::RecordBatch>& batch)
      -> db::Status = 0;

  // Close the writer and finalize the output file
  virtual auto close() -> db::Status = 0;
};

Each format implements this interface with appropriate file finalization logic.

29.10 Query Optimization Strategies

External table integration benefits from multiple optimization layers.

29.10.1 Pushdown Hierarchy

Cognica applies pushdown optimizations in a specific order:

\text{Pushdown Priority} = \text{Partition Pruning} > \text{Predicate} > \text{Projection} > \text{Limit}

  1. Partition Pruning: Eliminates entire directory subtrees based on partition column predicates
  2. Predicate Pushdown: Applies row-level filters at the storage layer
  3. Projection Pushdown: Reads only required columns
  4. Limit Pushdown: Stops scanning after sufficient rows

29.10.2 Statistics-Based Optimization

Parquet files include row group statistics (min/max values) that enable additional pruning:

auto get_table_stats(const std::string& table_name) -> TableStats {
  auto def = lookup_virtual_table_(table_name);
  if (!def || def->source_type != VirtualTableSourceType::kFile) {
    return delegate_->get_table_stats(table_name);
  }

  // Use Arrow Scanner::CountRows(), which reads only file metadata
  auto dataset = create_dataset_(*def);
  auto scanner = arrow::dataset::ScannerBuilder(dataset).Finish().ValueOrDie();
  auto row_count = scanner->CountRows().ValueOrDie();

  return TableStats{
      .row_count = row_count,
      .size_bytes = estimate_size_(dataset)
  };
}

29.10.3 Cost Model Integration

The query optimizer uses external table statistics for cost estimation:

\text{Cost}_{\text{external}} = \text{Network}_{\text{latency}} + \frac{\text{Rows} \times \text{Row\_Size}}{\text{Bandwidth}} + \text{CPU}_{\text{conversion}}

For remote sources (DuckDB, Flight SQL), network latency dominates the cost model. For local file sources, I/O bandwidth and CPU conversion time are primary factors.

29.11 Error Handling and Recovery

External data sources introduce failure modes not present with local tables.

29.11.1 Connection Failure Handling

Connection failures to external systems are handled through retry logic:

auto acquire_client(const FlightSQLEndpointConfig& config)
    -> std::expected<FlightSqlClient*, db::Status> {

  for (int32_t retry = 0; retry < config.max_retries; ++retry) {
    auto result = try_acquire_client_(config);
    if (result) {
      return result;
    }

    // Exponential backoff
    std::this_thread::sleep_for(
        std::chrono::milliseconds(config.retry_delay_ms * (1 << retry)));
  }

  return std::unexpected(db::Status::IOError(
      "Failed to connect after " + std::to_string(config.max_retries) +
      " retries"));
}

29.11.2 Schema Mismatch Detection

Schema changes in external sources can cause query failures. The system detects mismatches during cursor initialization:

auto create_cursor_with_filter_(ExternalVirtualTableDef def,
                                const ast::Expr* filter)
    -> std::unique_ptr<Cursor> {

  // Refresh schema if needed
  if (def.needs_schema_refresh()) {
    vtable_manager_->refresh_schema(def.table_name);
    def = *vtable_manager_->lookup(def.table_name);
  }

  // Validate filter columns exist in schema
  auto filter_columns = extract_column_refs_(filter);
  for (const auto& col : filter_columns) {
    if (!def.schema->GetFieldByName(col)) {
      return std::make_unique<ErrorCursor>(
          db::Status::InvalidArgument(
              "Column not found in external table: " + col));
    }
  }

  return create_external_cursor_(def, filter);
}

29.11.3 Health Monitoring

Endpoint health is tracked for connection pool management:

enum class EndpointHealth {
  kUnknown,    // Not yet checked
  kHealthy,    // Responding to health checks
  kUnhealthy,  // Failed health check
  kConnecting  // Connection in progress
};

struct EndpointStatus {
  EndpointHealth health;
  std::string last_error;
  std::chrono::steady_clock::time_point last_check_time;
  int64_t successful_queries;
  int64_t failed_queries;
};

29.12 Security Considerations

External table integration introduces security concerns that require careful handling.

29.12.1 Credential Management

Connection credentials should not be stored in plain text. The system supports multiple credential sources:

// Virtual table definition stores credential references, not values
std::string flight_auth_token;    // Bearer token or reference
std::string flight_username;      // Basic auth username
std::string flight_password;      // Basic auth password (encrypted)

// Production deployments should use environment variables
// or secret management systems (HashiCorp Vault, AWS Secrets Manager)

29.12.2 SQL Injection Prevention

Table and column names from external definitions must be sanitized:

static auto make_safe_identifier_(const std::string& name)
    -> std::expected<std::string, db::Status> {

  if (name.empty()) {
    return std::unexpected(db::Status::InvalidArgument(
        "Identifier name cannot be empty"));
  }

  std::string safe_name;
  safe_name.reserve(name.size());

  for (char c : name) {
    if (std::isalnum(c) || c == '_') {
      safe_name += c;
    } else {
      safe_name += '_';
    }
  }

  return safe_name;
}

29.12.3 TLS Configuration

Flight SQL connections support TLS with optional mutual authentication:

struct FlightSQLEndpointConfig {
  bool tls_enabled = false;
  std::string tls_root_certs;     // Custom CA certificates (PEM)
  std::string tls_private_key;    // Client private key (mTLS)
  std::string tls_cert_chain;     // Client certificate chain (mTLS)
  bool tls_skip_verify = false;   // Skip verification (testing only)
};
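Misconfigured TLS material is a common source of silent security regressions, so it is worth validating the configuration before opening a connection. The check below is a sketch (the validation function and its exact rules are assumptions, with the struct restated from above for self-containment):

```cpp
#include <string>

struct FlightSQLEndpointConfig {
  bool tls_enabled = false;
  std::string tls_root_certs;     // Custom CA certificates (PEM)
  std::string tls_private_key;    // Client private key (mTLS)
  std::string tls_cert_chain;     // Client certificate chain (mTLS)
  bool tls_skip_verify = false;   // Skip verification (testing only)
};

// Hypothetical validation: mTLS requires the private key and certificate
// chain together, and TLS material with TLS disabled likely signals a
// misconfiguration (the operator expected encryption but is not getting it).
bool is_valid_tls_config(const FlightSQLEndpointConfig& cfg) {
  if (!cfg.tls_enabled) {
    return cfg.tls_root_certs.empty() && cfg.tls_private_key.empty() &&
           cfg.tls_cert_chain.empty() && !cfg.tls_skip_verify;
  }
  // Both mTLS fields must be provided together, or neither.
  return cfg.tls_private_key.empty() == cfg.tls_cert_chain.empty();
}
```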

29.13 Performance Analysis

External table performance depends on multiple factors that differ from local table access.

29.13.1 Latency Components

Total query latency for external tables includes:

T_{\text{total}} = T_{\text{connect}} + T_{\text{plan}} + T_{\text{execute}} + T_{\text{transfer}} + T_{\text{convert}}

Where:

  • $T_{\text{connect}}$: Connection establishment (amortized via pooling)
  • $T_{\text{plan}}$: Query planning on the remote system
  • $T_{\text{execute}}$: Remote query execution
  • $T_{\text{transfer}}$: Network data transfer
  • $T_{\text{convert}}$: Arrow to Document conversion

29.13.2 Pushdown Effectiveness

The effectiveness of pushdown operations can be quantified:

\text{Pushdown Ratio} = 1 - \frac{\text{Rows Transferred}}{\text{Total Rows}}

High pushdown ratios indicate effective filter optimization. A query with a 99% pushdown ratio transfers only 1% of the source data.

29.13.3 Memory Efficiency

Streaming execution maintains bounded memory usage:

  Operation             | Memory Usage
  ----------------------|--------------------------------------
  Full Materialization  | O(n), where n = result size
  Batch Streaming       | O(batch_size)
  Document Conversion   | O(batch_size × doc_size)

The streaming approach is critical for large result sets that would otherwise exhaust available memory.
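The bounded-memory property follows from materializing one batch at a time and releasing it before requesting the next. The skeleton below illustrates the idea; the callbacks stand in for the external cursor's batch-fetch call and the downstream consumer (names are illustrative, not Cognica's actual API):

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <vector>

// Sketch of bounded-memory streaming: the consumer sees one batch at a time,
// so peak memory is O(batch_size) regardless of the total result size.
// Returns the peak batch size observed, to make the bound visible.
template <typename Row>
size_t stream_batches(
    size_t total_rows, size_t batch_size,
    const std::function<std::vector<Row>(size_t, size_t)>& produce_batch,
    const std::function<void(const std::vector<Row>&)>& consume) {
  size_t peak = 0;
  for (size_t offset = 0; offset < total_rows; offset += batch_size) {
    size_t n = std::min(batch_size, total_rows - offset);
    std::vector<Row> batch = produce_batch(offset, n);  // one batch in memory
    peak = std::max(peak, batch.size());
    consume(batch);
    // batch is destroyed here, before the next one is materialized
  }
  return peak;  // never exceeds batch_size
}
```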

29.14 Summary

External table integration extends Cognica's query capabilities to heterogeneous data sources while maintaining SQL compatibility and query optimization benefits.

Key Architectural Decisions:

  1. Decorator Pattern: The cursor provider chain enables clean separation of concerns between source types while maintaining a unified interface.

  2. Arrow-Native Processing: Using Arrow as the internal data format enables zero-copy data exchange and efficient columnar processing throughout the pipeline.

  3. Streaming Execution: Batch-oriented streaming avoids memory exhaustion when processing large external datasets.

  4. Connection Pooling: Reusing connections to external systems amortizes the substantial cost of connection establishment.

Query Optimization Opportunities:

  • Partition pruning eliminates entire directory subtrees
  • Predicate pushdown filters at the storage/remote layer
  • Projection pushdown reads only required columns
  • Limit pushdown enables early termination

The external table integration transforms Cognica from an isolated database into a federated query engine capable of unifying data across organizational boundaries. This capability is increasingly important as data architectures evolve toward data mesh and lakehouse patterns where data resides in multiple specialized systems.

Exercises

  1. Pushdown Analysis: Design a query plan analyzer that reports pushdown effectiveness for external table queries. The analyzer should identify predicates that could not be pushed down and suggest query rewrites that enable pushdown.

  2. Connection Pool Tuning: Implement an adaptive connection pool that adjusts pool size based on query load patterns. The pool should expand during high-load periods and contract during idle periods to balance resource utilization.

  3. Schema Evolution: Design a schema evolution handling system for external tables. When the external schema changes, the system should either adapt automatically (for compatible changes) or report detailed error messages (for incompatible changes).

  4. Cross-Source Joins: Analyze the query optimization challenges for joins between external tables from different sources (e.g., Parquet files joined with Flight SQL tables). Propose strategies for minimizing data transfer in such queries.

Further Reading

  • SQL/MED (Management of External Data) standard, ISO/IEC 9075-9
  • Apache Arrow Dataset API documentation
  • Apache Arrow Acero execution engine design
  • DuckDB extension architecture
  • Arrow Flight SQL protocol specification

Copyright (c) 2023-2026 Cognica, Inc.