diff --git a/docs/Logging.md b/docs/Logging.md
new file mode 100644
index 0000000..c36c567
--- /dev/null
+++ b/docs/Logging.md
@@ -0,0 +1,326 @@
+# Logging for Python in DUNE-DAQ
+
+Welcome, fellow beavers! This page provides a user's guide to how logging is done in Python in the context of DUNE-DAQ.
+
+## Basics
+
+The bulk of the logging functionality in drunc and other Python applications is built on top of the [Python logging framework](https://docs.python.org/3/library/logging.html), whose mission is defined below:
+
+> This module defines functions and classes which implement a flexible event logging system for applications and libraries.
+
+It is worth a read to understand how logging works in Python; the salient points are covered below.
+
+In general, the built-in logging module allows for producing severity-classified diagnostic events, which can be filtered, formatted, or routed as necessary. These logs automatically contain useful information including the timestamp, module, and context of the message.
+
+The core object of the logging functionality in Python is the logger. A logging instance, `log`, can be initialised as follows. The phrase "Hello, world!" is used as an input to the logger, and this message is bundled up with other useful information, including the severity level, to form what is known as a `LogRecord`. This record is then transmitted as required.
+
+```python
+import logging
+log = logging.getLogger("Demo")
+log.warning("Hello, world!")
+
+>> Hello, world!
+```
+
+### Severity levels
+
+Every record has an attached severity level, which can be used to flag how important a log record is. By default, Python has five main levels and one 'notset' level, as shown in the image below[^1]:
+
+![log_level_overview](img/loglevels.png)
+
+[^1]: More can be defined as required; see Python's logging manual.
+
+Each logging instance can have an attached severity level. If it has one, then only records with the same severity level or higher will be transmitted.
+
+```python
+import logging
+log = logging.getLogger("Demo")
+log.setLevel(logging.WARNING)
+
+log.info("This will not print")
+log.warning("This will print")
+
+>> This will print
+```
+
+### Handlers
+
+Handlers are a key concept of logging in Python, as they control how records are processed and formatted. There are several default handlers that the DAQ uses, and there are also several custom handlers defined for the purposes of the DAQ.
+
+The example below shows a file handler, a stream handler, and a webhook handler. As can be seen, each record is processed and formatted by each of the handlers and transmitted in each of their respective ways.
+
+![drunc_overview](img/handlers.png)
+
+Importantly, each handler can have its own associated severity level! In the example above, it is certainly possible to have the WebHookHandler only transmit if a record is of level WARNING or higher.
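+
+As a minimal sketch using only the standard library (the logger name and file name are illustrative), a single logger can fan out to two handlers with different severity thresholds:
+
+```python
+import logging
+
+log = logging.getLogger("handler_demo")
+log.setLevel(logging.DEBUG)  # let the logger pass everything on to its handlers
+
+console = logging.StreamHandler()   # to the terminal
+console.setLevel(logging.WARNING)   # only WARNING and above reach the terminal
+
+to_file = logging.FileHandler("demo.log")  # hypothetical file name
+to_file.setLevel(logging.DEBUG)            # everything goes to the file
+
+log.addHandler(console)
+log.addHandler(to_file)
+
+log.info("Only written to demo.log")
+log.error("Written to demo.log and shown in the terminal")
+```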
+
+### Filters
+A further important add-on for loggers is filters, whose primary purpose is to decide whether a record should be transmitted or not. Filters can be attached both to the logger instance and to any handlers attached to that logger.
+
+When a log record arrives, it is first processed by the filters attached to the logger itself. Should it pass, the record is then passed on to each handler as shown before, where it is further processed by each handler's attached filters. Only when it passes these as well will the log record be transmitted.
+
+![filters](img/filters.png)
+
+### Inheritance
+
+Another key part of logging in Python is the inheritance feature. Loggers are organised in a hierarchical fashion, so it is possible to initialise descendant loggers by chaining the names together with a period, such as "root.parent.child".
+
+By default, loggers will inherit certain properties of the parent:
+- severity level of the logger
+- handlers (and all attached properties, including severity level and filters on handlers)
+
+![inheritance](img/inheritance.png)
+
+Note that a particular exception is that they _don't_ inherit any filters attached to the logger itself.
+
+A useful diagram to peruse is the [logging flow in the official docs](https://docs.python.org/2/howto/logging.html#logging-flow).
+
+
+## Using logging with daqpytools
+
+The [daqpytools](https://github.com/DUNE-DAQ/daqpytools) package contains several quality-of-life improvements for the DAQ Python tools, the most relevant of which for this document are the logging tools.
+
+These include:
+- standardised ways of initialising top-level 'root' loggers
+- constructors for default logging instances
+- many bespoke handlers
+- filters relevant to the DAQ
+- handler configurations
+
+A lot of these features can be demonstrated via the logging demonstrator. With the DUNE environment loaded, simply run
+
+```
+daqpytools-logging-demonstrator
+```
+
+View the help string to learn more, and see the script itself in the repository for how it is implemented.
+
+
+### Initialising a handler
+
+Initialising a handler is simple:
+
+```python
+from daqpytools.logging.logger import get_daq_logger
+test_logger = get_daq_logger(
+    logger_name = "test_logger",
+    log_level = "INFO",
+    use_parent_handlers = True,
+    rich_handler = True,
+    stream_handlers = False
+)
+
+test_logger.warning("Hello, world!")
+```
+
+As shown above, initialising a logging instance with a specific handler is as easy as modifying a flag in the constructor.
+
+The core philosophy of the logging framework in daqpytools is that each logger should only have _one_ instance of a specific type of handler. This means that while a single logger can have both a Rich and a Stream handler, a single logger cannot have _two_ Rich handlers, which prevents duplicated messages.
+
+Please refer to the docstrings for the most up-to-date definitions of the options and which handlers may or may not be included. There are some exceptions, which are clearly labelled in the documentation. Choosing which handlers to trigger on a per-message basis is an advanced feature of logging in DUNE-DAQ, so please refer to the advanced section of this guide.
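+
+As a short sketch (the logger name and log-file name below are purely illustrative), a single logger can combine a Rich handler and a file handler by setting both options, and a message then goes to both destinations:
+
+```python
+from daqpytools.logging.logger import get_daq_logger
+
+# One Rich handler plus one file handler on the same logger.
+combined_logger = get_daq_logger(
+    logger_name="combined_example",
+    log_level="INFO",
+    rich_handler=True,
+    file_handler="combined_example.log",
+)
+
+# By default this is transmitted by every attached handler:
+# rendered in the terminal and appended to the file.
+combined_logger.warning("Hello from both handlers!")
+```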
+
+
+### Walkthrough of existing handlers and filters
+
+As seen in the previous section, there are several handlers and filters present in daqpytools that may readily be used. What follows is a brief description of each handler as well as a quick example; for more complete documentation please refer to the docstrings and the logging demonstrator.
+
+Remember that by default, any messages received by the logger will be transmitted to _all_ available handlers that are attached to the logger.
+
+
+#### Rich handler
+
+The Rich handler should be the 'default' handler for any messages that should be transmitted in the terminal. This handler has great support for colors and delivers a complete message to the terminal, making it easy to view and to trace back to the originating call.
+
+![rich_demo](img/demo_rich.png)
+
+
+#### File handler
+
+As the name suggests, the file handler is used to transmit messages directly to a log file. Unlike the stream and Rich handlers, instead of setting a boolean in the constructor the user must supply the _filename_ of the target file for the messages to go into.
+
+![file_demo](img/demo_file.png)
+
+
+#### Stream handlers
+
+Stream handlers are used to transmit messages directly to the terminal without any color formatting. This is of great use for the logs of the controllers in drunc, which has its own method of capturing logs via a capture of the terminal output and a pipe to the relevant log file.
+
+Note that the stream handlers consist of two handlers, one which outputs to `stdout` and another to `stderr`. The latter will only transmit if the record severity level is Error or higher.
+
+![streams_demo](img/demo_streams.png)
+
+#### ERS Kafka handler
+
+The ERS Kafka handler is used to transmit ERS messages via Kafka, which is incredibly useful for showing messages on the dashboards as they happen.
+
+This handler is not included in the default list of handlers to emit. An extra configuration must be used to properly transmit such messages, e.g.
+
+```python
+import logging
+
+from daqpytools.logging.handlers import HandlerType
+from daqpytools.logging.logger import get_daq_logger
+
+main_logger: logging.Logger = get_daq_logger(
+    logger_name="daqpytools_logging_demonstrator",
+    ers_kafka_handler=True
+)
+
+main_logger.error(
+    "ERS Message",
+    extra={"handlers": [HandlerType.Protobufstream]}
+)
+```
+
+See the advanced section for more details.
+
+![ers_demo](img/demo_ers.png)
+
+**Notes**
+At the moment, by default messages will be sent via the following:
+```
+session_name: session_tester
+topic: ers_stream
+address: monkafka.cern.ch
+port: 30092
+```
+
+
+#### Throttle filter
+
+There are times when an application decides to send a huge number of logs of a single message in a very short time, which can overwhelm the systems. When such an event occurs, it is wise to throttle the output.
+
+The throttle filter replicates the logic that exists in the ERS C++ implementation, which dynamically limits how many messages get transmitted. The filter is by default attached to the _logger_ instance, with no support yet for attaching this filter to a specific handler.
+
+Initialising the filter takes two arguments:
+ - `initial_threshold`: number of initial occurrences to let through immediately
+ - `time_limit`: time window in seconds for resetting state
+
+The basic logic is as follows.
+
+1. The first N messages are transmitted instantly, up to `initial_threshold`
+2. The next 10 messages are suppressed, with a single report emitted at the end
+3. The next 100 messages are suppressed, with a single report emitted at the end
+4. This continues, with the threshold increasing by 10x every time
+5. After `time_limit` seconds have passed since the last message, the filter is reset, allowing messages to be sent once more
+
+
+For the throttle filter, a 'log record' is **uniquely** defined by the record's pathname and line number. Therefore, 50 records that contain the same 'message' but are emitted from different line numbers in the script will not be erroneously filtered together.
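+
+To make the uniqueness rule concrete, here is a small sketch (hypothetical logger name, and it assumes the throttle filter is attached via `throttle=True` as in the fuller example just below): the two call sites emit an identical message but are throttled independently, because their `pathname:lineno` keys differ.
+
+```python
+from daqpytools.logging.handlers import HandlerType
+from daqpytools.logging.logger import get_daq_logger
+
+log = get_daq_logger(
+    logger_name="throttle_uniqueness_demo",
+    rich_handler=True,
+    throttle=True
+)
+
+for i in range(100):
+    # Two distinct call sites mean two distinct pathname:lineno keys,
+    # so each line is throttled on its own.
+    log.info("Same message", extra={"handlers": [HandlerType.Rich, HandlerType.Throttle]})
+    log.info("Same message", extra={"handlers": [HandlerType.Rich, HandlerType.Throttle]})
+```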
+
+
+A fuller example is as follows:
+
+```python
+import logging
+import time
+
+from daqpytools.logging.handlers import HandlerType
+from daqpytools.logging.logger import get_daq_logger
+
+main_logger: logging.Logger = get_daq_logger(
+    logger_name="daqpytools_logging_demonstrator",
+    stream_handlers=True,
+    throttle=True
+)
+
+emit_err = lambda i: main_logger.info(
+    f"Throttle test {i}",
+    extra={"handlers": [HandlerType.Rich, HandlerType.Throttle]},
+)
+
+for i in range(50):
+    emit_err(i)
+
+main_logger.warning("Sleeping for 30 seconds")
+time.sleep(30)
+
+for i in range(1000):
+    emit_err(i)
+```
+
+This behaves as expected:
+
+![throttle_demo](img/demo_throttle.png)
+
+**Note**
+By default, throttle filters obtained via `get_daq_logger` will be initialised with an `initial_threshold` of 30 and a `time_limit` of 30.
+
+**Note**
+Similarly to the ERS Kafka handler, this filter is not enabled by default, hence the use of `HandlerType`s. See the Advanced section for more info.
+
+### Advanced logging
+
+The above walkthrough should be sufficient for the vast majority of logging. However, there are a few advanced features in daqpytools that may benefit the user. These are mainly targeted towards a high degree of customisability, including the ability to choose which of the attached handlers will transmit a given message, and automatic routing of messages to certain handlers.
+
+#### Choosing handlers with HandlerTypes
+
+Let's say you have a logger with an attached Rich handler and file handler as below, and that there are two messages you want to log. However, one of them should only be sent to the file, and the other one should only be sent to the terminal via the Rich handler.
+
+```python
+from daqpytools.logging.logger import get_daq_logger
+log = get_daq_logger("example", rich_handler=True, file_handler="logging.log")
+```
+
+This can be done by using the `HandlerType` enum. The loggers defined here have a special ability to read which `HandlerType`s are supplied and to only transmit to the required handlers.
+
+There is a very specific syntax that must be followed involving the `extra` kwarg:
+
+```python
+from daqpytools.logging.handlers import HandlerType
+
+log.info("This will only be sent to the Rich", extra={"handlers": [HandlerType.Rich]})
+log.info("This will only be sent to the File", extra={"handlers": [HandlerType.File]})
+log.info("You can even send to both", extra={"handlers": [HandlerType.Rich, HandlerType.File]})
+```
+
+Naturally, telling the logger to transmit a message to a handler that is not attached will cause it to do nothing. In the example above, using `HandlerType.Stream` will do nothing.
+
+
+
+#### The need for handler streams
+
+Within the DUNE DAQ ecosystem, there are several other configurations that interplay. These include OpMon and ERS, which each require several handlers per configuration. These can best be thought of as 'streams' which interact with several handlers, as shown in the example below.
+
+![streams](img/streams.png)
+
+The native implementation in drunc and most applications is referred to as the 'Base' stream, which only requires interactions with the Rich, File, and Stream handlers. **This is why the ERS Kafka handler and the Throttle filter need to be 'activated' with HandlerTypes**.
+
+The ERS configuration is defined in OKS, [for example here](https://github.com/DUNE-DAQ/daqsystemtest/blob/974965be6e96aff969c69a380ed34aa96705e802/config/daqsystemtest/ccm.data.xml#L189), and is automatically parsed by daqpytools as it gets used. A special feature of the ERS configuration is that the relevant handlers are severity-level dependent; ERS Fatal and ERS Info may have a different set of handler requirements.
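+
+To give a feel for what such a per-severity ERS configuration looks like, the sketch below injects the sample environment variables used by the logging demonstrator (in a real session these values come from the OKS configuration rather than being set by hand):
+
+```python
+import os
+
+# Sample ERS settings, one variable per severity level, as injected by the
+# logging demonstrator before initialising the ERS stream.
+os.environ["DUNEDAQ_ERS_WARNING"] = "erstrace,throttle,lstdout"
+os.environ["DUNEDAQ_ERS_INFO"] = "erstrace,throttle,lstdout"
+os.environ["DUNEDAQ_ERS_FATAL"] = "erstrace,lstdout"
+os.environ["DUNEDAQ_ERS_ERROR"] = (
+    "erstrace,throttle,lstdout,protobufstream(monkafka.cern.ch:30092)"
+)
+```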
+
+#### Log Handler Conf
+
+
+To deal with the constraints set out above, a handler configuration dataclass called `LogHandlerConf` is constructed to define the relevant configurations, and a system of filters is initialised with every handler. When a log record needs to be processed via a specific stream, the log record and the handler configuration are passed to the relevant logger, after which it is processed.
+
+An example is shown in the logging demonstrator, copied here.
+
+```python
+handlerconf = LogHandlerConf()
+
+main_logger.warning("Handlerconf Base", extra=handlerconf.Base)
+main_logger.warning("Handlerconf Opmon", extra=handlerconf.Opmon)
+```
+
+
+#### Using ERS
+By default, the `LogHandlerConf` does not initialise the ERS stream because it requires the ERS environment variables to be defined. Should these variables exist, there are two ways to initialise the ERS stream with the `LogHandlerConf`.
+
+```python
+# By default init_ers is False.
+# When this is the case, ERS streams are _not_ defined, so this survives
+# without the ERS environment variables being set.
+LHC_no_init = LogHandlerConf()  # equivalent to LogHandlerConf(init_ers=False)
+
+print(LHC_no_init.Base)  # Success
+print(LHC_no_init.ERS)   # Raises: ERS stream not initialised; call init_ers_stream() first
+
+# Later on, when the ERS envs are defined, the stream can be initialised
+LHC_no_init.init_ers_stream()
+print(LHC_no_init.ERS)   # Success
+
+# Alternatively, initialise the ERS stream at construction time:
+handlerconf_with_ers = LogHandlerConf(init_ers=True)
+```
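+
+Once the ERS stream has been initialised, records can be routed through it by passing the ERS configuration via `extra`, as the logging demonstrator does. A short sketch, reusing `main_logger` and the `handlerconf` from the earlier snippets:
+
+```python
+# Assumes the DUNEDAQ_ERS_* environment variables are defined,
+# e.g. as in the sample configuration shown earlier.
+handlerconf.init_ers_stream()
+
+main_logger.warning("ERS Warning", extra=handlerconf.ERS)
+main_logger.error("ERS Error", extra=handlerconf.ERS)
+```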
diff --git a/docs/img/demo_ers.png b/docs/img/demo_ers.png
new file mode 100644
index 0000000..e93a51c
Binary files /dev/null and b/docs/img/demo_ers.png differ
diff --git a/docs/img/demo_file.png b/docs/img/demo_file.png
new file mode 100644
index 0000000..42c7eb7
Binary files /dev/null and b/docs/img/demo_file.png differ
diff --git a/docs/img/demo_rich.png b/docs/img/demo_rich.png
new file mode 100644
index 0000000..eed45a3
Binary files /dev/null and b/docs/img/demo_rich.png differ
diff --git a/docs/img/demo_streams.png b/docs/img/demo_streams.png
new file mode 100644
index 0000000..ceeb8d3
Binary files /dev/null and b/docs/img/demo_streams.png differ
diff --git a/docs/img/demo_throttle.png b/docs/img/demo_throttle.png
new file mode 100644
index 0000000..803422c
Binary files /dev/null and b/docs/img/demo_throttle.png differ
diff --git a/docs/img/filters.png b/docs/img/filters.png
new file mode 100644
index 0000000..986c93c
Binary files /dev/null and b/docs/img/filters.png differ
diff --git a/docs/img/handlers.png b/docs/img/handlers.png
new file mode 100644
index 0000000..fd23592
Binary files /dev/null and b/docs/img/handlers.png differ
diff --git a/docs/img/inheritance.png b/docs/img/inheritance.png
new file mode 100644
index 0000000..7f93c42
Binary files /dev/null and b/docs/img/inheritance.png differ
diff --git a/docs/img/loglevels.png b/docs/img/loglevels.png
new file mode 100644
index 0000000..75aa5d8
Binary files /dev/null and b/docs/img/loglevels.png differ
diff --git a/docs/img/streams.png b/docs/img/streams.png
new file mode 100644
index 0000000..7b4cfc1
Binary files /dev/null and b/docs/img/streams.png differ
diff --git a/src/daqpytools/apps/logging_demonstrator.py b/src/daqpytools/apps/logging_demonstrator.py
index 400f6f1..981f6d6 100644
--- a/src/daqpytools/apps/logging_demonstrator.py
+++ b/src/daqpytools/apps/logging_demonstrator.py
@@ -180,6 +180,34 @@ def test_handlertypes(main_logger: logging.Logger) -> None:
         extra={"handlers": [HandlerType.Rich, HandlerType.Protobufstream]}
     )
 
+ers_envs = {
+    "DUNEDAQ_ERS_WARNING": "erstrace,throttle,lstdout",
+    "DUNEDAQ_ERS_INFO": "erstrace,throttle,lstdout",
+    "DUNEDAQ_ERS_FATAL": "erstrace,lstdout",
+    "DUNEDAQ_ERS_ERROR": (
+        "erstrace,"
+        "throttle,"
+        "lstdout,"
+        "protobufstream(monkafka.cern.ch:30092)"
+    )
+}
+
+def obtain_original_envs(envs: dict) -> dict:
+    return {
+        key: os.environ.get(key)
+        for key in envs
+    }
+
+
+
+def restore_original_envs(original: dict) -> None:
+    for var_name, original_value in original.items():
+        if original_value is None:
+            os.environ.pop(var_name, None)
+        else:
+            os.environ[var_name] = original_value
+
+
 def test_handlerconf(main_logger: logging.Logger) -> None:
     """Demonstrates the main functionality of the handlerconf. With ERS support.
 
@@ -193,15 +221,12 @@ def test_handlerconf(main_logger: logging.Logger) -> None:
     main_logger.warning("Handlerconf Opmon", extra=handlerconf.Opmon)
 
     #* Interlude: Inject sample environment variables
-    os.environ["DUNEDAQ_ERS_WARNING"] = "erstrace,throttle,lstdout"
-    os.environ["DUNEDAQ_ERS_INFO"] = "erstrace,throttle,lstdout"
-    os.environ["DUNEDAQ_ERS_FATAL"] = "erstrace,lstdout"
-    os.environ["DUNEDAQ_ERS_ERROR"] = (
-        "erstrace,"
-        "throttle,"
-        "lstdout,"
-        "protobufstream(monkafka.cern.ch:30092)"
-    )
+    # Save envs from current shell before we rewrite
+    saved_envs = obtain_original_envs(ers_envs)
+
+    for key, value in ers_envs.items():
+        # Inject ERS envs into the python shell
+        os.environ[key] = value
 
     info_out = f"{os.getenv('DUNEDAQ_ERS_ERROR')=}"
     main_logger.info(info_out)
@@ -213,17 +238,33 @@ def test_handlerconf(main_logger: logging.Logger) -> None:
     # HandlerConf will require that these variables are defined!
     # They come from the OKS, so whatever tools you have should have this up
     # You can also initialise via handlerconf = LogHandlerConf(init_ers=True)
-    handlerconf.init_ers_stream()
+    try:
+        handlerconf.init_ers_stream()
+
+        #* Test ERS Streams
+        main_logger.warning("ERS Warning erstrace,throttle,lstdout", extra=handlerconf.ERS)
+        main_logger.info("ERS Info erstrace,throttle,lstdout", extra=handlerconf.ERS)
+        main_logger.critical("ERS Fatal erstrace,lstdout", extra=handlerconf.ERS)
+        main_logger.debug("ERS Debug none", extra=handlerconf.ERS)
+        main_logger.error("ERS Error erstrace,throttle,lstdout,"
+            "protobufstream(monkafka.cern.ch:30092)",
+            extra=handlerconf.ERS
+        )
+    except Exception as e:
+        main_logger.error(f"Failed to initialise or use the ERS stream: {e}")
+    finally:
+
+        # Restore original environment variables
+        ## Note that in https://dagster.io/blog/
+        ## python-environment-variables#modifying-and-adding-environment-variables
+        ## It looks like os.environ only affects the local python shell, and doesn't
+        ## "leak" out into the bash envs.
+ ## However, for best practices this is still done + + restore_original_envs(saved_envs) + + - #* Test ERS Streams - main_logger.warning("ERS Warning erstrace,throttle,lstdout", extra=handlerconf.ERS) - main_logger.info("ERS Info erstrace,throttle,lstdout", extra=handlerconf.ERS) - main_logger.critical("ERS Fatal erstrace,lstdout", extra=handlerconf.ERS) - main_logger.debug("ERS Debug none", extra=handlerconf.ERS) - main_logger.error("ERS Error erstrace,throttle,lstdout," - "protobufstream(monkafka.cern.ch:30092)", - extra=handlerconf.ERS - ) class AllOptionsCommand(click.Command): """Parse the arguments passed and validate they are acceptable, otherwise print the diff --git a/src/daqpytools/logging/handlers.py b/src/daqpytools/logging/handlers.py index 3b5a1e1..29b8503 100644 --- a/src/daqpytools/logging/handlers.py +++ b/src/daqpytools/logging/handlers.py @@ -482,21 +482,21 @@ def _throttle(self, rec: IssueRecord, record: logging.LogRecord) -> bool: if rec.suppressed_counter >= rec.threshold: rec.threshold = rec.threshold * 10 # Escalate: 10 -> 100 -> 1000 ... rec.last_occurrence = current_time - rec. last_occurrence_formatted = self._format_timestamp(current_time) + rec.last_occurrence_formatted = self._format_timestamp(current_time) self._report_suppression(rec, record) return False # Don't emit the original record # Step 4: Check if enough time passed since last report if current_time - rec.last_report > self.time_limit: rec.last_occurrence = current_time - rec. last_occurrence_formatted = self._format_timestamp(current_time) + rec.last_occurrence_formatted = self._format_timestamp(current_time) self._report_suppression(rec, record) return False # Don't emit the original record # Step 5: Suppress silently rec.suppressed_counter += 1 rec.last_occurrence = current_time - rec. last_occurrence_formatted = self._format_timestamp(current_time) + rec.last_occurrence_formatted = self._format_timestamp(current_time) return False def _report_suppression(self, rec: IssueRecord, record: logging.LogRecord) -> None: diff --git a/tests/logging/test_handlers.py b/tests/logging/test_handlers.py new file mode 100644 index 0000000..a7d02b0 --- /dev/null +++ b/tests/logging/test_handlers.py @@ -0,0 +1,662 @@ +"""Comprehensive tests for the logging filters in handlers.py. 
+ +Tests cover: +- BaseHandlerFilter: Handler selection logic for both ERS and non-ERS paths +- HandleIDFilter: Filter that accepts only specific handler types +- ThrottleFilter: Advanced throttling with escalating thresholds and time windows +- Integration: Real logger usage with filters and handlers +""" + +import copy +import io +import logging +import time +from threading import Thread +from unittest.mock import MagicMock + +import pytest + +from daqpytools.logging.handlers import ( + BaseHandlerFilter, + ERSPyLogHandlerConf, + HandleIDFilter, + HandlerType, + IssueRecord, + ProtobufConf, + StreamType, + ThrottleFilter, +) +from daqpytools.logging.levels import level_to_ers_var + +# ============================================================================ +# FIXTURES +# ============================================================================ + + +@pytest.fixture +def clean_logger(): + """Provide a clean logger with no handlers or filters.""" + logger = logging.getLogger("test_logger_handlers") + logger.handlers = [] + logger.filters = [] + logger.setLevel(logging.DEBUG) + return logger + + +@pytest.fixture +def log_record() -> logging.LogRecord: + """Provide a basic log record for testing.""" + return logging.LogRecord( + name="test.module", + level=logging.ERROR, + pathname="/path/to/test.py", + lineno=42, + msg="Test message", + args=(), + exc_info=None, + ) + + +@pytest.fixture +def ers_log_record(): + """Provide a log record configured for ERS streaming.""" + record = logging.LogRecord( + name="test.module", + level=logging.ERROR, + pathname="/path/to/test.py", + lineno=67, + msg="ERS message", + args=(), + exc_info=None, + ) + record.stream = StreamType.ERS + return record + + +@pytest.fixture +def mock_ers_handlers(): + """Provide mock ERS handler configuration for testing.""" + handlers_config = {} + for level_var in level_to_ers_var.values(): + conf = ERSPyLogHandlerConf( + handlers=[HandlerType.Throttle, HandlerType.Protobufstream], + protobufconf=ProtobufConf(url="monkafka.cern.ch", port=30092), + ) + handlers_config[level_var] = conf + return handlers_config + + +# ============================================================================ +# BaseHandlerFilter Tests +# ============================================================================ + + +class TestBaseHandlerFilter: + """Tests for BaseHandlerFilter.get_allowed() logic.""" + + def test_non_ers_uses_record_handlers_attribute( + self, log_record: logging.LogRecord + ): + """Test get_allowed() uses 'handlers' attribute from record for non-ERS.""" + log_record.handlers = [HandlerType.Rich, HandlerType.File] + filter_obj = BaseHandlerFilter() + + allowed = filter_obj.get_allowed(log_record) + + assert allowed == [HandlerType.Rich, HandlerType.File] + + def test_non_ers_defaults_to_base_handlers( + self, log_record: logging.LogRecord + ): + """Test get_allowed() falls back to default handlers when attribute missing.""" + # log_record has no 'handlers' attribute + filter_obj = BaseHandlerFilter() + + allowed = filter_obj.get_allowed(log_record) + + # Should return the base handlers from LogHandlerConf + assert allowed is not None + expected_handlers = {HandlerType.Stream, HandlerType.Rich, HandlerType.File} + assert expected_handlers.issubset(set(allowed)) + + def test_ers_path_valid_configuration( + self, ers_log_record: logging.LogRecord, mock_ers_handlers: dict + ): + """Test get_allowed() extracts ERS handlers correctly with valid config.""" + ers_log_record.ers_handlers = mock_ers_handlers + filter_obj = 
BaseHandlerFilter() + + allowed = filter_obj.get_allowed(ers_log_record) + + assert allowed == [HandlerType.Throttle, HandlerType.Protobufstream] + + def test_ers_path_no_matching_level_variable( + self, ers_log_record: logging.LogRecord, mock_ers_handlers: dict + ): + """Test get_allowed() returns None when log level has no ERS mapping.""" + # Set a log level that does not have an ERS equivalent + ers_log_record.levelno = 25 # Between INFO and WARNING + ers_log_record.ers_handlers = mock_ers_handlers + filter_obj = BaseHandlerFilter() + + allowed = filter_obj.get_allowed(ers_log_record) + assert allowed is None + + +# ============================================================================ +# HandleIDFilter Tests +# ============================================================================ + + +class TestHandleIDFilter: + """Tests for HandleIDFilter.filter() logic.""" + + def test_single_handler_id_normalized_to_set(self): + """Test that single handler_id is normalized to a set.""" + filter_obj = HandleIDFilter(HandlerType.Rich) + + assert isinstance(filter_obj.handler_ids, set) + assert HandlerType.Rich in filter_obj.handler_ids + + def test_list_handler_ids_converted_to_set(self): + """Test that list of handler_ids is converted to a set.""" + handlers = [HandlerType.Rich, HandlerType.File] + filter_obj = HandleIDFilter(handlers) + + assert isinstance(filter_obj.handler_ids, set) + assert filter_obj.handler_ids == {HandlerType.Rich, HandlerType.File} + + def test_filter_returns_true_when_handler_in_allowed( + self, log_record: logging.LogRecord + ): + """Test filter() returns True when handler_id is in allowed list.""" + log_record.handlers = [HandlerType.Rich, HandlerType.File, HandlerType.Stream] + filter_obj = HandleIDFilter(HandlerType.Rich) + + result = filter_obj.filter(log_record) + + assert result is True + + def test_filter_returns_false_when_handler_not_in_allowed( + self, log_record: logging.LogRecord + ): + """Test filter() returns False when handler_id not in allowed.""" + log_record.handlers = [HandlerType.File, HandlerType.Stream] + filter_obj = HandleIDFilter(HandlerType.Rich) + + result = filter_obj.filter(log_record) + + assert result is False + + def test_filter_returns_false_when_get_allowed_returns_none( + self, log_record: logging.LogRecord + ): + """Test filter() returns False when get_allowed() returns None.""" + filter_obj = HandleIDFilter(HandlerType.Rich) + filter_obj.get_allowed = MagicMock(return_value=None) + + result = filter_obj.filter(log_record) + + assert result is False + + def test_filter_with_multiple_handler_ids( + self, log_record: logging.LogRecord + ): + """Test filter() with multiple handler_ids checks intersection.""" + log_record.handlers = [HandlerType.Rich, HandlerType.File] + filter_obj = HandleIDFilter([HandlerType.Rich, HandlerType.Stream]) + + result = filter_obj.filter(log_record) + + # Should return True because Rich is in both sets + assert result is True + + def test_filter_no_intersection_with_multiple_ids( + self, log_record: logging.LogRecord + ): + """Test filter() returns False when no intersection with multiple ids.""" + log_record.handlers = [HandlerType.File] + filter_obj = HandleIDFilter([HandlerType.Rich, HandlerType.Stream]) + + result = filter_obj.filter(log_record) + + assert result is False + + +# ============================================================================ +# ThrottleFilter Tests +# ============================================================================ + + +class TestThrottleFilter: + 
"""Tests for ThrottleFilter throttling and suppression logic.""" + + def test_initial_phase_lets_through_first_n_messages( + self, log_record: logging.LogRecord + ): + """Test that first N messages pass through without suppression.""" + log_record.handlers = [HandlerType.Throttle] + filter_obj = ThrottleFilter(initial_threshold=3, time_limit=10) + + # First 3 messages should pass + assert filter_obj.filter(log_record) is True + assert filter_obj.filter(log_record) is True + assert filter_obj.filter(log_record) is True + + def test_after_initial_threshold_suppresses( + self, log_record: logging.LogRecord + ): + """Test that messages are suppressed after initial_threshold.""" + log_record.handlers = [HandlerType.Throttle] + filter_obj = ThrottleFilter(initial_threshold=2, time_limit=10) + + # First 2 pass + assert filter_obj.filter(log_record) is True + assert filter_obj.filter(log_record) is True + + # 3rd should be suppressed + assert filter_obj.filter(log_record) is False + + def test_escalating_threshold_doubles_on_report( + self, log_record: logging.LogRecord + ): + """Test that threshold escalates (10->100->1000) when reporting.""" + log_record.handlers = [HandlerType.Throttle] + filter_obj = ThrottleFilter(initial_threshold=1, time_limit=100) + + issue_id = f"{log_record.pathname}:{log_record.lineno}" + issue_record = filter_obj.issue_map[issue_id] + + # First is emitted + # Next 10 are suppressed + # needs 1 more to trigger update + for _ in range(12): + filter_obj._throttle(issue_record, log_record) + + + assert issue_record.threshold == 100 # Escalated from 10 + + def test_time_window_reset_resets_counters( + self, log_record: logging.LogRecord, monkeypatch: pytest.MonkeyPatch + ): + """Test that state resets after time_limit expires.""" + log_record.handlers = [HandlerType.Throttle] + filter_obj = ThrottleFilter(initial_threshold=1, time_limit=1) + + times = iter([1000.0, 1002.5]) + monkeypatch.setattr(time, "time", lambda: next(times)) + + # First message passes + assert filter_obj.filter(log_record) is True + + # Time advances beyond time_limit with no suppression, reset should allow pass + assert filter_obj.filter(log_record) is True + + def test_suppressed_counter_increments( + self, log_record: logging.LogRecord + ): + """Test that suppressed_counter increments for each suppressed message.""" + log_record.handlers = [HandlerType.Throttle] + filter_obj = ThrottleFilter(initial_threshold=0, time_limit=100) + + issue_id = f"{log_record.pathname}:{log_record.lineno}" + issue_record = filter_obj.issue_map[issue_id] + + # Send 5 messages + for i in range(5): + filter_obj.filter(log_record) + # After initial messages handled, counter should increment + if i > 0: + assert issue_record.suppressed_counter >= 0 + + def test_throttle_suppression_flag_bypasses_filter( + self, log_record: logging.LogRecord + ): + """Test that _throttle_suppression flag allows suppression messages through.""" + log_record.handlers = [HandlerType.Throttle] + filter_obj = ThrottleFilter(initial_threshold=0, time_limit=100) + + # Normal message is suppressed + assert filter_obj.filter(log_record) is False + + # Same message with suppression flag bypasses filter + log_record._throttle_suppression = True + assert filter_obj.filter(log_record) is True + + def test_get_allowed_returns_none_skips_throttle( + self, log_record: logging.LogRecord + ): + """Test filter() returns True if get_allowed() returns None.""" + filter_obj = ThrottleFilter() + filter_obj.get_allowed = MagicMock(return_value=None) + + # Should 
return False because allowed is None + result = filter_obj.filter(log_record) + assert result is False + + def test_throttle_not_in_allowed_returns_true( + self, log_record: logging.LogRecord + ): + """Test filter() returns True if Throttle not in allowed handlers.""" + log_record.handlers = [HandlerType.Rich, HandlerType.File] + filter_obj = ThrottleFilter(initial_threshold=0, time_limit=10) + + # Throttle not in allowed, so should return True + assert filter_obj.filter(log_record) is True + + def test_timestamp_formatting(self): + """Test that timestamp formatting produces valid ISO format.""" + filter_obj = ThrottleFilter() + timestamp = time.time() + + formatted = filter_obj._format_timestamp(timestamp) + + # Should be ISO format with microseconds + assert len(formatted) == 26 # YYYY-MM-DD HH:MM:SS.ffffff + assert formatted.count("-") == 2 # Two dashes for date + assert formatted.count(":") == 2 # Two colons for time + + def test_different_issues_tracked_separately( + self, log_record: logging.LogRecord + ): + """Test that different file:line combinations track state separately.""" + filter_obj = ThrottleFilter(initial_threshold=2, time_limit=10) + + # First issue + record1 = copy.deepcopy(log_record) + record1.pathname = "/path1.py" + record1.lineno = 10 + record1.handlers = [HandlerType.Throttle] + + # Second issue + record2 = copy.deepcopy(log_record) + record2.pathname = "/path2.py" + record2.lineno = 20 + record2.handlers = [HandlerType.Throttle] + + # Both pass initial threshold + assert filter_obj.filter(record1) is True + assert filter_obj.filter(record2) is True + + # Issue 1: passes again + assert filter_obj.filter(record1) is True + + # Issue 2: passes again (separate tracking) + assert filter_obj.filter(record2) is True + + # Issue 1: suppressed + assert filter_obj.filter(record1) is False + + # Issue 2: suppressed (independent) + assert filter_obj.filter(record2) is False + + def test_thread_safety_concurrent_issues( + self, log_record: logging.LogRecord + ): + """Test ThrottleFilter is thread-safe with concurrent logging.""" + filter_obj = ThrottleFilter(initial_threshold=5, time_limit=10) + log_record.handlers = [HandlerType.Throttle] + results = [] + + def log_messages(record: logging.LogRecord, num_messages: int) -> None: + """Log from a thread.""" + for _ in range(num_messages): + result = filter_obj.filter(record) + results.append(result) + + # Create threads logging to same issue + threads = [] + for _ in range(3): + thread = Thread(target=log_messages, args=(log_record, 10)) + threads.append(thread) + thread.start() + + # Wait for all threads + for thread in threads: + thread.join() + + # Should have completed without deadlock + assert len(results) == 30 + # First 5 should pass (initial threshold) + assert results[:5].count(True) >= 3 # At least some early ones pass + + +# ============================================================================ +# IssueRecord Tests +# ============================================================================ + + +class TestIssueRecord: + """Tests for IssueRecord state tracking.""" + + def test_init_sets_defaults(self): + """Test that __init__ sets proper default values.""" + record = IssueRecord() + + assert record.last_occurrence == 0.0 + assert record.last_report == 0.0 + assert record.initial_counter == 0 + assert record.threshold == 10 + assert record.suppressed_counter == 0 + assert record.last_occurrence_formatted == "" + + def test_reset_clears_all_state(self): + """Test that reset() clears all counters and 
timestamps.""" + record = IssueRecord() + record.last_occurrence = 100.0 + record.initial_counter = 5 + record.suppressed_counter = 20 + record.threshold = 100 + record.last_occurrence_formatted = "2025-01-01 12:00:00.000000" + + record.reset() + + assert record.last_occurrence == 0.0 + assert record.last_report == 0.0 + assert record.initial_counter == 0 + assert record.threshold == 10 + assert record.suppressed_counter == 0 + assert record.last_occurrence_formatted == "" + + +# ============================================================================ +# Integration Tests +# ============================================================================ + + +class TestFiltersIntegration: + """Integration tests with real logger setup.""" + + def test_logger_with_handle_id_filter(self, clean_logger: logging.Logger): + """Test logger with HandleIDFilter allows only specific handlers.""" + stream = io.StringIO() + handler = logging.StreamHandler(stream) + handler.addFilter(HandleIDFilter(HandlerType.Stream)) + + clean_logger.addHandler(handler) + + # Log with matching handler type + record = logging.LogRecord( + name=clean_logger.name, + level=logging.INFO, + pathname="test.py", + lineno=1, + msg="Test message", + args=(), + exc_info=None, + ) + record.handlers = [HandlerType.Stream, HandlerType.Rich] + + clean_logger.handle(record) + + # Message should appear because Stream is in allowed + assert "Test message" in stream.getvalue() + + def test_logger_with_throttle_filter(self, clean_logger: logging.Logger): + """Test logger correctly suppresses messages with ThrottleFilter.""" + stream = io.StringIO() + handler = logging.StreamHandler(stream) + handler.setFormatter(logging.Formatter("%(message)s")) + filter_obj = ThrottleFilter(initial_threshold=2, time_limit=10) + handler.addFilter(filter_obj) + + clean_logger.addHandler(handler) + clean_logger.setLevel(logging.INFO) + + record = logging.LogRecord( + name=clean_logger.name, + level=logging.INFO, + pathname="test.py", + lineno=10, + msg="Repeated message", + args=(), + exc_info=None, + ) + record.handlers = [HandlerType.Throttle] + + # Log 5 times + for _ in range(5): + clean_logger.handle(record) + + output = stream.getvalue() + + # First 2 should appear, then suppression message + assert output.count("Repeated message") >= 2 + + def test_chained_filters(self, clean_logger: logging.Logger): + """Test stacking HandleIDFilter and ThrottleFilter.""" + stream = io.StringIO() + handler = logging.StreamHandler(stream) + handler.setFormatter(logging.Formatter("%(message)s")) + + # Add both filters + handler.addFilter(HandleIDFilter(HandlerType.Throttle)) + handler.addFilter(ThrottleFilter(initial_threshold=1, time_limit=10)) + + clean_logger.addHandler(handler) + clean_logger.setLevel(logging.INFO) + + record = logging.LogRecord( + name=clean_logger.name, + level=logging.INFO, + pathname="test.py", + lineno=10, + msg="Chained filters test", + args=(), + exc_info=None, + ) + record.handlers = [HandlerType.Throttle] + + # Log message + clean_logger.handle(record) + + # Should appear in output + output = stream.getvalue() + assert "Chained filters test" in output + + +# ============================================================================ +# Edge Cases and Error Handling +# ============================================================================ + + +class TestEdgeCases: + """Tests for edge cases and boundary conditions.""" + + def test_empty_handlers_list(self, log_record: logging.LogRecord): + """Test filter behavior with empty handlers 
list.""" + log_record.handlers = [] + filter_obj = HandleIDFilter(HandlerType.Rich) + + result = filter_obj.filter(log_record) + + assert result is False + + def test_none_handlers_attribute(self, log_record: logging.LogRecord): + """Test filter when record.handlers is None.""" + log_record.handlers = None + filter_obj = HandleIDFilter(HandlerType.Rich) + + # get_allowed should handle None gracefully + result = filter_obj.filter(log_record) + assert result is False + + def test_throttle_with_zero_initial_threshold( + self, log_record: logging.LogRecord + ): + """Test ThrottleFilter with initial_threshold=0.""" + log_record.handlers = [HandlerType.Throttle] + filter_obj = ThrottleFilter(initial_threshold=0, time_limit=10) + + # All messages should be suppressed after first + assert filter_obj.filter(log_record) is False + + def test_issue_record_key_format(self, log_record: logging.LogRecord): + """Test that issue_record key is formatted correctly.""" + filter_obj = ThrottleFilter() + + issue_id = f"{log_record.pathname}:{log_record.lineno}" + record = filter_obj.issue_map[issue_id] + + assert isinstance(record, IssueRecord) + + def test_multiple_handler_types_intersection( + self, log_record: logging.LogRecord + ): + """Test set intersection with multiple handler types.""" + log_record.handlers = [ + HandlerType.Rich, + HandlerType.File, + HandlerType.Stream, + ] + filter_obj = HandleIDFilter([HandlerType.Rich, HandlerType.Lstdout]) + + # Rich is in the intersection + result = filter_obj.filter(log_record) + assert result is True + + def test_protobuf_conf_in_ers_handlers( + self, ers_log_record: logging.LogRecord, mock_ers_handlers: dict + ): + """Test that ProtobufConf is properly included in ERS configuration.""" + ers_log_record.ers_handlers = mock_ers_handlers + filter_obj = BaseHandlerFilter() + + allowed = filter_obj.get_allowed(ers_log_record) + + assert HandlerType.Protobufstream in allowed + + def test_suppression_message_includes_count( + self, clean_logger: logging.Logger + ): + """Test that suppression message includes suppressed count.""" + stream = io.StringIO() + handler = logging.StreamHandler(stream) + handler.setFormatter(logging.Formatter("%(message)s")) + + # Create a throttle filter that will suppress quickly + throttle_filter = ThrottleFilter(initial_threshold=1, time_limit=10) + handler.addFilter(throttle_filter) + + clean_logger.addHandler(handler) + clean_logger.setLevel(logging.INFO) + + record = logging.LogRecord( + name=clean_logger.name, + level=logging.INFO, + pathname="test.py", + lineno=10, + msg="Test", + args=(), + exc_info=None, + ) + record.handlers = [HandlerType.Throttle] + + # Send messages to trigger suppression + for _ in range(15): + clean_logger.handle(copy.deepcopy(record)) + + output = stream.getvalue() + + # Should contain suppression message with count + assert "suppressed" in output.lower()