Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Bash commands

- `cargo check` to verify that the code compiles. The code shouldn't produce any warnings. This is quicker than `cargo build`.
- `cargo fmt` to reformat code according to Rust standards.
- `cargo nextest run <test name>` to run a specific test.
- `cargo nextest run --test-threads=1` to run all tests. Make sure to use `--test-threads=1` because some tests conflict with each other.

# Code style

Use standard Rust code style. Use `cargo fmt` to reformat code automatically after every edit.

# Workflow

- Prefer to run individual tests with `cargo nextest run <name of the test here>`. This is much faster than running the whole test suite.
- A local PostgreSQL server is required for some tests to pass. Set it up and create a database called "pgdog". Create a user called "pgdog" with password "pgdog".
- Ignore files in all folders except `./pgdog`.

# About the project

PgDog is a connection pooler for Postgres that can shard databases. It implements the Postgres network protocol and uses pg_query to parse SQL queries. It aims to be 100% compatible with Postgres, without clients knowing they are talking to a proxy.
14 changes: 7 additions & 7 deletions integration/logical/pgdog.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ database_name = "pgdog"
min_pool_size = 0
shard = 0

[[databases]]
name = "destination"
host = "127.0.0.1"
port = 5434
database_name = "pgdog"
min_pool_size = 0
shard = 1
# [[databases]]
# name = "destination"
# host = "127.0.0.1"
# port = 5434
# database_name = "pgdog"
# min_pool_size = 0
# shard = 1

[[sharded_tables]]
database = "destination"
Expand Down
3 changes: 3 additions & 0 deletions pgdog/src/backend/schema/sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,7 @@ pub enum Error {

#[error("cluster has no databases")]
NoDatabases,

#[error("missing entity in dump")]
MissingEntity,
}
83 changes: 72 additions & 11 deletions pgdog/src/backend/schema/sync/pg_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
Cluster,
},
config::config,
frontend::router::parser::Table,
frontend::router::parser::{sequence::Sequence, Column, Table},
};

use tokio::process::Command;
Expand Down Expand Up @@ -158,8 +158,10 @@ pub struct PgDumpOutput {
pub enum SyncState {
PreData,
PostData,
Cutover,
}

#[derive(Debug)]
pub enum Statement<'a> {
Index {
table: Table<'a>,
Expand All @@ -175,15 +177,28 @@ pub enum Statement<'a> {
Other {
sql: &'a str,
},

SequenceOwner {
column: Column<'a>,
sequence: Sequence<'a>,
sql: &'a str,
},

SequenceSetMax {
sequence: Sequence<'a>,
sql: String,
},
}

impl<'a> Deref for Statement<'a> {
type Target = &'a str;
type Target = str;
fn deref(&self) -> &Self::Target {
match self {
Self::Index { sql, .. } => sql,
Self::Table { sql, .. } => sql,
Self::Other { sql } => sql,
Self::Index { sql, .. } => *sql,
Self::Table { sql, .. } => *sql,
Self::SequenceOwner { sql, .. } => *sql,
Self::Other { sql } => *sql,
Self::SequenceSetMax { sql, .. } => sql.as_str(),
}
}
}
Expand Down Expand Up @@ -276,9 +291,30 @@ impl PgDumpOutput {
}
}

NodeEnum::AlterSeqStmt(_stmt) => {
if state == SyncState::PreData {
result.push(original.into());
NodeEnum::AlterSeqStmt(stmt) => {
if matches!(state, SyncState::PreData | SyncState::Cutover) {
let sequence = stmt
.sequence
.as_ref()
.map(Table::from)
.ok_or(Error::MissingEntity)?;
let sequence = Sequence::from(sequence);
let column = stmt.options.first().ok_or(Error::MissingEntity)?;
let column =
Column::try_from(column).map_err(|_| Error::MissingEntity)?;

if state == SyncState::PreData {
result.push(Statement::SequenceOwner {
column,
sequence,
sql: original,
});
} else {
let sql = sequence
.setval_from_column(&column)
.map_err(|_| Error::MissingEntity)?;
result.push(Statement::SequenceSetMax { sequence, sql })
}
}
}

Expand Down Expand Up @@ -381,7 +417,9 @@ mod test {
.await
.unwrap();

let output = output.statements(SyncState::PreData).unwrap();
let output_pre = output.statements(SyncState::PreData).unwrap();
let output_post = output.statements(SyncState::PostData).unwrap();
let output_cutover = output.statements(SyncState::Cutover).unwrap();

let mut dest = test_server().await;
dest.execute("DROP SCHEMA IF EXISTS test_pg_dump_execute_dest CASCADE")
Expand All @@ -395,7 +433,7 @@ mod test {
.await
.unwrap();

for stmt in output {
for stmt in output_pre {
// Hack around us using the same database as destination.
// I know, not very elegant.
let stmt = stmt.replace("pgdog.", "test_pg_dump_execute_dest.");
Expand All @@ -408,7 +446,30 @@ mod test {
.unwrap();
assert_eq!(id[0], i + 1); // Sequence has made it over.

// Unique index has not made it over tho.
// Unique index didn't make it over.
}

dest.execute("DELETE FROM test_pg_dump_execute_dest.test_pg_dump_execute")
.await
.unwrap();

for stmt in output_post {
let stmt = stmt.replace("pgdog.", "test_pg_dump_execute_dest.");
dest.execute(stmt).await.unwrap();
}

let q = "INSERT INTO test_pg_dump_execute_dest.test_pg_dump_execute VALUES (DEFAULT, 'test@test', NOW()) RETURNING id";
assert!(dest.execute(q).await.is_ok());
let err = dest.execute(q).await.err().unwrap();
assert!(err.to_string().contains(
r#"duplicate key value violates unique constraint "test_pg_dump_execute_email_key""#
)); // Unique index made it over.

assert_eq!(output_cutover.len(), 1);
for stmt in output_cutover {
let stmt = stmt.replace("pgdog.", "test_pg_dump_execute_dest.");
assert!(stmt.starts_with("SELECT setval('"));
dest.execute(stmt).await.unwrap();
}

dest.execute("DROP SCHEMA test_pg_dump_execute_dest CASCADE")
Expand Down
10 changes: 10 additions & 0 deletions pgdog/src/backend/schema/sync/progress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pub enum Item {
schema: String,
name: String,
},
// SequenceOwner {
// sequence: String,
// owner: String,
// },
Other {
sql: String,
},
Expand Down Expand Up @@ -89,6 +93,12 @@ impl From<&Statement<'_>> for Item {
Statement::Other { sql } => Item::Other {
sql: sql.to_string(),
},
Statement::SequenceOwner { sql, .. } => Item::Other {
sql: sql.to_string(),
},
Statement::SequenceSetMax { sql, .. } => Item::Other {
sql: sql.to_string(),
},
}
}
}
Expand Down
Loading
Loading