Skip to content

save #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft

save #78

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ docs
web
.github
.ropeproject
integration
pgdog/tests
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ WORKDIR /build

RUN rm /bin/sh && ln -s /bin/bash /bin/sh
RUN source ~/.cargo/env && \
cargo build --release
cargo build

FROM ubuntu:latest
ENV RUST_LOG=info
RUN apt update && \
apt install -y ca-certificates && \
update-ca-certificates

COPY --from=builder /build/target/release/pgdog /usr/local/bin/pgdog
COPY --from=builder /build/target/debug/pgdog /usr/local/bin/pgdog

WORKDIR /pgdog
STOPSIGNAL SIGINT
Expand Down
23 changes: 23 additions & 0 deletions examples/gel/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
services:
gel:
image: geldata/gel:latest
environment:
GEL_SERVER_PASSWORD: gel
GEL_SERVER_TLS_CERT_MODE: generate_self_signed
GEL_SERVER_BACKEND_DSN: postgres://postgres:postgres@pgdog:6432/postgres
postgres:
image: postgres:latest
environment:
POSTGRES_PASSWORD: postgres
pgdog:
build:
dockerfile: Dockerfile
context: ../../
environment:
RUST_LOG: trace
command:
- "/usr/local/bin/pgdog"
- "--database-url=postgres://postgres:postgres@postgres:5432/E___edgedbsys__"
- "--database-url=postgres://postgres:postgres@postgres:5432/postgres"
- "run"
- "--min-pool-size=0"
3,828 changes: 3,828 additions & 0 deletions examples/gel/log.txt

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions examples/gel/pgdog.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[[databases]]
host = "postgres"
1 change: 1 addition & 0 deletions pgdog/src/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod show_config;
pub mod show_lists;
pub mod show_peers;
pub mod show_pools;
pub mod show_prepared_statements;
pub mod show_query_cache;
pub mod show_servers;
pub mod show_stats;
Expand Down
11 changes: 9 additions & 2 deletions pgdog/src/admin/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use super::{
pause::Pause, prelude::Message, reconnect::Reconnect, reload::Reload,
reset_query_cache::ResetQueryCache, setup_schema::SetupSchema, show_clients::ShowClients,
show_config::ShowConfig, show_lists::ShowLists, show_peers::ShowPeers, show_pools::ShowPools,
show_query_cache::ShowQueryCache, show_servers::ShowServers, show_stats::ShowStats,
show_version::ShowVersion, shutdown::Shutdown, Command, Error,
show_prepared_statements::ShowPreparedStatements, show_query_cache::ShowQueryCache,
show_servers::ShowServers, show_stats::ShowStats, show_version::ShowVersion,
shutdown::Shutdown, Command, Error,
};

use tracing::debug;
Expand All @@ -27,6 +28,7 @@ pub enum ParseResult {
SetupSchema(SetupSchema),
Shutdown(Shutdown),
ShowLists(ShowLists),
ShowPreparedStatements(ShowPreparedStatements),
}

impl ParseResult {
Expand All @@ -50,6 +52,7 @@ impl ParseResult {
SetupSchema(setup_schema) => setup_schema.execute().await,
Shutdown(shutdown) => shutdown.execute().await,
ShowLists(show_lists) => show_lists.execute().await,
ShowPreparedStatements(show) => show.execute().await,
}
}

