Chapter 30: Multi-Protocol Service Layer

Database systems increasingly serve diverse client ecosystems. Traditional applications use PostgreSQL drivers, data science workflows prefer Arrow-native protocols, and monitoring systems scrape HTTP endpoints. This chapter examines Cognica's multi-protocol service layer, which provides unified access through three distinct protocols: PostgreSQL wire protocol, Arrow Flight SQL, and HTTP REST.

30.1 Protocol Diversity in Modern Systems

The proliferation of protocols reflects the specialization of different use cases:

PostgreSQL Wire Protocol: Industry-standard database access enabling compatibility with thousands of existing tools, ORMs, and drivers.

Arrow Flight SQL: Columnar data transfer optimized for analytical workloads, avoiding row-by-row serialization and conversion when the client is itself Arrow-native.

HTTP REST: Universal access for web applications, administrative interfaces, and systems without specialized client libraries.

30.1.1 Protocol Characteristics

Each protocol offers distinct trade-offs:

| Protocol | Serialization | Connection Model | Type Safety | Use Cases |
|---|---|---|---|---|
| PostgreSQL | Custom binary | Persistent | Runtime | Applications, BI tools |
| Flight SQL | Arrow IPC | Persistent | Runtime | Analytics, data science |
| HTTP | JSON/Text | Request-response | None | Admin, monitoring |

30.1.2 Unified Query Engine

Despite protocol diversity, all requests eventually execute against the same query engine. The service layer's responsibility is protocol translation—converting protocol-specific requests into internal operations and converting results back to protocol-specific formats:

[Diagram: PostgreSQL, Flight SQL, and HTTP requests pass through protocol translation into the shared query engine.]
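As a rough illustration of protocol translation, the sketch below takes one protocol-neutral result and renders it two ways: as JSON for an HTTP response and as a PostgreSQL DataRow message in text format. The `ResultSet` type and both function names are hypothetical and are not Cognica's internal API; only the DataRow layout (tag byte `D`, a 4-byte length that counts itself, a 2-byte column count, then a 4-byte length per value with -1 marking NULL) follows the PostgreSQL wire protocol.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Hypothetical protocol-neutral result produced by the shared query engine.
struct ResultSet {
  std::vector<std::string> columns;
  std::vector<std::vector<std::optional<std::string>>> rows;  // nullopt = NULL
};

// HTTP adapter: render the result as a JSON array of objects. Every value is
// emitted as a JSON string; only quotes and backslashes are escaped here.
inline std::string to_json(const ResultSet& rs) {
  auto quote = [](const std::string& s) {
    std::string out = "\"";
    for (char c : s) {
      if (c == '"' || c == '\\') out += '\\';
      out += c;
    }
    return out + "\"";
  };
  std::string out = "[";
  for (std::size_t r = 0; r < rs.rows.size(); ++r) {
    out += (r ? ",{" : "{");
    for (std::size_t c = 0; c < rs.columns.size(); ++c) {
      if (c) out += ",";
      out += quote(rs.columns[c]) + ":";
      const auto& value = rs.rows[r][c];
      out += value ? quote(*value) : "null";
    }
    out += "}";
  }
  return out + "]";
}

// Append the low `bytes` bytes of `v` in network (big-endian) order.
inline void put_be(std::vector<std::uint8_t>& out, std::uint32_t v, int bytes) {
  for (int shift = 8 * (bytes - 1); shift >= 0; shift -= 8) {
    out.push_back(static_cast<std::uint8_t>(v >> shift));
  }
}

// PostgreSQL adapter: encode one row as a DataRow message in text format.
inline std::vector<std::uint8_t> to_pg_data_row(
    const std::vector<std::optional<std::string>>& row) {
  std::vector<std::uint8_t> body;
  put_be(body, static_cast<std::uint32_t>(row.size()), 2);
  for (const auto& value : row) {
    if (!value) {
      put_be(body, 0xFFFFFFFFu, 4);  // length -1 marks NULL
      continue;
    }
    put_be(body, static_cast<std::uint32_t>(value->size()), 4);
    body.insert(body.end(), value->begin(), value->end());
  }
  std::vector<std::uint8_t> msg;
  msg.push_back('D');
  put_be(msg, static_cast<std::uint32_t>(body.size() + 4), 4);
  msg.insert(msg.end(), body.begin(), body.end());
  return msg;
}
```

Both adapters read the same `ResultSet`, which is the property the service layer relies on: adding a protocol means adding an encoder, not touching query execution.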

30.2 Service Layer Architecture

The ServiceCoordinator class owns the protocol-specific services and drives them through a shared lifecycle:

class ServiceCoordinator final {
public:
  ServiceCoordinator();
  ~ServiceCoordinator();

  void initialize();
  void shutdown();

  auto get_services() const -> const std::vector<std::shared_ptr<ServiceType>>&;

  void run();

private:
  std::vector<std::shared_ptr<ServiceType>> services_;
  HTTPService http_service_;
  FlightSQLService flight_sql_service_;
  PostgreSQLService pgsql_service_;
};

30.2.1 Type-Erased Service Interface

Cognica uses a type-erased service interface that enables uniform handling of heterogeneous services:

class Service {
public:
  void initialize() {
    return te::call<void>(
        [](auto& self) { return self.initialize(); },
        *this);
  }

  void shutdown() {
    return te::call<void>(
        [](auto& self) { return self.shutdown(); },
        *this);
  }

  auto get_services() const -> std::vector<std::shared_ptr<ServiceBase>> {
    return te::call<std::vector<std::shared_ptr<ServiceBase>>>(
        [](const auto& self) { return self.get_services(); },
        *this);
  }

  void run() {
    return te::call<void>(
        [](auto& self) { return self.run(); },
        *this);
  }
};

using ServiceType = te::poly<Service>;

This pattern enables the service coordinator to manage services without knowing their concrete types, facilitating plugin-style extensibility.
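For readers unfamiliar with type erasure, the following sketch shows the same idea hand-written with the classic concept/model idiom instead of the `te` library used above. `AnyService`, `EchoService`, and `PingService` are hypothetical names chosen for illustration; the point is only that the two service types share a method shape and no base class.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Type-erased handle: any type providing initialize(), run(), and shutdown()
// can be stored. The stored type does not inherit from anything.
class AnyService {
public:
  template <typename T>
  explicit AnyService(T service)
      : self_(std::make_unique<Model<T>>(std::move(service))) {}

  void initialize() { self_->initialize(); }
  void run() { self_->run(); }
  void shutdown() { self_->shutdown(); }

private:
  // Internal virtual interface, invisible to service authors.
  struct Concept {
    virtual ~Concept() = default;
    virtual void initialize() = 0;
    virtual void run() = 0;
    virtual void shutdown() = 0;
  };

  // Adapter that forwards each call to the wrapped concrete service.
  template <typename T>
  struct Model final : Concept {
    explicit Model(T value) : value_(std::move(value)) {}
    void initialize() override { value_.initialize(); }
    void run() override { value_.run(); }
    void shutdown() override { value_.shutdown(); }
    T value_;
  };

  std::unique_ptr<Concept> self_;
};

// Two unrelated hypothetical services that record what was called on them.
struct EchoService {
  std::vector<std::string>* log;
  void initialize() { log->push_back("echo:init"); }
  void run() { log->push_back("echo:run"); }
  void shutdown() { log->push_back("echo:stop"); }
};

struct PingService {
  std::vector<std::string>* log;
  void initialize() { log->push_back("ping:init"); }
  void run() { log->push_back("ping:run"); }
  void shutdown() { log->push_back("ping:stop"); }
};
```

A `std::vector<AnyService>` can then hold both services and be iterated uniformly, which is the role `services_` plays in the coordinator.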

30.2.2 Lifecycle Management

Services follow a consistent lifecycle:

  1. Construction: Service objects are created with default state
  2. Initialization: initialize() configures resources based on runtime configuration
  3. Running: run() starts accepting connections (may be blocking or non-blocking)
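  4. Shutdown: shutdown() gracefully terminates connections and releases resources

The coordinator applies the initialization step to its member services and then to every service held in services_:

void ServiceCoordinator::initialize() {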
  // Initialize individual services based on configuration
  http_service_.initialize();
  flight_sql_service_.initialize();
  pgsql_service_.initialize();

  // Initialize every service registered in services_
  for (auto& service : services_) {
    service->initialize();
  }
}
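The four lifecycle steps imply an ordering that a service can enforce on itself. The sketch below is one way to do that, assuming (this is not stated in the chapter) that calling run() before initialize() should be rejected and that shutdown() should be safe to call from any phase and more than once. `Phase` and `Lifecycle` are hypothetical names.

```cpp
#include <stdexcept>

// Phases corresponding to the lifecycle steps listed above.
enum class Phase { kConstructed, kInitialized, kRunning, kStopped };

class Lifecycle {
public:
  void initialize() { advance_(Phase::kConstructed, Phase::kInitialized); }
  void run() { advance_(Phase::kInitialized, Phase::kRunning); }

  // Allowed from any phase and idempotent, so cleanup paths can always call it.
  void shutdown() { phase_ = Phase::kStopped; }

  Phase phase() const { return phase_; }

private:
  // Move to `next` only when the current phase is the expected predecessor.
  void advance_(Phase expected, Phase next) {
    if (phase_ != expected) {
      throw std::logic_error("lifecycle step called out of order");
    }
    phase_ = next;
  }

  Phase phase_ = Phase::kConstructed;
};
```

A concrete service could hold a `Lifecycle` member and call into it at the top of each of its own lifecycle methods.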

30.3 HTTP REST Protocol

The HTTP service provides REST endpoints for web-based access and administrative operations.

30.3.1 HTTP Server Architecture

The HTTP server uses Boost.Beast for asynchronous I/O:

class HTTPServer final {
public:
  explicit HTTPServer(const config::NetworkHTTPOption& options);

  // Route registration
  auto router() -> HTTPRouter&;
  auto websocket_router() -> WebSocketRouter&;

  void start();
  void stop();

private:
  void do_accept_();
  void on_accept_(beast::error_code ec, tcp::socket socket);

private:
  config::NetworkHTTPOption options_;
  asio::io_context io_context_;
  tcp::acceptor acceptor_;
  HTTPRouter router_;
  WebSocketRouter websocket_router_;
  std::vector<std::thread> threads_;
  std::atomic<bool> running_;
};

30.3.2 URL Routing with Path Parameters

The router supports RESTful URL patterns with path parameters:

class HTTPRouter final {
public:
  void add_route(HTTPMethod method, const std::string& pattern,
                 RouteHandler handler);

  // Convenience methods
  void get(const std::string& pattern, RouteHandler handler);
  void post(const std::string& pattern, RouteHandler handler);
  void put(const std::string& pattern, RouteHandler handler);
  void del(const std::string& pattern, RouteHandler handler);

  auto route(HTTPRequest& request) const -> HTTPResponse;

private:
  struct RouteEntry {
    HTTPMethod method;
    std::vector<std::string> segments;
    std::vector<std::string> param_names;
    RouteHandler handler;
  };

  auto match_route_(const std::string& path, const RouteEntry& entry,
                    std::unordered_map<std::string, std::string>& params) const
      -> bool;

private:
  std::vector<RouteEntry> routes_;
};

Route patterns support path parameters using curly braces:

void HTTPService::register_collection_routes_() {
  // GET /api/v1/collections
  router_.get("/api/v1/collections", handle_list_collections_);

  // GET /api/v1/collections/{name}
  router_.get("/api/v1/collections/{name}", handle_get_collection_);

  // POST /api/v1/collections/{name}/documents
  router_.post("/api/v1/collections/{name}/documents", handle_insert_);

  // GET /api/v1/collections/{name}/documents/{id}
  router_.get("/api/v1/collections/{name}/documents/{id}", handle_get_document_);
}
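The matching step behind these patterns can be sketched as follows. This is a simplified stand-in for match_route_, not its actual implementation: `split_path` and `match_pattern` are hypothetical helpers, query strings are not handled, and a `{name}` segment is assumed to capture exactly one path segment.

```cpp
#include <cstddef>
#include <optional>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>

using PathParams = std::unordered_map<std::string, std::string>;

// Split "/a/b/c" into {"a", "b", "c"}; empty segments are dropped.
inline std::vector<std::string> split_path(const std::string& path) {
  std::vector<std::string> segments;
  std::istringstream in(path);
  std::string segment;
  while (std::getline(in, segment, '/')) {
    if (!segment.empty()) segments.push_back(segment);
  }
  return segments;
}

// Returns the captured parameters when `path` matches `pattern`, or
// std::nullopt otherwise. Literal segments must match exactly.
inline std::optional<PathParams> match_pattern(const std::string& pattern,
                                               const std::string& path) {
  const auto want = split_path(pattern);
  const auto got = split_path(path);
  if (want.size() != got.size()) return std::nullopt;

  PathParams params;
  for (std::size_t i = 0; i < want.size(); ++i) {
    const std::string& w = want[i];
    const bool is_param = w.size() > 2 && w.front() == '{' && w.back() == '}';
    if (is_param) {
      params[w.substr(1, w.size() - 2)] = got[i];  // strip the braces
    } else if (w != got[i]) {
      return std::nullopt;
    }
  }
  return params;
}
```

Splitting the pattern on every request is wasteful; storing pre-split segments and parameter names per route, as the RouteEntry struct above does, moves that work to registration time.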

30.3.3 HTTP Session Handling

Each connection spawns an HTTP session that handles the request-response cycle:

class HTTPSession : public std::enable_shared_from_this<HTTPSession> {
public:
  HTTPSession(tcp::socket socket, const HTTPRouter& router,
              WebSocketRouter& websocket_router, int64_t request_timeout_ms,
              int64_t max_body_size);

  void run();

private:
  void do_read_();
  void on_read_(beast::error_code ec, size_t bytes_transferred);
  void do_write_(beast::http::response<beast::http::string_body> response);
  void on_write_(beast::error_code ec, size_t bytes_transferred, bool close);
  void do_close_();

  // WebSocket upgrade detection
  auto try_websocket_upgrade_() -> bool;

private:
  beast::tcp_stream stream_;
  beast::flat_buffer buffer_;
  beast::http::request<beast::http::string_body> request_;
  const HTTPRouter& router_;
  WebSocketRouter& websocket_router_;
};

30.3.4 WebSocket Support

The HTTP service supports WebSocket upgrades for bidirectional communication:

class WebSocketRouter final {
public:
  void add_route(const std::string& pattern, WebSocketHandler handler);

  auto match(const std::string& path,
             std::unordered_map<std::string, std::string>& params) const
      -> std::optional<WebSocketHandler>;
};

class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
  WebSocketSession(tcp::socket socket, WebSocketHandler handler);

  void run();
  void send(const std::string& message);
  void close();

private:
  void do_read_();
  void on_read_(beast::error_code ec, size_t bytes_transferred);
  void do_write_();

private:
  beast::websocket::stream<tcp::socket> ws_;
  WebSocketHandler handler_;
  beast::flat_buffer buffer_;
  std::queue<std::string> write_queue_;
};

30.3.5 Metrics and Health Endpoints

The HTTP service exposes Prometheus metrics and health endpoints:

void HTTPService::register_metrics_routes_() {
  router_.get("/metrics", [this](HTTPRequest& req) {
    return prometheus_handler_->handle(req);
  });
}

void HTTPService::register_health_routes_() {
  router_.get("/health", [](HTTPRequest& req) {
    HTTPResponse response;
    response.status = 200;
    response.body = R"({"status": "healthy"})";
    return response;
  });

  router_.get("/ready", [](HTTPRequest& req) {
    // Evaluate readiness once so the status code and body cannot disagree
    const bool ready = is_ready();

    HTTPResponse response;
    response.status = ready ? 200 : 503;
    response.body = ready ?
        R"({"status": "ready"})" :
        R"({"status": "not ready"})";
    return response;
  });
}

30.4 PostgreSQL Wire Protocol Service

The PostgreSQL wire protocol service enables compatibility with the extensive PostgreSQL ecosystem (covered in detail in Chapter 28).

30.4.1 Service Configuration

The PostgreSQL service manages server lifecycle and query execution:

class PostgreSQLService final {
public:
  PostgreSQLService();
  ~PostgreSQLService();

  void initialize();
  void shutdown();
  void run();

  auto is_enabled() const -> bool;
  auto is_running() const -> bool;

  // Create independent query executor for testing
  auto create_executor()
      -> std::shared_ptr<net::pgsql::PostgreSQLQueryExecutor>;

  // Access role manager for authentication
  auto get_role_manager() -> sql::auth::RoleManager*;

private:
  std::unique_ptr<net::pgsql::PostgreSQLServer> server_;
  std::shared_ptr<db::document::DocumentDB> db_;
  std::unique_ptr<sql::auth::RoleManager> role_manager_;
  std::unique_ptr<sql::duckdb::DuckDBManager> duckdb_manager_;
  std::unique_ptr<sql::virtual_table::ExternalVirtualTableManager>
      virtual_table_manager_;
  bool enabled_;
};

30.4.2 Server Implementation

The PostgreSQL server accepts connections and spawns sessions:

class PostgreSQLServer final {
public:
  PostgreSQLServer(const config::PostgreSQLOption& options,
                   QueryExecutorFactory executor_factory);

  void start();
  void stop();
  auto is_running() const -> bool;
  auto port() const -> uint16_t;
  auto session_count() const -> size_t;

private:
  void do_accept_();
  void on_accept_(const boost::system::error_code& ec, tcp::socket socket);

  void register_session_(std::shared_ptr<PostgreSQLSession> session);
  void unregister_session_(PostgreSQLSession* session);
  void close_all_sessions_();
  void init_ssl_context_();

private:
  config::PostgreSQLOption options_;
  QueryExecutorFactory executor_factory_;
  asio::io_context io_context_;
  tcp::acceptor acceptor_;
  std::vector<std::thread> threads_;
  std::atomic<bool> running_;
  std::atomic<size_t> session_count_;

  std::unique_ptr<ssl::context> ssl_ctx_;
  std::mutex sessions_mutex_;
  std::unordered_set<std::shared_ptr<PostgreSQLSession>> sessions_;
};

30.4.3 Query Executor Interface

The query executor interface abstracts SQL execution from the protocol layer:

class QueryExecutor {
public:
  virtual ~QueryExecutor() = default;

  struct QueryResult {
    bool success;
    std::string error_message;
    ErrorNoticeFields error_fields;

    // SELECT results
    std::vector<ColumnInfo> columns;
    std::vector<std::vector<std::optional<std::vector<uint8_t>>>> rows;

    // DML results
    std::string command_tag;
    int64_t rows_affected;

    // Streaming cursor support
    uint64_t cursor_id;
    bool is_complete;

    // COPY support
    bool is_copy;
    CopyDirection copy_direction;
  };

  // Simple Query Protocol
  virtual auto execute_simple_query(const std::string& query) -> QueryResult = 0;

  // Extended Query Protocol
  virtual auto parse_query(const std::string& query,
                           const std::vector<int32_t>& param_oids)
      -> QueryResult = 0;

  virtual auto execute_portal(
      const std::string& query,
      const std::vector<std::optional<std::vector<uint8_t>>>& params,
      const std::vector<int16_t>& param_formats,
      const std::vector<int32_t>& param_oids,
      int32_t max_rows,
      uint64_t cursor_id) -> QueryResult = 0;

  // Schema introspection
  virtual auto describe_query(const std::string& query)
      -> std::optional<std::vector<ColumnInfo>> = 0;

  // Transaction control
  virtual void begin_transaction() = 0;
  virtual void commit_transaction() = 0;
  virtual void rollback_transaction() = 0;
};

30.4.4 Session State Machine

PostgreSQL sessions implement a state machine for protocol handling:

enum class SessionState {
  kInitial,            // Waiting for startup message
  kSSLNegotiation,     // SSL negotiation in progress
  kAuthentication,     // Authentication in progress
  kReady,              // Ready for queries
  kInTransaction,      // Inside a transaction
  kFailedTransaction,  // Transaction has failed
  kCopyIn,             // Receiving COPY data
  kCopyOut,            // Sending COPY data
  kClosing,            // Session is closing
};

The session transitions between states based on client messages and query results:

$$
\text{State Transition} = \begin{cases}
\text{kInitial} \rightarrow \text{kSSLNegotiation} & \text{if SSL requested} \\
\text{kInitial} \rightarrow \text{kAuthentication} & \text{if startup message} \\
\text{kAuthentication} \rightarrow \text{kReady} & \text{if auth success} \\
\text{kReady} \rightarrow \text{kInTransaction} & \text{if BEGIN} \\
\text{kInTransaction} \rightarrow \text{kReady} & \text{if COMMIT/ROLLBACK}
\end{cases}
$$
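The five transitions above map directly onto a small transition function. In the sketch below, `SessionEvent` and `next_state` are hypothetical: the chapter names the triggers but does not show an event type, and the remaining states (failed transactions, COPY, closing) are deliberately left unhandled.

```cpp
#include <optional>

enum class SessionState {
  kInitial, kSSLNegotiation, kAuthentication, kReady, kInTransaction,
  kFailedTransaction, kCopyIn, kCopyOut, kClosing,
};

// Hypothetical event type covering only the triggers listed above.
enum class SessionEvent {
  kSSLRequest, kStartupMessage, kAuthSuccess, kBegin, kCommit, kRollback,
};

// Returns the next state for the five listed transitions, or std::nullopt
// when the (state, event) pair is not one of them.
inline std::optional<SessionState> next_state(SessionState state,
                                              SessionEvent event) {
  using S = SessionState;
  using E = SessionEvent;
  switch (state) {
    case S::kInitial:
      if (event == E::kSSLRequest) return S::kSSLNegotiation;
      if (event == E::kStartupMessage) return S::kAuthentication;
      break;
    case S::kAuthentication:
      if (event == E::kAuthSuccess) return S::kReady;
      break;
    case S::kReady:
      if (event == E::kBegin) return S::kInTransaction;
      break;
    case S::kInTransaction:
      if (event == E::kCommit || event == E::kRollback) return S::kReady;
      break;
    default:
      break;
  }
  return std::nullopt;
}
```

Returning `std::nullopt` for unlisted pairs lets the caller decide whether an unexpected message is a protocol error or simply a transition this sketch does not model.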

30.5 Arrow Flight SQL Service

Flight SQL provides columnar data transfer optimized for analytical workloads.

30.5.1 Service Configuration

class FlightSQLService final {
public:
  FlightSQLService();
  ~FlightSQLService();

  void initialize();
  void shutdown();
  void run();

  auto is_enabled() const -> bool;

private:
  bool enabled_;
  std::unique_ptr<net::flight_sql::FlightSQLServer> server_;
  std::thread server_thread_;
};

30.5.2 Flight SQL Server Implementation

The Flight SQL server extends Arrow's FlightSqlServerBase:

class FlightSQLServer final : public arrow::flight::sql::FlightSqlServerBase {
public:
  explicit FlightSQLServer(db::document::DocumentDB* db,
                           const config::FlightSQLOption& options);

  // Query Execution
  auto GetFlightInfoStatement(
      const arrow::flight::ServerCallContext& context,
      const arrow::flight::sql::StatementQuery& command,
      const arrow::flight::FlightDescriptor& descriptor)
      -> arrow::Result<std::unique_ptr<arrow::flight::FlightInfo>> override;

  auto DoGetStatement(
      const arrow::flight::ServerCallContext& context,
      const arrow::flight::sql::StatementQueryTicket& command)
      -> arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> override;

  // Prepared Statements
  auto CreatePreparedStatement(
      const arrow::flight::ServerCallContext& context,
      const arrow::flight::sql::ActionCreatePreparedStatementRequest& request)
      -> arrow::Result<
          arrow::flight::sql::ActionCreatePreparedStatementResult> override;

  // Metadata APIs
  auto DoGetTables(
      const arrow::flight::ServerCallContext& context,
      const arrow::flight::sql::GetTables& command)
      -> arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> override;

  // Transactions
  auto BeginTransaction(
      const arrow::flight::ServerCallContext& context,
      const arrow::flight::sql::ActionBeginTransactionRequest& request)
      -> arrow::Result<
          arrow::flight::sql::ActionBeginTransactionResult> override;

  auto EndTransaction(
      const arrow::flight::ServerCallContext& context,
      const arrow::flight::sql::ActionEndTransactionRequest& request)
      -> arrow::Status override;

private:
  db::document::DocumentDB* db_;
  const config::FlightSQLOption& options_;
  std::unordered_map<std::string, std::shared_ptr<FlightSQLSession>> sessions_;
  std::mutex sessions_mutex_;
  std::unordered_map<std::string, std::shared_ptr<FlightSQLStatement>>
      statements_;
  std::mutex statements_mutex_;
};

30.5.3 Two-Phase Query Execution

Flight SQL uses a two-phase execution model for query results:

Phase 1: GetFlightInfo - Client sends query, server returns metadata (schema, endpoints)

auto GetFlightInfoStatement(
    const arrow::flight::ServerCallContext& context,
    const arrow::flight::sql::StatementQuery& command,
    const arrow::flight::FlightDescriptor& descriptor)
    -> arrow::Result<std::unique_ptr<arrow::flight::FlightInfo>> {

  auto session = get_or_create_session_(context);
  auto schema = session->infer_schema(command.query);
  if (!schema.ok()) {
    return schema.status();
  }

  // Generate ticket for data retrieval
  auto handle = generate_statement_handle_();
  statement_queries_[handle] = command.query;
  statement_schemas_[handle] = *schema;

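  auto endpoint = make_flight_endpoint_(handle);

  // FlightInfo::Make returns arrow::Result<FlightInfo>, so unwrap it and
  // move the value into the unique_ptr this method is declared to return
  ARROW_ASSIGN_OR_RAISE(auto info,
                        arrow::flight::FlightInfo::Make(
                            **schema, descriptor, {endpoint}, -1, -1));
  return std::make_unique<arrow::flight::FlightInfo>(std::move(info));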
}

Phase 2: DoGet - Client retrieves data using ticket from Phase 1

auto DoGetStatement(
    const arrow::flight::ServerCallContext& context,
    const arrow::flight::sql::StatementQueryTicket& command)
    -> arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> {

  auto session = get_or_create_session_(context);
  auto handle = command.statement_handle;

  auto it = statement_queries_.find(handle);
  if (it == statement_queries_.end()) {
    return arrow::Status::KeyError("Statement handle not found");
  }

  return session->execute_query_to_stream(it->second);
}
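Stripped of Arrow types, the two phases reduce to registering a query under an opaque handle and resolving that handle later. The sketch below shows only that bookkeeping; `StatementRegistry` is a hypothetical class, and its sequential handles are an assumption made for readability. A real server would more likely issue unguessable handles, since the handle is what the client presents as its ticket.

```cpp
#include <cstdint>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

class StatementRegistry {
public:
  // Phase 1: remember the query and hand back an opaque handle.
  std::string register_query(const std::string& query) {
    std::lock_guard<std::mutex> lock(mutex_);
    std::string handle = "stmt-" + std::to_string(next_id_++);
    queries_[handle] = query;
    return handle;
  }

  // Phase 2: resolve the handle back to the query that should be executed.
  std::optional<std::string> resolve(const std::string& handle) const {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = queries_.find(handle);
    if (it == queries_.end()) return std::nullopt;
    return it->second;
  }

private:
  mutable std::mutex mutex_;  // the two phases may arrive on different threads
  std::uint64_t next_id_ = 0;
  std::unordered_map<std::string, std::string> queries_;
};
```

Separating the phases lets the server describe the result (schema, endpoints) before any data is produced, and lets the client fetch from whichever endpoint the metadata names.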

30.5.4 Flight SQL Session Management

Each client connection has an associated session with independent state:

class FlightSQLSession final {
public:
  FlightSQLSession(db::document::DocumentDB* db,
                   const std::string& session_id,
                   const config::FlightSQLOption& options);

  auto session_id() const -> const std::string&;
  auto transaction_id() const -> const std::string&;
  auto in_transaction() const -> bool;
  auto pid() const -> int32_t;

  // Query execution
  auto get_flight_info_statement(
      const std::string& query,
      const arrow::flight::FlightDescriptor& descriptor)
      -> arrow::Result<std::unique_ptr<arrow::flight::FlightInfo>>;

  auto execute_query_to_stream(const std::string& query)
      -> arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>>;

  auto execute_update(const std::string& query) -> arrow::Result<int64_t>;

  // Transaction management
  auto begin_transaction() -> arrow::Status;
  auto commit() -> arrow::Status;
  auto rollback() -> arrow::Status;

  // Savepoint management
  auto create_savepoint(const std::string& name) -> arrow::Status;
  auto release_savepoint(const std::string& name) -> arrow::Status;
  auto rollback_to_savepoint(const std::string& name) -> arrow::Status;

private:
  std::string session_id_;
  std::string transaction_id_;
  const config::FlightSQLOption& options_;
  std::unique_ptr<sql::session::SQLSession> sql_session_;
  bool in_transaction_;
  std::optional<arrow::flight::Location> server_location_;

  // Statement cache
  std::unordered_map<std::string, std::string> statement_queries_;
  std::unordered_map<std::string, std::shared_ptr<arrow::Schema>>
      statement_schemas_;
};

30.6 Configuration Model

Each protocol has its own configuration options, grouped under a single network configuration structure.

30.6.1 Network Configuration Structure

struct NetworkOption {
  std::vector<NetworkBindingOption> bindings;
  NetworkHTTPOption http;
  FlightSQLOption flight_sql;
  PostgreSQLOption pgsql;
};

struct NetworkHTTPOption {
  bool enabled = false;
  std::string host = "0.0.0.0";
  uint16_t port = 8080;
  int32_t num_threads = 4;
  int64_t request_timeout_ms = 30000;
  int64_t max_body_size = 16 * 1024 * 1024;  // 16MB
  SSLCredentialsOption ssl;
};

struct FlightSQLOption {
  bool enabled = false;
  std::string host = "0.0.0.0";
  uint16_t port = 31337;
  int64_t max_batch_size = 65536;
  int64_t statement_timeout_ms = 0;
  int64_t statement_cache_ttl_s = 300;
  SSLCredentialsOption ssl;
};

struct PostgreSQLOption {
  bool enabled = false;
  std::string host = "0.0.0.0";
  uint16_t port = 5432;
  int32_t num_threads = 4;
  int64_t statement_timeout_ms = 0;
  int64_t idle_session_timeout_ms = 0;
  int32_t max_connections = 100;
  int64_t max_message_size = 1_GB;
  int64_t default_fetch_size = 10000;
  PostgreSQLAuthOption auth;
  SSLCredentialsOption ssl;
};

30.6.2 SSL/TLS Configuration

All protocols support TLS encryption:

struct SSLCredentialsOption {
  bool enabled = false;
  std::string cert_path;      // Certificate file path
  std::string key_path;       // Private key file path
  std::string ca_path;        // CA certificate for client verification
  bool require_client_cert = false;  // mTLS requirement
};
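A configuration like this is usually checked once at startup. The sketch below shows what such a check might look like; the rules it enforces (certificate and key required when TLS is enabled, CA path required for mTLS) are assumptions inferred from the field comments, and `validate_ssl` is a hypothetical function rather than part of Cognica.

```cpp
#include <string>
#include <vector>

// Mirror of the struct above so the sketch is self-contained.
struct SSLCredentialsOption {
  bool enabled = false;
  std::string cert_path;
  std::string key_path;
  std::string ca_path;
  bool require_client_cert = false;
};

// Returns a list of problems; an empty list means the options are usable.
inline std::vector<std::string> validate_ssl(const SSLCredentialsOption& ssl) {
  std::vector<std::string> problems;
  if (!ssl.enabled) return problems;  // nothing to check when TLS is off

  if (ssl.cert_path.empty()) {
    problems.push_back("cert_path is required when TLS is enabled");
  }
  if (ssl.key_path.empty()) {
    problems.push_back("key_path is required when TLS is enabled");
  }
  // Assumed rule: verifying client certificates needs a CA to verify against.
  if (ssl.require_client_cert && ssl.ca_path.empty()) {
    problems.push_back("ca_path is required when require_client_cert is set");
  }
  return problems;
}
```

Collecting every problem instead of failing on the first one gives an operator the full list in a single startup attempt.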

30.7 Session Management

Cognica tracks active sessions across all protocols through a unified session registry.

30.7.1 Session Registry

The session registry provides cross-protocol visibility into active sessions:

class SessionRegistry {
public:
  void register_session(int32_t pid, const SessionInfo& info);
  void unregister_session(int32_t pid);
  void update_session_state(int32_t pid, SessionState state);
  void update_query_info(int32_t pid, const QueryInfo& query);

  auto get_all_sessions() const -> std::vector<SessionInfo>;
  auto get_session(int32_t pid) const -> std::optional<SessionInfo>;

  // Cancel query by session PID
  auto cancel_query(int32_t pid) -> bool;

  // Terminate session by PID
  auto terminate_session(int32_t pid) -> bool;
};
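A minimal version of such a registry is a mutex-guarded map keyed by PID. The sketch below covers only registration and lookup, with a reduced `SessionInfo`; how the real registry synchronizes, and how cancel_query and terminate_session reach the owning session, is not shown in the chapter and is not modeled here.

```cpp
#include <cstdint>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Reduced stand-in for the full SessionInfo described in the chapter.
struct SessionInfo {
  std::int32_t pid = 0;
  std::string protocol;
  std::string username;
};

class SessionRegistry {
public:
  void register_session(std::int32_t pid, const SessionInfo& info) {
    std::lock_guard<std::mutex> lock(mutex_);
    sessions_.insert_or_assign(pid, info);
  }

  void unregister_session(std::int32_t pid) {
    std::lock_guard<std::mutex> lock(mutex_);
    sessions_.erase(pid);
  }

  auto get_session(std::int32_t pid) const -> std::optional<SessionInfo> {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = sessions_.find(pid);
    if (it == sessions_.end()) return std::nullopt;
    return it->second;
  }

  // Returns copies so callers never hold references into the guarded map.
  auto get_all_sessions() const -> std::vector<SessionInfo> {
    std::lock_guard<std::mutex> lock(mutex_);
    std::vector<SessionInfo> out;
    out.reserve(sessions_.size());
    for (const auto& [pid, info] : sessions_) out.push_back(info);
    return out;
  }

private:
  mutable std::mutex mutex_;
  std::unordered_map<std::int32_t, SessionInfo> sessions_;
};
```

Because every protocol registers into the same map, a view built on get_all_sessions() can list PostgreSQL and Flight SQL sessions side by side.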

30.7.2 Session Information

Session information enables administrative monitoring:

struct SessionInfo {
  int32_t pid;                               // Process ID
  std::string protocol;                      // "postgresql", "flight_sql", etc.
  std::string client_addr;                   // Client IP address
  uint16_t client_port;                      // Client port
  std::string database;                      // Connected database
  std::string username;                      // Authenticated user
  std::chrono::system_clock::time_point backend_start;  // Session start time
  std::chrono::system_clock::time_point query_start;    // Current query start
  SessionState state;                        // Current state
  std::string current_query;                 // Active query text
  bool waiting;                              // Waiting for lock
};

30.7.3 System Views

The session registry powers PostgreSQL-compatible system views:

-- pg_stat_activity equivalent
SELECT pid, usename, application_name, client_addr, state, query
FROM pg_stat_activity;

-- Cancel a query
SELECT pg_cancel_backend(12345);

-- Terminate a session
SELECT pg_terminate_backend(12345);

30.8 Connection Management

Each protocol implements connection lifecycle management appropriate to its characteristics.

30.8.1 HTTP Connection Handling

HTTP connections are short-lived by default, with optional keep-alive:

void HTTPSession::on_write_(beast::error_code ec, size_t bytes_transferred,
                             bool close) {
  if (ec) {
    return do_close_();
  }

  if (close) {
    return do_close_();
  }

  // Keep connection alive for next request
  request_ = {};
  do_read_();
}

30.8.2 PostgreSQL Connection Pooling

PostgreSQL connections are persistent, so the server reclaims idle ones with an optional timer; setting idle_session_timeout_ms to 0 or less disables it:

void PostgreSQLSession::start_idle_timer_() {
  if (options_.idle_session_timeout_ms <= 0) {
    return;
  }

  idle_timer_ = std::make_unique<asio::steady_timer>(
      socket_.get_executor(),
      std::chrono::milliseconds(options_.idle_session_timeout_ms));

  idle_timer_->async_wait([this, self = shared_from_this()](
                              const boost::system::error_code& ec) {
    on_idle_timeout_(ec);
  });
}

void PostgreSQLSession::on_idle_timeout_(const boost::system::error_code& ec) {
  if (ec) {
    return;  // Timer was cancelled
  }

  // Close idle connection
  close();
}

30.8.3 Flight SQL Connection Reuse

Flight SQL sessions are reused across calls and cache statements; cached entries older than statement_cache_ttl_s are evicted:

void FlightSQLSession::cleanup_expired_statements_() {
  auto now = std::chrono::steady_clock::now();
  auto ttl = std::chrono::seconds(options_.statement_cache_ttl_s);

  std::vector<std::string> expired;
  for (const auto& [handle, timestamp] : statement_timestamps_) {
    if (now - timestamp > ttl) {
      expired.push_back(handle);
    }
  }

  for (const auto& handle : expired) {
    statement_queries_.erase(handle);
    statement_schemas_.erase(handle);
    statement_timestamps_.erase(handle);
  }
}
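The same expiry rule can be written so that it is easy to test, by passing the current time in instead of reading the clock inside the function. The sketch below is a hypothetical `StatementCache`, not the session's actual cache; it also keeps the query and its timestamp in one entry, so the two cannot drift apart the way separate maps could.

```cpp
#include <chrono>
#include <cstddef>
#include <string>
#include <unordered_map>

class StatementCache {
public:
  using Clock = std::chrono::steady_clock;

  explicit StatementCache(std::chrono::seconds ttl) : ttl_(ttl) {}

  void put(const std::string& handle, const std::string& query,
           Clock::time_point now) {
    entries_[handle] = Entry{query, now};
  }

  bool contains(const std::string& handle) const {
    return entries_.count(handle) != 0;
  }

  // Remove entries strictly older than the TTL; returns how many were removed.
  std::size_t evict_expired(Clock::time_point now) {
    std::size_t removed = 0;
    for (auto it = entries_.begin(); it != entries_.end();) {
      if (now - it->second.created > ttl_) {
        it = entries_.erase(it);  // erase returns the next valid iterator
        ++removed;
      } else {
        ++it;
      }
    }
    return removed;
  }

private:
  struct Entry {
    std::string query;
    Clock::time_point created;
  };

  std::chrono::seconds ttl_;
  std::unordered_map<std::string, Entry> entries_;
};
```

Erasing through the iterator makes the sweep a single pass and removes the need for the intermediate list of expired handles.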

30.9 Thread Safety and Concurrency

Multi-protocol services require careful concurrency management.

30.9.1 Strand-Based Serialization

Boost.Asio strands serialize the handlers that run on them, so a session's write buffer is never mutated by two threads at once:

class PostgreSQLSession {
  void queue_message_(const std::vector<uint8_t>& data) {
    asio::dispatch(strand_, [this, data]() {
      write_buffer_.insert(write_buffer_.end(), data.begin(), data.end());
    });
  }

private:
  asio::strand<asio::any_io_executor> strand_;
};

30.9.2 Shared Resource Protection

Resources shared across connections, such as the Flight SQL session map, are guarded by a mutex:

class FlightSQLServer {
  auto get_or_create_session_(const arrow::flight::ServerCallContext& context)
      -> std::shared_ptr<FlightSQLSession> {

    auto session_id = extract_session_id_(context);

    std::lock_guard<std::mutex> lock(sessions_mutex_);

    auto it = sessions_.find(session_id);
    if (it != sessions_.end()) {
      return it->second;
    }

    auto session = std::make_shared<FlightSQLSession>(
        db_, session_id, options_);
    sessions_[session_id] = session;
    return session;
  }

private:
  std::mutex sessions_mutex_;
  std::unordered_map<std::string, std::shared_ptr<FlightSQLSession>> sessions_;
};

30.10 Error Handling Across Protocols

Each protocol has distinct error reporting mechanisms that must be respected.

30.10.1 PostgreSQL Error Responses

PostgreSQL uses SQLSTATE codes and structured error fields:

void PostgreSQLSession::send_error_response_(const ErrorNoticeFields& fields) {
  auto message = PostgreSQLProtocol::create_error_response(fields);
  queue_message_(message);
}

// ErrorNoticeFields includes:
// - severity (ERROR, WARNING, etc.)
// - code (SQLSTATE like "42P01")
// - message (human-readable description)
// - detail (additional context)
// - hint (suggested resolution)
// - position (character position in query)
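On the wire, these fields become an ErrorResponse message: the tag byte `E`, a 4-byte big-endian length that counts itself, then one entry per field (a one-byte field code followed by a NUL-terminated string), and a final zero byte. The sketch below encodes that layout; `encode_error_response` is a hypothetical stand-in for create_error_response, whose real signature is not shown. The field codes used in the usage note (`S` severity, `C` SQLSTATE code, `M` message) are the ones defined by the PostgreSQL protocol.

```cpp
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Append one field: a one-byte type code, the value, and a NUL terminator.
inline void append_field(std::vector<std::uint8_t>& out, char code,
                         const std::string& value) {
  out.push_back(static_cast<std::uint8_t>(code));
  out.insert(out.end(), value.begin(), value.end());
  out.push_back(0);
}

// Encode an ErrorResponse from (field code, value) pairs.
inline std::vector<std::uint8_t> encode_error_response(
    const std::vector<std::pair<char, std::string>>& fields) {
  std::vector<std::uint8_t> body;
  for (const auto& [code, value] : fields) {
    append_field(body, code, value);
  }
  body.push_back(0);  // zero byte terminates the field list

  // The length covers itself (4 bytes) plus the body, but not the tag byte.
  const auto length = static_cast<std::uint32_t>(body.size() + 4);

  std::vector<std::uint8_t> msg;
  msg.push_back('E');
  for (int shift = 24; shift >= 0; shift -= 8) {
    msg.push_back(static_cast<std::uint8_t>(length >> shift));
  }
  msg.insert(msg.end(), body.begin(), body.end());
  return msg;
}
```

For example, passing `{{'S', "ERROR"}, {'C', "42P01"}, {'M', "oops"}}` yields a 26-byte message whose length field is 25.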

30.10.2 Flight SQL Error Status

Flight SQL uses Arrow Status for error reporting:

auto execute_query_to_stream(const std::string& query)
    -> arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>> {

  auto result = execute_query_(query);
  if (!result.ok()) {
    return arrow::Status::ExecutionError(
        "Query execution failed: " + result.status().message());
  }

  return std::make_unique<arrow::flight::RecordBatchStream>(*result);
}

30.10.3 HTTP Error Responses

HTTP uses status codes and JSON error bodies:

HTTPResponse handle_http_error(const db::Status& status) {
  HTTPResponse response;

  if (status.IsNotFound()) {
    response.status = 404;
  } else if (status.IsInvalidArgument()) {
    response.status = 400;
  } else if (status.IsPermissionDenied()) {
    response.status = 403;
  } else {
    response.status = 500;
  }

  nlohmann::json error;
  error["error"] = status.message();
  error["code"] = status.code();
  response.body = error.dump();

  return response;
}

30.11 Performance Considerations

Multi-protocol support introduces performance trade-offs that require careful tuning.

30.11.1 Thread Pool Sizing

Each protocol may have distinct thread pool requirements:

$$
\text{Optimal Threads} = f(\text{Protocol Characteristics}, \text{Workload}, \text{Cores})
$$

  • HTTP: Thread-per-connection for short requests, or async I/O for high concurrency
  • PostgreSQL: Fixed thread pool sized for expected connection count
  • Flight SQL: Thread pool sized for batch processing parallelism

30.11.2 Buffer Management

Different protocols have different buffering characteristics:

| Protocol | Buffer Strategy | Typical Size |
|---|---|---|
| HTTP | Per-request allocation | 16KB - 16MB |
| PostgreSQL | Reusable session buffer | 8KB - 1GB |
| Flight SQL | Arrow buffer pool | 64KB batches |

30.11.3 Connection Overhead

Connection establishment costs vary by protocol:

$$
T_{\text{connect}} = \begin{cases}
T_{\text{TCP}} + T_{\text{TLS}} & \text{HTTP} \\
T_{\text{TCP}} + T_{\text{TLS}} + T_{\text{Auth}} + T_{\text{StartupParams}} & \text{PostgreSQL} \\
T_{\text{TCP}} + T_{\text{TLS}} + T_{\text{Auth}} + T_{\text{Handshake}} & \text{Flight SQL}
\end{cases}
$$

PostgreSQL and Flight SQL benefit significantly from connection pooling due to their higher connection establishment costs.
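A worked example makes the pooling argument concrete. The millisecond values below are purely hypothetical, chosen for easy arithmetic rather than measured on Cognica, and $\bar{T}_{\text{connect}}(n)$ is notation introduced here for the connection cost amortized over $n$ queries that share one pooled connection.

```latex
% Hypothetical PostgreSQL connection cost, using the terms defined above
T_{\text{connect}} = T_{\text{TCP}} + T_{\text{TLS}} + T_{\text{Auth}} + T_{\text{StartupParams}}
                   = 0.5 + 2.0 + 1.0 + 0.5 = 4.0\ \text{ms}

% Amortized cost per query when n queries reuse one connection
\bar{T}_{\text{connect}}(n) = \frac{T_{\text{connect}}}{n}, \qquad
\bar{T}_{\text{connect}}(1) = 4.0\ \text{ms}, \qquad
\bar{T}_{\text{connect}}(100) = 0.04\ \text{ms}
```

Under these assumed numbers, a client that reconnects for every query pays 4 ms of setup each time, while one that reuses a connection for 100 queries pays 0.04 ms per query.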

30.12 Summary

The multi-protocol service layer enables Cognica to serve diverse client ecosystems through a unified architecture.

Key Design Decisions:

  1. Type-Erased Service Interface: Enables uniform service management without tight coupling to specific protocol implementations.

  2. Protocol-Specific Sessions: Each protocol has dedicated session handling optimized for its characteristics (stateless HTTP vs. stateful PostgreSQL).

  3. Shared Query Engine: All protocols execute queries against the same underlying engine, ensuring consistent semantics.

  4. Independent Lifecycle: Each protocol service can be enabled, configured, and scaled independently.

Protocol Selection Guidelines:

  • PostgreSQL: Use for compatibility with existing tools and ORMs
  • Flight SQL: Use for analytical workloads requiring columnar data transfer
  • HTTP: Use for administrative interfaces and web-based access

The multi-protocol architecture positions Cognica as a versatile database system capable of serving applications ranging from traditional OLTP workloads to modern data science pipelines, all through a unified storage and query engine.

Exercises

  1. Protocol Adapter: Design a protocol adapter that exposes GraphQL queries over the HTTP protocol. The adapter should translate GraphQL queries to SQL and convert results to GraphQL response format.

  2. Connection Pooler: Implement an external connection pooler (similar to PgBouncer) that multiplexes client connections to a smaller number of server connections. Consider transaction-level and statement-level pooling modes.

  3. Protocol Performance Benchmark: Design a benchmark comparing the three protocols for different workload patterns (point queries, bulk reads, streaming inserts). Analyze the overhead of each protocol's serialization format.

  4. Custom Protocol Extension: Design an extension mechanism that allows adding new protocols without modifying the core service layer. Consider plugin discovery, lifecycle management, and configuration integration.

Further Reading

  • Boost.Beast HTTP/WebSocket Library Documentation
  • Apache Arrow Flight SQL Specification
  • PostgreSQL Wire Protocol Documentation (Frontend/Backend Protocol)
  • Boost.Asio Asynchronous I/O Framework