diff --git a/ddtrace/llmobs/experimentation/_dataset.py b/ddtrace/llmobs/experimentation/_dataset.py
index 2d7e2838a41..e1bf81f0336 100644
--- a/ddtrace/llmobs/experimentation/_dataset.py
+++ b/ddtrace/llmobs/experimentation/_dataset.py
@@ -222,10 +222,10 @@ def _validate_data(self, data: List[Dict[str, Union[str, Dict[str, Any]]]]) -> N
                     f"got {sorted(new_keys)}"
                 )

-            required_keys = {'input', 'expected_output'}
+            required_keys = {'input'}
             if not required_keys.issubset(first_row_keys):
                 missing = required_keys - first_row_keys
-                raise ValueError(f"Records must contain 'input' and 'expected_output' fields. Missing: {missing}")
+                raise ValueError(f"Records must contain the 'input' field")

             # Validate consistency within new data
             for row in data:
@@ -363,7 +363,10 @@ def _prepare_batch_payload(self, overwrite: bool) -> Dict[str, Any]:
         if self._changes['added']:
             insert_records = []
             for record in self._changes['added']:
-                new_record = {"input": record["input"], "expected_output": record["expected_output"]}
+                new_record = {"input": record["input"]}
+                if record.get("expected_output"):
+                    new_record["expected_output"] = record["expected_output"]
+
                 metadata = {k: v for k, v in record.items() if k not in ["input", "expected_output", "record_id"]}
                 if metadata:
                     new_record["metadata"] = metadata
@@ -539,8 +542,10 @@ def _build_insert_record(record: Dict[str, Any]) -> Dict[str, Any]:
     """Convert an internal record representation into the *insert_records* payload format."""
     new_rec = {
         "input": record["input"],
-        "expected_output": record["expected_output"],
     }
+    if record.get("expected_output"):
+        new_rec["expected_output"] = record["expected_output"]
+
     metadata = {k: v for k, v in record.items() if k not in ["input", "expected_output", "record_id"]}
     if metadata:
         new_rec["metadata"] = metadata
