-
Notifications
You must be signed in to change notification settings - Fork 533
[crowdstrike] Fix/5309 Crowdstrike intrusion set name resolution #5410
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[crowdstrike] Fix/5309 Crowdstrike intrusion set name resolution #5410
Conversation
instead of slugs Added debug and warning logs for actor resolution process
Kakudou
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi,
Thanks for the PR.
Unfortunately, I have to mark it as "Request Changes" for a few reasons.
The first and most important one:
Those fixes do not seem to work. I still see intrusion names being created using the slug/id name.
They were retrieved using the "indicator" scope only.

Regarding the report part, it feels over-engineered and contains a lot of unused code.
I was unable to reach that code path, even after ingesting a full day of historical data.
Could you provide some insight about your configuration for this part, so I can retrieve the same dataset as you and observe the same behavior?
external-import/crowdstrike/src/crowdstrike_feeds_connector/report/builder.py
Outdated
Show resolved
Hide resolved
external-import/crowdstrike/src/crowdstrike_feeds_connector/report/importer.py
Outdated
Show resolved
Hide resolved
external-import/crowdstrike/src/crowdstrike_feeds_services/client/reports.py
Outdated
Show resolved
Hide resolved
external-import/crowdstrike/src/crowdstrike_feeds_connector/report/builder.py
Outdated
Show resolved
Hide resolved
external-import/crowdstrike/src/crowdstrike_feeds_services/client/actors.py
Outdated
Show resolved
Hide resolved
external-import/crowdstrike/src/crowdstrike_feeds_services/client/actors.py
Outdated
Show resolved
Hide resolved
external-import/crowdstrike/src/crowdstrike_feeds_services/client/actors.py
Outdated
Show resolved
Hide resolved
…t name resolution
- Skip slug/name resolution when report actor stubs include IDs - Normalize indicator actor tokens before Intrusion Set creation - Add per-run caching to reduce repeated actor API calls - Prevent Intrusion Set name drift across feeds
Kakudou
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the second pass on that code.
But i can't approved it in that state.
The fix don't work, the Actors are still ingested with the ID instead the human name.
Second reason:
The code is generated/written with an insane amount of complexity without any benefit.
Making the review and future maintenance really difficult to handle.
Please, keep it KISS, and prefer straight to the goal instead of over complex-spaghetti code.
| # Normalize CrowdStrike actor tokens to stable human-readable names before building the bundle. | ||
| # Indicators frequently contain actor values as strings (internal tokens / slugs). We only | ||
| # normalize the names for stable Intrusion Set creation; the Actor feed/enrichment owns the full profile. | ||
| actor_tokens = indicator.get("actors") or [] | ||
| if actor_tokens: | ||
| # Use cache first. | ||
| normalized: List[str] = [] | ||
| to_resolve: List[str] = [] | ||
|
|
||
| for token in actor_tokens: | ||
| if not token: | ||
| continue | ||
| cached = self._resolved_actor_name_cache.get(token) | ||
| if cached: | ||
| normalized.append(cached) | ||
| else: | ||
| to_resolve.append(token) | ||
|
|
||
| if to_resolve: | ||
| try: | ||
| response = self.actors_api_cs.get_actors_by_slugs(to_resolve) | ||
| resources = ( | ||
| response.get("resources", []) | ||
| if isinstance(response, dict) | ||
| else [] | ||
| ) | ||
|
|
||
| # Build lookup from slug/name -> canonical name. | ||
| lookup: Dict[str, str] = {} | ||
| for actor in resources: | ||
| name = actor.get("name") or actor.get("slug") | ||
| if not name: | ||
| continue | ||
| slug = actor.get("slug") | ||
| if slug: | ||
| lookup[slug] = name | ||
| lookup[name] = name | ||
|
|
||
| for token in to_resolve: | ||
| canon = lookup.get(token) | ||
| if not canon: | ||
| canon = lookup.get(token.lower()) or lookup.get( | ||
| token.upper() | ||
| ) | ||
| canon = canon or token | ||
| self._resolved_actor_name_cache[token] = canon | ||
| normalized.append(canon) | ||
|
|
||
| self.helper.connector_logger.debug( | ||
| "Normalized indicator actors via CrowdStrike Actors API.", | ||
| { | ||
| "indicator_id": indicator.get("id"), | ||
| "input_actors": actor_tokens, | ||
| "normalized_actors": normalized, | ||
| "resolved_count": len(resources), | ||
| }, | ||
| ) | ||
| except Exception as err: | ||
| # Do not fail the whole indicator if actor normalization fails. | ||
| # Keep existing 'actors' field (tokens) and log a warning. | ||
| self.helper.connector_logger.warning( | ||
| "[WARNING] Failed to normalize indicator actors; using raw values as-is.", | ||
| { | ||
| "indicator_id": indicator.get("id"), | ||
| "actor_tokens": actor_tokens, | ||
| "error": str(err), | ||
| }, | ||
| ) | ||
| normalized = actor_tokens | ||
|
|
||
| # Replace with normalized list (preserves ordering; falls back to raw token when unresolved). | ||
| indicator["actors"] = normalized | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion:
Honestly, was 'hard' to read, feel really overcomplicated/over-engineered for a simple output.
if the purpose was to retrieved each tokens, for each call the api to extract the human name, keeping a cache so we don't make same call twice.
The code could have been:
actor_tokens = indicator.get("actors") or []
if actor_tokens:
uncached_tokens = [t for t in actor_tokens if t and t not in self._resolved_actor_name_cache]
if uncached_tokens:
try:
response = self.actors_api_cs.get_actors_by_slugs(uncached_tokens)
actors_data = response.get("resources", [])
for actor in actors_data:
slug = actor.get("slug")
name = actor.get("name")
self._resolved_actor_name_cache[slug] = name
except Exception as err:
self.helper.connector_logger.warning(
"Failed to normalize indicator actors; using raw values",
)
for token in uncached_tokens:
self._resolved_actor_name_cache[token] = token
indicator["actors"] = [
self._resolved_actor_name_cache.get(token, token)
for token in actor_tokens if token
]
from ~70l down to ~20
Why the actual code is over-engineered:
- We are inside
if actor_tokensso in the loop, howif not tokencould occurs ? isinstance(response, dict)is unreachable, you gotresponse.get('resources', [])and i don't think the api could return inconsistent type.- using two list 'nomalized' and 'to_resolve' make no really sense, you got it in cache or you will add it to the cache.
- why doing
canon = lookup.get(token.lower()) or lookup.get(token.upper() )action doesn't the API send always the same case ? lookupgot two mapping:lookup[slug] = namelookup[name] = namethe self reference to name look useless
| intrusion_set = self._create_intrusion_set_from_actor(actor) | ||
| intrusion_sets.append(intrusion_set) | ||
| try: | ||
| # crowdstrike_feeds_connector/report/builder.py inside _create_intrusion_sets loop |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QoL:
That comment feel like an artifact of the process of writing that code.
And gave no real information, should be removed.
| intrusion_sets.append(intrusion_set) | ||
| try: | ||
| # crowdstrike_feeds_connector/report/builder.py inside _create_intrusion_sets loop | ||
| if isinstance(actor, dict): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ask:
In which condition actor not gonna be a dict ?
| object_markings=self.object_markings, | ||
| ) | ||
| ) | ||
| continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion:
That continue do absolutely nothing
| if isinstance(actor, dict): | ||
| logger.debug( | ||
| "Report actor entity before IntrusionSet creation", | ||
| extra={ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion:
That extra make sense in 'classic lo', not from the logger coming from the helper.
| ) | ||
|
|
||
|
|
||
| def create_intrusion_set_from_actor_entity( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion:
Like all above, utterly complex.
If you look that module utils other functions, you will see they are used to create stix model.
In that implementation you add processing logic in it (mapping and transformations) for me that's clearly not what we should find there.
| Expects a full actor resource as returned by the Intel API. | ||
| """ | ||
| # Name / slug | ||
| name = actor.get("name") or actor.get("slug") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QoL:
That method is full of ... or ... all those fallback feel wrong to me, we are not able to predict/understand that real/final output
(i will only pinpoint that first one, as example)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noted and removed.
| # Also add a compact alias without spaces (e.g. "SALTY SPIDER" -> "SALTYSPIDER") | ||
| if isinstance(name, str): | ||
| compact = name.replace(" ", "") | ||
| if compact and compact != name and compact not in aliases: | ||
| aliases.append(compact) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ask:
Isn't the whole purpose of that PR to get off that id/slug naming without spaces, why bringing it back in that place
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an attempt to populate aliases for deduplication purposes not the name of the object.
| # Deduplicate while preserving order | ||
| seen_aliases = set() | ||
| deduped_aliases: List[str] = [] | ||
| for alias in aliases: | ||
| if alias not in seen_aliases and alias != name: | ||
| seen_aliases.add(alias) | ||
| deduped_aliases.append(alias) | ||
| aliases = deduped_aliases |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
QoL:
Already seen above, could simply use dict.fromkeys
| # -*- coding: utf-8 -*- | ||
| """CrowdStrike label parsing utilities. | ||
| This module is imported by crowdstrike_feeds_services.utils.__init__. | ||
| Goal: | ||
| - Normalize label values coming from CrowdStrike (strings + raw label objects) | ||
| - Provide convenience helpers for extracting label names and parsing into buckets | ||
| Common input shapes: | ||
| - "DataObfuscation" | ||
| - "Data Obfuscation" | ||
| - {"type": "attack_pattern", "value": "DataObfuscation"} | ||
| - {"name": "DataObfuscation"} | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import re | ||
| from dataclasses import dataclass | ||
| from typing import Any, Iterable, List, Mapping, Optional | ||
|
|
||
| _CAMEL_SPLIT_RE = re.compile(r"(?<=[a-z])(?=[A-Z])") | ||
|
|
||
|
|
||
| def _normalize_label_value(value: str) -> str: | ||
| """Normalize a label value into a more human-friendly, consistent representation.""" | ||
| if value is None: | ||
| return "" | ||
|
|
||
| value = str(value).strip() | ||
| if not value: | ||
| return "" | ||
|
|
||
| # If it's camelCase / PascalCase like "DataObfuscation", split into "Data Obfuscation". | ||
| if " " not in value and _CAMEL_SPLIT_RE.search(value): | ||
| value = _CAMEL_SPLIT_RE.sub(" ", value) | ||
|
|
||
| # Collapse repeated whitespace | ||
| value = " ".join(value.split()) | ||
| return value | ||
|
|
||
|
|
||
| def extract_label_names(labels: Optional[Iterable[Any]]) -> List[str]: | ||
| """Extract label names from mixed raw inputs. | ||
| Accepts: | ||
| - list[str] | ||
| - list[dict] where dict may have 'value', 'name', 'label', 'slug' | ||
| """ | ||
| if not labels: | ||
| return [] | ||
|
|
||
| names: List[str] = [] | ||
| for lab in labels: | ||
| if lab is None: | ||
| continue | ||
|
|
||
| if isinstance(lab, str): | ||
| val = _normalize_label_value(lab) | ||
| elif isinstance(lab, Mapping): | ||
| val = ( | ||
| lab.get("value") | ||
| or lab.get("name") | ||
| or lab.get("label") | ||
| or lab.get("slug") | ||
| or "" | ||
| ) | ||
| val = _normalize_label_value(str(val)) | ||
| else: | ||
| val = _normalize_label_value(str(lab)) | ||
|
|
||
| if val: | ||
| names.append(val) | ||
|
|
||
| # De-dupe preserving order | ||
| seen = set() | ||
| deduped: List[str] = [] | ||
| for n in names: | ||
| if n not in seen: | ||
| seen.add(n) | ||
| deduped.append(n) | ||
|
|
||
| return deduped | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class ParsedLabels: | ||
| """Parsed CrowdStrike labels bucketed into common categories.""" | ||
|
|
||
| attack_patterns: List[str] | ||
| malware_families: List[str] | ||
| actor_names: List[str] | ||
| threat_types: List[str] | ||
| raw: List[str] | ||
|
|
||
|
|
||
| def parse_crowdstrike_labels_from_raw( | ||
| raw_labels: Optional[Iterable[Any]], | ||
| ) -> ParsedLabels: | ||
| """Parse CrowdStrike labels from raw inputs (strings or dicts). | ||
| This is a tolerant parser. If it can't confidently bucket a label, it will | ||
| still appear in `raw`. | ||
| """ | ||
| label_names = extract_label_names(raw_labels) | ||
|
|
||
| # Bucket heuristics: | ||
| # If you have a strict schema in your payloads (e.g., dict['type']), we use it. | ||
| attack_patterns: List[str] = [] | ||
| malware_families: List[str] = [] | ||
| actor_names: List[str] = [] | ||
| threat_types: List[str] = [] | ||
|
|
||
| # First pass: typed dicts | ||
| if raw_labels: | ||
| for lab in raw_labels: | ||
| if not isinstance(lab, Mapping): | ||
| continue | ||
|
|
||
| ltype = (lab.get("type") or lab.get("category") or "").lower().strip() | ||
| val = ( | ||
| lab.get("value") | ||
| or lab.get("name") | ||
| or lab.get("label") | ||
| or lab.get("slug") | ||
| or "" | ||
| ) | ||
| val = _normalize_label_value(str(val)) | ||
| if not val: | ||
| continue | ||
|
|
||
| if ltype in ( | ||
| "attack_pattern", | ||
| "attack-pattern", | ||
| "mitre", | ||
| "tactic", | ||
| "technique", | ||
| ): | ||
| attack_patterns.append(val) | ||
| elif ltype in ("malware", "malware_family", "family"): | ||
| malware_families.append(val) | ||
| elif ltype in ("actor", "threat_actor", "intrusion_set", "intrusion-set"): | ||
| actor_names.append(val) | ||
| elif ltype in ("threat_type", "threat-type", "threat"): | ||
| threat_types.append(val) | ||
|
|
||
| # De-dupe typed lists while preserving order | ||
| def _dedupe(xs: List[str]) -> List[str]: | ||
| seen = set() | ||
| out: List[str] = [] | ||
| for x in xs: | ||
| if x not in seen: | ||
| seen.add(x) | ||
| out.append(x) | ||
| return out | ||
|
|
||
| attack_patterns = _dedupe(attack_patterns) | ||
| malware_families = _dedupe(malware_families) | ||
| actor_names = _dedupe(actor_names) | ||
| threat_types = _dedupe(threat_types) | ||
|
|
||
| return ParsedLabels( | ||
| attack_patterns=attack_patterns, | ||
| malware_families=malware_families, | ||
| actor_names=actor_names, | ||
| threat_types=threat_types, | ||
| raw=label_names, | ||
| ) | ||
|
|
||
|
|
||
| def parse_crowdstrike_labels(labels: Optional[List[str]]) -> ParsedLabels: | ||
| """Parse CrowdStrike labels when you already have a list[str].""" | ||
| return parse_crowdstrike_labels_from_raw(labels) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion:
Gold plated complexity and overengineering, for something absolutely never used in the code...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This functionality will be used for another feature/request where we need to parse labels to build Attack Patterns and set Indicator Types. Can be pulled from this PR


Summary
Fixes #5309 by ensuring we do not use CrowdStrike actor identifiers (e.g.,
LABYRINTHCHOLLIMA) as Intrusion Set names when processingactorsassociations found in collections likeindicators,reports,yara_master, andsnort_suricata_master.Instead, when an object payload includes
actors: ["<CS_ACTOR_ID>"], we resolve that identifier to the actor's canonical display name (and aliases where relevant) via CrowdStrike’s actor entity endpoint and use that name consistently when creating/updating the corresponding Intrusion Set. oai_citation:1‡e50b7e94-e625-496a-bef8-d88e8175f5d9_CrowdStrike_Connector_Issues.pdfProblem
Multiple collections return an
actorsfield containing CrowdStrike internal actor IDs, not the human-readable name. The connector previously treated the value as the Intrusion Set name, which led to Intrusion Sets being renamed back and forth:This broke entity stability and enrichment workflows dependent on consistent STIX IDs.
Changes
actors[]to canonical actor names before creating/updating Intrusion Sets.Why this approach
CrowdStrike’s
actorsarray contains identifiers, and the correct name must be retrieved by querying the actor definition endpoint (GetIntelActorEntities). This makes the Intrusion Set naming stable across all collections while remaining faithful to the source.Testing
Manual / functional verification:
actors: ["<CS_ACTOR_ID>"]Logs:
Notes / Follow-ups