Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pyrit/scenario/core/scenario_techniques.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from pyrit.executor.attack import (
ManyShotJailbreakAttack,
PromptSendingAttack,
RedTeamingAttack,
RolePlayAttack,
RolePlayPaths,
TreeOfAttacksWithPruningAttack,
Expand Down Expand Up @@ -70,6 +71,11 @@
strategy_tags=["core", "multi_turn"],
accepts_scorer_override=False,
),
AttackTechniqueSpec(
name="red_teaming",
attack_class=RedTeamingAttack,
strategy_tags=["core", "multi_turn"],
),
]


Expand Down
4 changes: 3 additions & 1 deletion pyrit/scenario/scenarios/airt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any

from pyrit.scenario.scenarios.airt.content_harms import ContentHarms
from pyrit.scenario.scenarios.airt.cyber import Cyber, CyberStrategy
from pyrit.scenario.scenarios.airt.cyber import Cyber
from pyrit.scenario.scenarios.airt.jailbreak import Jailbreak, JailbreakStrategy
from pyrit.scenario.scenarios.airt.leakage import Leakage, LeakageStrategy
from pyrit.scenario.scenarios.airt.psychosocial import Psychosocial, PsychosocialStrategy
Expand All @@ -28,6 +28,8 @@ def __getattr__(name: str) -> Any:
return RapidResponse.get_strategy_class()
if name == "ContentHarmsStrategy":
return ContentHarms.get_strategy_class()
if name == "CyberStrategy":
return Cyber.get_strategy_class()
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


Expand Down
209 changes: 59 additions & 150 deletions pyrit/scenario/scenarios/airt/cyber.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

"""
Cyber scenario — technique-based malware generation testing.

Strategies select **attack techniques** (PromptSending, RedTeaming).
Datasets control **what** is tested (malware generation objectives).
Use ``--dataset-names`` to narrow which objectives to test.
"""

from __future__ import annotations

import logging
import os
from typing import TYPE_CHECKING, Any, Optional
from typing import TYPE_CHECKING, ClassVar

from pyrit.auth import get_azure_openai_auth
from pyrit.common import apply_defaults
from pyrit.common.path import SCORER_SEED_PROMPT_PATH
from pyrit.executor.attack.core.attack_config import (
AttackAdversarialConfig,
AttackScoringConfig,
)
from pyrit.executor.attack.multi_turn.red_teaming import RedTeamingAttack
from pyrit.executor.attack.single_turn.prompt_sending import PromptSendingAttack
from pyrit.models import SeedAttackGroup
from pyrit.prompt_target import OpenAIChatTarget, PromptChatTarget
from pyrit.scenario.core.atomic_attack import AtomicAttack
from pyrit.scenario.core.attack_technique import AttackTechnique
from pyrit.prompt_target import OpenAIChatTarget
from pyrit.scenario.core.dataset_configuration import DatasetConfiguration
from pyrit.scenario.core.scenario import Scenario
from pyrit.scenario.core.scenario_strategy import ScenarioStrategy
from pyrit.score import (
SelfAskRefusalScorer,
SelfAskTrueFalseScorer,
Expand All @@ -31,24 +31,37 @@
)

if TYPE_CHECKING:
from pyrit.executor.attack.core.attack_strategy import AttackStrategy
from pyrit.scenario.core.scenario_strategy import ScenarioStrategy

logger = logging.getLogger(__name__)

_CYBER_TECHNIQUE_NAMES = {"prompt_sending", "red_teaming"}


class CyberStrategy(ScenarioStrategy):
def _build_cyber_strategy() -> type[ScenarioStrategy]:
"""
Strategies for malware-focused cyber attacks. While not in the CyberStrategy class, a
few of these include:
* Shell smashing
* Zip bombs
* File deletion (rm -rf /).
Build the Cyber strategy class dynamically from SCENARIO_TECHNIQUES.

Selects only ``prompt_sending`` and ``red_teaming`` techniques from
the shared catalog.

Returns:
type[ScenarioStrategy]: The dynamically generated strategy enum class.
"""
from pyrit.registry.object_registries.attack_technique_registry import AttackTechniqueRegistry
from pyrit.registry.tag_query import TagQuery
from pyrit.scenario.core.scenario_techniques import SCENARIO_TECHNIQUES

