Skip to content

Seth.samuel/agent health #20543

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions datadog_checks_base/datadog_checks/base/checks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,13 @@ def database_monitoring_metadata(self, raw_event):

aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), "dbm-metadata")

def database_monitoring_agent_health(self, raw_event):
    # type: (str) -> None
    """Send a DBM agent-health event to the event platform.

    No-op when raw_event is None; otherwise forwards the already-serialized
    event string for this check instance.
    """
    if raw_event is None:
        return

    # NOTE(review): this submits on the "dbm-activity" track (same track as
    # query activity), not "dbm-metadata" as database_monitoring_metadata does.
    # Consumers must discriminate by the payload's dbm_type field — confirm a
    # dedicated track (e.g. "dbm-health") isn't warranted before merging.
    aggregator.submit_event_platform_event(self, self.check_id, to_native_string(raw_event), "dbm-activity")

def event_platform_event(self, raw_event, event_track_type):
# type: (str, str) -> None
"""Send an event platform event.
Expand Down
30 changes: 16 additions & 14 deletions postgres/datadog_checks/postgres/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ def __init__(self, check, config, shutdown_callback):
self._collect_pg_settings_enabled = is_affirmative(config.settings_metadata_config.get("enabled", False))
self._collect_schemas_enabled = is_affirmative(config.schemas_metadata_config.get("enabled", False))
self._is_schemas_collection_in_progress = False
self._extensions_cached = None
self._pg_settings_cached = None
self._time_since_last_extension_query = 0
self._time_since_last_settings_query = 0
Expand Down Expand Up @@ -300,20 +301,21 @@ def report_postgres_extensions(self):
elapsed_s = time.time() - self._time_since_last_extension_query
if elapsed_s >= self.pg_extensions_collection_interval and self._collect_extensions_enabled:
self._extensions_cached = self._collect_postgres_extensions()
event = {
"host": self._check.reported_hostname,
"database_instance": self._check.database_identifier,
"agent_version": datadog_agent.get_version(),
"dbms": "postgres",
"kind": "pg_extension",
"collection_interval": self.collection_interval,
"dbms_version": payload_pg_version(self._check.version),
"tags": self._tags_no_db,
"timestamp": time.time() * 1000,
"cloud_metadata": self._check.cloud_metadata,
"metadata": self._extensions_cached,
}
self._check.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding))
if self._extensions_cached:
event = {
"host": self._check.reported_hostname,
"database_instance": self._check.database_identifier,
"agent_version": datadog_agent.get_version(),
"dbms": "postgres",
"kind": "pg_extension",
"collection_interval": self.collection_interval,
"dbms_version": payload_pg_version(self._check.version),
"tags": self._tags_no_db,
"timestamp": time.time() * 1000,
"cloud_metadata": self._check.cloud_metadata,
"metadata": self._extensions_cached,
}
self._check.database_monitoring_metadata(json.dumps(event, default=default_json_event_encoding))

@tracked_method(agent_check_getter=agent_check_getter)
def _collect_postgres_extensions(self):
Expand Down
35 changes: 35 additions & 0 deletions postgres/datadog_checks/postgres/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import copy
import functools
import os
import math
from string import Template
from time import time
from typing import Dict

import psycopg2
from cachetools import TTLCache
Expand Down Expand Up @@ -159,6 +161,31 @@ def __init__(self, name, init_config, instances):
ttl=self._config.database_instance_collection_interval,
) # type: TTLCache

instance_config = copy.deepcopy(self.instance)
instance_config['password'] = "***"
init_event = {
"instance_config": instance_config
}
# "instance_config": json.dumps(instance_config, default=default_json_event_encoding),
# "agent_config": json.dumps(self.agentConfig, default=default_json_event_encoding),
self.send_health_event("initialization", init_event)

def send_health_event(self, event_type, event):
    # type: (str, Dict[str, Any]) -> None
    """Build and submit a DBM agent-health event.

    Wraps ``event`` in the standard agent-health envelope (dbm_type,
    source, agent version, tags, millisecond timestamp) and submits it on
    the dbm-activity event platform track via
    ``database_monitoring_query_activity``.

    :param event_type: short category of the event, e.g. "initialization",
        "connection_error", "disconnect".
    :param event: JSON-serializable payload with event-specific details.
    """
    # NOTE: the previous annotation used the builtin ``any`` function instead
    # of ``typing.Any``; a type comment keeps the contract without it.
    message = {
        "dbm_type": "agent_health",
        "event_type": event_type,
        "database_instance": self.database_identifier,
        "ddsource": "postgres",
        "ddagentversion": datadog_agent.get_version(),
        "ddtags": self.tags,
        # event platform expects epoch milliseconds
        "timestamp": time() * 1000,
        "event": event,
    }
    # Serialize once; reuse the same payload for logging and submission so
    # the logged event is exactly what was sent.
    payload = json.dumps(message, default=default_json_event_encoding)
    self.log.info("Sending health event: %s", payload)
    self.database_monitoring_query_activity(payload)

def _build_autodiscovery(self):
if not self._config.discovery_config['enabled']:
return None
Expand Down Expand Up @@ -257,6 +284,7 @@ def db(self):
self.log.warning(
"Connection to the database %s has been interrupted, closing connection", self._config.dbname
)
self.send_health_event("disconnect", {})
try:
self._db.close()
except Exception:
Expand All @@ -270,14 +298,21 @@ def db(self):

def _connection_health_check(self, conn):
    """Verify that ``conn`` can execute a trivial query.

    Runs ``SELECT 1`` on the freshly established connection; emits a
    "connection_success" health event on success. On operational failure,
    logs, emits a "connection_error" health event, and raises
    :class:`DatabaseHealthCheckError` chained to the underlying psycopg2
    error.

    :param conn: an open psycopg2 connection to check.
    :raises DatabaseHealthCheckError: if the probe query fails with
        ``psycopg2.OperationalError``.
    """
    # The simulated-failure block previously here was chaos-testing code
    # explicitly marked "DO NOT MERGE" and has been removed.
    try:
        # run a simple query to check if the connection is healthy
        # health check should run after a connection is established
        with conn.cursor(cursor_factory=CommenterCursor) as cursor:
            cursor.execute("SELECT 1")
            cursor.fetchall()
        self.send_health_event("connection_success", {})
    except psycopg2.OperationalError as e:
        err_msg = f"Database {self._config.dbname} connection health check failed: {str(e)}"
        self.log.error(err_msg)
        self.send_health_event("connection_error", {"error": err_msg})
        # chain the original error so the psycopg2 cause is preserved
        raise DatabaseHealthCheckError(err_msg) from e

@property
Expand Down
3 changes: 3 additions & 0 deletions postgres/datadog_checks/postgres/statement_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,15 +685,18 @@ def _get_db_explain_setup_state(self, dbname):
self._log.warning(
"cannot collect execution plans due to a database error in dbname=%s: %s", dbname, repr(e)
)
self._check.send_health_event("explain_plan_error", {"error": DBExplainError.database_error.value, "error_message": str(e)})
return DBExplainError.database_error, e

try:
result = self._run_explain(dbname, EXPLAIN_VALIDATION_QUERY, EXPLAIN_VALIDATION_QUERY)
except psycopg2.errors.InvalidSchemaName as e:
self._log.warning("cannot collect execution plans due to invalid schema in dbname=%s: %s", dbname, repr(e))
self._check.send_health_event("explain_plan_error", {"error": DBExplainError.invalid_schema.value})
self._emit_run_explain_error(dbname, DBExplainError.invalid_schema, e)
return DBExplainError.invalid_schema, e
except psycopg2.errors.DatatypeMismatch as e:
self._check.send_health_event("explain_plan_error", {"error": DBExplainError.datatype_mismatch.value})
self._emit_run_explain_error(dbname, DBExplainError.datatype_mismatch, e)
return DBExplainError.datatype_mismatch, e
except psycopg2.DatabaseError as e:
Expand Down
4 changes: 4 additions & 0 deletions postgres/tests/test_statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,7 @@ def wait(conn):
# only can see own queries
return

dbm_activity_event = [e for e in dbm_activity_event if e.get('dbm_type') == 'activity']
event = dbm_activity_event[0]
assert event['host'] == "stubbed.hostname"
assert event['ddsource'] == "postgres"
Expand Down Expand Up @@ -1286,6 +1287,7 @@ def wait(conn):
time.sleep(dbm_instance['query_activity']['collection_interval'])
run_one_check(check)
dbm_activity_event = aggregator.get_event_platform_events("dbm-activity")
dbm_activity_event = [e for e in dbm_activity_event if e.get('dbm_type') == 'activity']
event = dbm_activity_event[1]
assert len(event['postgres_activity']) > 0
# find bob's query
Expand Down Expand Up @@ -1374,6 +1376,7 @@ def test_activity_reported_hostname(
run_one_check(check)

dbm_activity = aggregator.get_event_platform_events("dbm-activity")
dbm_activity = [e for e in dbm_activity if e.get('dbm_type') == 'activity']
assert dbm_activity, "should have at least one activity sample"
assert dbm_activity[0]['host'] == expected_hostname

Expand Down Expand Up @@ -1823,6 +1826,7 @@ def test_disabled_activity_or_explain_plans(
conn.cursor().execute(query, (arg,))
run_one_check(check)
dbm_activity = aggregator.get_event_platform_events("dbm-activity")
dbm_activity = [e for e in dbm_activity if e.get('dbm_type') == 'activity']
dbm_samples = aggregator.get_event_platform_events("dbm-samples")

if POSTGRES_VERSION.split('.')[0] == "9" and pg_stat_activity_view == "pg_stat_activity":
Expand Down
Loading