26 changes: 26 additions & 0 deletions README.md
@@ -135,6 +135,11 @@ Primary configuration parameters:
- `[SNAPSHOT]/compress_state` - whether to compress the state before saving it. This can be useful when using Kafka
persistence.

Optional configuration parameters:

- `[SNAPSHOT]/snapshot_triggers` - the events that trigger saving a snapshot. Defaults to `cluster_created`,
`cluster_template_changed`, `periodic`, `on-demand`. See [Snapshot Triggers](#snapshot-triggers) for details.

## Masking

This feature allows masking of specific variable parts in log messages with keywords, prior to passing to Drain. A
@@ -305,6 +310,27 @@ pip3 install kafka-python
pip3 install redis
```

## Snapshot Triggers

This controls when a snapshot of the Drain3 state is saved. For very noisy logs (such as CI/CD logs), new cluster
templates are created and changed very frequently, so snapshots are saved often, and the constant saving can
significantly slow down ingestion.

The `[SNAPSHOT]/snapshot_triggers` option lets you control this by listing which events trigger a snapshot, reducing
constant snapshotting and substantially improving performance; for noisy CI/CD logs, this resulted in an 80%
speed increase for a 716k build log.

In practice, you'll likely either need all the default events or just `on-demand` and `periodic`.
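For example, the triggers can be narrowed in the `[SNAPSHot]` section of the configuration file. This is a sketch:
`snapshot_interval_minutes` and `compress_state` are the existing options described above, while the comma-separated
value format for `snapshot_triggers` is an assumption about how the raw config value is written.

```ini
[SNAPSHOT]
snapshot_interval_minutes = 10
compress_state = True
; assumed format: comma-separated list of trigger events
snapshot_triggers = periodic,on-demand
```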

**Important**

The client code calling this must ensure that an `on-demand` event is fired at the end of processing,
otherwise the saved state will be incomplete. For example, on an instance of `TemplateMiner`:

```python
template_miner.save_state({"change_type": "on-demand"})
```
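The gating behaviour can be illustrated with a small standalone sketch; the class and method names here are
illustrative, not part of the drain3 API:

```python
from typing import List, Mapping


class SnapshotGate:
    """Minimal illustration of trigger-based snapshot gating (not the drain3 API)."""

    def __init__(self, triggers: List[str]) -> None:
        self.triggers = triggers
        self.saved: List[str] = []  # change types of snapshots actually persisted

    def save_state(self, snapshot_reason: Mapping[str, str]) -> None:
        # Skip the (potentially expensive) serialization unless the
        # event type is one of the configured triggers.
        if snapshot_reason["change_type"] not in self.triggers:
            return
        self.saved.append(snapshot_reason["change_type"])


# Only `periodic` and `on-demand` events are persisted.
gate = SnapshotGate(["periodic", "on-demand"])
gate.save_state({"change_type": "cluster_created"})  # filtered out
gate.save_state({"change_type": "on-demand"})        # persisted
print(gate.saved)  # → ['on-demand']
```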

## Examples

In order to run the examples directly from the repository, you need to install dependencies. You can do that using *
19 changes: 14 additions & 5 deletions drain3/template_miner.py
@@ -109,25 +109,34 @@ def load_state(self) -> None:
logger.info(f"Restored {len(loaded_drain.clusters)} clusters "
f"built from {loaded_drain.get_total_cluster_size()} messages")

def save_state(self, snapshot_reason: str) -> None:
def save_state(self, snapshot_reason: Mapping[str, str]) -> None:
assert self.persistence_handler is not None
if snapshot_reason["change_type"] not in self.config.snapshot_triggers:
return

cluster_id = snapshot_reason.get("cluster_id", "no cluster id")

state = jsonpickle.dumps(self.drain, keys=True).encode('utf-8')
if self.config.snapshot_compress_state:
state = base64.b64encode(zlib.compress(state))

logger.info(f"Saving state of {len(self.drain.clusters)} clusters "
f"with {self.drain.get_total_cluster_size()} messages, {len(state)} bytes, "
f"reason: {snapshot_reason}")
f"reason: {snapshot_reason['change_type']} ({cluster_id})")
self.persistence_handler.save_state(state)

def get_snapshot_reason(self, change_type: str, cluster_id: int) -> Optional[str]:
def get_snapshot_reason(self, change_type: str, cluster_id: int) -> Optional[Mapping[str, str]]:
snapshot_reason: Mapping[str, str] = {
"change_type": change_type,
"cluster_id": str(cluster_id)
}

if change_type != "none":
return f"{change_type} ({cluster_id})"
return snapshot_reason

diff_time_sec = time.time() - self.last_save_time
if diff_time_sec >= self.config.snapshot_interval_minutes * 60:
return "periodic"
return {"change_type": "periodic", "cluster_id": str(cluster_id)}

return None

3 changes: 3 additions & 0 deletions drain3/template_miner_config.py
@@ -28,6 +28,7 @@ def __init__(self) -> None:
self.mask_suffix = ">"
self.parameter_extraction_cache_capacity = 3000
self.parametrize_numeric_tokens = True
self.snapshot_triggers = ["cluster_created", "cluster_template_changed", "periodic", "on-demand"]

def load(self, config_filename: str) -> None:
parser = configparser.ConfigParser()
@@ -80,3 +81,5 @@ def load(self, config_filename: str) -> None:
instruction = MaskingInstruction(mi['regex_pattern'], mi['mask_with'])
masking_instructions.append(instruction)
self.masking_instructions = masking_instructions

triggers_str = parser.get(section_snapshot, 'snapshot_triggers', fallback=None)
if triggers_str is not None:
    self.snapshot_triggers = [t.strip() for t in triggers_str.split(',')]