4 changes: 4 additions & 0 deletions src/vulca/_parse.py
@@ -28,6 +28,10 @@ def parse_llm_json(text: str) -> dict:
# Fix trailing commas before } or ]
text = re.sub(r",\s*([}\]])", r"\1", text)

# Fix a rare LLM typo where a key starts with two double quotes:
# { "L5": 0.7, ""missing_required_subjects": [] }
text = re.sub(r'([,{]\s*)""([A-Za-z_][A-Za-z0-9_]*"\s*:)', r'\1"\2', text)

# Fix single quotes → double quotes (careful with apostrophes in text)
# Only replace quotes that look like JSON keys/values
text = re.sub(r"(?<=[\[{,:])\s*'([^']*?)'\s*(?=[,}\]:])", r' "\1"', text)
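The two repair passes are easiest to see on a concrete payload. A minimal sketch that applies just the two regexes from this hunk to the doubled-quote example in the comment; `parse_llm_json` itself does more, so this is illustrative only:

```python
import json
import re

# Malformed payload combining both defects handled above.
raw = '{ "L5": 0.7, ""missing_required_subjects": [], }'

# Drop trailing commas before } or ]
raw = re.sub(r",\s*([}\]])", r"\1", raw)
# Collapse the doubled opening quote on a key: ""key": -> "key":
raw = re.sub(r'([,{]\s*)""([A-Za-z_][A-Za-z0-9_]*"\s*:)', r'\1"\2', raw)

print(json.loads(raw))  # {'L5': 0.7, 'missing_required_subjects': []}
```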
188 changes: 158 additions & 30 deletions src/vulca/_vlm.py
@@ -18,6 +18,7 @@
# Token budget: start low, escalate on truncation
_DEFAULT_MAX_TOKENS = 3072
_ESCALATED_MAX_TOKENS = 8192
_CONTENT_LOCK_MAX_TOKENS = 16384
_MAX_ESCALATION_ATTEMPTS = 1

# Local (Ollama) models consistently emit >3072 tokens for the L1-L5 JSON
@@ -221,32 +222,64 @@ def _build_dynamic_suffix(
return "\n".join(p for p in parts if p)

def _extract_scoring(text: str) -> str:
"""Extract content inside the **last** <scoring>...</scoring> block.
"""Extract parseable scoring JSON from a two-phase VLM response.

Implements the two-phase scratchpad protocol: the model writes free-form
observations in <observation> tags (discarded), then structured JSON in
<scoring> tags (parsed). Falls back to full text for backward compatibility
with responses that do not use the tag protocol.

Uses rfind for the last ``</scoring>`` to avoid mis-matching when earlier
text (e.g. observation or JSON values) accidentally contains ``<scoring>``.
Prefer the last valid <scoring> block. If the model omits scoring tags,
strip scratchpad observation blocks and return the first balanced JSON
object. This fallback keeps stray braces in Gemini scratchpad output from
poisoning the generic JSON parser.
"""
close_tag = "</scoring>"
close_idx = text.rfind(close_tag)
if close_idx == -1:
return text
prefix = text[:close_idx]
open_tag = "<scoring>"
# Search backwards for the <scoring> that yields content starting with '{'
search_end = len(prefix)
while True:
open_idx = prefix.rfind(open_tag, 0, search_end)
if open_idx == -1:
return text
candidate = prefix[open_idx + len(open_tag):].strip()
scoring_blocks = list(
re.finditer(
r"<scoring>\s*(.*?)\s*</scoring>",
text,
flags=re.IGNORECASE | re.DOTALL,
)
)
for match in reversed(scoring_blocks):
candidate = match.group(1).strip()
if candidate.startswith("{"):
return candidate
search_end = open_idx

without_observation = re.sub(
r"<observation>.*?</observation>",
"",
text,
flags=re.IGNORECASE | re.DOTALL,
)
json_candidate = _first_balanced_json(without_observation)
if json_candidate:
return json_candidate
return text.strip()


def _first_balanced_json(text: str) -> str:
start = text.find("{")
if start < 0:
return ""

depth = 0
in_string = False
escaped = False
for index, char in enumerate(text[start:], start=start):
if in_string:
if escaped:
escaped = False
elif char == "\\":
escaped = True
elif char == '"':
in_string = False
continue

if char == '"':
in_string = True
elif char == "{":
depth += 1
elif char == "}":
depth -= 1
if depth == 0:
return text[start : index + 1].strip()
return ""


def _build_extra_dimensions_prompt(extras: list[dict]) -> str:
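Taken together, the extractor and the balanced-object scanner behave like this. A minimal sketch calling the two private helpers defined in this hunk; the sample responses are invented:

```python
from vulca._vlm import _extract_scoring, _first_balanced_json  # private helpers

# Tagged two-phase response: observation is discarded, scoring is returned.
tagged = (
    "<observation>Loose brushwork; warm palette {not JSON}.</observation>\n"
    '<scoring>{"L1": 0.8, "L2": 0.6}</scoring>'
)
assert _extract_scoring(tagged) == '{"L1": 0.8, "L2": 0.6}'

# Untagged response: after stripping observations, the first balanced
# object wins, so stray prose braces cannot poison the parser.
untagged = 'some notes... {"L1": 0.4, "nested": {"ok": true}} trailing prose'
assert _first_balanced_json(untagged) == '{"L1": 0.4, "nested": {"ok": true}}'
```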
@@ -544,6 +577,7 @@ async def score_image(
*,
mode: str = "strict",
model: str = "",
content_lock: dict | None = None,
) -> dict:
"""Call Gemini Vision to score an image on L1-L5.

@@ -572,9 +606,21 @@
{"type": "image_url", "image_url": {"url": f"data:{mime};base64,{img_b64}"}},
]
if subject:
user_parts.append({"type": "text", "text": f"Subject/context: {subject}"})
user_text = f"Subject/context: {subject}"
else:
user_parts.append({"type": "text", "text": "Evaluate this artwork."})
user_text = "Evaluate this artwork."
resolved_content_lock = None
if content_lock:
from vulca.content_lock import (
build_content_fidelity_prompt,
content_lock_from_dict,
)

resolved_content_lock = content_lock_from_dict(content_lock)
fidelity_prompt = build_content_fidelity_prompt(resolved_content_lock)
if fidelity_prompt:
user_text = f"{user_text}\n\n{fidelity_prompt}"
user_parts.append({"type": "text", "text": user_text})

try:
messages = [
@@ -591,10 +637,19 @@

# Adaptive token budget: cloud models start small (cost-conscious),
# local models start at the escalated budget since tokens are free
# and Gemma-class models regularly exceed 3072.
max_tokens = _LOCAL_DEFAULT_MAX_TOKENS if is_local else _DEFAULT_MAX_TOKENS
# and Gemma-class models regularly exceed 3072. Content-lock scoring
# asks for additional gate fields, so allow one larger final attempt
# when the model truncates twice.
token_budgets = (
[_LOCAL_DEFAULT_MAX_TOKENS]
if is_local
else [_DEFAULT_MAX_TOKENS, _ESCALATED_MAX_TOKENS]
)
if content_lock and _CONTENT_LOCK_MAX_TOKENS not in token_budgets:
token_budgets.append(_CONTENT_LOCK_MAX_TOKENS)
max_tokens = token_budgets[0]
resp = None
for attempt in range(_MAX_ESCALATION_ATTEMPTS + 1):
for attempt, max_tokens in enumerate(token_budgets):
# Local models (Ollama) need longer timeout for first load
timeout = 300 if model.startswith("ollama") else 55
call_kwargs = dict(
@@ -609,14 +664,13 @@
call_kwargs["api_base"] = api_base
resp = await litellm.acompletion(**call_kwargs)
finish_reason = getattr(resp.choices[0], "finish_reason", "stop")
if finish_reason == "length" and attempt < _MAX_ESCALATION_ATTEMPTS:
if finish_reason == "length" and attempt < len(token_budgets) - 1:
logger.info(
"VLM response truncated (finish_reason=length) at %d tokens; "
"escalating to %d tokens",
max_tokens,
_ESCALATED_MAX_TOKENS,
token_budgets[attempt + 1],
)
max_tokens = _ESCALATED_MAX_TOKENS
else:
break

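The budget walk reduces to a small loop: try each budget in order, escalate only on `finish_reason == "length"`, and stop at the first complete response. A standalone sketch, with `call_model` as a hypothetical stand-in for the `litellm.acompletion` call above:

```python
class _Choice:
    def __init__(self, finish_reason: str):
        self.finish_reason = finish_reason

class _Resp:
    def __init__(self, finish_reason: str):
        self.choices = [_Choice(finish_reason)]

def call_model(*, max_tokens: int) -> _Resp:
    # Hypothetical stand-in: pretend anything under 8192 tokens truncates.
    return _Resp("length" if max_tokens < 8192 else "stop")

token_budgets = [3072, 8192, 16384]  # default -> escalated -> content-lock
for attempt, max_tokens in enumerate(token_budgets):
    resp = call_model(max_tokens=max_tokens)
    finish_reason = getattr(resp.choices[0], "finish_reason", "stop")
    if finish_reason == "length" and attempt < len(token_budgets) - 1:
        continue  # truncated and budgets remain: escalate
    break  # complete response, or out of budgets

print(attempt, max_tokens)  # 1 8192 -- one escalation, then success
```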
@@ -651,6 +705,29 @@ async def score_image(
logger.debug("VLM debug dump failed: %s", _dump_exc)

parsed_json = parse_llm_json(scoring_text)
content_fidelity_gate = None
if resolved_content_lock is not None:
from vulca.content_lock import (
build_blind_relation_gate,
build_content_fidelity_gate,
)

content_fidelity_gate = build_content_fidelity_gate(
resolved_content_lock,
parsed_json,
)
if resolved_content_lock.required_relations:
blind_read = await _blind_relation_read(
img_b64=img_b64,
mime=mime,
api_key=api_key,
model=model,
api_base=api_base,
content_lock=resolved_content_lock,
)
content_fidelity_gate.update(
build_blind_relation_gate(resolved_content_lock, blind_read)
)

# Use _parse_vlm_response to extract and validate all fields (including extras)
parsed = _parse_vlm_response(parsed_json, extra_keys=extra_keys)
@@ -673,6 +750,8 @@ async def score_image(
data[f"{level}_reference_technique"] = ref_techniques.get(level, "")
# Include risk_flags so _engine.py can read it from the flat dict
data["risk_flags"] = parsed["risk_flags"]
if content_fidelity_gate is not None:
data["content_fidelity_gate"] = content_fidelity_gate
# Store extra_keys and names in data so _engine.py can split core vs extra
data["_extra_keys"] = extra_keys
data["_extra_names"] = {e["key"]: e["name"] for e in extra_dims[:3]}
Expand All @@ -690,3 +769,52 @@ async def score_image(
fallback[f"{level}_observations"] = ""
fallback[f"{level}_reference_technique"] = ""
return fallback


async def _blind_relation_read(
*,
img_b64: str,
mime: str,
api_key: str,
model: str,
api_base: str | None,
content_lock,
) -> dict:
"""Run an image-only relation read without caption or intended-relation anchors."""
try:
from vulca._parse import parse_llm_json
from vulca.content_lock import build_blind_relation_read_prompt

prompt = build_blind_relation_read_prompt(content_lock)
if not prompt:
return {}

user_parts = [
{"type": "image_url", "image_url": {"url": f"data:{mime};base64,{img_b64}"}},
{"type": "text", "text": prompt},
]
call_kwargs = dict(
model=model,
messages=[
{
"role": "system",
"content": (
"You are a strict image-only visual relationship reader. "
"Use only visible evidence in the image."
),
},
{"role": "user", "content": user_parts},
],
max_tokens=2048,
temperature=0.0,
api_key=api_key,
timeout=300 if model.startswith("ollama") else 55,
)
if api_base:
call_kwargs["api_base"] = api_base
resp = await litellm.acompletion(**call_kwargs)
text = resp.choices[0].message.content.strip()
return parse_llm_json(text)
except Exception as exc:
logger.warning("Blind relation read failed: %s", exc)
return {"_error": str(exc)}
39 changes: 39 additions & 0 deletions src/vulca/cli.py
@@ -107,6 +107,16 @@ def main(argv: list[str] | None = None) -> None:
create_p.add_argument("--ref-type", default="full", choices=["style", "composition", "full"],
help="Reference type: style, composition, or full")
create_p.add_argument("--colors", default="", help="Hex color palette (comma-separated, e.g. '#C87F4A,#5F8A50')")
create_p.add_argument(
"--content-lock",
action="store_true",
help="Treat explicit subjects and visible attributes in the intent as non-negotiable constraints",
)
create_p.add_argument(
"--output-is-artwork-itself",
action="store_true",
help="Require the output to be the artwork surface itself, not a gallery/photo/mockup display",
)
create_p.add_argument("--output", "-o", default="", help="Save generated image to this path (default: ./vulca-<session>.png)")

# traditions command
@@ -843,6 +853,33 @@ def _cmd_create(args: argparse.Namespace) -> None:
node_params: dict[str, dict] = {}
if weights:
node_params["evaluate"] = {"custom_weights": weights}
if getattr(args, "content_lock", False) or getattr(args, "output_is_artwork_itself", False):
from vulca.content_lock import ContentLock, extract_content_lock

artifact_boundary = (
getattr(args, "output_is_artwork_itself", False)
or getattr(args, "content_lock", False)
)
if getattr(args, "content_lock", False):
lock = extract_content_lock(
args.intent,
output_is_artwork_itself=artifact_boundary,
)
else:
lock = ContentLock(
original_intent=" ".join(args.intent.strip().split()),
output_is_artwork_itself=artifact_boundary,
)
if lock.has_requirements or lock.output_is_artwork_itself:
lock_data = lock.to_dict()
node_params["generate"] = {
**node_params.get("generate", {}),
"content_lock": lock_data,
}
node_params["evaluate"] = {
**node_params.get("evaluate", {}),
"content_lock": lock_data,
}

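The wiring above fans the same lock dict into both pipeline nodes. A minimal sketch with a hand-built dict standing in for `ContentLock.to_dict()` (whose real fields may differ):

```python
lock_data = {
    "original_intent": "a red fox on a mossy log",  # illustrative fields only
    "output_is_artwork_itself": True,
}

node_params: dict[str, dict] = {"evaluate": {"custom_weights": {"L1": 2.0}}}
node_params["generate"] = {**node_params.get("generate", {}), "content_lock": lock_data}
node_params["evaluate"] = {**node_params.get("evaluate", {}), "content_lock": lock_data}

# generate and evaluate share one lock; pre-existing evaluate params survive.
assert node_params["evaluate"] == {"custom_weights": {"L1": 2.0}, "content_lock": lock_data}
```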
pipeline_input = PipelineInput(
subject=args.subject or args.intent,
@@ -892,6 +929,8 @@ def _cmd_create(args: argparse.Namespace) -> None:
reference=getattr(args, "reference", "") or "",
ref_type=getattr(args, "ref_type", "full") or "full",
colors=getattr(args, "colors", "") or "",
content_lock=getattr(args, "content_lock", False),
output_is_artwork_itself=getattr(args, "output_is_artwork_itself", False),
)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
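End to end, the new flags would be exercised like this: a hypothetical invocation through the Python entry point, assuming the intent text is `create`'s positional argument, which this diff does not show:

```python
from vulca.cli import main

main([
    "create",
    "a red fox curled on a mossy log",  # intent text (positional: assumed)
    "--content-lock",                   # lock explicit subjects/attributes
    "--output-is-artwork-itself",       # forbid gallery/photo/mockup framing
    "-o", "fox.png",
])
```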