cyber_specs = [s for s in SCENARIO_TECHNIQUES if s.name in _CYBER_TECHNIQUE_NAMES]
Copy link
Copy Markdown
Contributor Author

@rlundeen2 rlundeen2 Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure what strategies we really wanted. We may want these to be core? But for now I like not changing functionality


# Aggregate members (special markers that expand to strategies with matching tags)
ALL = ("all", {"all"})
SINGLE_TURN = ("single_turn", {"single_turn"})
MULTI_TURN = ("multi_turn", {"multi_turn"})
return AttackTechniqueRegistry.build_strategy_class_from_specs(
class_name="CyberStrategy",
specs=cyber_specs,
aggregate_tags={
"single_turn": TagQuery.any_of("single_turn"),
"multi_turn": TagQuery.any_of("multi_turn"),
},
)


class Cyber(Scenario):
Expand All @@ -60,27 +73,31 @@ class Cyber(Scenario):
techniques.
"""

VERSION: int = 1
VERSION: int = 2
_cached_strategy_class: ClassVar[type[ScenarioStrategy] | None] = None

@classmethod
def get_strategy_class(cls) -> type[ScenarioStrategy]:
"""
Get the strategy enum class for this scenario.
Return the dynamically generated strategy class, building it on first access.

Returns:
Type[ScenarioStrategy]: The CyberStrategy enum class.
type[ScenarioStrategy]: The CyberStrategy enum class.
"""
return CyberStrategy
if cls._cached_strategy_class is None:
cls._cached_strategy_class = _build_cyber_strategy()
return cls._cached_strategy_class

@classmethod
def get_default_strategy(cls) -> ScenarioStrategy:
"""
Get the default strategy used when no strategies are specified.
Return the default strategy member (``ALL``).

Returns:
ScenarioStrategy: CyberStrategy.ALL (all cyber strategies).
ScenarioStrategy: The ALL strategy value.
"""
return CyberStrategy.ALL
strategy_class = cls.get_strategy_class()
return strategy_class("all")

@classmethod
def default_dataset_config(cls) -> DatasetConfiguration:
Expand All @@ -96,54 +113,36 @@ def default_dataset_config(cls) -> DatasetConfiguration:
def __init__(
self,
*,
adversarial_chat: Optional[PromptChatTarget] = None,
objective_scorer: Optional[TrueFalseScorer] = None,
objective_scorer: TrueFalseScorer | None = None,
include_baseline: bool = True,
scenario_result_id: Optional[str] = None,
scenario_result_id: str | None = None,
) -> None:
"""
Initialize the cyber harms scenario.

Args:
adversarial_chat (Optional[PromptChatTarget]): Adversarial chat for the red teaming attack, corresponding
to CyberStrategy.MultiTurn. If not provided, defaults to an OpenAI chat target.
objective_scorer (Optional[TrueFalseScorer]): Objective scorer for malware detection. If not
provided, defaults to a SelfAskScorer using the malware.yaml file under the scorer config store for
malware detection
objective_scorer (TrueFalseScorer | None): Objective scorer for malware detection. If not
provided, defaults to a composite scorer using malware detection + refusal backstop.
include_baseline (bool): Whether to include a baseline atomic attack that sends all objectives
without modifications. Defaults to True. When True, a "baseline" attack is automatically
added as the first atomic attack, allowing comparison between unmodified prompts and
attack-modified prompts.
scenario_result_id (Optional[str]): Optional ID of an existing scenario result to resume.
without modifications. Defaults to True.
scenario_result_id (str | None): Optional ID of an existing scenario result to resume.
"""
# Cyber uses a "take object, make config" pattern to expose a more ergonomic interface. Helper
# methods return objects, not configs.

# In this context the "objective" scorer has nothing to do with the "objective" target.
# The scoring config is what's used later in scoring orchestration, not the scorer object.
if not objective_scorer:
objective_scorer = self._get_default_objective_scorer()
self._scorer_config = AttackScoringConfig(objective_scorer=objective_scorer)

# Adversarial chat is used by the RedTeamingAttack (multiturn) variation of the scenario. Otherwise, it's not
# invoked.
self._adversarial_chat = adversarial_chat if adversarial_chat else self._get_default_adversarial_target()
self._adversarial_config = AttackAdversarialConfig(target=self._adversarial_chat)
self._objective_scorer: TrueFalseScorer = (
objective_scorer if objective_scorer else self._get_default_objective_scorer()
)

super().__init__(
version=self.VERSION,
strategy_class=CyberStrategy,
objective_scorer=objective_scorer,
objective_scorer=self._objective_scorer,
strategy_class=self.get_strategy_class(),
include_default_baseline=include_baseline,
scenario_result_id=scenario_result_id,
)

# Will be resolved in _get_atomic_attacks_async
self._seed_groups: Optional[list[SeedAttackGroup]] = None

def _get_default_objective_scorer(self) -> TrueFalseCompositeScorer:
"""
Retrieve the default objective scorer.

