From a14e96b338c8d36c3f23b35f0a619777a61c2fde Mon Sep 17 00:00:00 2001 From: Ledraw <2508346509@qq.com> Date: Tue, 12 May 2026 16:45:37 +0800 Subject: [PATCH] Fix web testing flag completion and safe log rendering --- .../capabilities/submit_flag.py | 11 +++++ .../usecases/web/with_explanation.py | 10 ++-- src/hackingBuddyGPT/utils/logging.py | 38 +++++++++------ tests/test_logging.py | 42 +++++++++++++++++ tests/test_web_testing.py | 46 +++++++++++++++++++ 5 files changed, 131 insertions(+), 16 deletions(-) create mode 100644 tests/test_logging.py create mode 100644 tests/test_web_testing.py diff --git a/src/hackingBuddyGPT/capabilities/submit_flag.py b/src/hackingBuddyGPT/capabilities/submit_flag.py index 35da7e6c..2dcfadd8 100644 --- a/src/hackingBuddyGPT/capabilities/submit_flag.py +++ b/src/hackingBuddyGPT/capabilities/submit_flag.py @@ -9,6 +9,7 @@ class SubmitFlag(Capability): flag_format: str valid_flags: Set[str] success_function: Callable[[], None] = None + accept_any_flag: bool = False submitted_valid_flags: Set[str] = field(default_factory=set, init=False) @@ -16,6 +17,16 @@ def describe(self) -> str: return f"Submits a flag, which is the goal of all activities. The flag format is {self.flag_format}. If you find a flag following the that format, that you have not yet submitted, then the most important first step is to submit it, ignoring all other possibilities of further action" def __call__(self, flag: str) -> str: + if self.accept_any_flag: + if flag in self.submitted_valid_flags: + return "Flag already submitted" + + self.submitted_valid_flags.add(flag) + if self.success_function is not None: + self.success_function() + + return f"Flag submitted ({len(self.submitted_valid_flags)})" + if flag not in self.valid_flags: return "Not a valid flag" diff --git a/src/hackingBuddyGPT/usecases/web/with_explanation.py b/src/hackingBuddyGPT/usecases/web/with_explanation.py index 5ed7f44a..c1bfcf91 100644 --- a/src/hackingBuddyGPT/usecases/web/with_explanation.py +++ b/src/hackingBuddyGPT/usecases/web/with_explanation.py @@ -1,7 +1,7 @@ from dataclasses import field -from typing import List, Any, Union, Dict, Iterable, Optional +from typing import Any, Dict, Iterable, List, Optional, Union -from openai.types.chat import ChatCompletionMessageParam, ChatCompletionMessage +from openai.types.chat import ChatCompletionMessage, ChatCompletionMessageParam from openai.types.chat.chat_completion_chunk import ChoiceDelta from hackingBuddyGPT.capabilities import Capability @@ -32,6 +32,10 @@ class WebTestingWithExplanation(Agent): desc="A comma (,) separated list of flags to find", default="hostname,dir,username,rootfile,secretfile,adminpass", ) + accept_any_flag: bool = parameter( + desc="Accept any submitted flag and stop the run. Disable this to require one of the configured flags.", + default=True, + ) _prompt_history: Prompt = field(default_factory=list) _context: Context = field(default_factory=lambda: {"notes": list()}) @@ -41,7 +45,7 @@ class WebTestingWithExplanation(Agent): def init(self): super().init() self._context["host"] = self.host - self.add_capability(SubmitFlag(self.flag_format_description, set(self.flag_template.format(flag=flag) for flag in self.flags.split(",")), success_function=self.all_flags_found)) + self.add_capability(SubmitFlag(self.flag_format_description, set(self.flag_template.format(flag=flag) for flag in self.flags.split(",")), success_function=self.all_flags_found, accept_any_flag=self.accept_any_flag)) self.add_capability(HTTPRequest(self.host)) def before_run(self): diff --git a/src/hackingBuddyGPT/utils/logging.py b/src/hackingBuddyGPT/utils/logging.py index 5acee710..98f07014 100644 --- a/src/hackingBuddyGPT/utils/logging.py +++ b/src/hackingBuddyGPT/utils/logging.py @@ -1,21 +1,33 @@ import datetime -from enum import Enum +import threading import time from dataclasses import dataclass, field +from enum import Enum from functools import wraps from typing import Optional, Union -import threading from dataclasses_json.api import dataclass_json +from rich.console import Group +from rich.panel import Panel +from rich.text import Text +from websockets.sync.client import ClientConnection +from websockets.sync.client import connect as ws_connect from hackingBuddyGPT.utils import Console, DbStorage, LLMResult, configurable, parameter -from hackingBuddyGPT.utils.db_storage.db_storage import StreamAction from hackingBuddyGPT.utils.configurable import Global, Transparent -from rich.console import Group -from rich.panel import Panel -from websockets.sync.client import ClientConnection, connect as ws_connect +from hackingBuddyGPT.utils.db_storage.db_storage import ( + Message, + MessageStreamPart, + Run, + Section, + StreamAction, + ToolCall, + ToolCallStreamPart, +) + -from hackingBuddyGPT.utils.db_storage.db_storage import Run, Section, Message, MessageStreamPart, ToolCall, ToolCallStreamPart +def plain_text(value) -> Text: + return Text("" if value is None else str(value)) def log_section(name: str, logger_field_name: str = "log"): @@ -120,7 +132,7 @@ def add_message(self, role: str, content: str, tokens_query: int, tokens_respons self._last_message_id += 1 self.log_db.add_message(self.run.id, message_id, self._current_conversation, role, content, tokens_query, tokens_response, duration) - self.console.print(Panel(content, title=(("" if self._current_conversation is None else f"{self._current_conversation} - ") + role))) + self.console.print(Panel(plain_text(content), title=(("" if self._current_conversation is None else f"{self._current_conversation} - ") + role))) return message_id @@ -130,8 +142,8 @@ def _add_or_update_message(self, message_id: int, conversation: Optional[str], r def add_tool_call(self, message_id: int, tool_call_id: str, function_name: str, arguments: str, result_text: str, duration: datetime.timedelta): self.console.print(Panel( Group( - Panel(arguments, title="arguments"), - Panel(result_text, title="result"), + Panel(plain_text(arguments), title="arguments"), + Panel(plain_text(result_text), title="result"), ), title=f"Tool Call: {function_name}")) self.log_db.add_tool_call(self.run.id, message_id, tool_call_id, function_name, arguments, result_text, duration) @@ -231,7 +243,7 @@ def add_message(self, role: str, content: str, tokens_query: int, tokens_respons msg = Message(self.run.id, message_id, version=1, conversation=self._current_conversation, role=role, content=content, duration=duration, tokens_query=tokens_query, tokens_response=tokens_response) self.send(MessageType.MESSAGE, msg) - self.console.print(Panel(content, title=(("" if self._current_conversation is None else f"{self._current_conversation} - ") + role))) + self.console.print(Panel(plain_text(content), title=(("" if self._current_conversation is None else f"{self._current_conversation} - ") + role))) return message_id @@ -242,8 +254,8 @@ def _add_or_update_message(self, message_id: int, conversation: Optional[str], r def add_tool_call(self, message_id: int, tool_call_id: str, function_name: str, arguments: str, result_text: str, duration: datetime.timedelta): self.console.print(Panel( Group( - Panel(arguments, title="arguments"), - Panel(result_text, title="result"), + Panel(plain_text(arguments), title="arguments"), + Panel(plain_text(result_text), title="result"), ), title=f"Tool Call: {function_name}")) tc = ToolCall(self.run.id, message_id, tool_call_id, 0, function_name, arguments, "success", result_text, duration) diff --git a/tests/test_logging.py b/tests/test_logging.py new file mode 100644 index 00000000..7ecb4313 --- /dev/null +++ b/tests/test_logging.py @@ -0,0 +1,42 @@ +import datetime +from unittest.mock import Mock + +from rich.console import Console + +from hackingBuddyGPT.utils.logging import LocalLogger + + +def test_local_logger_prints_message_content_as_plain_text(): + db = Mock() + db.create_run.return_value = 1 + logger = LocalLogger(log_db=db, console=Console(record=True)) + logger.start_run("test", "{}") + + content = "binary-looking response with invalid rich markup [/not-open]" + + logger.add_message("assistant", content, 0, 0, datetime.timedelta(0)) + + db.add_message.assert_called_once_with(1, 0, None, "assistant", content, 0, 0, datetime.timedelta(0)) + + +def test_local_logger_prints_tool_result_as_plain_text(): + db = Mock() + db.create_run.return_value = 1 + logger = LocalLogger(log_db=db, console=Console(record=True)) + logger.start_run("test", "{}") + + arguments = '{"path": "/favicon.ico"}' + result_text = "HTTP/1.1 200 OK\r\n\r\n\x00binary-looking response [/not-open]" + duration = datetime.timedelta(milliseconds=1) + + logger.add_tool_call(0, "tool-call-0", "http_request", arguments, result_text, duration) + + db.add_tool_call.assert_called_once_with( + 1, + 0, + "tool-call-0", + "http_request", + arguments, + result_text, + duration, + ) diff --git a/tests/test_web_testing.py b/tests/test_web_testing.py new file mode 100644 index 00000000..00498ce2 --- /dev/null +++ b/tests/test_web_testing.py @@ -0,0 +1,46 @@ +from unittest.mock import Mock + +from hackingBuddyGPT.capabilities.submit_flag import SubmitFlag +from hackingBuddyGPT.usecases.web.with_explanation import WebTestingWithExplanation + + +def test_submit_flag_can_accept_any_flag_and_call_success_callback(): + success_callback = Mock() + submit_flag = SubmitFlag( + flag_format="any CTF flag", + valid_flags=set(), + success_function=success_callback, + accept_any_flag=True, + ) + + result = submit_flag("CTF{unknown_flag}") + + assert result == "Flag submitted (1)" + success_callback.assert_called_once_with() + + +def test_web_testing_stops_after_unknown_flag_submission_by_default(): + agent = WebTestingWithExplanation(llm=Mock(), log=Mock(), flags="known") + agent.init() + + result = agent._capabilities["SubmitFlag"]("CTF{unknown_flag}") + + assert result == "Flag submitted (1)" + assert agent._all_flags_found is True + agent.log.status_message.assert_called_once_with("All flags found! Congratulations!") + + +def test_submit_flag_can_still_require_configured_flags(): + success_callback = Mock() + submit_flag = SubmitFlag( + flag_format="known flags only", + valid_flags={"FLAG.known.GALF"}, + success_function=success_callback, + accept_any_flag=False, + ) + + assert submit_flag("CTF{unknown_flag}") == "Not a valid flag" + success_callback.assert_not_called() + + assert submit_flag("FLAG.known.GALF") == "Flag submitted (1/1)" + success_callback.assert_called_once_with()