diff --git a/adapters/python/coflux/serialization.py b/adapters/python/coflux/serialization.py index 837483b..c1c7ab4 100644 --- a/adapters/python/coflux/serialization.py +++ b/adapters/python/coflux/serialization.py @@ -3,11 +3,14 @@ from __future__ import annotations import collections +import datetime +import decimal import importlib import io import json import pickle import tempfile +import uuid from pathlib import Path from typing import Any, Callable @@ -71,13 +74,29 @@ def _encode(v: Any) -> Any: "type": "dict", "items": [_encode(x) for kv in items for x in kv], } - elif isinstance(v, set): + elif isinstance(v, (set, frozenset)): return { "type": "set", "items": [_encode(x) for x in sorted(v, key=repr)], } elif isinstance(v, tuple): return {"type": "tuple", "items": [_encode(x) for x in v]} + elif isinstance(v, datetime.datetime): + return {"type": "datetime", "value": v.isoformat()} + elif isinstance(v, datetime.date): + return {"type": "date", "value": v.isoformat()} + elif isinstance(v, datetime.time): + return {"type": "time", "value": v.isoformat()} + elif isinstance(v, datetime.timedelta): + return {"type": "duration", "value": v.total_seconds()} + elif isinstance(v, (bytes, bytearray)): + path = write_temp_file(v) + references.append(["fragment", "bytes", path, len(v), {}]) + return {"type": "ref", "index": len(references) - 1} + elif isinstance(v, decimal.Decimal): + return {"type": "decimal", "value": str(v)} + elif isinstance(v, uuid.UUID): + return {"type": "uuid", "value": str(v)} elif isinstance(v, Execution): references.append(["execution", v.id, v.module, v.target]) return {"type": "ref", "index": len(references) - 1} @@ -183,6 +202,18 @@ def _decode(v: Any) -> Any: return {_decode(x) for x in v["items"]} elif t == "tuple": return tuple(_decode(x) for x in v["items"]) + elif t == "datetime": + return datetime.datetime.fromisoformat(v["value"]) + elif t == "date": + return datetime.date.fromisoformat(v["value"]) + elif t == "time": + return datetime.time.fromisoformat(v["value"]) + elif t == "duration": + return datetime.timedelta(seconds=v["value"]) + elif t == "decimal": + return decimal.Decimal(v["value"]) + elif t == "uuid": + return uuid.UUID(v["value"]) elif t == "ref": return _resolve_ref(v["index"]) else: @@ -204,7 +235,10 @@ def _resolve_ref(index: int) -> Any: metadata = ref[4] if len(ref) > 4 and isinstance(ref[4], dict) else {} if not path: return None - if fmt == "pickle": + if fmt == "bytes": + with open(path, "rb") as f: + return f.read() + elif fmt == "pickle": with open(path, "rb") as f: return pickle.load(f) elif fmt == "json" and "model" in metadata: diff --git a/cli/cmd/coflux/follow.go b/cli/cmd/coflux/follow.go index 1c8018e..11a6ea7 100644 --- a/cli/cmd/coflux/follow.go +++ b/cli/cmd/coflux/follow.go @@ -509,6 +509,8 @@ const ( colorGreen = "\033[32m" colorYellow = "\033[33m" colorBlue = "\033[34m" + colorMagenta = "\033[35m" + colorCyan = "\033[36m" colorDim = "\033[90m" colorDimGreen = "\033[2;32m" colorBrightRed = "\033[91m" diff --git a/cli/cmd/coflux/logs.go b/cli/cmd/coflux/logs.go index efb3fb1..4537dea 100644 --- a/cli/cmd/coflux/logs.go +++ b/cli/cmd/coflux/logs.go @@ -124,7 +124,7 @@ func resolveStepAttempt(data map[string]any, stepAttempt string) (string, error) // interpolateTemplate replaces {key} placeholders in the template with // formatted values. Returns the interpolated message and any extra key-value // pairs not referenced in the template. -func interpolateTemplate(template string, values map[string]any) (string, map[string]string) { +func interpolateTemplate(template string, values map[string]any, color bool) (string, map[string]string) { if len(values) == 0 { return template, nil } @@ -135,7 +135,7 @@ func interpolateTemplate(template string, values map[string]any) (string, map[st if !strings.Contains(result, placeholder) { continue } - formatted := formatLogValue(val) + formatted := formatLogValue(val, color) result = strings.ReplaceAll(result, placeholder, formatted) used[key] = true } @@ -147,7 +147,7 @@ func interpolateTemplate(template string, values map[string]any) (string, map[st if extras == nil { extras = map[string]string{} } - extras[key] = formatLogValue(val) + extras[key] = formatLogValue(val, color) } return result, extras } @@ -155,7 +155,7 @@ func interpolateTemplate(template string, values map[string]any) (string, map[st // formatLogValue formats a single log value entry. // The value is expected to be {"type": "raw", "data": ..., "references": [...]} // or {"type": "blob", "key": "...", "size": N, "references": [...]}. -func formatLogValue(val any) string { +func formatLogValue(val any, color bool) string { v, ok := val.(map[string]any) if !ok { return fmt.Sprintf("%v", val) @@ -164,7 +164,10 @@ func formatLogValue(val any) string { switch typ { case "raw": references, _ := v["references"].([]any) - return formatLogData(v["data"], references) + if color { + return formatLogData(v["data"], references) + } + return formatLogDataPlain(v["data"], references) case "blob": size, _ := v["size"].(float64) return fmt.Sprintf("", humanSize(int64(size))) @@ -269,7 +272,7 @@ type logState struct { // printLogEntry prints a log entry, showing date and execution headers only // when they change. Returns updated state. func printLogEntry(entry logclient.LogEntry, labelMap map[string]executionLabel, color bool, state logState) logState { - message, extras := interpolateTemplate(entry.Template, entry.Values) + message, extras := interpolateTemplate(entry.Template, entry.Values, color) t := time.UnixMilli(entry.Timestamp).UTC() date := t.Format("2006-01-02") ts := t.Format("15:04:05.000") diff --git a/cli/cmd/coflux/runs.go b/cli/cmd/coflux/runs.go index dc99e3a..bd782d2 100644 --- a/cli/cmd/coflux/runs.go +++ b/cli/cmd/coflux/runs.go @@ -1,11 +1,16 @@ package main import ( + "encoding/json" "fmt" + "io" "os" + "path/filepath" "strings" + "github.com/bitroot/coflux/cli/internal/blob" "github.com/spf13/cobra" + "github.com/spf13/viper" "golang.org/x/term" ) @@ -64,14 +69,19 @@ func init() { } var runsResultCmd = &cobra.Command{ - Use: "result ", - Short: "Get the result of a run", - Long: `Get the result of a run. + Use: "result ", + Short: "Get the result of a run, step, or execution", + Long: `Get the result of a run, step, or execution. -Returns the JSON-formatted result of the root workflow execution. +The target can be: + Result of the initial step's latest execution + : Result of the latest execution of a specific step + :: Result of a specific execution Example: - coflux runs result abc123`, + coflux runs result RTbj + coflux runs result RTbj:2 + coflux runs result RTbj:2:1`, Args: cobra.ExactArgs(1), RunE: runRunsResult, } @@ -110,17 +120,7 @@ func findRootStep(data map[string]any) (step map[string]any, exec map[string]any continue } step = s - - executions, _ := s["executions"].(map[string]any) - var latestAttempt string - for attempt := range executions { - if latestAttempt == "" || attempt > latestAttempt { - latestAttempt = attempt - } - } - if latestAttempt != "" { - exec, _ = executions[latestAttempt].(map[string]any) - } + exec = latestExecution(s) return } return @@ -196,28 +196,90 @@ func runRunsInspect(cmd *cobra.Command, args []string) error { return nil } -func runRunsResult(cmd *cobra.Command, args []string) error { - runID := args[0] +// findStepByNumber finds a step by its step number. +func findStepByNumber(data map[string]any, stepNumber string) map[string]any { + steps, _ := data["steps"].(map[string]any) + for _, stepData := range steps { + s, ok := stepData.(map[string]any) + if !ok { + continue + } + sn, _ := s["stepNumber"].(float64) + if fmt.Sprintf("%d", int(sn)) == stepNumber { + return s + } + } + return nil +} - data, _, err := captureRunTopic(cmd, runID) +// latestExecution returns the latest execution (highest attempt) of a step. +func latestExecution(step map[string]any) map[string]any { + executions, _ := step["executions"].(map[string]any) + var latestAttempt string + for attempt := range executions { + if latestAttempt == "" || attempt > latestAttempt { + latestAttempt = attempt + } + } + if latestAttempt != "" { + exec, _ := executions[latestAttempt].(map[string]any) + return exec + } + return nil +} + +// parseResultTarget parses a target string into runID, stepNumber, attempt. +func parseResultTarget(target string) (runID, stepNumber, attempt string) { + parts := strings.SplitN(target, ":", 3) + runID = parts[0] + if len(parts) >= 2 { + stepNumber = parts[1] + } + if len(parts) >= 3 { + attempt = parts[2] + } + return +} + +// loadBlobData loads a blob's JSON data using the blob store. +func loadBlobData(key string) (any, error) { + token, err := resolveToken() if err != nil { - return err + return nil, err + } + stores, err := createBlobStoresFromViper(token) + if err != nil { + return nil, fmt.Errorf("failed to create blob stores: %w", err) + } + if len(stores) == 0 { + return nil, fmt.Errorf("blob store not configured") } - _, rootExec := findRootStep(data) - if rootExec == nil { - return fmt.Errorf("no execution found for run %s", runID) + blobManager := blob.NewManager(stores, filepath.Join(os.TempDir(), "coflux-cache", "blobs"), viper.GetInt("blobs.threshold")) + reader, err := blobManager.Get(key) + if err != nil { + return nil, fmt.Errorf("failed to load blob: %w", err) } + defer func() { _ = reader.Close() }() - result, _ := rootExec["result"].(map[string]any) - if result == nil { - return fmt.Errorf("run %s has no result yet", runID) + blobData, err := io.ReadAll(reader) + if err != nil { + return nil, fmt.Errorf("failed to read blob: %w", err) } - if isOutput("json") { - return outputJSON(result) + var parsed any + if err := json.Unmarshal(blobData, &parsed); err != nil { + return nil, fmt.Errorf("failed to parse blob JSON: %w", err) } + return parsed, nil +} +func printResult(result map[string]any) error { + color := term.IsTerminal(int(os.Stdout.Fd())) + fmtData := formatDataPlain + if color { + fmtData = formatData + } resultType, _ := result["type"].(string) switch resultType { case "value": @@ -226,13 +288,19 @@ func runRunsResult(cmd *cobra.Command, args []string) error { break } valueType, _ := value["type"].(string) + references, _ := value["references"].([]any) switch valueType { case "raw": - references, _ := value["references"].([]any) - fmt.Println(formatData(value["data"], references)) + fmt.Println(fmtData(value["data"], references)) case "blob": - size, _ := value["size"].(float64) - fmt.Printf("\n", humanSize(int64(size))) + key, _ := value["key"].(string) + blobData, err := loadBlobData(key) + if err != nil { + size, _ := value["size"].(float64) + fmt.Printf("\n", humanSize(int64(size))) + } else { + fmt.Println(fmtData(blobData, references)) + } } case "error": errData, _ := result["error"].(map[string]any) @@ -270,10 +338,60 @@ func runRunsResult(cmd *cobra.Command, args []string) error { default: fmt.Println(resultType) } - return nil } +func runRunsResult(cmd *cobra.Command, args []string) error { + target := args[0] + runID, stepNumber, attempt := parseResultTarget(target) + + data, _, err := captureRunTopic(cmd, runID) + if err != nil { + return err + } + + var exec map[string]any + if stepNumber == "" { + // Run ID only: get the root step's latest execution + _, exec = findRootStep(data) + if exec == nil { + return fmt.Errorf("no execution found for run %s", runID) + } + } else if attempt == "" { + // Run ID + step number: get latest execution of that step + step := findStepByNumber(data, stepNumber) + if step == nil { + return fmt.Errorf("step %s not found in run %s", stepNumber, runID) + } + exec = latestExecution(step) + if exec == nil { + return fmt.Errorf("no execution found for step %s in run %s", stepNumber, runID) + } + } else { + // Run ID + step number + attempt: get specific execution + step := findStepByNumber(data, stepNumber) + if step == nil { + return fmt.Errorf("step %s not found in run %s", stepNumber, runID) + } + executions, _ := step["executions"].(map[string]any) + exec, _ = executions[attempt].(map[string]any) + if exec == nil { + return fmt.Errorf("execution attempt %s not found for step %s in run %s", attempt, stepNumber, runID) + } + } + + result, _ := exec["result"].(map[string]any) + if result == nil { + return fmt.Errorf("no result yet") + } + + if isOutput("json") { + return outputJSON(result) + } + + return printResult(result) +} + var runsCancelCmd = &cobra.Command{ Use: "cancel ", Short: "Cancel an execution", diff --git a/cli/cmd/coflux/value.go b/cli/cmd/coflux/value.go index ef6f401..ec59126 100644 --- a/cli/cmd/coflux/value.go +++ b/cli/cmd/coflux/value.go @@ -5,30 +5,68 @@ import ( "strings" ) +func formatDuration(seconds float64) string { + days := int(seconds) / 86400 + hours := (int(seconds) % 86400) / 3600 + minutes := (int(seconds) % 3600) / 60 + secs := seconds - float64(int(seconds)-int(seconds)%60) + var parts []string + if days > 0 { + parts = append(parts, fmt.Sprintf("%dd", days)) + } + if hours > 0 { + parts = append(parts, fmt.Sprintf("%dh", hours)) + } + if minutes > 0 { + parts = append(parts, fmt.Sprintf("%dm", minutes)) + } + if secs > 0 || len(parts) == 0 { + if secs == float64(int(secs)) { + parts = append(parts, fmt.Sprintf("%ds", int(secs))) + } else { + parts = append(parts, fmt.Sprintf("%gs", secs)) + } + } + return strings.Join(parts, "") +} + type refFormatter func(ref any) string -// formatDataWith recursively formats a Data value as human-readable text, -// using the provided reference formatter to resolve {"type": "ref", "index": N} entries. -func formatDataWith(data any, references []any, fmtRef refFormatter) string { +type valueFormatter struct { + fmtRef refFormatter + colored bool +} + +func (f *valueFormatter) color(color, s string) string { + if !f.colored { + return s + } + return color + s + colorReset +} + +func (f *valueFormatter) format(data any, references []any, depth int) string { switch v := data.(type) { case string: - return fmt.Sprintf(`"%s"`, v) + return f.color(colorGreen, fmt.Sprintf(`"%s"`, v)) case float64: + var s string if v == float64(int64(v)) { - return fmt.Sprintf("%d", int64(v)) + s = fmt.Sprintf("%d", int64(v)) + } else { + s = fmt.Sprintf("%g", v) } - return fmt.Sprintf("%g", v) + return f.color(colorMagenta, s) case bool: if v { - return "True" + return f.color(colorRed, "True") } - return "False" + return f.color(colorRed, "False") case nil: - return "None" + return f.color(colorRed, "None") case []any: items := make([]string, len(v)) for i, item := range v { - items[i] = formatDataWith(item, references, fmtRef) + items[i] = f.format(item, references, depth) } return "[" + strings.Join(items, ", ") + "]" case map[string]any: @@ -36,44 +74,70 @@ func formatDataWith(data any, references []any, fmtRef refFormatter) string { switch typ { case "dict": items, _ := v["items"].([]any) - pairs := make([]string, 0, len(items)/2) + if len(items) == 0 { + return "{}" + } + indent := strings.Repeat(" ", depth+1) + outdent := strings.Repeat(" ", depth) + arrow := f.color(colorDim, " ↦ ") + var lines []string for i := 0; i+1 < len(items); i += 2 { - key := formatDataWith(items[i], references, fmtRef) - val := formatDataWith(items[i+1], references, fmtRef) - pairs = append(pairs, key+": "+val) + key := f.format(items[i], references, depth+1) + val := f.format(items[i+1], references, depth+1) + lines = append(lines, indent+key+arrow+val) } - return "{" + strings.Join(pairs, ", ") + "}" + return "{\n" + strings.Join(lines, "\n") + "\n" + outdent + "}" case "set": items, _ := v["items"].([]any) + if len(items) == 0 { + return "∅" + } parts := make([]string, len(items)) for i, item := range items { - parts[i] = formatDataWith(item, references, fmtRef) + parts[i] = f.format(item, references, depth) } return "{" + strings.Join(parts, ", ") + "}" case "tuple": items, _ := v["items"].([]any) parts := make([]string, len(items)) for i, item := range items { - parts[i] = formatDataWith(item, references, fmtRef) + parts[i] = f.format(item, references, depth) } return "(" + strings.Join(parts, ", ") + ")" + case "datetime", "date", "time": + val, _ := v["value"].(string) + return f.color(colorYellow, val) + case "duration": + seconds, _ := v["value"].(float64) + return f.color(colorYellow, formatDuration(seconds)) + case "decimal": + val, _ := v["value"].(string) + return f.color(colorCyan, val) + case "uuid": + val, _ := v["value"].(string) + return val case "ref": index, _ := v["index"].(float64) idx := int(index) if idx >= 0 && idx < len(references) { - return fmtRef(references[idx]) + return f.color(colorDim, f.fmtRef(references[idx])) } - return "" + return f.color(colorDim, "") } } return fmt.Sprintf("%v", data) } -// formatData recursively formats a Data value as human-readable text. -// The references slice corresponds to the value's references array, -// used to resolve {"type": "ref", "index": N} entries. +// formatData formats a Data value as human-readable text with colors. func formatData(data any, references []any) string { - return formatDataWith(data, references, formatReference) + f := &valueFormatter{fmtRef: formatReference, colored: true} + return f.format(data, references, 0) +} + +// formatDataPlain formats a Data value as plain text (no colors). +func formatDataPlain(data any, references []any) string { + f := &valueFormatter{fmtRef: formatReference, colored: false} + return f.format(data, references, 0) } func formatReference(ref any) string { @@ -88,9 +152,9 @@ func formatReference(ref any) string { module, _ := r["module"].(string) target, _ := r["target"].(string) if module != "" || target != "" { - return fmt.Sprintf("", module, target) + return fmt.Sprintf("", module, target) } - return "" + return "" case "asset": asset, _ := r["asset"].(map[string]any) if asset != nil { @@ -112,8 +176,6 @@ func formatReference(ref any) string { } // formatLogReference formats a reference in the flat log format. -// Log references have fields at the top level (e.g. r["module"], r["target"]) -// rather than nested under r["execution"] or r["asset"]. func formatLogReference(ref any) string { r, ok := ref.(map[string]any) if !ok { @@ -126,9 +188,9 @@ func formatLogReference(ref any) string { module, _ := r["module"].(string) target, _ := r["target"].(string) if module != "" || target != "" { - return fmt.Sprintf("", module, target) + return fmt.Sprintf("", module, target) } - return "" + return "" case "asset": name, _ := r["name"].(string) totalCount, _ := r["totalCount"].(float64) @@ -145,7 +207,14 @@ func formatLogReference(ref any) string { return "" } -// formatLogData formats a log value's data using the flat log reference format. +// formatLogData formats a log value's data using the flat log reference format with colors. func formatLogData(data any, references []any) string { - return formatDataWith(data, references, formatLogReference) + f := &valueFormatter{fmtRef: formatLogReference, colored: true} + return f.format(data, references, 0) +} + +// formatLogDataPlain formats a log value's data as plain text. +func formatLogDataPlain(data any, references []any) string { + f := &valueFormatter{fmtRef: formatLogReference, colored: false} + return f.format(data, references, 0) }