Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions adapters/python/coflux/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
from __future__ import annotations

import collections
import datetime
import decimal
import importlib
import io
import json
import pickle
import tempfile
import uuid
from pathlib import Path
from typing import Any, Callable

Expand Down Expand Up @@ -71,13 +74,29 @@ def _encode(v: Any) -> Any:
"type": "dict",
"items": [_encode(x) for kv in items for x in kv],
}
elif isinstance(v, set):
elif isinstance(v, (set, frozenset)):
return {
"type": "set",
"items": [_encode(x) for x in sorted(v, key=repr)],
}
elif isinstance(v, tuple):
return {"type": "tuple", "items": [_encode(x) for x in v]}
elif isinstance(v, datetime.datetime):
return {"type": "datetime", "value": v.isoformat()}
elif isinstance(v, datetime.date):
return {"type": "date", "value": v.isoformat()}
elif isinstance(v, datetime.time):
return {"type": "time", "value": v.isoformat()}
elif isinstance(v, datetime.timedelta):
return {"type": "duration", "value": v.total_seconds()}
elif isinstance(v, (bytes, bytearray)):
path = write_temp_file(v)
references.append(["fragment", "bytes", path, len(v), {}])
return {"type": "ref", "index": len(references) - 1}
elif isinstance(v, decimal.Decimal):
return {"type": "decimal", "value": str(v)}
elif isinstance(v, uuid.UUID):
return {"type": "uuid", "value": str(v)}
elif isinstance(v, Execution):
references.append(["execution", v.id, v.module, v.target])
return {"type": "ref", "index": len(references) - 1}
Expand Down Expand Up @@ -183,6 +202,18 @@ def _decode(v: Any) -> Any:
return {_decode(x) for x in v["items"]}
elif t == "tuple":
return tuple(_decode(x) for x in v["items"])
elif t == "datetime":
return datetime.datetime.fromisoformat(v["value"])
elif t == "date":
return datetime.date.fromisoformat(v["value"])
elif t == "time":
return datetime.time.fromisoformat(v["value"])
elif t == "duration":
return datetime.timedelta(seconds=v["value"])
elif t == "decimal":
return decimal.Decimal(v["value"])
elif t == "uuid":
return uuid.UUID(v["value"])
elif t == "ref":
return _resolve_ref(v["index"])
else:
Expand All @@ -204,7 +235,10 @@ def _resolve_ref(index: int) -> Any:
metadata = ref[4] if len(ref) > 4 and isinstance(ref[4], dict) else {}
if not path:
return None
if fmt == "pickle":
if fmt == "bytes":
with open(path, "rb") as f:
return f.read()
elif fmt == "pickle":
with open(path, "rb") as f:
return pickle.load(f)
elif fmt == "json" and "model" in metadata:
Expand Down
2 changes: 2 additions & 0 deletions cli/cmd/coflux/follow.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ const (
colorGreen = "\033[32m"
colorYellow = "\033[33m"
colorBlue = "\033[34m"
colorMagenta = "\033[35m"
colorCyan = "\033[36m"
colorDim = "\033[90m"
colorDimGreen = "\033[2;32m"
colorBrightRed = "\033[91m"
Expand Down
15 changes: 9 additions & 6 deletions cli/cmd/coflux/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func resolveStepAttempt(data map[string]any, stepAttempt string) (string, error)
// interpolateTemplate replaces {key} placeholders in the template with
// formatted values. Returns the interpolated message and any extra key-value
// pairs not referenced in the template.
func interpolateTemplate(template string, values map[string]any) (string, map[string]string) {
func interpolateTemplate(template string, values map[string]any, color bool) (string, map[string]string) {
if len(values) == 0 {
return template, nil
}
Expand All @@ -135,7 +135,7 @@ func interpolateTemplate(template string, values map[string]any) (string, map[st
if !strings.Contains(result, placeholder) {
continue
}
formatted := formatLogValue(val)
formatted := formatLogValue(val, color)
result = strings.ReplaceAll(result, placeholder, formatted)
used[key] = true
}
Expand All @@ -147,15 +147,15 @@ func interpolateTemplate(template string, values map[string]any) (string, map[st
if extras == nil {
extras = map[string]string{}
}
extras[key] = formatLogValue(val)
extras[key] = formatLogValue(val, color)
}
return result, extras
}

// formatLogValue formats a single log value entry.
// The value is expected to be {"type": "raw", "data": ..., "references": [...]}
// or {"type": "blob", "key": "...", "size": N, "references": [...]}.
func formatLogValue(val any) string {
func formatLogValue(val any, color bool) string {
v, ok := val.(map[string]any)
if !ok {
return fmt.Sprintf("%v", val)
Expand All @@ -164,7 +164,10 @@ func formatLogValue(val any) string {
switch typ {
case "raw":
references, _ := v["references"].([]any)
return formatLogData(v["data"], references)
if color {
return formatLogData(v["data"], references)
}
return formatLogDataPlain(v["data"], references)
case "blob":
size, _ := v["size"].(float64)
return fmt.Sprintf("<blob (%s)>", humanSize(int64(size)))
Expand Down Expand Up @@ -269,7 +272,7 @@ type logState struct {
// printLogEntry prints a log entry, showing date and execution headers only
// when they change. Returns updated state.
func printLogEntry(entry logclient.LogEntry, labelMap map[string]executionLabel, color bool, state logState) logState {
message, extras := interpolateTemplate(entry.Template, entry.Values)
message, extras := interpolateTemplate(entry.Template, entry.Values, color)
t := time.UnixMilli(entry.Timestamp).UTC()
date := t.Format("2006-01-02")
ts := t.Format("15:04:05.000")
Expand Down
184 changes: 151 additions & 33 deletions cli/cmd/coflux/runs.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package main

import (
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"strings"

"github.com/bitroot/coflux/cli/internal/blob"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/term"
)

Expand Down Expand Up @@ -64,14 +69,19 @@ func init() {
}

var runsResultCmd = &cobra.Command{
Use: "result <run-id>",
Short: "Get the result of a run",
Long: `Get the result of a run.
Use: "result <target>",
Short: "Get the result of a run, step, or execution",
Long: `Get the result of a run, step, or execution.

Returns the JSON-formatted result of the root workflow execution.
The target can be:
<run-id> Result of the initial step's latest execution
<run-id>:<step> Result of the latest execution of a specific step
<run-id>:<step>:<attempt> Result of a specific execution

Example:
coflux runs result abc123`,
coflux runs result RTbj
coflux runs result RTbj:2
coflux runs result RTbj:2:1`,
Args: cobra.ExactArgs(1),
RunE: runRunsResult,
}
Expand Down Expand Up @@ -110,17 +120,7 @@ func findRootStep(data map[string]any) (step map[string]any, exec map[string]any
continue
}
step = s

executions, _ := s["executions"].(map[string]any)
var latestAttempt string
for attempt := range executions {
if latestAttempt == "" || attempt > latestAttempt {
latestAttempt = attempt
}
}
if latestAttempt != "" {
exec, _ = executions[latestAttempt].(map[string]any)
}
exec = latestExecution(s)
return
}
return
Expand Down Expand Up @@ -196,28 +196,90 @@ func runRunsInspect(cmd *cobra.Command, args []string) error {
return nil
}

func runRunsResult(cmd *cobra.Command, args []string) error {
runID := args[0]
// findStepByNumber finds a step by its step number.
func findStepByNumber(data map[string]any, stepNumber string) map[string]any {
steps, _ := data["steps"].(map[string]any)
for _, stepData := range steps {
s, ok := stepData.(map[string]any)
if !ok {
continue
}
sn, _ := s["stepNumber"].(float64)
if fmt.Sprintf("%d", int(sn)) == stepNumber {
return s
}
}
return nil
}

data, _, err := captureRunTopic(cmd, runID)
// latestExecution returns the latest execution (highest attempt) of a step.
func latestExecution(step map[string]any) map[string]any {
executions, _ := step["executions"].(map[string]any)
var latestAttempt string
for attempt := range executions {
if latestAttempt == "" || attempt > latestAttempt {
latestAttempt = attempt
}
}
if latestAttempt != "" {
exec, _ := executions[latestAttempt].(map[string]any)
return exec
}
return nil
}

// parseResultTarget parses a target string into runID, stepNumber, attempt.
func parseResultTarget(target string) (runID, stepNumber, attempt string) {
parts := strings.SplitN(target, ":", 3)
runID = parts[0]
if len(parts) >= 2 {
stepNumber = parts[1]
}
if len(parts) >= 3 {
attempt = parts[2]
}
return
}

// loadBlobData loads a blob's JSON data using the blob store.
func loadBlobData(key string) (any, error) {
token, err := resolveToken()
if err != nil {
return err
return nil, err
}
stores, err := createBlobStoresFromViper(token)
if err != nil {
return nil, fmt.Errorf("failed to create blob stores: %w", err)
}
if len(stores) == 0 {
return nil, fmt.Errorf("blob store not configured")
}

_, rootExec := findRootStep(data)
if rootExec == nil {
return fmt.Errorf("no execution found for run %s", runID)
blobManager := blob.NewManager(stores, filepath.Join(os.TempDir(), "coflux-cache", "blobs"), viper.GetInt("blobs.threshold"))
reader, err := blobManager.Get(key)
if err != nil {
return nil, fmt.Errorf("failed to load blob: %w", err)
}
defer func() { _ = reader.Close() }()

result, _ := rootExec["result"].(map[string]any)
if result == nil {
return fmt.Errorf("run %s has no result yet", runID)
blobData, err := io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("failed to read blob: %w", err)
}

if isOutput("json") {
return outputJSON(result)
var parsed any
if err := json.Unmarshal(blobData, &parsed); err != nil {
return nil, fmt.Errorf("failed to parse blob JSON: %w", err)
}
return parsed, nil
}

func printResult(result map[string]any) error {
color := term.IsTerminal(int(os.Stdout.Fd()))
fmtData := formatDataPlain
if color {
fmtData = formatData
}
resultType, _ := result["type"].(string)
switch resultType {
case "value":
Expand All @@ -226,13 +288,19 @@ func runRunsResult(cmd *cobra.Command, args []string) error {
break
}
valueType, _ := value["type"].(string)
references, _ := value["references"].([]any)
switch valueType {
case "raw":
references, _ := value["references"].([]any)
fmt.Println(formatData(value["data"], references))
fmt.Println(fmtData(value["data"], references))
case "blob":
size, _ := value["size"].(float64)
fmt.Printf("<blob (%s)>\n", humanSize(int64(size)))
key, _ := value["key"].(string)
blobData, err := loadBlobData(key)
if err != nil {
size, _ := value["size"].(float64)
fmt.Printf("<blob (%s)>\n", humanSize(int64(size)))
} else {
fmt.Println(fmtData(blobData, references))
}
}
case "error":
errData, _ := result["error"].(map[string]any)
Expand Down Expand Up @@ -270,10 +338,60 @@ func runRunsResult(cmd *cobra.Command, args []string) error {
default:
fmt.Println(resultType)
}

return nil
}

func runRunsResult(cmd *cobra.Command, args []string) error {
target := args[0]
runID, stepNumber, attempt := parseResultTarget(target)

data, _, err := captureRunTopic(cmd, runID)
if err != nil {
return err
}

var exec map[string]any
if stepNumber == "" {
// Run ID only: get the root step's latest execution
_, exec = findRootStep(data)
if exec == nil {
return fmt.Errorf("no execution found for run %s", runID)
}
} else if attempt == "" {
// Run ID + step number: get latest execution of that step
step := findStepByNumber(data, stepNumber)
if step == nil {
return fmt.Errorf("step %s not found in run %s", stepNumber, runID)
}
exec = latestExecution(step)
if exec == nil {
return fmt.Errorf("no execution found for step %s in run %s", stepNumber, runID)
}
} else {
// Run ID + step number + attempt: get specific execution
step := findStepByNumber(data, stepNumber)
if step == nil {
return fmt.Errorf("step %s not found in run %s", stepNumber, runID)
}
executions, _ := step["executions"].(map[string]any)
exec, _ = executions[attempt].(map[string]any)
if exec == nil {
return fmt.Errorf("execution attempt %s not found for step %s in run %s", attempt, stepNumber, runID)
}
}

result, _ := exec["result"].(map[string]any)
if result == nil {
return fmt.Errorf("no result yet")
}

if isOutput("json") {
return outputJSON(result)
}

return printResult(result)
}

var runsCancelCmd = &cobra.Command{
Use: "cancel <execution-id>",
Short: "Cancel an execution",
Expand Down
Loading