Chapter 28: PostgreSQL Wire Protocol

28.1 Protocol Fundamentals

28.1.1 Why PostgreSQL Compatibility?

Database adoption hinges on ecosystem compatibility. Applications, ORMs, business intelligence tools, and administrative utilities all speak established protocols. Rather than defining a proprietary protocol and building an ecosystem from scratch, Cognica implements the PostgreSQL wire protocol—enabling immediate compatibility with thousands of existing tools.

This strategic choice provides:

  1. Zero migration friction: Applications connect to Cognica using existing PostgreSQL drivers
  2. Tooling ecosystem: psql, pgAdmin, DBeaver, Metabase, Tableau all work immediately
  3. ORM compatibility: SQLAlchemy, Hibernate, ActiveRecord connect without modification
  4. Operational familiarity: DBAs use familiar commands and workflows

28.1.2 Protocol Version 3.0

Cognica implements PostgreSQL protocol version 3.0, introduced in PostgreSQL 7.4 (2003) and still supported by current servers and drivers. The protocol uses a simple message-based format over TCP:

Message Structure:

+------+--------+------------------+
| Type | Length |     Payload      |
+------+--------+------------------+
  1B      4B        (Length-4) B

  • Type: Single ASCII character identifying message type
  • Length: 32-bit big-endian integer including itself but excluding type byte
  • Payload: Message-specific data

The startup message is exceptional—it omits the type byte:

+--------+------------------+
| Length |     Payload      |
+--------+------------------+
   4B        (Length-4) B
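
The framing rule for regular (typed) messages can be sketched as a small parser. This is a minimal illustration, not Cognica's implementation: the `Frame` type and `try_parse_frame` function are hypothetical names, and for brevity the sketch returns the same empty result for an incomplete buffer and for an invalid length, which a real server would distinguish.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical helper type; not part of the Cognica sources.
struct Frame {
  char type;                     // Message type byte
  std::vector<uint8_t> payload;  // The (Length - 4) payload bytes
  std::size_t consumed;          // Total bytes taken from the buffer
};

// Tries to extract one typed message from the front of `buf`.
// Returns std::nullopt if the message is incomplete or the length is invalid.
inline auto try_parse_frame(const std::vector<uint8_t>& buf)
    -> std::optional<Frame> {
  constexpr std::size_t kHeaderSize = 5;  // 1 type byte + 4 length bytes
  if (buf.size() < kHeaderSize) {
    return std::nullopt;
  }
  const uint32_t length = (static_cast<uint32_t>(buf[1]) << 24) |
                          (static_cast<uint32_t>(buf[2]) << 16) |
                          (static_cast<uint32_t>(buf[3]) << 8) |
                          static_cast<uint32_t>(buf[4]);
  if (length < 4) {
    return std::nullopt;  // Length counts itself, so 4 is the minimum
  }
  const std::size_t total = 1 + static_cast<std::size_t>(length);
  if (buf.size() < total) {
    return std::nullopt;  // Wait for more bytes
  }
  Frame frame;
  frame.type = static_cast<char>(buf[0]);
  frame.payload.assign(buf.begin() + static_cast<std::ptrdiff_t>(kHeaderSize),
                       buf.begin() + static_cast<std::ptrdiff_t>(total));
  frame.consumed = total;
  return frame;
}
```

Because the length excludes the type byte, a complete message always occupies `1 + Length` bytes on the wire.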

28.1.3 Byte Order and String Encoding

The protocol uses network byte order (big-endian) for all multi-byte integers:

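auto read_int32(const uint8_t* buf) -> int32_t {
  // Widen to uint32_t before shifting: shifting a promoted int into the
  // sign bit is undefined behavior before C++20.
  auto bits = (static_cast<uint32_t>(buf[0]) << 24) |
              (static_cast<uint32_t>(buf[1]) << 16) |
              (static_cast<uint32_t>(buf[2]) << 8) |
              static_cast<uint32_t>(buf[3]);
  return static_cast<int32_t>(bits);
}

auto write_int32(uint8_t* buf, int32_t value) -> void {
  // Shift the unsigned bit pattern so negative values encode portably.
  auto bits = static_cast<uint32_t>(value);
  buf[0] = static_cast<uint8_t>(bits >> 24);
  buf[1] = static_cast<uint8_t>(bits >> 16);
  buf[2] = static_cast<uint8_t>(bits >> 8);
  buf[3] = static_cast<uint8_t>(bits);
}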

Strings are null-terminated C-style strings encoded in the client's declared encoding (typically UTF-8).
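
Reading these two primitive encodings from a payload might look like the following sketch. `PayloadCursor` is a hypothetical class written for illustration (the chapter's `MessageReader` is not shown in full), and it assumes the caller keeps the underlying buffer alive while the cursor is in use.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Hypothetical cursor over a message payload, for illustration only.
class PayloadCursor {
public:
  explicit PayloadCursor(const std::vector<uint8_t>& data) : data_(data) {}

  // Reads a big-endian 16-bit integer, or nullopt if fewer than 2 bytes remain.
  auto read_int16() -> std::optional<int16_t> {
    if (remaining() < 2) {
      return std::nullopt;
    }
    const auto raw =
        static_cast<uint16_t>((data_[pos_] << 8) | data_[pos_ + 1]);
    pos_ += 2;
    return static_cast<int16_t>(raw);
  }

  // Reads bytes up to (not including) the next NUL, then skips the NUL.
  // Returns nullopt if no terminator exists, so a malformed payload cannot
  // cause a read past the end of the buffer.
  auto read_cstring() -> std::optional<std::string> {
    for (std::size_t i = pos_; i < data_.size(); ++i) {
      if (data_[i] == 0) {
        std::string out(data_.begin() + static_cast<std::ptrdiff_t>(pos_),
                        data_.begin() + static_cast<std::ptrdiff_t>(i));
        pos_ = i + 1;
        return out;
      }
    }
    return std::nullopt;
  }

  auto remaining() const -> std::size_t { return data_.size() - pos_; }

private:
  const std::vector<uint8_t>& data_;
  std::size_t pos_{0};
};
```

Returning `std::optional` rather than assuming well-formed input is a design choice of this sketch; untrusted clients can send payloads with missing terminators.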

28.1.4 Frontend vs Backend Messages

The protocol distinguishes message direction:

Frontend Messages (Client to Server):

Type     Name            Purpose
-------  --------------  ------------------------------
(none)   StartupMessage  Initiate connection
'Q'      Query           Simple query execution
'P'      Parse           Prepare a statement
'B'      Bind            Bind parameters to statement
'D'      Describe        Get statement/portal metadata
'E'      Execute         Execute a portal
'S'      Sync            Synchronization point
'X'      Terminate       Close connection

Backend Messages (Server to Client):

Type     Name             Purpose
-------  ---------------  --------------------------------
'R'      Authentication   Authentication request/response
'T'      RowDescription   Column metadata
'D'      DataRow          Result row data
'C'      CommandComplete  Query completion
'Z'      ReadyForQuery    Server ready for next query
'E'      ErrorResponse    Error information

28.2 Connection Lifecycle

28.2.1 Session States

A PostgreSQL session progresses through well-defined states (src/cognica/net/pgsql/pgsql_session.hpp):

enum class SessionState {
  kInitial,           // Awaiting startup message
  kSSLNegotiation,    // SSL handshake in progress
  kAuthentication,    // Authentication in progress
  kReady,             // Ready for queries (idle)
  kInTransaction,     // Inside transaction block
  kFailedTransaction, // Transaction failed, awaiting ROLLBACK
  kCopyIn,            // Receiving COPY data
  kCopyOut,           // Sending COPY data
  kClosing            // Connection closing
};

[Diagram: session state transitions]

28.2.2 Startup Sequence

The connection begins with a startup message containing protocol version and parameters:

StartupMessage:
+--------+---------+----------------------------------+
| Length | Version | Parameters (key=value pairs)     |
+--------+---------+----------------------------------+
   4B       4B        null-terminated strings, ends with \0

Protocol Version Encoding:

constexpr int32_t kProtocolVersion3 = 196608;  // (3 << 16) | 0

Standard Parameters:

  • user: Username for authentication
  • database: Target database name (defaults to the user name when omitted)
  • options: Command-line arguments for the backend
  • application_name: Client application identifier
  • client_encoding: Character encoding (e.g., "UTF8")
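
From the client side, the layout above can be produced with a few lines of code. This is a minimal sketch; `append_int32` and `build_startup_message` are hypothetical helper names, not functions from the Cognica sources.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Appends a 32-bit integer in network byte order.
inline void append_int32(std::vector<uint8_t>& out, uint32_t v) {
  out.push_back(static_cast<uint8_t>(v >> 24));
  out.push_back(static_cast<uint8_t>(v >> 16));
  out.push_back(static_cast<uint8_t>(v >> 8));
  out.push_back(static_cast<uint8_t>(v));
}

// Builds a StartupMessage from key/value parameters (illustrative helper).
inline auto build_startup_message(
    const std::vector<std::pair<std::string, std::string>>& params)
    -> std::vector<uint8_t> {
  std::vector<uint8_t> body;
  append_int32(body, 196608);  // Protocol 3.0: (3 << 16) | 0
  for (const auto& [key, value] : params) {
    body.insert(body.end(), key.begin(), key.end());
    body.push_back(0);
    body.insert(body.end(), value.begin(), value.end());
    body.push_back(0);
  }
  body.push_back(0);  // An empty key terminates the parameter list

  std::vector<uint8_t> msg;
  // The length field counts itself but there is no type byte to exclude
  append_int32(msg, static_cast<uint32_t>(body.size() + 4));
  msg.insert(msg.end(), body.begin(), body.end());
  return msg;
}
```

For the single parameter `user=alice`, the message is 20 bytes: 4 for the length, 4 for the version, 5 for `user\0`, 6 for `alice\0`, and 1 for the terminator.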

Implementation (pgsql_session.cpp):

auto PostgreSQLSession::handle_startup_message_(const MessageReader& reader)
    -> void {
  auto version = reader.read_int32();

  if (version == kSSLRequestCode) {
    handle_ssl_request_();
    return;
  }

  if (version == kCancelRequestCode) {
    handle_cancel_request_(reader);
    return;
  }

  if (version != kProtocolVersion3) {
    send_error_(ErrorBuilder::protocol_violation(
        fmt::format("Unsupported protocol version: {}", version)));
    close_();
    return;
  }

  // Parse parameters
  while (reader.remaining() > 1) {
    auto key = reader.read_cstring();
    if (key.empty()) break;
    auto value = reader.read_cstring();
    startup_params_[key] = value;
  }

  username_ = startup_params_["user"];
  database_ = startup_params_["database"];

  begin_authentication_();
}

28.2.3 SSL/TLS Negotiation

Clients request SSL by sending a special startup message:

SSLRequest:
+--------+----------+
| Length | SSLCode  |
+--------+----------+
   4B       4B (80877103)

The server responds with a single byte:

  • 'S': SSL available, proceed with handshake
  • 'N': SSL not available, continue unencrypted

auto PostgreSQLSession::handle_ssl_request_() -> void {
  if (ssl_enabled_) {
    // Accept SSL
    write_byte_('S');
    flush_write_buffer_();
    start_ssl_handshake_();
  } else {
    // Reject SSL
    write_byte_('N');
    flush_write_buffer_();
    // Client will send regular startup message
    state_ = SessionState::kInitial;
  }
}

After successful SSL handshake, the client sends a regular startup message over the encrypted channel.
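
Because SSLRequest, CancelRequest, and StartupMessage all arrive without a type byte, the server tells them apart by the first 32-bit code after the length. A minimal sketch of that dispatch follows; the `InitialPacket` enum and `classify_initial_packet` function are hypothetical names, while the constant values are the ones given in this chapter.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical classification of the first untyped packet on a connection.
enum class InitialPacket {
  kSSLRequest,
  kCancelRequest,
  kStartupV3,
  kUnsupported
};

constexpr int32_t kSSLRequestCode = 80877103;     // (1234 << 16) | 5679
constexpr int32_t kCancelRequestCode = 80877102;  // (1234 << 16) | 5678
constexpr int32_t kProtocolVersion3 = 196608;     // (3 << 16) | 0

// Maps the 32-bit code that follows the length field to a packet kind.
constexpr auto classify_initial_packet(int32_t code) -> InitialPacket {
  if (code == kSSLRequestCode) {
    return InitialPacket::kSSLRequest;
  }
  if (code == kCancelRequestCode) {
    return InitialPacket::kCancelRequest;
  }
  if (code == kProtocolVersion3) {
    return InitialPacket::kStartupV3;
  }
  return InitialPacket::kUnsupported;
}
```

The special codes use 1234 in the upper 16 bits, a value no real protocol major version takes, so they cannot collide with a genuine version number.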

28.2.4 Server Response After Startup

After successful authentication, the server sends several messages before ReadyForQuery:

  1. ParameterStatus messages for server configuration:

void send_initial_parameters_() {
  send_parameter_status_("server_version", "17.0");
  send_parameter_status_("server_encoding", "UTF8");
  send_parameter_status_("client_encoding", "UTF8");
  send_parameter_status_("DateStyle", "ISO, MDY");
  send_parameter_status_("TimeZone", "UTC");
  send_parameter_status_("integer_datetimes", "on");
  send_parameter_status_("standard_conforming_strings", "on");
}

  2. BackendKeyData for query cancellation:

void send_backend_key_data_() {
  // Process ID and secret key for cancel requests
  MessageWriter writer;
  writer.write_char('K');
  writer.write_int32(12);  // Length
  writer.write_int32(process_id_);
  writer.write_int32(secret_key_);
  queue_message_(writer.data());
}

  3. ReadyForQuery indicating server is ready:

void send_ready_for_query_(TransactionStatus status) {
  MessageWriter writer;
  writer.write_char('Z');
  writer.write_int32(5);  // Length
  writer.write_char(static_cast<char>(status));
  queue_message_(writer.data());
}

28.3 Authentication

28.3.1 Authentication Methods

PostgreSQL supports multiple authentication mechanisms. Cognica implements:

enum class AuthenticationType : int32_t {
  kOk = 0,                  // Authentication successful
  kCleartextPassword = 3,   // Send password in cleartext
  kMD5Password = 5,         // Send MD5-hashed password
  kSASL = 10,               // SCRAM-SHA-256 initial
  kSASLContinue = 11,       // SCRAM-SHA-256 continue
  kSASLFinal = 12,          // SCRAM-SHA-256 final
};

28.3.2 SCRAM-SHA-256 Authentication

SCRAM (Salted Challenge Response Authentication Mechanism) provides secure password verification without transmitting the password:

Protocol Flow:

[Diagram: SCRAM-SHA-256 message exchange between client and server]

Implementation (src/cognica/net/pgsql/pgsql_auth.hpp):

class ScramAuthenticator {
public:
  explicit ScramAuthenticator(const std::string& stored_password);

  // Process client-first message, return server-first
  auto process_client_first(std::string_view client_first)
      -> std::expected<std::string, std::string>;

  // Process client-final message, return server-final
  auto process_client_final(std::string_view client_final)
      -> std::expected<std::string, std::string>;

  auto is_authenticated() const -> bool { return state_ == State::kCompleted; }

private:
  enum class State {
    kInitial,
    kClientFirstReceived,
    kServerFirstSent,
    kCompleted,
    kFailed
  };

  State state_{State::kInitial};
  std::string stored_key_;
  std::string server_key_;
  std::string salt_;
  int iterations_;
  std::string client_nonce_;
  std::string server_nonce_;
  std::string auth_message_;
};

Key Derivation:

The SCRAM protocol derives keys from the password:

$$
\begin{aligned}
\text{SaltedPassword} &= \text{PBKDF2}(\text{password}, \text{salt}, \text{iterations}) \\
\text{ClientKey} &= \text{HMAC}(\text{SaltedPassword}, \text{"Client Key"}) \\
\text{StoredKey} &= \text{SHA256}(\text{ClientKey}) \\
\text{ServerKey} &= \text{HMAC}(\text{SaltedPassword}, \text{"Server Key"})
\end{aligned}
$$

Password Storage Format:

SCRAM-SHA-256$<iterations>:<salt>$<StoredKey>:<ServerKey>

Example:

SCRAM-SHA-256$4096:salt123base64$StoredKeyBase64:ServerKeyBase64
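
Splitting a stored verifier into its four fields could be done as in the sketch below. `ScramVerifier` and `parse_scram_verifier` are hypothetical names introduced for illustration; the sketch leaves the salt and keys Base64-encoded and performs no cryptographic work.

```cpp
#include <cassert>
#include <charconv>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>

// Hypothetical holder for the parsed fields; values stay Base64-encoded.
struct ScramVerifier {
  int iterations{0};
  std::string salt_b64;
  std::string stored_key_b64;
  std::string server_key_b64;
};

// Parses "SCRAM-SHA-256$<iterations>:<salt>$<StoredKey>:<ServerKey>".
inline auto parse_scram_verifier(std::string_view s)
    -> std::optional<ScramVerifier> {
  constexpr std::string_view kPrefix = "SCRAM-SHA-256$";
  if (s.substr(0, kPrefix.size()) != kPrefix) {
    return std::nullopt;
  }
  s.remove_prefix(kPrefix.size());

  // Base64 never contains ':' or '$', so these separators are unambiguous.
  const auto colon1 = s.find(':');
  const auto dollar = s.find('$');
  if (colon1 == std::string_view::npos || dollar == std::string_view::npos ||
      colon1 > dollar) {
    return std::nullopt;
  }
  const auto colon2 = s.find(':', dollar + 1);
  if (colon2 == std::string_view::npos) {
    return std::nullopt;
  }

  ScramVerifier v;
  const auto iter_str = s.substr(0, colon1);
  const char* end = iter_str.data() + iter_str.size();
  auto [ptr, ec] = std::from_chars(iter_str.data(), end, v.iterations);
  if (ec != std::errc{} || ptr != end || v.iterations <= 0) {
    return std::nullopt;
  }
  v.salt_b64 = std::string(s.substr(colon1 + 1, dollar - colon1 - 1));
  v.stored_key_b64 = std::string(s.substr(dollar + 1, colon2 - dollar - 1));
  v.server_key_b64 = std::string(s.substr(colon2 + 1));
  return v;
}
```

Storing only StoredKey and ServerKey means the server can verify a client proof without ever holding the plaintext password.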

28.3.3 Authentication Message Flow

void PostgreSQLSession::begin_authentication_() {
  state_ = SessionState::kAuthentication;

  auto auth_method = config_.get_auth_method(username_, database_);

  switch (auth_method) {
    case AuthMethod::kTrust:
      // No authentication required
      complete_authentication_();
      break;

    case AuthMethod::kScramSha256: {
      // Request SCRAM-SHA-256
      auto stored_password = credential_store_.get_password(username_);
      scram_auth_ = std::make_unique<ScramAuthenticator>(stored_password);

      MessageWriter writer;
      writer.build_authentication_sasl({"SCRAM-SHA-256"});
      queue_message_(writer.data());
      flush_write_buffer_();
      break;
    }
  }
}

28.4 Simple Query Protocol

28.4.1 Query Message

The Simple Query protocol executes SQL in a single round-trip:

Query Message ('Q'):
+------+--------+------------------+
|  'Q' | Length | Query String \0  |
+------+--------+------------------+

Response Sequence:

  1. For SELECT: RowDescription -> DataRow* -> CommandComplete
  2. For INSERT/UPDATE/DELETE: CommandComplete
  3. For errors: ErrorResponse
  4. Finally: ReadyForQuery

28.4.2 RowDescription Message

Describes the columns of a result set:

void MessageWriter::build_row_description(
    const std::vector<ColumnInfo>& columns) {
  write_char('T');
  auto length_pos = reserve_length_();

  write_int16(columns.size());

  for (const auto& col : columns) {
    write_cstring(col.name);
    write_int32(col.table_oid);      // Table OID (0 if not from table)
    write_int16(col.column_index);   // Column index (0 if not from table)
    write_int32(col.type_oid);       // Data type OID
    write_int16(col.type_size);      // Type size (-1 for variable)
    write_int32(col.type_modifier);  // Type modifier (-1 if none)
    write_int16(col.format_code);    // 0=text, 1=binary
  }

  fill_length_(length_pos);
}

28.4.3 DataRow Message

Contains one row of result data:

void MessageWriter::build_data_row(const std::vector<std::optional<std::string>>& values) {
  write_char('D');
  auto length_pos = reserve_length_();

  write_int16(values.size());

  for (const auto& value : values) {
    if (value.has_value()) {
      write_int32(value->size());
      write_bytes(value->data(), value->size());
    } else {
      write_int32(-1);  // NULL
    }
  }

  fill_length_(length_pos);
}
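
To make the byte layout concrete, the following standalone sketch encodes a DataRow without relying on `MessageWriter`. The helper names `put_int16`, `put_int32`, and `encode_data_row` are assumptions made for this example only.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Appends a 16-bit integer in network byte order.
inline void put_int16(std::vector<uint8_t>& out, uint16_t v) {
  out.push_back(static_cast<uint8_t>(v >> 8));
  out.push_back(static_cast<uint8_t>(v));
}

// Appends a 32-bit integer in network byte order.
inline void put_int32(std::vector<uint8_t>& out, uint32_t v) {
  out.push_back(static_cast<uint8_t>(v >> 24));
  out.push_back(static_cast<uint8_t>(v >> 16));
  out.push_back(static_cast<uint8_t>(v >> 8));
  out.push_back(static_cast<uint8_t>(v));
}

// Encodes one DataRow; std::nullopt represents SQL NULL.
inline auto encode_data_row(
    const std::vector<std::optional<std::string>>& values)
    -> std::vector<uint8_t> {
  std::vector<uint8_t> body;
  put_int16(body, static_cast<uint16_t>(values.size()));
  for (const auto& value : values) {
    if (value.has_value()) {
      put_int32(body, static_cast<uint32_t>(value->size()));
      body.insert(body.end(), value->begin(), value->end());
    } else {
      put_int32(body, 0xFFFFFFFFu);  // -1 marks NULL; no bytes follow
    }
  }

  std::vector<uint8_t> msg;
  msg.push_back('D');
  put_int32(msg, static_cast<uint32_t>(body.size() + 4));  // Length field
  msg.insert(msg.end(), body.begin(), body.end());
  return msg;
}
```

A row holding the text value `1` and a NULL encodes to 16 bytes: the type byte, a length of 15, a column count of 2, a 4-byte length plus one data byte for the first column, and a bare length of -1 for the second.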

28.4.4 CommandComplete Message

Indicates successful command completion with a tag:

void MessageWriter::build_command_complete(std::string_view tag) {
  write_char('C');
  auto length_pos = reserve_length_();
  write_cstring(tag);
  fill_length_(length_pos);
}

Command Tags:

  • SELECT n: Query returned n rows
  • INSERT oid n: Inserted n rows (oid is always 0 in current PostgreSQL releases and is kept only for compatibility)
  • UPDATE n: Updated n rows
  • DELETE n: Deleted n rows
  • CREATE TABLE: DDL completion
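
Generating the row-count tags listed above is mostly string formatting. The sketch below assumes a hypothetical `CommandKind` enum and `make_command_tag` helper; neither is taken from the Cognica sources.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical statement classification used only for this example.
enum class CommandKind { kSelect, kInsert, kUpdate, kDelete };

// Formats the CommandComplete tag for a statement that affected `rows` rows.
inline auto make_command_tag(CommandKind kind, std::uint64_t rows)
    -> std::string {
  const auto count = std::to_string(rows);
  switch (kind) {
    case CommandKind::kSelect:
      return "SELECT " + count;
    case CommandKind::kInsert:
      return "INSERT 0 " + count;  // The oid field is emitted as 0
    case CommandKind::kUpdate:
      return "UPDATE " + count;
    case CommandKind::kDelete:
      return "DELETE " + count;
  }
  return {};
}
```

Drivers parse the trailing number of the tag to report the affected row count, so the exact spelling and spacing matter.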

28.4.5 Simple Query Execution

void PostgreSQLSession::handle_query_message_(const MessageReader& reader) {
  auto query = reader.read_cstring();

  try {
    auto result = executor_.execute(query);

    if (result.has_rows()) {
      send_row_description_(result.columns());

      for (const auto& row : result.rows()) {
        send_data_row_(row);
      }
    }

    send_command_complete_(result.command_tag());

  } catch (const SQLException& e) {
    send_error_response_(e);
  }

  send_ready_for_query_(get_transaction_status_());
}

28.5 Extended Query Protocol

28.5.1 Protocol Overview

The Extended Query Protocol separates parsing from execution, enabling:

  1. Prepared statements: Parse once, execute many times
  2. Parameter binding: Safe parameter substitution
  3. Binary data: Efficient binary format for parameters and results
  4. Partial execution: Fetch results in batches

Message Sequence:

Parse -> Bind -> Describe (optional) -> Execute -> Sync

28.5.2 Parse Message

Creates a prepared statement from a query string:

Parse Message ('P'):
+------+--------+------+-------+--------+----------+
|  'P' | Length | Name | Query | nParams| ParamOIDs|
+------+--------+------+-------+--------+----------+

Implementation:

void PostgreSQLSession::handle_parse_(const MessageReader& reader) {
  auto stmt_name = reader.read_cstring();
  auto query = reader.read_cstring();
  auto num_params = reader.read_int16();

  std::vector<Oid> param_oids;
  for (int i = 0; i < num_params; i++) {
    param_oids.push_back(reader.read_int32());
  }

  // Parse and analyze query
  auto parse_result = parser_.parse(query);
  if (!parse_result.ok()) {
    set_extended_error_state_();
    send_error_(parse_result.error());
    return;
  }

  // Compute query fingerprint for plan caching (see Section 28.6.2)
  auto fingerprint = compute_fingerprint(query);

  // Check plan cache
  std::shared_ptr<CompiledPlan> plan;
  if (auto cached = plan_cache_.get(fingerprint)) {
    plan = cached;
  } else {
    plan = compiler_.compile(parse_result.tree(), param_oids);
    plan_cache_.put(fingerprint, plan);
  }

  // Store prepared statement
  auto stmt = std::make_shared<PreparedStatement>(
      stmt_name, query, param_oids, plan);
  prepared_statements_[stmt_name] = stmt;

  send_parse_complete_();
}

28.5.3 Bind Message

Binds parameter values to a prepared statement, creating a portal:

Bind Message ('B'):
+---+------+--------+------+----------+--------+----------+--------+---------+
|'B'|Length|Portal  |Stmt  |nFmtCodes |FmtCodes|nParams   |ParamLen|ParamData|...
+---+------+--------+------+----------+--------+----------+--------+---------+

Format Code Rules:

  • 0 format codes: All parameters use text format
  • 1 format code: All parameters use that format
  • N format codes: Per-parameter format specification
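
The three rules reduce to a small lookup. This sketch uses a hypothetical `resolve_format` function and returns an empty optional when the code list is shorter than the parameter index, which a server would report as a protocol error.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Returns the format code (0 = text, 1 = binary) that applies to the
// parameter at `index`, given the format code list from a Bind message.
inline auto resolve_format(const std::vector<int16_t>& codes,
                           std::size_t index) -> std::optional<int16_t> {
  if (codes.empty()) {
    return int16_t{0};  // No codes: everything is text
  }
  if (codes.size() == 1) {
    return codes[0];  // One code: applies to every parameter
  }
  if (index < codes.size()) {
    return codes[index];  // One code per parameter
  }
  return std::nullopt;  // List too short for this parameter
}
```

The same rules apply to the result format codes at the end of the Bind message, with columns in place of parameters.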

Implementation:

void PostgreSQLSession::handle_bind_(const MessageReader& reader) {
  auto portal_name = reader.read_cstring();
  auto stmt_name = reader.read_cstring();

  // Read parameter format codes
  auto num_format_codes = reader.read_int16();
  std::vector<int16_t> param_formats(num_format_codes);
  for (int i = 0; i < num_format_codes; i++) {
    param_formats[i] = reader.read_int16();
  }

  // Read parameter values
  auto num_params = reader.read_int16();
  std::vector<std::optional<std::vector<uint8_t>>> param_values;

  for (int i = 0; i < num_params; i++) {
    auto len = reader.read_int32();
    if (len == -1) {
      param_values.push_back(std::nullopt);  // NULL
    } else {
      param_values.push_back(reader.read_bytes(len));
    }
  }

  // Read result format codes
  auto num_result_formats = reader.read_int16();
  std::vector<int16_t> result_formats(num_result_formats);
  for (int i = 0; i < num_result_formats; i++) {
    result_formats[i] = reader.read_int16();
  }

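  // Get prepared statement; find() avoids inserting an empty entry for
  // an unknown name, which operator[] would do
  auto stmt_it = prepared_statements_.find(stmt_name);
  if (stmt_it == prepared_statements_.end()) {
    set_extended_error_state_();
    send_error_(ErrorBuilder::invalid_prepared_statement(stmt_name));
    return;
  }
  auto stmt = stmt_it->second;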

  // Convert parameters to VMValues
  auto params = convert_parameters_(param_values, param_formats,
                                     stmt->param_types());

  // Create portal
  auto portal = std::make_shared<Portal>(
      portal_name, stmt, std::move(params), result_formats);
  portals_[portal_name] = portal;

  send_bind_complete_();
}

28.5.4 Describe Message

Requests metadata about a statement or portal:

Describe Message ('D'):
+------+--------+------+------+
|  'D' | Length | Type | Name |
+------+--------+------+------+
         'S' or 'P'

For Statements ('S'):

  • Sends ParameterDescription: Parameter type OIDs
  • Sends RowDescription or NoData: Result columns

For Portals ('P'):

  • Sends RowDescription or NoData: Result columns with bound parameters

void PostgreSQLSession::handle_describe_(const MessageReader& reader) {
  auto type = reader.read_char();
  auto name = reader.read_cstring();

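  if (type == 'S') {
    // Describe statement; an unknown name is an error, not a null dereference
    auto it = prepared_statements_.find(name);
    if (it == prepared_statements_.end()) {
      set_extended_error_state_();
      send_error_(ErrorBuilder::invalid_prepared_statement(name));
      return;
    }
    const auto& stmt = it->second;
    send_parameter_description_(stmt->param_types());

    if (stmt->returns_rows()) {
      send_row_description_(stmt->columns());
    } else {
      send_no_data_();
    }
  } else {
    // Describe portal
    auto it = portals_.find(name);
    if (it == portals_.end()) {
      set_extended_error_state_();
      send_error_(ErrorBuilder::invalid_portal(name));
      return;
    }
    const auto& portal = it->second;

    if (portal->returns_rows()) {
      send_row_description_(portal->columns());
    } else {
      send_no_data_();
    }
  }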
}

28.5.5 Execute Message

Executes a portal with optional row limit:

Execute Message ('E'):
+------+--------+--------+----------+
|  'E' | Length | Portal | MaxRows  |
+------+--------+--------+----------+

Implementation:

void PostgreSQLSession::handle_execute_(const MessageReader& reader) {
  auto portal_name = reader.read_cstring();
  auto max_rows = reader.read_int32();  // 0 = no limit

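  // find() avoids inserting an empty entry for an unknown portal name
  auto portal_it = portals_.find(portal_name);
  if (portal_it == portals_.end()) {
    set_extended_error_state_();
    send_error_(ErrorBuilder::invalid_portal(portal_name));
    return;
  }
  auto portal = portal_it->second;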

  // RowDescription is not sent here: in the extended protocol it is
  // returned only in response to Describe, never by Execute.

  // Execute with row limit
  auto rows_sent = 0;
  while (portal->has_more_rows()) {
    auto row = portal->fetch_row();
    send_data_row_(row, portal->result_formats());
    rows_sent++;

    if (max_rows > 0 && rows_sent >= max_rows) {
      portal->mark_suspended();
      send_portal_suspended_();
      return;
    }
  }

  send_command_complete_(portal->command_tag());
  portal->mark_exhausted();
}

28.5.6 Sync Message

Marks a synchronization point and requests ReadyForQuery:

void PostgreSQLSession::handle_sync_() {
  // Clear extended query error state
  clear_extended_error_state_();

  // Queue ReadyForQuery first so that the flush below delivers it
  send_ready_for_query_(get_transaction_status_());

  // Flush output buffer
  flush_write_buffer_();
}

28.5.7 Error Handling in Extended Query

When an error occurs during Parse, Bind, Describe, or Execute, the session enters an error state where subsequent messages (except Sync) are discarded:

void PostgreSQLSession::handle_extended_message_(char type,
                                                   const MessageReader& reader) {
  if (extended_error_state_ && type != 'S') {
    // Discard message, wait for Sync
    return;
  }

  switch (type) {
    case 'P': handle_parse_(reader); break;
    case 'B': handle_bind_(reader); break;
    case 'D': handle_describe_(reader); break;
    case 'E': handle_execute_(reader); break;
    case 'S': handle_sync_(); break;
    case 'C': handle_close_(reader); break;
  }
}

void PostgreSQLSession::set_extended_error_state_() {
  extended_error_state_ = true;
}

void PostgreSQLSession::clear_extended_error_state_() {
  extended_error_state_ = false;
}

This behavior prevents cascading errors when a pipeline of messages fails partway through.
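
The skip-until-Sync rule can be demonstrated with a toy simulation. Everything here is hypothetical scaffolding for illustration: `simulate_pipeline` takes a string of frontend type bytes, pretends that the message at `failing_index` fails, and returns the backend type bytes that would be sent ('1' ParseComplete, '2' BindComplete, 'C' CommandComplete, 'E' ErrorResponse, 'Z' ReadyForQuery).

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>

// Simulates the server's responses to a pipeline of extended-query messages.
inline auto simulate_pipeline(std::string_view messages,
                              std::size_t failing_index) -> std::string {
  std::string responses;
  bool error_state = false;
  for (std::size_t i = 0; i < messages.size(); ++i) {
    const char type = messages[i];
    if (type == 'S') {
      error_state = false;  // Sync always clears the error state
      responses += 'Z';
      continue;
    }
    if (error_state) {
      continue;  // Discard everything until the next Sync
    }
    if (i == failing_index) {
      error_state = true;
      responses += 'E';
      continue;
    }
    switch (type) {
      case 'P': responses += '1'; break;  // ParseComplete
      case 'B': responses += '2'; break;  // BindComplete
      case 'E': responses += 'C'; break;  // CommandComplete
      default: break;
    }
  }
  return responses;
}
```

With the input `PBESPBES` and a failure at index 1 (the first Bind), the first Execute is dropped, Sync still yields ReadyForQuery, and the second batch runs normally.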

28.6 Prepared Statements and Plan Caching

28.6.1 PreparedStatement Structure

class PreparedStatement {
public:
  PreparedStatement(std::string name,
                    std::string query,
                    std::vector<Oid> param_types,
                    std::shared_ptr<CompiledPlan> plan);

  auto name() const -> const std::string&;
  auto query() const -> const std::string&;
  auto param_types() const -> const std::vector<Oid>&;
  auto columns() const -> const std::vector<ColumnInfo>&;
  auto plan() const -> std::shared_ptr<CompiledPlan>;

  // For cache invalidation
  auto table_dependencies() const -> const std::vector<TableId>&;
  auto fingerprint() const -> uint64_t;

private:
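  std::string name_;
  std::string query_;
  std::vector<Oid> param_types_;
  std::vector<ColumnInfo> columns_;          // Backs columns()
  std::vector<TableId> table_dependencies_;  // Backs table_dependencies()
  std::shared_ptr<CompiledPlan> plan_;
  uint64_t fingerprint_;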
};

28.6.2 Query Fingerprinting

Cognica uses libpg_query to compute query fingerprints for plan cache lookup:

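auto compute_fingerprint(const std::string& query) -> uint64_t {
  auto result = pg_query_fingerprint(query.c_str());
  if (result.error) {
    // Copy the message before freeing the result that owns it
    std::string message = result.error->message;
    pg_query_free_fingerprint_result(result);
    throw SQLException(message.c_str());
  }
  auto fingerprint = result.fingerprint;  // 64-bit numeric fingerprint
  pg_query_free_fingerprint_result(result);
  return fingerprint;
}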

The fingerprint normalizes:

  • Literal values: WHERE id = 1 and WHERE id = 2 have the same fingerprint
  • Whitespace and comments
  • Parameter numbering

28.6.3 Plan Cache

class PlanCache {
public:
  auto get(uint64_t fingerprint) -> std::shared_ptr<CompiledPlan>;
  auto put(uint64_t fingerprint, std::shared_ptr<CompiledPlan> plan) -> void;
  auto invalidate(TableId table) -> void;

private:
  struct CacheEntry {
    std::shared_ptr<CompiledPlan> plan;
    std::vector<TableId> dependencies;
    std::chrono::steady_clock::time_point last_used;
  };

  std::unordered_map<uint64_t, CacheEntry> cache_;
  std::unordered_multimap<TableId, uint64_t> dependency_index_;
  std::mutex mutex_;
};

Cache Invalidation:

When a table's schema changes (DDL), all cached plans depending on that table are invalidated:

void PlanCache::invalidate(TableId table) {
  std::lock_guard lock(mutex_);

  auto range = dependency_index_.equal_range(table);
  for (auto it = range.first; it != range.second; ++it) {
    cache_.erase(it->second);
  }
  dependency_index_.erase(table);
}

28.6.4 Direct CVM Execution

For prepared statements, Cognica compiles the query to CVM bytecode with LOAD_PARAM instructions:

auto compile_prepared_query(const ParseTree& tree,
                            const std::vector<Oid>& param_types)
    -> CompiledPlan {
  Compiler compiler;

  // Parameters become LOAD_PARAM instructions
  for (size_t i = 0; i < param_types.size(); i++) {
    compiler.register_parameter(i, oid_to_type(param_types[i]));
  }

  return compiler.compile(tree);
}

At execution time, bound parameters are loaded directly as typed values with no SQL string manipulation, so a parameter value cannot change the structure of the statement, which is the usual SQL injection vector:

void execute_prepared(const CompiledPlan& plan,
                      const std::vector<VMValue>& params) {
  CVMContext ctx;

  // Set parameter values
  for (size_t i = 0; i < params.size(); i++) {
    ctx.set_parameter(i, params[i]);
  }

  // Execute bytecode
  cvm_.execute(plan.bytecode(), ctx);
}

28.7 Data Type Handling

28.7.1 Type OID Mapping

PostgreSQL identifies types by Object Identifiers (OIDs). Cognica maps between OIDs and internal types:

constexpr Oid kOidBool = 16;
constexpr Oid kOidInt2 = 21;
constexpr Oid kOidInt4 = 23;
constexpr Oid kOidInt8 = 20;
constexpr Oid kOidFloat4 = 700;
constexpr Oid kOidFloat8 = 701;
constexpr Oid kOidText = 25;
constexpr Oid kOidVarchar = 1043;
constexpr Oid kOidTimestamp = 1114;
constexpr Oid kOidTimestamptz = 1184;
constexpr Oid kOidJson = 114;
constexpr Oid kOidJsonb = 3802;
constexpr Oid kOidUuid = 2950;
constexpr Oid kOidBytea = 17;

auto cognica_type_to_oid(ColumnType type) -> Oid {
  switch (type) {
    case ColumnType::kBool: return kOidBool;
    case ColumnType::kInt32: return kOidInt4;
    case ColumnType::kInt64: return kOidInt8;
    case ColumnType::kFloat: return kOidFloat4;
    case ColumnType::kDouble: return kOidFloat8;
    case ColumnType::kString: return kOidText;
    case ColumnType::kTimestamp: return kOidTimestamptz;
    case ColumnType::kJson: return kOidJsonb;
    case ColumnType::kBytes: return kOidBytea;
    default: return kOidText;
  }
}

28.7.2 Text vs Binary Format

The protocol supports two data formats:

Text Format (format code 0):

  • Human-readable ASCII/UTF-8 representation
  • Portable across versions
  • Easier to debug

Binary Format (format code 1):

  • Network byte order encoding
  • More efficient for large values
  • Type-specific encoding

auto encode_value_text(const VMValue& value, ColumnType type) -> std::string {
  switch (type) {
    case ColumnType::kBool:
      return value.as_bool() ? "t" : "f";
    case ColumnType::kInt32:
      return std::to_string(value.as_int32());
    case ColumnType::kInt64:
      return std::to_string(value.as_int64());
    case ColumnType::kDouble:
      return fmt::format("{}", value.as_double());
    case ColumnType::kTimestamp:
      return format_timestamp(value.as_timestamp());
    default:
      return value.as_string();
  }
}

auto encode_value_binary(const VMValue& value, ColumnType type)
    -> std::vector<uint8_t> {
  std::vector<uint8_t> buf;
  switch (type) {
    case ColumnType::kBool:
      buf.push_back(value.as_bool() ? 1 : 0);
      break;
    case ColumnType::kInt32:
      write_int32_be(buf, value.as_int32());
      break;
    case ColumnType::kInt64:
      write_int64_be(buf, value.as_int64());
      break;
    case ColumnType::kDouble:
      write_double_be(buf, value.as_double());
      break;
    // ...
  }
  return buf;
}
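
The encoder above calls `write_int32_be`, `write_int64_be`, and `write_double_be` without showing them. One plausible set of definitions is sketched below; these bodies are assumptions rather than the Cognica implementations, and the double encoder assumes IEEE 754 binary64, which is what PostgreSQL's float8 binary format uses.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Appends a 32-bit integer, most significant byte first.
inline void write_int32_be(std::vector<uint8_t>& buf, int32_t value) {
  const auto bits = static_cast<uint32_t>(value);
  for (int shift = 24; shift >= 0; shift -= 8) {
    buf.push_back(static_cast<uint8_t>(bits >> shift));
  }
}

// Appends a 64-bit integer, most significant byte first.
inline void write_int64_be(std::vector<uint8_t>& buf, int64_t value) {
  const auto bits = static_cast<uint64_t>(value);
  for (int shift = 56; shift >= 0; shift -= 8) {
    buf.push_back(static_cast<uint8_t>(bits >> shift));
  }
}

// Appends a double as its 64-bit pattern, most significant byte first.
inline void write_double_be(std::vector<uint8_t>& buf, double value) {
  static_assert(sizeof(double) == sizeof(uint64_t),
                "expects a 64-bit double");
  uint64_t bits = 0;
  std::memcpy(&bits, &value, sizeof bits);  // Well-defined type punning
  write_int64_be(buf, static_cast<int64_t>(bits));
}
```

Using shifts on the unsigned bit pattern keeps the output independent of host endianness, so no byte-swap intrinsics are needed.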

28.7.3 Parameter Conversion

Incoming parameters are converted from wire format to VMValue:

auto convert_parameter(const std::vector<uint8_t>& data,
                       int16_t format,
                       Oid type_oid) -> VMValue {
  if (format == 0) {
    // Text format
    std::string text(data.begin(), data.end());
    return parse_text_value(text, type_oid);
  } else {
    // Binary format
    return parse_binary_value(data, type_oid);
  }
}

auto parse_text_value(const std::string& text, Oid type_oid) -> VMValue {
  switch (type_oid) {
    case kOidBool:
      return VMValue(text == "t" || text == "true" || text == "1");
    case kOidInt4:
      return VMValue(std::stoi(text));
    case kOidInt8:
      return VMValue(std::stoll(text));
    case kOidFloat8:
      return VMValue(std::stod(text));
    case kOidTimestamptz:
      return VMValue(parse_timestamp(text));
    default:
      return VMValue(text);
  }
}

28.8 Error Handling

28.8.1 Error Response Format

PostgreSQL errors include extensive metadata:

struct ErrorFields {
  char severity;              // 'S': ERROR, WARNING, etc.
  std::string sqlstate;       // 'C': 5-character SQL state code
  std::string message;        // 'M': Primary message
  std::string detail;         // 'D': Detail message
  std::string hint;           // 'H': Hint for resolution
  std::string position;       // 'P': Cursor position in query
  std::string where;          // 'W': Call stack
  std::string schema_name;    // 's': Schema name
  std::string table_name;     // 't': Table name
  std::string column_name;    // 'c': Column name
  std::string constraint_name;// 'n': Constraint name
  std::string file;           // 'F': Source file
  std::string line;           // 'L': Source line
  std::string routine;        // 'R': Function name
};

28.8.2 SQL State Codes

Cognica implements standard PostgreSQL error codes (src/cognica/net/pgsql/pgsql_error_codes.hpp):

namespace sqlstate {
  // Class 00 - Successful Completion
  constexpr const char* kSuccessfulCompletion = "00000";

  // Class 08 - Connection Exception
  constexpr const char* kConnectionException = "08000";
  constexpr const char* kProtocolViolation = "08P01";

  // Class 23 - Integrity Constraint Violation
  constexpr const char* kIntegrityConstraintViolation = "23000";
  constexpr const char* kUniqueViolation = "23505";
  constexpr const char* kForeignKeyViolation = "23503";

  // Class 42 - Syntax Error or Access Rule Violation
  constexpr const char* kSyntaxError = "42601";
  constexpr const char* kUndefinedTable = "42P01";
  constexpr const char* kUndefinedColumn = "42703";

  // Class 57 - Operator Intervention
  constexpr const char* kQueryCanceled = "57014";
  constexpr const char* kIdleSessionTimeout = "57P05";
}

28.8.3 Error Builder

A helper class constructs common errors:

class ErrorBuilder {
public:
  static auto syntax_error(std::string_view message, int position = -1)
      -> ErrorFields {
    return ErrorFields{
        .severity = 'E',
        .sqlstate = sqlstate::kSyntaxError,
        .message = std::string(message),
        .position = position >= 0 ? std::to_string(position) : ""
    };
  }

  static auto undefined_table(std::string_view table_name) -> ErrorFields {
    return ErrorFields{
        .severity = 'E',
        .sqlstate = sqlstate::kUndefinedTable,
        .message = fmt::format("relation \"{}\" does not exist", table_name),
        .table_name = std::string(table_name)
    };
  }

  static auto query_canceled() -> ErrorFields {
    return ErrorFields{
        .severity = 'E',
        .sqlstate = sqlstate::kQueryCanceled,
        .message = "canceling statement due to user request"
    };
  }
};

28.8.4 Sending Error Response

void MessageWriter::build_error_response(const ErrorFields& fields) {
  write_char('E');
  auto length_pos = reserve_length_();

  write_char('S');
  write_cstring(fields.severity == 'E' ? "ERROR" : "WARNING");

  write_char('V');
  write_cstring(fields.severity == 'E' ? "ERROR" : "WARNING");

  write_char('C');
  write_cstring(fields.sqlstate);

  write_char('M');
  write_cstring(fields.message);

  if (!fields.detail.empty()) {
    write_char('D');
    write_cstring(fields.detail);
  }

  if (!fields.hint.empty()) {
    write_char('H');
    write_cstring(fields.hint);
  }

  // ... other fields

  write_char('\0');  // Terminator
  fill_length_(length_pos);
}

28.9 Transaction State Management

28.9.1 Transaction Status

The ReadyForQuery message indicates current transaction state:

enum class TransactionStatus : char {
  kIdle = 'I',           // Not in transaction block
  kInTransaction = 'T',  // In transaction block
  kFailed = 'E'          // Failed transaction block
};
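
How the status byte evolves can be expressed as a transition function. The sketch below redeclares the enum so that it compiles on its own, and introduces a hypothetical `StatementEvent` enum and `next_status` function; it follows the rule described in this chapter that only ROLLBACK leaves the failed state.

```cpp
#include <cassert>

enum class TransactionStatus : char {
  kIdle = 'I',
  kInTransaction = 'T',
  kFailed = 'E'
};

// Hypothetical classification of what just happened, for illustration only.
enum class StatementEvent { kBegin, kCommit, kRollback, kSuccess, kError };

// Computes the status to report in the next ReadyForQuery message.
constexpr auto next_status(TransactionStatus current, StatementEvent event)
    -> TransactionStatus {
  switch (current) {
    case TransactionStatus::kIdle:
      // Outside a block, a failed statement is rolled back on its own
      return event == StatementEvent::kBegin
                 ? TransactionStatus::kInTransaction
                 : TransactionStatus::kIdle;
    case TransactionStatus::kInTransaction:
      if (event == StatementEvent::kError) {
        return TransactionStatus::kFailed;
      }
      if (event == StatementEvent::kCommit ||
          event == StatementEvent::kRollback) {
        return TransactionStatus::kIdle;
      }
      return TransactionStatus::kInTransaction;
    case TransactionStatus::kFailed:
      // Everything except ROLLBACK is rejected and leaves the state as is
      return event == StatementEvent::kRollback ? TransactionStatus::kIdle
                                                : TransactionStatus::kFailed;
  }
  return current;
}
```

Drivers and connection poolers read this byte to decide whether a connection can be safely reused, so reporting it accurately matters more than its size suggests.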

28.9.2 Implicit vs Explicit Transactions

Implicit Transaction (autocommit):

INSERT INTO users (name) VALUES ('Alice');
-- Automatically committed

Explicit Transaction:

BEGIN;
INSERT INTO users (name) VALUES ('Alice');
INSERT INTO accounts (user_id) VALUES (1);
COMMIT;

28.9.3 Failed Transaction Handling

After an error in a transaction block, all subsequent commands fail until ROLLBACK:

void PostgreSQLSession::execute_in_transaction_(const std::string& query) {
  if (transaction_status_ == TransactionStatus::kFailed) {
    if (!is_rollback_command(query)) {
      send_error_(ErrorBuilder::in_failed_transaction());
      return;
    }
  }

  try {
    auto result = executor_.execute(query, transaction_.get());
    send_result_(result);

    if (is_commit_command(query) || is_rollback_command(query)) {
      transaction_status_ = TransactionStatus::kIdle;
      transaction_.reset();
    }
  } catch (const SQLException& e) {
    send_error_(e);
    transaction_status_ = TransactionStatus::kFailed;
  }
}

28.10 Query Cancellation

28.10.1 Cancel Request Protocol

Clients can cancel running queries via a separate connection:

CancelRequest:
+--------+------------+-----+--------+
| Length | CancelCode | PID | Secret |
+--------+------------+-----+--------+
   4B       4B          4B     4B

The cancel code is 80877102.
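
On the client side, the packet is a fixed 16 bytes. The following sketch builds one from the process ID and secret key received in BackendKeyData; `push_be32` and `build_cancel_request` are hypothetical helper names.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Appends a 32-bit integer in network byte order.
inline void push_be32(std::vector<uint8_t>& out, uint32_t v) {
  out.push_back(static_cast<uint8_t>(v >> 24));
  out.push_back(static_cast<uint8_t>(v >> 16));
  out.push_back(static_cast<uint8_t>(v >> 8));
  out.push_back(static_cast<uint8_t>(v));
}

// Builds a CancelRequest packet for the given backend PID and secret key.
inline auto build_cancel_request(int32_t process_id, int32_t secret_key)
    -> std::vector<uint8_t> {
  constexpr uint32_t kCancelCode = 80877102;  // (1234 << 16) | 5678
  std::vector<uint8_t> msg;
  msg.reserve(16);
  push_be32(msg, 16);  // Length: four 4-byte fields, including itself
  push_be32(msg, kCancelCode);
  push_be32(msg, static_cast<uint32_t>(process_id));
  push_be32(msg, static_cast<uint32_t>(secret_key));
  return msg;
}
```

The client sends this on a fresh connection and then closes it; whether the cancel took effect is only visible on the original connection, as an error for the running query.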

28.10.2 Implementation

void PostgreSQLSession::handle_cancel_request_(const MessageReader& reader) {
  auto process_id = reader.read_int32();
  auto secret_key = reader.read_int32();

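  // Find and cancel the target session. The result is deliberately ignored:
  // the protocol sends no reply, so the client cannot tell a wrong PID or
  // key from a successful cancel.
  (void)session_registry_.cancel_session(process_id, secret_key);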

  // Close this connection (cancel requests don't get responses)
  close_();
}

The target session checks for cancellation during query execution:

void QueryExecutor::check_cancellation_() {
  if (session_->is_cancelled()) {
    throw SQLException(ErrorBuilder::query_canceled());
  }
}

28.10.3 Session Registry

The SessionRegistry tracks active sessions for cancellation and monitoring:

class SessionRegistry {
public:
  void register_session(int32_t pid, int32_t secret_key,
                        PostgreSQLSession* session);
  void unregister_session(int32_t pid);

  auto cancel_session(int32_t pid, int32_t secret_key) -> bool;
  auto terminate_session(int32_t pid) -> bool;

  // For pg_stat_activity
  auto get_all_sessions() const -> std::vector<SessionInfo>;

private:
  struct Entry {
    int32_t secret_key;
    PostgreSQLSession* session;
    std::chrono::system_clock::time_point backend_start;
  };

  std::unordered_map<int32_t, Entry> sessions_;
  // mutable so that const readers such as get_all_sessions() can lock it
  mutable std::shared_mutex mutex_;
};
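
The sketch below shows one way cancel_session could check the secret key before flagging the target, under simplifying assumptions: FakeSession stands in for PostgreSQLSession and models only an atomic cancellation flag, and MiniRegistry is a reduced, hypothetical version of the class above.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>

// Stand-in for PostgreSQLSession; only the cancellation flag is modeled.
struct FakeSession {
  std::atomic<bool> cancelled{false};
  void request_cancel() { cancelled.store(true, std::memory_order_relaxed); }
};

class MiniRegistry {
public:
  void register_session(int32_t pid, int32_t secret_key, FakeSession* session) {
    assert(session != nullptr);
    std::unique_lock lock(mutex_);  // Exclusive: the map is modified.
    sessions_[pid] = Entry{secret_key, session};
  }

  void unregister_session(int32_t pid) {
    std::unique_lock lock(mutex_);
    sessions_.erase(pid);
  }

  // Flags the session only when both PID and secret key match.
  auto cancel_session(int32_t pid, int32_t secret_key) -> bool {
    std::shared_lock lock(mutex_);  // Shared: the map is only read.
    auto it = sessions_.find(pid);
    if (it == sessions_.end() || it->second.secret_key != secret_key) {
      return false;
    }
    it->second.session->request_cancel();
    return true;
  }

private:
  struct Entry {
    int32_t secret_key;
    FakeSession* session;
  };

  std::unordered_map<int32_t, Entry> sessions_;
  mutable std::shared_mutex mutex_;
};
```

A shared lock is enough in cancel_session because the map itself is not changed; the flag is atomic, so the query thread can poll it without taking the registry lock.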

28.11 COPY Protocol

28.11.1 COPY FROM (Import)

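Message flow for COPY ... FROM STDIN:

  1. Client sends Query containing the COPY ... FROM STDIN statement
  2. Server replies with CopyInResponse
  3. Client streams CopyData messages, then sends CopyDone (or CopyFail to abort)
  4. Server replies with CommandComplete and ReadyForQuery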

28.11.2 COPY TO (Export)

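Message flow for COPY ... TO STDOUT:

  1. Client sends Query containing the COPY ... TO STDOUT statement
  2. Server replies with CopyOutResponse
  3. Server streams CopyData messages, then sends CopyDone
  4. Server sends CommandComplete and ReadyForQuery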

28.11.3 CopyInResponse/CopyOutResponse

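// CopyOutResponse has the same layout; only the type byte differs ('H').
void MessageWriter::build_copy_in_response(int8_t format,
                                            const std::vector<int16_t>& formats) {
  write_char('G');  // CopyInResponse
  auto length_pos = reserve_length_();

  write_int8(format);  // Overall format: 0=text, 1=binary
  write_int16(static_cast<int16_t>(formats.size()));  // Column count
  for (auto f : formats) {
    write_int16(f);  // Per-column format code
  }

  fill_length_(length_pos);
}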
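
To make the byte layout concrete, the self-contained sketch below encodes either response into a byte vector. encode_copy_response is a hypothetical free function that does not use MessageWriter; the length is computed up front as 4 (length field) + 1 (overall format) + 2 (column count) + 2 per column.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// `type` is 'G' for CopyInResponse or 'H' for CopyOutResponse.
auto encode_copy_response(char type, int8_t overall_format,
                          const std::vector<int16_t>& column_formats)
    -> std::vector<uint8_t> {
  assert(type == 'G' || type == 'H');
  assert(column_formats.size() <= 0x7FFF);  // Count must fit in an int16.

  std::vector<uint8_t> out;
  auto put16 = [&out](uint16_t v) {
    out.push_back(static_cast<uint8_t>(v >> 8));
    out.push_back(static_cast<uint8_t>(v & 0xFF));
  };

  const auto n = static_cast<uint16_t>(column_formats.size());
  const uint32_t length = 4u + 1u + 2u + 2u * n;

  out.push_back(static_cast<uint8_t>(type));
  put16(static_cast<uint16_t>(length >> 16));     // High half of the length
  put16(static_cast<uint16_t>(length & 0xFFFF));  // Low half of the length
  out.push_back(static_cast<uint8_t>(overall_format));
  put16(n);
  for (auto f : column_formats) {
    put16(static_cast<uint16_t>(f));
  }
  return out;
}
```

For a two-column text COPY this yields 12 bytes in total: the type byte plus a length field of 11.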

28.12 Async I/O Architecture

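28.12.1 Boost.Asio Integration

Cognica uses Boost.Asio for non-blocking I/O. Each session owns its TLS stream and a strand, which guarantees that the completion handlers of one session never run concurrently (the asio:: prefix in these listings is presumably an alias for boost::asio):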

class PostgreSQLSession : public std::enable_shared_from_this<PostgreSQLSession> {
public:
  PostgreSQLSession(asio::io_context& io_context,
                    asio::ssl::context& ssl_context);

private:
  using SSLStream = asio::ssl::stream<asio::ip::tcp::socket>;

  void do_read_header_();
  void do_read_body_(size_t body_length);
  void do_write_();

  asio::io_context& io_context_;
  std::unique_ptr<SSLStream> socket_;
  asio::strand<asio::io_context::executor_type> strand_;

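  std::vector<uint8_t> read_buffer_;
  std::vector<uint8_t> write_buffer_;
  std::vector<uint8_t> pending_writes_;  // Used by the write pipeline (28.12.3)
  bool write_in_progress_ = false;       // Used by the write pipeline (28.12.3)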
};

28.12.2 Read Pipeline

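void PostgreSQLSession::do_read_header_() {
  auto self = shared_from_this();

  // The buffer must hold the 5-byte header before the read starts.
  if (read_buffer_.size() < 5) {
    read_buffer_.resize(5);
  }

  // Read 5 bytes: 1 type + 4 length
  asio::async_read(
      *socket_,
      asio::buffer(read_buffer_.data(), 5),
      asio::bind_executor(strand_, [this, self](auto ec, auto /*bytes*/) {
        if (ec) {
          handle_error_(ec);
          return;
        }

        // The type byte stays in read_buffer_[0] for on_message_complete_().
        auto length = read_int32_be(read_buffer_.data() + 1);

        // The length counts itself, so a value below 4 is malformed and
        // would underflow the subtraction below. kMaxMessageLength is an
        // assumed, implementation-chosen upper bound.
        if (length < 4 || length > kMaxMessageLength) {
          handle_error_(asio::error::make_error_code(asio::error::message_size));
          return;
        }

        do_read_body_(static_cast<size_t>(length - 4));
      }));
}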

void PostgreSQLSession::do_read_body_(size_t body_length) {
  auto self = shared_from_this();

  read_buffer_.resize(5 + body_length);

  asio::async_read(
      *socket_,
      asio::buffer(read_buffer_.data() + 5, body_length),
      asio::bind_executor(strand_, [this, self](auto ec, auto bytes) {
        if (ec) {
          handle_error_(ec);
          return;
        }

        on_message_complete_();
        do_read_header_();  // Continue reading
      }));
}
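
The same framing logic can be exercised without sockets. The sketch below extracts one typed message from the front of a byte buffer and validates the length field before using it. Frame, ParseResult, try_parse_frame, and the 1 GiB limit are illustrative assumptions, not Cognica's actual types or limits.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct Frame {
  char type = 0;
  std::vector<uint8_t> payload;
};

enum class ParseResult { kNeedMore, kOk, kMalformed };

constexpr uint32_t kMaxMessageLength = 1u << 30;  // Assumed limit.

// Tries to pop one message (type + length + payload) off the front of `buf`.
auto try_parse_frame(std::vector<uint8_t>& buf, Frame& out) -> ParseResult {
  if (buf.size() < 5) return ParseResult::kNeedMore;

  const uint32_t length = (uint32_t{buf[1]} << 24) | (uint32_t{buf[2]} << 16) |
                          (uint32_t{buf[3]} << 8) | uint32_t{buf[4]};
  // The length includes its own 4 bytes but not the type byte.
  if (length < 4 || length > kMaxMessageLength) return ParseResult::kMalformed;

  const auto total = static_cast<std::ptrdiff_t>(length) + 1;
  if (static_cast<std::ptrdiff_t>(buf.size()) < total) {
    return ParseResult::kNeedMore;
  }

  out.type = static_cast<char>(buf[0]);
  out.payload.assign(buf.begin() + 5, buf.begin() + total);
  buf.erase(buf.begin(), buf.begin() + total);
  return ParseResult::kOk;
}
```

Returning kNeedMore instead of blocking lets the caller append newly received bytes and retry, which is the synchronous counterpart of chaining do_read_header_ and do_read_body_.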

28.12.3 Write Pipeline

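void PostgreSQLSession::queue_message_(std::span<const uint8_t> data) {
  // Queue into pending_writes_ (assumed to be a std::vector<uint8_t> member),
  // never into write_buffer_: an in-flight async_write still reads from
  // write_buffer_, and an insert could reallocate it.
  pending_writes_.insert(pending_writes_.end(), data.begin(), data.end());
}

void PostgreSQLSession::flush_write_buffer_() {
  if (pending_writes_.empty() || write_in_progress_) {
    return;
  }

  write_in_progress_ = true;
  auto self = shared_from_this();

  // Hand the queued bytes to the write; pending_writes_ is left empty.
  write_buffer_.clear();
  write_buffer_.swap(pending_writes_);

  asio::async_write(
      *socket_,
      asio::buffer(write_buffer_),
      asio::bind_executor(strand_, [this, self](auto ec, auto /*bytes*/) {
        write_in_progress_ = false;

        if (ec) {
          handle_error_(ec);
          return;
        }

        // Send anything that was queued while this write was in flight
        if (!pending_writes_.empty()) {
          flush_write_buffer_();
        }
      }));
}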
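
The double-buffering idea behind a write pipeline like this can be isolated from Boost.Asio. In the sketch below, a hypothetical WriteQueue keeps one buffer that belongs to the in-flight write and a second buffer that collects new messages; the start_write callback stands in for asio::async_write, and on_write_complete is what a completion handler would call.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

class WriteQueue {
public:
  using StartWrite = std::function<void(const std::vector<uint8_t>&)>;

  explicit WriteQueue(StartWrite start_write)
      : start_write_(std::move(start_write)) {
    assert(start_write_);
  }

  // Appends to the pending buffer; the in-flight buffer is never touched.
  void queue(const std::vector<uint8_t>& data) {
    pending_.insert(pending_.end(), data.begin(), data.end());
  }

  // Starts a write unless one is already running or nothing is queued.
  void flush() {
    if (pending_.empty() || write_in_progress_) return;
    write_in_progress_ = true;
    in_flight_.clear();
    in_flight_.swap(pending_);  // pending_ is now empty.
    start_write_(in_flight_);
  }

  // Called when the current write finishes; sends whatever queued meanwhile.
  void on_write_complete() {
    write_in_progress_ = false;
    flush();
  }

private:
  StartWrite start_write_;
  std::vector<uint8_t> in_flight_;
  std::vector<uint8_t> pending_;
  bool write_in_progress_ = false;
};
```

Because the in-flight buffer is only replaced when no write is running, the memory handed to the write stays valid and unmodified until its completion handler fires.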

28.13 Performance Considerations

28.13.1 Connection Pooling

While Cognica doesn't implement connection pooling internally, the protocol supports it through external poolers (PgBouncer, Pgpool-II):

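  • Session pooling: A server connection is assigned to a client for the entire session
  • Transaction pooling: A server connection is assigned only for the duration of one transaction
  • Statement pooling: A server connection is assigned for a single statement, so multi-statement transactions are not possible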

Cognica's stateless query execution supports all pooling modes.

28.13.2 Prepared Statement Benefits

Prepared statements reduce per-execution work and remove the need for string escaping:

Aspect            Simple Query      Prepared Statement
----------------  ----------------  ---------------------
Parsing           Every execution   Once
Planning          Every execution   Once (cached)
Parameter safety  String escaping   Binary binding
Network overhead  Full query text   Parameter values only

28.13.3 Binary Format Benefits

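Binary format avoids text formatting and parsing on both ends and, for most values, reduces bandwidth. Small integers are the exception, since their text form can be shorter than the fixed-width binary form: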

Type       Text Size    Binary Size  Savings
---------  -----------  -----------  --------
int32      1-11 bytes   4 bytes      Variable
int64      1-20 bytes   8 bytes      Variable
double     1-24 bytes   8 bytes      Variable
timestamp  26 bytes     8 bytes      69%
UUID       36 bytes     16 bytes     56%
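
The "Variable" entries follow directly from the two encodings. As a small illustration, the hypothetical helpers below (names chosen for this example) encode an int32 both ways: the text form is between 1 and 11 characters, while the binary form is always 4 big-endian bytes.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <string>

// Text format: the decimal representation, 1 to 11 characters for int32.
auto encode_int32_text(int32_t v) -> std::string {
  return std::to_string(v);
}

// Binary format: always 4 bytes in network byte order.
auto encode_int32_binary(int32_t v) -> std::array<uint8_t, 4> {
  const auto u = static_cast<uint32_t>(v);
  return {static_cast<uint8_t>(u >> 24), static_cast<uint8_t>(u >> 16),
          static_cast<uint8_t>(u >> 8), static_cast<uint8_t>(u)};
}
```

A value such as 7 is smaller as text (1 byte against 4), while the minimum int32 needs 11 characters as text, so the size benefit depends on the data; the saving in parsing cost applies either way.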

28.14 Summary

Cognica's PostgreSQL Wire Protocol implementation enables seamless integration with the PostgreSQL ecosystem:

  1. Protocol Completeness: Full support for both Simple Query and Extended Query protocols
  2. Security: SCRAM-SHA-256 authentication, SSL/TLS encryption
  3. Performance: Plan caching, binary format, prepared statement optimization
  4. Compatibility: Standard error codes, transaction semantics, cancellation support
  5. Async Architecture: Non-blocking I/O for high concurrency

Key implementation highlights:

  • Session state machine managing connection lifecycle
  • Extended Query Protocol with Parse/Bind/Describe/Execute phases
  • Query fingerprinting for plan cache lookup
  • Direct CVM execution for prepared statements
  • Comprehensive error handling with PostgreSQL-compatible codes

The wire protocol layer serves as Cognica's primary interface, translating between the PostgreSQL world and Cognica's internal execution engine.

Copyright (c) 2023-2026 Cognica, Inc.