Expand All @@ -73,6 +76,7 @@ impl ParseResult {
SetupSchema(setup_schema) => setup_schema.name(),
Shutdown(shutdown) => shutdown.name(),
ShowLists(show_lists) => show_lists.name(),
ShowPreparedStatements(show) => show.name(),
}
}
}
Expand Down Expand Up @@ -101,6 +105,9 @@ impl Parser {
"stats" => ParseResult::ShowStats(ShowStats::parse(&sql)?),
"version" => ParseResult::ShowVersion(ShowVersion::parse(&sql)?),
"lists" => ParseResult::ShowLists(ShowLists::parse(&sql)?),
"prepared" => {
ParseResult::ShowPreparedStatements(ShowPreparedStatements::parse(&sql)?)
}
command => {
debug!("unknown admin show command: '{}'", command);
return Err(Error::Syntax);
Expand Down
29 changes: 29 additions & 0 deletions pgdog/src/admin/show_prepared_statements.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use crate::frontend::PreparedStatements;

use super::prelude::*;

#[derive(Debug, Clone)]
pub struct ShowPreparedStatements;

#[async_trait]
impl Command for ShowPreparedStatements {
fn name(&self) -> String {
"SHOW PREPARED STATEMENTS".into()
}

fn parse(_: &str) -> Result<Self, Error> {
Ok(Self)
}

async fn execute(&self) -> Result<Vec<Message>, Error> {
let statements = PreparedStatements::global().lock().clone();
let mut messages =
vec![RowDescription::new(&[Field::text("name"), Field::text("statement")]).message()?];
for (name, parse) in statements.names() {
let mut dr = DataRow::new();
dr.add(name).add(parse.query());
messages.push(dr.message()?);
}
Ok(messages)
}
}
4 changes: 4 additions & 0 deletions pgdog/src/backend/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,10 @@ impl Server {
}
}

pub fn mark_prepared(&mut self, name: &str) {
self.prepared_statements.prepared(name);
}

pub async fn describe_statement(&mut self, name: &str) -> Result<Vec<Message>, Error> {
if !self.in_sync() {
return Err(Error::NotInSync);
Expand Down
2 changes: 1 addition & 1 deletion pgdog/src/frontend/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Buffer {
pub fn full(&self) -> bool {
if let Some(message) = self.buffer.last() {
// Flush (F) | Sync (F) | Query (F) | CopyDone (F) | CopyFail (F)
if matches!(message.code(), 'H' | 'S' | 'Q' | 'c' | 'f') {
if matches!(message.code(), 'H' | 'S' | 'Q' | 'c' | 'f' | 'E') {
return true;
}

Expand Down
31 changes: 31 additions & 0 deletions pgdog/src/frontend/client/extended.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
//! Extended protocol state machine.

use std::collections::VecDeque;

use crate::net::{Message, Protocol};

#[derive(Debug, Clone, PartialEq)]
enum Action {
Forward,
Drop,
}

#[derive(Default, Debug, Clone)]
pub struct Extended {
received: VecDeque<char>,
queue: VecDeque<Action>,
}

impl Extended {
pub fn forward(&mut self) {
self.queue.push_back(Action::Forward);
}

pub fn swallow(&mut self) {
self.queue.push_back(Action::Drop);
}

pub fn should_forward(&mut self) -> bool {
self.queue.pop_front().unwrap_or(Action::Forward) == Action::Forward
}
}
5 changes: 4 additions & 1 deletion pgdog/src/frontend/client/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{

use tracing::debug;

use super::{Client, Error};
use super::{extended::Extended, Client, Error};

/// Mutable internals used by both client and server message handlers.
///
Expand All @@ -32,6 +32,8 @@ pub(super) struct Inner {
pub(super) start_transaction: Option<BufferedQuery>,
/// Client-wide comms.
pub(super) comms: Comms,
/// Extended queue.
pub(super) extended: Extended,
}

impl Inner {
Expand Down Expand Up @@ -63,6 +65,7 @@ impl Inner {
async_: false,
start_transaction: None,
comms: client.comms.clone(),
extended: Extended::default(),
})
}

Expand Down
27 changes: 27 additions & 0 deletions pgdog/src/frontend/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::net::messages::{
use crate::net::{parameter::Parameters, Stream};

pub mod counter;
pub mod extended;
pub mod inner;

use inner::{Inner, InnerBorrow};
Expand Down Expand Up @@ -248,6 +249,30 @@ impl Client {

self.streaming = matches!(command, Some(Command::StartReplication));

match command {
Some(Command::Prepare { name, statement }) => {
self.prepared_statements.manual_prepare(&name, &statement)
}
Some(Command::Execute { name, params }) => {
self.prepared_statements.manual_execute(&name);
}
Some(Command::Multiple(ref commands)) => {
for command in commands {
match command {
Command::Prepare { name, statement } => {
self.prepared_statements.manual_prepare(&name, &statement)
}
Command::Execute { name, params } => {
self.prepared_statements.manual_execute(&name);
}
_ => (),
}
}
}

_ => (),
}

if !connected {
match command {
Some(Command::StartTransaction(query)) => {
Expand Down Expand Up @@ -453,6 +478,8 @@ impl Client {
timer.unwrap().elapsed().as_secs_f64() * 1000.0
);

println!("buffer: {:?}", buffer);

Ok(buffer)
}

Expand Down
12 changes: 8 additions & 4 deletions pgdog/src/frontend/prepared_statements/global_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ fn global_name(counter: usize) -> String {
}

#[derive(Debug, Clone)]
struct StoredParse {
parse: Parse,
row_description: Option<RowDescription>,
pub struct StoredParse {
pub parse: Parse,
pub row_description: Option<RowDescription>,
}

impl StoredParse {
Expand All @@ -31,7 +31,7 @@ struct CacheKey {
data_types: Arc<Vec<i32>>,
}

#[derive(Default, Debug)]
#[derive(Default, Debug, Clone)]
pub struct GlobalCache {
statements: HashMap<CacheKey, usize>,
names: HashMap<String, StoredParse>, // Ideally this holds an entry to `statements`. Maybe an Arc?
Expand Down Expand Up @@ -94,4 +94,8 @@ impl GlobalCache {
pub fn is_empty(&self) -> bool {
self.len() == 0
}

pub fn names(&self) -> &HashMap<String, StoredParse> {
&self.names
}
}
14 changes: 14 additions & 0 deletions pgdog/src/frontend/prepared_statements/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,20 @@ impl PreparedStatements {
Ok(message)
}

/// Record a statement prepared manually.
pub fn manual_prepare(&mut self, name: &str, statement: &str) {
let parse = Parse::named(name, statement);
self.insert(parse);
}

pub fn manual_execute(&mut self, name: &str) {
let actual = self.name(name);
println!("actual: {:?}, name: {}", actual, name);
if let Some(actual) = actual {
self.requests.push(PreparedRequest::new(actual, false));
}
}

/// Register prepared statement with the global cache.
fn insert(&mut self, parse: Parse) -> Parse {
let (_new, name) = { self.global.lock().insert(&parse) };
Expand Down
Loading
Loading