@@ -556,7 +561,7 @@ def _build_update_record(old: Dict[str, Any], new: Dict[str, Any]) -> Dict[str,
     if old.get("input") != new.get("input"):
         upd["input"] = new["input"]
     if old.get("expected_output") != new.get("expected_output"):
-        upd["expected_output"] = new["expected_output"]
+        upd["expected_output"] = new.get("expected_output")
     # Diff metadata.
     old_meta = {k: v for k, v in old.items() if k not in ["input", "expected_output", "record_id"]}
     new_meta = {k: v for k, v in new.items() if k not in ["input", "expected_output", "record_id"]}
@@ -605,7 +610,7 @@ def _send_batch_updates(
            attrs["update_records"] = update_records
        if delete_records:
            attrs["delete_records"] = delete_records
-
+
        # Use create_new_version for first chunk, then overwrite=True for subsequent chunks
        # to append to the version established by the first chunk
        if idx == 0:
@@ -657,12 +662,11 @@ def from_csv(
            Dataset: A new Dataset instance containing the CSV data, structured for LLM experiments.

        Raises:
-            ValueError: If input_columns or expected_output_columns are not provided,
-                or if the CSV is missing those columns, or if the file is empty.
+            ValueError: If input_columns is not provided, or if the CSV is missing those columns, or if the file is empty.
            DatasetFileError: If there are issues reading the CSV file (e.g., file not found, permission error, malformed).
        """
-        if input_columns is None or expected_output_columns is None:
-            raise ValueError("`input_columns` and `expected_output_columns` must be provided.")
+        if input_columns is None:
+            raise ValueError("`input_columns` must be provided.")

        data = []
        try:
@@ -683,7 +687,9 @@ def from_csv(
                header_columns = reader.fieldnames

                missing_input_columns = [col for col in input_columns if col not in header_columns]
-                missing_output_columns = [col for col in expected_output_columns if col not in header_columns]
+                missing_output_columns = False
+                if expected_output_columns is not None:
+                    missing_output_columns = [col for col in expected_output_columns if col not in header_columns]

                if missing_input_columns:
                    raise ValueError(f"Input columns not found in CSV header: {missing_input_columns}")
@@ -698,7 +704,7 @@ def from_csv(

                # Determine metadata columns (all columns not used for input or expected output)
                metadata_columns = [
-                    col for col in header_columns if col not in input_columns and col not in expected_output_columns
+                    col for col in header_columns if col not in input_columns and (expected_output_columns is not None and col not in expected_output_columns)
                ]

                for row in rows:
@@ -713,7 +719,9 @@ def from_csv(

                    try:
                        input_data = row[input_columns[0]] if len(input_columns) == 1 else {col: row[col] for col in input_columns}
-                        expected_output_data = row[expected_output_columns[0]] if len(expected_output_columns) == 1 else {col: row[col] for col in expected_output_columns}
+                        expected_output_data = None
+                        if expected_output_columns is not None and len(expected_output_columns) > 0:
+                            expected_output_data = row[expected_output_columns[0]] if len(expected_output_columns) == 1 else {col: row[col] for col in expected_output_columns}

                        metadata = {}
                        for col in metadata_columns:
@@ -726,13 +734,14 @@ def from_csv(
                        # Other errors during row processing also indicate CSV issues
                        raise DatasetFileError(f"Error parsing CSV file (row processing): {e}")

-                    data.append(
-                        {
-                            "input": input_data,
-                            "expected_output": expected_output_data,
-                            **metadata,
-                        }
-                    )
+                    to_append = {
+                        "input": input_data,
+                        **metadata,
+                    }
+
+                    if expected_output_data:
+                        to_append["expected_output"] = expected_output_data
+                    data.append(to_append)

        except csv.Error as e:
            # Catch CSV-specific parsing errors
            raise DatasetFileError(f"Error parsing CSV file: {e}")
diff --git a/tests/llmobs/test_experimentation_dataset.py b/tests/llmobs/test_experimentation_dataset.py
index 35e40903c73..d1a636c0f05 100644
--- a/tests/llmobs/test_experimentation_dataset.py
+++ b/tests/llmobs/test_experimentation_dataset.py
@@ -710,11 +710,11 @@ def test_push_synced_no_changes(self, experiments_vcr, synced_dataset, capsys):
        """Test pushing a synced dataset with no local changes."""
        initial_len = len(synced_dataset)
        initial_version = synced_dataset._datadog_dataset_version
-
+
        # This cassette should ideally show no POST requests or minimal GETs
        with experiments_vcr.use_cassette("test_dataset_push_synced_no_change.yaml"):
            synced_dataset.push()
-
+
        captured = capsys.readouterr()
        assert f"Dataset '{synced_dataset.name}' (v{initial_version}) is already synced and has no pending changes" in captured.out

@@ -1093,11 +1093,9 @@ def test_from_csv_missing_output_column(self, csv_file_simple):

    def test_from_csv_missing_column_specifications(self, csv_file_simple):
        """Test calling from_csv without input/output columns raises ValueError."""
-        with pytest.raises(ValueError, match="`input_columns` and `expected_output_columns` must be provided"):
+        with pytest.raises(ValueError, match="`input_columns` must be provided"):
            dne.Dataset.from_csv(csv_file_simple, name="bad")
-        with pytest.raises(ValueError, match="`input_columns` and `expected_output_columns` must be provided"):
-            dne.Dataset.from_csv(csv_file_simple, name="bad", input_columns=["question"])
-        with pytest.raises(ValueError, match="`input_columns` and `expected_output_columns` must be provided"):
+        with pytest.raises(ValueError, match="`input_columns` must be provided"):
            dne.Dataset.from_csv(csv_file_simple, name="bad", expected_output_columns=["answer"])

    def test_from_csv_malformed_file(self, csv_file_malformed):
@@ -1216,4 +1214,4 @@ def test_repr_empty_dataset(self):
        assert "Structure:" not in rep_clean  # No structure derivable when empty
        assert "Datadog: Local only" in rep_clean
        # Should show the deletion as a change until pushed/synced
-        assert "Changes: -1 deleted" in rep_clean
\ No newline at end of file
+        assert "Changes: -1 deleted" in rep_clean