Chapter 30: Multi-Protocol Service Layer
Database systems increasingly serve diverse client ecosystems. Traditional applications use PostgreSQL drivers, data science workflows prefer Arrow-native protocols, and monitoring systems scrape HTTP endpoints. This chapter examines Cognica's multi-protocol service layer, which provides unified access through three distinct protocols: PostgreSQL wire protocol, Arrow Flight SQL, and HTTP REST.
30.1 Protocol Diversity in Modern Systems
The proliferation of protocols reflects the specialization of different use cases:
PostgreSQL Wire Protocol: Industry-standard database access enabling compatibility with thousands of existing tools, ORMs, and drivers.
Arrow Flight SQL: Columnar data transfer optimized for analytical workloads, avoiding most serialization overhead when the client is itself Arrow-native.
HTTP REST: Universal access for web applications, administrative interfaces, and systems without specialized client libraries.
30.1.1 Protocol Characteristics
Each protocol offers distinct trade-offs:
| Protocol | Serialization | Connection Model | Type Safety | Use Cases |
|---|---|---|---|---|
| PostgreSQL | Custom binary | Persistent | Runtime | Applications, BI tools |
| Flight SQL | Arrow IPC | Persistent | Runtime | Analytics, data science |
| HTTP | JSON/Text | Request-response | None | Admin, monitoring |
30.1.2 Unified Query Engine
Despite protocol diversity, all requests ultimately execute against the same query engine. The service layer's responsibility is protocol translation: converting protocol-specific requests into internal operations and converting results back into protocol-specific formats.
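As a rough illustration of this translation step, the sketch below routes one protocol-neutral result through two different formatters. All names here (InternalQuery, InternalResult, execute, to_json, to_command_tag) are hypothetical and are not part of Cognica's API; the engine is a stub that returns a fixed row.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical protocol-neutral request and result types (illustrative only).
struct InternalQuery {
  std::string sql;
};

struct InternalResult {
  std::vector<std::string> columns;
  std::vector<std::vector<std::string>> rows;
};

// Stand-in for the shared query engine: always returns one fixed row.
inline auto execute(const InternalQuery& query) -> InternalResult {
  assert(!query.sql.empty());
  return InternalResult{{"id", "name"}, {{"1", "alice"}}};
}

// HTTP-style formatter: renders the result as a JSON array of objects.
// Values are emitted as unescaped JSON strings, which is enough for a sketch.
inline auto to_json(const InternalResult& result) -> std::string {
  std::string out = "[";
  for (std::size_t r = 0; r < result.rows.size(); ++r) {
    out += (r == 0 ? "{" : ",{");
    for (std::size_t c = 0; c < result.columns.size(); ++c) {
      if (c != 0) out += ",";
      out += "\"" + result.columns[c] + "\":\"" + result.rows[r][c] + "\"";
    }
    out += "}";
  }
  return out + "]";
}

// PostgreSQL-style formatter: builds the command tag reported for a SELECT.
inline auto to_command_tag(const InternalResult& result) -> std::string {
  return "SELECT " + std::to_string(result.rows.size());
}
```

Both formatters consume the same InternalResult, so the engine stays unaware of which protocol the request arrived on.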
30.2 Service Layer Architecture
The ServiceCoordinator class owns the protocol-specific services and drives their shared lifecycle:
class ServiceCoordinator final {
public:
ServiceCoordinator();
~ServiceCoordinator();
void initialize();
void shutdown();
auto get_services() const -> const std::vector<std::shared_ptr<ServiceType>>&;
void run();
private:
std::vector<std::shared_ptr<ServiceType>> services_;
HTTPService http_service_;
FlightSQLService flight_sql_service_;
PostgreSQLService pgsql_service_;
};
30.2.1 Type-Erased Service Interface
Cognica uses a type-erased service interface that enables uniform handling of heterogeneous services:
class Service {
public:
void initialize() {
return te::call<void>(
[](auto& self) { return self.initialize(); },
*this);
}
void shutdown() {
return te::call<void>(
[](auto& self) { return self.shutdown(); },
*this);
}
auto get_services() const -> std::vector<std::shared_ptr<ServiceBase>> {
return te::call<std::vector<std::shared_ptr<ServiceBase>>>(
[](const auto& self) { return self.get_services(); },
*this);
}
void run() {
return te::call<void>(
[](auto& self) { return self.run(); },
*this);
}
};
using ServiceType = te::poly<Service>;
This pattern enables the service coordinator to manage services without knowing their concrete types, facilitating plugin-style extensibility.
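The same idea can be shown without the te library. The following is a minimal hand-rolled sketch of type erasure using the concept/model idiom; AnyService, EchoService, and PingService are hypothetical names introduced for illustration, and the listing above remains the authoritative interface.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hand-rolled type erasure. Any type that provides initialize(), run(), and
// shutdown() can be stored; no common base class is required of it.
class AnyService {
public:
  template <typename T>
  explicit AnyService(T service)
      : impl_(std::make_unique<Model<T>>(std::move(service))) {}

  void initialize() { impl_->initialize(); }
  void run() { impl_->run(); }
  void shutdown() { impl_->shutdown(); }

private:
  // Internal virtual interface hidden from users of AnyService.
  struct Concept {
    virtual ~Concept() = default;
    virtual void initialize() = 0;
    virtual void run() = 0;
    virtual void shutdown() = 0;
  };

  // Adapts a concrete type T to the internal interface.
  template <typename T>
  struct Model final : Concept {
    explicit Model(T value) : value_(std::move(value)) {}
    void initialize() override { value_.initialize(); }
    void run() override { value_.run(); }
    void shutdown() override { value_.shutdown(); }
    T value_;
  };

  std::unique_ptr<Concept> impl_;
};

// Two unrelated hypothetical services that record their lifecycle calls.
struct EchoService {
  std::vector<std::string>* log;
  void initialize() { log->push_back("echo:init"); }
  void run() { log->push_back("echo:run"); }
  void shutdown() { log->push_back("echo:shutdown"); }
};

struct PingService {
  std::vector<std::string>* log;
  void initialize() { log->push_back("ping:init"); }
  void run() { log->push_back("ping:run"); }
  void shutdown() { log->push_back("ping:shutdown"); }
};
```

A std::vector<AnyService> can then hold both service types side by side, which is the property the coordinator relies on.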
30.2.2 Lifecycle Management
Services follow a consistent lifecycle:
- Construction: Service objects are created with default state
- Initialization: initialize() configures resources based on runtime configuration
- Running: run() starts accepting connections (may be blocking or non-blocking)
- Shutdown: shutdown() gracefully terminates connections and releases resources
void ServiceCoordinator::initialize() {
// Initialize individual services based on configuration
http_service_.initialize();
flight_sql_service_.initialize();
pgsql_service_.initialize();
// Register protocol services
for (auto& service : services_) {
service->initialize();
}
}
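One way to make the lifecycle order explicit is a small guard that tracks the current phase and rejects out-of-order calls. This is a sketch only: Phase and LifecycleGuard are hypothetical names, and whether Cognica's services enforce ordering this way is not stated in the source.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical lifecycle phases matching the four steps listed above.
enum class Phase { kConstructed, kInitialized, kRunning, kStopped };

// Tracks the current phase and rejects calls made in the wrong order.
class LifecycleGuard {
public:
  void initialize() { advance_(Phase::kConstructed, Phase::kInitialized); }
  void run() { advance_(Phase::kInitialized, Phase::kRunning); }

  // Shutdown is allowed from any phase and is idempotent, so a service that
  // failed during initialization can still be cleaned up.
  void shutdown() { phase_ = Phase::kStopped; }

  auto phase() const -> Phase { return phase_; }

private:
  void advance_(Phase expected, Phase next) {
    if (phase_ != expected) {
      throw std::logic_error("lifecycle method called out of order");
    }
    phase_ = next;
  }

  Phase phase_ = Phase::kConstructed;
};
```

Calling run() before initialize() throws, while repeated shutdown() calls are harmless.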
30.3 HTTP REST Protocol
The HTTP service provides REST endpoints for web-based access and administrative operations.
30.3.1 HTTP Server Architecture
The HTTP server uses Boost.Beast for asynchronous I/O:
class HTTPServer final {
public:
explicit HTTPServer(const config::NetworkHTTPOption& options);
// Route registration
auto router() -> HTTPRouter&;
auto websocket_router() -> WebSocketRouter&;
void start();
void stop();
private:
void do_accept_();
void on_accept_(beast::error_code ec, tcp::socket socket);
private:
config::NetworkHTTPOption options_;
asio::io_context io_context_;
tcp::acceptor acceptor_;
HTTPRouter router_;
WebSocketRouter websocket_router_;
std::vector<std::thread> threads_;
std::atomic<bool> running_;
};
30.3.2 URL Routing with Path Parameters
The router supports RESTful URL patterns with path parameters:
class HTTPRouter final {
public:
void add_route(HTTPMethod method, const std::string& pattern,
RouteHandler handler);
// Convenience methods
void get(const std::string& pattern, RouteHandler handler);
void post(const std::string& pattern, RouteHandler handler);
void put(const std::string& pattern, RouteHandler handler);
void del(const std::string& pattern, RouteHandler handler);
auto route(HTTPRequest& request) const -> HTTPResponse;
private:
struct RouteEntry {
HTTPMethod method;
std::vector<std::string> segments;
std::vector<std::string> param_names;
RouteHandler handler;
};
auto match_route_(const std::string& path, const RouteEntry& entry,
std::unordered_map<std::string, std::string>& params) const
-> bool;
private:
std::vector<RouteEntry> routes_;
};
Route patterns support path parameters using curly braces:
void HTTPService::register_collection_routes_() {
// GET /api/v1/collections
router_.get("/api/v1/collections", handle_list_collections_);
// GET /api/v1/collections/{name}
router_.get("/api/v1/collections/{name}", handle_get_collection_);
// POST /api/v1/collections/{name}/documents
router_.post("/api/v1/collections/{name}/documents", handle_insert_);
// GET /api/v1/collections/{name}/documents/{id}
router_.get("/api/v1/collections/{name}/documents/{id}", handle_get_document_);
}
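The matching logic behind such patterns can be sketched as a segment-by-segment comparison. The free functions split_path and match_route below are hypothetical stand-ins for the private match_route_ member; the real router pre-splits patterns into RouteEntry segments, and this sketch ignores query strings and URL decoding.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Splits "/a/b/c" into {"a", "b", "c"}; empty segments are dropped.
inline auto split_path(const std::string& path) -> std::vector<std::string> {
  std::vector<std::string> segments;
  std::size_t start = 0;
  while (start <= path.size()) {
    auto end = path.find('/', start);
    if (end == std::string::npos) end = path.size();
    if (end > start) segments.push_back(path.substr(start, end - start));
    start = end + 1;
  }
  return segments;
}

// Returns true when `path` matches `pattern`. Segments of the form {name}
// match any single non-empty segment and are captured into `params`.
inline auto match_route(const std::string& pattern, const std::string& path,
                        std::unordered_map<std::string, std::string>& params)
    -> bool {
  const auto expected = split_path(pattern);
  const auto actual = split_path(path);
  if (expected.size() != actual.size()) return false;

  std::unordered_map<std::string, std::string> captured;
  for (std::size_t i = 0; i < expected.size(); ++i) {
    const auto& seg = expected[i];
    const bool is_param =
        seg.size() > 2 && seg.front() == '{' && seg.back() == '}';
    if (is_param) {
      captured[seg.substr(1, seg.size() - 2)] = actual[i];
    } else if (seg != actual[i]) {
      return false;
    }
  }
  params = std::move(captured);  // Publish captures only on a full match
  return true;
}
```

Because captures are written to a temporary map first, a failed match leaves the caller's params untouched, which matters when the router tries several routes in sequence.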
30.3.3 HTTP Session Handling
Each connection spawns an HTTP session that handles the request-response cycle:
class HTTPSession : public std::enable_shared_from_this<HTTPSession> {
public:
HTTPSession(tcp::socket socket, const HTTPRouter& router,
WebSocketRouter& websocket_router, int64_t request_timeout_ms,
int64_t max_body_size);
void run();
private:
void do_read_();
void on_read_(beast::error_code ec, size_t bytes_transferred);
void do_write_(beast::http::response<beast::http::string_body> response);
void on_write_(beast::error_code ec, size_t bytes_transferred, bool close);
void do_close_();
// WebSocket upgrade detection
auto try_websocket_upgrade_() -> bool;
private:
beast::tcp_stream stream_;
beast::flat_buffer buffer_;
beast::http::request<beast::http::string_body> request_;
const HTTPRouter& router_;
WebSocketRouter& websocket_router_;
};
30.3.4 WebSocket Support
The HTTP service supports WebSocket upgrades for bidirectional communication:
class WebSocketRouter final {
public:
void add_route(const std::string& pattern, WebSocketHandler handler);
auto match(const std::string& path,
std::unordered_map<std::string, std::string>& params) const
-> std::optional<WebSocketHandler>;
};
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
WebSocketSession(tcp::socket socket, WebSocketHandler handler);
void run();
void send(const std::string& message);
void close();
private:
void do_read_();
void on_read_(beast::error_code ec, size_t bytes_transferred);
void do_write_();
private:
beast::websocket::stream<tcp::socket> ws_;
WebSocketHandler handler_;
beast::flat_buffer buffer_;
std::queue<std::string> write_queue_;
};
30.3.5 Metrics and Health Endpoints
The HTTP service exposes Prometheus metrics and health endpoints:
void HTTPService::register_metrics_routes_() {
router_.get("/metrics", [this](HTTPRequest& req) {
return prometheus_handler_->handle(req);
});
}
void HTTPService::register_health_routes_() {
router_.get("/health", [](HTTPRequest& req) {
HTTPResponse response;
response.status = 200;
response.body = R"({"status": "healthy"})";
return response;
});
router_.get("/ready", [](HTTPRequest& req) {
// Evaluate readiness once so the status code and body cannot disagree
const bool ready = is_ready();
HTTPResponse response;
response.status = ready ? 200 : 503;
response.body = ready ?
R"({"status": "ready"})" :
R"({"status": "not ready"})";
return response;
});
}
30.4 PostgreSQL Wire Protocol Service
The PostgreSQL wire protocol service enables compatibility with the extensive PostgreSQL ecosystem (covered in detail in Chapter 28).
30.4.1 Service Configuration
The PostgreSQL service manages server lifecycle and query execution:
class PostgreSQLService final {
public:
PostgreSQLService();
~PostgreSQLService();
void initialize();
void shutdown();
void run();
auto is_enabled() const -> bool;
auto is_running() const -> bool;
// Create independent query executor for testing
auto create_executor()
-> std::shared_ptr<net::pgsql::PostgreSQLQueryExecutor>;
// Access role manager for authentication
auto get_role_manager() -> sql::auth::RoleManager*;
private:
std::unique_ptr<net::pgsql::PostgreSQLServer> server_;
std::shared_ptr<db::document::DocumentDB> db_;
std::unique_ptr<sql::auth::RoleManager> role_manager_;
std::unique_ptr<sql::duckdb::DuckDBManager> duckdb_manager_;
std::unique_ptr<sql::virtual_table::ExternalVirtualTableManager>
virtual_table_manager_;
bool enabled_;
};
30.4.2 Server Implementation
The PostgreSQL server accepts connections and spawns sessions:
class PostgreSQLServer final {
public:
PostgreSQLServer(const config::PostgreSQLOption& options,
QueryExecutorFactory executor_factory);
void start();
void stop();
auto is_running() const -> bool;
auto port() const -> uint16_t;
auto session_count() const -> size_t;
private:
void do_accept_();
void on_accept_(const boost::system::error_code& ec, tcp::socket socket);
void register_session_(std::shared_ptr<PostgreSQLSession> session);
void unregister_session_(PostgreSQLSession* session);
void close_all_sessions_();
void init_ssl_context_();
private:
config::PostgreSQLOption options_;
QueryExecutorFactory executor_factory_;
asio::io_context io_context_;
tcp::acceptor acceptor_;
std::vector<std::thread> threads_;
std::atomic<bool> running_;
std::atomic<size_t> session_count_;
std::unique_ptr<ssl::context> ssl_ctx_;
std::mutex sessions_mutex_;
std::unordered_set<std::shared_ptr<PostgreSQLSession>> sessions_;
};
30.4.3 Query Executor Interface
The query executor interface abstracts SQL execution from the protocol layer:
class QueryExecutor {
public:
virtual ~QueryExecutor() = default;
struct QueryResult {
bool success;
std::string error_message;
ErrorNoticeFields error_fields;
// SELECT results
std::vector<ColumnInfo> columns;
std::vector<std::vector<std::optional<std::vector<uint8_t>>>> rows;
// DML results
std::string command_tag;
int64_t rows_affected;
// Streaming cursor support
uint64_t cursor_id;
bool is_complete;
// COPY support
bool is_copy;
CopyDirection copy_direction;
};
// Simple Query Protocol
virtual auto execute_simple_query(const std::string& query) -> QueryResult = 0;
// Extended Query Protocol
virtual auto parse_query(const std::string& query,
const std::vector<int32_t>& param_oids)
-> QueryResult = 0;
virtual auto execute_portal(
const std::string& query,
const std::vector<std::optional<std::vector<uint8_t>>>& params,
const std::vector<int16_t>& param_formats,
const std::vector<int32_t>& param_oids,
int32_t max_rows,
uint64_t cursor_id) -> QueryResult = 0;
// Schema introspection
virtual auto describe_query(const std::string& query)
-> std::optional<std::vector<ColumnInfo>> = 0;
// Transaction control
virtual void begin_transaction() = 0;
virtual void commit_transaction() = 0;
virtual void rollback_transaction() = 0;
};
30.4.4 Session State Machine
PostgreSQL sessions implement a state machine for protocol handling:
enum class SessionState {
kInitial, // Waiting for startup message
kSSLNegotiation, // SSL negotiation in progress
kAuthentication, // Authentication in progress
kReady, // Ready for queries
kInTransaction, // Inside a transaction
kFailedTransaction, // Transaction has failed
kCopyIn, // Receiving COPY data
kCopyOut, // Sending COPY data
kClosing, // Session is closing
};
The session transitions between states based on client messages and query results.
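A transition function over these states might look like the sketch below. The SessionEvent enum and the specific transitions are assumptions based on the general PostgreSQL protocol flow (an SSL request precedes the startup message, and a failed transaction ends with a commit or rollback); the COPY states are omitted, and the actual session logic may differ.

```cpp
#include <cassert>

enum class SessionState {
  kInitial, kSSLNegotiation, kAuthentication, kReady, kInTransaction,
  kFailedTransaction, kCopyIn, kCopyOut, kClosing,
};

// Hypothetical events; a real session reacts to protocol messages and
// query outcomes rather than to an enum like this.
enum class SessionEvent {
  kSSLRequest, kSSLDone, kStartup, kAuthOk, kBegin,
  kCommitOrRollback, kQueryError, kTerminate,
};

inline auto next_state(SessionState state, SessionEvent event) -> SessionState {
  if (event == SessionEvent::kTerminate) return SessionState::kClosing;
  switch (state) {
    case SessionState::kInitial:
      if (event == SessionEvent::kSSLRequest) return SessionState::kSSLNegotiation;
      if (event == SessionEvent::kStartup) return SessionState::kAuthentication;
      break;
    case SessionState::kSSLNegotiation:
      // After the handshake the session waits for the startup message again.
      if (event == SessionEvent::kSSLDone) return SessionState::kInitial;
      break;
    case SessionState::kAuthentication:
      if (event == SessionEvent::kAuthOk) return SessionState::kReady;
      break;
    case SessionState::kReady:
      if (event == SessionEvent::kBegin) return SessionState::kInTransaction;
      break;
    case SessionState::kInTransaction:
      if (event == SessionEvent::kQueryError) return SessionState::kFailedTransaction;
      if (event == SessionEvent::kCommitOrRollback) return SessionState::kReady;
      break;
    case SessionState::kFailedTransaction:
      if (event == SessionEvent::kCommitOrRollback) return SessionState::kReady;
      break;
    default:
      break;
  }
  return state;  // Events that do not apply leave the state unchanged
}
```

Keeping the transitions in one pure function makes them easy to unit test separately from socket handling.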
30.5 Arrow Flight SQL Service
Flight SQL provides columnar data transfer optimized for analytical workloads.
30.5.1 Service Configuration
class FlightSQLService final {
public:
FlightSQLService();
~FlightSQLService();
void initialize();
void shutdown();
void run();
auto is_enabled() const -> bool;
private:
bool enabled_;
std::unique_ptr<net::flight_sql::FlightSQLServer> server_;
std::thread server_thread_;
};
30.5.2 Flight SQL Server Implementation
The Flight SQL server extends Arrow's FlightSqlServerBase:
class FlightSQLServer final : public arrow::flight::sql::FlightSqlServerBase {
public:
explicit FlightSQLServer(db::document::DocumentDB* db,
const config::FlightSQLOption& options);
// Query Execution
auto GetFlightInfoStatement(
const arrow::flight::ServerCallContext& context,
const arrow::flight::sql::StatementQuery& command,
const arrow::flight::FlightDescriptor& descriptor)
-> arrow::Result<std::unique_ptr<arrow::flight::FlightInfo>> override;
auto DoGetStatement(
const arrow::flight::ServerCallContext& context,
const arrow::flight::sql::StatementQueryTicket& command)
-> arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> override;
// Prepared Statements
auto CreatePreparedStatement(
const arrow::flight::ServerCallContext& context,
const arrow::flight::sql::ActionCreatePreparedStatementRequest& request)
-> arrow::Result<
arrow::flight::sql::ActionCreatePreparedStatementResult> override;
// Metadata APIs
auto DoGetTables(
const arrow::flight::ServerCallContext& context,
const arrow::flight::sql::GetTables& command)
-> arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> override;
// Transactions
auto BeginTransaction(
const arrow::flight::ServerCallContext& context,
const arrow::flight::sql::ActionBeginTransactionRequest& request)
-> arrow::Result<
arrow::flight::sql::ActionBeginTransactionResult> override;
auto EndTransaction(
const arrow::flight::ServerCallContext& context,
const arrow::flight::sql::ActionEndTransactionRequest& request)
-> arrow::Status override;
private:
db::document::DocumentDB* db_;
const config::FlightSQLOption& options_;
std::unordered_map<std::string, std::shared_ptr<FlightSQLSession>> sessions_;
std::mutex sessions_mutex_;
std::unordered_map<std::string, std::shared_ptr<FlightSQLStatement>>
statements_;
std::mutex statements_mutex_;
};
30.5.3 Two-Phase Query Execution
Flight SQL uses a two-phase execution model for query results:
Phase 1: GetFlightInfo - Client sends query, server returns metadata (schema, endpoints)
auto GetFlightInfoStatement(
const arrow::flight::ServerCallContext& context,
const arrow::flight::sql::StatementQuery& command,
const arrow::flight::FlightDescriptor& descriptor)
-> arrow::Result<std::unique_ptr<arrow::flight::FlightInfo>> {
auto session = get_or_create_session_(context);
auto schema = session->infer_schema(command.query);
if (!schema.ok()) {
return schema.status();
}
// Generate ticket for data retrieval
auto handle = generate_statement_handle_();
statement_queries_[handle] = command.query;
statement_schemas_[handle] = *schema;
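// The endpoint carries the handle that the client presents again in phase 2
auto endpoint = make_flight_endpoint_(handle);
// FlightInfo::Make returns arrow::Result<FlightInfo>, so unwrap and box it
ARROW_ASSIGN_OR_RAISE(auto info, arrow::flight::FlightInfo::Make(
**schema, descriptor, {endpoint}, -1, -1));
return std::make_unique<arrow::flight::FlightInfo>(std::move(info));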
}
Phase 2: DoGet - Client retrieves data using ticket from Phase 1
auto DoGetStatement(
const arrow::flight::ServerCallContext& context,
const arrow::flight::sql::StatementQueryTicket& command)
-> arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> {
auto session = get_or_create_session_(context);
auto handle = command.statement_handle;
auto it = statement_queries_.find(handle);
if (it == statement_queries_.end()) {
return arrow::Status::KeyError("Statement handle not found");
}
return session->execute_query_to_stream(it->second);
}
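Stripped of the Arrow types, the link between the two phases is a handle-to-query map. The sketch below shows that bookkeeping in isolation; StatementRegistry and its methods are hypothetical names, and the sequential "stmt-N" handles are for readability only (a real server would presumably use unguessable handles).

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical registry linking the two phases: phase 1 stores the query
// under a fresh handle, phase 2 looks the handle up again.
class StatementRegistry {
public:
  // Phase 1: remember the query and return an opaque handle for the ticket.
  auto register_query(const std::string& query) -> std::string {
    std::lock_guard<std::mutex> lock(mutex_);
    auto handle = "stmt-" + std::to_string(next_id_++);
    queries_[handle] = query;
    return handle;
  }

  // Phase 2: resolve a handle back to its query; empty if the handle is
  // unknown (for example, because it has expired and was removed).
  auto find_query(const std::string& handle) const
      -> std::optional<std::string> {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = queries_.find(handle);
    if (it == queries_.end()) return std::nullopt;
    return it->second;
  }

private:
  mutable std::mutex mutex_;
  std::uint64_t next_id_ = 1;
  std::unordered_map<std::string, std::string> queries_;
};
```

The mutex is included because phase 1 and phase 2 of the same statement may arrive on different server threads.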
30.5.4 Flight SQL Session Management
Each client connection has an associated session with independent state:
class FlightSQLSession final {
public:
FlightSQLSession(db::document::DocumentDB* db,
const std::string& session_id,
const config::FlightSQLOption& options);
auto session_id() const -> const std::string&;
auto transaction_id() const -> const std::string&;
auto in_transaction() const -> bool;
auto pid() const -> int32_t;
// Query execution
auto get_flight_info_statement(
const std::string& query,
const arrow::flight::FlightDescriptor& descriptor)
-> arrow::Result<std::unique_ptr<arrow::flight::FlightInfo>>;
auto execute_query_to_stream(const std::string& query)
-> arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>>;
auto execute_update(const std::string& query) -> arrow::Result<int64_t>;
// Transaction management
auto begin_transaction() -> arrow::Status;
auto commit() -> arrow::Status;
auto rollback() -> arrow::Status;
// Savepoint management
auto create_savepoint(const std::string& name) -> arrow::Status;
auto release_savepoint(const std::string& name) -> arrow::Status;
auto rollback_to_savepoint(const std::string& name) -> arrow::Status;
private:
std::string session_id_;
std::string transaction_id_;
const config::FlightSQLOption& options_;
std::unique_ptr<sql::session::SQLSession> sql_session_;
bool in_transaction_;
std::optional<arrow::flight::Location> server_location_;
// Statement cache
std::unordered_map<std::string, std::string> statement_queries_;
std::unordered_map<std::string, std::shared_ptr<arrow::Schema>>
statement_schemas_;
};
30.6 Configuration Model
Each protocol has dedicated configuration options:
30.6.1 Network Configuration Structure
struct NetworkOption {
std::vector<NetworkBindingOption> bindings;
NetworkHTTPOption http;
FlightSQLOption flight_sql;
PostgreSQLOption pgsql;
};
struct NetworkHTTPOption {
bool enabled = false;
std::string host = "0.0.0.0";
uint16_t port = 8080;
int32_t num_threads = 4;
int64_t request_timeout_ms = 30000;
int64_t max_body_size = 16 * 1024 * 1024; // 16MB
SSLCredentialsOption ssl;
};
struct FlightSQLOption {
bool enabled = false;
std::string host = "0.0.0.0";
uint16_t port = 31337;
int64_t max_batch_size = 65536;
int64_t statement_timeout_ms = 0;
int64_t statement_cache_ttl_s = 300;
SSLCredentialsOption ssl;
};
struct PostgreSQLOption {
bool enabled = false;
std::string host = "0.0.0.0";
uint16_t port = 5432;
int32_t num_threads = 4;
int64_t statement_timeout_ms = 0;
int64_t idle_session_timeout_ms = 0;
int32_t max_connections = 100;
int64_t max_message_size = 1_GB;
int64_t default_fetch_size = 10000;
PostgreSQLAuthOption auth;
SSLCredentialsOption ssl;
};
30.6.2 SSL/TLS Configuration
All protocols support TLS encryption:
struct SSLCredentialsOption {
bool enabled = false;
std::string cert_path; // Certificate file path
std::string key_path; // Private key file path
std::string ca_path; // CA certificate for client verification
bool require_client_cert = false; // mTLS requirement
};
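A startup check over these fields could catch inconsistent settings before a listener is opened. The sketch below is an assumption about what such validation might look like: SSLCredentials mirrors the struct above, while validate and its three rules are hypothetical and not taken from the source.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Mirrors the fields of SSLCredentialsOption shown above.
struct SSLCredentials {
  bool enabled = false;
  std::string cert_path;
  std::string key_path;
  std::string ca_path;
  bool require_client_cert = false;
};

// Returns a list of human-readable problems; an empty list means the
// settings are internally consistent. The rules themselves are assumptions.
inline auto validate(const SSLCredentials& ssl) -> std::vector<std::string> {
  std::vector<std::string> problems;
  if (!ssl.enabled) {
    return problems;  // Nothing to check when TLS is off
  }
  if (ssl.cert_path.empty()) {
    problems.push_back("cert_path is required when TLS is enabled");
  }
  if (ssl.key_path.empty()) {
    problems.push_back("key_path is required when TLS is enabled");
  }
  if (ssl.require_client_cert && ssl.ca_path.empty()) {
    problems.push_back("ca_path is required to verify client certificates");
  }
  return problems;
}
```

Returning every problem at once, rather than failing on the first, lets an operator fix a configuration file in a single pass.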
30.7 Session Management
Cognica tracks active sessions across all protocols through a unified session registry.
30.7.1 Session Registry
The session registry provides cross-protocol visibility into active sessions:
class SessionRegistry {
public:
void register_session(int32_t pid, const SessionInfo& info);
void unregister_session(int32_t pid);
void update_session_state(int32_t pid, SessionState state);
void update_query_info(int32_t pid, const QueryInfo& query);
auto get_all_sessions() const -> std::vector<SessionInfo>;
auto get_session(int32_t pid) const -> std::optional<SessionInfo>;
// Cancel query by session PID
auto cancel_query(int32_t pid) -> bool;
// Terminate session by PID
auto terminate_session(int32_t pid) -> bool;
};
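A minimal, mutex-protected version of such a registry is sketched below. SessionEntry is a trimmed, hypothetical stand-in for SessionInfo, and modelling cancellation as a flag that the session polls is an assumption; the source does not say how cancel_query reaches the running query.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

// Trimmed, hypothetical stand-in for SessionInfo.
struct SessionEntry {
  std::int32_t pid = 0;
  std::string protocol;
  std::string current_query;
  bool cancel_requested = false;
};

class MiniSessionRegistry {
public:
  void register_session(const SessionEntry& entry) {
    std::lock_guard<std::mutex> lock(mutex_);
    sessions_[entry.pid] = entry;
  }

  void unregister_session(std::int32_t pid) {
    std::lock_guard<std::mutex> lock(mutex_);
    sessions_.erase(pid);
  }

  // Returns a copy so callers never hold references into the locked map.
  auto get_session(std::int32_t pid) const -> std::optional<SessionEntry> {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = sessions_.find(pid);
    if (it == sessions_.end()) return std::nullopt;
    return it->second;
  }

  // Marks the session for cancellation; returns false for unknown PIDs.
  auto cancel_query(std::int32_t pid) -> bool {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = sessions_.find(pid);
    if (it == sessions_.end()) return false;
    it->second.cancel_requested = true;
    return true;
  }

  auto session_count() const -> std::size_t {
    std::lock_guard<std::mutex> lock(mutex_);
    return sessions_.size();
  }

private:
  mutable std::mutex mutex_;
  std::unordered_map<std::int32_t, SessionEntry> sessions_;
};
```

Because every protocol registers under the same PID space, an administrative call can target a Flight SQL session in the same way as a PostgreSQL one.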
30.7.2 Session Information
Session information enables administrative monitoring:
struct SessionInfo {
int32_t pid; // Process ID
std::string protocol; // "postgresql", "flight_sql", etc.
std::string client_addr; // Client IP address
uint16_t client_port; // Client port
std::string database; // Connected database
std::string username; // Authenticated user
std::chrono::system_clock::time_point backend_start; // Session start time
std::chrono::system_clock::time_point query_start; // Current query start
SessionState state; // Current state
std::string current_query; // Active query text
bool waiting; // Waiting for lock
};
30.7.3 System Views
The session registry powers PostgreSQL-compatible system views:
-- pg_stat_activity equivalent
SELECT pid, usename, application_name, client_addr, state, query
FROM pg_stat_activity;
-- Cancel a query
SELECT pg_cancel_backend(12345);
-- Terminate a session
SELECT pg_terminate_backend(12345);
30.8 Connection Management
Each protocol implements connection lifecycle management appropriate to its characteristics.
30.8.1 HTTP Connection Handling
HTTP connections are short-lived by default, with optional keep-alive:
void HTTPSession::on_write_(beast::error_code ec, size_t bytes_transferred,
bool close) {
if (ec) {
return do_close_();
}
if (close) {
return do_close_();
}
// Keep connection alive for next request
request_ = {};
do_read_();
}
30.8.2 PostgreSQL Connection Pooling
PostgreSQL connections are persistent with idle timeout management:
void PostgreSQLSession::start_idle_timer_() {
if (options_.idle_session_timeout_ms <= 0) {
return;
}
idle_timer_ = std::make_unique<asio::steady_timer>(
socket_.get_executor(),
std::chrono::milliseconds(options_.idle_session_timeout_ms));
idle_timer_->async_wait([this, self = shared_from_this()](
const boost::system::error_code& ec) {
on_idle_timeout_(ec);
});
}
void PostgreSQLSession::on_idle_timeout_(const boost::system::error_code& ec) {
if (ec) {
return; // Timer was cancelled
}
// Close idle connection
close();
}
30.8.3 Flight SQL Connection Reuse
Flight SQL sessions support connection reuse with statement caching:
void FlightSQLSession::cleanup_expired_statements_() {
auto now = std::chrono::steady_clock::now();
auto ttl = std::chrono::seconds(options_.statement_cache_ttl_s);
std::vector<std::string> expired;
for (const auto& [handle, timestamp] : statement_timestamps_) {
if (now - timestamp > ttl) {
expired.push_back(handle);
}
}
for (const auto& handle : expired) {
statement_queries_.erase(handle);
statement_schemas_.erase(handle);
statement_timestamps_.erase(handle);
}
}
30.9 Thread Safety and Concurrency
Multi-protocol services require careful concurrency management.
30.9.1 Strand-Based Serialization
Boost.Asio strands ensure thread-safe message processing:
class PostgreSQLSession {
void queue_message_(const std::vector<uint8_t>& data) {
asio::dispatch(strand_, [this, self = shared_from_this(), data]() {
write_buffer_.insert(write_buffer_.end(), data.begin(), data.end());
});
}
private:
asio::strand<asio::any_io_executor> strand_;
};
30.9.2 Shared Resource Protection
Global resources use mutex protection:
class FlightSQLServer {
auto get_or_create_session_(const arrow::flight::ServerCallContext& context)
-> std::shared_ptr<FlightSQLSession> {
auto session_id = extract_session_id_(context);
std::lock_guard<std::mutex> lock(sessions_mutex_);
auto it = sessions_.find(session_id);
if (it != sessions_.end()) {
return it->second;
}
auto session = std::make_shared<FlightSQLSession>(
db_, session_id, options_);
sessions_[session_id] = session;
return session;
}
private:
std::mutex sessions_mutex_;
std::unordered_map<std::string, std::shared_ptr<FlightSQLSession>> sessions_;
};
30.10 Error Handling Across Protocols
Each protocol has distinct error reporting mechanisms that must be respected.
30.10.1 PostgreSQL Error Responses
PostgreSQL uses SQLSTATE codes and structured error fields:
void PostgreSQLSession::send_error_response_(const ErrorNoticeFields& fields) {
auto message = PostgreSQLProtocol::create_error_response(fields);
queue_message_(message);
}
// ErrorNoticeFields includes:
// - severity (ERROR, WARNING, etc.)
// - code (SQLSTATE like "42P01")
// - message (human-readable description)
// - detail (additional context)
// - hint (suggested resolution)
// - position (character position in query)
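On the wire, these fields are packed into an ErrorResponse message: a type byte 'E', a big-endian 32-bit length that counts itself but not the type byte, a sequence of (field code, null-terminated string) pairs, and a final zero byte. The sketch below encodes that layout; encode_error_response is a hypothetical free function, not the signature of PostgreSQLProtocol::create_error_response.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Encodes a PostgreSQL ErrorResponse message:
//   Byte1('E') | Int32 length | { Byte1 field code | C string }... | Byte1(0)
// Field codes include 'S' (severity), 'C' (SQLSTATE), and 'M' (message).
inline auto encode_error_response(
    const std::vector<std::pair<char, std::string>>& fields)
    -> std::vector<std::uint8_t> {
  std::vector<std::uint8_t> body;
  for (const auto& [code, value] : fields) {
    body.push_back(static_cast<std::uint8_t>(code));
    body.insert(body.end(), value.begin(), value.end());
    body.push_back(0);  // C-string terminator
  }
  body.push_back(0);  // End-of-fields marker

  // The length covers its own four bytes plus the body, not the type byte.
  const auto length = static_cast<std::uint32_t>(body.size() + 4);

  std::vector<std::uint8_t> message;
  message.reserve(body.size() + 5);
  message.push_back('E');
  for (int shift = 24; shift >= 0; shift -= 8) {
    message.push_back(static_cast<std::uint8_t>((length >> shift) & 0xFF));
  }
  message.insert(message.end(), body.begin(), body.end());
  return message;
}
```

A call such as encode_error_response({{'S', "ERROR"}, {'C', "42P01"}, {'M', "relation does not exist"}}) yields a buffer that can be appended directly to the session's write queue.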
30.10.2 Flight SQL Error Status
Flight SQL uses Arrow Status for error reporting:
auto execute_query_to_stream(const std::string& query)
-> arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> {
auto result = execute_query_(query);
if (!result.ok()) {
return arrow::Status::ExecutionError(
"Query execution failed: " + result.status().message());
}
return std::make_unique<arrow::flight::RecordBatchStream>(*result);
}
30.10.3 HTTP Error Responses
HTTP uses status codes and JSON error bodies:
HTTPResponse handle_http_error(const db::Status& status) {
HTTPResponse response;
if (status.IsNotFound()) {
response.status = 404;
} else if (status.IsInvalidArgument()) {
response.status = 400;
} else if (status.IsPermissionDenied()) {
response.status = 403;
} else {
response.status = 500;
}
nlohmann::json error;
error["error"] = status.message();
error["code"] = status.code();
response.body = error.dump();
return response;
}
30.11 Performance Considerations
Multi-protocol support introduces performance trade-offs that require careful tuning.
30.11.1 Thread Pool Sizing
Each protocol may have distinct thread pool requirements:
- HTTP: Thread-per-connection for short requests, or async I/O for high concurrency
- PostgreSQL: Fixed thread pool sized for expected connection count
- Flight SQL: Thread pool sized for batch processing parallelism
30.11.2 Buffer Management
Different protocols have different buffering characteristics:
| Protocol | Buffer Strategy | Typical Size |
|---|---|---|
| HTTP | Per-request allocation | 16KB - 16MB |
| PostgreSQL | Reusable session buffer | 8KB - 1GB |
| Flight SQL | Arrow buffer pool | 64KB batches |
30.11.3 Connection Overhead
Connection establishment costs vary by protocol. PostgreSQL and Flight SQL connections are the more expensive to establish, so both benefit significantly from connection pooling.
30.12 Summary
The multi-protocol service layer enables Cognica to serve diverse client ecosystems through a unified architecture.
Key Design Decisions:
- Type-Erased Service Interface: Enables uniform service management without tight coupling to specific protocol implementations.
- Protocol-Specific Sessions: Each protocol has dedicated session handling optimized for its characteristics (stateless HTTP vs. stateful PostgreSQL).
- Shared Query Engine: All protocols execute queries against the same underlying engine, ensuring consistent semantics.
- Independent Lifecycle: Each protocol service can be enabled, configured, and scaled independently.
Protocol Selection Guidelines:
- PostgreSQL: Use for compatibility with existing tools and ORMs
- Flight SQL: Use for analytical workloads requiring columnar data transfer
- HTTP: Use for administrative interfaces and web-based access
The multi-protocol architecture positions Cognica as a versatile database system capable of serving applications ranging from traditional OLTP workloads to modern data science pipelines, all through a unified storage and query engine.
Exercises
- Protocol Adapter: Design a protocol adapter that exposes GraphQL queries over the HTTP protocol. The adapter should translate GraphQL queries to SQL and convert results to GraphQL response format.
- Connection Pooler: Implement an external connection pooler (similar to PgBouncer) that multiplexes client connections to a smaller number of server connections. Consider transaction-level and statement-level pooling modes.
- Protocol Performance Benchmark: Design a benchmark comparing the three protocols for different workload patterns (point queries, bulk reads, streaming inserts). Analyze the overhead of each protocol's serialization format.
- Custom Protocol Extension: Design an extension mechanism that allows adding new protocols without modifying the core service layer. Consider plugin discovery, lifecycle management, and configuration integration.
Further Reading
- Boost.Beast HTTP/WebSocket Library Documentation
- Apache Arrow Flight SQL Specification
- PostgreSQL Wire Protocol Documentation (Frontend/Backend Protocol)
- Boost.Asio Asynchronous I/O Framework