Chapter 28: PostgreSQL Wire Protocol
28.1 Protocol Fundamentals
28.1.1 Why PostgreSQL Compatibility?
Database adoption hinges on ecosystem compatibility. Applications, ORMs, business intelligence tools, and administrative utilities all speak established protocols. Rather than defining a proprietary protocol and building an ecosystem from scratch, Cognica implements the PostgreSQL wire protocol—enabling immediate compatibility with thousands of existing tools.
This strategic choice provides:
- Zero migration friction: Applications connect to Cognica using existing PostgreSQL drivers
- Tooling ecosystem: psql, pgAdmin, DBeaver, Metabase, Tableau all work immediately
- ORM compatibility: SQLAlchemy, Hibernate, ActiveRecord connect without modification
- Operational familiarity: DBAs use familiar commands and workflows
28.1.2 Protocol Version 3.0
Cognica implements PostgreSQL Protocol Version 3.0, introduced in PostgreSQL 7.4 (2003) and still accepted by every current PostgreSQL server and mainstream driver. The protocol uses a simple message-based format over TCP:
Message Structure:
+------+--------+------------------+
| Type | Length | Payload          |
+------+--------+------------------+
   1B      4B      (Length-4) B
- Type: Single ASCII character identifying message type
- Length: 32-bit big-endian integer including itself but excluding type byte
- Payload: Message-specific data
The startup message is exceptional—it omits the type byte:
+--------+------------------+
| Length | Payload          |
+--------+------------------+
    4B      (Length-4) B
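The framing rule for regular (typed) messages can be sketched as a small header decoder. This is a minimal illustration, not Cognica's code: `FrameHeader` and `parse_frame_header` are hypothetical names, and the buffer is assumed to start exactly at a message boundary.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical helper type, for illustration only.
struct FrameHeader {
  char type;                 // message type byte
  std::size_t payload_size;  // Length field minus its own 4 bytes
};

// Decodes the 5-byte header of a regular (typed) message.
// Returns std::nullopt if fewer than 5 bytes are available or if the
// Length field is below 4 (it must at least count itself).
inline auto parse_frame_header(const std::vector<std::uint8_t>& buf)
    -> std::optional<FrameHeader> {
  if (buf.size() < 5) return std::nullopt;
  const std::uint32_t length = (std::uint32_t{buf[1]} << 24) |
                               (std::uint32_t{buf[2]} << 16) |
                               (std::uint32_t{buf[3]} << 8) |
                               std::uint32_t{buf[4]};
  if (length < 4) return std::nullopt;
  return FrameHeader{static_cast<char>(buf[0]), length - 4};
}
```

For a Query message carrying "SELECT 1", the Length field is 13: 4 bytes for the field itself plus 9 bytes for the string and its terminator.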
28.1.3 Byte Order and String Encoding
The protocol uses network byte order (big-endian) for all multi-byte integers:
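auto read_int32(const uint8_t* buf) -> int32_t {
// Assemble in unsigned arithmetic: shifting a promoted int into the
// sign bit is undefined behavior before C++20
const uint32_t value = (static_cast<uint32_t>(buf[0]) << 24) |
(static_cast<uint32_t>(buf[1]) << 16) |
(static_cast<uint32_t>(buf[2]) << 8) |
static_cast<uint32_t>(buf[3]);
return static_cast<int32_t>(value);
}
auto write_int32(uint8_t* buf, int32_t value) -> void {
// Shift the unsigned bit pattern so negative values serialize portably
const auto bits = static_cast<uint32_t>(value);
buf[0] = static_cast<uint8_t>((bits >> 24) & 0xFF);
buf[1] = static_cast<uint8_t>((bits >> 16) & 0xFF);
buf[2] = static_cast<uint8_t>((bits >> 8) & 0xFF);
buf[3] = static_cast<uint8_t>(bits & 0xFF);
}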
Strings are null-terminated C-style strings encoded in the client's declared encoding (typically UTF-8).
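Reading such a string from a receive buffer amounts to scanning for the terminator. The sketch below assumes a hypothetical free function `read_cstring` operating on a byte vector and an offset; it is not the `MessageReader::read_cstring` member used elsewhere in this chapter, whose signature is not shown.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Hypothetical helper: reads one null-terminated string starting at
// `offset` and advances `offset` past the terminator.
// Returns std::nullopt (leaving `offset` unchanged) if the buffer ends
// before a terminator is found.
inline auto read_cstring(const std::vector<std::uint8_t>& buf,
                         std::size_t& offset) -> std::optional<std::string> {
  for (std::size_t i = offset; i < buf.size(); ++i) {
    if (buf[i] == 0) {
      std::string s(buf.begin() + static_cast<std::ptrdiff_t>(offset),
                    buf.begin() + static_cast<std::ptrdiff_t>(i));
      offset = i + 1;
      return s;
    }
  }
  return std::nullopt;
}
```

Treating a missing terminator as an error rather than reading to the end of the buffer keeps a truncated or malicious message from being interpreted as a valid string.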
28.1.4 Frontend vs Backend Messages
The protocol distinguishes message direction:
Frontend Messages (Client to Server):
| Type | Name | Purpose |
|---|---|---|
| (none) | StartupMessage | Initiate connection |
| 'Q' | Query | Simple query execution |
| 'P' | Parse | Prepare a statement |
| 'B' | Bind | Bind parameters to statement |
| 'D' | Describe | Get statement/portal metadata |
| 'E' | Execute | Execute a portal |
| 'S' | Sync | Synchronization point |
| 'X' | Terminate | Close connection |
Backend Messages (Server to Client):
| Type | Name | Purpose |
|---|---|---|
| 'R' | Authentication | Authentication request/response |
| 'T' | RowDescription | Column metadata |
| 'D' | DataRow | Result row data |
| 'C' | CommandComplete | Query completion |
| 'Z' | ReadyForQuery | Server ready for next query |
| 'E' | ErrorResponse | Error information |
28.2 Connection Lifecycle
28.2.1 Session States
A PostgreSQL session progresses through well-defined states (src/cognica/net/pgsql/pgsql_session.hpp):
enum class SessionState {
kInitial, // Awaiting startup message
kSSLNegotiation, // SSL handshake in progress
kAuthentication, // Authentication in progress
kReady, // Ready for queries (idle)
kInTransaction, // Inside transaction block
kFailedTransaction, // Transaction failed, awaiting ROLLBACK
kCopyIn, // Receiving COPY data
kCopyOut, // Sending COPY data
kClosing // Connection closing
};
28.2.2 Startup Sequence
The connection begins with a startup message containing protocol version and parameters:
StartupMessage:
+--------+---------+----------------------------------+
| Length | Version | Parameters (key=value pairs)     |
+--------+---------+----------------------------------+
    4B       4B      null-terminated strings, ends with \0
Protocol Version Encoding:
constexpr int32_t kProtocolVersion3 = 196608; // (3 << 16) | 0
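The version field packs the major number into the upper 16 bits and the minor number into the lower 16 bits. The special request codes used later in this chapter follow the same layout with a reserved major number of 1234. A short sketch, using the hypothetical helper names `make_version_code`, `major_of`, and `minor_of`:

```cpp
#include <cassert>
#include <cstdint>

// Major version in the upper 16 bits, minor version in the lower 16 bits.
constexpr auto make_version_code(std::uint16_t major, std::uint16_t minor)
    -> std::int32_t {
  return static_cast<std::int32_t>((std::uint32_t{major} << 16) | minor);
}

constexpr auto major_of(std::int32_t code) -> std::uint16_t {
  return static_cast<std::uint16_t>(static_cast<std::uint32_t>(code) >> 16);
}

constexpr auto minor_of(std::int32_t code) -> std::uint16_t {
  return static_cast<std::uint16_t>(static_cast<std::uint32_t>(code) & 0xFFFF);
}

static_assert(make_version_code(3, 0) == 196608, "protocol 3.0");
static_assert(make_version_code(1234, 5679) == 80877103, "SSLRequest code");
static_assert(make_version_code(1234, 5678) == 80877102, "CancelRequest code");
```

Because 1234 is far above any real major version, a server can tell these special requests apart from a genuine startup message by looking at the same four bytes.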
Standard Parameters:
- user: Username for authentication
- database: Target database name
- options: Command-line options
- application_name: Client application identifier
- client_encoding: Character encoding (e.g., "UTF8")
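To make the layout concrete, the following sketch serializes a StartupMessage the way a client would. `put_int32` and `build_startup_message` are hypothetical helpers written for this illustration; they are not part of Cognica or of any driver.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Appends a 32-bit integer in network byte order.
inline void put_int32(std::vector<std::uint8_t>& out, std::uint32_t v) {
  out.push_back(static_cast<std::uint8_t>((v >> 24) & 0xFF));
  out.push_back(static_cast<std::uint8_t>((v >> 16) & 0xFF));
  out.push_back(static_cast<std::uint8_t>((v >> 8) & 0xFF));
  out.push_back(static_cast<std::uint8_t>(v & 0xFF));
}

// Hypothetical client-side helper: serializes a protocol 3.0 StartupMessage.
inline auto build_startup_message(
    const std::vector<std::pair<std::string, std::string>>& params)
    -> std::vector<std::uint8_t> {
  std::vector<std::uint8_t> body;
  put_int32(body, 196608);  // protocol version 3.0
  for (const auto& [key, value] : params) {
    body.insert(body.end(), key.begin(), key.end());
    body.push_back(0);  // terminator after the key
    body.insert(body.end(), value.begin(), value.end());
    body.push_back(0);  // terminator after the value
  }
  body.push_back(0);  // final terminator after the last pair

  std::vector<std::uint8_t> msg;
  put_int32(msg, static_cast<std::uint32_t>(body.size() + 4));  // Length counts itself
  msg.insert(msg.end(), body.begin(), body.end());
  return msg;
}
```

With the single parameter user=bob, the message is 18 bytes long: 4 for the length, 4 for the version, 9 for the two strings, and 1 for the final terminator.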
Implementation (pgsql_session.cpp):
auto PostgreSQLSession::handle_startup_message_(const MessageReader& reader)
-> void {
auto version = reader.read_int32();
if (version == kSSLRequestCode) {
handle_ssl_request_();
return;
}
if (version == kCancelRequestCode) {
handle_cancel_request_(reader);
return;
}
if (version != kProtocolVersion3) {
send_error_(ErrorBuilder::protocol_violation(
fmt::format("Unsupported protocol version: {}", version)));
close_();
return;
}
// Parse parameters
while (reader.remaining() > 1) {
auto key = reader.read_cstring();
if (key.empty()) break;
auto value = reader.read_cstring();
startup_params_[key] = value;
}
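username_ = startup_params_["user"];
// Per the protocol, "database" defaults to the user name when omitted
auto db_it = startup_params_.find("database");
database_ = (db_it != startup_params_.end()) ? db_it->second : username_;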
begin_authentication_();
}
28.2.3 SSL/TLS Negotiation
Clients request SSL by sending a special startup message:
SSLRequest:
+--------+----------+
| Length | SSLCode |
+--------+----------+
4B 4B (80877103)
The server responds with a single byte:
- 'S': SSL available, proceed with handshake
- 'N': SSL not available, continue unencrypted
auto PostgreSQLSession::handle_ssl_request_() -> void {
if (ssl_enabled_) {
// Accept SSL
write_byte_('S');
flush_write_buffer_();
start_ssl_handshake_();
} else {
// Reject SSL
write_byte_('N');
flush_write_buffer_();
// Client will send regular startup message
state_ = SessionState::kInitial;
}
}
After successful SSL handshake, the client sends a regular startup message over the encrypted channel.
28.2.4 Server Response After Startup
After successful authentication, the server sends several messages before ReadyForQuery:
- ParameterStatus messages for server configuration:
void send_initial_parameters_() {
send_parameter_status_("server_version", "17.0");
send_parameter_status_("server_encoding", "UTF8");
send_parameter_status_("client_encoding", "UTF8");
send_parameter_status_("DateStyle", "ISO, MDY");
send_parameter_status_("TimeZone", "UTC");
send_parameter_status_("integer_datetimes", "on");
send_parameter_status_("standard_conforming_strings", "on");
}
- BackendKeyData for query cancellation:
void send_backend_key_data_() {
// Process ID and secret key for cancel requests
MessageWriter writer;
writer.write_char('K');
writer.write_int32(12); // Length
writer.write_int32(process_id_);
writer.write_int32(secret_key_);
queue_message_(writer.data());
}
- ReadyForQuery indicating server is ready:
void send_ready_for_query_(TransactionStatus status) {
MessageWriter writer;
writer.write_char('Z');
writer.write_int32(5); // Length
writer.write_char(static_cast<char>(status));
queue_message_(writer.data());
}
28.3 Authentication
28.3.1 Authentication Methods
PostgreSQL supports multiple authentication mechanisms. Cognica implements:
enum class AuthenticationType : int32_t {
kOk = 0, // Authentication successful
kCleartextPassword = 3, // Send password in cleartext
kMD5Password = 5, // Send MD5-hashed password
kSASL = 10, // SCRAM-SHA-256 initial
kSASLContinue = 11, // SCRAM-SHA-256 continue
kSASLFinal = 12, // SCRAM-SHA-256 final
};
28.3.2 SCRAM-SHA-256 Authentication
SCRAM (Salted Challenge Response Authentication Mechanism) provides secure password verification without transmitting the password:
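Protocol Flow:
- Server sends AuthenticationSASL listing the supported mechanisms (SCRAM-SHA-256)
- Client sends SASLInitialResponse carrying the client-first message (client nonce)
- Server sends AuthenticationSASLContinue carrying the server-first message (combined nonce, salt, iteration count)
- Client sends SASLResponse carrying the client-final message (client proof)
- Server verifies the proof and sends AuthenticationSASLFinal carrying the server-final message (server signature), followed by AuthenticationOk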
Implementation (src/cognica/net/pgsql/pgsql_auth.hpp):
class ScramAuthenticator {
public:
explicit ScramAuthenticator(const std::string& stored_password);
// Process client-first message, return server-first
auto process_client_first(std::string_view client_first)
-> std::expected<std::string, std::string>;
// Process client-final message, return server-final
auto process_client_final(std::string_view client_final)
-> std::expected<std::string, std::string>;
auto is_authenticated() const -> bool { return state_ == State::kCompleted; }
private:
enum class State {
kInitial,
kClientFirstReceived,
kServerFirstSent,
kCompleted,
kFailed
};
State state_{State::kInitial};
std::string stored_key_;
std::string server_key_;
std::string salt_;
int iterations_;
std::string client_nonce_;
std::string server_nonce_;
std::string auth_message_;
};
Key Derivation:
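The SCRAM protocol derives keys from the password (definitions from RFC 5802, instantiated with SHA-256):
SaltedPassword = PBKDF2-HMAC-SHA-256(password, salt, iterations)
ClientKey = HMAC-SHA-256(SaltedPassword, "Client Key")
StoredKey = SHA-256(ClientKey)
ServerKey = HMAC-SHA-256(SaltedPassword, "Server Key")
The server stores only StoredKey and ServerKey, never the password itself.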
Password Storage Format:
SCRAM-SHA-256$<iterations>:<salt>$<StoredKey>:<ServerKey>
Example:
SCRAM-SHA-256$4096:salt123base64$StoredKeyBase64:ServerKeyBase64
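Splitting a stored verifier into its four fields is straightforward because neither ':' nor '$' occurs in base64 text. The sketch below uses the hypothetical names `ScramVerifier` and `parse_scram_verifier`; it leaves the salt and keys base64-encoded and is not how `ScramAuthenticator` is necessarily implemented.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical container for the fields of a stored verifier.
// The salt and both keys stay base64-encoded in this sketch.
struct ScramVerifier {
  int iterations = 0;
  std::string salt_b64;
  std::string stored_key_b64;
  std::string server_key_b64;
};

inline auto parse_scram_verifier(std::string_view text)
    -> std::optional<ScramVerifier> {
  constexpr std::string_view kPrefix = "SCRAM-SHA-256$";
  if (text.substr(0, kPrefix.size()) != kPrefix) return std::nullopt;
  text.remove_prefix(kPrefix.size());

  const auto colon1 = text.find(':');
  const auto dollar = text.find('$');
  if (colon1 == std::string_view::npos || dollar == std::string_view::npos ||
      colon1 > dollar) {
    return std::nullopt;
  }
  const auto colon2 = text.find(':', dollar + 1);
  if (colon2 == std::string_view::npos) return std::nullopt;

  // Iteration count: decimal digits only, capped at 9 digits to avoid overflow.
  const auto iter_text = text.substr(0, colon1);
  if (iter_text.empty() || iter_text.size() > 9) return std::nullopt;
  int iterations = 0;
  for (char c : iter_text) {
    if (c < '0' || c > '9') return std::nullopt;
    iterations = iterations * 10 + (c - '0');
  }

  ScramVerifier v;
  v.iterations = iterations;
  v.salt_b64 = std::string(text.substr(colon1 + 1, dollar - colon1 - 1));
  v.stored_key_b64 = std::string(text.substr(dollar + 1, colon2 - dollar - 1));
  v.server_key_b64 = std::string(text.substr(colon2 + 1));
  return v;
}
```

A real implementation would additionally base64-decode the three fields and check that the keys have the 32-byte length of a SHA-256 digest.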
28.3.3 Authentication Message Flow
void PostgreSQLSession::begin_authentication_() {
state_ = SessionState::kAuthentication;
auto auth_method = config_.get_auth_method(username_, database_);
switch (auth_method) {
case AuthMethod::kTrust:
// No authentication required
complete_authentication_();
break;
case AuthMethod::kScramSha256: {
// Request SCRAM-SHA-256
auto stored_password = credential_store_.get_password(username_);
scram_auth_ = std::make_unique<ScramAuthenticator>(stored_password);
MessageWriter writer;
writer.build_authentication_sasl({"SCRAM-SHA-256"});
queue_message_(writer.data());
flush_write_buffer_();
break;
}
}
}
28.4 Simple Query Protocol
28.4.1 Query Message
The Simple Query protocol executes SQL in a single round-trip:
Query Message ('Q'):
+------+--------+------------------+
| 'Q' | Length | Query String \0 |
+------+--------+------------------+
Response Sequence:
- For SELECT: RowDescription -> DataRow* -> CommandComplete
- For INSERT/UPDATE/DELETE: CommandComplete
- For errors: ErrorResponse
- Finally: ReadyForQuery
28.4.2 RowDescription Message
Describes the columns of a result set:
void MessageWriter::build_row_description(
const std::vector<ColumnInfo>& columns) {
write_char('T');
auto length_pos = reserve_length_();
write_int16(columns.size());
for (const auto& col : columns) {
write_cstring(col.name);
write_int32(col.table_oid); // Table OID (0 if not from table)
write_int16(col.column_index); // Column index (0 if not from table)
write_int32(col.type_oid); // Data type OID
write_int16(col.type_size); // Type size (-1 for variable)
write_int32(col.type_modifier); // Type modifier (-1 if none)
write_int16(col.format_code); // 0=text, 1=binary
}
fill_length_(length_pos);
}
28.4.3 DataRow Message
Contains one row of result data:
void MessageWriter::build_data_row(const std::vector<std::optional<std::string>>& values) {
write_char('D');
auto length_pos = reserve_length_();
write_int16(values.size());
for (const auto& value : values) {
if (value.has_value()) {
write_int32(value->size());
write_bytes(value->data(), value->size());
} else {
write_int32(-1); // NULL
}
}
fill_length_(length_pos);
}
28.4.4 CommandComplete Message
Indicates successful command completion with a tag:
void MessageWriter::build_command_complete(std::string_view tag) {
write_char('C');
auto length_pos = reserve_length_();
write_cstring(tag);
fill_length_(length_pos);
}
Command Tags:
- SELECT n: Query returned n rows
- INSERT oid n: Inserted n rows (oid is always 0 in current PostgreSQL versions)
- UPDATE n: Updated n rows
- DELETE n: Deleted n rows
- CREATE TABLE: DDL completion
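The row-counting tags can be produced by a small formatting function. This is a sketch with hypothetical names (`CommandKind`, `make_command_tag`); how Cognica's `result.command_tag()` is actually computed is not shown in this chapter.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical classification of statements that report a row count.
enum class CommandKind { kSelect, kInsert, kUpdate, kDelete };

// Formats the CommandComplete tag for a row-counting command.
inline auto make_command_tag(CommandKind kind, std::uint64_t rows)
    -> std::string {
  const std::string n = std::to_string(rows);
  switch (kind) {
    case CommandKind::kSelect: return "SELECT " + n;
    case CommandKind::kInsert: return "INSERT 0 " + n;  // oid field is 0
    case CommandKind::kUpdate: return "UPDATE " + n;
    case CommandKind::kDelete: return "DELETE " + n;
  }
  return {};
}
```

Drivers parse the trailing number to report affected-row counts, so the exact spacing and the extra 0 in the INSERT tag matter for compatibility.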
28.4.5 Simple Query Execution
void PostgreSQLSession::handle_query_message_(const MessageReader& reader) {
auto query = reader.read_cstring();
try {
auto result = executor_.execute(query);
if (result.has_rows()) {
send_row_description_(result.columns());
for (const auto& row : result.rows()) {
send_data_row_(row);
}
}
send_command_complete_(result.command_tag());
} catch (const SQLException& e) {
send_error_response_(e);
}
send_ready_for_query_(get_transaction_status_());
}
28.5 Extended Query Protocol
28.5.1 Protocol Overview
The Extended Query Protocol separates parsing from execution, enabling:
- Prepared statements: Parse once, execute many times
- Parameter binding: Safe parameter substitution
- Binary data: Efficient binary format for parameters and results
- Partial execution: Fetch results in batches
Message Sequence:
Parse -> Bind -> Describe (optional) -> Execute -> Sync
28.5.2 Parse Message
Creates a prepared statement from a query string:
Parse Message ('P'):
+------+--------+------+-------+--------+----------+
| 'P' | Length | Name | Query | nParams| ParamOIDs|
+------+--------+------+-------+--------+----------+
Implementation:
void PostgreSQLSession::handle_parse_(const MessageReader& reader) {
auto stmt_name = reader.read_cstring();
auto query = reader.read_cstring();
auto num_params = reader.read_int16();
std::vector<Oid> param_oids;
for (int i = 0; i < num_params; i++) {
param_oids.push_back(reader.read_int32());
}
// Parse and analyze query
auto parse_result = parser_.parse(query);
if (!parse_result.ok()) {
set_extended_error_state_();
send_error_(parse_result.error());
return;
}
// Compute query fingerprint for plan caching
auto fingerprint = libpg_query::fingerprint(query);
// Check plan cache
std::shared_ptr<CompiledPlan> plan;
if (auto cached = plan_cache_.get(fingerprint)) {
plan = cached;
} else {
plan = compiler_.compile(parse_result.tree(), param_oids);
plan_cache_.put(fingerprint, plan);
}
// Store prepared statement
auto stmt = std::make_shared<PreparedStatement>(
stmt_name, query, param_oids, plan);
prepared_statements_[stmt_name] = stmt;
send_parse_complete_();
}
28.5.3 Bind Message
Binds parameter values to a prepared statement, creating a portal:
Bind Message ('B'):
+---+------+--------+------+----------+--------+----------+--------+---------+
|'B'|Length|Portal |Stmt |nFmtCodes |FmtCodes|nParams |ParamLen|ParamData|...
+---+------+--------+------+----------+--------+----------+--------+---------+
Format Code Rules:
- 0 format codes: All parameters use text format
- 1 format code: All parameters use that format
- N format codes: Per-parameter format specification
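These three rules reduce to a short lookup. The function below is a sketch with a hypothetical name, `format_for_param`; it also rejects a Bind message whose format-code count is neither 0, 1, nor the parameter count.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Returns the format code (0 = text, 1 = binary) that applies to the
// parameter at `index`, or std::nullopt if the index is out of range or
// the number of format codes is inconsistent with `num_params`.
inline auto format_for_param(const std::vector<std::int16_t>& codes,
                             std::size_t index, std::size_t num_params)
    -> std::optional<std::int16_t> {
  if (index >= num_params) return std::nullopt;
  if (codes.empty()) return std::int16_t{0};    // all parameters are text
  if (codes.size() == 1) return codes[0];       // one code applies to all
  if (codes.size() != num_params) return std::nullopt;
  return codes[index];                          // per-parameter codes
}
```

The same rules apply to the result-column format codes that appear at the end of the Bind message.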
Implementation:
void PostgreSQLSession::handle_bind_(const MessageReader& reader) {
auto portal_name = reader.read_cstring();
auto stmt_name = reader.read_cstring();
// Read parameter format codes
auto num_format_codes = reader.read_int16();
std::vector<int16_t> param_formats(num_format_codes);
for (int i = 0; i < num_format_codes; i++) {
param_formats[i] = reader.read_int16();
}
// Read parameter values
auto num_params = reader.read_int16();
std::vector<std::optional<std::vector<uint8_t>>> param_values;
for (int i = 0; i < num_params; i++) {
auto len = reader.read_int32();
if (len == -1) {
param_values.push_back(std::nullopt); // NULL
} else {
param_values.push_back(reader.read_bytes(len));
}
}
// Read result format codes
auto num_result_formats = reader.read_int16();
std::vector<int16_t> result_formats(num_result_formats);
for (int i = 0; i < num_result_formats; i++) {
result_formats[i] = reader.read_int16();
}
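// Get prepared statement (find() avoids inserting an empty entry for unknown names)
auto stmt_it = prepared_statements_.find(stmt_name);
if (stmt_it == prepared_statements_.end() || !stmt_it->second) {
set_extended_error_state_();
send_error_(ErrorBuilder::invalid_prepared_statement(stmt_name));
return;
}
auto stmt = stmt_it->second;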
// Convert parameters to VMValues
auto params = convert_parameters_(param_values, param_formats,
stmt->param_types());
// Create portal
auto portal = std::make_shared<Portal>(
portal_name, stmt, std::move(params), result_formats);
portals_[portal_name] = portal;
send_bind_complete_();
}
28.5.4 Describe Message
Requests metadata about a statement or portal:
Describe Message ('D'):
+------+--------+------+------+
| 'D' | Length | Type | Name |
+------+--------+------+------+
'S' or 'P'
For Statements ('S'):
- Sends ParameterDescription: Parameter type OIDs
- Sends RowDescription or NoData: Result columns
For Portals ('P'):
- Sends RowDescription or NoData: Result columns with bound parameters
void PostgreSQLSession::handle_describe_(const MessageReader& reader) {
auto type = reader.read_char();
auto name = reader.read_cstring();
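if (type == 'S') {
// Describe statement; an unknown name is an error, not a null dereference
auto it = prepared_statements_.find(name);
if (it == prepared_statements_.end() || !it->second) {
set_extended_error_state_();
send_error_(ErrorBuilder::invalid_prepared_statement(name));
return;
}
const auto& stmt = it->second;
send_parameter_description_(stmt->param_types());
if (stmt->returns_rows()) {
send_row_description_(stmt->columns());
} else {
send_no_data_();
}
} else {
// Describe portal
auto it = portals_.find(name);
if (it == portals_.end() || !it->second) {
set_extended_error_state_();
send_error_(ErrorBuilder::invalid_portal(name));
return;
}
const auto& portal = it->second;
if (portal->returns_rows()) {
send_row_description_(portal->columns());
} else {
send_no_data_();
}
}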
}
28.5.5 Execute Message
Executes a portal with optional row limit:
Execute Message ('E'):
+------+--------+--------+----------+
| 'E' | Length | Portal | MaxRows |
+------+--------+--------+----------+
Implementation:
void PostgreSQLSession::handle_execute_(const MessageReader& reader) {
auto portal_name = reader.read_cstring();
auto max_rows = reader.read_int32(); // 0 = no limit
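// find() avoids inserting an empty entry for unknown portal names
auto portal_it = portals_.find(portal_name);
if (portal_it == portals_.end() || !portal_it->second) {
set_extended_error_state_();
send_error_(ErrorBuilder::invalid_portal(portal_name));
return;
}
auto portal = portal_it->second;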
// Send RowDescription if not already sent
if (!portal->row_description_sent() && portal->returns_rows()) {
send_row_description_(portal->columns());
portal->mark_row_description_sent();
}
// Execute with row limit
auto rows_sent = 0;
while (portal->has_more_rows()) {
auto row = portal->fetch_row();
send_data_row_(row, portal->result_formats());
rows_sent++;
if (max_rows > 0 && rows_sent >= max_rows) {
portal->mark_suspended();
send_portal_suspended_();
return;
}
}
send_command_complete_(portal->command_tag());
portal->mark_exhausted();
}
28.5.6 Sync Message
Marks a synchronization point and requests ReadyForQuery:
void PostgreSQLSession::handle_sync_() {
// Clear extended query error state
clear_extended_error_state_();
// Queue ReadyForQuery first so that it goes out with the flush below
send_ready_for_query_(get_transaction_status_());
// Flush output buffer, including the ReadyForQuery just queued
flush_write_buffer_();
}
28.5.7 Error Handling in Extended Query
When an error occurs during Parse, Bind, Describe, or Execute, the session enters an error state where subsequent messages (except Sync) are discarded:
void PostgreSQLSession::handle_extended_message_(char type,
const MessageReader& reader) {
if (extended_error_state_ && type != 'S') {
// Discard message, wait for Sync
return;
}
switch (type) {
case 'P': handle_parse_(reader); break;
case 'B': handle_bind_(reader); break;
case 'D': handle_describe_(reader); break;
case 'E': handle_execute_(reader); break;
case 'S': handle_sync_(); break;
case 'C': handle_close_(reader); break;
}
}
void PostgreSQLSession::set_extended_error_state_() {
extended_error_state_ = true;
}
void PostgreSQLSession::clear_extended_error_state_() {
extended_error_state_ = false;
}
This behavior prevents cascading errors when a pipeline of messages fails partway through.
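The skip-until-Sync rule can be modeled without any networking. The following sketch feeds a string of message type bytes through the rule and returns the types that would actually reach a handler. `handled_messages` and `failing_index` are hypothetical names used only for this illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical model of the skip-until-Sync rule.
// `types` is the sequence of incoming message type bytes and
// `failing_index` is the position of the message whose handler reports an
// error. Returns the types that are handled rather than discarded.
inline auto handled_messages(const std::string& types,
                             std::size_t failing_index) -> std::string {
  std::string handled;
  bool error_state = false;
  for (std::size_t i = 0; i < types.size(); ++i) {
    const char type = types[i];
    if (error_state && type != 'S') continue;  // discarded until Sync
    handled.push_back(type);
    if (type == 'S') {
      error_state = false;  // Sync clears the error state
    } else if (i == failing_index) {
      error_state = true;   // handler failed: skip until next Sync
    }
  }
  return handled;
}
```

For the pipelined sequence "PBESPBES" with a failing Bind at index 1, the first Execute is discarded while the second batch after Sync runs normally.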
28.6 Prepared Statements and Plan Caching
28.6.1 PreparedStatement Structure
class PreparedStatement {
public:
PreparedStatement(std::string name,
std::string query,
std::vector<Oid> param_types,
std::shared_ptr<CompiledPlan> plan);
auto name() const -> const std::string&;
auto query() const -> const std::string&;
auto param_types() const -> const std::vector<Oid>&;
auto columns() const -> const std::vector<ColumnInfo>&;
auto plan() const -> std::shared_ptr<CompiledPlan>;
// For cache invalidation
auto table_dependencies() const -> const std::vector<TableId>&;
auto fingerprint() const -> uint64_t;
private:
std::string name_;
std::string query_;
std::vector<Oid> param_types_;
std::shared_ptr<CompiledPlan> plan_;
uint64_t fingerprint_;
};
28.6.2 Query Fingerprinting
Cognica uses libpg_query to compute query fingerprints for plan cache lookup:
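auto compute_fingerprint(const std::string& query) -> uint64_t {
auto result = pg_query_fingerprint(query.c_str());
if (result.error) {
// Copy the message and free the result before throwing to avoid a leak
std::string message = result.error->message;
pg_query_free_fingerprint_result(result);
throw SQLException(message);
}
auto fingerprint = result.fingerprint; // 64-bit fingerprint value
pg_query_free_fingerprint_result(result);
return fingerprint;
}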
The fingerprint normalizes:
- Literal values: WHERE id = 1 and WHERE id = 2 have the same fingerprint
- Whitespace and comments
- Parameter numbering
28.6.3 Plan Cache
class PlanCache {
public:
auto get(uint64_t fingerprint) -> std::shared_ptr<CompiledPlan>;
auto put(uint64_t fingerprint, std::shared_ptr<CompiledPlan> plan) -> void;
auto invalidate(TableId table) -> void;
private:
struct CacheEntry {
std::shared_ptr<CompiledPlan> plan;
std::vector<TableId> dependencies;
std::chrono::steady_clock::time_point last_used;
};
std::unordered_map<uint64_t, CacheEntry> cache_;
std::unordered_multimap<TableId, uint64_t> dependency_index_;
std::mutex mutex_;
};
Cache Invalidation:
When a table's schema changes (DDL), all cached plans depending on that table are invalidated:
void PlanCache::invalidate(TableId table) {
std::lock_guard lock(mutex_);
auto range = dependency_index_.equal_range(table);
for (auto it = range.first; it != range.second; ++it) {
cache_.erase(it->second);
}
dependency_index_.erase(table);
}
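The reverse-index technique can be tried in isolation with simplified types. In the sketch below, `MiniPlanCache` is a hypothetical stand-in: table identifiers are plain integers, plans are strings, and locking is omitted, so it illustrates only the bookkeeping, not Cognica's `PlanCache`.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical, single-threaded miniature of a dependency-indexed cache.
class MiniPlanCache {
 public:
  // Stores a plan and records which tables it depends on.
  void put(std::uint64_t fingerprint, std::string plan,
           const std::vector<int>& tables) {
    cache_[fingerprint] = std::move(plan);
    for (int table : tables) index_.emplace(table, fingerprint);
  }

  bool contains(std::uint64_t fingerprint) const {
    return cache_.count(fingerprint) != 0;
  }

  // Drops every plan that depends on `table`.
  void invalidate(int table) {
    auto range = index_.equal_range(table);
    for (auto it = range.first; it != range.second; ++it) {
      cache_.erase(it->second);
    }
    index_.erase(table);
  }

 private:
  std::unordered_map<std::uint64_t, std::string> cache_;
  std::unordered_multimap<int, std::uint64_t> index_;  // table -> fingerprints
};
```

One design point worth noting: a plan that depends on several tables leaves index entries under the other tables after it is evicted. They are harmless here because erasing a missing fingerprint is a no-op, but a long-running cache may want to clean them up.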
28.6.4 Direct CVM Execution
For prepared statements, Cognica compiles the query to CVM bytecode with LOAD_PARAM instructions:
auto compile_prepared_query(const ParseTree& tree,
const std::vector<Oid>& param_types)
-> CompiledPlan {
Compiler compiler;
// Parameters become LOAD_PARAM instructions
for (size_t i = 0; i < param_types.size(); i++) {
compiler.register_parameter(i, oid_to_type(param_types[i]));
}
return compiler.compile(tree);
}
At execution time, bound parameters are loaded directly as values, with no SQL string manipulation, so parameter contents can never be interpreted as SQL text:
void execute_prepared(const CompiledPlan& plan,
const std::vector<VMValue>& params) {
CVMContext ctx;
// Set parameter values
for (size_t i = 0; i < params.size(); i++) {
ctx.set_parameter(i, params[i]);
}
// Execute bytecode
cvm_.execute(plan.bytecode(), ctx);
}
28.7 Data Type Handling
28.7.1 Type OID Mapping
PostgreSQL identifies types by Object Identifiers (OIDs). Cognica maps between OIDs and internal types:
constexpr Oid kOidBool = 16;
constexpr Oid kOidInt2 = 21;
constexpr Oid kOidInt4 = 23;
constexpr Oid kOidInt8 = 20;
constexpr Oid kOidFloat4 = 700;
constexpr Oid kOidFloat8 = 701;
constexpr Oid kOidText = 25;
constexpr Oid kOidVarchar = 1043;
constexpr Oid kOidTimestamp = 1114;
constexpr Oid kOidTimestamptz = 1184;
constexpr Oid kOidJson = 114;
constexpr Oid kOidJsonb = 3802;
constexpr Oid kOidUuid = 2950;
constexpr Oid kOidBytea = 17;
auto cognica_type_to_oid(ColumnType type) -> Oid {
switch (type) {
case ColumnType::kBool: return kOidBool;
case ColumnType::kInt32: return kOidInt4;
case ColumnType::kInt64: return kOidInt8;
case ColumnType::kFloat: return kOidFloat4;
case ColumnType::kDouble: return kOidFloat8;
case ColumnType::kString: return kOidText;
case ColumnType::kTimestamp: return kOidTimestamptz;
case ColumnType::kJson: return kOidJsonb;
case ColumnType::kBytes: return kOidBytea;
default: return kOidText;
}
}
28.7.2 Text vs Binary Format
The protocol supports two data formats:
Text Format (format code 0):
- Human-readable ASCII/UTF-8 representation
- Portable across versions
- Easier to debug
Binary Format (format code 1):
- Network byte order encoding
- More efficient for large values
- Type-specific encoding
auto encode_value_text(const VMValue& value, ColumnType type) -> std::string {
switch (type) {
case ColumnType::kBool:
return value.as_bool() ? "t" : "f";
case ColumnType::kInt32:
return std::to_string(value.as_int32());
case ColumnType::kInt64:
return std::to_string(value.as_int64());
case ColumnType::kDouble:
return fmt::format("{}", value.as_double());
case ColumnType::kTimestamp:
return format_timestamp(value.as_timestamp());
default:
return value.as_string();
}
}
auto encode_value_binary(const VMValue& value, ColumnType type)
-> std::vector<uint8_t> {
std::vector<uint8_t> buf;
switch (type) {
case ColumnType::kBool:
buf.push_back(value.as_bool() ? 1 : 0);
break;
case ColumnType::kInt32:
write_int32_be(buf, value.as_int32());
break;
case ColumnType::kInt64:
write_int64_be(buf, value.as_int64());
break;
case ColumnType::kDouble:
write_double_be(buf, value.as_double());
break;
// ...
}
return buf;
}
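The big-endian helpers referenced above are not shown in this chapter. A minimal sketch of how `write_int64_be` and `write_double_be` could look is given here; the signatures are inferred from the call sites, `write_uint64_be` is a hypothetical shared helper, and the double encoding assumes a 64-bit IEEE 754 `double`.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Appends the 8 bytes of `bits`, most significant byte first.
inline void write_uint64_be(std::vector<std::uint8_t>& buf, std::uint64_t bits) {
  for (int shift = 56; shift >= 0; shift -= 8) {
    buf.push_back(static_cast<std::uint8_t>((bits >> shift) & 0xFF));
  }
}

// Signed integers are sent as their two's-complement bit pattern.
inline void write_int64_be(std::vector<std::uint8_t>& buf, std::int64_t value) {
  write_uint64_be(buf, static_cast<std::uint64_t>(value));
}

// Doubles are sent as the big-endian bytes of their IEEE 754 representation.
inline void write_double_be(std::vector<std::uint8_t>& buf, double value) {
  static_assert(sizeof(double) == sizeof(std::uint64_t),
                "this sketch expects a 64-bit double");
  std::uint64_t bits = 0;
  std::memcpy(&bits, &value, sizeof(bits));  // well-defined type punning
  write_uint64_be(buf, bits);
}
```

Using `std::memcpy` rather than a pointer cast avoids strict-aliasing problems, and shifting an unsigned value keeps the code independent of host byte order.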
28.7.3 Parameter Conversion
Incoming parameters are converted from wire format to VMValue:
auto convert_parameter(const std::vector<uint8_t>& data,
int16_t format,
Oid type_oid) -> VMValue {
if (format == 0) {
// Text format
std::string text(data.begin(), data.end());
return parse_text_value(text, type_oid);
} else {
// Binary format
return parse_binary_value(data, type_oid);
}
}
auto parse_text_value(const std::string& text, Oid type_oid) -> VMValue {
switch (type_oid) {
case kOidBool:
return VMValue(text == "t" || text == "true" || text == "1");
case kOidInt4:
return VMValue(std::stoi(text));
case kOidInt8:
return VMValue(std::stoll(text));
case kOidFloat8:
return VMValue(std::stod(text));
case kOidTimestamptz:
return VMValue(parse_timestamp(text));
default:
return VMValue(text);
}
}
28.8 Error Handling
28.8.1 Error Response Format
PostgreSQL errors include extensive metadata:
struct ErrorFields {
char severity; // 'S': ERROR, WARNING, etc.
std::string sqlstate; // 'C': 5-character SQL state code
std::string message; // 'M': Primary message
std::string detail; // 'D': Detail message
std::string hint; // 'H': Hint for resolution
std::string position; // 'P': Cursor position in query
std::string where; // 'W': Call stack
std::string schema_name; // 's': Schema name
std::string table_name; // 't': Table name
std::string column_name; // 'c': Column name
std::string constraint_name;// 'n': Constraint name
std::string file; // 'F': Source file
std::string line; // 'L': Source line
std::string routine; // 'R': Function name
};
28.8.2 SQL State Codes
Cognica implements standard PostgreSQL error codes (src/cognica/net/pgsql/pgsql_error_codes.hpp):
namespace sqlstate {
// Class 00 - Successful Completion
constexpr const char* kSuccessfulCompletion = "00000";
// Class 08 - Connection Exception
constexpr const char* kConnectionException = "08000";
constexpr const char* kProtocolViolation = "08P01";
// Class 23 - Integrity Constraint Violation
constexpr const char* kIntegrityConstraintViolation = "23000";
constexpr const char* kUniqueViolation = "23505";
constexpr const char* kForeignKeyViolation = "23503";
// Class 42 - Syntax Error or Access Rule Violation
constexpr const char* kSyntaxError = "42601";
constexpr const char* kUndefinedTable = "42P01";
constexpr const char* kUndefinedColumn = "42703";
// Class 57 - Operator Intervention
constexpr const char* kQueryCanceled = "57014";
constexpr const char* kIdleSessionTimeout = "57P05";
}
28.8.3 Error Builder
A helper class constructs common errors:
class ErrorBuilder {
public:
static auto syntax_error(std::string_view message, int position = -1)
-> ErrorFields {
return ErrorFields{
.severity = 'E',
.sqlstate = sqlstate::kSyntaxError,
.message = std::string(message),
.position = position >= 0 ? std::to_string(position) : ""
};
}
static auto undefined_table(std::string_view table_name) -> ErrorFields {
return ErrorFields{
.severity = 'E',
.sqlstate = sqlstate::kUndefinedTable,
.message = fmt::format("relation \"{}\" does not exist", table_name),
.table_name = std::string(table_name)
};
}
static auto query_canceled() -> ErrorFields {
return ErrorFields{
.severity = 'E',
.sqlstate = sqlstate::kQueryCanceled,
.message = "canceling statement due to user request"
};
}
};
28.8.4 Sending Error Response
void MessageWriter::build_error_response(const ErrorFields& fields) {
write_char('E');
auto length_pos = reserve_length_();
write_char('S');
write_cstring(fields.severity == 'E' ? "ERROR" : "WARNING");
write_char('V');
write_cstring(fields.severity == 'E' ? "ERROR" : "WARNING");
write_char('C');
write_cstring(fields.sqlstate);
write_char('M');
write_cstring(fields.message);
if (!fields.detail.empty()) {
write_char('D');
write_cstring(fields.detail);
}
if (!fields.hint.empty()) {
write_char('H');
write_cstring(fields.hint);
}
// ... other fields
write_char('\0'); // Terminator
fill_length_(length_pos);
}
28.9 Transaction State Management
28.9.1 Transaction Status
The ReadyForQuery message indicates current transaction state:
enum class TransactionStatus : char {
kIdle = 'I', // Not in transaction block
kInTransaction = 'T', // In transaction block
kFailed = 'E' // Failed transaction block
};
28.9.2 Implicit vs Explicit Transactions
Implicit Transaction (autocommit):
INSERT INTO users (name) VALUES ('Alice');
-- Automatically committed
Explicit Transaction:
BEGIN;
INSERT INTO users (name) VALUES ('Alice');
INSERT INTO accounts (user_id) VALUES (1);
COMMIT;
28.9.3 Failed Transaction Handling
After an error in a transaction block, the server rejects every subsequent command (SQLSTATE 25P02, in_failed_sql_transaction) until the client issues ROLLBACK:
void PostgreSQLSession::execute_in_transaction_(const std::string& query) {
if (transaction_status_ == TransactionStatus::kFailed) {
if (!is_rollback_command(query)) {
send_error_(ErrorBuilder::in_failed_transaction());
return;
}
}
try {
auto result = executor_.execute(query, transaction_.get());
send_result_(result);
if (is_commit_command(query) || is_rollback_command(query)) {
transaction_status_ = TransactionStatus::kIdle;
transaction_.reset();
}
} catch (const SQLException& e) {
send_error_(e);
transaction_status_ = TransactionStatus::kFailed;
}
}
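The status reported in ReadyForQuery follows a small state machine, which can be written as a pure function. The sketch below uses the hypothetical names `TxStatus`, `Command`, and `next_status`, and follows the rule stated in this section that only ROLLBACK leaves the failed state.

```cpp
#include <cassert>

// Mirrors the three status bytes carried by ReadyForQuery.
enum class TxStatus : char { kIdle = 'I', kInTransaction = 'T', kFailed = 'E' };

// Hypothetical coarse classification of a statement.
enum class Command { kBegin, kCommit, kRollback, kOther };

// Computes the status after executing `cmd`; `succeeded` says whether the
// statement completed without error.
inline auto next_status(TxStatus current, Command cmd, bool succeeded)
    -> TxStatus {
  switch (current) {
    case TxStatus::kIdle:
      // Outside a block, a failing statement only aborts its own implicit
      // transaction, so the session stays idle.
      return (cmd == Command::kBegin && succeeded) ? TxStatus::kInTransaction
                                                   : TxStatus::kIdle;
    case TxStatus::kInTransaction:
      if (!succeeded) return TxStatus::kFailed;
      return (cmd == Command::kCommit || cmd == Command::kRollback)
                 ? TxStatus::kIdle
                 : TxStatus::kInTransaction;
    case TxStatus::kFailed:
      // Everything except ROLLBACK is rejected and leaves the state as is.
      return (cmd == Command::kRollback) ? TxStatus::kIdle : TxStatus::kFailed;
  }
  return current;
}
```

Keeping the transition logic separate from message handling makes it easy to test that the byte sent to the client always matches the session's real state.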
28.10 Query Cancellation
28.10.1 Cancel Request Protocol
Clients can cancel running queries via a separate connection:
CancelRequest:
+--------+------------+-----+--------+
| Length | CancelCode | PID | Secret |
+--------+------------+-----+--------+
    4B        4B        4B     4B
The cancel code is 80877102.
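The 16-byte CancelRequest is simple enough to build by hand. The sketch below shows the client side under the layout above; `build_cancel_request` is a hypothetical helper, and the process ID and secret are the values the client received earlier in BackendKeyData.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical client-side helper: serializes a CancelRequest.
// All four fields are 32-bit integers in network byte order.
inline auto build_cancel_request(std::int32_t pid, std::int32_t secret)
    -> std::array<std::uint8_t, 16> {
  std::array<std::uint8_t, 16> msg{};
  const std::uint32_t fields[4] = {
      16u,                                // total length, including itself
      80877102u,                          // cancel code (1234 << 16 | 5678)
      static_cast<std::uint32_t>(pid),    // from BackendKeyData
      static_cast<std::uint32_t>(secret)  // from BackendKeyData
  };
  for (std::size_t f = 0; f < 4; ++f) {
    for (std::size_t b = 0; b < 4; ++b) {
      msg[f * 4 + b] =
          static_cast<std::uint8_t>((fields[f] >> (24 - 8 * b)) & 0xFF);
    }
  }
  return msg;
}
```

The client sends these bytes on a fresh connection in place of a startup message and then simply waits for the server to close the socket.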
28.10.2 Implementation
void PostgreSQLSession::handle_cancel_request_(const MessageReader& reader) {
auto process_id = reader.read_int32();
auto secret_key = reader.read_int32();
// Find and cancel the target session. The result is deliberately ignored:
// the requester is never told whether the cancel succeeded
session_registry_.cancel_session(process_id, secret_key);
// Close this connection (cancel requests don't get responses)
close_();
}
The target session checks for cancellation during query execution:
void QueryExecutor::check_cancellation_() {
if (session_->is_cancelled()) {
throw SQLException(ErrorBuilder::query_canceled());
}
}
28.10.3 Session Registry
The SessionRegistry tracks active sessions for cancellation and monitoring:
class SessionRegistry {
public:
void register_session(int32_t pid, int32_t secret_key,
PostgreSQLSession* session);
void unregister_session(int32_t pid);
auto cancel_session(int32_t pid, int32_t secret_key) -> bool;
auto terminate_session(int32_t pid) -> bool;
// For pg_stat_activity
auto get_all_sessions() const -> std::vector<SessionInfo>;
private:
struct Entry {
int32_t secret_key;
PostgreSQLSession* session;
std::chrono::system_clock::time_point backend_start;
};
std::unordered_map<int32_t, Entry> sessions_;
std::shared_mutex mutex_;
};
28.11 COPY Protocol
28.11.1 COPY FROM (Import)
28.11.2 COPY TO (Export)
28.11.3 CopyInResponse/CopyOutResponse
void MessageWriter::build_copy_in_response(int8_t format,
const std::vector<int16_t>& formats) {
write_char('G');
auto length_pos = reserve_length_();
write_int8(format); // 0=text, 1=binary
write_int16(formats.size());
for (auto f : formats) {
write_int16(f);
}
fill_length_(length_pos);
}
28.12 Async I/O Architecture
28.12.1 Boost.ASIO Integration
Cognica uses Boost.ASIO for non-blocking I/O:
class PostgreSQLSession : public std::enable_shared_from_this<PostgreSQLSession> {
public:
PostgreSQLSession(asio::io_context& io_context,
asio::ssl::context& ssl_context);
private:
using SSLStream = asio::ssl::stream<asio::ip::tcp::socket>;
void do_read_header_();
void do_read_body_(size_t body_length);
void do_write_();
asio::io_context& io_context_;
std::unique_ptr<SSLStream> socket_;
asio::strand<asio::io_context::executor_type> strand_;
std::vector<uint8_t> read_buffer_;
std::vector<uint8_t> write_buffer_;
};
28.12.2 Read Pipeline
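void PostgreSQLSession::do_read_header_() {
auto self = shared_from_this();
// Read 5 bytes: 1 type + 4 length
read_buffer_.resize(5);
asio::async_read(
*socket_,
asio::buffer(read_buffer_.data(), 5),
asio::bind_executor(strand_, [this, self](auto ec, auto /*bytes*/) {
if (ec) {
handle_error_(ec);
return;
}
// The type byte stays in read_buffer_[0] for on_message_complete_()
auto length = read_int32_be(read_buffer_.data() + 1);
// Length counts itself, so anything below 4 is malformed
if (length < 4) {
send_error_(ErrorBuilder::protocol_violation("invalid message length"));
close_();
return;
}
do_read_body_(static_cast<size_t>(length) - 4);
}));
}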
void PostgreSQLSession::do_read_body_(size_t body_length) {
auto self = shared_from_this();
read_buffer_.resize(5 + body_length);
asio::async_read(
*socket_,
asio::buffer(read_buffer_.data() + 5, body_length),
asio::bind_executor(strand_, [this, self](auto ec, auto bytes) {
if (ec) {
handle_error_(ec);
return;
}
on_message_complete_();
do_read_header_(); // Continue reading
}));
}
28.12.3 Write Pipeline
void PostgreSQLSession::queue_message_(std::span<const uint8_t> data) {
// Queue into pending_writes_ (assumed here to be a std::vector<uint8_t> member):
// write_buffer_ may belong to an in-flight async_write and must not be resized
pending_writes_.insert(pending_writes_.end(), data.begin(), data.end());
}
void PostgreSQLSession::flush_write_buffer_() {
if (pending_writes_.empty() || write_in_progress_) {
return;
}
write_in_progress_ = true;
// Hand the queued bytes to write_buffer_, which stays untouched until completion
write_buffer_.clear();
write_buffer_.swap(pending_writes_);
auto self = shared_from_this();
asio::async_write(
*socket_,
asio::buffer(write_buffer_),
asio::bind_executor(strand_, [this, self](auto ec, auto /*bytes*/) {
write_in_progress_ = false;
if (ec) {
handle_error_(ec);
return;
}
write_buffer_.clear();
// Send anything that was queued while this write was in flight
flush_write_buffer_();
}));
}
28.13 Performance Considerations
28.13.1 Connection Pooling
While Cognica doesn't implement connection pooling internally, the protocol supports it through external poolers (PgBouncer, Pgpool-II):
- Session pooling: Connections assigned for entire session
- Transaction pooling: Connections assigned per transaction
- Statement pooling: Connections assigned per statement
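Cognica's stateless query execution is compatible with all three pooling modes. Session-scoped protocol state, such as named prepared statements and portals, lives in the individual connection, so clients behind a transaction or statement pooler should not assume it survives from one transaction to the next.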
28.13.2 Prepared Statement Benefits
Prepared statements provide significant performance benefits:
| Aspect | Simple Query | Prepared Statement |
|---|---|---|
| Parsing | Every execution | Once |
| Planning | Every execution | Once (cached) |
| Parameter safety | String escaping | Out-of-band parameter binding |
| Network overhead | Full query text | Parameter values only |
28.13.3 Binary Format Benefits
Binary format reduces CPU and bandwidth:
| Type | Text Size | Binary Size | Savings |
|---|---|---|---|
| int32 | 1-11 bytes | 4 bytes | Variable |
| int64 | 1-20 bytes | 8 bytes | Variable |
| double | 1-24 bytes | 8 bytes | Variable |
| timestamp | 26 bytes | 8 bytes | 69% |
| UUID | 36 bytes | 16 bytes | 56% |
28.14 Summary
Cognica's PostgreSQL Wire Protocol implementation enables seamless integration with the PostgreSQL ecosystem:
- Protocol Completeness: Full support for both Simple Query and Extended Query protocols
- Security: SCRAM-SHA-256 authentication, SSL/TLS encryption
- Performance: Plan caching, binary format, prepared statement optimization
- Compatibility: Standard error codes, transaction semantics, cancellation support
- Async Architecture: Non-blocking I/O for high concurrency
Key implementation highlights:
- Session state machine managing connection lifecycle
- Extended Query Protocol with Parse/Bind/Describe/Execute phases
- Query fingerprinting for plan cache lookup
- Direct CVM execution for prepared statements
- Comprehensive error handling with PostgreSQL-compatible codes
The wire protocol layer serves as Cognica's primary interface, translating between the PostgreSQL world and Cognica's internal execution engine.