Skip to content

Logical Replication #924

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: 16/edge
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ options:
Enables tracking of function call counts and time used. Specify pl to track only procedural-language functions
type: string
default: "none"
logical_replication_subscription_request:
type: string
default: "{}"
memory_maintenance_work_mem:
description: |
Sets the maximum memory (KB) to be used for maintenance operations.
Expand Down
319 changes: 318 additions & 1 deletion lib/charms/postgresql_k8s/v1/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,54 @@ class PostgreSQLUpdateUserPasswordError(Exception):
"""Exception raised when updating a user password fails."""


class PostgreSQLDatabaseExistsError(Exception):
"""Exception raised during database existence check."""


class PostgreSQLTableExistsError(Exception):
"""Exception raised during table existence check."""


class PostgreSQLIsTableEmptyError(Exception):
"""Exception raised during table emptiness check."""


class PostgreSQLCreatePublicationError(Exception):
"""Exception raised when creating PostgreSQL publication."""


class PostgreSQLPublicationExistsError(Exception):
"""Exception raised during PostgreSQL publication existence check."""


class PostgreSQLAlterPublicationError(Exception):
"""Exception raised when altering PostgreSQL publication."""


class PostgreSQLDropPublicationError(Exception):
"""Exception raised when dropping PostgreSQL publication."""


class PostgreSQLCreateSubscriptionError(Exception):
"""Exception raised when creating PostgreSQL subscription."""


class PostgreSQLSubscriptionExistsError(Exception):
"""Exception raised during PostgreSQL subscription existence check."""


class PostgreSQLUpdateSubscriptionError(Exception):
"""Exception raised when updating PostgreSQL subscription."""


class PostgreSQLRefreshSubscriptionError(Exception):
"""Exception raised when refreshing PostgreSQL subscription."""


class PostgreSQLDropSubscriptionError(Exception):
"""Exception raised when dropping PostgreSQL subscription."""


class PostgreSQLCreatePredefinedRolesError(Exception):
"""Exception raised when creating predefined roles."""

