Skip to content

Fix #90 - disable parallel scans for read-only replicas #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/include/postgres_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class PostgresConnection {
void Execute(const string &query);
unique_ptr<PostgresResult> Query(const string &query);

PostgresVersion GetPostgresVersion();

vector<IndexInfo> GetIndexInfo(const string &table_name);

void BeginCopyTo(ClientContext &context, PostgresCopyState &state, PostgresCopyFormat format,
Expand Down
2 changes: 1 addition & 1 deletion src/include/postgres_scanner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class PostgresScanFunction : public TableFunction {
public:
PostgresScanFunction();

static void PrepareBind(ClientContext &context, PostgresBindData &bind);
static void PrepareBind(PostgresVersion version, ClientContext &context, PostgresBindData &bind);
};

class PostgresScanFunctionFilterPushdown : public TableFunction {
Expand Down
3 changes: 3 additions & 0 deletions src/include/postgres_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "duckdb.hpp"
#include <libpq-fe.h>
#include "postgres_version.hpp"

namespace duckdb {
class PostgresSchemaEntry;
Expand Down Expand Up @@ -44,6 +45,8 @@ class PostgresUtils {
static bool SupportedPostgresOid(const LogicalType &input);
static LogicalType RemoveAlias(const LogicalType &type);
static PostgresType CreateEmptyPostgresType(const LogicalType &type);

static PostgresVersion ExtractPostgresVersion(const string &version);
};

} // namespace duckdb
51 changes: 51 additions & 0 deletions src/include/postgres_version.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//===----------------------------------------------------------------------===//
// DuckDB
//
// postgres_version.hpp
//
//
//===----------------------------------------------------------------------===//

#pragma once

#include "duckdb/common/common.hpp"

namespace duckdb {

struct PostgresVersion {
PostgresVersion() {
}
PostgresVersion(idx_t major_v, idx_t minor_v, idx_t patch_v = 0) : major_v(major_v), minor_v(minor_v), patch_v(patch_v) {
}

idx_t major_v = 0;
idx_t minor_v = 0;
idx_t patch_v = 0;

inline bool operator<(const PostgresVersion &rhs) const {
if (major_v < rhs.major_v) {
return true;
}
if (major_v > rhs.major_v) {
return false;
}
if (minor_v < rhs.minor_v) {
return true;
}
if (minor_v > rhs.minor_v) {
return false;
}
return patch_v < rhs.patch_v;
};
inline bool operator<=(const PostgresVersion &rhs) const {
return !(rhs < *this);
};
inline bool operator>(const PostgresVersion &rhs) const {
return rhs < *this;
};
inline bool operator>=(const PostgresVersion &rhs) const {
return !(*this < rhs);
};
};

} // namespace duckdb
8 changes: 7 additions & 1 deletion src/include/storage/postgres_catalog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class PostgresSchemaEntry;

class PostgresCatalog : public Catalog {
public:
explicit PostgresCatalog(AttachedDatabase &db_p, const string &path, AccessMode access_mode);
explicit PostgresCatalog(PostgresVersion version, AttachedDatabase &db_p, const string &path,
AccessMode access_mode);
~PostgresCatalog();

string path;
Expand Down Expand Up @@ -52,6 +53,10 @@ class PostgresCatalog : public Catalog {

DatabaseSize GetDatabaseSize(ClientContext &context) override;

PostgresVersion GetPostgresVersion() const {
return version;
}

//! Label all postgres scans in the sub-tree as requiring materialization
//! This is used for e.g. insert queries that have both (1) a scan from a postgres table, and (2) a sink into one
static void MaterializePostgresScans(PhysicalOperator &op);
Expand All @@ -70,6 +75,7 @@ class PostgresCatalog : public Catalog {
void DropSchema(ClientContext &context, DropInfo &info) override;

private:
PostgresVersion version;
PostgresSchemaSet schemas;
PostgresConnectionPool connection_pool;
};
Expand Down
6 changes: 6 additions & 0 deletions src/postgres_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ void PostgresConnection::Execute(const string &query) {
Query(query);
}

PostgresVersion PostgresConnection::GetPostgresVersion() {
auto result = Query("SHOW server_version;");
auto version = PostgresUtils::ExtractPostgresVersion(result->GetString(0, 0));
return version;
}

bool PostgresConnection::IsOpen() {
return connection.get();
}
Expand Down
26 changes: 18 additions & 8 deletions src/postgres_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,25 @@ struct PostgresGlobalState : public GlobalTableFunctionState {
}
};

void PostgresScanFunction::PrepareBind(ClientContext &context, PostgresBindData &bind_data) {
void PostgresScanFunction::PrepareBind(PostgresVersion version, ClientContext &context, PostgresBindData &bind_data) {
// we create a transaction here, and get the snapshot id so the parallel
// reader threads can use the same snapshot
auto &con = bind_data.connection;
auto result = con.Query("SELECT pg_is_in_recovery(), pg_export_snapshot()");
bind_data.in_recovery = result->GetBool(0, 0);
bind_data.snapshot = "";

if (!bind_data.in_recovery) {
bind_data.snapshot = result->GetString(0, 1);
// pg_stat_wal_receiver was introduced in PostgreSQL 9.6
if (version >= PostgresVersion(9, 6, 0)) {
auto result = con.Query("SELECT pg_is_in_recovery(), pg_export_snapshot(), (select count(*) from pg_stat_wal_receiver)");
bind_data.in_recovery = result->GetBool(0, 0) || result->GetInt64(0, 2) > 0;
bind_data.snapshot = "";
if (!bind_data.in_recovery) {
bind_data.snapshot = result->GetString(0, 1);
}
} else {
auto result = con.Query("SELECT pg_is_in_recovery(), pg_export_snapshot()");
bind_data.in_recovery = result->GetBool(0, 0);
bind_data.snapshot = "";
if (!bind_data.in_recovery) {
bind_data.snapshot = result->GetString(0, 1);
}
}

Value pages_per_task;
Expand Down Expand Up @@ -85,7 +94,8 @@ static unique_ptr<FunctionData> PostgresBind(ClientContext &context, TableFuncti

bind_data->connection = PostgresConnection::Open(bind_data->dsn);
bind_data->connection.Execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY");
PostgresScanFunction::PrepareBind(context, *bind_data);
auto version = bind_data->connection.GetPostgresVersion();
PostgresScanFunction::PrepareBind(version, context, *bind_data);

// query the table schema so we can interpret the bits in the pages
auto info = PostgresTableSet::GetTableInfo(bind_data->connection, bind_data->schema_name, bind_data->table_name);
Expand Down
4 changes: 3 additions & 1 deletion src/postgres_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ namespace duckdb {

static unique_ptr<Catalog> PostgresAttach(StorageExtensionInfo *storage_info, AttachedDatabase &db, const string &name,
AttachInfo &info, AccessMode access_mode) {
return make_uniq<PostgresCatalog>(db, info.path, access_mode);
auto connection = PostgresConnection::Open(info.path);
auto version = connection.GetPostgresVersion();
return make_uniq<PostgresCatalog>(version, db, info.path, access_mode);
}

static unique_ptr<TransactionManager> PostgresCreateTransactionManager(StorageExtensionInfo *storage_info,
Expand Down
40 changes: 40 additions & 0 deletions src/postgres_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,4 +325,44 @@ uint32_t PostgresUtils::ToPostgresOid(const LogicalType &input) {
}
}

PostgresVersion PostgresUtils::ExtractPostgresVersion(const string &version_str) {
PostgresVersion result;
idx_t pos = 0;
// scan for the first digit
while(pos < version_str.size() && !StringUtil::CharacterIsDigit(version_str[pos])) {
pos++;
}
for(idx_t version_idx = 0; version_idx < 3; version_idx++) {
idx_t digit_start = pos;
while(pos < version_str.size() && StringUtil::CharacterIsDigit(version_str[pos])) {
pos++;
}
if (digit_start == pos) {
// no digits
break;
}
// our version is at [digit_start..pos)
auto digit_str = version_str.substr(digit_start, pos - digit_start);
auto digit = std::strtoll(digit_str.c_str(), 0, 10);
switch(version_idx) {
case 0:
result.major_v = digit;
break;
case 1:
result.minor_v = digit;
break;
default:
result.patch_v = digit;
break;
}

// check if the next character is a dot, if not we stop
if (pos >= version_str.size() || version_str[pos] != '.') {
break;
}
pos++;
}
return result;
}

}
4 changes: 2 additions & 2 deletions src/storage/postgres_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

namespace duckdb {

PostgresCatalog::PostgresCatalog(AttachedDatabase &db_p, const string &path, AccessMode access_mode)
: Catalog(db_p), path(path), access_mode(access_mode), schemas(*this) {
PostgresCatalog::PostgresCatalog(PostgresVersion version, AttachedDatabase &db_p, const string &path, AccessMode access_mode)
: Catalog(db_p), path(path), access_mode(access_mode), version(version), schemas(*this) {
Value connection_limit;
auto &db_instance = db_p.GetDatabase();
if (db_instance.TryGetCurrentSetting("pg_connection_limit", connection_limit)) {
Expand Down
5 changes: 3 additions & 2 deletions src/storage/postgres_table_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void PostgresTableEntry::BindUpdateConstraints(LogicalGet &, LogicalProjection &
}

TableFunction PostgresTableEntry::GetScanFunction(ClientContext &context, unique_ptr<FunctionData> &bind_data) {
auto &pg_catalog = catalog.Cast<PostgresCatalog>();
auto &transaction = Transaction::Get(context, catalog).Cast<PostgresTransaction>();
auto &conn = transaction.GetConnection();

Expand All @@ -46,7 +47,7 @@ TableFunction PostgresTableEntry::GetScanFunction(ClientContext &context, unique
result->transaction = &transaction;
result->connection = PostgresConnection(conn.GetConnection());

PostgresScanFunction::PrepareBind(context, *result);
PostgresScanFunction::PrepareBind(pg_catalog.GetPostgresVersion(), context, *result);
for(auto &col : columns.Logical()) {
result->types.push_back(col.GetType());
}
Expand All @@ -57,7 +58,7 @@ TableFunction PostgresTableEntry::GetScanFunction(ClientContext &context, unique

// check how many threads we can actually use
if (result->max_threads > 1) {
auto &connection_pool = catalog.Cast<PostgresCatalog>().GetConnectionPool();
auto &connection_pool = pg_catalog.GetConnectionPool();
result->connection_reservation = connection_pool.AllocateConnections(result->max_threads);
result->max_threads = result->connection_reservation.GetConnectionCount();
}
Expand Down
10 changes: 10 additions & 0 deletions test/sql/storage/attach_non_existent.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# name: test/sql/storage/attach_non_existent.test
# description: Test attaching to a database that does not exist
# group: [storage]

require postgres_scanner

statement error
ATTACH 'dbname=dbdoesnotexistx' AS s1 (TYPE POSTGRES)
----
does not exist