From 6ca26f92f794721ae29c8dd967afba84e78306ea Mon Sep 17 00:00:00 2001
From: Seth Samuel
Date: Mon, 16 Jun 2025 15:18:13 -0400
Subject: [PATCH 1/3] WIP

---
 .../datadog_checks/base/checks/base.py       |  7 +++++++
 postgres/datadog_checks/postgres/postgres.py | 19 +++++++++++++++++++
 postgres/tests/test_statements.py            |  4 ++++
 3 files changed, 30 insertions(+)

diff --git a/datadog_checks_base/datadog_checks/base/checks/base.py b/datadog_checks_base/datadog_checks/base/checks/base.py
index bae1c3051494d..2eb81df49b6f7 100644
--- a/datadog_checks_base/datadog_checks/base/checks/base.py
+++ b/datadog_checks_base/datadog_checks/base/checks/base.py
@@ -699,6 +699,13 @@ def database_monitoring_metadata(self, raw_event):
 
         aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), "dbm-metadata")
 
+    def database_monitoring_agent_health(self, raw_event):
+        # type: (str) -> None
+        if raw_event is None:
+            return
+
+        aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), "dbm-activity")
+
     def event_platform_event(self, raw_event, event_track_type):
         # type: (str, str) -> None
         """Send an event platform event.
diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py
index 1f7424bd57de5..1941ff13495d5 100644
--- a/postgres/datadog_checks/postgres/postgres.py
+++ b/postgres/datadog_checks/postgres/postgres.py
@@ -159,6 +159,25 @@ def __init__(self, name, init_config, instances):
             ttl=self._config.database_instance_collection_interval,
         )  # type: TTLCache
 
+        instance_config = copy.deepcopy(self.instance)
+        instance_config['password'] = "***"
+        init_event = {
+            "dbm_type": "agent_health",
+            "event_type": "initialization",
+            "database_instance": self.database_identifier,
+            "ddsource": "postgres",
+            "ddagentversion": datadog_agent.get_version(),
+            "ddtags": self.tags,
+            "timestamp": time() * 1000,
+            "instance_config": json.dumps(instance_config, default=default_json_event_encoding),
+            "agent_config": json.dumps(self.agentConfig, default=default_json_event_encoding),
+        }
+        self.log.info("agent config: %s", self.agentConfig)
+        self.log.info("init event: %s", init_event)
+        self.database_monitoring_query_activity(
+            json.dumps(init_event, default=default_json_event_encoding)
+        )
+
     def _build_autodiscovery(self):
         if not self._config.discovery_config['enabled']:
             return None
diff --git a/postgres/tests/test_statements.py b/postgres/tests/test_statements.py
index 2c07c5e639943..0a2790de03b6d 100644
--- a/postgres/tests/test_statements.py
+++ b/postgres/tests/test_statements.py
@@ -1200,6 +1200,7 @@ def wait(conn):
         # only can see own queries
         return
 
+    dbm_activity_event = [e for e in dbm_activity_event if e.get('dbm_type') == 'activity']
     event = dbm_activity_event[0]
     assert event['host'] == "stubbed.hostname"
     assert event['ddsource'] == "postgres"
@@ -1286,6 +1287,7 @@ def wait(conn):
     time.sleep(dbm_instance['query_activity']['collection_interval'])
     run_one_check(check)
     dbm_activity_event = aggregator.get_event_platform_events("dbm-activity")
+    dbm_activity_event = [e for e in dbm_activity_event if e.get('dbm_type') == 'activity']
     event = dbm_activity_event[1]
     assert len(event['postgres_activity']) > 0
     # find bob's query
@@ -1374,6 +1376,7 @@ def test_activity_reported_hostname(
 
     run_one_check(check)
 
     dbm_activity = aggregator.get_event_platform_events("dbm-activity")
+    dbm_activity = [e for e in dbm_activity if e.get('dbm_type') == 'activity']
    assert dbm_activity, "should have at least one activity sample"
     assert dbm_activity[0]['host'] == expected_hostname
@@ -1823,6 +1826,7 @@ def test_disabled_activity_or_explain_plans(
             conn.cursor().execute(query, (arg,))
     run_one_check(check)
     dbm_activity = aggregator.get_event_platform_events("dbm-activity")
+    dbm_activity = [e for e in dbm_activity if e.get('dbm_type') == 'activity']
     dbm_samples = aggregator.get_event_platform_events("dbm-samples")
 
     if POSTGRES_VERSION.split('.')[0] == "9" and pg_stat_activity_view == "pg_stat_activity":

From b701ea700d6cd95ca0a6d5967c97bba1f87b3023 Mon Sep 17 00:00:00 2001
From: Seth Samuel
Date: Tue, 17 Jun 2025 15:51:04 -0400
Subject: [PATCH 2/3] WIP

---
 postgres/datadog_checks/postgres/metadata.py | 30 ++++++++++---------
 postgres/datadog_checks/postgres/postgres.py | 28 +++++++++++++----
 .../postgres/statement_samples.py            |  3 ++
 3 files changed, 41 insertions(+), 20 deletions(-)

diff --git a/postgres/datadog_checks/postgres/metadata.py b/postgres/datadog_checks/postgres/metadata.py
index ad84ac7151ac8..a00f56128aba6 100644
--- a/postgres/datadog_checks/postgres/metadata.py
+++ b/postgres/datadog_checks/postgres/metadata.py
@@ -267,6 +267,7 @@ def __init__(self, check, config, shutdown_callback):
         self._collect_pg_settings_enabled = is_affirmative(config.settings_metadata_config.get("enabled", False))
         self._collect_schemas_enabled = is_affirmative(config.schemas_metadata_config.get("enabled", False))
         self._is_schemas_collection_in_progress = False
+        self._extensions_cached = None
         self._pg_settings_cached = None
         self._time_since_last_extension_query = 0
         self._time_since_last_settings_query = 0
@@ -300,20 +301,21 @@ def report_postgres_extensions(self):
         elapsed_s = time.time() - self._time_since_last_extension_query
         if elapsed_s >= self.pg_extensions_collection_interval and self._collect_extensions_enabled:
             self._extensions_cached = self._collect_postgres_extensions()
-            event = {
-                "host": self._check.reported_hostname,
-                "database_instance": self._check.database_identifier,
-                "agent_version": datadog_agent.get_version(),
-                "dbms": "postgres",
-                "kind": "pg_extension",
-                "collection_interval": self.collection_interval,
-                "dbms_version": payload_pg_version(self._check.version),
-                "tags": self._tags_no_db,
-                "timestamp": time.time() * 1000,
-                "cloud_metadata": self._check.cloud_metadata,
-                "metadata": self._extensions_cached,
-            }
-            self._check.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding))
+            if self._extensions_cached:
+                event = {
+                    "host": self._check.reported_hostname,
+                    "database_instance": self._check.database_identifier,
+                    "agent_version": datadog_agent.get_version(),
+                    "dbms": "postgres",
+                    "kind": "pg_extension",
+                    "collection_interval": self.collection_interval,
+                    "dbms_version": payload_pg_version(self._check.version),
+                    "tags": self._tags_no_db,
+                    "timestamp": time.time() * 1000,
+                    "cloud_metadata": self._check.cloud_metadata,
+                    "metadata": self._extensions_cached,
+                }
+                self._check.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding))
 
     @tracked_method(agent_check_getter=agent_check_getter)
     def _collect_postgres_extensions(self):
diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py
index 1941ff13495d5..40779394c2f76 100644
--- a/postgres/datadog_checks/postgres/postgres.py
+++ b/postgres/datadog_checks/postgres/postgres.py
@@ -5,8 +5,10 @@
 import copy
 import functools
 import os
+import math
 from string import Template
 from time import time
+from typing import Any, Dict
 
 import psycopg2
 from cachetools import TTLCache
@@ -162,20 +164,26 @@ def __init__(self, name, init_config, instances):
         instance_config = copy.deepcopy(self.instance)
         instance_config['password'] = "***"
         init_event = {
+            "instance_config": instance_config
+        }
+        # "instance_config": json.dumps(instance_config, default=default_json_event_encoding),
+        # "agent_config": json.dumps(self.agentConfig, default=default_json_event_encoding),
+        self.send_health_event("initialization", init_event)
+
+    def send_health_event(self, event_type, event: Dict[str, Any]):
+        message = {
             "dbm_type": "agent_health",
-            "event_type": "initialization",
+            "event_type": event_type,
             "database_instance": self.database_identifier,
             "ddsource": "postgres",
             "ddagentversion": datadog_agent.get_version(),
             "ddtags": self.tags,
             "timestamp": time() * 1000,
-            "instance_config": json.dumps(instance_config, default=default_json_event_encoding),
-            "agent_config": json.dumps(self.agentConfig, default=default_json_event_encoding),
+            "event": event,
         }
-        self.log.info("agent config: %s", self.agentConfig)
-        self.log.info("init event: %s", init_event)
+        self.log.info("Sending health event: %s", json.dumps(message, default=default_json_event_encoding))
         self.database_monitoring_query_activity(
-            json.dumps(init_event, default=default_json_event_encoding)
+            json.dumps(message, default=default_json_event_encoding)
         )
 
     def _build_autodiscovery(self):
@@ -276,6 +284,7 @@ def db(self):
             self.log.warning(
                 "Connection to the database %s has been interrupted, closing connection", self._config.dbname
             )
+            self.send_health_event("disconnect", {})
             try:
                 self._db.close()
             except Exception:
@@ -289,14 +298,21 @@ def _connection_health_check(self, conn):
         try:
+            # CHAOS TESTING DO NOT MERGE
+            # disconnect for every other minute
+            if math.floor(time.time()) % 120 < 60:
+                raise psycopg2.OperationalError("Simulated connection failure for chaos testing")
+
             # run a simple query to check if the connection is healthy
             # health check should run after a connection is established
             with conn.cursor(cursor_factory=CommenterCursor) as cursor:
                 cursor.execute("SELECT 1")
                 cursor.fetchall()
+            self.send_health_event("connection_success", {})
         except psycopg2.OperationalError as e:
             err_msg = f"Database {self._config.dbname} connection health check failed: {str(e)}"
             self.log.error(err_msg)
+            self.send_health_event("connection_error", {"error": err_msg})
             raise DatabaseHealthCheckError(err_msg)
 
     @property
diff --git a/postgres/datadog_checks/postgres/statement_samples.py b/postgres/datadog_checks/postgres/statement_samples.py
index bdb4b9b349edf..82ca85e7478ee 100644
--- a/postgres/datadog_checks/postgres/statement_samples.py
+++ b/postgres/datadog_checks/postgres/statement_samples.py
@@ -685,15 +685,18 @@ def _get_db_explain_setup_state(self, dbname):
             self._log.warning(
                 "cannot collect execution plans due to a database error in dbname=%s: %s", dbname, repr(e)
             )
+            self._check.send_health_event("explain_plan_error", {"error": DBExplainError.database_error.value, "error_message": str(e)})
             return DBExplainError.database_error, e
 
         try:
             result = self._run_explain(dbname, EXPLAIN_VALIDATION_QUERY, EXPLAIN_VALIDATION_QUERY)
         except psycopg2.errors.InvalidSchemaName as e:
             self._log.warning("cannot collect execution plans due to invalid schema in dbname=%s: %s", dbname, repr(e))
+            self._check.send_health_event("explain_plan_error", {"error": DBExplainError.invalid_schema.value})
             self._emit_run_explain_error(dbname, DBExplainError.invalid_schema, e)
             return DBExplainError.invalid_schema, e
         except psycopg2.errors.DatatypeMismatch as e:
+            self._check.send_health_event("explain_plan_error", {"error": DBExplainError.datatype_mismatch.value})
             self._emit_run_explain_error(dbname, DBExplainError.datatype_mismatch, e)
             return DBExplainError.datatype_mismatch, e
         except psycopg2.DatabaseError as e:

From bb08e1e193e0907b8299ee240c5c3715f8764b34 Mon Sep 17 00:00:00 2001
From: Seth Samuel
Date: Mon, 23 Jun 2025 09:40:32 -0400
Subject: [PATCH 3/3] WIP

---
 postgres/datadog_checks/postgres/postgres.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py
index 40779394c2f76..9cbc567cbf61f 100644
--- a/postgres/datadog_checks/postgres/postgres.py
+++ b/postgres/datadog_checks/postgres/postgres.py
@@ -300,7 +300,7 @@ def _connection_health_check(self, conn):
         try:
             # CHAOS TESTING DO NOT MERGE
             # disconnect for every other minute
-            if math.floor(time.time()) % 120 < 60:
+            if math.floor(time()) % 120 < 60:
                 raise psycopg2.OperationalError("Simulated connection failure for chaos testing")
 
             # run a simple query to check if the connection is healthy
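Note for anyone consuming these payloads: `send_health_event` serializes its message through `database_monitoring_query_activity`, so the health events share the `dbm-activity` track with regular activity samples, which is why the test changes in PATCH 1/3 filter on `dbm_type`. A minimal sketch of the resulting payload shape and filtering is below; the concrete values (instance identifier, tags, agent version, error text) are placeholders, not output from a real check run.

```python
# Sketch only: approximate shape of a "connection_error" health event built from
# the fields send_health_event() assembles in PATCH 2/3. Values are placeholders.
import json
import time

message = {
    "dbm_type": "agent_health",
    "event_type": "connection_error",
    "database_instance": "example-postgres-instance",  # self.database_identifier
    "ddsource": "postgres",
    "ddagentversion": "7.x.x",                          # datadog_agent.get_version()
    "ddtags": ["env:example", "db:postgres"],           # self.tags
    "timestamp": time.time() * 1000,
    "event": {"error": "Database example_db connection health check failed: ..."},
}

# Because the payload rides the dbm-activity track, consumers separate it from
# activity samples the same way the updated tests do:
events = [message, {"dbm_type": "activity", "postgres_activity": []}]
activity = [e for e in events if e.get("dbm_type") == "activity"]
health = [e for e in events if e.get("dbm_type") == "agent_health"]

print(json.dumps(message, indent=2))
assert len(activity) == 1 and len(health) == 1
```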