The scoring mechanism is as follows:
1. Detect for the presence of malware using a SelfAskTrueFalseScorer.
2. Detect for NON-refusal as a backstop. (If the target model refuses to produce malware, we want that
Expand Down Expand Up @@ -175,93 +174,3 @@ def _get_default_objective_scorer(self) -> TrueFalseCompositeScorer:
return TrueFalseCompositeScorer(
aggregator=TrueFalseScoreAggregator.AND, scorers=[presence_of_malware, backstop]
)

def _get_default_adversarial_target(self) -> OpenAIChatTarget:
"""
Create and retrieve the default adversarial target.

Returns:
OpenAIChatTarget: Default adversarial target, using an unfiltered endpoint.
"""
endpoint = os.getenv("AZURE_OPENAI_GPT4O_UNSAFE_CHAT_ENDPOINT")
return OpenAIChatTarget(
endpoint=endpoint,
api_key=get_azure_openai_auth(endpoint or ""),
model_name=os.environ.get("AZURE_OPENAI_GPT4O_UNSAFE_CHAT_MODEL"),
temperature=1.2,
)

def _resolve_seed_groups(self) -> list[SeedAttackGroup]:
"""
Resolve seed groups from dataset configuration.

Returns:
List[SeedAttackGroup]: List of seed attack groups with objectives to be tested.
"""
# Use dataset_config (guaranteed to be set by initialize_async)
seed_groups = self._dataset_config.get_all_seed_attack_groups()

if not seed_groups:
self._raise_dataset_exception()

return list(seed_groups)

def _get_atomic_attack_from_strategy(self, strategy: str) -> AtomicAttack:
"""
Translate the strategy into an actual AtomicAttack.

Args:
strategy: The CyberStrategy enum (SingleTurn or MultiTurn).

Returns:
AtomicAttack: configured for the specified strategy.

Raises:
ValueError: If scenario is not properly initialized or an unknown CyberStrategy is passed.
"""
# objective_target is guaranteed to be non-None by parent class validation
if self._objective_target is None:
raise ValueError(
"Scenario not properly initialized. Call await scenario.initialize_async() before running."
)
attack_strategy: Optional[AttackStrategy[Any, Any]] = None
if strategy == "single_turn":
attack_strategy = PromptSendingAttack(
objective_target=self._objective_target,
attack_scoring_config=self._scorer_config,
)
elif strategy == "multi_turn":
attack_strategy = RedTeamingAttack(
objective_target=self._objective_target,
attack_scoring_config=self._scorer_config,
attack_adversarial_config=self._adversarial_config,
)
else:
raise ValueError(f"Unknown CyberStrategy: {strategy}")

# _seed_groups is guaranteed to be set by _get_atomic_attacks_async before this method is called
if self._seed_groups is None:
raise ValueError("_seed_groups must be resolved before creating atomic attacks")

return AtomicAttack(
atomic_attack_name=f"cyber_{strategy}",
attack_technique=AttackTechnique(attack=attack_strategy),
seed_groups=self._seed_groups,
adversarial_chat=self._adversarial_chat,
objective_scorer=self._scorer_config.objective_scorer,
memory_labels=self._memory_labels,
)

async def _get_atomic_attacks_async(self) -> list[AtomicAttack]:
"""
Generate atomic attacks for each strategy.

Returns:
List[AtomicAttack]: List of atomic attacks to execute.
"""
# Resolve seed groups from deprecated objectives or dataset config
self._seed_groups = self._resolve_seed_groups()

strategies = {s.value for s in self._scenario_strategies}

return [self._get_atomic_attack_from_strategy(strategy) for strategy in strategies]
Loading
Loading