diff --git a/cli/cmd/coflux/follow.go b/cli/cmd/coflux/follow.go index 11a6ea7..e7251c8 100644 --- a/cli/cmd/coflux/follow.go +++ b/cli/cmd/coflux/follow.go @@ -33,7 +33,7 @@ func watchRun(ctx context.Context, host string, secure bool, token string, runID } defer client.Close() - sub := client.Subscribe("runs/"+runID+"/"+workspaceID, nil) + sub := client.Subscribe("workspaces/"+workspaceID+"/runs/"+runID, nil) defer sub.Unsubscribe() // Ctrl+C detaches (stops watching) but leaves the workflow running @@ -102,7 +102,7 @@ func waitForRootResult(ctx context.Context, host string, secure bool, token stri } defer client.Close() - sub := client.Subscribe("runs/"+runID+"/"+workspaceID, nil) + sub := client.Subscribe("workspaces/"+workspaceID+"/runs/"+runID, nil) defer sub.Unsubscribe() for { @@ -511,6 +511,7 @@ const ( colorBlue = "\033[34m" colorMagenta = "\033[35m" colorCyan = "\033[36m" + colorBold = "\033[1m" colorDim = "\033[90m" colorDimGreen = "\033[2;32m" colorBrightRed = "\033[91m" diff --git a/cli/cmd/coflux/main.go b/cli/cmd/coflux/main.go index 96e264b..1685aac 100644 --- a/cli/cmd/coflux/main.go +++ b/cli/cmd/coflux/main.go @@ -98,7 +98,9 @@ func init() { assetsCmd.GroupID = "management" blobsCmd.GroupID = "management" logsCmd.GroupID = "management" - rootCmd.AddCommand(workspacesCmd, manifestsCmd, poolsCmd, tokensCmd, assetsCmd, blobsCmd, logsCmd) + sessionsCmd.GroupID = "management" + queueCmd.GroupID = "management" + rootCmd.AddCommand(workspacesCmd, manifestsCmd, poolsCmd, tokensCmd, assetsCmd, blobsCmd, logsCmd, sessionsCmd, queueCmd) } func initConfig(cmd *cobra.Command, args []string) error { diff --git a/cli/cmd/coflux/manifests.go b/cli/cmd/coflux/manifests.go index 93fc458..58b7b74 100644 --- a/cli/cmd/coflux/manifests.go +++ b/cli/cmd/coflux/manifests.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sort" + "strings" "github.com/bitroot/coflux/cli/internal/adapter" "github.com/bitroot/coflux/cli/internal/api" @@ -16,6 +17,7 @@ var manifestsCmd = 
&cobra.Command{ } var manifestsAdapter []string +var manifestsInspectWatch bool func init() { manifestsCmd.PersistentFlags().StringSliceVar(&manifestsAdapter, "adapter", nil, "Adapter command (e.g., --adapter python,-m,coflux)") @@ -24,6 +26,8 @@ func init() { manifestsCmd.AddCommand(manifestsRegisterCmd) manifestsCmd.AddCommand(manifestsArchiveCmd) manifestsCmd.AddCommand(manifestsInspectCmd) + + manifestsInspectCmd.Flags().BoolVar(&manifestsInspectWatch, "watch", false, "Watch for changes") } var manifestsDiscoverCmd = &cobra.Command{ @@ -192,6 +196,11 @@ func runManifestsInspect(cmd *cobra.Command, args []string) error { return err } + token, err := resolveToken() + if err != nil { + return err + } + client, err := newClient() if err != nil { return err @@ -202,18 +211,40 @@ func runManifestsInspect(cmd *cobra.Command, args []string) error { return err } + if isOutput("json") { + manifests, err := client.GetManifests(cmd.Context(), workspaceID) + if err != nil { + return fmt.Errorf("failed to get manifests: %w", err) + } + return outputJSON(manifests) + } + + if manifestsInspectWatch { + return watchTopics(cmd.Context(), getHost(), isSecure(), token, + []string{"workspaces/" + workspaceID + "/manifests"}, + func(data []map[string]any) []string { + if data[0] == nil { + return nil + } + return renderManifestLines(data[0]) + }, + ) + } + manifests, err := client.GetManifests(cmd.Context(), workspaceID) if err != nil { return fmt.Errorf("failed to get manifests: %w", err) } - if isOutput("json") { - return outputJSON(manifests) + for _, line := range renderManifestLines(manifests) { + fmt.Println(line) } + return nil +} +func renderManifestLines(manifests map[string]any) []string { if len(manifests) == 0 { - fmt.Println("No modules registered.") - return nil + return []string{colorDim + "No modules registered." 
+ colorReset} } modules := make([]string, 0, len(manifests)) @@ -222,10 +253,15 @@ func runManifestsInspect(cmd *cobra.Command, args []string) error { } sort.Strings(modules) - for _, module := range modules { + var lines []string + for i, module := range modules { + if i > 0 { + lines = append(lines, "") + } + lines = append(lines, colorBold+module+colorReset) + targets, ok := manifests[module].(map[string]any) if !ok { - fmt.Printf("%s\n", module) continue } names := make([]string, 0, len(targets)) @@ -234,11 +270,34 @@ func runManifestsInspect(cmd *cobra.Command, args []string) error { } sort.Strings(names) for _, name := range names { - fmt.Printf("%s/%s\n", module, name) + detail := formatTargetDetail(targets[name]) + if detail != "" { + lines = append(lines, " "+name+" "+colorDim+detail+colorReset) + } else { + lines = append(lines, " "+name) + } } } + return lines +} - return nil +func formatTargetDetail(raw any) string { + t, ok := raw.(map[string]any) + if !ok { + return "" + } + if params, ok := t["parameters"].([]any); ok && len(params) > 0 { + var names []string + for _, p := range params { + if pm, ok := p.(map[string]any); ok { + if name, ok := pm["name"].(string); ok { + names = append(names, name) + } + } + } + return "(" + strings.Join(names, ", ") + ")" + } + return "" } func buildManifests(manifest *adapter.DiscoveryManifest) map[string]map[string]any { diff --git a/cli/cmd/coflux/pools.go b/cli/cmd/coflux/pools.go index 83f5654..48a02c0 100644 --- a/cli/cmd/coflux/pools.go +++ b/cli/cmd/coflux/pools.go @@ -2,7 +2,9 @@ package main import ( "fmt" + "sort" "strings" + "time" "github.com/spf13/cobra" ) @@ -14,8 +16,10 @@ var poolsCmd = &cobra.Command{ func init() { poolsCmd.AddCommand(poolsListCmd) + poolsCmd.AddCommand(poolsGetCmd) poolsCmd.AddCommand(poolsUpdateCmd) poolsCmd.AddCommand(poolsDeleteCmd) + poolsCmd.AddCommand(poolsLaunchesCmd) } // pools list @@ -46,6 +50,10 @@ func runPoolsList(cmd *cobra.Command, args []string) error { return err 
} + if isOutput("json") { + return outputJSON(pools) + } + if len(pools) == 0 { fmt.Println("No pools found.") return nil @@ -53,7 +61,10 @@ func runPoolsList(cmd *cobra.Command, args []string) error { var rows [][]string for name, pool := range pools { - launcher := getString(pool, "launcherType") + launcher := "" + if l, ok := pool["launcher"].(map[string]any); ok { + launcher = getString(l, "type") + } modules := "" if m, ok := pool["modules"].([]any); ok { var mods []string @@ -72,12 +83,369 @@ func runPoolsList(cmd *cobra.Command, args []string) error { return nil } +// pools get +var poolsGetCmd = &cobra.Command{ + Use: "get ", + Short: "Get pool configuration", + Args: cobra.ExactArgs(1), + RunE: runPoolsGet, +} + +func runPoolsGet(cmd *cobra.Command, args []string) error { + name := args[0] + + workspace, err := requireWorkspace() + if err != nil { + return err + } + + client, err := newClient() + if err != nil { + return err + } + + workspaceID, err := resolveWorkspaceID(cmd.Context(), client, workspace) + if err != nil { + return err + } + + pool, err := client.GetPool(cmd.Context(), workspaceID, name) + if err != nil { + return fmt.Errorf("failed to get pool '%s': %w", name, err) + } + + if isOutput("json") { + return outputJSON(pool) + } + + // Modules + if modules, ok := pool["modules"].([]any); ok && len(modules) > 0 { + var mods []string + for _, m := range modules { + if s, ok := m.(string); ok { + mods = append(mods, s) + } + } + fmt.Printf("Modules: %s\n", strings.Join(mods, ", ")) + } + + // Provides + if provides := encodeProvides(pool["provides"]); provides != "" { + fmt.Printf("Provides: %s\n", provides) + } + + // Launcher + if launcher, ok := pool["launcher"].(map[string]any); ok { + fmt.Printf("Launcher: %s\n", getString(launcher, "type")) + if image := getString(launcher, "image"); image != "" { + fmt.Printf("Image: %s\n", image) + } + if dockerHost := getString(launcher, "dockerHost"); dockerHost != "" { + fmt.Printf("Docker host: %s\n", 
dockerHost) + } + if serverHost := getString(launcher, "serverHost"); serverHost != "" { + fmt.Printf("Server host: %s\n", serverHost) + } + } + + return nil +} + +// pools launches +var poolsLaunchesWatch bool + +var poolsLaunchesCmd = &cobra.Command{ + Use: "launches [worker-id]", + Short: "Show pool launches", + Long: `Show launches for a pool. Without a worker ID, lists all workers. +With a worker ID, shows details for that specific launch. + +Use --watch to stream live updates.`, + Args: cobra.RangeArgs(1, 2), + RunE: runPoolsLaunches, +} + +func init() { + poolsLaunchesCmd.Flags().BoolVar(&poolsLaunchesWatch, "watch", false, "Watch for changes") +} + +func runPoolsLaunches(cmd *cobra.Command, args []string) error { + poolName := args[0] + + if len(args) == 2 { + return runPoolsLaunchDetail(cmd, poolName, args[1]) + } + + if poolsLaunchesWatch { + return runPoolsLaunchFollow(cmd, poolName) + } + + return runPoolsLaunchList(cmd, poolName) +} + +func runPoolsLaunchList(cmd *cobra.Command, poolName string) error { + workspace, err := requireWorkspace() + if err != nil { + return err + } + + client, err := newClient() + if err != nil { + return err + } + + workspaceID, err := resolveWorkspaceID(cmd.Context(), client, workspace) + if err != nil { + return err + } + + data, err := client.CaptureTopic(cmd.Context(), "workspaces/"+workspaceID+"/pools/"+poolName) + if err != nil { + return fmt.Errorf("pool '%s' not found", poolName) + } + + workers, _ := data["workers"].(map[string]any) + + if isOutput("json") { + return outputJSON(workers) + } + + for _, line := range renderWorkerLines(data) { + fmt.Println(line) + } + return nil +} + +func runPoolsLaunchFollow(cmd *cobra.Command, poolName string) error { + workspace, err := requireWorkspace() + if err != nil { + return err + } + + apiClient, err := newClient() + if err != nil { + return err + } + + workspaceID, err := resolveWorkspaceID(cmd.Context(), apiClient, workspace) + if err != nil { + return err + } + + 
token, err := resolveToken() + if err != nil { + return err + } + + return watchTopics(cmd.Context(), getHost(), isSecure(), token, + []string{"workspaces/" + workspaceID + "/pools/" + poolName}, + func(data []map[string]any) []string { + if data[0] == nil { + return nil + } + return renderWorkerLines(data[0]) + }, + ) +} + +type workerRow struct { + id string + status string + error string + ts int64 +} + +func buildWorkerRows(workers map[string]any) []workerRow { + var rows []workerRow + for id, raw := range workers { + w, ok := raw.(map[string]any) + if !ok { + continue + } + status, errMsg, ts := workerStatus(w) + rows = append(rows, workerRow{id: id, status: status, error: errMsg, ts: ts}) + } + // Sort by timestamp descending (most recent first) + sort.Slice(rows, func(i, j int) bool { + return rows[i].ts > rows[j].ts + }) + return rows +} + +func renderWorkerLines(data map[string]any) []string { + workers, _ := data["workers"].(map[string]any) + if workers == nil { + return []string{colorDim + "No launches." + colorReset} + } + + rows := buildWorkerRows(workers) + + if len(rows) == 0 { + return []string{colorDim + "No launches." 
+ colorReset} + } + + var lines []string + for _, r := range rows { + ts := colorDim + formatMillis(r.ts) + colorReset + statusStr := formatWorkerStatus(r.status) + line := fmt.Sprintf("%s %s %s", ts, r.id, statusStr) + if r.error != "" { + line += " " + r.error + } + lines = append(lines, line) + } + return lines +} + +func workerStatus(w map[string]any) (status, errMsg string, ts int64) { + startingAt := getFloat64(w, "startingAt") + startedAt := getFloat64(w, "startedAt") + deactivatedAt := getFloat64(w, "deactivatedAt") + stoppingAt := getFloat64(w, "stoppingAt") + startError, _ := w["startError"].(string) + workerError, _ := w["error"].(string) + connected, _ := w["connected"].(bool) + + if deactivatedAt > 0 { + ts = int64(deactivatedAt) + if workerError != "" { + return "failed", workerError, ts + } + if startError != "" { + return "failed", startError, ts + } + return "stopped", "", ts + } + if stoppingAt > 0 { + return "stopping", "", int64(stoppingAt) + } + if startedAt > 0 { + ts = int64(startedAt) + if connected { + return "running", "", ts + } + return "started", "", ts + } + if startError != "" { + return "failed", startError, int64(startingAt) + } + if startingAt > 0 { + return "starting", "", int64(startingAt) + } + return "unknown", "", 0 +} + +func formatWorkerStatus(status string) string { + switch status { + case "running": + return colorGreen + "◆ Running" + colorReset + case "started": + return colorBlue + "◆ Started" + colorReset + case "starting": + return colorDim + "◇ Starting" + colorReset + case "stopping": + return colorYellow + "◆ Stopping" + colorReset + case "stopped": + return colorDim + "◇ Stopped" + colorReset + case "failed": + return colorRed + "✗ Failed" + colorReset + default: + return colorDim + "◇ Unknown" + colorReset + } +} + +func runPoolsLaunchDetail(cmd *cobra.Command, poolName, workerID string) error { + workspace, err := requireWorkspace() + if err != nil { + return err + } + + client, err := newClient() + if err != 
nil { + return err + } + + workspaceID, err := resolveWorkspaceID(cmd.Context(), client, workspace) + if err != nil { + return err + } + + data, err := client.CaptureTopic(cmd.Context(), "workspaces/"+workspaceID+"/pools/"+poolName) + if err != nil { + return fmt.Errorf("pool '%s' not found", poolName) + } + + workers, _ := data["workers"].(map[string]any) + if workers == nil { + return fmt.Errorf("worker '%s' not found", workerID) + } + + w, ok := workers[workerID].(map[string]any) + if !ok { + return fmt.Errorf("worker '%s' not found", workerID) + } + + if isOutput("json") { + return outputJSON(w) + } + + // Text output + status, errMsg, _ := workerStatus(w) + statusLine := formatWorkerStatus(status) + if errMsg != "" { + statusLine += " " + errMsg + } + fmt.Printf("Status: %s\n", statusLine) + + if v := getFloat64(w, "startingAt"); v > 0 { + fmt.Printf("Starting at: %s\n", formatMillis(int64(v))) + } + if v := getFloat64(w, "startedAt"); v > 0 { + fmt.Printf("Started at: %s\n", formatMillis(int64(v))) + } + if v := getFloat64(w, "stoppingAt"); v > 0 { + fmt.Printf("Stopping at: %s\n", formatMillis(int64(v))) + } + if v := getFloat64(w, "stoppedAt"); v > 0 { + fmt.Printf("Stopped at: %s\n", formatMillis(int64(v))) + } + if v := getFloat64(w, "deactivatedAt"); v > 0 { + fmt.Printf("Deactivated at: %s\n", formatMillis(int64(v))) + } + if connected, ok := w["connected"].(bool); ok { + fmt.Printf("Connected: %v\n", connected) + } + + if logs, _ := w["logs"].(string); logs != "" { + fmt.Printf("\nLogs:\n%s%s%s\n", colorDim, logs, colorReset) + } + + return nil +} + +func formatMillis(ms int64) string { + t := time.Unix(ms/1000, (ms%1000)*int64(time.Millisecond)).UTC() + return t.Format("2006-01-02 15:04:05 UTC") +} + +func getFloat64(m map[string]any, key string) float64 { + if v, ok := m[key]; ok { + if f, ok := v.(float64); ok { + return f + } + } + return 0 +} + // pools update var ( - poolsUpdateModules []string - poolsUpdateProvides []string - 
poolsUpdateDockerImage string - poolsUpdateDockerHost string + poolsUpdateModules []string + poolsUpdateProvides []string + poolsUpdateDockerImage string + poolsUpdateDockerHost string + poolsUpdateNoDockerHost bool + poolsUpdateServerHost string + poolsUpdateNoServerHost bool ) var poolsUpdateCmd = &cobra.Command{ @@ -92,6 +460,11 @@ func init() { poolsUpdateCmd.Flags().StringSliceVar(&poolsUpdateProvides, "provides", nil, "Features that workers provide") poolsUpdateCmd.Flags().StringVar(&poolsUpdateDockerImage, "docker-image", "", "Docker image") poolsUpdateCmd.Flags().StringVar(&poolsUpdateDockerHost, "docker-host", "", "Docker host") + poolsUpdateCmd.Flags().BoolVar(&poolsUpdateNoDockerHost, "no-docker-host", false, "Unset Docker host (use default socket)") + poolsUpdateCmd.Flags().StringVar(&poolsUpdateServerHost, "server-host", "", "Coflux server host (overrides server default)") + poolsUpdateCmd.Flags().BoolVar(&poolsUpdateNoServerHost, "no-server-host", false, "Unset server host (use server default)") + poolsUpdateCmd.MarkFlagsMutuallyExclusive("docker-host", "no-docker-host") + poolsUpdateCmd.MarkFlagsMutuallyExclusive("server-host", "no-server-host") } func runPoolsUpdate(cmd *cobra.Command, args []string) error { @@ -125,7 +498,7 @@ func runPoolsUpdate(cmd *cobra.Command, args []string) error { if poolsUpdateProvides != nil { pool["provides"] = parseProvides(poolsUpdateProvides) } - if poolsUpdateDockerImage != "" || poolsUpdateDockerHost != "" { + if poolsUpdateDockerImage != "" || poolsUpdateDockerHost != "" || poolsUpdateNoDockerHost || poolsUpdateServerHost != "" || poolsUpdateNoServerHost { launcher, ok := pool["launcher"].(map[string]any) if !ok || getString(launcher, "type") != "docker" { launcher = map[string]any{"type": "docker"} @@ -135,6 +508,13 @@ func runPoolsUpdate(cmd *cobra.Command, args []string) error { } if poolsUpdateDockerHost != "" { launcher["dockerHost"] = poolsUpdateDockerHost + } else if poolsUpdateNoDockerHost { + 
delete(launcher, "dockerHost") + } + if poolsUpdateServerHost != "" { + launcher["serverHost"] = poolsUpdateServerHost + } else if poolsUpdateNoServerHost { + delete(launcher, "serverHost") } pool["launcher"] = launcher } diff --git a/cli/cmd/coflux/queue.go b/cli/cmd/coflux/queue.go new file mode 100644 index 0000000..0d37d21 --- /dev/null +++ b/cli/cmd/coflux/queue.go @@ -0,0 +1,346 @@ +package main + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/spf13/cobra" +) + +var queueNoWatch bool + +var queueCmd = &cobra.Command{ + Use: "queue", + Short: "Show the execution queue", + RunE: runQueue, +} + +func init() { + queueCmd.Flags().BoolVar(&queueNoWatch, "no-watch", false, "Show queue snapshot and exit") +} + +type queueEntry struct { + id string + module string + target string + executeAfter float64 + assignedAt float64 + dependencies []string + requires map[string][]string +} + +func parseQueueEntries(data map[string]any) []queueEntry { + var entries []queueEntry + for id, raw := range data { + e, ok := raw.(map[string]any) + if !ok { + continue + } + entry := queueEntry{id: id} + entry.module, _ = e["module"].(string) + entry.target, _ = e["target"].(string) + if ea, ok := e["executeAfter"].(float64); ok { + entry.executeAfter = ea + } + if aa, ok := e["assignedAt"].(float64); ok { + entry.assignedAt = aa + } + if deps, ok := e["dependencies"].([]any); ok { + for _, d := range deps { + if s, ok := d.(string); ok { + entry.dependencies = append(entry.dependencies, s) + } + } + } + if req, ok := e["requires"].(map[string]any); ok { + entry.requires = make(map[string][]string) + for k, v := range req { + if arr, ok := v.([]any); ok { + for _, val := range arr { + if sv, ok := val.(string); ok { + entry.requires[k] = append(entry.requires[k], sv) + } + } + } + } + } + entries = append(entries, entry) + } + sort.Slice(entries, func(i, j int) bool { + return entries[i].id < entries[j].id + }) + return entries +} + +type queueStatus string + 
+const ( + queueStatusScheduled queueStatus = "scheduled" + queueStatusPaused queueStatus = "paused" + queueStatusDependencies queueStatus = "dependencies" + queueStatusNoSession queueStatus = "no-session" + queueStatusUnknown queueStatus = "unknown" + queueStatusAssigned queueStatus = "assigned" +) + +func getQueueEntryStatus(e queueEntry, sessions []sessionEntry, workspaceState string) queueStatus { + if e.assignedAt > 0 { + return queueStatusAssigned + } + if e.executeAfter > 0 && e.executeAfter > float64(time.Now().UnixMilli()) { + return queueStatusScheduled + } + if workspaceState == "paused" { + return queueStatusPaused + } + if len(e.dependencies) > 0 { + return queueStatusDependencies + } + if !hasCompatibleSessionForEntry(e, sessions) { + return queueStatusNoSession + } + return queueStatusUnknown +} + +func hasCompatibleSessionForEntry(e queueEntry, sessions []sessionEntry) bool { + for _, s := range sessions { + if !s.connected { + continue + } + if s.workerState != "" && s.workerState != "active" { + continue + } + if s.concurrency > 0 && s.executions >= s.concurrency { + continue + } + if !sessionHasTarget(s, e.module, e.target) { + continue + } + if !sessionSatisfiesRequires(s, e.requires) { + continue + } + return true + } + return false +} + +func sessionHasTarget(s sessionEntry, module, target string) bool { + if s.targetMap == nil { + return false + } + targets, ok := s.targetMap[module] + if !ok { + return false + } + for _, t := range targets { + if t == target { + return true + } + } + return false +} + +func sessionSatisfiesRequires(s sessionEntry, requires map[string][]string) bool { + for k, values := range requires { + provided, ok := s.provides[k] + if !ok { + return false + } + found := false + for _, v := range values { + for _, p := range provided { + if v == p { + found = true + break + } + } + if found { + break + } + } + if !found { + return false + } + } + return true +} + +func queueStatusIndicator(status queueStatus) string { + 
switch status { + case queueStatusAssigned: + return colorBlue + "●" + colorReset + case queueStatusScheduled: + return colorDim + "●" + colorReset + case queueStatusPaused: + return colorYellow + "●" + colorReset + case queueStatusDependencies: + return "\033[38;5;208m●" + colorReset // orange + case queueStatusNoSession: + return colorRed + "●" + colorReset + case queueStatusUnknown: + return "\033[38;5;217m●" + colorReset // light red + default: + return colorDim + "●" + colorReset + } +} + +func queueStatusLabel(status queueStatus) string { + switch status { + case queueStatusAssigned: + return "assigned" + case queueStatusScheduled: + return "scheduled" + case queueStatusPaused: + return "paused" + case queueStatusDependencies: + return "waiting" + case queueStatusNoSession: + return "no session" + case queueStatusUnknown: + return "unknown" + default: + return "" + } +} + +func runQueue(cmd *cobra.Command, args []string) error { + workspace, err := requireWorkspace() + if err != nil { + return err + } + + token, err := resolveToken() + if err != nil { + return err + } + + client, err := newClient() + if err != nil { + return err + } + + workspaceID, err := resolveWorkspaceID(cmd.Context(), client, workspace) + if err != nil { + return err + } + + if isOutput("json") { + data, err := client.CaptureTopic(cmd.Context(), "workspaces/"+workspaceID+"/queue") + if err != nil { + return err + } + return outputJSON(data) + } + + if queueNoWatch { + data, err := client.CaptureTopic(cmd.Context(), "workspaces/"+workspaceID+"/queue") + if err != nil { + return err + } + sessionsData, err := client.CaptureTopic(cmd.Context(), "workspaces/"+workspaceID+"/sessions") + if err != nil { + return err + } + entries := parseQueueEntries(data) + sessions := parseSessionEntriesWithTargets(sessionsData) + printQueueTable(entries, sessions, "") + return nil + } + + return watchQueue(cmd.Context(), getHost(), isSecure(), token, workspaceID, workspace) +} + +func 
printQueueTable(entries []queueEntry, sessions []sessionEntry, workspaceState string) { + if len(entries) == 0 { + fmt.Println("Queue is empty.") + return + } + + lines := renderQueueTable(entries, sessions, workspaceState) + for _, line := range lines { + fmt.Println(line) + } +} + +func watchQueue(ctx context.Context, host string, secure bool, token string, workspaceID string, workspaceName string) error { + return watchTopics(ctx, host, secure, token, + []string{ + "workspaces/" + workspaceID + "/queue", + "workspaces/" + workspaceID + "/sessions", + "workspaces", + }, + func(data []map[string]any) []string { + queueData, sessionsData, workspacesData := data[0], data[1], data[2] + if queueData == nil || sessionsData == nil { + return nil + } + + entries := parseQueueEntries(queueData) + sessions := parseSessionEntriesWithTargets(sessionsData) + + workspaceState := "" + if workspacesData != nil { + for _, ws := range workspacesData { + if wsMap, ok := ws.(map[string]any); ok { + if name, _ := wsMap["name"].(string); name == workspaceName { + workspaceState, _ = wsMap["state"].(string) + break + } + } + } + } + + return renderQueueTable(entries, sessions, workspaceState) + }, + ) +} + +// parseSessionEntriesWithTargets parses session data including raw target maps +// for compatibility checking. 
+func parseSessionEntriesWithTargets(data map[string]any) []sessionEntry { + entries := parseSessionEntries(data) + // Re-parse to populate targetMap for compatibility checking + for i, entry := range entries { + if raw, ok := data[entry.id]; ok { + if s, ok := raw.(map[string]any); ok { + if targets, ok := s["targets"].(map[string]any); ok { + entries[i].targetMap = make(map[string][]string) + for module, names := range targets { + if arr, ok := names.([]any); ok { + for _, n := range arr { + if name, ok := n.(string); ok { + entries[i].targetMap[module] = append(entries[i].targetMap[module], name) + } + } + } + } + } + } + } + } + return entries +} + +func renderQueueTable(entries []queueEntry, sessions []sessionEntry, workspaceState string) []string { + headers := []string{"Execution", "Target", "Requires", "Dependencies", "Status"} + var rows [][]string + for _, e := range entries { + status := getQueueEntryStatus(e, sessions, workspaceState) + target := e.target + " " + colorDim + "(" + e.module + ")" + colorReset + deps := "" + if len(e.dependencies) > 0 { + deps = strings.Join(e.dependencies, ", ") + } + rows = append(rows, []string{ + queueStatusIndicator(status) + " " + e.id, + target, + formatProvides(e.requires), + deps, + colorDim + queueStatusLabel(status) + colorReset, + }) + } + return formatTable(headers, rows) +} diff --git a/cli/cmd/coflux/runs.go b/cli/cmd/coflux/runs.go index bd782d2..c9f785d 100644 --- a/cli/cmd/coflux/runs.go +++ b/cli/cmd/coflux/runs.go @@ -103,7 +103,7 @@ func captureRunTopic(cmd *cobra.Command, runID string) (map[string]any, string, return nil, "", err } - topicPath := fmt.Sprintf("runs/%s/%s", runID, workspaceID) + topicPath := fmt.Sprintf("workspaces/%s/runs/%s", workspaceID, runID) data, err := client.CaptureTopic(cmd.Context(), topicPath) if err != nil { return nil, "", fmt.Errorf("failed to get run: %w", err) diff --git a/cli/cmd/coflux/sessions.go b/cli/cmd/coflux/sessions.go new file mode 100644 index 
0000000..fe783c2 --- /dev/null +++ b/cli/cmd/coflux/sessions.go @@ -0,0 +1,284 @@ +package main + +import ( + "context" + "fmt" + "sort" + "strings" + "unicode/utf8" + + "github.com/spf13/cobra" +) + +var sessionsCmd = &cobra.Command{ + Use: "sessions", + Short: "Manage sessions", +} + +var sessionsListWatch bool + +var sessionsListCmd = &cobra.Command{ + Use: "list", + Short: "List active sessions", + RunE: runSessionsList, +} + +func init() { + sessionsCmd.AddCommand(sessionsListCmd) + sessionsListCmd.Flags().BoolVar(&sessionsListWatch, "watch", false, "Watch for changes") +} + +type sessionEntry struct { + id string + connected bool + poolName string + workerState string + executions int + concurrency int + targets int + provides map[string][]string + targetMap map[string][]string // module -> []target, populated when needed for matching +} + +func parseSessionEntries(data map[string]any) []sessionEntry { + var entries []sessionEntry + for id, raw := range data { + s, ok := raw.(map[string]any) + if !ok { + continue + } + entry := sessionEntry{id: id} + entry.connected, _ = s["connected"].(bool) + if pn, ok := s["poolName"].(string); ok { + entry.poolName = pn + } + if ws, ok := s["workerState"].(string); ok { + entry.workerState = ws + } + if e, ok := s["executions"].(float64); ok { + entry.executions = int(e) + } + if c, ok := s["concurrency"].(float64); ok { + entry.concurrency = int(c) + } + if targets, ok := s["targets"].(map[string]any); ok { + for _, names := range targets { + if arr, ok := names.([]any); ok { + entry.targets += len(arr) + } + } + } + if provides, ok := s["provides"].(map[string]any); ok { + entry.provides = make(map[string][]string) + for k, v := range provides { + if arr, ok := v.([]any); ok { + for _, val := range arr { + if sv, ok := val.(string); ok { + entry.provides[k] = append(entry.provides[k], sv) + } + } + } + } + } + entries = append(entries, entry) + } + sort.Slice(entries, func(i, j int) bool { + return entries[i].id < 
entries[j].id + }) + return entries +} + +func formatSessionStatus(e sessionEntry) string { + if !e.connected { + return colorDim + "●" + colorReset + } + if e.workerState == "draining" || e.workerState == "paused" { + return colorYellow + "●" + colorReset + } + return colorGreen + "●" + colorReset +} + +func formatLoad(executions, concurrency int) string { + if concurrency > 0 { + return fmt.Sprintf("%d/%d", executions, concurrency) + } + return fmt.Sprintf("%d", executions) +} + +func formatProvides(provides map[string][]string) string { + if len(provides) == 0 { + return "" + } + var parts []string + keys := make([]string, 0, len(provides)) + for k := range provides { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + parts = append(parts, k+"="+strings.Join(provides[k], ",")) + } + return strings.Join(parts, "; ") +} + +func printSessions(entries []sessionEntry) { + if len(entries) == 0 { + fmt.Println("No active sessions.") + return + } + + lines := renderSessionsTable(entries) + for _, line := range lines { + fmt.Println(line) + } +} + +func runSessionsList(cmd *cobra.Command, args []string) error { + workspace, err := requireWorkspace() + if err != nil { + return err + } + + token, err := resolveToken() + if err != nil { + return err + } + + client, err := newClient() + if err != nil { + return err + } + + workspaceID, err := resolveWorkspaceID(cmd.Context(), client, workspace) + if err != nil { + return err + } + + if isOutput("json") { + data, err := client.CaptureTopic(cmd.Context(), "workspaces/"+workspaceID+"/sessions") + if err != nil { + return err + } + return outputJSON(data) + } + + if sessionsListWatch { + return watchSessions(cmd.Context(), getHost(), isSecure(), token, workspaceID) + } + + data, err := client.CaptureTopic(cmd.Context(), "workspaces/"+workspaceID+"/sessions") + if err != nil { + return err + } + printSessions(parseSessionEntries(data)) + return nil +} + +func watchSessions(ctx context.Context, host 
string, secure bool, token string, workspaceID string) error { + return watchTopics(ctx, host, secure, token, + []string{"workspaces/" + workspaceID + "/sessions"}, + func(data []map[string]any) []string { + if data[0] == nil { + return nil + } + return renderSessionsTable(parseSessionEntries(data[0])) + }, + ) +} + +func renderSessionsTable(entries []sessionEntry) []string { + headers := []string{"Session", "Pool", "Targets", "Provides", "Load"} + var rows [][]string + for _, e := range entries { + pool := e.poolName + if pool == "" { + pool = colorDim + "(none)" + colorReset + } + row := []string{ + formatSessionStatus(e) + " " + e.id, + pool, + fmt.Sprintf("%d", e.targets), + formatProvides(e.provides), + formatLoad(e.executions, e.concurrency), + } + if !e.connected { + for i := range row { + row[i] = colorDim + stripAnsi(row[i]) + colorReset + } + } + rows = append(rows, row) + } + return formatTable(headers, rows) +} + +// stripAnsi removes ANSI escape codes from a string (for re-wrapping in dim). +func stripAnsi(s string) string { + var out strings.Builder + i := 0 + for i < len(s) { + if s[i] == '\033' { + // Skip escape sequence + i++ + if i < len(s) && s[i] == '[' { + i++ + for i < len(s) && s[i] != 'm' { + i++ + } + if i < len(s) { + i++ // skip 'm' + } + } + } else { + out.WriteByte(s[i]) + i++ + } + } + return out.String() +} + +// visibleWidth returns the display width of a string after stripping ANSI codes. +func visibleWidth(s string) int { + return utf8.RuneCountInString(stripAnsi(s)) +} + +// formatTable formats headers and rows into lines (without printing). 
+func formatTable(headers []string, rows [][]string) []string { + widths := make([]int, len(headers)) + for i, h := range headers { + widths[i] = len(h) + } + for _, row := range rows { + for i, cell := range row { + w := visibleWidth(cell) + if i < len(widths) && w > widths[i] { + widths[i] = w + } + } + } + + var lines []string + + // Header + var hdr strings.Builder + for i, h := range headers { + fmt.Fprintf(&hdr, "%-*s ", widths[i], h) + } + lines = append(lines, hdr.String()) + + // Rows + for _, row := range rows { + var line strings.Builder + for i, cell := range row { + if i < len(widths) { + pad := widths[i] - visibleWidth(cell) + if pad < 0 { + pad = 0 + } + fmt.Fprintf(&line, "%s%*s ", cell, pad, "") + } + } + lines = append(lines, line.String()) + } + + return lines +} diff --git a/cli/cmd/coflux/watch.go b/cli/cmd/coflux/watch.go new file mode 100644 index 0000000..2bf846a --- /dev/null +++ b/cli/cmd/coflux/watch.go @@ -0,0 +1,119 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + + topicalclient "github.com/bitroot/coflux/cli/internal/topical" + topical "github.com/joefreeman/topical/client_go" + "golang.org/x/term" +) + +// watchTopics connects to the topical server, subscribes to the given topics, +// and calls render whenever any topic updates. The render function receives +// the current data for each topic (by index) and returns lines to display. +// Return nil from render to skip drawing (e.g. when waiting for all topics). +// The helper handles terminal cursor management, truncation, and signal handling. 
+func watchTopics(ctx context.Context, host string, secure bool, token string, topics []string, render func(data []map[string]any) []string) error { + client, err := topicalclient.Connect(ctx, host, secure, token) + if err != nil { + return fmt.Errorf("failed to connect: %w", err) + } + defer client.Close() + + subs := make([]*topical.Subscription, len(topics)) + for i, topic := range topics { + subs[i] = client.Subscribe(topic, nil) + defer subs[i].Unsubscribe() + } + + sigCtx, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) + defer stop() + + data := make([]map[string]any, len(topics)) + linesDrawn := 0 + + maxLines := 0 + if _, h, err := term.GetSize(int(os.Stdout.Fd())); err == nil && h > 0 { + maxLines = h - 1 + } + + doRender := func() { + lines := render(data) + if lines == nil { + return + } + + if linesDrawn > 0 { + fmt.Printf("\033[%dA", linesDrawn) + } + + totalRows := len(lines) - 1 // exclude header + truncated := 0 + if maxLines > 0 && len(lines) > maxLines { + truncated = len(lines) - (maxLines - 1) + lines = lines[:maxLines-1] + } + for _, line := range lines { + fmt.Printf("\r%s\033[K\n", line) + } + if truncated > 0 { + fmt.Printf("\r%s… %d more (%d total)%s\033[K\n", colorDim, truncated, totalRows, colorReset) + } + outputLines := len(lines) + if truncated > 0 { + outputLines++ + } + // Clear from cursor to end of screen + fmt.Print("\033[J") + linesDrawn = outputLines + } + + // Merge all subscription channels into a single event stream + type topicEvent struct { + index int + data map[string]any + err error + } + + events := make(chan topicEvent) + for i, sub := range subs { + go func(idx int, s *topical.Subscription) { + for { + select { + case value, ok := <-s.Values(): + if !ok { + events <- topicEvent{index: idx, err: fmt.Errorf("subscription closed unexpectedly")} + return + } + if d, ok := value.(map[string]any); ok { + events <- topicEvent{index: idx, data: d} + } + case err, ok := <-s.Err(): + if !ok { + events <- 
topicEvent{index: idx, err: fmt.Errorf("subscription closed unexpectedly")} + } else { + events <- topicEvent{index: idx, err: fmt.Errorf("subscription error: %w", err)} + } + return + } + } + }(i, sub) + } + + for { + select { + case ev := <-events: + if ev.err != nil { + return ev.err + } + data[ev.index] = ev.data + doRender() + case <-sigCtx.Done(): + return nil + } + } +} diff --git a/cli/cmd/coflux/worker.go b/cli/cmd/coflux/worker.go index 361f4d4..cdd1c31 100644 --- a/cli/cmd/coflux/worker.go +++ b/cli/cmd/coflux/worker.go @@ -80,6 +80,10 @@ func runWorker(cmd *cobra.Command, args []string) error { if err != nil { return err } + // For pool-launched workers, the session token doubles as the auth token + if token == "" && session != "" { + token = session + } cfg.Server.Token = token modules := args diff --git a/cli/internal/api/client.go b/cli/internal/api/client.go index a75c853..8aa7e77 100644 --- a/cli/internal/api/client.go +++ b/cli/internal/api/client.go @@ -90,38 +90,28 @@ func (c *Client) SubmitWorkflow(ctx context.Context, workspaceID, module, target // GetManifests retrieves the latest manifests for a workspace func (c *Client) GetManifests(ctx context.Context, workspaceID string) (map[string]any, error) { - var result map[string]any - params := url.Values{ - "workspaceId": {workspaceID}, - } - if err := c.get(ctx, "/api/get_manifests", params, &result); err != nil { - return nil, err - } - return result, nil + return c.CaptureTopic(ctx, "workspaces/"+workspaceID+"/manifests") } // GetWorkflow retrieves a workflow definition func (c *Client) GetWorkflow(ctx context.Context, workspaceID, module, target string) (map[string]any, error) { - var result map[string]any - params := url.Values{ - "workspaceId": {workspaceID}, - "module": {module}, - "target": {target}, - } - if err := c.get(ctx, "/api/get_workflow", params, &result); err != nil { + result, err := c.CaptureTopic(ctx, "workspaces/"+workspaceID+"/workflows/"+module+"/"+target) + if err != nil 
{ return nil, err } + // Flatten configuration into top level for callers that expect it there + if config, ok := result["configuration"].(map[string]any); ok { + for k, v := range config { + result[k] = v + } + delete(result, "configuration") + } return result, nil } // GetAssetByID retrieves asset information by ID func (c *Client) GetAssetByID(ctx context.Context, assetID string) (map[string]any, error) { - var result map[string]any - params := url.Values{"asset": {assetID}} - if err := c.get(ctx, "/api/get_asset", params, &result); err != nil { - return nil, err - } - return result, nil + return c.CaptureTopic(ctx, "asset/"+assetID) } // Workspaces API @@ -129,7 +119,7 @@ func (c *Client) GetAssetByID(ctx context.Context, assetID string) (map[string]a // GetWorkspaces lists all workspaces func (c *Client) GetWorkspaces(ctx context.Context) (map[string]map[string]any, error) { var result map[string]map[string]any - if err := c.get(ctx, "/api/get_workspaces", nil, &result); err != nil { + if err := c.get(ctx, "/topics/workspaces", nil, &result); err != nil { return nil, err } return result, nil @@ -199,8 +189,7 @@ func (c *Client) ArchiveModule(ctx context.Context, workspaceID, moduleName stri // GetPools lists all pools in a workspace func (c *Client) GetPools(ctx context.Context, workspaceID string) (map[string]map[string]any, error) { var result map[string]map[string]any - params := url.Values{"workspaceId": {workspaceID}} - if err := c.get(ctx, "/api/get_pools", params, &result); err != nil { + if err := c.get(ctx, "/topics/workspaces/"+workspaceID+"/pools", nil, &result); err != nil { return nil, err } return result, nil @@ -208,11 +197,14 @@ func (c *Client) GetPools(ctx context.Context, workspaceID string) (map[string]m // GetPool gets a single pool func (c *Client) GetPool(ctx context.Context, workspaceID, pool string) (map[string]any, error) { - var result map[string]any - params := url.Values{"workspaceId": {workspaceID}, "pool": {pool}} - if err := 
c.get(ctx, "/api/get_pool", params, &result); err != nil { + result, err := c.CaptureTopic(ctx, "workspaces/"+workspaceID+"/pools/"+pool) + if err != nil { return nil, err } + // Extract the pool sub-object (topic returns {pool: {...}, workers: {...}}) + if p, ok := result["pool"].(map[string]any); ok { + return p, nil + } return result, nil } @@ -230,13 +222,15 @@ func (c *Client) UpdatePool(ctx context.Context, workspaceID, poolName string, p // ListTokens lists all service tokens func (c *Client) ListTokens(ctx context.Context) ([]map[string]any, error) { - var result struct { - Tokens []map[string]any `json:"tokens"` - } - if err := c.get(ctx, "/api/list_tokens", nil, &result); err != nil { + var result map[string]map[string]any + if err := c.get(ctx, "/topics/tokens", nil, &result); err != nil { return nil, err } - return result.Tokens, nil + tokens := make([]map[string]any, 0, len(result)) + for _, token := range result { + tokens = append(tokens, token) + } + return tokens, nil } // CreateToken creates a new service token diff --git a/server/lib/coflux/application.ex b/server/lib/coflux/application.ex index 3038e9b..45a5708 100644 --- a/server/lib/coflux/application.ex +++ b/server/lib/coflux/application.ex @@ -38,7 +38,11 @@ defmodule Coflux.Application do Topics.Module, Topics.Pools, Topics.Pool, - Topics.Search + Topics.Search, + Topics.Manifests, + Topics.Tokens, + Topics.Asset, + Topics.Queue ] end end diff --git a/server/lib/coflux/auth.ex b/server/lib/coflux/auth.ex index e2eaf72..c47125c 100644 --- a/server/lib/coflux/auth.ex +++ b/server/lib/coflux/auth.ex @@ -200,7 +200,12 @@ defmodule Coflux.Auth do case Orchestration.check_token(project_id, token_hash) do {:ok, %{workspaces: workspaces, principal_id: principal_id}} -> - {:ok, %{type: :service, workspaces: normalize_workspaces(workspaces), principal_id: principal_id}} + {:ok, + %{ + type: :service, + workspaces: normalize_workspaces(workspaces), + principal_id: principal_id + }} {:error, _reason} 
-> {:error, :unauthorized} @@ -268,7 +273,12 @@ defmodule Coflux.Auth do case Orchestration.check_token(project_id, random_hash) do {:ok, %{workspaces: workspaces, principal_id: principal_id}} -> - {:ok, %{type: :service, workspaces: normalize_workspaces(workspaces), principal_id: principal_id}} + {:ok, + %{ + type: :service, + workspaces: normalize_workspaces(workspaces), + principal_id: principal_id + }} {:error, _reason} -> {:error, :unauthorized} @@ -284,7 +294,9 @@ defmodule Coflux.Auth do external_id = claims["sub"] # Resolve external_id to principal_id {:ok, principal_id} = Orchestration.ensure_principal(project_id, external_id) - {:ok, %{type: :studio, workspaces: normalize_workspaces(workspaces), principal_id: principal_id}} + + {:ok, + %{type: :studio, workspaces: normalize_workspaces(workspaces), principal_id: principal_id}} else {:error, _reason} -> {:error, :unauthorized} diff --git a/server/lib/coflux/config.ex b/server/lib/coflux/config.ex index 6801db1..1779d8b 100644 --- a/server/lib/coflux/config.ex +++ b/server/lib/coflux/config.ex @@ -6,17 +6,22 @@ defmodule Coflux.Config do ## Project Configuration - At least one of the following must be configured: - - **COFLUX_PROJECT**: Restricts the server to a single project. All requests are routed to this project. Supports any access method including IP addresses. - - **COFLUX_BASE_DOMAIN**: Enables subdomain-based project routing. The project - is extracted from the subdomain (e.g., `acme.example.com` → project "acme"). - Requires subdomain access - direct IP or base domain access is not allowed. + - **COFLUX_PUBLIC_HOST**: The public-facing host for the server. Prefix with + `%` as a placeholder for the project ID to enable subdomain-based routing. 
+ + Examples: + - `%.example.com:7777` — multi-project subdomain routing + - `%.localhost:7777` — local dev with multiple projects + - `myserver.internal:7777` — single-project, no subdomain routing - When both are set, subdomain routing is used but only the configured project - is allowed. + When the placeholder is present, the project is extracted from the subdomain + of incoming requests. When combined with COFLUX_PROJECT, subdomain routing + is used but only the configured project is accepted. + + When not set, falls back to `localhost:PORT`. ## Authentication Configuration @@ -44,8 +49,9 @@ defmodule Coflux.Config do """ def init do :persistent_term.put(:coflux_data_dir, parse_data_dir()) + :persistent_term.put(:coflux_port, String.to_integer(System.get_env("PORT", "7777"))) :persistent_term.put(:coflux_project, System.get_env("COFLUX_PROJECT")) - :persistent_term.put(:coflux_base_domain, System.get_env("COFLUX_BASE_DOMAIN")) + :persistent_term.put(:coflux_public_host, parse_public_host()) :persistent_term.put(:coflux_allowed_origins, parse_allowed_origins()) :persistent_term.put(:coflux_require_auth, parse_require_auth()) :persistent_term.put(:coflux_studio_teams, parse_studio_teams()) @@ -72,12 +78,59 @@ defmodule Coflux.Config do end @doc """ - Returns the base domain for subdomain-based routing, or nil if not set. + Returns the parsed public host configuration, or nil if not set. - When set, the project is extracted from the request's subdomain. + When set with a leading `%` (e.g., `%.example.com:7777`), returns + `{:template, suffix}` for subdomain-based routing. Without a `%` prefix, + returns the literal host string. + """ + def public_host do + :persistent_term.get(:coflux_public_host) + end + + @doc """ + Returns the base domain for subdomain-based routing, or nil. + + Derived from COFLUX_PUBLIC_HOST when it starts with `%`. + For example, `%.example.com:7777` yields base domain `example.com`. 
""" def base_domain do - :persistent_term.get(:coflux_base_domain) + case public_host() do + {:template, suffix} -> + suffix + |> String.trim_leading(".") + |> String.split(":") + |> hd() + + _ -> + nil + end + end + + @doc """ + Returns the host that workers should use to connect to this server. + + When COFLUX_PUBLIC_HOST starts with `%`, the project ID is substituted. + Otherwise the literal host is returned. Falls back to `localhost:PORT` + when not configured. + """ + def server_host(project_id \\ nil) do + port_suffix = + case :persistent_term.get(:coflux_port) do + port when port in [80, 443] -> "" + port -> ":#{port}" + end + + case public_host() do + {:template, suffix} -> + "#{project_id}#{suffix}" + + host when is_binary(host) -> + host + + nil -> + "localhost#{port_suffix}" + end end @doc """ @@ -134,6 +187,15 @@ defmodule Coflux.Config do :persistent_term.get(:coflux_secret) end + defp parse_public_host do + case System.get_env("COFLUX_PUBLIC_HOST") do + nil -> nil + "" -> nil + "%" <> suffix -> {:template, suffix} + host -> host + end + end + defp parse_data_dir do System.get_env("COFLUX_DATA_DIR", Path.join(File.cwd!(), "data")) end diff --git a/server/lib/coflux/handlers/api.ex b/server/lib/coflux/handlers/api.ex index f761869..a8ba121 100644 --- a/server/lib/coflux/handlers/api.ex +++ b/server/lib/coflux/handlers/api.ex @@ -110,22 +110,6 @@ defmodule Coflux.Handlers.Api do json_response(req, %{"workspaces" => patterns}) end - defp handle(req, "GET", ["get_workspaces"], project_id, _access) do - case Orchestration.get_workspaces(project_id) do - {:ok, workspaces} -> - json_response( - req, - Map.new(workspaces, fn {_workspace_id, workspace} -> - {workspace.external_id, - %{ - "name" => workspace.name, - "baseId" => workspace.base_external_id - }} - end) - ) - end - end - defp handle(req, "POST", ["create_workspace"], project_id, access) do case read_arguments(req, %{name: "name"}, %{base_id: "baseId"}) do {:ok, arguments, req} -> @@ -254,58 +238,6 
@@ defmodule Coflux.Handlers.Api do end end - defp handle(req, "GET", ["get_pools"], project_id, _access) do - qs = :cowboy_req.parse_qs(req) - workspace_id = get_query_param(qs, "workspaceId") - - case Orchestration.get_pools(project_id, workspace_id) do - {:ok, pools} -> - json_response( - req, - Map.new(pools, fn {pool_name, pool} -> - { - pool_name, - %{ - "provides" => pool.provides, - "modules" => pool.modules, - "launcherType" => if(pool.launcher, do: pool.launcher.type) - } - } - end) - ) - - {:error, :workspace_invalid} -> - json_error_response(req, "workspace_not_found", status: 404) - end - end - - defp handle(req, "GET", ["get_pool"], project_id, _access) do - qs = :cowboy_req.parse_qs(req) - workspace_id = get_query_param(qs, "workspaceId") - pool_name = get_query_param(qs, "pool") - - case Orchestration.get_pools(project_id, workspace_id) do - {:ok, pools} -> - case Map.fetch(pools, pool_name) do - {:ok, pool} -> - json_response( - req, - %{ - "provides" => pool.provides, - "modules" => pool.modules, - "launcher" => format_launcher(pool.launcher) - } - ) - - :error -> - json_error_response(req, "not_found", status: 404) - end - - {:error, :workspace_invalid} -> - json_error_response(req, "workspace_not_found", status: 404) - end - end - defp handle(req, "POST", ["update_pool"], project_id, access) do case read_arguments(req, %{ workspace_id: "workspaceId", @@ -420,47 +352,6 @@ defmodule Coflux.Handlers.Api do end end - defp handle(req, "GET", ["get_manifests"], project_id, _access) do - qs = :cowboy_req.parse_qs(req) - workspace_id = get_query_param(qs, "workspaceId") - - case Orchestration.get_manifests(project_id, workspace_id) do - {:ok, manifests} -> - composed = - Map.new(manifests, fn {module, workflows} -> - targets = - Map.new(workflows, fn {name, workflow} -> - {name, compose_workflow(workflow)} - end) - - {module, targets} - end) - - json_response(req, composed) - - {:error, :workspace_invalid} -> - json_error_response(req, "not_found", 
status: 404) - end - end - - defp handle(req, "GET", ["get_workflow"], project_id, _access) do - qs = :cowboy_req.parse_qs(req) - workspace_id = get_query_param(qs, "workspaceId") - module = get_query_param(qs, "module") - target_name = get_query_param(qs, "target") - - case Orchestration.get_workflow(project_id, workspace_id, module, target_name) do - {:ok, nil} -> - json_error_response(req, "not_found", status: 404) - - {:ok, workflow} -> - json_response(req, compose_workflow(workflow)) - - {:error, :workspace_invalid} -> - json_error_response(req, "workspace_invalid", status: 404) - end - end - defp handle(req, "POST", ["submit_workflow"], project_id, access) do case read_arguments( req, @@ -595,19 +486,6 @@ defmodule Coflux.Handlers.Api do end end - defp handle(req, "GET", ["get_asset"], project_id, _access) do - qs = :cowboy_req.parse_qs(req) - asset_id = get_query_param(qs, "asset") - - case Orchestration.get_asset(project_id, asset_id) do - {:error, :not_found} -> - json_error_response(req, "not_found", status: 404) - - {:ok, name, entries} -> - json_response(req, compose_asset(name, entries)) - end - end - defp handle(req, "POST", ["create_session"], project_id, access) do case read_arguments( req, @@ -695,39 +573,6 @@ defmodule Coflux.Handlers.Api do end end - defp handle(req, "GET", ["list_tokens"], project_id, access) do - case Orchestration.list_tokens(project_id) do - {:ok, tokens} -> - # Filter tokens based on caller's access: - # - Full access: see all tokens - # - Limited access: see only tokens they created - filtered_tokens = - if access.workspaces == :all do - tokens - else - Enum.filter(tokens, fn token -> - token.created_by_principal_id == access[:principal_id] - end) - end - - json_response(req, %{ - "tokens" => - Enum.map(filtered_tokens, fn token -> - %{ - "id" => token.id, - "externalId" => token.external_id, - "name" => token.name, - "workspaces" => token.workspaces, - "createdAt" => token.created_at, - "expiresAt" => token.expires_at, - 
"revokedAt" => token.revoked_at, - "createdBy" => format_principal(token.created_by) - } - end) - }) - end - end - defp handle(req, "POST", ["revoke_token"], project_id, access) do case read_arguments(req, %{external_id: "externalId"}) do {:ok, arguments, req} -> @@ -744,7 +589,7 @@ defmodule Coflux.Handlers.Api do if can_revoke do case Orchestration.revoke_token(project_id, token.id) do - :ok -> + {:ok, _external_id} -> :cowboy_req.reply(204, req) {:error, :not_found} -> @@ -760,29 +605,6 @@ defmodule Coflux.Handlers.Api do end end - defp handle(req, "GET", ["get_token"], project_id, _access) do - case read_arguments(req, %{external_id: "externalId"}) do - {:ok, arguments, req} -> - case Orchestration.get_token(project_id, arguments.external_id) do - {:ok, nil} -> - json_error_response(req, "not_found", status: 404) - - {:ok, token} -> - json_response(req, %{ - "id" => token.id, - "externalId" => token.external_id, - "name" => token.name, - "createdAt" => token.created_at, - "expiresAt" => token.expires_at, - "revokedAt" => token.revoked_at - }) - end - - {:error, errors, req} -> - json_error_response(req, "bad_request", details: errors) - end - end - defp handle(req, "POST", ["rotate_epoch"], project_id, %{type: :super}) do case Orchestration.rotate_epoch(project_id) do :ok -> :cowboy_req.reply(204, req) @@ -808,24 +630,6 @@ defmodule Coflux.Handlers.Api do # Helper functions for handle/5 clauses - defp format_launcher(nil), do: nil - - defp format_launcher(launcher) do - base = %{"type" => Atom.to_string(launcher.type), "image" => launcher.image} - - base - |> maybe_put("dockerHost", Map.get(launcher, :docker_host)) - end - - defp maybe_put(map, _key, nil), do: map - defp maybe_put(map, key, value), do: Map.put(map, key, value) - - defp format_principal(nil), do: nil - - defp format_principal(%{type: type, external_id: external_id}) do - %{"type" => type, "externalId" => external_id} - end - defp parse_workspaces(value) when is_list(value) do if Enum.all?(value, 
&is_binary/1) do {:ok, value} @@ -943,6 +747,7 @@ defmodule Coflux.Handlers.Api do defp parse_docker_launcher(value) do image = Map.get(value, "image") docker_host = Map.get(value, "dockerHost") + server_host = Map.get(value, "serverHost") cond do not is_binary(image) or String.length(image) > 200 -> @@ -951,12 +756,18 @@ defmodule Coflux.Handlers.Api do not is_nil(docker_host) and (not is_binary(docker_host) or String.length(docker_host) > 200) -> {:error, :invalid} + not is_nil(server_host) and (not is_binary(server_host) or String.length(server_host) > 200) -> + {:error, :invalid} + true -> launcher = %{type: :docker, image: image} launcher = if docker_host, do: Map.put(launcher, :docker_host, docker_host), else: launcher + launcher = + if server_host, do: Map.put(launcher, :server_host, server_host), else: launcher + {:ok, launcher} end end @@ -1284,61 +1095,4 @@ defmodule Coflux.Handlers.Api do {:error, :invalid} end end - - defp compose_workflow_cache(cache) do - if cache do - %{ - "params" => cache.params, - "maxAge" => cache.max_age, - "namespace" => cache.namespace, - "version" => cache.version - } - end - end - - defp compose_workflow_defer(defer) do - if defer do - %{"params" => defer.params} - end - end - - defp compose_workflow_retries(retries) do - if retries do - %{ - "limit" => retries.limit, - "delayMin" => retries.delay_min, - "delayMax" => retries.delay_max - } - end - end - - defp compose_workflow(workflow) do - %{ - "parameters" => - Enum.map(workflow.parameters, fn {name, default, annotation} -> - %{"name" => name, "default" => default, "annotation" => annotation} - end), - "waitFor" => workflow.wait_for, - "cache" => compose_workflow_cache(workflow.cache), - "defer" => compose_workflow_defer(workflow.defer), - "delay" => workflow.delay, - "retries" => compose_workflow_retries(workflow.retries), - "requires" => workflow.requires - } - end - - defp compose_asset(name, entries) do - %{ - "name" => name, - "entries" => - Map.new(entries, fn 
{path, blob_key, size, metadata} -> - {path, - %{ - "blobKey" => blob_key, - "size" => size, - "metadata" => metadata - }} - end) - } - end end diff --git a/server/lib/coflux/handlers/topics/websocket.ex b/server/lib/coflux/handlers/topics/websocket.ex index 5dde7f7..de5c6a1 100644 --- a/server/lib/coflux/handlers/topics/websocket.ex +++ b/server/lib/coflux/handlers/topics/websocket.ex @@ -9,7 +9,7 @@ defmodule Coflux.Handlers.Topics.WebSocket do The server echoes back "v1" on successful auth. The project is determined by COFLUX_PROJECT (if set) or extracted from the - subdomain (if COFLUX_BASE_DOMAIN is set). + subdomain (if COFLUX_PUBLIC_HOST starts with %). """ import Coflux.Handlers.Utils diff --git a/server/lib/coflux/handlers/utils.ex b/server/lib/coflux/handlers/utils.ex index f16d6d5..011f7e5 100644 --- a/server/lib/coflux/handlers/utils.ex +++ b/server/lib/coflux/handlers/utils.ex @@ -8,7 +8,7 @@ defmodule Coflux.Handlers.Utils do - Neither set: Returns `{:error, :not_configured}` - COFLUX_PROJECT only: Returns the configured project (any access method works) - - COFLUX_BASE_DOMAIN only: Extracts project from subdomain (subdomain required) + - COFLUX_PUBLIC_HOST starts with `%`: Extracts project from subdomain - Both set: Extracts from subdomain, but must match COFLUX_PROJECT """ def resolve_project(hostname) do diff --git a/server/lib/coflux/handlers/worker.ex b/server/lib/coflux/handlers/worker.ex index 6f7396f..3720cbe 100644 --- a/server/lib/coflux/handlers/worker.ex +++ b/server/lib/coflux/handlers/worker.ex @@ -9,7 +9,7 @@ defmodule Coflux.Handlers.Worker do The server echoes back "v1" on successful auth. The project is determined by COFLUX_PROJECT (if set) or extracted from the - subdomain (if COFLUX_BASE_DOMAIN is set). + subdomain (if COFLUX_PUBLIC_HOST starts with %). 
""" import Coflux.Handlers.Utils diff --git a/server/lib/coflux/launchers/docker.ex b/server/lib/coflux/launchers/docker.ex index 61658a4..81cc64b 100644 --- a/server/lib/coflux/launchers/docker.ex +++ b/server/lib/coflux/launchers/docker.ex @@ -1,9 +1,11 @@ defmodule Coflux.DockerLauncher do @docker_api_version "v1.47" + @log_tail_lines 20 + @log_max_bytes 1024 def launch(project_id, workspace_name, session_token, modules, config \\ %{}) do docker_conn = parse_docker_host(config[:docker_host]) - coflux_host = get_coflux_host() + coflux_host = config[:server_host] || Coflux.Config.server_host(project_id) with {:ok, %{"Id" => container_id}} <- create_container( @@ -13,8 +15,7 @@ defmodule Coflux.DockerLauncher do "HostConfig" => %{"NetworkMode" => "host"}, "Cmd" => modules, "Env" => [ - "COFLUX_HOST=#{coflux_host}", - "COFLUX_PROJECT=#{project_id}", + "COFLUX_SERVER_HOST=#{coflux_host}", "COFLUX_WORKSPACE=#{workspace_name}", "COFLUX_SESSION=#{session_token}" ] @@ -48,11 +49,12 @@ defmodule Coflux.DockerLauncher do {:ok, true} else error = build_error(state) - {:ok, false, error} + logs = if error, do: fetch_logs(docker_conn, container_id) + {:ok, false, error, logs} end {:error, :no_such_container} -> - {:ok, false, nil} + {:ok, false, nil, nil} end end @@ -84,13 +86,6 @@ defmodule Coflux.DockerLauncher do end end - # Gets the Coflux host for workers to connect to. 
- # Priority: COFLUX_PUBLIC_HOST env -> localhost:PORT - defp get_coflux_host() do - System.get_env("COFLUX_PUBLIC_HOST") || - "localhost:#{System.get_env("PORT", "7777")}" - end - defp docker_request(docker_conn, method, path, opts \\ []) do {url, conn_opts} = case docker_conn do @@ -137,6 +132,49 @@ end end + defp fetch_logs(docker_conn, container_id) do + case container_logs(docker_conn, container_id, @log_tail_lines) do + {:ok, logs} when logs != "" -> truncate_bytes(logs, @log_max_bytes) + _ -> nil + end + end + + defp container_logs(docker_conn, container_id, tail) do + response = + docker_request(docker_conn, :get, "/containers/#{container_id}/logs", + params: [stdout: true, stderr: true, tail: tail] + ) + + case response.status do + 200 -> {:ok, demux_docker_logs(response.body)} + 404 -> {:error, :no_such_container} + _ -> {:error, :server_error} + end + end + + # Docker multiplexed stream format: each frame has an 8-byte header + # [stream_type(1), padding(3), size(4, big-endian)] followed by the payload.
+ defp demux_docker_logs(data) when is_binary(data) do + demux_docker_logs(data, []) + end + + defp demux_docker_logs( + <<_type::8, _pad::24, size::32, payload::binary-size(size), rest::binary>>, + acc + ) do + demux_docker_logs(rest, [acc, payload]) + end + + defp demux_docker_logs(_, acc), do: IO.iodata_to_binary(acc) + + defp truncate_bytes(string, max_bytes) when byte_size(string) <= max_bytes, do: string + + defp truncate_bytes(string, max_bytes) do + string + |> binary_part(byte_size(string) - max_bytes, max_bytes) + |> String.replace(~r/^[^\n]*\n/, "") + end + defp stop_container(docker_conn, container_id) do response = docker_request(docker_conn, :post, "/containers/#{container_id}/stop") diff --git a/server/lib/coflux/orchestration.ex b/server/lib/coflux/orchestration.ex index 15989c2..9016fef 100644 --- a/server/lib/coflux/orchestration.ex +++ b/server/lib/coflux/orchestration.ex @@ -25,6 +25,10 @@ defmodule Coflux.Orchestration do call_server(project_id, :list_tokens) end + def subscribe_tokens(project_id, pid) do + call_server(project_id, {:subscribe_tokens, pid}) + end + def revoke_token(project_id, token_id) do call_server(project_id, {:revoke_token, token_id}) end @@ -87,6 +91,10 @@ defmodule Coflux.Orchestration do call_server(project_id, {:get_manifests, workspace_id}) end + def subscribe_manifests(project_id, workspace_id, pid) do + call_server(project_id, {:subscribe_manifests, workspace_id, pid}) + end + def get_workflow(project_id, workspace_id, module, target_name) do call_server(project_id, {:get_workflow, workspace_id, module, target_name}) end @@ -168,6 +176,10 @@ defmodule Coflux.Orchestration do call_server(project_id, {:subscribe_module, module, workspace_id, pid}) end + def subscribe_queue(project_id, workspace_id, pid) do + call_server(project_id, {:subscribe_queue, workspace_id, pid}) + end + def subscribe_pools(project_id, workspace_id, pid) do call_server(project_id, {:subscribe_pools, workspace_id, pid}) end diff --git 
a/server/lib/coflux/orchestration/principals.ex b/server/lib/coflux/orchestration/principals.ex index 1fab878..98bbbd4 100644 --- a/server/lib/coflux/orchestration/principals.ex +++ b/server/lib/coflux/orchestration/principals.ex @@ -128,8 +128,31 @@ defmodule Coflux.Orchestration.Principals do token_id: token_id }) + {:ok, created_by} = resolve_created_by(db, created_by) + {:ok, - %{token: token, token_id: token_id, principal_id: principal_id, external_id: external_id}} + %{ + id: token_id, + token: token, + token_id: token_id, + principal_id: principal_id, + external_id: external_id, + name: name, + workspaces: workspaces, + created_at: now, + expires_at: nil, + revoked_at: nil, + created_by: created_by + }} + end + + defp resolve_created_by(_db, nil), do: {:ok, nil} + + defp resolve_created_by(db, principal_id) do + case get_principal(db, principal_id) do + {:ok, {type, ext_id}} -> {:ok, %{type: type, external_id: ext_id}} + {:ok, nil} -> {:ok, nil} + end end defp encode_workspaces(nil), do: nil @@ -150,31 +173,30 @@ defmodule Coflux.Orchestration.Principals do {:ok, rows} = Store.query(db, query, {}) - tokens = - Enum.map(rows, fn {id, external_id, name, workspaces_json, created_at, expires_at, - revoked_at, created_by_principal_id, creator_user_ext_id, - creator_token_ext_id} -> - created_by = - case {creator_user_ext_id, creator_token_ext_id} do - {nil, nil} -> nil - {user_ext_id, nil} -> %{type: "user", external_id: user_ext_id} - {nil, token_ext_id} -> %{type: "token", external_id: token_ext_id} - end - - %{ - id: id, - external_id: external_id, - name: name, - workspaces: decode_workspaces(workspaces_json), - created_at: created_at, - expires_at: expires_at, - revoked_at: revoked_at, - created_by: created_by, - created_by_principal_id: created_by_principal_id - } - end) - - {:ok, tokens} + {:ok, Enum.map(rows, &build_token/1)} + end + + defp build_token({id, external_id, name, workspaces_json, created_at, expires_at, + revoked_at, created_by_principal_id, 
creator_user_ext_id, + creator_token_ext_id}) do + created_by = + case {creator_user_ext_id, creator_token_ext_id} do + {nil, nil} -> nil + {user_ext_id, nil} -> %{type: "user", external_id: user_ext_id} + {nil, token_ext_id} -> %{type: "token", external_id: token_ext_id} + end + + %{ + id: id, + external_id: external_id, + name: name, + workspaces: decode_workspaces(workspaces_json), + created_at: created_at, + expires_at: expires_at, + revoked_at: revoked_at, + created_by: created_by, + created_by_principal_id: created_by_principal_id + } end @doc """ @@ -212,12 +234,12 @@ defmodule Coflux.Orchestration.Principals do def revoke_token(db, token_id) do now = System.system_time(:second) - case Store.query_one(db, "SELECT id FROM tokens WHERE id = ?1", {token_id}) do - {:ok, {^token_id}} -> + case Store.query_one(db, "SELECT id, external_id FROM tokens WHERE id = ?1", {token_id}) do + {:ok, {^token_id, external_id}} -> {:ok, _} = Store.query(db, "UPDATE tokens SET revoked_at = ?1 WHERE id = ?2", {now, token_id}) - :ok + {:ok, external_id} {:ok, nil} -> {:error, :not_found} diff --git a/server/lib/coflux/orchestration/runs.ex b/server/lib/coflux/orchestration/runs.ex index 82921d3..fb7c694 100644 --- a/server/lib/coflux/orchestration/runs.ex +++ b/server/lib/coflux/orchestration/runs.ex @@ -309,10 +309,10 @@ defmodule Coflux.Orchestration.Runs do end end - {step_number, execution_id, attempt, now, memo_hit, cache_key} = + {step_id, step_number, execution_id, attempt, now, memo_hit, cache_key} = case memoised_execution do {step_number, execution_id, attempt, now} -> - {step_number, execution_id, attempt, now, true, nil} + {nil, step_number, execution_id, attempt, now, true, nil} nil -> cache_key = @@ -386,7 +386,7 @@ defmodule Coflux.Orchestration.Runs do {:ok, execution_id} = insert_execution(db, step_id, attempt, workspace_id, execute_after, now) - {step_number, execution_id, attempt, now, false, cache_key} + {step_id, step_number, execution_id, attempt, now, false, 
cache_key} end child_added = @@ -399,6 +399,7 @@ defmodule Coflux.Orchestration.Runs do {:ok, %{ + step_id: step_id, step_number: step_number, execution_id: execution_id, attempt: attempt, @@ -573,6 +574,34 @@ defmodule Coflux.Orchestration.Runs do end end + def get_queue_executions(db, workspace_id) do + case query( + db, + """ + SELECT + s.module, + s.target, + r.external_id, + s.number, + e.attempt, + e.execute_after, + e.created_at, + a.created_at, + s.requires_tag_set_id + FROM executions AS e + INNER JOIN steps AS s ON s.id = e.step_id + INNER JOIN runs AS r ON r.id = s.run_id + LEFT JOIN assignments AS a ON a.execution_id = e.id + LEFT JOIN results AS re ON re.execution_id = e.id + WHERE e.workspace_id = ?1 AND re.created_at IS NULL + """, + {workspace_id} + ) do + {:ok, rows} -> + {:ok, rows} + end + end + def get_pending_executions_for_workspace(db, workspace_id) do query( db, diff --git a/server/lib/coflux/orchestration/server.ex b/server/lib/coflux/orchestration/server.ex index 73b8b65..c7433f2 100644 --- a/server/lib/coflux/orchestration/server.ex +++ b/server/lib/coflux/orchestration/server.ex @@ -97,7 +97,13 @@ defmodule Coflux.Orchestration.Server do index_task: nil, # [epoch_id] awaiting Bloom filter build (FIFO) - index_queue: [] + index_queue: [], + + # execution_id -> MapSet of execution_ids that this execution is waiting on + pending_dependencies: %{}, + + # execution_id -> MapSet of execution_ids that are waiting on this execution + dependency_waiters: %{} end def start_link(opts) do @@ -301,6 +307,9 @@ defmodule Coflux.Orchestration.Server do end end) + # Initialize pending dependency tracking for existing unassigned executions + state = initialize_pending_dependencies(state) + # Schedule periodic epoch rotation check Process.send_after(self(), :check_rotation, @rotation_check_interval_ms) @@ -359,6 +368,7 @@ defmodule Coflux.Orchestration.Server do def handle_call({:create_token, name, principal_id, opts}, _from, state) do {:ok, result} = 
Principals.create_token(state.db, state.project_id, name, principal_id, opts) + state = notify_listeners(state, :tokens, {:token, result.external_id, result}) {:reply, {:ok, result}, state} end @@ -367,9 +377,21 @@ defmodule Coflux.Orchestration.Server do {:reply, {:ok, tokens}, state} end + def handle_call({:subscribe_tokens, pid}, _from, state) do + {:ok, tokens} = Principals.list_tokens(state.db) + {:ok, ref, state} = add_listener(state, :tokens, pid) + {:reply, {:ok, tokens, ref}, state} + end + def handle_call({:revoke_token, token_id}, _from, state) do - result = Principals.revoke_token(state.db, token_id) - {:reply, result, state} + case Principals.revoke_token(state.db, token_id) do + {:ok, external_id} -> + state = notify_listeners(state, :tokens, {:token, external_id, nil}) + {:reply, {:ok, external_id}, state} + + error -> + {:reply, error, state} + end end def handle_call({:get_token, external_id}, _from, state) do @@ -714,6 +736,10 @@ defmodule Coflux.Orchestration.Server do {:modules, ws_ext_id}, {:manifests, manifests} ) + |> notify_listeners( + {:manifests, ws_ext_id}, + {:manifests, manifests} + ) |> notify_listeners( {:targets, ws_ext_id}, {:manifests, @@ -736,10 +762,16 @@ defmodule Coflux.Orchestration.Server do {:ok, workspace_id, _} -> case Manifests.archive_module(state.db, workspace_id, module_name, access[:principal_id]) do :ok -> + ws_ext_id = workspace_external_id(state, workspace_id) + state = state |> notify_listeners( - {:modules, workspace_external_id(state, workspace_id)}, + {:modules, ws_ext_id}, + {:manifest, module_name, nil} + ) + |> notify_listeners( + {:manifests, ws_ext_id}, {:manifest, module_name, nil} ) |> flush_notifications() @@ -760,6 +792,18 @@ defmodule Coflux.Orchestration.Server do end end + def handle_call({:subscribe_manifests, workspace_external_id, pid}, _from, state) do + case require_workspace(state, workspace_external_id) do + {:error, error} -> + {:reply, {:error, error}, state} + + {:ok, workspace_id, _} -> 
+ {:ok, manifests} = Manifests.get_latest_manifests(state.db, workspace_id) + {:ok, ref, state} = add_listener(state, {:manifests, workspace_external_id}, pid) + {:reply, {:ok, manifests, ref}, state} + end + end + def handle_call({:get_workflow, workspace_external_id, module, target_name}, _from, state) do with {:ok, workspace_id, _} <- require_workspace(state, workspace_external_id), {:ok, workflow} <- @@ -882,12 +926,7 @@ defmodule Coflux.Orchestration.Server do notify_listeners( state, {:sessions, workspace_external_id(state, session.workspace_id)}, - {:session, session.external_id, - %{ - connected: true, - executions: session.starting |> MapSet.union(session.executing) |> Enum.count(), - pool_name: get_in(state.workers, [session.worker_id, :pool_name]) - }} + {:session, session.external_id, build_session_data(state, session)} ) state = @@ -935,9 +974,16 @@ defmodule Coflux.Orchestration.Server do def handle_call({:declare_targets, external_id, targets}, _from, state) do session_id = Map.fetch!(state.session_ids, external_id) + state = assign_targets(state, targets, session_id) + + session = Map.fetch!(state.sessions, session_id) + state = state - |> assign_targets(targets, session_id) + |> notify_listeners( + {:sessions, workspace_external_id(state, session.workspace_id)}, + {:session, session.external_id, build_session_data(state, session)} + ) |> flush_notifications() send(self(), :tick) @@ -1018,8 +1064,9 @@ defmodule Coflux.Orchestration.Server do ) do {:ok, %{ + step_id: step_id, step_number: step_number, - execution_id: _execution_id, + execution_id: execution_id, attempt: attempt, created_at: created_at, cache_key: cache_key, @@ -1027,6 +1074,22 @@ defmodule Coflux.Orchestration.Server do memo_hit: memo_hit, child_added: child_added }} -> + # Compute and register pending dependencies for non-memoised executions + {state, pending_dependencies} = + if step_id && !memo_hit do + wait_for = Keyword.get(opts, :wait_for) || [] + + pending_dependencies = + 
compute_pending_dependencies(state.db, execution_id, wait_for, step_id) + + state = + register_pending_dependencies(state, execution_id, pending_dependencies) + + {state, pending_dependencies} + else + {state, MapSet.new()} + end + group_id = Keyword.get(opts, :group_id) cache = Keyword.get(opts, :cache) delay = Keyword.get(opts, :delay, 0) @@ -1099,6 +1162,12 @@ defmodule Coflux.Orchestration.Server do {:scheduled, execution_external_id, target_name, run.external_id, step_number, attempt, execute_after, created_at} ) + |> notify_listeners( + {:queue, ws_ext_id}, + {:scheduled, execution_external_id, module, target_name, run.external_id, + step_number, attempt, execute_after, created_at, + pending_dependency_external_ids(state.db, pending_dependencies), requires} + ) send(self(), :tick) @@ -1635,6 +1704,35 @@ defmodule Coflux.Orchestration.Server do end end + def handle_call({:subscribe_queue, workspace_external_id, pid}, _from, state) do + case resolve_workspace_external_id(state, workspace_external_id) do + {:error, error} -> + {:reply, {:error, error}, state} + + {:ok, workspace_id} -> + {:ok, executions} = Runs.get_queue_executions(state.db, workspace_id) + + # Resolve tag sets, de-duplicating by ID + tag_sets = + executions + |> Enum.map(&elem(&1, 8)) + |> Enum.reject(&is_nil/1) + |> Enum.uniq() + |> Map.new(fn tag_set_id -> + {:ok, tag_set} = TagSets.get_tag_set(state.db, tag_set_id) + {tag_set_id, tag_set} + end) + + # Build a map of execution_external_id -> [dependency_external_id] + # from the in-memory pending_dependencies (which uses internal IDs) + dependencies = + build_queue_dependencies(state, workspace_id) + + {:ok, ref, state} = add_listener(state, {:queue, workspace_external_id}, pid) + {:reply, {:ok, executions, tag_sets, dependencies, ref}, state} + end + end + def handle_call({:subscribe_pools, workspace_external_id, pid}, _from, state) do case resolve_workspace_external_id(state, workspace_external_id) do {:error, error} -> @@ -1660,7 +1758,7 
@@ defmodule Coflux.Orchestration.Server do Map.new( pool_workers, fn {worker_id, worker_external_id, starting_at, started_at, start_error, stopping_at, - stopped_at, stop_error, deactivated_at, error} -> + stopped_at, stop_error, deactivated_at, error, logs} -> worker = Map.get(state.workers, worker_id) connected = @@ -1686,6 +1784,7 @@ defmodule Coflux.Orchestration.Server do stop_error: stop_error, deactivated_at: deactivated_at, error: error, + logs: logs, state: if(worker, do: worker.state), connected: connected }} @@ -1712,19 +1811,7 @@ defmodule Coflux.Orchestration.Server do session.workspace_id == workspace_id end) |> Map.new(fn {_session_id, session} -> - executions = - session.starting - |> MapSet.union(session.executing) - |> Enum.count() - - pool_name = get_in(state.workers, [session.worker_id, :pool_name]) - - {session.external_id, - %{ - connected: !is_nil(session.connection), - executions: executions, - pool_name: pool_name - }} + {session.external_id, build_session_data(state, session)} end) {:ok, ref, state} = add_listener(state, {:sessions, workspace_external_id}, pid) @@ -1988,11 +2075,12 @@ defmodule Coflux.Orchestration.Server do {state, assigned, unassigned} else - # TODO: choose session before resolving arguments? 
- {:ok, arguments} = Runs.get_step_arguments(state.db, execution.step_id) + # Skip executions whose dependencies haven't resolved yet + has_pending = Map.has_key?(state.pending_dependencies, execution.execution_id) - if arguments_ready?(state.db, execution.wait_for, arguments) && - dependencies_ready?(state.db, execution.execution_id) do + if has_pending do + {state, assigned, unassigned} + else requires = if execution.requires_tag_set_id, do: Map.fetch!(tag_sets, execution.requires_tag_set_id), @@ -2007,6 +2095,8 @@ defmodule Coflux.Orchestration.Server do {:ok, assigned_at} = Runs.assign_execution(state.db, execution.execution_id, session_id) + {:ok, arguments} = Runs.get_step_arguments(state.db, execution.step_id) + # Enrich arguments with resolved references (asset/execution metadata) enriched_arguments = Enum.map(arguments, &build_value(&1, state.db)) @@ -2038,6 +2128,8 @@ defmodule Coflux.Orchestration.Server do {state, [{execution, assigned_at} | assigned], unassigned} end else + {:ok, arguments} = Runs.get_step_arguments(state.db, execution.step_id) + state = case schedule_run( state, @@ -2078,8 +2170,6 @@ defmodule Coflux.Orchestration.Server do {state, assigned, unassigned} end - else - {state, assigned, unassigned} end end end @@ -2125,11 +2215,38 @@ defmodule Coflux.Orchestration.Server do state = Enum.reduce(assigned_groups, state, fn {workspace_id, workspace_executions}, state -> - notify_listeners( - state, - {:modules, workspace_external_id(state, workspace_id)}, + ws_ext_id = workspace_external_id(state, workspace_id) + + all_ext_ids = + workspace_executions + |> Map.values() + |> Enum.reduce(MapSet.new(), &MapSet.union/2) + + queue_executions = + Enum.reduce(assigned, %{}, fn {execution, assigned_at}, acc -> + ext_id = + execution_external_id( + execution.run_external_id, + execution.step_number, + execution.attempt + ) + + if MapSet.member?(all_ext_ids, ext_id) do + Map.put(acc, ext_id, assigned_at) + else + acc + end + end) + + state + |> 
notify_listeners( + {:modules, ws_ext_id}, {:assigned, workspace_executions} ) + |> notify_listeners( + {:queue, ws_ext_id}, + {:assigned, queue_executions} + ) end) state = @@ -2331,8 +2448,8 @@ defmodule Coflux.Orchestration.Server do {:ok, {:ok, true}} -> state - {:ok, {:ok, false, error}} -> - deactivate_worker(state, worker_id, error) + {:ok, {:ok, false, error, logs}} -> + deactivate_worker(state, worker_id, error, logs) :error -> # TODO: ? @@ -2624,7 +2741,8 @@ defmodule Coflux.Orchestration.Server do {:ok, executions} = Runs.get_execution_descendants(state.db, root_execution_id) executions = - Enum.filter(executions, fn {_exec_id, _module, _assigned_at, _completed_at, exec_workspace_id} -> + Enum.filter(executions, fn {_exec_id, _module, _assigned_at, _completed_at, + exec_workspace_id} -> exec_workspace_id == workspace_id end) @@ -2934,6 +3052,7 @@ defmodule Coflux.Orchestration.Server do ) do {:ok, %{ + step_id: step_id, external_run_id: external_run_id, step_number: step_number, execution_id: execution_id, @@ -2955,6 +3074,23 @@ defmodule Coflux.Orchestration.Server do ws_ext_id = workspace_external_id(state, workspace_id) + # Compute and register pending dependencies + wait_for = Keyword.get(opts, :wait_for) || [] + requires = Keyword.get(opts, :requires) || %{} + + {state, pending_dependencies} = + if step_id do + pending_dependencies = + compute_pending_dependencies(state.db, execution_id, wait_for, step_id) + + state = + register_pending_dependencies(state, execution_id, pending_dependencies) + + {state, pending_dependencies} + else + {state, MapSet.new()} + end + state = state |> put_in([Access.key(:execution_ids), execution_external_id], execution_id) @@ -2971,6 +3107,12 @@ defmodule Coflux.Orchestration.Server do {:scheduled, execution_external_id, target_name, external_run_id, step_number, attempt, execute_after, created_at} ) + |> notify_listeners( + {:queue, ws_ext_id}, + {:scheduled, execution_external_id, module, target_name, external_run_id, 
step_number, + attempt, execute_after, created_at, + pending_dependency_external_ids(state.db, pending_dependencies), requires} + ) |> notify_listeners( {:targets, ws_ext_id}, {:step, module, target_name, type, external_run_id, step_number, attempt} @@ -3020,6 +3162,21 @@ defmodule Coflux.Orchestration.Server do {ext_id, execution} end) + # Compute and register pending dependencies + pending_dependencies = + compute_pending_dependencies(state.db, execution_id, step.wait_for || [], step.id) + + state = + register_pending_dependencies(state, execution_id, pending_dependencies) + + requires = + if step.requires_tag_set_id do + {:ok, requires} = TagSets.get_tag_set(state.db, step.requires_tag_set_id) + requires + else + %{} + end + execution_external_id = execution_external_id(run.external_id, step.number, attempt) @@ -3042,6 +3199,12 @@ defmodule Coflux.Orchestration.Server do {:scheduled, execution_external_id, step.target, run.external_id, step.number, attempt, execute_after, created_at} ) + |> notify_listeners( + {:queue, ws_ext_id}, + {:scheduled, execution_external_id, step.module, step.target, run.external_id, + step.number, attempt, execute_after, created_at, + pending_dependency_external_ids(state.db, pending_dependencies), requires} + ) |> notify_listeners( {:targets, ws_ext_id}, {:step, step.module, step.target, step.type, run.external_id, step.number, attempt} @@ -3104,6 +3267,9 @@ defmodule Coflux.Orchestration.Server do |> Map.update!(:index_queue, &(&1 ++ [epoch_id])) |> remap_config_ids(id_mappings) |> copy_in_flight_runs() + |> Map.put(:pending_dependencies, %{}) + |> Map.put(:dependency_waiters, %{}) + |> initialize_pending_dependencies() |> maybe_start_index_build() end @@ -4023,7 +4189,11 @@ defmodule Coflux.Orchestration.Server do case Results.record_result(state.db, execution_id, result, created_by) do {:ok, created_at} -> - state = notify_waiting(state, execution_id) + state = + state + |> notify_waiting(execution_id) + |> 
update_dependencies_on_result(execution_id) + |> unregister_pending_dependencies(execution_id) final = is_result_final?(result) result = build_result(result, state.db) @@ -4071,6 +4241,10 @@ defmodule Coflux.Orchestration.Server do {:module, module, ws_ext_id}, {:completed, execution_external_id} ) + |> notify_listeners( + {:queue, ws_ext_id}, + {:completed, execution_external_id} + ) # TODO: only if there's an execution waiting for this result? send(self(), :tick) @@ -4347,26 +4521,173 @@ defmodule Coflux.Orchestration.Server do end) end - defp arguments_ready?(db, wait_for, arguments) do - Enum.all?(wait_for, fn index -> - references = - case Enum.at(arguments, index) do - {:raw, _, references} -> references - {:blob, _, _, references} -> references - nil -> [] - end + # Build the session data map sent to the Sessions topic. + defp build_session_data(state, session) do + worker = session.worker_id && Map.get(state.workers, session.worker_id) + + # Build targets as %{module => [target_name]} (session.targets values are MapSets) + targets = + Map.new(session.targets, fn {module, target_names} -> + {module, target_names |> MapSet.to_list() |> Enum.sort()} + end) + + %{ + connected: !is_nil(session.connection), + executions: session.starting |> MapSet.union(session.executing) |> Enum.count(), + concurrency: session.concurrency, + pool_name: if(worker, do: worker.pool_name), + targets: targets, + provides: session.provides, + worker_state: if(worker, do: worker.state) + } + end - all_references_ready?(db, references, MapSet.new()) + # Convert a set of internal pending dependency IDs to a list of external IDs. 
+ defp pending_dependency_external_ids(db, pending_dependency_ids) do + pending_dependency_ids + |> Enum.map(fn dependency_id -> + case Runs.get_execution_key(db, dependency_id) do + {:ok, {r, s, a}} -> execution_external_id(r, s, a) + {:error, _} -> nil + end end) + |> Enum.reject(&is_nil/1) + end + + # Send a queue notification with the current pending dependencies for an execution. + # Converts internal execution IDs to external IDs. + defp notify_queue_dependencies(state, execution_id, pending_dependency_ids) do + case Runs.get_execution_key(state.db, execution_id) do + {:ok, {r, s, a}} -> + execution_ext_id = execution_external_id(r, s, a) + + dependency_ext_ids = + pending_dependency_ids + |> Enum.map(fn dependency_id -> + case Runs.get_execution_key(state.db, dependency_id) do + {:ok, {dr, ds, da}} -> execution_external_id(dr, ds, da) + {:error, _} -> nil + end + end) + |> Enum.reject(&is_nil/1) + + {:ok, workspace_id} = Runs.get_workspace_id_for_execution(state.db, execution_id) + ws_ext_id = workspace_external_id(state, workspace_id) + + notify_listeners( + state, + {:queue, ws_ext_id}, + {:dependencies, execution_ext_id, dependency_ext_ids} + ) + + {:error, _} -> + state + end end - defp all_references_ready?(db, references, seen) do - Enum.all?(references, fn - {:execution, run_ext, step_num, attempt} -> + # Build a map of execution_external_id -> [dependency_external_id] for all + # pending executions in the given workspace, for the Queue topic snapshot. 
+ defp build_queue_dependencies(state, workspace_id) do + state.pending_dependencies + |> Enum.reduce(%{}, fn {execution_id, dependency_ids}, acc -> + case Runs.get_workspace_id_for_execution(state.db, execution_id) do + {:ok, ^workspace_id} -> + case Runs.get_execution_key(state.db, execution_id) do + {:ok, {r, s, a}} -> + ext_id = execution_external_id(r, s, a) + + dependency_ext_ids = + dependency_ids + |> Enum.map(fn dependency_id -> + case Runs.get_execution_key(state.db, dependency_id) do + {:ok, {dr, ds, da}} -> execution_external_id(dr, ds, da) + {:error, _} -> nil + end + end) + |> Enum.reject(&is_nil/1) + + Map.put(acc, ext_id, dependency_ext_ids) + + {:error, _} -> + acc + end + + _ -> + acc + end + end) + end + + # Initialize pending_dependencies and dependency_waiters for all existing unassigned executions. + defp initialize_pending_dependencies(state) do + {:ok, executions} = Runs.get_unassigned_executions(state.db) + + Enum.reduce(executions, state, fn execution, state -> + pending_dependencies = + compute_pending_dependencies( + state.db, + execution.execution_id, + execution.wait_for, + execution.step_id + ) + + register_pending_dependencies(state, execution.execution_id, pending_dependencies) + end) + end + + # Compute the set of execution IDs that the given execution is waiting on. + # This covers both argument references (when wait_for is set) and result_dependencies. 
+ defp compute_pending_dependencies(db, execution_id, wait_for, step_id) do + # Collect pending execution IDs from argument references + argument_dependencies = + if wait_for && wait_for != [] do + {:ok, arguments} = Runs.get_step_arguments(db, step_id) + + wait_for + |> Enum.flat_map(fn index -> + case Enum.at(arguments, index) do + {:raw, _, references} -> references + {:blob, _, _, references} -> references + nil -> [] + end + end) + |> collect_pending_execution_ids(db, MapSet.new()) + else + MapSet.new() + end + + # Collect pending execution IDs from result_dependencies + result_dependencies = + case Runs.get_result_dependencies(db, execution_id) do + {:ok, dependencies} -> + Enum.reduce(dependencies, MapSet.new(), fn {dependency_ref_id}, acc -> + {:ok, {run_ext, step_num, attempt, _, _}} = + Runs.get_execution_ref(db, dependency_ref_id) + + case Runs.get_execution_id(db, run_ext, step_num, attempt) do + {:ok, {dependency_execution_id}} when not is_nil(dependency_execution_id) -> + case resolve_result(db, dependency_execution_id) do + {:ok, _} -> acc + {:pending, pending_id} -> MapSet.put(acc, pending_id) + end + + _ -> + acc + end + end) + end + + MapSet.union(argument_dependencies, result_dependencies) + end + + # Walk references and collect execution IDs that are still pending. 
+ defp collect_pending_execution_ids(references, db, seen) do + Enum.reduce(references, MapSet.new(), fn + {:execution, run_ext, step_num, attempt}, acc -> case Runs.get_execution_id(db, run_ext, step_num, attempt) do {:ok, {execution_id}} when not is_nil(execution_id) -> if MapSet.member?(seen, execution_id) do - true + acc else case resolve_result(db, execution_id) do {:ok, {:value, value}} -> @@ -4377,50 +4698,172 @@ defmodule Coflux.Orchestration.Server do _ -> [] end - all_references_ready?(db, inner_refs, MapSet.put(seen, execution_id)) + inner_pending = + collect_pending_execution_ids( + inner_refs, + db, + MapSet.put(seen, execution_id) + ) + + MapSet.union(acc, inner_pending) {:ok, _} -> - true + acc - {:pending, _} -> - false + {:pending, pending_id} -> + MapSet.put(acc, pending_id) end end _ -> - false + acc end - {:fragment, _format, _blob_key, _size, _metadata} -> - true + {:fragment, _format, _blob_key, _size, _metadata}, acc -> + acc - {:asset, _external_id} -> - true + {:asset, _external_id}, acc -> + acc end) end - defp dependencies_ready?(db, execution_id) do - # TODO: also check assets? - case Runs.get_result_dependencies(db, execution_id) do + # Register an execution's pending dependencies in state. + # Only adds entries if there are actual pending dependencies. + defp register_pending_dependencies(state, execution_id, dependencies) do + if MapSet.size(dependencies) == 0 do + state + else + state = + put_in(state, [Access.key(:pending_dependencies), execution_id], dependencies) + + Enum.reduce(dependencies, state, fn dependency_id, state -> + update_in( + state, + [Access.key(:dependency_waiters), Access.key(dependency_id, MapSet.new())], + &MapSet.put(&1, execution_id) + ) + end) + end + end + + # Remove an execution from the dependency tracking (when assigned or completed). 
+ defp unregister_pending_dependencies(state, execution_id) do + case Map.fetch(state.pending_dependencies, execution_id) do {:ok, dependencies} -> - Enum.all?(dependencies, fn {dependency_ref_id} -> - {:ok, {run_ext, step_num, attempt, _, _}} = - Runs.get_execution_ref(db, dependency_ref_id) - - case Runs.get_execution_id(db, run_ext, step_num, attempt) do - {:ok, {dep_execution_id}} when not is_nil(dep_execution_id) -> - case resolve_result(db, dep_execution_id) do - {:ok, _} -> true - {:pending, _} -> false - end + state = + Enum.reduce(dependencies, state, fn dependency_id, state -> + state = + update_in( + state, + [Access.key(:dependency_waiters), Access.key(dependency_id, MapSet.new())], + &MapSet.delete(&1, execution_id) + ) + + # Clean up empty waiter entries + if MapSet.size(state.dependency_waiters[dependency_id] || MapSet.new()) == 0 do + update_in( + state, + [Access.key(:dependency_waiters)], + &Map.delete(&1, dependency_id) + ) + else + state + end + end) + + update_in(state, [Access.key(:pending_dependencies)], &Map.delete(&1, execution_id)) + + :error -> + state + end + end + + # Called when a result is recorded for an execution. Updates dependency_waiters + # and pending_dependencies for any executions that were waiting on this one. + # Handles two cases: + # 1. Result redirects (spawned, deferred, etc.) — follows the chain to find + # the new pending execution. + # 2. Result is a value containing inner execution references (wait_for + # semantics) — extracts any still-pending references from the value. + defp update_dependencies_on_result(state, execution_id) do + case Map.fetch(state.dependency_waiters, execution_id) do + {:ok, waiters} -> + state = + update_in( + state, + [Access.key(:dependency_waiters)], + &Map.delete(&1, execution_id) + ) + + # Determine new pending dependencies that replace this resolved one. + # This handles both redirect chains and inner value references. 
+ new_pending = + case resolve_result(state.db, execution_id) do + {:pending, new_id} when new_id != execution_id -> + MapSet.new([new_id]) + + {:ok, {:value, value}} -> + # The result is a value — check for inner execution references + # that are still pending (needed for wait_for semantics). + inner_references = + case value do + {:raw, _, references} -> references + {:blob, _, _, references} -> references + _ -> [] + end + + collect_pending_execution_ids( + inner_references, + state.db, + MapSet.new([execution_id]) + ) _ -> - # Execution not in current epoch. In-flight runs are copied to the - # active epoch during rotation, so this should only be reachable for - # completed runs whose results are already resolved. - false + MapSet.new() + end + + Enum.reduce(waiters, state, fn waiter_id, state -> + case Map.fetch(state.pending_dependencies, waiter_id) do + {:ok, current} -> + updated = + current + |> MapSet.delete(execution_id) + |> MapSet.union(new_pending) + + # Register waiter with new dependency_waiters entries + state + |> then(fn state -> + Enum.reduce(new_pending, state, fn new_dependency_id, state -> + update_in( + state, + [ + Access.key(:dependency_waiters), + Access.key(new_dependency_id, MapSet.new()) + ], + &MapSet.put(&1, waiter_id) + ) + end) + end) + |> then(fn state -> + if MapSet.size(updated) == 0 do + update_in( + state, + [Access.key(:pending_dependencies)], + &Map.delete(&1, waiter_id) + ) + else + put_in(state, [Access.key(:pending_dependencies), waiter_id], updated) + end + end) + |> notify_queue_dependencies(waiter_id, updated) + + :error -> + state end end) + + :error -> + state end end @@ -4665,8 +5108,8 @@ defmodule Coflux.Orchestration.Server do ) end - defp deactivate_worker(state, worker_id, error) do - {:ok, deactivated_at} = Workers.create_worker_deactivation(state.db, worker_id, error) + defp deactivate_worker(state, worker_id, error, logs \\ nil) do + {:ok, deactivated_at} = Workers.create_worker_deactivation(state.db, 
worker_id, error, logs) {worker, state} = pop_in(state, [Access.key(:workers), worker_id]) @@ -4675,7 +5118,7 @@ defmodule Coflux.Orchestration.Server do notify_listeners( state, {:pool, workspace_external_id(state, worker.workspace_id), worker.pool_name}, - {:worker_deactivated, worker.external_id, deactivated_at, error} + {:worker_deactivated, worker.external_id, deactivated_at, error, logs} ) end diff --git a/server/lib/coflux/orchestration/workers.ex b/server/lib/coflux/orchestration/workers.ex index efe932d..c943c2a 100644 --- a/server/lib/coflux/orchestration/workers.ex +++ b/server/lib/coflux/orchestration/workers.ex @@ -64,12 +64,20 @@ defmodule Coflux.Orchestration.Workers do end end - def create_worker_deactivation(db, worker_id, error) do + def create_worker_deactivation(db, worker_id, error, logs \\ nil) do now = current_timestamp() + worker_log_id = + case logs do + nil -> nil + "" -> nil + content -> get_or_create_worker_log(db, content) + end + case insert_one(db, :worker_deactivations, %{ worker_id: worker_id, error: error, + worker_log_id: worker_log_id, created_at: now }) do {:ok, _} -> @@ -77,6 +85,40 @@ defmodule Coflux.Orchestration.Workers do end end + def get_worker_logs(db, worker_id) do + case query_one( + db, + """ + SELECT wl.content + FROM worker_deactivations AS wd + INNER JOIN worker_logs AS wl ON wl.id = wd.worker_log_id + WHERE wd.worker_id = ?1 + """, + {worker_id} + ) do + {:ok, {content}} -> {:ok, content} + {:ok, nil} -> {:ok, nil} + end + end + + defp get_or_create_worker_log(db, content) do + hash = :crypto.hash(:sha256, content) + + case query_one(db, "SELECT id FROM worker_logs WHERE hash = ?1", {{:blob, hash}}) do + {:ok, {id}} -> + id + + {:ok, nil} -> + {:ok, id} = + insert_one(db, :worker_logs, %{ + hash: {:blob, hash}, + content: content + }) + + id + end + end + def get_active_workers(db) do case query( db, @@ -128,7 +170,7 @@ defmodule Coflux.Orchestration.Workers do db, """ SELECT w.id, w.external_id, w.created_at, 
r.created_at, r.error, s.created_at, sr.created_at, sr.error, - d.created_at, d.error + d.created_at, d.error, wl.content FROM workers AS w INNER JOIN pools AS p ON p.id = w.pool_id LEFT JOIN worker_launch_results AS r ON r.worker_id = w.id @@ -141,6 +183,7 @@ defmodule Coflux.Orchestration.Workers do ) LEFT JOIN worker_stop_results AS sr ON sr.worker_stop_id = s.id LEFT JOIN worker_deactivations AS d ON d.worker_id = w.id + LEFT JOIN worker_logs AS wl ON wl.id = d.worker_log_id WHERE p.name = ?1 ORDER BY w.created_at DESC LIMIT ?2 diff --git a/server/lib/coflux/topics/asset.ex b/server/lib/coflux/topics/asset.ex new file mode 100644 index 0000000..c93b29d --- /dev/null +++ b/server/lib/coflux/topics/asset.ex @@ -0,0 +1,35 @@ +defmodule Coflux.Topics.Asset do + use Topical.Topic, route: ["asset", :asset_id] + + alias Coflux.Orchestration + + def connect(params, context) do + {:ok, Map.put(params, :project, context.project)} + end + + def init(params) do + project_id = Map.fetch!(params, :project) + asset_id = Map.fetch!(params, :asset_id) + + case Orchestration.get_asset(project_id, asset_id) do + {:ok, name, entries} -> + value = %{ + name: name, + entries: + Map.new(entries, fn {path, blob_key, size, metadata} -> + {path, + %{ + blobKey: blob_key, + size: size, + metadata: metadata + }} + end) + } + + {:ok, Topic.new(value, %{})} + + {:error, :not_found} -> + {:error, :not_found} + end + end +end diff --git a/server/lib/coflux/topics/manifests.ex b/server/lib/coflux/topics/manifests.ex new file mode 100644 index 0000000..3f686c3 --- /dev/null +++ b/server/lib/coflux/topics/manifests.ex @@ -0,0 +1,100 @@ +defmodule Coflux.Topics.Manifests do + use Topical.Topic, route: ["workspaces", :workspace_id, "manifests"] + + alias Coflux.Orchestration + + def connect(params, context) do + {:ok, Map.put(params, :project, context.project)} + end + + def init(params) do + project_id = Map.fetch!(params, :project) + workspace_id = Map.fetch!(params, :workspace_id) + + case 
Orchestration.subscribe_manifests(project_id, workspace_id, self()) do + {:ok, manifests, ref} -> + {:ok, Topic.new(build_value(manifests), %{ref: ref})} + + {:error, :workspace_invalid} -> + {:error, :not_found} + end + end + + def handle_info({:topic, _ref, notifications}, topic) do + topic = Enum.reduce(notifications, topic, &process_notification(&2, &1)) + {:ok, topic} + end + + defp process_notification(topic, {:manifests, manifests}) do + Enum.reduce(manifests, topic, fn {module, workflows}, topic -> + update_module(topic, module, workflows) + end) + end + + defp process_notification(topic, {:manifest, module, workflows}) do + update_module(topic, module, workflows) + end + + defp update_module(topic, module, nil) do + Topic.unset(topic, [], module) + end + + defp update_module(topic, module, workflows) do + targets = + Map.new(workflows, fn {name, workflow} -> + {name, build_workflow(workflow)} + end) + + Topic.set(topic, [module], targets) + end + + defp build_value(manifests) do + Map.new(manifests, fn {module, workflows} -> + targets = + Map.new(workflows, fn {name, workflow} -> + {name, build_workflow(workflow)} + end) + + {module, targets} + end) + end + + defp build_workflow(workflow) do + %{ + parameters: + Enum.map(workflow.parameters, fn {name, default, annotation} -> + %{name: name, default: default, annotation: annotation} + end), + waitFor: workflow.wait_for, + cache: build_cache(workflow.cache), + defer: build_defer(workflow.defer), + delay: workflow.delay, + retries: build_retries(workflow.retries), + requires: workflow.requires + } + end + + defp build_cache(nil), do: nil + + defp build_cache(cache) do + %{ + params: cache.params, + maxAge: cache.max_age, + namespace: cache.namespace, + version: cache.version + } + end + + defp build_defer(nil), do: nil + defp build_defer(defer), do: %{params: defer.params} + + defp build_retries(nil), do: nil + + defp build_retries(retries) do + %{ + limit: retries.limit, + delayMin: retries.delay_min, + 
delayMax: retries.delay_max + } + end +end diff --git a/server/lib/coflux/topics/module.ex b/server/lib/coflux/topics/module.ex index 1275ebe..eb25ad7 100644 --- a/server/lib/coflux/topics/module.ex +++ b/server/lib/coflux/topics/module.ex @@ -1,6 +1,6 @@ defmodule Coflux.Topics.Module do use Topical.Topic, - route: ["modules", :module, :workspace_id] + route: ["workspaces", :workspace_id, "modules", :module] alias Coflux.Orchestration diff --git a/server/lib/coflux/topics/modules.ex b/server/lib/coflux/topics/modules.ex index b51a793..718b5da 100644 --- a/server/lib/coflux/topics/modules.ex +++ b/server/lib/coflux/topics/modules.ex @@ -1,5 +1,5 @@ defmodule Coflux.Topics.Modules do - use Topical.Topic, route: ["modules", :workspace_id] + use Topical.Topic, route: ["workspaces", :workspace_id, "modules"] alias Coflux.Orchestration diff --git a/server/lib/coflux/topics/pool.ex b/server/lib/coflux/topics/pool.ex index 06e3540..fa6856b 100644 --- a/server/lib/coflux/topics/pool.ex +++ b/server/lib/coflux/topics/pool.ex @@ -1,5 +1,5 @@ defmodule Coflux.Topics.Pool do - use Topical.Topic, route: ["pools", :workspace_id, :pool_name] + use Topical.Topic, route: ["workspaces", :workspace_id, "pools", :pool_name] alias Coflux.Orchestration @@ -48,6 +48,7 @@ defmodule Coflux.Topics.Pool do stoppingAt: nil, stopError: nil, deactivatedAt: nil, + logs: nil, state: :active, connected: nil }) @@ -72,11 +73,12 @@ defmodule Coflux.Topics.Pool do defp process_notification( topic, - {:worker_deactivated, worker_external_id, deactivated_at, error} + {:worker_deactivated, worker_external_id, deactivated_at, error, logs} ) do topic |> Topic.set([:workers, worker_external_id, :deactivatedAt], deactivated_at) |> Topic.set([:workers, worker_external_id, :error], error) + |> Topic.set([:workers, worker_external_id, :logs], logs) end defp process_notification(topic, {:worker_state, worker_external_id, state}) do @@ -94,6 +96,7 @@ defmodule Coflux.Topics.Pool do base |> maybe_put(:dockerHost, 
Map.get(launcher, :docker_host)) + |> maybe_put(:serverHost, Map.get(launcher, :server_host)) end end @@ -121,6 +124,7 @@ defmodule Coflux.Topics.Pool do stopError: worker.stop_error, deactivatedAt: worker.deactivated_at, error: worker.error, + logs: worker.logs, state: worker.state, connected: worker.connected } diff --git a/server/lib/coflux/topics/pools.ex b/server/lib/coflux/topics/pools.ex index 69a2541..648dd8b 100644 --- a/server/lib/coflux/topics/pools.ex +++ b/server/lib/coflux/topics/pools.ex @@ -1,5 +1,5 @@ defmodule Coflux.Topics.Pools do - use Topical.Topic, route: ["pools", :workspace_id] + use Topical.Topic, route: ["workspaces", :workspace_id, "pools"] alias Coflux.Orchestration diff --git a/server/lib/coflux/topics/queue.ex b/server/lib/coflux/topics/queue.ex new file mode 100644 index 0000000..aeddc06 --- /dev/null +++ b/server/lib/coflux/topics/queue.ex @@ -0,0 +1,87 @@ +defmodule Coflux.Topics.Queue do + use Topical.Topic, route: ["workspaces", :workspace_id, "queue"] + + alias Coflux.Orchestration + + def connect(params, context) do + {:ok, Map.put(params, :project, context.project)} + end + + def init(params) do + project_id = Map.fetch!(params, :project) + workspace_id = Map.fetch!(params, :workspace_id) + + case Orchestration.subscribe_queue(project_id, workspace_id, self()) do + {:ok, executions, tag_sets, dependencies, ref} -> + value = + Map.new(executions, fn {module, target, run_external_id, step_number, attempt, + execute_after, created_at, assigned_at, requires_tag_set_id} -> + execution_id = "#{run_external_id}:#{step_number}:#{attempt}" + + requires = + if requires_tag_set_id, + do: Map.fetch!(tag_sets, requires_tag_set_id), + else: %{} + + {execution_id, + %{ + module: module, + target: target, + runId: run_external_id, + stepId: "#{run_external_id}:#{step_number}", + stepNumber: step_number, + attempt: attempt, + executeAfter: execute_after, + createdAt: created_at, + assignedAt: assigned_at, + dependencies: Map.get(dependencies, 
execution_id, []), + requires: requires + }} + end) + + {:ok, Topic.new(value, %{ref: ref})} + + {:error, :workspace_invalid} -> + {:error, :not_found} + end + end + + def handle_info({:topic, _ref, notifications}, topic) do + topic = Enum.reduce(notifications, topic, &process_notification/2) + {:ok, topic} + end + + defp process_notification( + {:scheduled, execution_external_id, module, target, run_external_id, step_number, + attempt, execute_after, created_at, dependencies, requires}, + topic + ) do + Topic.set(topic, [execution_external_id], %{ + module: module, + target: target, + runId: run_external_id, + stepId: "#{run_external_id}:#{step_number}", + stepNumber: step_number, + attempt: attempt, + executeAfter: execute_after, + createdAt: created_at, + assignedAt: nil, + dependencies: dependencies, + requires: requires + }) + end + + defp process_notification({:dependencies, execution_external_id, dependency_ids}, topic) do + Topic.set(topic, [execution_external_id, :dependencies], dependency_ids) + end + + defp process_notification({:assigned, executions}, topic) do + Enum.reduce(executions, topic, fn {execution_external_id, assigned_at}, topic -> + Topic.set(topic, [execution_external_id, :assignedAt], assigned_at) + end) + end + + defp process_notification({:completed, execution_external_id}, topic) do + Topic.unset(topic, [], execution_external_id) + end +end diff --git a/server/lib/coflux/topics/run.ex b/server/lib/coflux/topics/run.ex index bd98a46..f676013 100644 --- a/server/lib/coflux/topics/run.ex +++ b/server/lib/coflux/topics/run.ex @@ -1,5 +1,5 @@ defmodule Coflux.Topics.Run do - use Topical.Topic, route: ["runs", :run_id, :workspace_id] + use Topical.Topic, route: ["workspaces", :workspace_id, "runs", :run_id] alias Coflux.Orchestration diff --git a/server/lib/coflux/topics/search.ex b/server/lib/coflux/topics/search.ex index 8767ac9..f31b759 100644 --- a/server/lib/coflux/topics/search.ex +++ b/server/lib/coflux/topics/search.ex @@ -1,7 +1,7 @@ 
defmodule Coflux.Topics.Search do alias Coflux.Orchestration - use Topical.Topic, route: ["search", :workspace_id] + use Topical.Topic, route: ["workspaces", :workspace_id, "search"] def connect(params, context) do {:ok, Map.put(params, :project, context.project)} diff --git a/server/lib/coflux/topics/sessions.ex b/server/lib/coflux/topics/sessions.ex index 2aab637..16f5f49 100644 --- a/server/lib/coflux/topics/sessions.ex +++ b/server/lib/coflux/topics/sessions.ex @@ -1,5 +1,5 @@ defmodule Coflux.Topics.Sessions do - use Topical.Topic, route: ["sessions", :workspace_id] + use Topical.Topic, route: ["workspaces", :workspace_id, "sessions"] alias Coflux.Orchestration @@ -46,7 +46,11 @@ defmodule Coflux.Topics.Sessions do %{ connected: session.connected, executions: session.executions, - poolName: session.pool_name + concurrency: session.concurrency, + poolName: session.pool_name, + targets: session.targets, + provides: session.provides, + workerState: session.worker_state } end end diff --git a/server/lib/coflux/topics/tokens.ex b/server/lib/coflux/topics/tokens.ex new file mode 100644 index 0000000..f62ebd3 --- /dev/null +++ b/server/lib/coflux/topics/tokens.ex @@ -0,0 +1,49 @@ +defmodule Coflux.Topics.Tokens do + use Topical.Topic, route: ["tokens"] + + import Coflux.TopicUtils, only: [build_principal: 1] + + alias Coflux.Orchestration + + def connect(params, context) do + {:ok, Map.put(params, :project, context.project)} + end + + def init(params) do + project_id = Map.fetch!(params, :project) + + {:ok, tokens, ref} = Orchestration.subscribe_tokens(project_id, self()) + + active_tokens = + tokens + |> Enum.filter(fn token -> is_nil(token.revoked_at) end) + |> Map.new(fn token -> {token.external_id, build_token(token)} end) + + {:ok, Topic.new(active_tokens, %{ref: ref})} + end + + def handle_info({:topic, _ref, notifications}, topic) do + topic = Enum.reduce(notifications, topic, &process_notification(&2, &1)) + {:ok, topic} + end + + defp 
process_notification(topic, {:token, external_id, nil}) do + Topic.unset(topic, [], external_id) + end + + defp process_notification(topic, {:token, external_id, token}) do + Topic.set(topic, [external_id], build_token(token)) + end + + defp build_token(token) do + %{ + id: token.id, + externalId: token.external_id, + name: token.name, + workspaces: token.workspaces, + createdAt: token.created_at, + expiresAt: token.expires_at, + createdBy: build_principal(token.created_by) + } + end +end diff --git a/server/lib/coflux/topics/workflow.ex b/server/lib/coflux/topics/workflow.ex index 90002e7..c56efaa 100644 --- a/server/lib/coflux/topics/workflow.ex +++ b/server/lib/coflux/topics/workflow.ex @@ -1,6 +1,6 @@ defmodule Coflux.Topics.Workflow do use Topical.Topic, - route: ["workflows", :module, :target, :workspace_id] + route: ["workspaces", :workspace_id, "workflows", :module, :target] import Coflux.TopicUtils diff --git a/server/priv/migrations/orchestration/1.sql b/server/priv/migrations/orchestration/1.sql index ff28af9..fc4b6ef 100644 --- a/server/priv/migrations/orchestration/1.sql +++ b/server/priv/migrations/orchestration/1.sql @@ -197,11 +197,19 @@ CREATE TABLE worker_stop_results ( FOREIGN KEY (worker_stop_id) REFERENCES worker_stops ) STRICT; +CREATE TABLE worker_logs ( + id INTEGER PRIMARY KEY, + hash BLOB NOT NULL UNIQUE, + content TEXT NOT NULL +) STRICT; + CREATE TABLE worker_deactivations ( worker_id INTEGER PRIMARY KEY, error TEXT, + worker_log_id INTEGER, created_at INTEGER NOT NULL, - FOREIGN KEY (worker_id) REFERENCES workers + FOREIGN KEY (worker_id) REFERENCES workers, + FOREIGN KEY (worker_log_id) REFERENCES worker_logs ON DELETE RESTRICT ) STRICT; CREATE TABLE sessions ( diff --git a/tests/support/server.py b/tests/support/server.py index 8fde9b5..a9ec723 100644 --- a/tests/support/server.py +++ b/tests/support/server.py @@ -62,7 +62,7 @@ def _start_local(self, timeout): "HOME": os.environ.get("HOME", "/tmp"), "PORT": str(self.port), 
"COFLUX_DATA_DIR": self.data_dir, - "COFLUX_BASE_DOMAIN": "localhost", + "COFLUX_PUBLIC_HOST": "%.localhost:" + str(self.port), "COFLUX_REQUIRE_AUTH": "false", "COFLUX_SUPER_TOKEN_HASH": hashlib.sha256(SUPER_TOKEN.encode()).hexdigest(), } @@ -89,7 +89,7 @@ def _start_docker(self, image, timeout): "-v", f"{self.data_dir}:/data", "-e", - "COFLUX_BASE_DOMAIN=localhost", + "COFLUX_PUBLIC_HOST=%.localhost:7777", "-e", "COFLUX_REQUIRE_AUTH=false", "-e",