Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion smr_alignment/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ readme = "README.md"
requires-python = ">=3.11"
authors = [{ name = "Sebastian Heppner", email = "mail@s-heppner.com" }]
dependencies = [
"semantic_match_registry>=0.0.1",
"semantic_match_registry>=1.0.0",
"networkx>=3.4.2",
"tqdm>=4.46.1",
"numpy>=2.3.4",
"cvxpy>=1.7.3",
]

[project.optional-dependencies]
Expand Down
231 changes: 231 additions & 0 deletions smr_alignment/src/smr_alignment/metric_alignment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
"""
Optimize a given graph with multiple metrics to minimize triangle violations.

How to use:

# 1) Your graph already has per-edge metric_scores
mao = MetricAlignmentOptimizer(G)

# 2) Learn weights (sub-sample triplets if the graph is big)
w = mao.optimize_weights(eps=1e-8, sample_triplets=50_000, entropy_lambda=1e-3)

# 3) Apply unified scores (and make them the working weight)
mao.apply_unified_scores(weights=w, replace_weight=True)

"""
import math
from typing import Dict, List, Optional, Sequence, Tuple
from itertools import combinations

import numpy as np
import cvxpy as cp
from tqdm import tqdm
import networkx as nx


class MetricAlignmentOptimizer:
    """
    Learn metric weights w (w >= 0, sum w = 1) that minimize the sum of
    triangle-violation margins in log-space, then apply unified scores.

    Assumes each edge (u, v) has:
    - d["metric_scores"] : Dict[metric_id, score in [0, 1]]
    - d["weight"]        : working score (will optionally be overwritten)
    """

    def __init__(self, G: nx.DiGraph):
        self.G = G
        # Sorted union of metric ids over all edges; set by _collect_pair_logs().
        self.metrics_: Optional[List[str]] = None
        # Learned weight per metric id; set by optimize_weights().
        self.weights_: Optional[Dict[str, float]] = None

    # ---------- utilities ----------

    @staticmethod
    def _clamp_score(x: float, epsilon: float) -> float:
        """
        Clamp a given (score) value into (epsilon, 1 - epsilon) so that
        -log(score) stays finite on both ends.
        """
        return min(max(float(x), epsilon), 1.0 - epsilon)

    def _collect_pair_logs(
        self, eps: float = 1e-8, attr: str = "log_scores_by_metric"
    ) -> Dict[Tuple[str, str], Dict[str, float]]:
        """
        Build (u, v) -> {metric: -log(clamped score)} and cache on edge as attr.

        Side effect: sets self.metrics_ to the sorted global metric set, which
        fixes a stable column order for the alpha vectors.
        """
        pair_logs: Dict[Tuple[str, str], Dict[str, float]] = {}
        for u, v, d in tqdm(self.G.edges(data=True), desc="Collecting log scores"):
            ms = d.get("metric_scores", {})
            logs = {m: -math.log(self._clamp_score(s, eps)) for m, s in ms.items()}
            d[attr] = logs
            pair_logs[(u, v)] = logs
        # global metric set (sorted for deterministic ordering)
        self.metrics_ = sorted({m for logs in pair_logs.values() for m in logs})
        return pair_logs

    def _alpha_vectors(
        self,
        pair_logs: Dict[Tuple[str, str], Dict[str, float]],
        metrics: Sequence[str],
        sample_triplets: Optional[int] = None,
        both_orientations: bool = True,
    ) -> np.ndarray:
        """
        Build alpha vectors for triplets, representing the triangle violation
        per metric. For a directed triple (A, B, C):

            alpha_m = c_m(A, C) - c_m(A, B) - c_m(B, C)

        If a metric is missing on any of those pairs, it contributes 0 for
        that triple. Only triples whose three directed *pairs* all exist in
        pair_logs produce an alpha vector.

        :param sample_triplets: if set and smaller than the candidate count,
            sub-sample deterministically with a fixed stride.
        :return: array of shape (n_alphas, len(metrics)); possibly empty.
        """
        nodes = list(self.G.nodes())
        if len(nodes) < 3:
            return np.zeros((0, len(metrics)))

        # Only consider triplets where the three directed pairs exist.
        def pairs_exist(A, B, C):
            return ((A, B) in pair_logs) and ((B, C) in pair_logs) and ((A, C) in pair_logs)

        triplets = list(combinations(nodes, 3))
        if sample_triplets and len(triplets) > sample_triplets:
            # Deterministic strided sub-sampling keeps runs reproducible.
            stride = max(1, len(triplets) // sample_triplets)
            triplets = triplets[::stride]

        alphas: List[List[float]] = []
        for A, B, C in tqdm(triplets, desc="Building alpha vectors"):
            # Consider both A->B->C and C->B->A (the graph is directed).
            orientations = [(A, B, C), (C, B, A)] if both_orientations else [(A, B, C)]
            for X, Y, Z in orientations:
                if not pairs_exist(X, Y, Z):
                    continue
                c_xy = pair_logs[(X, Y)]
                c_yz = pair_logs[(Y, Z)]
                c_xz = pair_logs[(X, Z)]
                alpha = []
                for m in metrics:
                    cxz = c_xz.get(m)
                    cxy = c_xy.get(m)
                    cyz = c_yz.get(m)
                    if cxz is None or cxy is None or cyz is None:
                        alpha.append(0.0)  # ignore missing metric for this triple
                    else:
                        alpha.append(cxz - (cxy + cyz))
                # An all-zero vector is harmless: it contributes 0 to the loss.
                alphas.append(alpha)
        if not alphas:
            return np.zeros((0, len(metrics)))
        return np.asarray(alphas, dtype=float)

    # ---------- optimization ----------

    def optimize_weights(
        self,
        eps: float = 1e-8,
        sample_triplets: Optional[int] = None,
        entropy_lambda: float = 0.0,
        solver: str = "ECOS",
        verbose: bool = False,
    ) -> Dict[str, float]:
        """
        Solve: minimize sum_i pos(alpha_i · w) + entropy_lambda * sum_m w_m log w_m
               s.t.     w >= 0, sum w = 1

        The regularizer sum_m w_m log w_m (negative entropy) is convex on the
        simplex and minimized by the uniform distribution, so entropy_lambda > 0
        pulls the weights toward uniform.

        Falls back to uniform weights when no usable triplets exist, and to the
        SCS solver when the requested solver fails.

        :raises RuntimeError: if the optimization produced no solution.
        """
        pair_logs = self._collect_pair_logs(eps)
        metrics = self.metrics_ or []
        if not metrics:
            # No metric data at all; nothing to learn.
            self.weights_ = {}
            return {}

        A = self._alpha_vectors(pair_logs, metrics, sample_triplets=sample_triplets)
        if A.shape[0] == 0:
            # no usable triplets; uniform weights
            w = np.ones(len(metrics)) / len(metrics)
            self.weights_ = {m: float(w[i]) for i, m in enumerate(metrics)}
            return self.weights_

        w = cp.Variable(len(metrics), nonneg=True)  # type: ignore
        margins = A @ w
        loss = cp.sum(cp.pos(margins))  # piecewise-linear convex

        if entropy_lambda > 0:
            # cp.entr(x) == -x * log(x) (concave). The convex negative-entropy
            # penalty sum_m w_m log w_m is therefore -cp.sum(cp.entr(w)).
            # (Adding +cp.entr to a minimized objective is non-convex and is
            # rejected by CVXPY's DCP check; cp.entr(0) == 0 by definition,
            # so no epsilon offset is needed.)
            loss = loss - entropy_lambda * cp.sum(cp.entr(w))

        # cp.sum(w) == 1.0 returns a Constraint object rather than a plain
        # bool, which confuses type checkers; hence the "type: ignore" below.
        constraints = [cp.sum(w) == 1.0]
        prob = cp.Problem(cp.Minimize(loss), constraints)  # type: ignore

        try:
            prob.solve(solver=solver, verbose=verbose)
        except Exception:
            # Best-effort fallback, e.g. when the requested solver is not installed.
            prob.solve(solver="SCS", verbose=verbose)

        if w.value is None:  # type: ignore
            raise RuntimeError("Weight optimization failed; check solver output/logs.")

        # Numerical cleanup: clip tiny negatives and renormalize onto the simplex.
        w_arr = np.clip(w.value, 0.0, None)  # type: ignore
        s = float(w_arr.sum())
        if s <= 0:
            w_arr = np.ones_like(w_arr) / len(w_arr)
        else:
            w_arr /= s

        self.weights_ = {m: float(w_arr[i]) for i, m in enumerate(metrics)}
        return self.weights_

    # ---------- apply unified score ----------

    def apply_unified_scores(
        self,
        weights: Optional[Dict[str, float]] = None,
        eps: float = 1e-8,
        out_cost_attr: str = "unified_log_cost",
        out_sim_attr: str = "unified_similarity",
        replace_weight: bool = True,
    ) -> None:
        """
        For each edge (u, v), compute:
            c_hat = sum_m w_m * (-log clamp(s_m))
            s_hat = exp(-c_hat) = prod_m clamp(s_m)^{w_m}

        Also store:
        - metric_scores_weighted: {m: clamp(s_m)^{w_m}} (zero-weight metrics skipped)
        - metric_weights: {m: w_m} (only positive-weight metrics present on this edge)

        If replace_weight is True, set edge 'weight' to
        max(metric_scores_weighted.values()) — the strongest weighted metric
        for that edge. Edges without metric_scores are left untouched.

        :raises ValueError: if no weights are supplied and none were learned.
        """
        if weights is None:
            if self.weights_ is None:
                raise ValueError("No weights supplied and optimize_weights() has not been run.")
            weights = self.weights_

        for _, _, d in tqdm(self.G.edges(data=True), desc="Applying unified scores"):
            ms = d.get("metric_scores", {})
            if not ms:
                continue

            # Per-edge subset of weights: only positive weights for metrics
            # present on this edge (single dict lookup per metric).
            edge_w: Dict[str, float] = {}
            for m in ms:
                w_m = float(weights.get(m, 0.0))
                if w_m > 0.0:
                    edge_w[m] = w_m
            d["metric_weights"] = edge_w  # report what we used on this edge

            # Unified cost/similarity (log-space blend = weighted geometric
            # mean in real space) and per-metric weighted scores s_m^{w_m},
            # computed in a single pass. Zero-weight metrics are already
            # filtered out, so no s_m ** 0 == 1.0 inflation can occur.
            c_hat = 0.0
            metric_scores_weighted: Dict[str, float] = {}
            for m, w_m in edge_w.items():
                s_m = self._clamp_score(ms[m], eps)
                c_hat += w_m * (-math.log(s_m))
                metric_scores_weighted[m] = s_m ** w_m

            # NOTE(review): if every metric on this edge has zero global
            # weight, c_hat stays 0.0 and the unified similarity is 1.0 —
            # confirm this degenerate case is intended.
            d[out_cost_attr] = c_hat
            d[out_sim_attr] = math.exp(-c_hat)
            d["metric_scores_weighted"] = metric_scores_weighted

            # Working weight policy: max of the weighted metric scores.
            if replace_weight and metric_scores_weighted:
                d["weight"] = max(metric_scores_weighted.values())
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
For more details, I'd like to refer to my dissertation.
"""
import math
from tqdm import tqdm

from tqdm import tqdm
import networkx as nx

from smr.algorithm import SemanticMatchGraph
Expand Down
120 changes: 120 additions & 0 deletions smr_alignment/tests/test_metric_alignment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import math
import unittest
import networkx as nx

from smr_alignment.metric_alignment import MetricAlignmentOptimizer


# Numerical slack for nonnegativity checks on solver output (tiny negative
# weights can appear due to floating-point solver tolerances).
TOLERANCE = 1e-8


class TestMetricAlignmentOptimizer(unittest.TestCase):
    def setUp(self):
        """
        Build a tiny directed triangle A->B->C plus the shortcut A->C with
        two metrics:
        - m_good respects the similarity triangle inequality exactly:
          s_AC = s_AB * s_BC = 0.9 * 0.9 = 0.81
        - m_bad violates it strongly: s_AC << s_AB * s_BC
        The optimizer should push almost all weight onto m_good.
        """
        edges = (
            ("A", "B", {"m_good": 0.9, "m_bad": 0.9}),
            ("B", "C", {"m_good": 0.9, "m_bad": 0.9}),
            ("A", "C", {"m_good": 0.81, "m_bad": 0.2}),
        )
        self.G = nx.DiGraph()
        for src, dst, scores in edges:
            # Working weight mirrors the production convention: max metric score.
            self.G.add_edge(src, dst, metric_scores=scores, weight=max(scores.values()))
        self.mao = MetricAlignmentOptimizer(self.G)

    def test_optimize_weights_prefers_triangle_consistent_metric(self):
        learned = self.mao.optimize_weights(eps=1e-8, entropy_lambda=0.0)
        # Both metric ids are present with nonnegative weights.
        for metric in ("m_good", "m_bad"):
            self.assertIn(metric, learned)
            self.assertGreaterEqual(learned[metric], -TOLERANCE)
        # Weights live on the simplex.
        self.assertAlmostEqual(learned["m_good"] + learned["m_bad"], 1.0, places=7)
        # The triangle-consistent metric should dominate.
        self.assertGreater(learned["m_good"], 0.95)
        self.assertLess(learned["m_bad"], 0.05)

    def test_collect_pair_logs_sets_attribute(self):
        pair_logs = self.mao._collect_pair_logs(eps=1e-8, attr="log_scores_by_metric")
        # Every edge received the cache attribute and all log-costs are finite.
        for _, _, data in self.G.edges(data=True):
            self.assertIn("log_scores_by_metric", data)
            for log_cost in data["log_scores_by_metric"].values():
                self.assertTrue(math.isfinite(log_cost))
        # The returned mapping covers exactly the graph's directed edges.
        self.assertEqual(set(pair_logs), {("A", "B"), ("B", "C"), ("A", "C")})

    def test_alpha_vectors_nonempty_and_correct_width(self):
        pair_logs = self.mao._collect_pair_logs(eps=1e-8)
        metrics = self.mao.metrics_
        self.assertIsNotNone(metrics)
        alphas = self.mao._alpha_vectors(pair_logs, metrics)
        # Shape: (num_triplets * orientations, num_metrics).
        self.assertEqual(alphas.shape[1], len(metrics))
        self.assertGreater(alphas.shape[0], 0)

    def test_apply_unified_scores_sets_expected_fields_and_weight_policy(self):
        simulated = {"m_good": 0.97, "m_bad": 0.03}  # simulate learned weights
        self.mao.apply_unified_scores(weights=simulated, replace_weight=True)

        # Inspect the A->C edge thoroughly.
        d = self.G["A"]["C"]
        for attr in ("unified_log_cost", "unified_similarity",
                     "metric_weights", "metric_scores_weighted"):
            self.assertIn(attr, d)

        # The per-edge weight subset covers exactly the metrics on this edge.
        self.assertEqual(set(d["metric_weights"]), {"m_good", "m_bad"})
        self.assertAlmostEqual(d["metric_weights"]["m_good"], 0.97, places=7)
        self.assertAlmostEqual(d["metric_weights"]["m_bad"], 0.03, places=7)

        s_good, s_bad = 0.81, 0.2
        # Unified similarity is the weighted geometric mean prod s_m^{w_m}.
        expected_sim = (s_good ** 0.97) * (s_bad ** 0.03)
        self.assertAlmostEqual(d["unified_similarity"], expected_sim, places=10)

        # Unified log cost is the matching weighted sum of -log(s_m).
        expected_cost = 0.97 * (-math.log(s_good)) + 0.03 * (-math.log(s_bad))
        self.assertAlmostEqual(d["unified_log_cost"], expected_cost, places=10)

        # metric_scores_weighted holds s_m^{w_m} per metric.
        weighted = d["metric_scores_weighted"]
        self.assertIn("m_good", weighted)
        self.assertIn("m_bad", weighted)
        self.assertAlmostEqual(weighted["m_good"], s_good ** 0.97, places=10)
        self.assertAlmostEqual(weighted["m_bad"], s_bad ** 0.03, places=10)

        # Working-weight policy: the maximum of the weighted metric scores.
        expected_operational = max(s_good ** 0.97, s_bad ** 0.03)
        self.assertAlmostEqual(d["weight"], expected_operational, places=10)

    def test_no_metrics_returns_empty_weights_and_no_crash_on_apply(self):
        bare = nx.DiGraph()
        bare.add_edge("X", "Y", weight=0.5)  # no metric_scores
        optimizer = MetricAlignmentOptimizer(bare)
        learned = optimizer.optimize_weights(eps=1e-8)
        self.assertEqual(learned, {})  # nothing to learn

        # Applying with empty weights must be a graceful no-op.
        optimizer.apply_unified_scores(weights=learned, replace_weight=True)
        # The working weight is untouched.
        self.assertAlmostEqual(bare["X"]["Y"]["weight"], 0.5, places=12)


# Allow running this test module directly: `python test_metric_alignment.py`.
if __name__ == "__main__":
    unittest.main()