5 changes: 5 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,5 @@
{
"python-envs.defaultEnvManager": "ms-python.python:conda",
"python-envs.defaultPackageManager": "ms-python.python:conda",
"python-envs.pythonProjects": []
}
2 changes: 1 addition & 1 deletion geometry/prompt.py
@@ -68,7 +68,7 @@ def area_of_rectangle_with_semicircle_combined(length_of_shared_side: float | st
For instance, if the length of the side opposite to the shared side is provided, we already know the equilateral triangle's side length. Then we call `equation = primeter_of_rectangle_with_equilateral_triangle(length_of_shared_side=known_length, length_of_other_side='x', result=known_primeter)`.
Otherwise, if we don't know the length of the side opposite to the shared side, but we know the length of the other side, then we call `equation = primeter_of_rectangle_with_equilateral_triangle(length_of_shared_side='x', length_of_other_side=known_length, result=known_primeter)`.

# USER REQUEST #: Given the geometry diagram <img src='dataset/test_geomverse/test_geomverse_TEST_D2_B100_data_1/1.png'> and the matplotlib code of the geometry:
# USER REQUEST #: Given the geometry diagram <img src='dataset/Dataset_GeomVerse/test_geomverse_TEST_D2_B100_data_1/1.png'> and the matplotlib code of the geometry:
```python
import matplotlib.pyplot as plt
import matplotlib.patches as patches
```
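The prompt context above describes two call patterns for `primeter_of_rectangle_with_equilateral_triangle`, depending on whether the side opposite the shared side is known. A minimal sketch of what such an equation-builder could look like, assuming the combined perimeter is two "other" sides plus three shared-side lengths; the actual tool in the repository may differ:

```python
# Hypothetical sketch only -- not the repository's implementation of this tool.
# Assumed perimeter of a rectangle with an equilateral triangle on one side:
# 2 * other_side + 3 * shared_side (two "other" sides, the rectangle side
# opposite the shared one, and the two exposed triangle sides).
def primeter_of_rectangle_with_equilateral_triangle(length_of_shared_side,
                                                    length_of_other_side,
                                                    result):
    return f"2 * {length_of_other_side} + 3 * {length_of_shared_side} = {result}"

# Side opposite the shared side is known -> the shared side length is known too.
eq_known = primeter_of_rectangle_with_equilateral_triangle(
    length_of_shared_side=12, length_of_other_side='x', result=60)

# Shared side unknown -> substitute 'x' for it and pass the known other side.
eq_unknown = primeter_of_rectangle_with_equilateral_triangle(
    length_of_shared_side='x', length_of_other_side=7, result=60)
```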
9 changes: 8 additions & 1 deletion geometry/solver.py
@@ -1,6 +1,8 @@
import json
import os
import sys
import argparse, shutil
from pathlib import Path

from agent import GeoProUserAgent
from prompt import GeoPromptVisuoThink
@@ -12,8 +14,13 @@
from tqdm import tqdm
from copy import deepcopy

_PROJECT_ROOT = Path(__file__).resolve().parents[1]
_CONFIG_DIR = _PROJECT_ROOT / "visual-navigation"
if str(_CONFIG_DIR) not in sys.path:
sys.path.insert(0, str(_CONFIG_DIR))

# the max reasoning steps / tree search depth
from config import MAX_REPLY
from config import MAX_REPLY # noqa: E402


def aux_step(task_type: str) -> bool:
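The new import shim in solver.py assumes the shared config.py lives in the sibling visual-navigation directory, one level above geometry/. A quick standalone sketch (directory names taken from the diff, the rest illustrative) to confirm which config module the shim actually resolves:

```python
import importlib
import sys
from pathlib import Path

# Same idiom as the diff: put the sibling visual-navigation directory on
# sys.path before importing config.
project_root = Path(__file__).resolve().parents[1]
config_dir = project_root / "visual-navigation"
if str(config_dir) not in sys.path:
    sys.path.insert(0, str(config_dir))

config = importlib.import_module("config")
print(config.__file__)    # expected to end in visual-navigation/config.py
print(config.MAX_REPLY)   # the max reasoning steps / tree search depth
```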
283 changes: 238 additions & 45 deletions geometry/utils_llm.py
@@ -1,55 +1,248 @@
import json
import os
import re
import sys
from datetime import datetime
from autogen.agentchat.contrib.img_utils import (
gpt4v_formatter,
)
from autogen.oai.client import OpenAIWrapper
from config import llm_config
from utils_misc import print_error
from pathlib import Path
from time import sleep
from copy import deepcopy
from utils_misc import print_error
from typing import Dict, List, Tuple

import torch
from PIL import Image

try:
from .utils_misc import print_error # type: ignore
except ImportError: # when executed as a script (no package context)
from utils_misc import print_error # type: ignore

_PROJECT_ROOT = Path(__file__).resolve().parents[1]
_GEOMETRY_DIR = Path(__file__).resolve().parent
_CONFIG_DIR = _PROJECT_ROOT / "visual-navigation"
for _path in (str(_GEOMETRY_DIR), str(_CONFIG_DIR)):
if _path not in sys.path:
sys.path.insert(0, _path)

from config import ( # noqa: E402
HF_DEVICE,
HF_MAX_NEW_TOKENS,
HF_MODEL_ID,
HF_REPETITION_PENALTY,
HF_SYSTEM_PROMPT,
HF_TEMPERATURE,
HF_TOP_P,
HF_TRUST_REMOTE_CODE,
)

try:
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration
except Exception as exc: # pragma: no cover - handled at runtime
AutoProcessor = None # type: ignore
Qwen2VLForConditionalGeneration = None # type: ignore
_IMPORT_ERROR = exc
else:
_IMPORT_ERROR = None


IMG_TAG_PATTERN = re.compile(r"<img\s+src=['\"]([^'\"]+)['\"][^>]*>", re.IGNORECASE)

_processor = None
_model = None


def _select_device() -> str:
pref = (HF_DEVICE or "").strip()
if not pref or pref.lower() == "auto":
return "cuda" if torch.cuda.is_available() else "cpu"
return pref


def _resolve_image_path(path_str: str) -> Path:
path = Path(path_str)
if path.is_absolute() and path.exists():
return path

candidates = [
Path.cwd() / path,
_PROJECT_ROOT / path,
Path(_PROJECT_ROOT / "geometry") / path,
]
# Handle dataset paths that omit the "geometry" prefix.
if path_str.startswith("dataset/"):
relative = path
try:
relative = path.relative_to("dataset")
except ValueError:
relative = path
candidates.append(_PROJECT_ROOT / "dataset" / "geometry" / relative)

for candidate in candidates:
if candidate.exists():
return candidate
return path


def _load_image(path_str: str) -> Image.Image | None:
resolved = _resolve_image_path(path_str)
if not resolved.exists():
print_error(f"[HF-Qwen] image not found: {path_str}")
return None
try:
with Image.open(resolved) as img:
return img.convert("RGB")
except Exception as exc: # pragma: no cover - I/O errors at runtime
print_error(f"[HF-Qwen] failed to open image {resolved}: {exc}")
return None


def _ensure_model():
global _processor, _model, _device, _dtype

if _processor is not None and _model is not None:
return

if _IMPORT_ERROR is not None:
raise RuntimeError(
"Failed to import transformers/Qwen2VL. "
"Install transformers>=4.41 and accelerate together with torch."
) from _IMPORT_ERROR

os.environ.setdefault("HF_HUB_ENABLE_HF_TRANSFER", "0")
os.environ.setdefault("HF_HUB_DISABLE_XET", "1")
os.environ.setdefault("HF_HUB_DISABLE_TELEMETRY", "1")

model_device = _select_device()
model_device_lower = model_device.lower()
use_cuda = model_device_lower.startswith("cuda") and torch.cuda.is_available()
torch_dtype = torch.float16 if use_cuda else torch.float32

TOKEN_G = 0
TOKEN_USED = 0
# print error message with red color and bold。
_processor = AutoProcessor.from_pretrained(
HF_MODEL_ID,
trust_remote_code=HF_TRUST_REMOTE_CODE,
)
load_kwargs: Dict[str, object] = {
"dtype": torch_dtype,
"trust_remote_code": HF_TRUST_REMOTE_CODE,
}
if use_cuda:
load_kwargs["device_map"] = "auto"
else:
load_kwargs["device_map"] = None

def chat_vlm(prompt: str, history_messages = None, temperature: float = 0., retry_times: int = 10):
global TOKEN_USED, TOKEN_G
call_config = deepcopy(llm_config)
for s_config in call_config['config_list']:
s_config['temperature'] = temperature
_model = Qwen2VLForConditionalGeneration.from_pretrained(
HF_MODEL_ID,
**load_kwargs,
)
if not use_cuda:
_model.to("cpu")


def _content_to_messages(messages: List[Dict[str, str]]) -> Tuple[List[Dict[str, object]], List[Image.Image]]:
multimodal_messages: List[Dict[str, object]] = []
collected_images: List[Image.Image] = []

for msg in messages:
raw = msg["content"]
items: List[Dict[str, object]] = []
last_idx = 0

for match in IMG_TAG_PATTERN.finditer(raw):
start, end = match.span()
text_chunk = raw[last_idx:start]
if text_chunk:
items.append({"type": "text", "text": text_chunk})

img_src = match.group(1)
image = _load_image(img_src)
if image is not None:
items.append({"type": "image"})
collected_images.append(image)
else:
# Preserve the tag text when image loading fails.
items.append({"type": "text", "text": match.group(0)})
last_idx = end

tail_text = raw[last_idx:]
if tail_text or not items:
items.append({"type": "text", "text": tail_text})

# Reduce to string if there is only one plain-text entry.
if len(items) == 1 and items[0].get("type") == "text":
content = items[0]["text"]
else:
content = items

multimodal_messages.append({"role": msg["role"], "content": content})

return multimodal_messages, collected_images


def _generate_with_qwen(clean_messages: List[Dict[str, str]], temperature: float) -> str:
_ensure_model()
assert _processor is not None and _model is not None # for type checkers

working_messages = clean_messages
if HF_SYSTEM_PROMPT and (not working_messages or working_messages[0].get("role") != "system"):
working_messages = [{"role": "system", "content": HF_SYSTEM_PROMPT}] + working_messages

formatted_messages, images = _content_to_messages(working_messages)
chat_text = _processor.apply_chat_template(
formatted_messages,
tokenize=False,
add_generation_prompt=True,
)

processor_kwargs: Dict[str, object] = {"text": [chat_text]}
if images:
processor_kwargs["images"] = [images]

inputs = _processor(
**processor_kwargs,
return_tensors="pt",
)

torch_device = torch.device(_select_device() if torch.cuda.is_available() else "cpu")
inputs = {k: v.to(torch_device) if isinstance(v, torch.Tensor) else v for k, v in inputs.items()}

gen_kwargs: Dict[str, object] = {
"max_new_tokens": HF_MAX_NEW_TOKENS,
"do_sample": temperature > 0,
}
if temperature > 0:
gen_kwargs["temperature"] = temperature
gen_kwargs["top_p"] = HF_TOP_P
else:
gen_kwargs["do_sample"] = False

if HF_REPETITION_PENALTY != 1.0:
gen_kwargs["repetition_penalty"] = HF_REPETITION_PENALTY

with torch.no_grad():
generated = _model.generate(**inputs, **gen_kwargs)

input_length = inputs["input_ids"].shape[-1]
response_ids = generated[:, input_length:]
decoded = _processor.batch_decode(response_ids, skip_special_tokens=True)[0]
return decoded.strip()


def chat_vlm(prompt: str, history_messages=None, temperature: float = 0.0, retry_times: int = 3):
if history_messages is None:
history_messages = []

clean_messages = history_messages + [{"role": "user", "content": prompt}]

interval = 1
for i in range(retry_times):
for attempt in range(retry_times):
try:
if history_messages is None:
history_messages = []
clean_messages = history_messages + [{"role": "user", "content": prompt}]
dirty_messages = [{'role': mdict['role'], 'content': gpt4v_formatter(mdict['content'])} for mdict in clean_messages]

client = OpenAIWrapper(**call_config)
response = client.create(
messages=dirty_messages,
timeout=600,
)
messages = clean_messages + [{"role": "assistant", "content": response.choices[0].message.content}]
print(response.usage)
# TOKEN_USED += response.usage.total_tokens
# TOKEN_G += response.usage.completion_tokens
# print_error(f'[Token Gen] {id(TOKEN_G)} {TOKEN_G - response.usage.completion_tokens} -> {TOKEN_G}')
# print_error(f'[Token Used] {id(TOKEN_USED)} {TOKEN_USED - response.usage.total_tokens} -> {TOKEN_USED}')
return response.choices[0].message.content, messages
except Exception as e:
if 'limit' in str(e):
sleep(interval)
interval = min(interval * 2, 60)
print_error(e)
if i >= (retry_times - 1):
raise e
response_content = _generate_with_qwen(clean_messages, temperature)
messages = clean_messages + [{"role": "assistant", "content": response_content}]
return response_content, messages
except Exception as exc: # pragma: no cover - runtime robustness
print_error(f"[HF-Qwen] generation failed (attempt {attempt + 1}/{retry_times}): {exc}")
if attempt >= retry_times - 1:
raise
sleep(interval)
interval = min(interval * 2, 60)


if __name__ == "__main__":
# print(llm_config)
print(chat_vlm('Hello.', temperature=0.8)[0])
demo_prompt = "Hello! Introduce yourself briefly."
print(chat_vlm(demo_prompt, temperature=0.2)[0])
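For reference, a possible call pattern for the rewritten `chat_vlm` (illustrative only; the image path is the one used in the prompt.py diff, the second-turn prompt is made up). Any `<img src='...'>` tag in the prompt is matched by IMG_TAG_PATTERN, loaded via `_resolve_image_path`, and passed to the Qwen2-VL processor; the returned message list can be threaded back in as history:

```python
from utils_llm import chat_vlm

prompt = (
    "Given the geometry diagram "
    "<img src='dataset/Dataset_GeomVerse/test_geomverse_TEST_D2_B100_data_1/1.png'> "
    "describe the figure."
)

# temperature=0.0 -> greedy decoding (do_sample=False in _generate_with_qwen).
answer, history = chat_vlm(prompt, temperature=0.0)

# Follow-up turn: reuse the returned messages as history_messages.
follow_up, history = chat_vlm("Now compute its perimeter.",
                              history_messages=history)
print(answer)
print(follow_up)
```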
1 change: 1 addition & 0 deletions notebookfff95ced13 (1).ipynb

Large diffs are not rendered by default.
