diff --git a/backend/kernelCI_app/management/commands/update_db.py b/backend/kernelCI_app/management/commands/update_db.py index b27c4611b..3511767d5 100644 --- a/backend/kernelCI_app/management/commands/update_db.py +++ b/backend/kernelCI_app/management/commands/update_db.py @@ -1,10 +1,11 @@ import json +from typing import Generator from django.core.management.base import BaseCommand, CommandError -from django.db import connections +from django.db import connections, models import logging from django.conf import settings from kernelCI_app.models import Issues, Checkouts, Builds, Tests, Incidents -from datetime import timedelta +from datetime import datetime, timedelta from django.utils import timezone logger = logging.getLogger(__name__) @@ -16,8 +17,7 @@ SELECT_BATCH_SIZE = 25000 -def parse_interval(interval_str: str): - now = timezone.now() +def parse_interval(interval_str: str) -> datetime: parts = interval_str.split() if len(parts) != 2: raise ValueError(f"Invalid interval format: {interval_str}") @@ -25,14 +25,16 @@ def parse_interval(interval_str: str): value, unit = parts value = int(value) - if unit.lower() in ["hour", "hours"]: + if unit.lower() in ["minute", "minutes"]: + delta = timedelta(minutes=value) + elif unit.lower() in ["hour", "hours"]: delta = timedelta(hours=value) elif unit.lower() in ["day", "days"]: delta = timedelta(days=value) else: raise ValueError(f"Unsupported time unit: {unit}") - return now - delta + return timezone.now() - delta class Command(BaseCommand): @@ -40,10 +42,14 @@ class Command(BaseCommand): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.start_interval = None - self.end_interval = None - self.start_timestamp = None - self.end_timestamp = None + self.start_interval: str + self.end_interval: str + self.start_timestamp: datetime + self.end_timestamp: datetime + self.related_data_only: bool + self.origins: list[str] + self.origin_condition: str + if settings.USE_DASHBOARD_DB: self.kcidb_connection = connections["kcidb"] self.dashboard_conn_name = "default" @@ -56,11 +62,13 @@ def add_arguments(self, parser): "--start-interval", type=str, help="Start interval for filtering data ('x days' or 'x hours' format)", + required=True, ) parser.add_argument( "--end-interval", type=str, help="End interval for filtering data ('x days' or 'x hours' format)", + required=True, ) parser.add_argument( "--table", @@ -68,19 +76,38 @@ def add_arguments(self, parser): help="""Table name to limit the migration to (optional, if not provided all tables will be migrated)""", ) + parser.add_argument( + "--related-data-only", + action="store_true", + help="""Only retrieves data that are related to the existing data. + This allows to follow foreign key constraints, + but it almost certainly won't retrieve all data in the given interval.""", + ) + parser.add_argument( + "--origins", + type=lambda s: [origin.strip() for origin in s.split(",")], + help="Limit database changes to specific origins (comma-separated list)." + + " If not provided, any origin will be considered", + default=[], + ) - def handle(self, *args, **options): - self.start_interval = options.get("start_interval") - self.end_interval = options.get("end_interval") - table = options.get("table", None) - - if not self.start_interval or not self.end_interval: - self.stdout.write( - self.style.ERROR( - "Both self.start_interval and self.end_interval must be provided. Aborting." - ) - ) - return + def handle( + self, + *args, + start_interval: str, + end_interval: str, + table: str, + origins: list[str], + related_data_only, + **options, + ): + self.start_interval = start_interval + self.end_interval = end_interval + self.related_data_only = related_data_only + self.origins = origins + self.origin_condition = ( + f"AND origin IN ({','.join(['%s'] * len(origins))})" if origins else "" + ) self.start_timestamp = parse_interval(self.start_interval) self.end_timestamp = parse_interval(self.end_interval) @@ -133,24 +160,49 @@ def handle(self, *args, **options): logger.error(f"Error updating database: {str(e)}") raise CommandError("Command failed") from e + def get_related_data( + self, *, model: models.Model, field_name: str, filter_timestamp: bool = True + ) -> tuple[set[str], str]: + """Gets the related ids and makes the condition string""" + + # Avoids making an unnecessary query + if self.related_data_only is False: + return set(), "" + + values = model.objects.using(self.dashboard_conn_name) + if filter_timestamp: + values = values.filter( + field_timestamp__gte=self.start_timestamp, + field_timestamp__lte=self.end_timestamp, + ) + values = values.values_list("id", flat=True) + related_ids = set(values) + + related_condition = ( + f"AND {field_name} IN ({",".join(["%s"] * len(related_ids))})" + ) + + return related_ids, related_condition + # ISSUES ######################################## def select_issues_data(self) -> list[tuple]: - query = """ + query = f""" SELECT _timestamp, id, version, origin, report_url, report_subject, culprit_code, culprit_tool, culprit_harness, comment, misc, categories FROM issues - WHERE _timestamp >= NOW() - INTERVAL %(start_interval)s - AND _timestamp <= NOW() - INTERVAL %(end_interval)s + WHERE _timestamp >= NOW() - INTERVAL %s + AND _timestamp <= NOW() - INTERVAL %s + {self.origin_condition} ORDER BY _timestamp """ - params = { - "start_interval": self.start_interval, - "end_interval": self.end_interval, - } + query_params = [ + self.start_interval, + self.end_interval, + ] + self.origins with self.kcidb_connection.cursor() as kcidb_cursor: - kcidb_cursor.execute(query, params) + kcidb_cursor.execute(query, query_params) return kcidb_cursor.fetchall() def insert_issues_data(self, records: list[tuple]) -> int: @@ -185,7 +237,7 @@ def insert_issues_data(self, records: list[tuple]) -> int: self.stdout.write(f"Processed {total_inserted} Issues records") return total_inserted - def migrate_issues(self): + def migrate_issues(self) -> None: """Migrate Issues data from default to dashboard_db""" self.stdout.write("\nMigrating Issues...") @@ -195,7 +247,7 @@ def migrate_issues(self): # CHECKOUTS ######################################## def select_checkouts_data(self) -> list[tuple]: - query = """ + query = f""" SELECT _timestamp, id, origin, tree_name, git_repository_url, git_commit_hash, git_commit_name, git_repository_branch, patchset_files, patchset_hash, message_id, comment, start_time, @@ -203,17 +255,18 @@ def select_checkouts_data(self) -> list[tuple]: git_repository_branch_tip, git_commit_tags, origin_builds_finish_time, origin_tests_finish_time FROM checkouts - WHERE _timestamp >= NOW() - INTERVAL %(start_interval)s - AND _timestamp <= NOW() - INTERVAL %(end_interval)s + WHERE _timestamp >= NOW() - INTERVAL %s + AND _timestamp <= NOW() - INTERVAL %s + {self.origin_condition} ORDER BY _timestamp """ - params = { - "start_interval": self.start_interval, - "end_interval": self.end_interval, - } + query_params = [ + self.start_interval, + self.end_interval, + ] + self.origins with self.kcidb_connection.cursor() as kcidb_cursor: - kcidb_cursor.execute(query, params) + kcidb_cursor.execute(query, query_params) return kcidb_cursor.fetchall() def insert_checkouts_data(self, records: list[tuple]) -> int: @@ -271,20 +324,11 @@ def migrate_checkouts(self) -> None: # BUILDS ######################################## def select_builds_data(self) -> list[tuple]: - checkout_ids = set( - ( - Checkouts.objects.using(self.dashboard_conn_name) - .filter( - field_timestamp__gte=self.start_timestamp, - field_timestamp__lte=self.end_timestamp, - ) - .values_list("id", flat=True) - ) + related_checkout_ids, related_condition = self.get_related_data( + model=Checkouts, field_name="checkout_id" ) - - if len(checkout_ids) == 0: + if self.related_data_only and len(related_checkout_ids) == 0: return [] - checkout_id_placeholders = ",".join(["%s"] * len(checkout_ids)) query = f""" SELECT _timestamp, checkout_id, id, origin, comment, start_time, @@ -292,16 +336,23 @@ def select_builds_data(self) -> list[tuple]: output_files, config_name, config_url, log_url, log_excerpt, misc, status FROM builds - WHERE builds.checkout_id IN ({checkout_id_placeholders}) - AND _timestamp >= NOW() - INTERVAL %s + WHERE _timestamp >= NOW() - INTERVAL %s AND _timestamp <= NOW() - INTERVAL %s + {related_condition} + {self.origin_condition} ORDER BY _timestamp, id """ + query_params = ( + [ + self.start_interval, + self.end_interval, + ] + + list(related_checkout_ids) + + self.origins + ) with self.kcidb_connection.cursor() as kcidb_cursor: - kcidb_cursor.execute( - query, list(checkout_ids) + [self.start_interval, self.end_interval] - ) + kcidb_cursor.execute(query, query_params) return kcidb_cursor.fetchall() def insert_builds_data(self, records: list[tuple]) -> int: @@ -350,19 +401,12 @@ def migrate_builds(self) -> None: self.stdout.write("Builds migration completed") # TESTS ######################################## - def select_tests_data(self): - existing_build_ids = set( - Builds.objects.using(self.dashboard_conn_name) - .filter( - field_timestamp__gte=self.start_timestamp, - field_timestamp__lte=self.end_timestamp, - ) - .values_list("id", flat=True) + def select_tests_data(self) -> Generator[list[tuple], None, list[tuple]]: + related_build_ids, related_condition = self.get_related_data( + model=Builds, field_name="build_id" ) - - if len(existing_build_ids) == 0: + if self.related_data_only and len(related_build_ids) == 0: return [] - build_id_placeholders = ",".join(["%s"] * len(existing_build_ids)) tests_query = f""" SELECT _timestamp, build_id, id, origin, environment_comment, @@ -371,15 +415,20 @@ def select_tests_data(self): number_value, environment_compatible, number_prefix, number_unit, input_files FROM tests - WHERE build_id IN ({build_id_placeholders}) - AND _timestamp >= NOW() - INTERVAL %s + WHERE _timestamp >= NOW() - INTERVAL %s AND _timestamp <= NOW() - INTERVAL %s + {related_condition} + {self.origin_condition} ORDER BY _timestamp, id """ - query_params = list(existing_build_ids) + [ - self.start_interval, - self.end_interval, - ] + query_params = ( + [ + self.start_interval, + self.end_interval, + ] + + list(related_build_ids) + + self.origins + ) with self.kcidb_connection.cursor() as kcidb_cursor: kcidb_cursor.execute(tests_query, query_params) @@ -441,13 +490,11 @@ def migrate_tests(self) -> None: # INCIDENTS ######################################## def select_incidents_data(self) -> list[tuple]: - existing_issues_ids = set( - Issues.objects.using(self.dashboard_conn_name).values_list("id", flat=True) + related_issue_ids, related_condition = self.get_related_data( + model=Issues, field_name="issue_id", filter_timestamp=False ) - - if len(existing_issues_ids) == 0: + if self.related_data_only and len(related_issue_ids) == 0: return [] - issue_id_placeholders = ",".join(["%s"] * len(existing_issues_ids)) # Though we can filter with the build and test ID, filtering by # issue ID is more consistent since incidents can be triggered for @@ -456,16 +503,21 @@ def select_incidents_data(self) -> list[tuple]: SELECT _timestamp, id, origin, issue_id, issue_version, build_id, test_id, present, comment, misc FROM incidents - WHERE issue_id IN ({issue_id_placeholders}) - AND _timestamp >= NOW() - INTERVAL %s + WHERE _timestamp >= NOW() - INTERVAL %s AND _timestamp <= NOW() - INTERVAL %s + {related_condition} + {self.origin_condition} ORDER BY _timestamp """ - query_params = list(existing_issues_ids) + [ - self.start_interval, - self.end_interval, - ] + query_params = ( + [ + self.start_interval, + self.end_interval, + ] + + list(related_issue_ids) + + self.origins + ) with self.kcidb_connection.cursor() as kcidb_cursor: kcidb_cursor.execute(query, query_params) @@ -475,52 +527,58 @@ def select_incidents_data(self) -> list[tuple]: def insert_incidents_data(self, records: list[tuple]) -> int: original_incidents: list[Incidents] = [] - proposed_issue_ids: set[tuple[str, int]] = set() - proposed_build_ids: set[str] = set() - proposed_test_ids: set[str] = set() + existing_issue_ids: set[tuple[str, int]] = set() + existing_build_ids: set[str] = set() + existing_test_ids: set[str] = set() skipped_incidents = 0 - for record in records: - issue_id = record[3] - issue_version = record[4] - build_id = record[5] - test_id = record[6] - proposed_issue_ids.add((issue_id, issue_version)) - if build_id: - proposed_build_ids.add(build_id) - if test_id: - proposed_test_ids.add(test_id) - - existing_issue_ids = set( - Issues.objects.using(self.dashboard_conn_name) - .filter(id__in=[issue[0] for issue in proposed_issue_ids]) - .values_list("id", flat=True) - ) + if self.related_data_only: + proposed_issue_ids: set[tuple[str, int]] = set() + proposed_build_ids: set[str] = set() + proposed_test_ids: set[str] = set() + + for record in records: + issue_id = record[3] + issue_version = record[4] + build_id = record[5] + test_id = record[6] + proposed_issue_ids.add((issue_id, issue_version)) + if build_id: + proposed_build_ids.add(build_id) + if test_id: + proposed_test_ids.add(test_id) + + existing_issue_ids = set( + Issues.objects.using(self.dashboard_conn_name) + .filter(id__in=[issue[0] for issue in proposed_issue_ids]) + .values_list("id", flat=True) + ) - existing_build_ids = set( - Builds.objects.using(self.dashboard_conn_name) - .filter(id__in=proposed_build_ids) - .values_list("id", flat=True) - ) + existing_build_ids = set( + Builds.objects.using(self.dashboard_conn_name) + .filter(id__in=proposed_build_ids) + .values_list("id", flat=True) + ) - existing_test_ids = set( - Tests.objects.using(self.dashboard_conn_name) - .filter(id__in=proposed_test_ids) - .values_list("id", flat=True) - ) + existing_test_ids = set( + Tests.objects.using(self.dashboard_conn_name) + .filter(id__in=proposed_test_ids) + .values_list("id", flat=True) + ) # Incidents that don't have a related issue, build or test in the dashboard_db - # will be skipped to preserve the foreign key constraints + # will be skipped to preserve the foreign key constraints unless explicited for record in records: issue_id = record[3] issue_version = record[4] build_id = record[5] test_id = record[6] - if issue_id in existing_issue_ids: - if (build_id is not None and build_id not in existing_build_ids) or ( - test_id is not None and test_id not in existing_test_ids - ): + if not self.related_data_only or issue_id in existing_issue_ids: + if ( + (build_id is not None and build_id not in existing_build_ids) + or (test_id is not None and test_id not in existing_test_ids) + ) and self.related_data_only: skipped_incidents += 1 continue