Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions psqlextra/backend/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ def add_list_partition(
"""

# asserts the model is a model set up for partitioning
self._partitioning_properties_for_model(model)
meta = self._partitioning_properties_for_model(model)

table_name = self.create_partition_table_name(model, name)

Expand All @@ -762,6 +762,22 @@ def add_list_partition(
",".join(["%s" for _ in range(len(values))]),
)

if getattr(meta, "sub_key", None) and len(meta.sub_key) > 0:
sub_partitioning_key_sql = ", ".join(
self.quote_name(field_name) for field_name in meta.sub_key
)

last_brace_idx = sql.rfind(")") + 1
sql = (
sql[:last_brace_idx]
+ self.sql_partition_by
% (
meta.sub_method.upper(),
sub_partitioning_key_sql,
)
+ sql[last_brace_idx:]
)

with transaction.atomic(using=self.connection.alias):
self.execute(sql, values)

Expand Down Expand Up @@ -1100,9 +1116,17 @@ def _partitioning_properties_for_model(model: Type[Model]):
)
% model.__name__
)

if meta.sub_method and len(meta.sub_key)==0:
raise ImproperlyConfigured(
(
"Model '%s' is not properly configured to be partitioned."
" 'sub_method' is specified '%s', but no 'sub_key' is defined."
)
% (model.__name__, meta.sub_method)
)

try:
for field_name in meta.key:
for field_name in meta.key + meta.sub_key:
model._meta.get_field(field_name)
except FieldDoesNotExist:
raise ImproperlyConfigured(
Expand Down
2 changes: 2 additions & 0 deletions psqlextra/contrib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from .model_data_migrator import PostgresModelDataMigrator
from .static_row import StaticRowQueryCompiler, StaticRowQuerySet
from .transaction import no_transaction
from .category_current_time import partition_by_category_and_current_time

__all__ = [
"PostgresModelDataMigrator",
"PostgresModelDataMigratorState" "StaticRowQuery",
"StaticRowQueryCompiler",
"StaticRowQuerySet",
"no_transaction",
"partition_by_category_and_current_time",
]
3 changes: 3 additions & 0 deletions psqlextra/contrib/category_current_time/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .shorthands import partition_by_category_and_current_time

__all__ = ["partition_by_category_and_current_time"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from datetime import datetime, timezone
from typing import Any, Generator, Optional
from dateutil.relativedelta import relativedelta

from psqlextra.partitioning.strategy import PostgresPartitioningStrategy
from psqlextra.partitioning.time_partition_size import PostgresTimePartitionSize

from .partition import PostgresListPartition, PostgresTimeSubPartition


class PostgresCategoryCurrentTimePartitioningStrategy(PostgresPartitioningStrategy):
"""Implments a category and time based partitioning strategy where
on the first level a partition is created for each category and on the
second level each partition contains values for a specific time period.

All buckets will be equal in size and start at the start of the
unit. With monthly partitioning, partitions start on the 1st and
with weekly partitioning, partitions start on monday, with hourly
partitioning, partitions start at 00:00.
"""

def __init__(
self,
categories: list[Any],
size: PostgresTimePartitionSize,
count: int,
max_age: Optional[relativedelta] = None,
name_format: Optional[tuple[str, str]] = None,
) -> None:
"""Initializes a new instance of :see:PostgresTimePartitioningStrategy.

Arguments:
categories:
The list of categories to create first-level partitions for.

size:
The size of each partition.

count:
The amount of partitions to create ahead
from the current date/time.

max_age:
Maximum age of a partition. Partitions
older than this are deleted during
auto cleanup.

name_format:
Optional name format for the partitions supplied as a tuple. The first value is used
for the category partitions, the second for the time partitions.
"""

self.size = size
self.count = count
self.max_age = max_age
self.name_format = name_format or (None, None)
self.categories = categories

def to_create(self) -> Generator[PostgresTimeSubPartition, None, None]:
for category in self.categories:
lp = PostgresListPartition(
values=[category], name_format=self.name_format[0]
)
yield lp

current_datetime = self.size.start(self.get_start_datetime())

for _ in range(self.count):
yield PostgresTimeSubPartition(
parent_partition=lp,
start_datetime=current_datetime,
size=self.size,
name_format=self.name_format[1],
)

current_datetime += self.size.as_delta()

def to_delete(self) -> Generator[PostgresTimeSubPartition, None, None]:
if not self.max_age:
return

current_datetime = self.size.start(self.get_start_datetime() - self.max_age)

while True:
for category in self.categories:
lp = PostgresListPartition(
values=[category], name_format=self.name_format[0]
)

yield PostgresTimeSubPartition(
parent_partition=lp,
start_datetime=current_datetime,
size=self.size,
name_format=self.name_format[1],
)

current_datetime -= self.size.as_delta()

def get_start_datetime(self) -> datetime:
return datetime.now(timezone.utc)
120 changes: 120 additions & 0 deletions psqlextra/contrib/category_current_time/partition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from datetime import datetime
from typing import Any, Optional, Type

from contextlib import contextmanager

from psqlextra.backend.schema import PostgresSchemaEditor
from psqlextra.models import PostgresPartitionedModel

from psqlextra.partitioning.time_partition import (
PostgresTimePartition,
PostgresTimePartitionSize,
)

from psqlextra.partitioning import PostgresPartition
from psqlextra.partitioning.error import PostgresPartitioningError

@contextmanager
def patch_model(model: Type[PostgresPartitionedModel], parent_partition_name: str):
"""Context manager that ensures the model's table is created and
deleted properly.
"""
original_table = model._meta.db_table
try:
model._meta.db_table = original_table + "_" + parent_partition_name
yield model
finally:
model._meta.db_table = original_table

class PostgresTimeSubPartition(PostgresTimePartition):
"""Base class for a PostgreSQL table sub-partition in a range partitioned
table."""

def __init__(
self,
parent_partition: PostgresPartition,
size: PostgresTimePartitionSize,
start_datetime: datetime,
name_format: Optional[str] = None,
) -> None:
super().__init__(
size=size,
start_datetime=start_datetime,
name_format=name_format,
)
self.parent_partition = parent_partition

def name(self) -> str:
name_format = self.name_format or self._unit_name_format.get(self.size.unit)
if not name_format:
raise PostgresPartitioningError("Unknown size/unit")

return (
self.parent_partition.name()
+ "_"
+ self.start_datetime.strftime(name_format).lower()
)

def create(
self,
model: Type[PostgresPartitionedModel],
schema_editor: PostgresSchemaEditor,
comment: Optional[str] = None,
) -> None:
with patch_model(model, self.parent_partition.name()) as managed_model:
super().create(
model=managed_model,
schema_editor=schema_editor,
comment=comment,
)

def delete(
self,
model: Type[PostgresPartitionedModel],
schema_editor: PostgresSchemaEditor,
) -> None:
with patch_model(model, self.parent_partition.name()) as managed_model:
super().delete(
model=managed_model,
schema_editor=schema_editor,
)

class PostgresListPartition(PostgresPartition):
"""Base class for a PostgreSQL table partition in a list partitioned
table."""

def __init__(self, values: list[Any], name_format: Optional[str] = None) -> None:
self.values = values
self.name_format = name_format or "%s"

def name(self) -> str:
return self.name_format % "_".join(str(v).lower() for v in self.values)

def deconstruct(self) -> dict:
return {
**super().deconstruct(),
"values": self.values,
}

def create(
self,
model: Type[PostgresPartitionedModel],
schema_editor: PostgresSchemaEditor,
comment: Optional[str] = None,
) -> None:
schema_editor.add_list_partition(
model=model,
name=self.name(),
values=self.values,
comment=comment,
)

def delete(
self,
model: Type[PostgresPartitionedModel],
schema_editor: PostgresSchemaEditor,
) -> None:
schema_editor.delete_partition(model, self.name())


__all__ = ["PostgresListPartition"]
87 changes: 87 additions & 0 deletions psqlextra/contrib/category_current_time/shorthands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from typing import Optional, Type

from dateutil.relativedelta import relativedelta

from psqlextra.models import PostgresPartitionedModel

from psqlextra.partitioning.config import PostgresPartitioningConfig
from psqlextra.partitioning import PostgresTimePartitionSize

from .category_current_time_strategy import (
PostgresCategoryCurrentTimePartitioningStrategy,
)


def partition_by_category_and_current_time(
model: Type[PostgresPartitionedModel],
count: int,
categories: list[int] = None,
years: Optional[int] = None,
months: Optional[int] = None,
weeks: Optional[int] = None,
days: Optional[int] = None,
hours: Optional[int] = None,
max_age: Optional[relativedelta] = None,
name_format: Optional[str] = None,
) -> PostgresPartitioningConfig:
"""Short-hand for generating a partitioning config that partitions the
specified model by time.

One specifies one of the `years`, `months`, `weeks`
or `days` parameter to indicate the size of each
partition. These parameters cannot be combined.

Arguments:
count:
The amount of partitions to create ahead of
the current date/time.

categories:
The list of categories to create first-level partitions for.

years:
The amount of years each partition should contain.

months:
The amount of months each partition should contain.

weeks:
The amount of weeks each partition should contain.

days:
The amount of days each partition should contain.

hours:
The amount of hours each partition should contain.

max_age:
The maximum age of a partition (calculated from the
start of the partition).

Partitions older than this are deleted when running
a delete/cleanup run.

name_format:
The datetime format supplied as a tuple.
The first value creates the first level partition names
and the second value is passed to datetime.strftime to generate the
second level partition name.
"""

size = PostgresTimePartitionSize(
years=years, months=months, weeks=weeks, days=days, hours=hours
)

return PostgresPartitioningConfig(
model=model,
strategy=PostgresCategoryCurrentTimePartitioningStrategy(
categories=categories or [],
size=size,
count=count,
max_age=max_age,
name_format=name_format,
),
)


__all_ = ["partition_by_category_and_current_time"]
Loading