Example of using pgwire as a postgres proxy #80

Open
osawyerr opened this issue Apr 18, 2023 · 21 comments

Comments

@osawyerr

I was looking at the examples and there isn't one that actually connects to a real Postgres instance in the backend, for example one that uses pgwire as a proxy to a 'real' Postgres server, i.e. maintaining separate client sessions and forwarding them to a real Postgres instance.

@sunng87
Owner

sunng87 commented Apr 19, 2023

Hi @osawyerr, it's possible to write such a proxy, but we need to define its purpose so we can choose the layer at which we proxy the traffic. For example, we can write a Layer-4 proxy by simply forwarding any TCP traffic to the backend, or a Layer-7 one that understands PostgreSQL's protocol or even SQL statements.

If I were creating a basic Layer-7 example, I would let the proxy finish the startup phase itself and forward subsequent query traffic to the backend. I will find time for that.
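
For comparison, the Layer-4 option needs no protocol knowledge at all. The following is a minimal sketch using only the Rust standard library, not pgwire; the names `pipe`, `relay` and `serve` and the `upstream` address string are hypothetical and chosen for illustration.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

/// Copy bytes from `from` to `to` until EOF, without looking at them.
fn pipe<R: Read, W: Write>(from: &mut R, to: &mut W) -> io::Result<u64> {
    io::copy(from, to)
}

/// Relay one client connection to `upstream` (a hypothetical "host:port").
fn relay(client: TcpStream, upstream: &str) -> io::Result<()> {
    let server = TcpStream::connect(upstream)?;
    let (mut client_rx, mut server_tx) = (client.try_clone()?, server.try_clone()?);
    // Client -> server runs on its own thread...
    let up = thread::spawn(move || {
        let _ = pipe(&mut client_rx, &mut server_tx);
        let _ = server_tx.shutdown(Shutdown::Write);
    });
    // ...while server -> client runs on this one.
    let (mut server_rx, mut client_tx) = (server, client);
    let _ = pipe(&mut server_rx, &mut client_tx);
    let _ = client_tx.shutdown(Shutdown::Write);
    let _ = up.join();
    Ok(())
}

/// Accept clients forever, one relay thread per connection.
fn serve(listener: TcpListener, upstream: String) {
    for client in listener.incoming().flatten() {
        let upstream = upstream.clone();
        thread::spawn(move || relay(client, &upstream));
    }
}
```

A caller would bind a `TcpListener` on the proxy port and pass it to `serve` together with the address of the real Postgres server. Because the bytes are never parsed, this cannot make decisions based on the query, which is what the Layer-7 variant is for.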

@lucasyvas

lucasyvas commented Oct 28, 2023

My vote is for Layer-7. Personally, I'd like to be able to fundamentally understand the request, execute the query, apply transforms to the result rows, then hand them back out.

@batmilkyway

batmilkyway commented Dec 5, 2023

I'll third the request for an L7 proxy example, @sunng87. My use case is an authn/authz-aware proxy that can make authorization decisions based on the query.
Even something as simple as showing how to connect to a backend Postgres and forward messages back and forth would help.

@sunng87
Owner

sunng87 commented Dec 5, 2023

Let me do this over the weekend if everything goes well.

@batmilkyway

> Let me do this over the weekend if everything goes well.

Thank you, looking forward to it!

@sunng87
Owner

sunng87 commented Dec 10, 2023

I have an unfinished proxy example that uses tokio_postgres in SimpleQueryHandler to forward requests to the upstream server. There is some residual work to convert the data format from tokio_postgres to pgwire.

However, for a complete proxy solution, we will need to implement a tokio-based client layer that uses pgwire's message format, for finer and better control over messages. That's beyond my time budget for this project. I welcome contributors to join the development if you are interested in this feature.

@adriangb

adriangb commented Mar 6, 2024

I'll +1 this request.

My use case is to intercept every message and validate incoming queries.
I wrote a Python version of this that worked by starting two async tasks, one that listened to the server and pushed to the client and another that listened to the client and pushed to the server. Thus I didn't need any sort of state machine.
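
One direction of such a two-task design can be sketched as below, in Rust with only the standard library. It assumes the startup phase is already complete, so every message is a one-byte tag followed by a four-byte big-endian length that counts itself. `Frame`, `read_frame`, `write_frame`, `pump_client_to_server` and the `allow` callback are hypothetical names, not pgwire APIs.

```rust
use std::io::{self, Read, Write};

/// One post-startup protocol message: tag byte plus payload.
struct Frame {
    tag: u8,
    payload: Vec<u8>,
}

/// Read one frame; `Ok(None)` means the peer closed the connection cleanly.
fn read_frame<R: Read>(r: &mut R) -> io::Result<Option<Frame>> {
    let mut tag = [0u8; 1];
    if r.read(&mut tag)? == 0 {
        return Ok(None);
    }
    let mut len = [0u8; 4];
    r.read_exact(&mut len)?;
    let len = u32::from_be_bytes(len) as usize;
    if len < 4 {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "length < 4"));
    }
    let mut payload = vec![0u8; len - 4];
    r.read_exact(&mut payload)?;
    Ok(Some(Frame { tag: tag[0], payload }))
}

/// Write a frame back out in wire format.
fn write_frame<W: Write>(w: &mut W, f: &Frame) -> io::Result<()> {
    w.write_all(&[f.tag])?;
    w.write_all(&(f.payload.len() as u32 + 4).to_be_bytes())?;
    w.write_all(&f.payload)
}

/// Forward client frames to the server, stopping at the first simple query
/// ('Q') that `allow` rejects. Returns the number of frames forwarded.
fn pump_client_to_server<R: Read, W: Write>(
    from: &mut R,
    to: &mut W,
    allow: impl Fn(&str) -> bool,
) -> io::Result<usize> {
    let mut forwarded = 0;
    while let Some(frame) = read_frame(from)? {
        if frame.tag == b'Q' {
            // A Query payload is a NUL-terminated SQL string.
            let sql = String::from_utf8_lossy(&frame.payload);
            if !allow(sql.trim_end_matches('\0')) {
                return Err(io::Error::new(io::ErrorKind::PermissionDenied, "query rejected"));
            }
        }
        write_frame(to, &frame)?;
        forwarded += 1;
    }
    Ok(forwarded)
}
```

The opposite direction can run in a second thread or task and simply copy bytes from the server to the client, so neither side needs a shared state machine. A production proxy would also send an ErrorResponse to the client instead of just dropping the connection on a rejected query.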

@FelixMalfait

We also need this for https://github.com/twentyhq/twenty

I've never tried Replit bounties but thought it could be an occasion to give it a shot... Let's see if it works!
https://replit.com/bounties/@felix54/rust-postgres-proxy

@usocoder

https://github.com/usocoder/felix/tree/main/twenty-postgres-proxy

Hey there! My name is Ritchie, from New Zealand. I have developed a program that acts like a security guard for a database. I created a new folder called twenty-postgres-proxy to organize the key files. The core of the solution is a Rust program that performs several tasks: it listens for incoming database connection attempts, verifies that the connecting user has permission, and, if authorized, restricts the user to the sections of the database they are allowed to access. It also forwards their queries to the actual database and returns the responses. A "config.json" file allows easy customization of where the program listens for connections, the location of the real database, who is allowed to connect, and what parts of the database they can access.
Here is my email for instant contact:
[email protected]

Here is my Eth address, if this is successful:
0x819ffeE0e30BBB4309282297C7F26D11a9cD3350
Or email me with the payment methods you can offer.

@FelixMalfait

Thanks @usocoder. To be honest, I think the bounty was a mistake (apologies @sunng87 for the spam it created): we got low-quality contributions and I wouldn't do it this way again. @usocoder, your code doesn't work on my side, it has no tests and no README, and it relies on an old version of pgwire (0.12 vs 0.25 today).

@sunng87
Owner

sunng87 commented Oct 13, 2024

@FelixMalfait no worries. I'm quite interested in your progress with pgwire on a Postgres proxy. I've listed it in my projects for pgwire (#204). I just don't have enough bandwidth to build this project myself, but I will offer support to anyone who is working, or will work, in this direction with pgwire.

@kernel-loophole

@sunng87 I would love to work on this. I need some guidelines and steps to get started.

@kernel-loophole

@sunng87 can you describe StatelessMakeHandler in your example a bit?

use pgwire::api::{ClientInfo, MakeHandler, StatelessMakeHandler, Type};

@sunng87
Owner

sunng87 commented Oct 17, 2024

@kernel-loophole Great to hear! StatelessMakeHandler is out of date. You can find basic usage of the current API in our sqlite example.

I shared my idea for building Postgres middleware here. In the initial version, we can have StartupHandler, SimpleQueryHandler and ExtendedQueryHandler simply forward incoming messages to the upstream server. This requires us to:

  1. implement some client logic to connect to the upstream Postgres, sending PgWireFrontendMessage and receiving PgWireBackendMessage
  2. implement all the on_xxx handlers in the traits I mentioned so that they forward the message.

Once we have finished 1 and 2, we should have a working Layer-7 proxy. We can then start to add:

  • Upstream connection management
  • An API to monitor or customize message forwarding
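
To give a feel for the client logic in step 1, here is a rough standard-library sketch of the bytes such a client has to produce and consume. It does not use pgwire's codec, and `startup_message`, `query_message` and `read_until_ready` are hypothetical helpers. It assumes protocol version 3.0 and ignores authentication and TLS.

```rust
use std::io::{self, Read};

/// StartupMessage: Int32 length (counting itself), Int32 protocol version
/// (3.0 = 196608), then name\0value\0 pairs and a final \0. It has no tag byte.
fn startup_message(params: &[(&str, &str)]) -> Vec<u8> {
    let mut body = 196_608u32.to_be_bytes().to_vec();
    for (name, value) in params {
        body.extend_from_slice(name.as_bytes());
        body.push(0);
        body.extend_from_slice(value.as_bytes());
        body.push(0);
    }
    body.push(0);
    let mut msg = (body.len() as u32 + 4).to_be_bytes().to_vec();
    msg.extend_from_slice(&body);
    msg
}

/// Simple Query: tag 'Q', Int32 length, NUL-terminated SQL text.
fn query_message(sql: &str) -> Vec<u8> {
    let mut msg = vec![b'Q'];
    msg.extend_from_slice(&(sql.len() as u32 + 5).to_be_bytes());
    msg.extend_from_slice(sql.as_bytes());
    msg.push(0);
    msg
}

/// Collect (tag, payload) pairs from the upstream until ReadyForQuery ('Z').
fn read_until_ready<R: Read>(upstream: &mut R) -> io::Result<Vec<(u8, Vec<u8>)>> {
    let mut seen = Vec::new();
    loop {
        let mut header = [0u8; 5];
        upstream.read_exact(&mut header)?;
        let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]) as usize;
        if len < 4 {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "length < 4"));
        }
        let mut payload = vec![0u8; len - 4];
        upstream.read_exact(&mut payload)?;
        let tag = header[0];
        seen.push((tag, payload));
        if tag == b'Z' {
            return Ok(seen);
        }
    }
}
```

In a proxy built on pgwire, the handlers from step 2 would play the role of `query_message` and `read_until_ready`: each on_xxx call writes the corresponding frontend message to the upstream connection and relays the backend messages it gets back.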

@kernel-loophole

@sunng87 Thanks for sharing the details and the plan for building the Postgres middleware! I’ll take a look at the sqlite example for a basic usage reference. The approach you outlined for implementing the client logic and handlers to create a working layer-7 proxy makes sense. Once the initial setup is complete, adding connection management and monitoring APIs will definitely enhance the middleware. I'll start by exploring the traits you mentioned and keep you posted on the progress. Looking forward to collaborating on this!

@kernel-loophole

Hi @sunng87, I made some changes to the proxy for Postgres and implemented StatelessMakeHandler. Can you review it? master...kernel-loophole:pgwire:master

@sunng87
Owner

sunng87 commented Oct 25, 2024

Hello @kernel-loophole, could you rebase onto the latest master?

@kernel-loophole

yes sure @sunng87 will do

@tbicr
Contributor

tbicr commented Nov 2, 2024

I found that building a proxy with tokio_postgres can be quite tricky: RowDescription information can vanish for simple_query or query_typed, and it is a bit complicated to repack the same messages, especially RowDescription and DataRow.

So I use SimpleQueryHandler.on_query and tokio_postgres.client.InnerClient to repack the simple-query messages. However, it still requires a small patch to tokio_postgres to get it to work:

use std::fmt::Debug;
use std::sync::Arc;

use async_trait::async_trait;
use fallible_iterator::FallibleIterator;
use futures::{Sink, SinkExt, StreamExt};

use bytes::BytesMut;
use pgwire::api::auth::md5pass::{hash_md5_password, Md5PasswordAuthStartupHandler};
use pgwire::api::auth::{AuthSource, DefaultServerParameterProvider, LoginInfo, Password};
use pgwire::api::copy::NoopCopyHandler;
use pgwire::api::query::{PlaceholderExtendedQueryHandler, SimpleQueryHandler};
use pgwire::api::results::Response;
use pgwire::api::ClientInfo;
use pgwire::api::{PgWireConnectionState, PgWireHandlerFactory};
use pgwire::error::{PgWireError, PgWireResult};
use pgwire::messages::data::{DataRow, FieldDescription, RowDescription};
use pgwire::messages::response::{
    CommandComplete, EmptyQueryResponse, ReadyForQuery, TransactionStatus,
};
use pgwire::messages::simplequery::Query;
use pgwire::messages::PgWireBackendMessage;
use pgwire::tokio::process_socket;
use postgres_protocol::message::backend::Message;
use tokio::net::TcpListener;
use tokio_postgres::{Client, GenericClient, NoTls};

pub struct PostgresBackend {
    upstream_client: Arc<Client>,
}

struct DummyAuthSource;

#[async_trait]
impl AuthSource for DummyAuthSource {
    async fn get_password(&self, login_info: &LoginInfo) -> PgWireResult<Password> {
        println!("login info: {:?}", login_info);

        let salt = vec![0, 0, 0, 0];
        let password = "pencil";

        let hash_password =
            hash_md5_password(login_info.user().as_ref().unwrap(), password, salt.as_ref());
        Ok(Password::new(Some(salt), hash_password.as_bytes().to_vec()))
    }
}

#[async_trait]
impl SimpleQueryHandler for PostgresBackend {
    async fn on_query<C>(&self, client: &mut C, query: Query) -> PgWireResult<()>
    where
        C: ClientInfo + Sink<PgWireBackendMessage> + Unpin + Send + Sync,
        C::Error: Debug,
        PgWireError: From<<C as Sink<PgWireBackendMessage>>::Error>,
    {
        client.set_state(PgWireConnectionState::QueryInProgress);
        let query_string = query.query;

        match self.upstream_client.simple_query_raw(&query_string).await {
            Ok(mut resp) => {
                while let Ok(message) = resp.responses.next().await {
                    match message {
                        Message::CommandComplete(body) => {
                            client
                                .send(PgWireBackendMessage::CommandComplete(CommandComplete::new(
                                    String::from(body.tag().unwrap()),
                                )))
                                .await?;
                        }
                        Message::EmptyQueryResponse => {
                            client
                                .feed(PgWireBackendMessage::EmptyQueryResponse(
                                    EmptyQueryResponse::new(),
                                ))
                                .await?;
                        }
                        Message::RowDescription(body) => {
                            let fields = body
                                .fields()
                                .map_err(|err| PgWireError::ApiError(Box::new(err)))
                                .map(|field| {
                                    Ok(FieldDescription::new(
                                        field.name().to_string(),
                                        field.table_oid() as i32,
                                        field.column_id(),
                                        field.type_oid(),
                                        field.type_size(),
                                        field.type_modifier(),
                                        field.format(),
                                    ))
                                })
                                .collect()?;
                            client
                                .send(PgWireBackendMessage::RowDescription(RowDescription::new(
                                    fields,
                                )))
                                .await?;
                        }
                        Message::DataRow(body) => {
                            client
                                .feed(PgWireBackendMessage::DataRow(DataRow::new(
                                    BytesMut::from(body.buffer()),
                                    body.len() as i16,
                                )))
                                .await?;
                        }
                        Message::ReadyForQuery(body) => {
                            client
                                .send(PgWireBackendMessage::ReadyForQuery(ReadyForQuery::new(
                                    TransactionStatus::try_from(body.status()).unwrap(),
                                )))
                                .await?;
                        }
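                        _ => {
                            // Report unexpected upstream messages as an error instead of panicking.
                            return Err(PgWireError::ApiError(
                                "unexpected message from upstream".into(),
                            ));
                        }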
                    }
                }
            }
            Err(err) => {
                return Err(PgWireError::ApiError(Box::new(err)));
            }
        }

        Ok(())
    }

    async fn do_query<'a, C>(
        &self,
        _client: &mut C,
        _query: &'a str,
    ) -> PgWireResult<Vec<Response<'a>>>
    where
        C: ClientInfo + Unpin + Send + Sync,
    {
        Ok(vec![])
    }
}

impl PostgresBackend {
    async fn new() -> PostgresBackend {
        let (client, connection) =
            tokio_postgres::connect("host=127.0.0.1 user=postgres password=test", NoTls)
                .await
                .expect("Cannot create upstream connection");
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                eprintln!("Upstream connection error: {}", e);
            }
        });

        PostgresBackend {
            upstream_client: Arc::new(client),
        }
    }
}

struct PostgresBackendFactory {
    handler: Arc<PostgresBackend>,
}

impl PgWireHandlerFactory for PostgresBackendFactory {
    type StartupHandler =
        Md5PasswordAuthStartupHandler<DummyAuthSource, DefaultServerParameterProvider>;
    type SimpleQueryHandler = PostgresBackend;
    type ExtendedQueryHandler = PlaceholderExtendedQueryHandler;
    type CopyHandler = NoopCopyHandler;

    fn simple_query_handler(&self) -> Arc<Self::SimpleQueryHandler> {
        self.handler.clone()
    }

    fn extended_query_handler(&self) -> Arc<Self::ExtendedQueryHandler> {
        Arc::new(PlaceholderExtendedQueryHandler)
    }

    fn startup_handler(&self) -> Arc<Self::StartupHandler> {
        // Keep pgwire's default parameters; a real proxy would report the
        // upstream server's version here rather than SQLite's.
        let parameters = DefaultServerParameterProvider::default();

        Arc::new(Md5PasswordAuthStartupHandler::new(
            Arc::new(DummyAuthSource),
            Arc::new(parameters),
        ))
    }

    fn copy_handler(&self) -> Arc<Self::CopyHandler> {
        Arc::new(NoopCopyHandler)
    }
}

#[tokio::main]
pub async fn main() {
    let factory = Arc::new(PostgresBackendFactory {
        handler: Arc::new(PostgresBackend::new().await),
    });

    let server_addr = "127.0.0.1:5433";
    let listener = TcpListener::bind(server_addr).await.unwrap();
    println!("Listening to {}", server_addr);
    loop {
        let incoming_socket = listener.accept().await.unwrap();
        let factory_ref = factory.clone();

        tokio::spawn(async move { process_socket(incoming_socket.0, None, factory_ref).await });
    }
}
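
The DataRow branch above passes a column count and the remaining buffer separately, which reflects how a DataRow is laid out on the wire: an Int16 column count, then for each column an Int32 length (-1 for NULL) followed by that many bytes. The standard-library sketch below shows that layout as a decode/encode round trip; `parse_data_row` and `encode_data_row` are hypothetical helpers and are not part of pgwire or tokio_postgres.

```rust
/// Split a DataRow payload into its column values (`None` = SQL NULL).
/// Returns `None` if the payload is truncated.
fn parse_data_row(payload: &[u8]) -> Option<Vec<Option<&[u8]>>> {
    if payload.len() < 2 {
        return None;
    }
    let count = i16::from_be_bytes([payload[0], payload[1]]);
    let mut rest = &payload[2..];
    let mut cols = Vec::new();
    for _ in 0..count {
        if rest.len() < 4 {
            return None;
        }
        let len = i32::from_be_bytes([rest[0], rest[1], rest[2], rest[3]]);
        rest = &rest[4..];
        if len < 0 {
            // A length of -1 marks NULL and carries no value bytes.
            cols.push(None);
            continue;
        }
        let len = len as usize;
        if rest.len() < len {
            return None;
        }
        cols.push(Some(&rest[..len]));
        rest = &rest[len..];
    }
    Some(cols)
}

/// Repack column values into a DataRow payload (without tag and length).
fn encode_data_row(cols: &[Option<&[u8]>]) -> Vec<u8> {
    let mut out = (cols.len() as i16).to_be_bytes().to_vec();
    for col in cols {
        match col {
            Some(value) => {
                out.extend_from_slice(&(value.len() as i32).to_be_bytes());
                out.extend_from_slice(value);
            }
            None => out.extend_from_slice(&(-1i32).to_be_bytes()),
        }
    }
    out
}
```

A proxy that only forwards rows can pass the bytes through untouched, as the example above does; decoding like this is only needed when the proxy wants to inspect or transform individual values.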

@sunng87
Owner

sunng87 commented Nov 3, 2024

@tbicr That's true. That's why I only use tokio-postgres for a POC. If we are going to build a proxy on top of pgwire, we will need to implement a low level client using pgwire's codec. It will have access to every message between frontend and backend.

@tbicr
Contributor

tbicr commented Nov 3, 2024

IMHO tokio_postgres already has a good enough low-level InnerClient (except that it is private). For my case, I see real power in combining low-level message passing for the parts I don't need to change with a high-level API for custom logic (replacing a query before it is executed).
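
As an illustration of replacing a query before it is executed, the sketch below rewrites the SQL inside a raw simple-query message and recomputes the length field. It works on plain bytes with the standard library only; `rewrite_query` and its `rewrite` callback are hypothetical and do not correspond to any tokio_postgres or pgwire API.

```rust
/// Rewrite the SQL inside a simple-query ('Q') message and fix up its length.
/// Returns `None` if `msg` is not a single well-formed Query message.
fn rewrite_query(msg: &[u8], rewrite: impl Fn(&str) -> String) -> Option<Vec<u8>> {
    // Smallest valid Query: tag, 4-byte length, terminating NUL.
    if msg.len() < 6 || msg[0] != b'Q' {
        return None;
    }
    // The length field counts itself and the payload, but not the tag.
    let declared = u32::from_be_bytes([msg[1], msg[2], msg[3], msg[4]]) as usize;
    if declared + 1 != msg.len() || msg[msg.len() - 1] != 0 {
        return None;
    }
    let sql = std::str::from_utf8(&msg[5..msg.len() - 1]).ok()?;
    let new_sql = rewrite(sql);
    let mut out = vec![b'Q'];
    out.extend_from_slice(&(new_sql.len() as u32 + 5).to_be_bytes());
    out.extend_from_slice(new_sql.as_bytes());
    out.push(0);
    Some(out)
}
```

For example, `rewrite_query(&msg, |sql| format!("{} LIMIT 10", sql))` would append a limit to every forwarded query. All other message types could still be passed through unchanged at the byte level.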
