Chapter 29: External Table Integration
Modern data architectures increasingly rely on data residing outside the primary database system. Data lakes, cloud storage, operational databases, and analytical engines each contain valuable data that applications need to query alongside local tables. This chapter examines Cognica's external table integration system, which provides unified SQL access to heterogeneous data sources through a Foreign Data Wrapper (FDW) abstraction inspired by PostgreSQL's design.
29.1 The Foreign Data Wrapper Paradigm
The SQL/MED (Management of External Data) standard introduced the concept of foreign tables—tables that appear in the database catalog but whose data resides in external systems. PostgreSQL's implementation of this standard through Foreign Data Wrappers established a pattern that Cognica extends with modern columnar formats and distributed query capabilities.
29.1.1 Architectural Goals
Cognica's external table integration addresses several architectural challenges:
Transparent SQL Access: External data should be queryable using standard SQL syntax without requiring application code changes. A query joining local and external tables should work identically to queries involving only local tables.
Predicate Pushdown: Filter conditions should be pushed to the external system whenever possible. Filtering a Parquet file containing billions of rows at the storage layer avoids transferring unnecessary data.
Columnar Efficiency: Modern analytical workloads benefit from columnar storage. External table access should preserve columnar execution through the entire pipeline rather than materializing data row by row.
Connection Pooling: Database connections to external systems are expensive. Connection pooling amortizes connection establishment costs across multiple queries.
29.1.2 Source Type Taxonomy
Cognica supports three categories of external data sources, each with distinct access patterns:
- File-based sources leverage Apache Arrow's Dataset API for zero-copy columnar access.
- DuckDB-backed sources utilize DuckDB's extensive connector ecosystem.
- Flight SQL sources communicate with Arrow-native analytical systems using the Flight protocol.
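These three categories map onto the source_type discriminator that appears later in ExternalVirtualTableDef. A dependency-free sketch (the enum mirrors the names used in this chapter; the dispatch helper is illustrative, not the actual Cognica code):

```cpp
#include <cassert>
#include <string>

// Illustrative mirror of the source-type discriminator described in this
// chapter; the real definition lives alongside ExternalVirtualTableDef.
enum class VirtualTableSourceType { kFile, kDuckDB, kFlightSQL };

// Map each source category to the access mechanism it uses.
inline std::string access_mechanism(VirtualTableSourceType type) {
  switch (type) {
    case VirtualTableSourceType::kFile:      return "arrow-dataset";
    case VirtualTableSourceType::kDuckDB:    return "duckdb-connector";
    case VirtualTableSourceType::kFlightSQL: return "flight-protocol";
  }
  return "unknown";
}
```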
29.2 Virtual Table Definition Model
The ExternalVirtualTableDef structure captures all metadata required to access an external table:
struct ExternalVirtualTableDef {
  std::string table_name;
  std::string schema_name = "public";
  std::string source_path;
  ast::CopyFormat format = ast::CopyFormat::kAuto;
  ast::CopyPartitioning partitioning = ast::CopyPartitioning::kNone;

  // Schema (inferred or explicit)
  std::shared_ptr<arrow::Schema> schema;
  bool schema_is_explicit = false;

  // Partition columns (for PARTITION BY clause)
  std::vector<std::string> partition_columns;

  // Format-specific options
  std::unordered_map<std::string, std::string> options;

  // Schema refresh configuration
  SchemaRefreshMode schema_refresh_mode = SchemaRefreshMode::kAuto;
  int64_t schema_refresh_interval_sec = 60;

  // Source type discriminator
  VirtualTableSourceType source_type = VirtualTableSourceType::kFile;

  // DuckDB-specific fields
  std::optional<DuckDBSourceType> duckdb_source_type;
  std::string connection_string;
  std::string source_table;
  std::vector<std::string> required_extensions;
  bool read_only = true;

  // Flight SQL-specific fields
  std::optional<FlightSQLSourceType> flight_sql_source_type;
  std::string flight_endpoint;
  std::string flight_catalog;
  std::string flight_schema;
  std::string flight_table;
  std::string flight_auth_token;
  int32_t flight_timeout_ms = 30000;
};
29.2.1 Schema Management
External data sources present a schema management challenge: the schema may change without the database's knowledge. Cognica addresses this through configurable refresh strategies:
Automatic Refresh: When schema_refresh_mode is kAuto, the system periodically checks whether the source schema has changed. For file sources, this involves comparing file modification timestamps against the last refresh time.
Manual Refresh: Production systems may prefer explicit control over schema changes. When schema_refresh_mode is kManual, schema updates require an explicit ALTER VIRTUAL TABLE REFRESH command.
The refresh interval determines how frequently the system checks for schema changes.
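A minimal sketch of the refresh decision, assuming the manager records the time of the last refresh; the function name and parameters are illustrative:

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

enum class SchemaRefreshMode { kAuto, kManual };

// Returns true when an automatic schema check is due. Manual mode never
// refreshes implicitly; it waits for ALTER VIRTUAL TABLE REFRESH.
bool schema_refresh_due(SchemaRefreshMode mode,
                        std::chrono::steady_clock::time_point last_refresh,
                        std::chrono::steady_clock::time_point now,
                        std::int64_t interval_sec) {
  if (mode == SchemaRefreshMode::kManual) return false;
  return now - last_refresh >= std::chrono::seconds(interval_sec);
}
```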
29.2.2 Hive-Style Partitioning
Many data lake formats use Hive-style partitioning where partition values are encoded in directory paths:
s3://data-lake/events/
  year=2024/
    month=01/
      data-001.parquet
      data-002.parquet
    month=02/
      data-003.parquet
  year=2025/
    month=01/
      data-004.parquet
The partition columns (year, month) become virtual columns that can be used in filter predicates. When a query filters on partition columns, the Dataset API prunes entire directory subtrees without reading any files.
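The directory layout above can be decoded with a simple split on key=value path segments — a simplified stand-in for what Arrow's Hive partitioning factory does during discovery:

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>

// Extract Hive-style partition values ("year=2024/month=01/...") from a
// relative file path. Segments without '=' (e.g. the file name) are skipped.
std::map<std::string, std::string> parse_hive_partitions(
    const std::string& relative_path) {
  std::map<std::string, std::string> values;
  std::istringstream segments(relative_path);
  std::string segment;
  while (std::getline(segments, segment, '/')) {
    auto eq = segment.find('=');
    if (eq != std::string::npos) {
      values[segment.substr(0, eq)] = segment.substr(eq + 1);
    }
  }
  return values;
}
```

A query filtering on year or month can then be matched against these values before any file in the directory is opened.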
29.3 The Decorator Pattern for Cursor Providers
Cognica implements external table access using the Decorator design pattern. Each source type has a dedicated cursor provider that intercepts requests for its own tables and delegates all other requests to the provider it wraps.
29.3.1 Provider Chain Configuration
The cursor provider chain is assembled during session initialization:
// Start with native table provider
auto native_provider = std::make_unique<NativeCursorProvider>(db);

// Wrap with file-based external table provider
auto file_provider = std::make_unique<ExternalTableCursorProvider>(
    vtable_manager, native_provider.get());

// Wrap with DuckDB provider
auto duckdb_provider = std::make_unique<DuckDBCursorProvider>(
    vtable_manager, duckdb_manager, file_provider.get());

// Wrap with Flight SQL provider
auto flight_provider = std::make_unique<FlightSQLCursorProvider>(
    vtable_manager, flight_manager, duckdb_provider.get());
Each provider in the chain follows the same pattern:
- Check if the requested table matches this provider's type
- If yes, create the appropriate cursor
- If no, delegate to the wrapped provider
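The delegation logic can be illustrated without any database types. In this sketch the providers route by a name prefix purely for demonstration; the real chain routes by virtual table registration and returns cursors, not strings:

```cpp
#include <cassert>
#include <string>

// Minimal decorator sketch: each provider handles its own tables and
// delegates everything else down the chain.
struct CursorProvider {
  virtual ~CursorProvider() = default;
  virtual std::string open(const std::string& table) = 0;
};

// Innermost provider: always handles the request (local tables).
struct NativeProvider : CursorProvider {
  std::string open(const std::string& table) override {
    return "native:" + table;
  }
};

// Decorator: claims tables matching its prefix, delegates the rest.
struct PrefixProvider : CursorProvider {
  std::string prefix;    // tables this provider owns, e.g. "ext_"
  CursorProvider* next;  // wrapped provider (not owned)
  PrefixProvider(std::string p, CursorProvider* n)
      : prefix(std::move(p)), next(n) {}
  std::string open(const std::string& table) override {
    if (table.rfind(prefix, 0) == 0) return prefix + "cursor:" + table;
    return next->open(table);  // delegate to the wrapped provider
  }
};
```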
29.3.2 Table Name Resolution
Table names may include schema qualifiers (public.events) that must be parsed:
auto parse_table_name_(const std::string& table_name)
    -> std::pair<std::string, std::string> {
  auto dot_pos = table_name.find('.');
  if (dot_pos != std::string::npos) {
    return {table_name.substr(0, dot_pos),
            table_name.substr(dot_pos + 1)};
  }
  return {"public", table_name};
}
The virtual table manager maintains an in-memory map keyed by fully-qualified names, with persistence to a system collection for recovery after restart.
29.4 Arrow Dataset API Integration
For file-based external tables, Cognica leverages Apache Arrow's Dataset API, which provides a unified interface for reading columnar data from various formats.
29.4.1 Supported File Formats
The Dataset API supports multiple columnar formats:
| Format | Extension | Characteristics |
|---|---|---|
| Parquet | .parquet | Columnar, compressed, statistics |
| Arrow IPC | .arrow, .feather | Columnar, zero-copy |
| ORC | .orc | Columnar, compressed, Hive native |
| CSV | .csv | Row-oriented, text |
| JSON | .json, .ndjson | Semi-structured, line-delimited |
Parquet is the most common format due to its combination of columnar layout, efficient compression, and rich metadata including min/max statistics per row group.
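Row-group statistics allow a reader to skip data without touching it. A sketch of the pruning decision for an equality predicate on an integer column (real readers obtain these bounds from the Parquet footer):

```cpp
#include <cassert>
#include <cstdint>

// Min/max statistics for one row group, as recorded in the Parquet footer.
struct RowGroupStats {
  std::int64_t min;
  std::int64_t max;
};

// A row group can be skipped for `col = value` when the value falls
// outside the [min, max] range recorded in its statistics.
bool can_skip_row_group(const RowGroupStats& stats, std::int64_t value) {
  return value < stats.min || value > stats.max;
}
```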
29.4.2 Dataset Discovery
Dataset discovery involves scanning the source path and building a dataset object:
auto create_dataset_(const ExternalVirtualTableDef& def)
    -> std::shared_ptr<arrow::dataset::Dataset> {
  // Determine file format
  auto format = create_format_(def.format, def.options);

  // Create filesystem from the source URI
  auto fs = arrow::fs::FileSystemFromUri(def.source_path);

  // Recursively select all files under the source path
  arrow::fs::FileSelector selector;
  selector.base_dir = def.source_path;
  selector.recursive = true;

  // Discover files
  arrow::dataset::FileSystemFactoryOptions factory_options;
  factory_options.partition_base_dir = def.source_path;
  if (def.partitioning != ast::CopyPartitioning::kNone) {
    factory_options.partitioning =
        arrow::dataset::HivePartitioning::MakeFactory();
  }
  auto factory = arrow::dataset::FileSystemDatasetFactory::Make(
      fs, selector, format, factory_options);
  return factory->Finish();
}
29.4.3 The ExternalTableCursor
The ExternalTableCursor streams data from external files using Arrow's batch-oriented API:
class ExternalTableCursor final : public db::document::Cursor {
 public:
  explicit ExternalTableCursor(
      const ExternalVirtualTableDef& table_def,
      std::vector<std::string> columns = {},
      std::shared_ptr<arrow::compute::Expression> filter = nullptr);

  // With limit/offset hints for early termination
  ExternalTableCursor(
      const ExternalVirtualTableDef& table_def,
      std::vector<std::string> columns,
      std::shared_ptr<arrow::compute::Expression> filter,
      std::optional<int64_t> limit_hint,
      std::optional<int64_t> offset_hint);

 private:
  // Arrow dataset and scanner
  std::shared_ptr<arrow::dataset::Dataset> dataset_;
  std::shared_ptr<arrow::dataset::Scanner> scanner_;
  std::shared_ptr<arrow::RecordBatchReader> reader_;

  // Current batch and position
  std::shared_ptr<arrow::RecordBatch> current_batch_;
  int64_t row_index_ = 0;

  // Current document (converted from batch row)
  db::document::Document current_doc_;
};
The cursor initializes a scanner with optional filter and projection pushdown:
auto initialize_scanner_() -> db::Status {
  arrow::dataset::ScannerBuilder builder(dataset_);

  // Apply projection pushdown
  if (!columns_.empty()) {
    builder.Project(columns_);
  }

  // Apply filter pushdown
  if (filter_) {
    builder.Filter(*filter_);
  }

  // Configure batch size for streaming
  builder.BatchSize(kDefaultBatchSize);

  scanner_ = builder.Finish();
  reader_ = scanner_->ToRecordBatchReader();
  return db::Status::OK();
}
29.5 Predicate Pushdown with ArrowFilterConverter
Converting SQL WHERE clauses to Arrow compute expressions enables predicate pushdown to the storage layer. The ArrowFilterConverter handles this translation.
29.5.1 Expression Mapping
SQL expressions map to Arrow compute expressions:
| SQL Expression | Arrow Compute Function |
|---|---|
| a = b | equal(a, b) |
| a <> b | not_equal(a, b) |
| a < b | less(a, b) |
| a <= b | less_equal(a, b) |
| a > b | greater(a, b) |
| a >= b | greater_equal(a, b) |
| a AND b | and_(a, b) |
| a OR b | or_(a, b) |
| NOT a | invert(a) |
| a IS NULL | is_null(a) |
| a IS NOT NULL | is_valid(a) |
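The mapping can be captured as a simple lookup table — a sketch of how a converter might resolve SQL operators to Arrow compute function names (it mirrors the table above; the function is illustrative):

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Operator translation table mirroring the SQL-to-Arrow mapping above.
const std::unordered_map<std::string, std::string>& arrow_function_for() {
  static const std::unordered_map<std::string, std::string> kMap = {
      {"=", "equal"},   {"<>", "not_equal"},
      {"<", "less"},    {"<=", "less_equal"},
      {">", "greater"}, {">=", "greater_equal"},
      {"AND", "and_"},  {"OR", "or_"},
      {"NOT", "invert"},
  };
  return kMap;
}
```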
29.5.2 Type Coercion
A common challenge arises when comparing string constants against numeric columns, particularly with Hive partition columns:
-- year column is inferred as INT64 from directory names
SELECT * FROM events WHERE year = '2024';
The converter performs automatic type coercion when schema information is available:
auto coerce_constant_to_column_type_(
    const std::string& column_name,
    arrow::compute::Expression constant_expr)
    -> arrow::compute::Expression {
  if (!schema_) {
    return constant_expr;
  }
  auto field = schema_->GetFieldByName(column_name);
  if (!field) {
    return constant_expr;
  }

  // Try to parse string as numeric
  if (constant_expr.literal() &&
      constant_expr.literal()->is_scalar()) {
    auto& scalar = *constant_expr.literal()->scalar();
    if (scalar.type->id() == arrow::Type::STRING) {
      auto str =
          static_cast<arrow::StringScalar&>(scalar).value->ToString();
      auto coerced =
          try_parse_string_as_numeric_(str, *field->type());
      if (coerced) {
        return *coerced;
      }
    }
  }
  return constant_expr;
}
29.5.3 Safe Fallback Semantics
Unsupported expressions return literal(true), which disables pushdown for that predicate while maintaining correctness:
auto convert_expr_(const ast::Expr* expr) -> arrow::compute::Expression {
  if (!expr) {
    return arrow::compute::literal(true);
  }
  switch (expr->type()) {
    case ast::ExprType::kBinaryExpr:
      return convert_binary_expr_(static_cast<const ast::BinaryExpr*>(expr));
    case ast::ExprType::kColumnRef:
      return convert_column_ref_(static_cast<const ast::ColumnRef*>(expr));
    // ... other cases
    default:
      // Unsupported: disable pushdown, filter post-scan
      return arrow::compute::literal(true);
  }
}
This approach guarantees correct results for any SQL predicate: pushdown is applied where the expression can be translated, and untranslatable predicates are simply re-evaluated after the scan.
29.6 Vectorized Execution with Arrow Acero
When filter and limit pushdown combine, Cognica uses Arrow Acero for fully vectorized execution without row-by-row document conversion overhead.
29.6.1 The Acero Execution Engine
Arrow Acero is a streaming query execution engine that processes data in columnar batches:
class AceroExecutor final {
 public:
  explicit AceroExecutor(std::shared_ptr<AceroTableProvider> table_provider);

  // Execute LogicalPlan and return Arrow Table
  auto execute(const planner::LogicalPlan* plan)
      -> arrow::Result<std::shared_ptr<arrow::Table>>;

  // Execute with streaming results
  using BatchCallback =
      std::function<arrow::Status(std::shared_ptr<arrow::RecordBatch>)>;
  auto execute_streaming(const planner::LogicalPlan* plan,
                         BatchCallback callback) -> arrow::Status;

  // Execute via Substrait intermediate representation
  auto execute_substrait(const arrow::Buffer& substrait_plan)
      -> arrow::Result<std::shared_ptr<arrow::Table>>;
};
29.6.2 Acero Plan Construction
The execution plan is constructed as a tree of Acero nodes:
auto create_acero_plan_(const ExternalVirtualTableDef& def,
                        const ast::Expr* filter,
                        std::optional<int64_t> limit)
    -> arrow::acero::Declaration {
  // Source: Dataset scan
  auto scan_options = std::make_shared<arrow::dataset::ScanOptions>();
  if (filter) {
    scan_options->filter = convert_filter_(filter);
  }
  arrow::acero::Declaration plan{
      "scan", arrow::dataset::ScanNodeOptions{dataset_, scan_options}};

  // Add FetchNode for limit
  if (limit) {
    plan = arrow::acero::Declaration::Sequence({
        std::move(plan),
        {"fetch", arrow::acero::FetchNodeOptions{0, *limit}}
    });
  }
  return plan;
}
29.6.3 Memory-Efficient Streaming
Acero processes data in a streaming fashion with bounded memory usage. The StreamingSinkConsumer enables callback-based result processing:
class StreamingSinkConsumer : public arrow::acero::SinkNodeConsumer {
 public:
  StreamingSinkConsumer(BatchCallback callback,
                        std::shared_ptr<arrow::Schema> schema)
      : callback_(std::move(callback)), schema_(std::move(schema)) {}

  auto Consume(arrow::ExecBatch batch) -> arrow::Status override {
    ARROW_ASSIGN_OR_RAISE(auto record_batch, batch.ToRecordBatch(schema_));
    return callback_(record_batch);
  }

 private:
  BatchCallback callback_;
  std::shared_ptr<arrow::Schema> schema_;
};
29.7 DuckDB Connector Integration
DuckDB provides Cognica with access to a rich ecosystem of data sources through its extension system.
29.7.1 Supported DuckDB Sources
| Source Type | Extension | Use Cases |
|---|---|---|
| PostgreSQL | postgres | Operational databases |
| MySQL | mysql | Legacy systems |
| SQLite | (built-in) | Local databases |
| Delta Lake | delta | Data lake tables |
| Iceberg | iceberg | Data lake tables |
29.7.2 Connection Management
The DuckDBManager maintains a connection pool for efficient resource utilization:
class DuckDBManager final {
 public:
  explicit DuckDBManager(const DuckDBConfig& config = DuckDBConfig{});

  // Connection pool management
  auto acquire_connection() -> DuckDBConnection;
  void release_connection(DuckDBConnection conn);

  // Query execution
  auto execute_query(const std::string& sql)
      -> std::pair<DuckDBResult, db::Status>;

  // Extension management
  auto load_extension(const std::string& extension_name) -> db::Status;

 private:
  DuckDBConfig config_;
  duckdb_database db_;
  std::vector<duckdb_connection> connection_pool_;
  std::unordered_set<std::string> loaded_extensions_;
};
The configuration includes memory limits and cloud storage credentials:
struct DuckDBConfig {
  int64_t memory_limit_bytes = 1024 * 1024 * 1024;  // 1GB
  int32_t max_connections = 4;
  std::string temp_directory;

  // Cloud storage configurations
  S3Config s3;
  AzureConfig azure;
  GCSConfig gcs;
};
29.7.3 Query Translation
The DuckDBCursorProvider generates native DuckDB SQL from the virtual table definition and filter expression:
auto generate_query_(const ExternalVirtualTableDef& def,
                     const ast::Expr* filter,
                     const std::vector<std::string>& columns,
                     std::optional<int64_t> limit,
                     std::optional<int64_t> offset) const
    -> std::expected<std::string, db::Status> {
  std::ostringstream sql;

  // SELECT clause with projection pushdown
  sql << "SELECT ";
  if (columns.empty()) {
    sql << "*";
  } else {
    for (size_t i = 0; i < columns.size(); ++i) {
      if (i > 0) sql << ", ";
      sql << quote_identifier_(columns[i]);
    }
  }

  // FROM clause with source-specific table reference
  sql << " FROM " << generate_from_clause_(def);

  // WHERE clause with predicate pushdown
  if (filter) {
    sql << " WHERE " << generate_where_clause_(filter);
  }

  // LIMIT/OFFSET pushdown
  if (limit) {
    sql << " LIMIT " << *limit;
  }
  if (offset) {
    sql << " OFFSET " << *offset;
  }
  return sql.str();
}
29.7.4 DuckDB Cursor Implementation
The DuckDBCursor streams results using DuckDB's chunk-based API:
class DuckDBCursor final : public db::document::Cursor {
 public:
  explicit DuckDBCursor(DuckDBResult result);

 private:
  // Fetch the next chunk from the result
  auto fetch_next_chunk_() -> bool {
    current_chunk_ = duckdb_fetch_chunk(result_.get());
    if (!current_chunk_) {
      exhausted_ = true;
      return false;
    }
    chunk_size_ = duckdb_data_chunk_get_size(current_chunk_);
    chunk_row_index_ = 0;
    ++total_chunks_read_;
    return true;
  }

  // Convert current chunk row to document
  void update_current_document_() {
    current_doc_.clear();
    auto col_count = duckdb_column_count(&result_.get());
    for (idx_t col = 0; col < col_count; ++col) {
      auto name = duckdb_column_name(&result_.get(), col);
      auto type = duckdb_column_type(&result_.get(), col);
      auto vector = duckdb_data_chunk_get_vector(current_chunk_, col);
      current_doc_.set(name, convert_vector_value_(vector, type,
                                                   chunk_row_index_));
    }
  }

  DuckDBResult result_;
  duckdb_data_chunk current_chunk_;
  idx_t chunk_row_index_;
  idx_t chunk_size_;
};
29.8 Arrow Flight SQL Connector
Flight SQL enables Cognica to query remote Arrow-native databases using the high-performance Flight protocol.
29.8.1 Supported Flight SQL Systems
| System | Configuration | Use Cases |
|---|---|---|
| Cognica | kCognica | Distributed Cognica clusters |
| ClickHouse | kClickHouse | Real-time analytics |
| DataFusion/Ballista | kDataFusion | Distributed query processing |
| Dremio | kDremio | Data lake analytics |
| Generic | kGeneric | Any Flight SQL server |
29.8.2 Client Manager Architecture
The FlightSQLClientManager handles connection pooling, authentication, and health checking:
class FlightSQLClientManager final {
 public:
  explicit FlightSQLClientManager(const FlightSQLClientConfig& config);

  // Client acquisition (connection pooled)
  auto acquire_client(const FlightSQLEndpointConfig& endpoint_config)
      -> std::expected<arrow::flight::sql::FlightSqlClient*, db::Status>;
  void release_client(const std::string& endpoint,
                      arrow::flight::sql::FlightSqlClient* client);

  // Query execution with client lifecycle management
  auto execute_query_with_client(
      const FlightSQLEndpointConfig& endpoint_config,
      const std::string& sql)
      -> std::expected<QueryResult, db::Status>;

  // Schema discovery
  auto get_table_schema(const FlightSQLEndpointConfig& endpoint_config,
                        const std::string& catalog,
                        const std::string& schema,
                        const std::string& table)
      -> std::expected<std::shared_ptr<arrow::Schema>, db::Status>;

 private:
  // Connection pool: endpoint -> list of clients
  std::unordered_map<std::string, std::deque<PooledClient>> connection_pools_;
  std::unordered_map<std::string, EndpointStatus> endpoint_status_;
};
29.8.3 Endpoint Configuration
Each Flight SQL endpoint requires authentication and TLS configuration:
struct FlightSQLEndpointConfig {
  std::string name;
  std::string endpoint;  // host:port or tls://host:port
  FlightSQLSourceType source_type;

  // Authentication
  std::string auth_token;  // Bearer token
  std::string username;    // Basic auth alternative
  std::string password;

  // TLS configuration
  bool tls_enabled = false;
  std::string tls_root_certs;   // Custom CA certificates
  std::string tls_private_key;  // Client private key (mTLS)
  std::string tls_cert_chain;   // Client certificate chain

  // Timeouts
  int32_t connect_timeout_ms = 5000;
  int32_t query_timeout_ms = 30000;
  int32_t max_retries = 3;
};
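The endpoint field accepts either a bare host:port or a tls:// URI. A sketch of decomposing such a string (illustrative; a real client would delegate URI handling to Arrow Flight's location parsing):

```cpp
#include <cassert>
#include <string>

struct ParsedEndpoint {
  bool tls = false;
  std::string host;
  int port = 0;
};

// Split "tls://host:port" or "host:port" into components.
ParsedEndpoint parse_endpoint(std::string s) {
  ParsedEndpoint out;
  const std::string kTlsScheme = "tls://";
  if (s.rfind(kTlsScheme, 0) == 0) {
    out.tls = true;
    s = s.substr(kTlsScheme.size());
  }
  auto colon = s.rfind(':');
  if (colon != std::string::npos) {
    out.host = s.substr(0, colon);
    out.port = std::stoi(s.substr(colon + 1));
  } else {
    out.host = s;
  }
  return out;
}
```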
29.8.4 Flight SQL Cursor
The FlightSQLCursor streams results from the Flight server:
class FlightSQLCursor final : public db::document::Cursor {
 public:
  // Create with reader and client lifecycle management
  FlightSQLCursor(std::unique_ptr<arrow::flight::FlightStreamReader> reader,
                  FlightSQLClientManager* manager,
                  const FlightSQLEndpointConfig& endpoint_config,
                  arrow::flight::sql::FlightSqlClient* acquired_client);

  // Deferred execution: FlightInfo stored, stream initialized on first access
  FlightSQLCursor(FlightSQLClientManager* manager,
                  const FlightSQLEndpointConfig& endpoint_config,
                  std::unique_ptr<arrow::flight::FlightInfo> info);

 private:
  // Fetch the next record batch from the stream
  auto fetch_next_batch_() -> bool {
    auto status = reader_->Next(&current_batch_);
    if (!status.ok()) {
      status_ = db::Status::IOError(status.ToString());
      return false;
    }
    if (!current_batch_) {
      exhausted_ = true;
      return false;
    }
    batch_size_ = current_batch_->num_rows();
    batch_row_index_ = 0;
    ++total_batches_read_;
    return true;
  }

  std::unique_ptr<arrow::flight::FlightStreamReader> reader_;
  FlightSQLClientManager* manager_;
  arrow::flight::sql::FlightSqlClient* acquired_client_;
  std::shared_ptr<arrow::RecordBatch> current_batch_;
};
29.9 Write Operations
External tables support INSERT, UPDATE, and DELETE operations through the ExternalTableWriter class.
29.9.1 Write Semantics by Format
Different file formats have different write characteristics:
| Format | Append | Update | Delete |
|---|---|---|---|
| Parquet | New file | Rewrite | Rewrite |
| Arrow IPC | New file | Rewrite | Rewrite |
| ORC | New file | Rewrite | Rewrite |
| CSV | New file | Rewrite | Rewrite |
Parquet, Arrow IPC, and ORC are immutable formats. UPDATE and DELETE operations require reading the entire dataset, applying modifications in memory, and rewriting the affected files.
29.9.2 INSERT Operations
INSERT creates a new file with a timestamp-based filename for uniqueness:
auto insert(const std::vector<db::document::Document>& docs)
    -> WriteOperationResult {
  // Generate unique output path
  auto output_path = generate_output_path_();

  // Convert documents to Arrow RecordBatch
  auto batch = documents_to_batch_(docs, table_def_.schema);

  // Write based on format
  auto status = write_batch_to_file_(batch, output_path);

  return WriteOperationResult{
      .success = status.ok(),
      .rows_affected = static_cast<int64_t>(docs.size()),
      .output_path = output_path
  };
}
29.9.3 Streaming UPDATE and DELETE
For large datasets, UPDATE and DELETE use streaming batch processing to avoid memory exhaustion:
auto update(const ast::Expr* predicate,
            const db::document::Document& updates)
    -> WriteOperationResult {
  // Create streaming scanner
  auto scanner = create_scanner_();

  // Create streaming writer for temporary output
  auto temp_path = generate_temp_path_();
  auto writer = create_streaming_writer_(temp_path, table_def_.schema);

  int64_t updated_count = 0;

  // Process batches in streaming fashion
  for (auto batch : scanner) {
    auto processed_batch =
        process_batch_for_update_(batch, predicate, updates, updated_count);
    writer->write_batch(processed_batch);
  }
  writer->close();

  // Atomic replacement of original files
  replace_source_files_(temp_path);

  return WriteOperationResult{
      .success = true,
      .rows_affected = updated_count
  };
}
The process_batch_for_update_ function applies updates to matching rows within each batch:
auto process_batch_for_update_(
    const std::shared_ptr<arrow::RecordBatch>& batch,
    const ast::Expr* predicate,
    const db::document::Document& updates,
    int64_t& updated_count)
    -> std::shared_ptr<arrow::RecordBatch> {
  // Convert batch rows to documents for predicate evaluation
  std::vector<db::document::Document> output_docs;
  output_docs.reserve(batch->num_rows());

  for (int64_t i = 0; i < batch->num_rows(); ++i) {
    auto doc = batch_row_to_document_(batch, i);
    if (matches_predicate_(doc, predicate)) {
      apply_updates_(doc, updates);
      ++updated_count;
    }
    output_docs.push_back(std::move(doc));
  }
  return documents_to_batch_(output_docs, batch->schema());
}
29.9.4 StreamingBatchWriter Interface
The streaming writer interface enables incremental output for any supported format:
class StreamingBatchWriter {
 public:
  virtual ~StreamingBatchWriter() = default;

  // Write a single batch to the output file
  virtual auto write_batch(const std::shared_ptr<arrow::RecordBatch>& batch)
      -> db::Status = 0;

  // Close the writer and finalize the output file
  virtual auto close() -> db::Status = 0;
};
Each format implements this interface with appropriate file finalization logic.
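The contract — incremental writes followed by an explicit finalization step — can be demonstrated with a dependency-free stand-in that writes rows of text to a string buffer instead of record batches to a file; the real implementations wrap Parquet, IPC, ORC, or CSV writers:

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Simplified stand-in for the streaming writer contract: rows of strings
// instead of Arrow record batches, a string buffer instead of a file.
class RowBatchWriter {
 public:
  virtual ~RowBatchWriter() = default;
  virtual void write_batch(const std::vector<std::string>& rows) = 0;
  virtual std::string close() = 0;  // finalize and return the output
};

class CsvStreamWriter final : public RowBatchWriter {
 public:
  void write_batch(const std::vector<std::string>& rows) override {
    for (const auto& row : rows) out_ << row << '\n';  // incremental append
  }
  std::string close() override { return out_.str(); }  // finalization step
 private:
  std::ostringstream out_;
};
```

Only the batch currently being written needs to be held in memory, which is the property the streaming UPDATE and DELETE paths rely on.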
29.10 Query Optimization Strategies
External table integration benefits from multiple optimization layers.
29.10.1 Pushdown Hierarchy
Cognica applies pushdown optimizations in a specific order:
- Partition Pruning: Eliminates entire directory subtrees based on partition column predicates
- Predicate Pushdown: Applies row-level filters at the storage layer
- Projection Pushdown: Reads only required columns
- Limit Pushdown: Stops scanning after sufficient rows
29.10.2 Statistics-Based Optimization
Parquet files include row group statistics (min/max values) that enable additional pruning:
auto get_table_stats(const std::string& table_name) -> TableStats {
  auto def = lookup_virtual_table_(table_name);
  if (!def || def->source_type != VirtualTableSourceType::kFile) {
    return delegate_->get_table_stats(table_name);
  }

  // Use Arrow Scanner::CountRows() which reads only metadata
  auto dataset = create_dataset_(*def);
  auto scanner = arrow::dataset::ScannerBuilder(dataset).Finish();
  auto row_count = scanner->CountRows();

  return TableStats{
      .row_count = *row_count,
      .size_bytes = estimate_size_(dataset)
  };
}
29.10.3 Cost Model Integration
The query optimizer uses external table statistics for cost estimation. For remote sources (DuckDB, Flight SQL), network latency dominates the cost model; for local file sources, I/O bandwidth and CPU conversion time are the primary factors.
29.11 Error Handling and Recovery
External data sources introduce failure modes not present with local tables.
29.11.1 Connection Failure Handling
Connection failures to external systems are handled through retry logic:
auto acquire_client(const FlightSQLEndpointConfig& config)
    -> std::expected<FlightSqlClient*, db::Status> {
  for (int32_t retry = 0; retry < config.max_retries; ++retry) {
    auto result = try_acquire_client_(config);
    if (result) {
      return result;
    }
    // Exponential backoff
    std::this_thread::sleep_for(
        std::chrono::milliseconds(config.retry_delay_ms * (1 << retry)));
  }
  return std::unexpected(db::Status::IOError(
      "Failed to connect after " + std::to_string(config.max_retries) +
      " retries"));
}
29.11.2 Schema Mismatch Detection
Schema changes in external sources can cause query failures. The system detects mismatches during cursor initialization:
auto create_cursor_with_filter_(ExternalVirtualTableDef def,
                                const ast::Expr* filter)
    -> std::unique_ptr<Cursor> {
  // Refresh schema if needed (the definition is taken by value so the
  // refreshed copy can be swapped in locally)
  if (def.needs_schema_refresh()) {
    vtable_manager_->refresh_schema(def.table_name);
    def = *vtable_manager_->lookup(def.table_name);
  }

  // Validate filter columns exist in schema
  auto filter_columns = extract_column_refs_(filter);
  for (const auto& col : filter_columns) {
    if (!def.schema->GetFieldByName(col)) {
      return std::make_unique<ErrorCursor>(
          db::Status::InvalidArgument(
              "Column not found in external table: " + col));
    }
  }
  return create_external_cursor_(def, filter);
}
29.11.3 Health Monitoring
Endpoint health is tracked for connection pool management:
enum class EndpointHealth {
  kUnknown,     // Not yet checked
  kHealthy,     // Responding to health checks
  kUnhealthy,   // Failed health check
  kConnecting   // Connection in progress
};

struct EndpointStatus {
  EndpointHealth health;
  std::string last_error;
  std::chrono::steady_clock::time_point last_check_time;
  int64_t successful_queries;
  int64_t failed_queries;
};
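A sketch of how the status record might be updated after each query, repeating the types for self-containment; the consecutive-failure counter and its threshold are assumed policy, not taken from the text:

```cpp
#include <cassert>
#include <cstdint>
#include <string>

enum class EndpointHealth { kUnknown, kHealthy, kUnhealthy, kConnecting };

struct EndpointStatus {
  EndpointHealth health = EndpointHealth::kUnknown;
  std::string last_error;
  std::int64_t successful_queries = 0;
  std::int64_t failed_queries = 0;
  int consecutive_failures = 0;  // assumed extra bookkeeping field
};

// Update counters and health after a query attempt.
void record_query_result(EndpointStatus& status, bool ok,
                         const std::string& error = "") {
  if (ok) {
    ++status.successful_queries;
    status.consecutive_failures = 0;
    status.health = EndpointHealth::kHealthy;
  } else {
    ++status.failed_queries;
    ++status.consecutive_failures;
    status.last_error = error;
    if (status.consecutive_failures >= 3) {  // assumed threshold
      status.health = EndpointHealth::kUnhealthy;
    }
  }
}
```

Tracking consecutive rather than total failures lets an endpoint recover its healthy status after a transient outage.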
29.12 Security Considerations
External table integration introduces security concerns that require careful handling.
29.12.1 Credential Management
Connection credentials should not be stored in plain text. The system supports multiple credential sources:
// Virtual table definition stores credential references, not values
std::string flight_auth_token; // Bearer token or reference
std::string flight_username; // Basic auth username
std::string flight_password; // Basic auth password (encrypted)
// Production deployments should use environment variables
// or secret management systems (HashiCorp Vault, AWS Secrets Manager)
29.12.2 SQL Injection Prevention
Table and column names from external definitions must be sanitized:
static auto make_safe_identifier_(const std::string& name)
    -> std::expected<std::string, db::Status> {
  if (name.empty()) {
    return std::unexpected(db::Status::InvalidArgument(
        "Identifier name cannot be empty"));
  }
  std::string safe_name;
  safe_name.reserve(name.size());
  for (char c : name) {
    if (std::isalnum(c) || c == '_') {
      safe_name += c;
    } else {
      safe_name += '_';
    }
  }
  return safe_name;
}
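The sanitizer's effect is easy to demonstrate with a standalone copy of the replacement loop (the error handling from the original is omitted for brevity):

```cpp
#include <cassert>
#include <cctype>
#include <string>

// Standalone copy of the replacement loop above: any character outside
// [A-Za-z0-9_] becomes '_', so quoting tricks cannot escape an identifier.
std::string make_safe_identifier(const std::string& name) {
  std::string safe;
  safe.reserve(name.size());
  for (unsigned char c : name) {
    safe += (std::isalnum(c) || c == '_') ? static_cast<char>(c) : '_';
  }
  return safe;
}
```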
29.12.3 TLS Configuration
Flight SQL connections support TLS with optional mutual authentication:
struct FlightSQLEndpointConfig {
  bool tls_enabled = false;
  std::string tls_root_certs;    // Custom CA certificates (PEM)
  std::string tls_private_key;   // Client private key (mTLS)
  std::string tls_cert_chain;    // Client certificate chain (mTLS)
  bool tls_skip_verify = false;  // Skip verification (testing only)
};
29.13 Performance Analysis
External table performance depends on multiple factors that differ from local table access.
29.13.1 Latency Components
Total query latency for external tables can be decomposed as:

T_total = T_connect + T_plan + T_execute + T_transfer + T_convert

Where:
- T_connect: Connection establishment (amortized via pooling)
- T_plan: Query planning on the remote system
- T_execute: Remote query execution
- T_transfer: Network data transfer
- T_convert: Arrow to Document conversion
29.13.2 Pushdown Effectiveness
The effectiveness of pushdown operations can be quantified as the fraction of source rows eliminated before transfer:

pushdown_ratio = 1 - (rows transferred / total source rows)

High pushdown ratios indicate effective filter optimization. A query with a 99% pushdown ratio transfers only 1% of the source data.
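The metric implied by the 99%/1% example can be computed directly from scan counters — a sketch with assumed counter names:

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>

// Pushdown ratio: the fraction of source rows eliminated before transfer.
// A ratio of 0.99 means only 1% of the source data crosses the wire.
double pushdown_ratio(std::int64_t rows_transferred,
                      std::int64_t total_source_rows) {
  if (total_source_rows == 0) return 0.0;
  return 1.0 - static_cast<double>(rows_transferred) /
                   static_cast<double>(total_source_rows);
}
```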
29.13.3 Memory Efficiency
Streaming execution maintains bounded memory usage:
| Operation | Memory Usage |
|---|---|
| Full Materialization | O(n), where n = result size |
| Batch Streaming | O(batch size) |
| Document Conversion | O(1) per row |
The streaming approach is critical for large result sets that would otherwise exhaust available memory.
29.14 Summary
External table integration extends Cognica's query capabilities to heterogeneous data sources while maintaining SQL compatibility and query optimization benefits.
Key Architectural Decisions:
- Decorator Pattern: The cursor provider chain enables clean separation of concerns between source types while maintaining a unified interface.
- Arrow-Native Processing: Using Arrow as the internal data format enables zero-copy data exchange and efficient columnar processing throughout the pipeline.
- Streaming Execution: Batch-oriented streaming avoids memory exhaustion when processing large external datasets.
- Connection Pooling: Reusing connections to external systems amortizes the substantial cost of connection establishment.
Query Optimization Opportunities:
- Partition pruning eliminates entire directory subtrees
- Predicate pushdown filters at the storage/remote layer
- Projection pushdown reads only required columns
- Limit pushdown enables early termination
The external table integration transforms Cognica from an isolated database into a federated query engine capable of unifying data across organizational boundaries. This capability is increasingly important as data architectures evolve toward data mesh and lakehouse patterns where data resides in multiple specialized systems.
Exercises
1. Pushdown Analysis: Design a query plan analyzer that reports pushdown effectiveness for external table queries. The analyzer should identify predicates that could not be pushed down and suggest query rewrites that enable pushdown.
2. Connection Pool Tuning: Implement an adaptive connection pool that adjusts pool size based on query load patterns. The pool should expand during high-load periods and contract during idle periods to balance resource utilization.
3. Schema Evolution: Design a schema evolution handling system for external tables. When the external schema changes, the system should either adapt automatically (for compatible changes) or report detailed error messages (for incompatible changes).
4. Cross-Source Joins: Analyze the query optimization challenges for joins between external tables from different sources (e.g., Parquet files joined with Flight SQL tables). Propose strategies for minimizing data transfer in such queries.
Further Reading
- SQL/MED (Management of External Data) standard, ISO/IEC 9075-9
- Apache Arrow Dataset API documentation
- Apache Arrow Acero execution engine design
- DuckDB extension architecture
- Arrow Flight SQL protocol specification