make expected outputs column optional #13821

Draft · wants to merge 1 commit into base: llm-experiments-rebase
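
The most user-facing effect of this change is that `Dataset.from_csv` no longer demands `expected_output_columns`. A minimal sketch of the call this enables — the import path and file name are assumptions (the tests alias the experimentation module as `dne`), not verified public API:

```python
# Sketch only: module path assumed from the repo layout (ddtrace/llmobs/experimentation/).
from ddtrace.llmobs import experimentation as dne

# from_csv now only requires input_columns; expected_output_columns may be
# omitted, in which case no "expected_output" field is written to the records.
ds = dne.Dataset.from_csv(
    "questions.csv",              # hypothetical CSV with columns: question, topic
    name="csv-questions",         # hypothetical dataset name
    input_columns=["question"],   # expected_output_columns intentionally omitted
)
```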
49 changes: 29 additions & 20 deletions ddtrace/llmobs/experimentation/_dataset.py
@@ -222,10 +222,10 @@ def _validate_data(self, data: List[Dict[str, Union[str, Dict[str, Any]]]]) -> N
f"got {sorted(new_keys)}"
)

required_keys = {'input', 'expected_output'}
required_keys = {'input'}
if not required_keys.issubset(first_row_keys):
missing = required_keys - first_row_keys
raise ValueError(f"Records must contain 'input' and 'expected_output' fields. Missing: {missing}")
raise ValueError(f"Records must contain the 'input' field")

# Validate consistency within new data
for row in data:
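
To illustrate the relaxed validation above — `input` is now the only required key, so datasets can be built entirely from inputs. A small sketch of record shapes that should pass `_validate_data` (each list keeps its keys consistent, since the surrounding code still checks rows against the first row's keys):

```python
# With the relaxed validation, "input" is the only required key per record.
rows_without_outputs = [
    {"input": "How do I reset my password?"},
    {"input": "What is 2 + 2?"},
]

# Records that do carry expected outputs keep working exactly as before.
rows_with_outputs = [
    {"input": "What is 2 + 2?", "expected_output": "4"},
    {"input": "Capital of France?", "expected_output": "Paris"},
]
```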
@@ -363,7 +363,10 @@ def _prepare_batch_payload(self, overwrite: bool) -> Dict[str, Any]:
if self._changes['added']:
insert_records = []
for record in self._changes['added']:
new_record = {"input": record["input"], "expected_output": record["expected_output"]}
new_record = {"input": record["input"]}
if record.get("expected_output"):
new_record["expected_output"] = record["expected_output"]

metadata = {k: v for k, v in record.items() if k not in ["input", "expected_output", "record_id"]}
if metadata:
new_record["metadata"] = metadata
@@ -539,8 +542,10 @@ def _build_insert_record(record: Dict[str, Any]) -> Dict[str, Any]:
"""Convert an internal record representation into the *insert_records* payload format."""
new_rec = {
"input": record["input"],
"expected_output": record["expected_output"],

}
if record.get("expected_output"):
new_rec["expected_output"] = record["expected_output"]
metadata = {k: v for k, v in record.items() if k not in ["input", "expected_output", "record_id"]}
if metadata:
new_rec["metadata"] = metadata
@@ -556,7 +561,7 @@ def _build_update_record(old: Dict[str, Any], new: Dict[str, Any]) -> Dict[str,
if old.get("input") != new.get("input"):
upd["input"] = new["input"]
if old.get("expected_output") != new.get("expected_output"):
upd["expected_output"] = new["expected_output"]
upd["expected_output"] = new.get("expected_output")
# Diff metadata.
old_meta = {k: v for k, v in old.items() if k not in ["input", "expected_output", "record_id"]}
new_meta = {k: v for k, v in new.items() if k not in ["input", "expected_output", "record_id"]}
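
The switch from `new["expected_output"]` to `new.get("expected_output")` above means deleting an expected output locally no longer raises a `KeyError`; the diff instead emits `expected_output: None`. A worked example of just that logic (whether the backend interprets `None` as clearing the stored value is not shown in this diff):

```python
# Worked example of the update-diff logic above: clearing an expected output locally.
old = {"record_id": "abc", "input": "What is 2 + 2?", "expected_output": "4"}
new = {"record_id": "abc", "input": "What is 2 + 2?"}  # expected_output removed

upd = {}
if old.get("input") != new.get("input"):
    upd["input"] = new["input"]
if old.get("expected_output") != new.get("expected_output"):
    # .get() is what prevents a KeyError here; the update payload carries
    # expected_output=None for the cleared value.
    upd["expected_output"] = new.get("expected_output")

assert upd == {"expected_output": None}
```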
@@ -605,7 +610,7 @@ def _send_batch_updates(
attrs["update_records"] = update_records
if delete_records:
attrs["delete_records"] = delete_records

# Use create_new_version for first chunk, then overwrite=True for subsequent chunks
# to append to the version established by the first chunk
if idx == 0:
@@ -657,12 +662,11 @@ def from_csv(
Dataset: A new Dataset instance containing the CSV data, structured for LLM experiments.

Raises:
ValueError: If input_columns or expected_output_columns are not provided,
or if the CSV is missing those columns, or if the file is empty.
ValueError: If input_columns is not provided, or if the CSV is missing those columns, or if the file is empty.
DatasetFileError: If there are issues reading the CSV file (e.g., file not found, permission error, malformed).
"""
if input_columns is None or expected_output_columns is None:
raise ValueError("`input_columns` and `expected_output_columns` must be provided.")
if input_columns is None:
raise ValueError("`input_columns` must be provided.")

data = []
try:
@@ -683,7 +687,9 @@

header_columns = reader.fieldnames
missing_input_columns = [col for col in input_columns if col not in header_columns]
missing_output_columns = [col for col in expected_output_columns if col not in header_columns]
missing_output_columns = False
if expected_output_columns is not None:
missing_output_columns = [col for col in expected_output_columns if col not in header_columns]

if missing_input_columns:
raise ValueError(f"Input columns not found in CSV header: {missing_input_columns}")
@@ -698,7 +704,7 @@

# Determine metadata columns (all columns not used for input or expected output)
metadata_columns = [
col for col in header_columns if col not in input_columns and col not in expected_output_columns
col for col in header_columns if col not in input_columns and (expected_output_columns is None or col not in expected_output_columns)
]

for row in rows:
@@ -713,7 +719,9 @@

try:
input_data = row[input_columns[0]] if len(input_columns) == 1 else {col: row[col] for col in input_columns}
expected_output_data = row[expected_output_columns[0]] if len(expected_output_columns) == 1 else {col: row[col] for col in expected_output_columns}
expected_output_data = None
if expected_output_columns is not None and len(expected_output_columns) > 0:
expected_output_data = row[expected_output_columns[0]] if len(expected_output_columns) == 1 else {col: row[col] for col in expected_output_columns}

metadata = {}
for col in metadata_columns:
@@ -726,13 +734,14 @@
# Other errors during row processing also indicate CSV issues
raise DatasetFileError(f"Error parsing CSV file (row processing): {e}")

data.append(
{
"input": input_data,
"expected_output": expected_output_data,
**metadata,
}
)
to_append = {
"input": input_data,
**metadata,
}

if expected_output_data:
to_append["expected_output"] = expected_output_data
data.append(to_append)
except csv.Error as e:
# Catch CSV-specific parsing errors
raise DatasetFileError(f"Error parsing CSV file: {e}")
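
For reference, a hedged sketch of the record shape the updated `from_csv` loop should produce when only input columns are named; the CSV contents are hypothetical and the expectation that leftover columns land as top-level metadata keys is read off the loop above:

```python
# Hypothetical questions.csv:
#   question,topic
#   How do I reset my password?,accounts
#
# With input_columns=["question"] and no expected_output_columns, the loop above
# should append a record with no "expected_output" key at all, and the leftover
# "topic" column carried alongside "input" as metadata:
expected_record = {
    "input": "How do I reset my password?",
    "topic": "accounts",
}
```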
12 changes: 5 additions & 7 deletions tests/llmobs/test_experimentation_dataset.py
@@ -710,11 +710,11 @@ def test_push_synced_no_changes(self, experiments_vcr, synced_dataset, capsys):
"""Test pushing a synced dataset with no local changes."""
initial_len = len(synced_dataset)
initial_version = synced_dataset._datadog_dataset_version

# This cassette should ideally show no POST requests or minimal GETs
with experiments_vcr.use_cassette("test_dataset_push_synced_no_change.yaml"):
synced_dataset.push()

captured = capsys.readouterr()
assert f"Dataset '{synced_dataset.name}' (v{initial_version}) is already synced and has no pending changes" in captured.out

@@ -1093,11 +1093,9 @@ def test_from_csv_missing_output_column(self, csv_file_simple):

def test_from_csv_missing_column_specifications(self, csv_file_simple):
"""Test calling from_csv without input/output columns raises ValueError."""
with pytest.raises(ValueError, match="`input_columns` and `expected_output_columns` must be provided"):
with pytest.raises(ValueError, match="`input_columns` must be provided"):
dne.Dataset.from_csv(csv_file_simple, name="bad")
with pytest.raises(ValueError, match="`input_columns` and `expected_output_columns` must be provided"):
dne.Dataset.from_csv(csv_file_simple, name="bad", input_columns=["question"])
with pytest.raises(ValueError, match="`input_columns` and `expected_output_columns` must be provided"):
with pytest.raises(ValueError, match="`input_columns` must be provided"):
dne.Dataset.from_csv(csv_file_simple, name="bad", expected_output_columns=["answer"])

def test_from_csv_malformed_file(self, csv_file_malformed):
@@ -1216,4 +1214,4 @@ def test_repr_empty_dataset(self):
assert "Structure:" not in rep_clean # No structure derivable when empty
assert "Datadog: Local only" in rep_clean
# Should show the deletion as a change until pushed/synced
assert "Changes: -1 deleted" in rep_clean
assert "Changes: -1 deleted" in rep_clean