Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion postgres/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Configuration used for [`Pool`] creation.

use std::{env, fmt, net::IpAddr, str::FromStr, time::Duration};
use std::{env, fmt, net::IpAddr, str::FromStr, sync::Arc, time::Duration};

use tokio_postgres::config::{
ChannelBinding as PgChannelBinding, LoadBalanceHosts as PgLoadBalanceHosts,
Expand Down Expand Up @@ -117,6 +117,19 @@ pub struct Config {
pub pool: Option<PoolConfig>,
}

/// A [`deadpool::managed::ClientConfigProvider`] that returns a static
/// configuration for tokio_postgres.
#[derive(Debug)]
pub struct StaticPostgresConfigProvider {
pub config: Arc<tokio_postgres::Config>,
}

impl deadpool::managed::ClientConfigProvider<tokio_postgres::Config, tokio_postgres::Error> for StaticPostgresConfigProvider {
async fn get_config(&self) -> Result<Arc<tokio_postgres::Config>, tokio_postgres::Error> {
Ok(self.config.clone())
}
}

/// This error is returned if there is something wrong with the configuration
#[derive(Debug)]
pub enum ConfigError {
Expand Down
74 changes: 65 additions & 9 deletions postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use std::{
};

use deadpool::managed;
use deadpool::managed::ClientConfigProvider;
#[cfg(not(target_arch = "wasm32"))]
use tokio::spawn;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -77,18 +78,23 @@ pub type Client = Object;
type RecycleResult = managed::RecycleResult<Error>;
type RecycleError = managed::RecycleError<Error>;

/// [`Manager`] for creating and recycling PostgreSQL connections.
/// [`BaseManager`] for creating and recycling PostgreSQL connections with custom config providers.
///
/// [`Manager`]: managed::Manager
pub struct Manager {
/// [`BaseManager`]: managed::Manager
pub struct BaseManager<C: ClientConfigProvider<tokio_postgres::Config, Error> + Send + Sync> {
config: ManagerConfig,
pg_config: PgConfig,
pg_config: C,
connect: Box<dyn Connect>,
/// [`StatementCaches`] of [`Client`]s handed out by the [`Pool`].
pub statement_caches: StatementCaches,
}

impl Manager {
/// [`Manager`] for creating and recycling PostgreSQL connections.
///
/// [`Manager`]: managed::Manager
pub type Manager = BaseManager<config::StaticPostgresConfigProvider>;

impl BaseManager<config::StaticPostgresConfigProvider> {
#[cfg(not(target_arch = "wasm32"))]
/// Creates a new [`Manager`] using the given [`tokio_postgres::Config`] and
/// `tls` connector.
Expand Down Expand Up @@ -124,7 +130,56 @@ impl Manager {
) -> Self {
Self {
config,
pg_config,
pg_config: config::StaticPostgresConfigProvider {
config: Arc::new(pg_config),
},
connect: Box::new(connect),
statement_caches: StatementCaches::default(),
}
}
}

impl<C: ClientConfigProvider<tokio_postgres::Config, Error> + Send + Sync> BaseManager<C> {

#[cfg(not(target_arch = "wasm32"))]
/// Creates a new [`Manager`] using the given type implementing
/// [`ClientConfigProvider<tokio_postgres::Config, Error>`],
/// `tls` connector.
pub fn new_provider<T>(pg_config_provider: C, tls: T) -> Self
where
T: MakeTlsConnect<Socket> + Clone + Sync + Send + 'static,
T::Stream: Sync + Send,
T::TlsConnect: Sync + Send,
<T::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
Self::from_config_provider(pg_config_provider, tls, ManagerConfig::default())
}

#[cfg(not(target_arch = "wasm32"))]
/// Create a new [`Manager`] using the given type implementing
/// [`ClientConfigProvider<tokio_postgres::Config, Error>`], and
/// `tls` connector and [`ManagerConfig`].
pub fn from_config_provider<T>(pg_config_provider: C, tls: T, config: ManagerConfig) -> Self
where
T: MakeTlsConnect<Socket> + Clone + Sync + Send + 'static,
T::Stream: Sync + Send,
T::TlsConnect: Sync + Send,
<T::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
Self::from_connect_provider(pg_config_provider, ConfigConnectImpl { tls }, config)
}

/// Create a new [`Manager`] using the given type implementing
/// [`ClientConfigProvider<tokio_postgres::Config, Error>`], and
/// `connect` impl and [`ManagerConfig`].
pub fn from_connect_provider(
pg_config_provider: C,
connect: impl Connect + 'static,
config: ManagerConfig,
) -> Self {
Self {
config,
pg_config: pg_config_provider,
connect: Box::new(connect),
statement_caches: StatementCaches::default(),
}
Expand All @@ -135,19 +190,20 @@ impl fmt::Debug for Manager {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Manager")
.field("config", &self.config)
.field("pg_config", &self.pg_config)
//.field("pg_config", &self.pg_config.get_config())
//.field("connect", &self.connect)
.field("statement_caches", &self.statement_caches)
.finish()
}
}

impl managed::Manager for Manager {
impl<C: ClientConfigProvider<tokio_postgres::Config, Error> + Send + Sync> managed::Manager for BaseManager<C> {
type Type = ClientWrapper;
type Error = Error;

async fn create(&self) -> Result<ClientWrapper, Error> {
let (client, conn_task) = self.connect.connect(&self.pg_config).await?;
let config = self.pg_config.get_config().await?;
let (client, conn_task) = self.connect.connect(&config).await?;
let client_wrapper = ClientWrapper::new(client, conn_task);
self.statement_caches
.attach(&client_wrapper.statement_cache);
Expand Down
8 changes: 8 additions & 0 deletions src/managed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ pub use self::{
/// Result type of the [`Manager::recycle()`] method.
pub type RecycleResult<E> = Result<(), RecycleError<E>>;

/// Abstract trait to be refined into traits that can be implemented
/// by types that can be used as a [`ClientConfigProvider`] for short
/// lived client configurations.
pub trait ClientConfigProvider<T, E> {
/// Tries to get a configuration for the client that can be used to create new connections.
fn get_config(&self) -> impl Future<Output = Result<Arc<T>, E>> + Send;
}

/// Manager responsible for creating new [`Object`]s or recycling existing ones.
pub trait Manager: Sync + Send {
/// Type of [`Object`]s that this [`Manager`] creates and recycles.
Expand Down
Loading