diff --git a/README.md b/README.md index 31b1faa..1b3302f 100644 --- a/README.md +++ b/README.md @@ -135,6 +135,11 @@ Primary configuration parameters: - `[SNAPSHOT]/compress_state` - whether to compress the state before saving it. This can be useful when using Kafka persistence. +Optional configuration parameters: + +- `[SNAPSHOT]/snapshot_triggers` - Events that can trigger saving the snapshots. Defaults to `cluster_created`, + `cluster_template_changed`, `periodic`, `on-demand`. See more in [Snapshot Triggers](#snapshot-triggers) + ## Masking This feature allows masking of specific variable parts in log message with keywords, prior to passing to Drain. A @@ -305,6 +310,27 @@ pip3 install kafka-python pip3 install redis ``` +## Snapshot Triggers + +This controls when a snapshot of the Drain3 state is saved. For very noisy logs (such as CI/CD logs) this can happen +very frequently due to new cluster templates being created and changed. This can significantly slow down the ingest +progress due to the constant snapshot saving. + +The `[SNAPSHOT]/snapshot_triggers` options allows you to control this by listing which events trigger a snapshot thereby +reducing the constant snapshotting and drastically increasing performance; for noisy CI/CD logs this resulted in an 80% +speed increase for a 716k build log. + +In practice, you'll likely either need all the default events or just `on-demand` and `periodic`. + +**Important** + +The client code calling this will need to ensure that an `on-demand` event is called at the end of the processing +otherwise the state will be incomplete. For example, on an instance of `TemplateMiner`: + +```python +template_miner.save_state({"change_type": "on-demand"}) +``` + ## Examples In order to run the examples directly from the repository, you need to install dependencies. You can do that using * diff --git a/drain3/template_miner.py b/drain3/template_miner.py index 2cc0b24..74ccf4e 100644 --- a/drain3/template_miner.py +++ b/drain3/template_miner.py @@ -109,8 +109,12 @@ def load_state(self) -> None: logger.info(f"Restored {len(loaded_drain.clusters)} clusters " f"built from {loaded_drain.get_total_cluster_size()} messages") - def save_state(self, snapshot_reason: str) -> None: + def save_state(self, snapshot_reason: Mapping[str, str]) -> None: assert self.persistence_handler is not None + if snapshot_reason["change_type"] not in self.config.snapshot_triggers: + return + + cluster_id = snapshot_reason["cluster_id"] if "cluster_id" in snapshot_reason else "no cluster id" state = jsonpickle.dumps(self.drain, keys=True).encode('utf-8') if self.config.snapshot_compress_state: @@ -118,16 +122,21 @@ def save_state(self, snapshot_reason: str) -> None: logger.info(f"Saving state of {len(self.drain.clusters)} clusters " f"with {self.drain.get_total_cluster_size()} messages, {len(state)} bytes, " - f"reason: {snapshot_reason}") + f"reason: {snapshot_reason["change_type"]} ({cluster_id})") self.persistence_handler.save_state(state) - def get_snapshot_reason(self, change_type: str, cluster_id: int) -> Optional[str]: + def get_snapshot_reason(self, change_type: str, cluster_id: int) -> Optional[Mapping[str, str]]: + snapshot_reason: Mapping[str, str] = { + "change_type": change_type, + "cluster_id": cluster_id + } + if change_type != "none": - return f"{change_type} ({cluster_id})" + return snapshot_reason diff_time_sec = time.time() - self.last_save_time if diff_time_sec >= self.config.snapshot_interval_minutes * 60: - return "periodic" + return { "change_type": "periodic", "cluster_id": cluster_id } return None diff --git a/drain3/template_miner_config.py b/drain3/template_miner_config.py index 9e8d65a..8d22524 100644 --- a/drain3/template_miner_config.py +++ b/drain3/template_miner_config.py @@ -28,6 +28,7 @@ def __init__(self) -> None: self.mask_suffix = ">" self.parameter_extraction_cache_capacity = 3000 self.parametrize_numeric_tokens = True + self.snapshot_triggers = ["cluster_created", "cluster_template_changed", "periodic", "on-demand"] def load(self, config_filename: str) -> None: parser = configparser.ConfigParser() @@ -80,3 +81,5 @@ def load(self, config_filename: str) -> None: instruction = MaskingInstruction(mi['regex_pattern'], mi['mask_with']) masking_instructions.append(instruction) self.masking_instructions = masking_instructions + + self.snapshot_triggers = parser.get(section_snapshot, 'snapshot_triggers', fallback=self.snapshot_triggers)