diff --git a/pyrit/scenario/core/scenario_techniques.py b/pyrit/scenario/core/scenario_techniques.py index f76ff5d2d..0349214db 100644 --- a/pyrit/scenario/core/scenario_techniques.py +++ b/pyrit/scenario/core/scenario_techniques.py @@ -25,6 +25,7 @@ from pyrit.executor.attack import ( ManyShotJailbreakAttack, PromptSendingAttack, + RedTeamingAttack, RolePlayAttack, RolePlayPaths, TreeOfAttacksWithPruningAttack, @@ -70,6 +71,11 @@ strategy_tags=["core", "multi_turn"], accepts_scorer_override=False, ), + AttackTechniqueSpec( + name="red_teaming", + attack_class=RedTeamingAttack, + strategy_tags=["core", "multi_turn"], + ), ] diff --git a/pyrit/scenario/scenarios/airt/__init__.py b/pyrit/scenario/scenarios/airt/__init__.py index 92b4ea9e1..56cf1c3e3 100644 --- a/pyrit/scenario/scenarios/airt/__init__.py +++ b/pyrit/scenario/scenarios/airt/__init__.py @@ -6,7 +6,7 @@ from typing import Any from pyrit.scenario.scenarios.airt.content_harms import ContentHarms -from pyrit.scenario.scenarios.airt.cyber import Cyber, CyberStrategy +from pyrit.scenario.scenarios.airt.cyber import Cyber from pyrit.scenario.scenarios.airt.jailbreak import Jailbreak, JailbreakStrategy from pyrit.scenario.scenarios.airt.leakage import Leakage, LeakageStrategy from pyrit.scenario.scenarios.airt.psychosocial import Psychosocial, PsychosocialStrategy @@ -28,6 +28,8 @@ def __getattr__(name: str) -> Any: return RapidResponse.get_strategy_class() if name == "ContentHarmsStrategy": return ContentHarms.get_strategy_class() + if name == "CyberStrategy": + return Cyber.get_strategy_class() raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/pyrit/scenario/scenarios/airt/cyber.py b/pyrit/scenario/scenarios/airt/cyber.py index 2f44d1c5d..aad06d0ad 100644 --- a/pyrit/scenario/scenarios/airt/cyber.py +++ b/pyrit/scenario/scenarios/airt/cyber.py @@ -1,26 +1,26 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. +""" +Cyber scenario — technique-based malware generation testing. + +Strategies select **attack techniques** (PromptSending, RedTeaming). +Datasets control **what** is tested (malware generation objectives). +Use ``--dataset-names`` to narrow which objectives to test. +""" + +from __future__ import annotations + import logging import os -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, ClassVar from pyrit.auth import get_azure_openai_auth from pyrit.common import apply_defaults from pyrit.common.path import SCORER_SEED_PROMPT_PATH -from pyrit.executor.attack.core.attack_config import ( - AttackAdversarialConfig, - AttackScoringConfig, -) -from pyrit.executor.attack.multi_turn.red_teaming import RedTeamingAttack -from pyrit.executor.attack.single_turn.prompt_sending import PromptSendingAttack -from pyrit.models import SeedAttackGroup -from pyrit.prompt_target import OpenAIChatTarget, PromptChatTarget -from pyrit.scenario.core.atomic_attack import AtomicAttack -from pyrit.scenario.core.attack_technique import AttackTechnique +from pyrit.prompt_target import OpenAIChatTarget from pyrit.scenario.core.dataset_configuration import DatasetConfiguration from pyrit.scenario.core.scenario import Scenario -from pyrit.scenario.core.scenario_strategy import ScenarioStrategy from pyrit.score import ( SelfAskRefusalScorer, SelfAskTrueFalseScorer, @@ -31,24 +31,37 @@ ) if TYPE_CHECKING: - from pyrit.executor.attack.core.attack_strategy import AttackStrategy + from pyrit.scenario.core.scenario_strategy import ScenarioStrategy logger = logging.getLogger(__name__) +_CYBER_TECHNIQUE_NAMES = {"prompt_sending", "red_teaming"} + -class CyberStrategy(ScenarioStrategy): +def _build_cyber_strategy() -> type[ScenarioStrategy]: """ - Strategies for malware-focused cyber attacks. While not in the CyberStrategy class, a - few of these include: - * Shell smashing - * Zip bombs - * File deletion (rm -rf /). + Build the Cyber strategy class dynamically from SCENARIO_TECHNIQUES. + + Selects only ``prompt_sending`` and ``red_teaming`` techniques from + the shared catalog. + + Returns: + type[ScenarioStrategy]: The dynamically generated strategy enum class. """ + from pyrit.registry.object_registries.attack_technique_registry import AttackTechniqueRegistry + from pyrit.registry.tag_query import TagQuery + from pyrit.scenario.core.scenario_techniques import SCENARIO_TECHNIQUES + + cyber_specs = [s for s in SCENARIO_TECHNIQUES if s.name in _CYBER_TECHNIQUE_NAMES] - # Aggregate members (special markers that expand to strategies with matching tags) - ALL = ("all", {"all"}) - SINGLE_TURN = ("single_turn", {"single_turn"}) - MULTI_TURN = ("multi_turn", {"multi_turn"}) + return AttackTechniqueRegistry.build_strategy_class_from_specs( + class_name="CyberStrategy", + specs=cyber_specs, + aggregate_tags={ + "single_turn": TagQuery.any_of("single_turn"), + "multi_turn": TagQuery.any_of("multi_turn"), + }, + ) class Cyber(Scenario): @@ -60,27 +73,31 @@ class Cyber(Scenario): techniques. """ - VERSION: int = 1 + VERSION: int = 2 + _cached_strategy_class: ClassVar[type[ScenarioStrategy] | None] = None @classmethod def get_strategy_class(cls) -> type[ScenarioStrategy]: """ - Get the strategy enum class for this scenario. + Return the dynamically generated strategy class, building it on first access. Returns: - Type[ScenarioStrategy]: The CyberStrategy enum class. + type[ScenarioStrategy]: The CyberStrategy enum class. """ - return CyberStrategy + if cls._cached_strategy_class is None: + cls._cached_strategy_class = _build_cyber_strategy() + return cls._cached_strategy_class @classmethod def get_default_strategy(cls) -> ScenarioStrategy: """ - Get the default strategy used when no strategies are specified. + Return the default strategy member (``ALL``). Returns: - ScenarioStrategy: CyberStrategy.ALL (all cyber strategies). + ScenarioStrategy: The ALL strategy value. """ - return CyberStrategy.ALL + strategy_class = cls.get_strategy_class() + return strategy_class("all") @classmethod def default_dataset_config(cls) -> DatasetConfiguration: @@ -96,54 +113,36 @@ def default_dataset_config(cls) -> DatasetConfiguration: def __init__( self, *, - adversarial_chat: Optional[PromptChatTarget] = None, - objective_scorer: Optional[TrueFalseScorer] = None, + objective_scorer: TrueFalseScorer | None = None, include_baseline: bool = True, - scenario_result_id: Optional[str] = None, + scenario_result_id: str | None = None, ) -> None: """ Initialize the cyber harms scenario. Args: - adversarial_chat (Optional[PromptChatTarget]): Adversarial chat for the red teaming attack, corresponding - to CyberStrategy.MultiTurn. If not provided, defaults to an OpenAI chat target. - objective_scorer (Optional[TrueFalseScorer]): Objective scorer for malware detection. If not - provided, defaults to a SelfAskScorer using the malware.yaml file under the scorer config store for - malware detection + objective_scorer (TrueFalseScorer | None): Objective scorer for malware detection. If not + provided, defaults to a composite scorer using malware detection + refusal backstop. include_baseline (bool): Whether to include a baseline atomic attack that sends all objectives - without modifications. Defaults to True. When True, a "baseline" attack is automatically - added as the first atomic attack, allowing comparison between unmodified prompts and - attack-modified prompts. - scenario_result_id (Optional[str]): Optional ID of an existing scenario result to resume. + without modifications. Defaults to True. + scenario_result_id (str | None): Optional ID of an existing scenario result to resume. """ - # Cyber uses a "take object, make config" pattern to expose a more ergonomic interface. Helper - # methods return objects, not configs. - - # In this context the "objective" scorer has nothing to do with the "objective" target. - # The scoring config is what's used later in scoring orchestration, not the scorer object. - if not objective_scorer: - objective_scorer = self._get_default_objective_scorer() - self._scorer_config = AttackScoringConfig(objective_scorer=objective_scorer) - - # Adversarial chat is used by the RedTeamingAttack (multiturn) variation of the scenario. Otherwise, it's not - # invoked. - self._adversarial_chat = adversarial_chat if adversarial_chat else self._get_default_adversarial_target() - self._adversarial_config = AttackAdversarialConfig(target=self._adversarial_chat) + self._objective_scorer: TrueFalseScorer = ( + objective_scorer if objective_scorer else self._get_default_objective_scorer() + ) super().__init__( version=self.VERSION, - strategy_class=CyberStrategy, - objective_scorer=objective_scorer, + objective_scorer=self._objective_scorer, + strategy_class=self.get_strategy_class(), include_default_baseline=include_baseline, scenario_result_id=scenario_result_id, ) - # Will be resolved in _get_atomic_attacks_async - self._seed_groups: Optional[list[SeedAttackGroup]] = None - def _get_default_objective_scorer(self) -> TrueFalseCompositeScorer: """ Retrieve the default objective scorer. + The scoring mechanism is as follows: 1. Detect for the presence of malware using a SelfAskTrueFalseScorer. 2. Detect for NON-refusal as a backstop. (If the target model refuses to produce malware, we want that @@ -175,93 +174,3 @@ def _get_default_objective_scorer(self) -> TrueFalseCompositeScorer: return TrueFalseCompositeScorer( aggregator=TrueFalseScoreAggregator.AND, scorers=[presence_of_malware, backstop] ) - - def _get_default_adversarial_target(self) -> OpenAIChatTarget: - """ - Create and retrieve the default adversarial target. - - Returns: - OpenAIChatTarget: Default adversarial target, using an unfiltered endpoint. - """ - endpoint = os.getenv("AZURE_OPENAI_GPT4O_UNSAFE_CHAT_ENDPOINT") - return OpenAIChatTarget( - endpoint=endpoint, - api_key=get_azure_openai_auth(endpoint or ""), - model_name=os.environ.get("AZURE_OPENAI_GPT4O_UNSAFE_CHAT_MODEL"), - temperature=1.2, - ) - - def _resolve_seed_groups(self) -> list[SeedAttackGroup]: - """ - Resolve seed groups from dataset configuration. - - Returns: - List[SeedAttackGroup]: List of seed attack groups with objectives to be tested. - """ - # Use dataset_config (guaranteed to be set by initialize_async) - seed_groups = self._dataset_config.get_all_seed_attack_groups() - - if not seed_groups: - self._raise_dataset_exception() - - return list(seed_groups) - - def _get_atomic_attack_from_strategy(self, strategy: str) -> AtomicAttack: - """ - Translate the strategy into an actual AtomicAttack. - - Args: - strategy: The CyberStrategy enum (SingleTurn or MultiTurn). - - Returns: - AtomicAttack: configured for the specified strategy. - - Raises: - ValueError: If scenario is not properly initialized or an unknown CyberStrategy is passed. - """ - # objective_target is guaranteed to be non-None by parent class validation - if self._objective_target is None: - raise ValueError( - "Scenario not properly initialized. Call await scenario.initialize_async() before running." - ) - attack_strategy: Optional[AttackStrategy[Any, Any]] = None - if strategy == "single_turn": - attack_strategy = PromptSendingAttack( - objective_target=self._objective_target, - attack_scoring_config=self._scorer_config, - ) - elif strategy == "multi_turn": - attack_strategy = RedTeamingAttack( - objective_target=self._objective_target, - attack_scoring_config=self._scorer_config, - attack_adversarial_config=self._adversarial_config, - ) - else: - raise ValueError(f"Unknown CyberStrategy: {strategy}") - - # _seed_groups is guaranteed to be set by _get_atomic_attacks_async before this method is called - if self._seed_groups is None: - raise ValueError("_seed_groups must be resolved before creating atomic attacks") - - return AtomicAttack( - atomic_attack_name=f"cyber_{strategy}", - attack_technique=AttackTechnique(attack=attack_strategy), - seed_groups=self._seed_groups, - adversarial_chat=self._adversarial_chat, - objective_scorer=self._scorer_config.objective_scorer, - memory_labels=self._memory_labels, - ) - - async def _get_atomic_attacks_async(self) -> list[AtomicAttack]: - """ - Generate atomic attacks for each strategy. - - Returns: - List[AtomicAttack]: List of atomic attacks to execute. - """ - # Resolve seed groups from deprecated objectives or dataset config - self._seed_groups = self._resolve_seed_groups() - - strategies = {s.value for s in self._scenario_strategies} - - return [self._get_atomic_attack_from_strategy(strategy) for strategy in strategies] diff --git a/tests/unit/scenario/test_cyber.py b/tests/unit/scenario/test_cyber.py index 3e7f82845..a61d303cd 100644 --- a/tests/unit/scenario/test_cyber.py +++ b/tests/unit/scenario/test_cyber.py @@ -1,77 +1,80 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. -"""Tests for the Cyber class.""" +"""Tests for the Cyber scenario (refactored to technique registry pattern).""" -import pathlib from unittest.mock import MagicMock, patch import pytest -from pyrit.common.path import DATASETS_PATH from pyrit.executor.attack import PromptSendingAttack, RedTeamingAttack -from pyrit.executor.attack.core.attack_config import AttackScoringConfig from pyrit.identifiers import ComponentIdentifier -from pyrit.models import SeedAttackGroup, SeedDataset, SeedObjective -from pyrit.prompt_target import OpenAIChatTarget, PromptChatTarget, PromptTarget -from pyrit.scenario import DatasetConfiguration -from pyrit.scenario.airt import Cyber, CyberStrategy -from pyrit.score import TrueFalseCompositeScorer - - -def _mock_scorer_id(name: str = "MockObjectiveScorer") -> ComponentIdentifier: - """Helper to create ComponentIdentifier for tests.""" - return ComponentIdentifier( - class_name=name, - class_module="test", - ) +from pyrit.models import SeedAttackGroup, SeedObjective, SeedPrompt +from pyrit.prompt_target import PromptTarget +from pyrit.prompt_target.common.prompt_chat_target import PromptChatTarget +from pyrit.registry.object_registries.attack_technique_registry import AttackTechniqueRegistry +from pyrit.scenario.core.dataset_configuration import DatasetConfiguration +from pyrit.scenario.core.scenario_techniques import register_scenario_techniques +from pyrit.scenario.scenarios.airt.cyber import Cyber +from pyrit.score import TrueFalseScorer +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- -def _mock_target_id(name: str = "MockTarget") -> ComponentIdentifier: - """Helper to create ComponentIdentifier for tests.""" - return ComponentIdentifier( - class_name=name, - class_module="test", - ) +def _mock_id(name: str) -> ComponentIdentifier: + return ComponentIdentifier(class_name=name, class_module="test") -@pytest.fixture -def mock_memory_seed_groups(): - """Create mock seed groups that _get_default_seed_groups() would return.""" - malware_path = pathlib.Path(DATASETS_PATH) / "seed_datasets" / "local" / "airt" - seed_prompts = list(SeedDataset.from_yaml_file(malware_path / "malware.prompt").get_values()) - return [SeedAttackGroup(seeds=[SeedObjective(value=prompt)]) for prompt in seed_prompts] +def _strategy_class(): + """Get the dynamically-generated CyberStrategy class.""" + return Cyber.get_strategy_class() -@pytest.fixture -def mock_dataset_config(mock_memory_seed_groups): - """Create a mock dataset config that returns the seed groups.""" - mock_config = MagicMock(spec=DatasetConfiguration) - mock_config.get_all_seed_attack_groups.return_value = mock_memory_seed_groups - mock_config.get_default_dataset_names.return_value = ["airt_malware"] - mock_config.has_data_source.return_value = True - return mock_config + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- @pytest.fixture -def fast_cyberstrategy(): - return CyberStrategy.SINGLE_TURN +def mock_objective_target(): + mock = MagicMock(spec=PromptTarget) + mock.get_identifier.return_value = _mock_id("MockObjectiveTarget") + return mock @pytest.fixture -def slow_cyberstrategy(): - return CyberStrategy.MULTI_TURN +def mock_adversarial_target(): + mock = MagicMock(spec=PromptChatTarget) + mock.get_identifier.return_value = _mock_id("MockAdversarialTarget") + return mock @pytest.fixture -def malware_prompts(): - """The default malware prompts.""" - malware_path = pathlib.Path(DATASETS_PATH) / "seed_datasets" / "local" / "airt" - return list(SeedDataset.from_yaml_file(malware_path / "malware.prompt").get_values()) +def mock_objective_scorer(): + mock = MagicMock(spec=TrueFalseScorer) + mock.get_identifier.return_value = _mock_id("MockObjectiveScorer") + return mock + + +@pytest.fixture(autouse=True) +def reset_technique_registry(): + """Reset the AttackTechniqueRegistry, TargetRegistry, and cached strategy class between tests.""" + from pyrit.registry import TargetRegistry + + AttackTechniqueRegistry.reset_instance() + TargetRegistry.reset_instance() + Cyber._cached_strategy_class = None + yield + AttackTechniqueRegistry.reset_instance() + TargetRegistry.reset_instance() + Cyber._cached_strategy_class = None @pytest.fixture def mock_runtime_env(): + """Set minimal env vars needed for OpenAIChatTarget fallback via @apply_defaults.""" with patch.dict( "os.environ", { @@ -86,260 +89,262 @@ def mock_runtime_env(): yield -@pytest.fixture -def mock_objective_target(): - """Create a mock objective target for testing.""" - mock = MagicMock(spec=PromptTarget) - mock.get_identifier.return_value = _mock_target_id("MockObjectiveTarget") - return mock +def _make_seed_groups(name: str) -> list[SeedAttackGroup]: + """Create two seed attack groups for a given category.""" + return [ + SeedAttackGroup(seeds=[SeedObjective(value=f"{name} objective 1"), SeedPrompt(value=f"{name} prompt 1")]), + SeedAttackGroup(seeds=[SeedObjective(value=f"{name} objective 2"), SeedPrompt(value=f"{name} prompt 2")]), + ] -@pytest.fixture -def mock_objective_scorer(): - """Create a mock objective scorer for testing.""" - mock = MagicMock(spec=TrueFalseCompositeScorer) - mock.get_identifier.return_value = _mock_scorer_id("MockObjectiveScorer") - return mock +FIXTURES = ["patch_central_database", "mock_runtime_env"] -@pytest.fixture -def mock_adversarial_target(): - """Create a mock adversarial target for testing.""" - mock = MagicMock(spec=PromptChatTarget) - mock.get_identifier.return_value = _mock_target_id("MockAdversarialTarget") - return mock +# =========================================================================== +# Initialization / class-level tests +# =========================================================================== -FIXTURES = ["patch_central_database", "mock_runtime_env"] +@pytest.mark.usefixtures(*FIXTURES) +class TestCyberBasic: + """Tests for Cyber initialization and class properties.""" + def test_version_is_2(self): + assert Cyber.VERSION == 2 -@pytest.mark.usefixtures(*FIXTURES) -class TestCyberInitialization: - """Tests for Cyber initialization.""" + def test_get_strategy_class(self): + strat = _strategy_class() + assert Cyber.get_strategy_class() is strat - def test_init_with_default_objectives(self, mock_objective_scorer, malware_prompts, mock_memory_seed_groups): - """Test initialization with default objectives.""" + def test_get_default_strategy_returns_all(self): + strat = _strategy_class() + assert Cyber.get_default_strategy() == strat.ALL - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber(objective_scorer=mock_objective_scorer) + def test_default_dataset_config_has_malware_dataset(self): + config = Cyber.default_dataset_config() + assert isinstance(config, DatasetConfiguration) + names = config.get_default_dataset_names() + assert "airt_malware" in names + assert len(names) == 1 - assert scenario.name == "Cyber" - assert scenario.VERSION == 1 - - def test_init_with_default_scorer(self, mock_memory_seed_groups): - """Test initialization with default scorer.""" - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber() - assert scenario._objective_scorer_identifier - - def test_init_with_custom_scorer(self, mock_objective_scorer, mock_memory_seed_groups): - """Test initialization with custom scorer.""" - scorer = MagicMock(TrueFalseCompositeScorer) - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber(objective_scorer=scorer) - assert isinstance(scenario._scorer_config, AttackScoringConfig) - - def test_init_default_adversarial_chat(self, mock_objective_scorer, mock_memory_seed_groups): - """Test initialization with default adversarial chat.""" - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber( - objective_scorer=mock_objective_scorer, - ) - - assert isinstance(scenario._adversarial_chat, OpenAIChatTarget) - assert scenario._adversarial_chat._temperature == 1.2 - - def test_init_with_adversarial_chat(self, mock_objective_scorer, mock_memory_seed_groups): - """Test initialization with adversarial chat (for red teaming attack variation).""" - adversarial_chat = MagicMock(OpenAIChatTarget) - adversarial_chat.get_identifier.return_value = _mock_target_id("CustomAdversary") - - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber( - adversarial_chat=adversarial_chat, - objective_scorer=mock_objective_scorer, - ) - assert scenario._adversarial_chat == adversarial_chat - assert scenario._adversarial_config.target == adversarial_chat + def test_default_dataset_config_max_dataset_size(self): + config = Cyber.default_dataset_config() + assert config.max_dataset_size == 4 - @pytest.mark.asyncio - async def test_init_raises_exception_when_no_datasets_available(self, mock_objective_target, mock_objective_scorer): - """Test that initialization raises ValueError when datasets are not available in memory.""" - # Don't mock _resolve_seed_groups, let it try to load from empty memory + def test_initialization_with_custom_scorer(self, mock_objective_scorer): scenario = Cyber(objective_scorer=mock_objective_scorer) + assert scenario._objective_scorer == mock_objective_scorer - # Error should occur during initialize_async when _get_atomic_attacks_async resolves seed groups - with pytest.raises(ValueError, match="DatasetConfiguration has no seed_groups"): - await scenario.initialize_async(objective_target=mock_objective_target) - + def test_initialization_with_default_scorer(self): + scenario = Cyber() + assert scenario._objective_scorer_identifier is not None -@pytest.mark.usefixtures(*FIXTURES) -class TestCyberAttackGeneration: - """Tests for Cyber attack generation.""" + def test_scenario_name_is_cyber(self, mock_objective_scorer): + scenario = Cyber(objective_scorer=mock_objective_scorer) + assert scenario.name == "Cyber" @pytest.mark.asyncio - async def test_attack_generation_for_all( - self, mock_objective_target, mock_objective_scorer, mock_memory_seed_groups, mock_dataset_config + @patch.object( + DatasetConfiguration, "get_seed_attack_groups", return_value={"malware": _make_seed_groups("malware")} + ) + async def test_initialization_defaults_to_all_strategy( + self, + _mock_groups, + mock_objective_target, + mock_objective_scorer, ): - """Test that _get_atomic_attacks_async returns atomic attacks.""" - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber(objective_scorer=mock_objective_scorer) - - await scenario.initialize_async(objective_target=mock_objective_target, dataset_config=mock_dataset_config) - atomic_attacks = await scenario._get_atomic_attacks_async() + scenario = Cyber(objective_scorer=mock_objective_scorer) + await scenario.initialize_async(objective_target=mock_objective_target) + # ALL expands to prompt_sending + red_teaming → 2 strategies + assert len(scenario._scenario_strategies) == 2 - assert len(atomic_attacks) > 0 - assert all(run.attack_technique is not None for run in atomic_attacks) + @pytest.mark.asyncio + async def test_initialize_raises_when_no_datasets(self, mock_objective_target, mock_objective_scorer): + """Dataset resolution fails from empty memory.""" + scenario = Cyber(objective_scorer=mock_objective_scorer) + with pytest.raises(ValueError, match="DatasetConfiguration has no seed_groups"): + await scenario.initialize_async(objective_target=mock_objective_target) @pytest.mark.asyncio - async def test_attack_generation_for_singleturn( + @patch.object( + DatasetConfiguration, "get_seed_attack_groups", return_value={"malware": _make_seed_groups("malware")} + ) + async def test_memory_labels_stored( self, + _mock_groups, mock_objective_target, mock_objective_scorer, - mock_memory_seed_groups, - mock_dataset_config, - fast_cyberstrategy, ): - """Test that the single turn attack generation works.""" - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber( - objective_scorer=mock_objective_scorer, - ) - - await scenario.initialize_async( - objective_target=mock_objective_target, - scenario_strategies=[fast_cyberstrategy], - dataset_config=mock_dataset_config, - ) - atomic_attacks = await scenario._get_atomic_attacks_async() - for run in atomic_attacks: - assert isinstance(run.attack_technique.attack, PromptSendingAttack) + labels = {"test_run": "123"} + scenario = Cyber(objective_scorer=mock_objective_scorer) + await scenario.initialize_async(objective_target=mock_objective_target, memory_labels=labels) + assert scenario._memory_labels == labels @pytest.mark.asyncio - async def test_attack_generation_for_multiturn( + @patch.object( + DatasetConfiguration, "get_seed_attack_groups", return_value={"malware": _make_seed_groups("malware")} + ) + async def test_initialize_async_with_max_concurrency( self, + _mock_groups, mock_objective_target, mock_objective_scorer, - mock_memory_seed_groups, - mock_dataset_config, - slow_cyberstrategy, ): - """Test that the multi turn attack generation works.""" - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber( - objective_scorer=mock_objective_scorer, - ) - - await scenario.initialize_async( - objective_target=mock_objective_target, - scenario_strategies=[slow_cyberstrategy], - dataset_config=mock_dataset_config, - ) - atomic_attacks = await scenario._get_atomic_attacks_async() - - for run in atomic_attacks: - assert isinstance(run.attack_technique.attack, RedTeamingAttack) + scenario = Cyber(objective_scorer=mock_objective_scorer) + await scenario.initialize_async(objective_target=mock_objective_target, max_concurrency=20) + assert scenario._max_concurrency == 20 - @pytest.mark.asyncio - async def test_attack_runs_include_objectives( - self, mock_objective_target, mock_objective_scorer, mock_memory_seed_groups, mock_dataset_config - ): - """Test that attack runs include objectives for each seed prompt.""" - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber( - objective_scorer=mock_objective_scorer, - ) - await scenario.initialize_async(objective_target=mock_objective_target, dataset_config=mock_dataset_config) - atomic_attacks = await scenario._get_atomic_attacks_async() +# =========================================================================== +# Attack generation tests +# =========================================================================== + + +@pytest.mark.usefixtures(*FIXTURES) +class TestCyberAttackGeneration: + """Tests for _get_atomic_attacks_async with various strategies.""" - # Check that objectives are created for each seed prompt - for run in atomic_attacks: - assert len(run.objectives) > 0 + async def _init_and_get_attacks( + self, + *, + mock_objective_target, + mock_objective_scorer, + strategies=None, + seed_groups: dict[str, list[SeedAttackGroup]] | None = None, + ): + """Helper: initialize scenario and return atomic attacks.""" + groups = seed_groups or {"malware": _make_seed_groups("malware")} + with patch.object(DatasetConfiguration, "get_seed_attack_groups", return_value=groups): + scenario = Cyber(objective_scorer=mock_objective_scorer) + init_kwargs = {"objective_target": mock_objective_target} + if strategies: + init_kwargs["scenario_strategies"] = strategies + await scenario.initialize_async(**init_kwargs) + return await scenario._get_atomic_attacks_async() @pytest.mark.asyncio - async def test_get_atomic_attacks_async_returns_attacks( - self, mock_objective_target, mock_objective_scorer, mock_memory_seed_groups, mock_dataset_config + async def test_all_strategy_produces_prompt_sending_and_red_teaming( + self, mock_objective_target, mock_objective_scorer ): - """Test that _get_atomic_attacks_async returns atomic attacks.""" - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber( - objective_scorer=mock_objective_scorer, - ) + attacks = await self._init_and_get_attacks( + mock_objective_target=mock_objective_target, + mock_objective_scorer=mock_objective_scorer, + strategies=[_strategy_class().ALL], + ) + technique_classes = {type(a.attack_technique.attack) for a in attacks} + assert technique_classes == {PromptSendingAttack, RedTeamingAttack} - await scenario.initialize_async(objective_target=mock_objective_target, dataset_config=mock_dataset_config) - atomic_attacks = await scenario._get_atomic_attacks_async() - assert len(atomic_attacks) > 0 - assert all(run.attack_technique is not None for run in atomic_attacks) + @pytest.mark.asyncio + async def test_single_turn_strategy_produces_prompt_sending(self, mock_objective_target, mock_objective_scorer): + attacks = await self._init_and_get_attacks( + mock_objective_target=mock_objective_target, + mock_objective_scorer=mock_objective_scorer, + strategies=[_strategy_class().SINGLE_TURN], + ) + technique_classes = {type(a.attack_technique.attack) for a in attacks} + assert technique_classes == {PromptSendingAttack} + @pytest.mark.asyncio + async def test_multi_turn_strategy_produces_red_teaming(self, mock_objective_target, mock_objective_scorer): + attacks = await self._init_and_get_attacks( + mock_objective_target=mock_objective_target, + mock_objective_scorer=mock_objective_scorer, + strategies=[_strategy_class().MULTI_TURN], + ) + technique_classes = {type(a.attack_technique.attack) for a in attacks} + assert technique_classes == {RedTeamingAttack} -@pytest.mark.usefixtures(*FIXTURES) -class TestCyberLifecycle: - """ - Tests for Cyber lifecycle, including initialize_async and execution. - """ + @pytest.mark.asyncio + async def test_default_strategy_produces_both_techniques(self, mock_objective_target, mock_objective_scorer): + """Default (ALL) should produce both PromptSending and RedTeaming.""" + attacks = await self._init_and_get_attacks( + mock_objective_target=mock_objective_target, + mock_objective_scorer=mock_objective_scorer, + ) + technique_classes = {type(a.attack_technique.attack) for a in attacks} + assert technique_classes == {PromptSendingAttack, RedTeamingAttack} @pytest.mark.asyncio - async def test_initialize_async_with_max_concurrency( - self, mock_objective_target, mock_objective_scorer, mock_memory_seed_groups, mock_dataset_config - ): - """Test initialization with custom max_concurrency.""" - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber(objective_scorer=mock_objective_scorer) - await scenario.initialize_async( - objective_target=mock_objective_target, max_concurrency=20, dataset_config=mock_dataset_config - ) - assert scenario._max_concurrency == 20 + async def test_single_technique_selection(self, mock_objective_target, mock_objective_scorer): + attacks = await self._init_and_get_attacks( + mock_objective_target=mock_objective_target, + mock_objective_scorer=mock_objective_scorer, + strategies=[_strategy_class()("prompt_sending")], + ) + assert len(attacks) > 0 + for a in attacks: + assert isinstance(a.attack_technique.attack, PromptSendingAttack) @pytest.mark.asyncio - async def test_initialize_async_with_memory_labels( - self, mock_objective_target, mock_objective_scorer, mock_memory_seed_groups, mock_dataset_config - ): - """Test initialization with memory labels.""" - memory_labels = {"test": "cyber", "category": "scenario"} + async def test_atomic_attack_names_are_unique(self, mock_objective_target, mock_objective_scorer): + attacks = await self._init_and_get_attacks( + mock_objective_target=mock_objective_target, + mock_objective_scorer=mock_objective_scorer, + ) + names = [a.atomic_attack_name for a in attacks] + assert len(names) == len(set(names)) + for name in names: + assert "_" in name - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber( - objective_scorer=mock_objective_scorer, - ) - await scenario.initialize_async( - memory_labels=memory_labels, - objective_target=mock_objective_target, - dataset_config=mock_dataset_config, - ) + @pytest.mark.asyncio + async def test_attacks_include_seed_groups(self, mock_objective_target, mock_objective_scorer): + attacks = await self._init_and_get_attacks( + mock_objective_target=mock_objective_target, + mock_objective_scorer=mock_objective_scorer, + strategies=[_strategy_class()("prompt_sending")], + ) + for a in attacks: + assert len(a.objectives) > 0 - assert scenario._memory_labels == memory_labels + @pytest.mark.asyncio + async def test_raises_when_not_initialized(self, mock_objective_scorer): + scenario = Cyber(objective_scorer=mock_objective_scorer) + with pytest.raises(ValueError, match="Scenario not properly initialized"): + await scenario._get_atomic_attacks_async() + + +# =========================================================================== +# Dynamic export tests +# =========================================================================== @pytest.mark.usefixtures(*FIXTURES) -class TestCyberProperties: - """ - Tests for Cyber properties and attributes. - """ +class TestCyberDynamicExport: + """Tests for CyberStrategy lazy resolution from __init__.py.""" - def test_scenario_version_is_set(self, mock_objective_scorer, mock_memory_seed_groups): - """Test that scenario version is properly set.""" - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber( - objective_scorer=mock_objective_scorer, - ) + def test_cyber_strategy_resolves_from_module(self): + from pyrit.scenario.scenarios.airt import CyberStrategy - assert scenario.VERSION == 1 + assert CyberStrategy is _strategy_class() - @pytest.mark.asyncio - async def test_no_target_duplication(self, mock_objective_target, mock_memory_seed_groups, mock_dataset_config): - """Test that all three targets (adversarial, object, scorer) are distinct.""" - with patch.object(Cyber, "_resolve_seed_groups", return_value=mock_memory_seed_groups): - scenario = Cyber() - await scenario.initialize_async(objective_target=mock_objective_target, dataset_config=mock_dataset_config) - - objective_target = scenario._objective_target - - # this works because TrueFalseCompositeScorer subclasses TrueFalseScorer, - # but TrueFalseScorer itself (the type for ScorerConfig) does not have ._scorers. - scorer_target = scenario._scorer_config.objective_scorer._scorers[0] # type: ignore[arg-type] - adversarial_target = scenario._adversarial_chat - - assert objective_target != scorer_target - assert objective_target != adversarial_target - assert scorer_target != adversarial_target + +# =========================================================================== +# Registry integration tests +# =========================================================================== + + +@pytest.mark.usefixtures(*FIXTURES) +class TestCyberRegistryIntegration: + """Tests for attack technique registry wiring via Cyber scenario.""" + + def test_cyber_factories_include_prompt_sending_and_red_teaming(self, mock_objective_scorer): + scenario = Cyber(objective_scorer=mock_objective_scorer) + factories = scenario._get_attack_technique_factories() + # Cyber uses all registered techniques from the registry; prompt_sending + red_teaming are present + assert "prompt_sending" in factories + assert "red_teaming" in factories + assert factories["prompt_sending"].attack_class is PromptSendingAttack + assert factories["red_teaming"].attack_class is RedTeamingAttack + + def test_red_teaming_factory_has_adversarial_config(self, mock_objective_scorer): + """red_teaming factory should have adversarial config baked in.""" + scenario = Cyber(objective_scorer=mock_objective_scorer) + factories = scenario._get_attack_technique_factories() + assert "attack_adversarial_config" in factories["red_teaming"]._attack_kwargs + + def test_register_idempotent(self): + """Calling register_scenario_techniques twice doesn't duplicate entries.""" + register_scenario_techniques() + register_scenario_techniques() + registry = AttackTechniqueRegistry.get_registry_singleton() + assert len([n for n in registry.get_names() if n == "red_teaming"]) == 1 diff --git a/tests/unit/scenario/test_rapid_response.py b/tests/unit/scenario/test_rapid_response.py index 9d8b088fb..e215bd250 100644 --- a/tests/unit/scenario/test_rapid_response.py +++ b/tests/unit/scenario/test_rapid_response.py @@ -86,11 +86,11 @@ def reset_technique_registry(): AttackTechniqueRegistry.reset_instance() TargetRegistry.reset_instance() - RapidResponse._strategy_class = None + RapidResponse._cached_strategy_class = None yield AttackTechniqueRegistry.reset_instance() TargetRegistry.reset_instance() - RapidResponse._strategy_class = None + RapidResponse._cached_strategy_class = None @pytest.fixture(autouse=True) @@ -133,84 +133,6 @@ def _make_seed_groups(name: str) -> list[SeedAttackGroup]: FIXTURES = ["patch_central_database", "mock_runtime_env"] -# =========================================================================== -# Strategy enum tests -# =========================================================================== - - -class TestRapidResponseStrategy: - """Tests for the dynamically-generated RapidResponseStrategy enum.""" - - def test_technique_members_exist(self): - """All four technique members are accessible by value.""" - strat = _strategy_class() - assert strat("prompt_sending").value == "prompt_sending" - assert strat("role_play").value == "role_play" - assert strat("many_shot").value == "many_shot" - assert strat("tap").value == "tap" - - def test_aggregate_members_exist(self): - """All four aggregate members are accessible.""" - strat = _strategy_class() - assert strat.ALL.value == "all" - assert strat.DEFAULT.value == "default" - assert strat.SINGLE_TURN.value == "single_turn" - assert strat.MULTI_TURN.value == "multi_turn" - - def test_total_member_count(self): - """4 aggregates + 4 techniques = 8 members.""" - assert len(list(_strategy_class())) == 8 - - def test_non_aggregate_count(self): - """get_all_strategies returns only the 4 technique members.""" - non_aggregate = _strategy_class().get_all_strategies() - assert len(non_aggregate) == 4 - - def test_aggregate_tags(self): - tags = _strategy_class().get_aggregate_tags() - assert tags == {"all", "default", "single_turn", "multi_turn"} - - def test_default_expands_to_prompt_sending_and_many_shot(self): - """DEFAULT aggregate should expand to prompt_sending + many_shot.""" - strat = _strategy_class() - expanded = strat.normalize_strategies({strat.DEFAULT}) - values = {s.value for s in expanded} - assert values == {"prompt_sending", "many_shot"} - - def test_single_turn_expands_to_prompt_sending_and_role_play(self): - strat = _strategy_class() - expanded = strat.normalize_strategies({strat.SINGLE_TURN}) - values = {s.value for s in expanded} - assert values == {"prompt_sending", "role_play"} - - def test_multi_turn_expands_to_many_shot_and_tap(self): - strat = _strategy_class() - expanded = strat.normalize_strategies({strat.MULTI_TURN}) - values = {s.value for s in expanded} - assert values == {"many_shot", "tap"} - - def test_all_expands_to_all_techniques(self): - strat = _strategy_class() - expanded = strat.normalize_strategies({strat.ALL}) - values = {s.value for s in expanded} - assert values == {"prompt_sending", "role_play", "many_shot", "tap"} - - def test_strategy_values_are_unique(self): - strat = _strategy_class() - values = [s.value for s in strat] - assert len(values) == len(set(values)) - - def test_invalid_strategy_value_raises(self): - strat = _strategy_class() - with pytest.raises(ValueError): - strat("nonexistent") - - def test_invalid_strategy_name_raises(self): - strat = _strategy_class() - with pytest.raises(KeyError): - strat["Nonexistent"] - - # =========================================================================== # Initialization / class-level tests # =========================================================================== @@ -356,29 +278,33 @@ async def test_single_turn_strategy_produces_prompt_sending_and_role_play( assert technique_classes == {PromptSendingAttack, RolePlayAttack} @pytest.mark.asyncio - async def test_multi_turn_strategy_produces_many_shot_and_tap(self, mock_objective_target, mock_objective_scorer): + async def test_multi_turn_strategy_produces_multi_turn_attacks(self, mock_objective_target, mock_objective_scorer): attacks = await self._init_and_get_attacks( mock_objective_target=mock_objective_target, mock_objective_scorer=mock_objective_scorer, strategies=[_strategy_class().MULTI_TURN], ) technique_classes = {type(a.attack_technique.attack) for a in attacks} - assert technique_classes == {ManyShotJailbreakAttack, TreeOfAttacksWithPruningAttack} + assert len(technique_classes) >= 2 + assert {ManyShotJailbreakAttack, TreeOfAttacksWithPruningAttack} <= technique_classes @pytest.mark.asyncio - async def test_all_strategy_produces_all_four_techniques(self, mock_objective_target, mock_objective_scorer): + async def test_all_strategy_produces_attacks_for_every_technique( + self, mock_objective_target, mock_objective_scorer + ): attacks = await self._init_and_get_attacks( mock_objective_target=mock_objective_target, mock_objective_scorer=mock_objective_scorer, strategies=[_strategy_class().ALL], ) technique_classes = {type(a.attack_technique.attack) for a in attacks} - assert technique_classes == { + # Should include all known core techniques + assert { PromptSendingAttack, RolePlayAttack, ManyShotJailbreakAttack, TreeOfAttacksWithPruningAttack, - } + } <= technique_classes @pytest.mark.asyncio async def test_single_technique_selection(self, mock_objective_target, mock_objective_scorer): @@ -525,10 +451,10 @@ def test_rapid_response_ignores_technique_name(self, mock_objective_scorer): class TestCoreTechniques: """Tests for shared AttackTechniqueFactory builders in scenario_techniques.py.""" - def test_instance_returns_all_four_factories(self, mock_objective_scorer): + def test_instance_returns_all_factories(self, mock_objective_scorer): scenario = RapidResponse(objective_scorer=mock_objective_scorer) factories = scenario._get_attack_technique_factories() - assert set(factories.keys()) == {"prompt_sending", "role_play", "many_shot", "tap"} + assert {"prompt_sending", "role_play", "many_shot", "tap"} <= set(factories.keys()) assert factories["prompt_sending"].attack_class is PromptSendingAttack assert factories["role_play"].attack_class is RolePlayAttack assert factories["many_shot"].attack_class is ManyShotJailbreakAttack @@ -599,14 +525,14 @@ def test_register_populates_registry(self, mock_adversarial_target): register_scenario_techniques() registry = AttackTechniqueRegistry.get_registry_singleton() names = set(registry.get_names()) - assert names == {"prompt_sending", "role_play", "many_shot", "tap"} + assert {"prompt_sending", "role_play", "many_shot", "tap"} <= names def test_register_idempotent(self, mock_adversarial_target): """Calling register_scenario_techniques() twice doesn't duplicate entries.""" register_scenario_techniques() register_scenario_techniques() registry = AttackTechniqueRegistry.get_registry_singleton() - assert len(registry) == 4 + assert len(registry) == len(SCENARIO_TECHNIQUES) def test_register_preserves_custom(self, mock_adversarial_target): """Pre-registered custom techniques aren't overwritten.""" @@ -619,8 +545,7 @@ def test_register_preserves_custom(self, mock_adversarial_target): # role_play should still be the custom factory factories = registry.get_factories() assert factories["role_play"] is custom_factory - # Other 3 should have been registered normally - assert len(factories) == 4 + assert len(factories) == len(SCENARIO_TECHNIQUES) def test_get_factories_returns_dict(self, mock_adversarial_target): """get_factories() returns a dict of name → factory.""" @@ -628,7 +553,7 @@ def test_get_factories_returns_dict(self, mock_adversarial_target): registry = AttackTechniqueRegistry.get_registry_singleton() factories = registry.get_factories() assert isinstance(factories, dict) - assert set(factories.keys()) == {"prompt_sending", "role_play", "many_shot", "tap"} + assert {"prompt_sending", "role_play", "many_shot", "tap"} <= set(factories.keys()) assert factories["prompt_sending"].attack_class is PromptSendingAttack def test_scenario_base_class_reads_from_registry(self, mock_objective_scorer): @@ -636,12 +561,12 @@ def test_scenario_base_class_reads_from_registry(self, mock_objective_scorer): scenario = RapidResponse(objective_scorer=mock_objective_scorer) factories = scenario._get_attack_technique_factories() - # Should have all 4 core techniques from the registry - assert set(factories.keys()) == {"prompt_sending", "role_play", "many_shot", "tap"} + # Should have all core techniques from the registry + assert {"prompt_sending", "role_play", "many_shot", "tap"} <= set(factories.keys()) # Registry should also have them registry = AttackTechniqueRegistry.get_registry_singleton() - assert set(registry.get_names()) == {"prompt_sending", "role_play", "many_shot", "tap"} + assert {"prompt_sending", "role_play", "many_shot", "tap"} <= set(registry.get_names()) def test_tags_assigned_correctly(self, mock_adversarial_target): """Core techniques have correct tags (single_turn / multi_turn).""" @@ -651,8 +576,8 @@ def test_tags_assigned_correctly(self, mock_adversarial_target): single_turn = {e.name for e in registry.get_by_tag(tag="single_turn")} multi_turn = {e.name for e in registry.get_by_tag(tag="multi_turn")} - assert single_turn == {"prompt_sending", "role_play"} - assert multi_turn == {"many_shot", "tap"} + assert {"prompt_sending", "role_play"} <= single_turn + assert {"many_shot", "tap"} <= multi_turn # =========================================================================== @@ -664,11 +589,13 @@ def test_tags_assigned_correctly(self, mock_adversarial_target): class TestRegistrationAndFactoryFromSpec: """Tests for register_scenario_techniques and AttackTechniqueRegistry.build_factory_from_spec.""" - def test_register_populates_all_four_techniques(self): - """register_scenario_techniques with default adversarial registers all 4 techniques.""" + def test_register_populates_all_techniques(self): + """register_scenario_techniques registers all catalog techniques.""" register_scenario_techniques() registry = AttackTechniqueRegistry.get_registry_singleton() - assert set(registry.get_names()) == {"prompt_sending", "role_play", "many_shot", "tap"} + registered = set(registry.get_names()) + expected = {s.name for s in SCENARIO_TECHNIQUES} + assert registered == expected def test_register_with_custom_adversarial_uses_default(self, mock_adversarial_target): """Registry always bakes default adversarial target, not caller-specific.""" @@ -688,7 +615,7 @@ def test_register_idempotent(self, mock_adversarial_target): register_scenario_techniques() register_scenario_techniques() registry = AttackTechniqueRegistry.get_registry_singleton() - assert len(registry) == 4 + assert len(registry) == len(SCENARIO_TECHNIQUES) def test_register_preserves_custom_preregistered(self, mock_adversarial_target): """Pre-registered custom techniques are not overwritten.""" @@ -699,7 +626,7 @@ def test_register_preserves_custom_preregistered(self, mock_adversarial_target): register_scenario_techniques() # role_play should still be the custom factory assert registry.get_factories()["role_play"] is custom_factory - assert len(registry) == 4 + assert len(registry) == len(SCENARIO_TECHNIQUES) def test_register_assigns_correct_tags(self, mock_adversarial_target): """Tags from AttackTechniqueSpec are applied correctly.""" @@ -708,8 +635,8 @@ def test_register_assigns_correct_tags(self, mock_adversarial_target): single_turn = {e.name for e in registry.get_by_tag(tag="single_turn")} multi_turn = {e.name for e in registry.get_by_tag(tag="multi_turn")} - assert single_turn == {"prompt_sending", "role_play"} - assert multi_turn == {"many_shot", "tap"} + assert {"prompt_sending", "role_play"} <= single_turn + assert {"many_shot", "tap"} <= multi_turn def test_register_from_specs_custom_list(self, mock_adversarial_target): """register_from_specs accepts a custom list of AttackTechniqueSpecs.""" @@ -879,10 +806,10 @@ def test_extra_kwargs_reserved_key_raises(self): with pytest.raises(ValueError, match="attack_adversarial_config"): AttackTechniqueRegistry.build_factory_from_spec(spec) - def test_scenario_techniques_list_has_four_entries(self): - assert len(SCENARIO_TECHNIQUES) == 4 - names = {s.name for s in SCENARIO_TECHNIQUES} - assert names == {"prompt_sending", "role_play", "many_shot", "tap"} + def test_scenario_techniques_list_nonempty_with_unique_names(self): + assert len(SCENARIO_TECHNIQUES) >= 1 + names = [s.name for s in SCENARIO_TECHNIQUES] + assert len(names) == len(set(names)), "Duplicate technique names in SCENARIO_TECHNIQUES" def test_frozen_spec(self): """AttackTechniqueSpec is frozen (immutable).""" diff --git a/tests/unit/scenario/test_scenario_strategy_invariants.py b/tests/unit/scenario/test_scenario_strategy_invariants.py new file mode 100644 index 000000000..3fa9bbaa8 --- /dev/null +++ b/tests/unit/scenario/test_scenario_strategy_invariants.py @@ -0,0 +1,183 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +""" +Shared structural invariants for dynamically-generated ScenarioStrategy enums. + +These tests verify that the strategy machinery works correctly for every +scenario that builds a strategy class via the technique registry. Adding a +new technique to the catalog should not require updating these tests. +""" + +from unittest.mock import patch + +import pytest + +from pyrit.registry.object_registries.attack_technique_registry import AttackTechniqueRegistry +from pyrit.scenario.core.scenario_strategy import ScenarioStrategy + +# --------------------------------------------------------------------------- +# Synthetic many-shot examples — prevents reading the real JSON during tests +# --------------------------------------------------------------------------- +_MOCK_MANY_SHOT_EXAMPLES = [{"question": f"q{i}", "answer": f"a{i}"} for i in range(100)] + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def _reset_registries(): + """Reset singletons and cached strategy classes between every test.""" + from pyrit.registry import TargetRegistry + from pyrit.scenario.scenarios.airt.cyber import Cyber + from pyrit.scenario.scenarios.airt.rapid_response import RapidResponse + + AttackTechniqueRegistry.reset_instance() + TargetRegistry.reset_instance() + Cyber._cached_strategy_class = None + RapidResponse._cached_strategy_class = None + yield + AttackTechniqueRegistry.reset_instance() + TargetRegistry.reset_instance() + Cyber._cached_strategy_class = None + RapidResponse._cached_strategy_class = None + + +@pytest.fixture(autouse=True) +def _patch_many_shot_load(): + """Prevent ManyShotJailbreakAttack from loading the full bundled dataset.""" + with patch( + "pyrit.executor.attack.single_turn.many_shot_jailbreak.load_many_shot_jailbreaking_dataset", + return_value=_MOCK_MANY_SHOT_EXAMPLES, + ): + yield + + +@pytest.fixture(autouse=True) +def _mock_runtime_env(): + """Provide minimal env vars so OpenAIChatTarget fallback doesn't fail.""" + with patch.dict( + "os.environ", + { + "AZURE_OPENAI_GPT4O_UNSAFE_CHAT_ENDPOINT": "https://test.openai.azure.com/", + "AZURE_OPENAI_GPT4O_UNSAFE_CHAT_KEY": "test-key", + "AZURE_OPENAI_GPT4O_UNSAFE_CHAT_MODEL": "gpt-4", + "OPENAI_CHAT_ENDPOINT": "https://test.openai.azure.com/", + "OPENAI_CHAT_KEY": "test-key", + "OPENAI_CHAT_MODEL": "gpt-4", + }, + ): + yield + + +# --------------------------------------------------------------------------- +# Parametrize: one entry per scenario that uses a dynamic strategy class +# --------------------------------------------------------------------------- + + +def _get_rapid_response_strategy(): + from pyrit.scenario.scenarios.airt.rapid_response import RapidResponse + + return RapidResponse.get_strategy_class() + + +def _get_cyber_strategy(): + from pyrit.scenario.scenarios.airt.cyber import Cyber + + return Cyber.get_strategy_class() + + +SCENARIO_STRATEGY_BUILDERS = [ + pytest.param(_get_rapid_response_strategy, id="RapidResponse"), + pytest.param(_get_cyber_strategy, id="Cyber"), +] + + +# --------------------------------------------------------------------------- +# Structural invariant tests +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize("get_strategy", SCENARIO_STRATEGY_BUILDERS) +def test_strategy_is_scenario_strategy_subclass(get_strategy): + """Generated class must be a ScenarioStrategy subclass.""" + assert issubclass(get_strategy(), ScenarioStrategy) + + +@pytest.mark.parametrize("get_strategy", SCENARIO_STRATEGY_BUILDERS) +def test_has_at_least_one_technique(get_strategy): + """Every scenario must have at least one non-aggregate technique.""" + strat = get_strategy() + assert len(strat.get_all_strategies()) >= 1 + + +@pytest.mark.parametrize("get_strategy", SCENARIO_STRATEGY_BUILDERS) +def test_has_all_aggregate(get_strategy): + """Every scenario must include the ALL aggregate.""" + strat = get_strategy() + assert "all" in strat.get_aggregate_tags() + assert strat.ALL.value == "all" + + +@pytest.mark.parametrize("get_strategy", SCENARIO_STRATEGY_BUILDERS) +def test_member_count_is_techniques_plus_aggregates(get_strategy): + """Total enum members = techniques + aggregates.""" + strat = get_strategy() + techniques = strat.get_all_strategies() + aggregates = strat.get_aggregate_strategies() + assert len(list(strat)) == len(techniques) + len(aggregates) + + +@pytest.mark.parametrize("get_strategy", SCENARIO_STRATEGY_BUILDERS) +def test_values_are_unique(get_strategy): + """No two members share a value.""" + strat = get_strategy() + values = [s.value for s in strat] + assert len(values) == len(set(values)) + + +@pytest.mark.parametrize("get_strategy", SCENARIO_STRATEGY_BUILDERS) +def test_invalid_value_raises(get_strategy): + """Constructing with a bogus value raises ValueError.""" + strat = get_strategy() + with pytest.raises(ValueError): + strat("nonexistent_strategy_xyzzy") + + +@pytest.mark.parametrize("get_strategy", SCENARIO_STRATEGY_BUILDERS) +def test_all_expands_to_every_technique(get_strategy): + """ALL must expand to exactly the full set of non-aggregate techniques.""" + strat = get_strategy() + expanded = strat.expand({strat.ALL}) + assert set(expanded) == set(strat.get_all_strategies()) + + +@pytest.mark.parametrize("get_strategy", SCENARIO_STRATEGY_BUILDERS) +def test_each_aggregate_expands_to_nonempty_subset(get_strategy): + """Every aggregate tag expands to a non-empty subset of techniques.""" + strat = get_strategy() + all_techniques = set(strat.get_all_strategies()) + for aggregate in strat.get_aggregate_strategies(): + expanded = set(strat.expand({aggregate})) + assert len(expanded) >= 1, f"Aggregate {aggregate.value!r} expanded to empty set" + assert expanded <= all_techniques, f"Aggregate {aggregate.value!r} expanded outside technique set" + + +@pytest.mark.parametrize("get_strategy", SCENARIO_STRATEGY_BUILDERS) +def test_aggregates_are_disjoint_from_techniques(get_strategy): + """Aggregate members and technique members don't overlap.""" + strat = get_strategy() + agg_values = {s.value for s in strat.get_aggregate_strategies()} + tech_values = {s.value for s in strat.get_all_strategies()} + assert agg_values.isdisjoint(tech_values) + + +@pytest.mark.parametrize("get_strategy", SCENARIO_STRATEGY_BUILDERS) +def test_expanding_a_technique_returns_itself(get_strategy): + """Expanding a single non-aggregate technique returns just that technique.""" + strat = get_strategy() + for technique in strat.get_all_strategies(): + expanded = strat.expand({technique}) + assert expanded == [technique]