Expand Down Expand Up @@ -276,6 +324,7 @@ def create_user(
user: str,
password: Optional[str] = None,
admin: bool = False,
replication: bool = False,
extra_user_roles: Optional[List[str]] = None,
) -> None:
"""Creates a database user.
Expand All @@ -284,6 +333,7 @@ def create_user(
user: user to be created.
password: password to be assigned to the user.
admin: whether the user should have additional admin privileges.
replication: whether the user should have replication privileges.
extra_user_roles: additional privileges and/or roles to be assigned to the user.
"""
try:
Expand Down Expand Up @@ -317,7 +367,7 @@ def create_user(
user_definition = "ALTER ROLE {}"
else:
user_definition = "CREATE ROLE {} "
user_definition += f"WITH LOGIN{' SUPERUSER' if admin else ''} ENCRYPTED PASSWORD '{password}'"
user_definition += f"WITH LOGIN{' SUPERUSER' if admin else ''}{' REPLICATION' if replication else ''} ENCRYPTED PASSWORD '{password}'"
if privileges:
user_definition += f" {' '.join(privileges)}"
cursor.execute(SQL("BEGIN;"))
Expand Down Expand Up @@ -492,6 +542,74 @@ def grant_relation_access_group_memberships(self) -> None:
if connection is not None:
connection.close()

def grant_replication_privileges(
self,
user: str,
database: str,
schematables: list[str],
old_schematables: list[str] | None = None,
) -> None:
"""Grant CONNECT privilege on database and SELECT privilege on tables.

Args:
user: target user for privileges grant.
database: database to grant CONNECT privilege on.
schematables: list of tables with schema notation to grant SELECT privileges on.
old_schematables: list of tables with schema notation to revoke all privileges from.
"""
with self._connect_to_database(database=database) as connection, connection.cursor() as cursor:
cursor.execute(
SQL("GRANT CONNECT ON DATABASE {} TO {};").format(
Identifier(database), Identifier(user)
)
)
if old_schematables:
cursor.execute(
SQL("REVOKE ALL PRIVILEGES ON TABLE {} FROM {};").format(
SQL(",").join(
Identifier(schematable.split(".")[0], schematable.split(".")[1])
for schematable in old_schematables
),
Identifier(user),
)
)
cursor.execute(
SQL("GRANT SELECT ON TABLE {} TO {};").format(
SQL(",").join(
Identifier(schematable.split(".")[0], schematable.split(".")[1])
for schematable in schematables
),
Identifier(user),
)
)

def revoke_replication_privileges(
self, user: str, database: str, schematables: list[str]
) -> None:
"""Revoke all privileges from tables and database.

Args:
user: target user for privileges revocation.
database: database to remove all privileges from.
schematables: list of tables with schema notation to revoke all privileges from.
"""
with self._connect_to_database(database=database) as connection, connection.cursor() as cursor:
cursor.execute(
SQL("REVOKE ALL PRIVILEGES ON TABLE {} FROM {};").format(
SQL(",").join(
Identifier(schematable.split(".")[0], schematable.split(".")[1])
for schematable in schematables
),
Identifier(user),
)
)
cursor.execute(
SQL("REVOKE ALL PRIVILEGES ON DATABASE {} FROM {};").format(
Identifier(database), Identifier(user)
)
)
pass

def enable_disable_extensions(
self, extensions: Dict[str, bool], database: Optional[str] = None
) -> None:
Expand Down Expand Up @@ -867,6 +985,205 @@ def is_restart_pending(self) -> bool:
if connection:
connection.close()

def database_exists(self, db: str) -> bool:
"""Check whether specified database exists."""
try:
with self._connect_to_database() as connection, connection.cursor() as cursor:
cursor.execute(
SQL("SELECT datname FROM pg_database WHERE datname={};").format(Literal(db))
)
return cursor.fetchone() is not None
except psycopg2.Error as e:
logger.error(f"Failed to check Postgresql database existence: {e}")
raise PostgreSQLDatabaseExistsError() from e

def table_exists(self, db: str, schema: str, table: str) -> bool:
"""Check whether specified table in database exists."""
try:
with self._connect_to_database(database=db) as connection, connection.cursor() as cursor:
cursor.execute(
SQL(
"SELECT tablename FROM pg_tables WHERE schemaname={} AND tablename={};"
).format(Literal(schema), Literal(table))
)
return cursor.fetchone() is not None
except psycopg2.Error as e:
logger.error(f"Failed to check Postgresql table existence: {e}")
raise PostgreSQLTableExistsError() from e

def is_table_empty(self, db: str, schema: str, table: str) -> bool:
"""Check whether table is empty."""
try:
with (
self._connect_to_database(database=db) as connection,
connection.cursor() as cursor,
):
cursor.execute(SQL("SELECT COUNT(1) FROM {};").format(Identifier(schema, table)))
return cursor.fetchone()[0] == 0
except psycopg2.Error as e:
logger.error(f"Failed to check whether table is empty: {e}")
raise PostgreSQLIsTableEmptyError() from e

def create_publication(self, db: str, name: str, schematables: list[str]) -> None:
"""Create PostgreSQL publication."""
try:
with self._connect_to_database(database=db) as connection, connection.cursor() as cursor:
cursor.execute(
SQL("CREATE PUBLICATION {} FOR TABLE {};").format(
Identifier(name),
SQL(",").join(
Identifier(schematable.split(".")[0], schematable.split(".")[1])
for schematable in schematables
),
)
)
except psycopg2.Error as e:
logger.error(f"Failed to create Postgresql publication: {e}")
raise PostgreSQLCreatePublicationError() from e

def publication_exists(self, db: str, publication: str) -> bool:
"""Check whether specified subscription in database exists."""
try:
with self._connect_to_database(database=db) as connection, connection.cursor() as cursor:
cursor.execute(
SQL("SELECT pubname FROM pg_publication WHERE pubname={};").format(
Literal(publication)
)
)
return cursor.fetchone() is not None
except psycopg2.Error as e:
logger.error(f"Failed to check Postgresql publication existence: {e}")
raise PostgreSQLPublicationExistsError() from e

def alter_publication(self, db: str, name: str, schematables: list[str]) -> None:
"""Alter PostgreSQL publication."""
try:
with self._connect_to_database(database=db) as connection, connection.cursor() as cursor:
cursor.execute(
SQL("ALTER PUBLICATION {} SET TABLE {};").format(
Identifier(name),
SQL(",").join(
Identifier(schematable.split(".")[0], schematable.split(".")[1])
for schematable in schematables
),
)
)
except psycopg2.Error as e:
logger.error(f"Failed to alter Postgresql publication: {e}")
raise PostgreSQLAlterPublicationError() from e

def drop_publication(self, db: str, publication: str) -> None:
"""Drop PostgreSQL publication."""
try:
with self._connect_to_database(database=db) as connection, connection.cursor() as cursor:
cursor.execute(
SQL("DROP PUBLICATION IF EXISTS {};").format(
Identifier(publication),
)
)
except psycopg2.Error as e:
logger.error(f"Failed to drop Postgresql publication: {e}")
raise PostgreSQLDropPublicationError() from e

def create_subscription(
self,
subscription: str,
host: str,
db: str,
user: str,
password: str,
publication: str,
replication_slot: str,
) -> None:
"""Create PostgreSQL subscription."""
try:
with (
self._connect_to_database(database=db) as connection,
connection.cursor() as cursor,
):
cursor.execute(
SQL(
"CREATE SUBSCRIPTION {} CONNECTION {} PUBLICATION {} WITH (copy_data=true,create_slot=false,enabled=true,slot_name={});"
).format(
Identifier(subscription),
Literal(f"host={host} dbname={db} user={user} password={password}"),
Identifier(publication),
Identifier(replication_slot),
)
)
except psycopg2.Error as e:
logger.error(f"Failed to create Postgresql subscription: {e}")
raise PostgreSQLCreateSubscriptionError() from e

def subscription_exists(self, db: str, subscription: str) -> bool:
"""Check whether specified subscription in database exists."""
try:
with self._connect_to_database(database=db) as connection, connection.cursor() as cursor:
cursor.execute(
SQL("SELECT subname FROM pg_subscription WHERE subname={};").format(
Literal(subscription)
)
)
return cursor.fetchone() is not None
except psycopg2.Error as e:
logger.error(f"Failed to check Postgresql subscription existence: {e}")
raise PostgreSQLSubscriptionExistsError() from e

def update_subscription(self, db: str, subscription: str, host: str, user: str, password: str):
"""Update PostgreSQL subscription connection details."""
try:
with self._connect_to_database(database=db) as connection, connection.cursor() as cursor:
cursor.execute(
SQL("ALTER SUBSCRIPTION {} CONNECTION {}").format(
Identifier(subscription),
Literal(f"host={host} dbname={db} user={user} password={password}"),
)
)
except psycopg2.Error as e:
logger.error(f"Failed to update Postgresql subscription: {e}")
raise PostgreSQLUpdateSubscriptionError() from e

def refresh_subscription(self, db: str, subscription: str):
"""Refresh PostgreSQL subscription to pull publication changes."""
connection = None
try:
connection = self._connect_to_database(database=db)
with connection.cursor() as cursor:
cursor.execute(
SQL("ALTER SUBSCRIPTION {} REFRESH PUBLICATION").format(
Identifier(subscription)
)
)
except psycopg2.Error as e:
logger.error(f"Failed to refresh Postgresql subscription: {e}")
raise PostgreSQLRefreshSubscriptionError() from e
finally:
if connection is not None:
connection.close()

def drop_subscription(self, db: str, subscription: str) -> None:
"""Drop PostgreSQL subscription."""
try:
with self._connect_to_database(database=db) as connection, connection.cursor() as cursor:
cursor.execute(
SQL("ALTER SUBSCRIPTION {} DISABLE;").format(
Identifier(subscription),
)
)
cursor.execute(
SQL("ALTER SUBSCRIPTION {} SET (slot_name=NONE);").format(
Identifier(subscription),
)
)
cursor.execute(
SQL("DROP SUBSCRIPTION {};").format(
Identifier(subscription),
)
)
except psycopg2.Error as e:
logger.error(f"Failed to drop Postgresql subscription: {e}")
raise PostgreSQLDropSubscriptionError() from e

@staticmethod
def build_postgresql_group_map(group_map: Optional[str]) -> List[Tuple]:
"""Build the PostgreSQL authorization group-map.
Expand Down
7 changes: 7 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ provides:
interface: postgresql_async
limit: 1
optional: true
logical-replication-offer:
interface: postgresql_logical_replication
optional: true
database:
interface: postgresql_client
cos-agent:
Expand All @@ -45,6 +48,10 @@ requires:
interface: tls-certificates
limit: 1
optional: true
logical-replication:
interface: postgresql_logical_replication
limit: 1
optional: true
client-certificates:
interface: tls-certificates
limit: 1
Expand Down
Loading
Loading