Skip to content

Commit 9afea2b

Browse files
authored
Improve mass output downloads (#135)
* AllRuns takes precedence over any specified number of runs * Return unmatched identifiers while filtering sub jobs instead of returning an error * Get the complete run details while downloading outputs if a simplified run object that doesn't contain the workflow version ID was used initially * Do not error out on the first failed run and print extra details in the download status messages * Return the destination path while downloading outputs and pass the individual download error details to the caller
1 parent 02587ae commit 9afea2b

File tree

6 files changed

+65
-42
lines changed

6 files changed

+65
-42
lines changed

cmd/execute/execute.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,11 @@ func run(cfg *Config) error {
295295
if err != nil {
296296
return fmt.Errorf("failed to get run: %w", err)
297297
}
298-
results, err := actions.DownloadRunOutput(client, run, cfg.NodesToDownload, []string{}, cfg.OutputDirectory)
298+
results, runDir, err := actions.DownloadRunOutput(client, run, cfg.NodesToDownload, []string{}, cfg.OutputDirectory)
299299
if err != nil {
300300
return fmt.Errorf("failed to download run outputs: %w", err)
301301
}
302-
actions.PrintDownloadResults(results)
302+
actions.PrintDownloadResults(results, *run.ID, runDir)
303303
}
304304

305305
return nil

cmd/output/output.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,24 @@ func run(cfg *Config) error {
8383
files := cfg.GetFiles()
8484
path := cfg.GetOutputPath()
8585

86-
for _, run := range runs {
87-
results, err := actions.DownloadRunOutput(client, &run, nodes, files, path)
86+
var errorCount int
87+
for i, run := range runs {
88+
fmt.Fprintf(os.Stderr, "Downloading outputs for run %s\n", run.ID.String())
89+
results, runDir, err := actions.DownloadRunOutput(client, &run, nodes, files, path)
8890
if err != nil {
89-
return fmt.Errorf("failed to download run output: %w", err)
91+
fmt.Fprintf(os.Stderr, "Warning: failed to download run output for run %s: %v\n", run.ID.String(), err)
92+
errorCount++
9093
}
91-
actions.PrintDownloadResults(results)
94+
95+
actions.PrintDownloadResults(results, *run.ID, runDir)
96+
// Print a newline visual separation if there are more runs to process
97+
if i < len(runs)-1 {
98+
fmt.Fprintln(os.Stderr)
99+
}
100+
}
101+
102+
if errorCount == len(runs) {
103+
return fmt.Errorf("failed to download outputs for all runs")
92104
}
93105
return nil
94106
}

cmd/stop/stop.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,12 @@ func run(cfg *Config) error {
124124
}
125125
subJobs = trickest.LabelSubJobs(subJobs, *version)
126126

127-
matchingSubJobs, err := trickest.FilterSubJobs(subJobs, cfg.Nodes)
128-
if err != nil {
129-
return fmt.Errorf("no running nodes matching your query were found in the run %s: %w", run.ID.String(), err)
127+
matchingSubJobs, unmatchedNodes := trickest.FilterSubJobs(subJobs, cfg.Nodes)
128+
if len(matchingSubJobs) == 0 {
129+
return fmt.Errorf("no running nodes matching your query %q were found in the run %s", strings.Join(cfg.Nodes, ","), run.ID.String())
130+
}
131+
if len(unmatchedNodes) > 0 {
132+
fmt.Fprintf(os.Stderr, "Warning: The following nodes were not found in run %s: %s. Proceeding with the remaining nodes\n", run.ID.String(), strings.Join(unmatchedNodes, ","))
130133
}
131134

132135
for _, subJob := range matchingSubJobs {

pkg/actions/output.go

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"os"
7+
"strings"
78
"sync"
89

910
"github.com/google/uuid"
@@ -19,7 +20,7 @@ type DownloadResult struct {
1920
Error error
2021
}
2122

22-
func PrintDownloadResults(results []DownloadResult) {
23+
func PrintDownloadResults(results []DownloadResult, runID uuid.UUID, destinationPath string) {
2324
successCount := 0
2425
failureCount := 0
2526
for _, result := range results {
@@ -28,46 +29,58 @@ func PrintDownloadResults(results []DownloadResult) {
2829
} else {
2930
failureCount++
3031
if result.FileName != "" {
31-
fmt.Fprintf(os.Stderr, "Warning: Failed to download file %q for node %q: %v\n", result.FileName, result.SubJobName, result.Error)
32+
fmt.Fprintf(os.Stderr, "Warning: Failed to download file %q for node %q in run %s: %v\n", result.FileName, result.SubJobName, runID.String(), result.Error)
3233
} else {
33-
fmt.Fprintf(os.Stderr, "Warning: Failed to download output for node %q: %v\n", result.SubJobName, result.Error)
34+
fmt.Fprintf(os.Stderr, "Warning: Failed to download output for node %q in run %s: %v\n", result.SubJobName, runID.String(), result.Error)
3435
}
3536
}
3637
}
3738

3839
if failureCount > 0 {
39-
fmt.Fprintf(os.Stderr, "Download completed with %d successful and %d failed downloads\n", successCount, failureCount)
40+
fmt.Fprintf(os.Stderr, "Download completed with %d successful and %d failed downloads for run %s into %q\n", successCount, failureCount, runID.String(), destinationPath+"/")
4041
} else if successCount > 0 {
41-
fmt.Printf("Successfully downloaded outputs for %d sub-jobs\n", successCount)
42+
fmt.Printf("Successfully downloaded %d outputs from run %s into %q\n", successCount, runID.String(), destinationPath+"/")
4243
}
4344
}
4445

45-
func DownloadRunOutput(client *trickest.Client, run *trickest.Run, nodes []string, files []string, destinationPath string) ([]DownloadResult, error) {
46+
// DownloadRunOutput downloads the outputs for the specified nodes in the run
47+
// Returns the download result summary, the directory where the outputs were saved, and an error if _all_ of the downloads failed
48+
func DownloadRunOutput(client *trickest.Client, run *trickest.Run, nodes []string, files []string, destinationPath string) ([]DownloadResult, string, error) {
4649
if run.Status == "PENDING" || run.Status == "SUBMITTED" {
47-
return nil, fmt.Errorf("run %s has not started yet (status: %s)", run.ID.String(), run.Status)
50+
return nil, "", fmt.Errorf("run %s has not started yet (status: %s)", run.ID.String(), run.Status)
4851
}
4952

5053
ctx := context.Background()
5154

5255
subJobs, err := client.GetSubJobs(ctx, *run.ID)
5356
if err != nil {
54-
return nil, fmt.Errorf("failed to get subjobs for run %s: %w", run.ID.String(), err)
57+
return nil, "", fmt.Errorf("failed to get subjobs for run %s: %w", run.ID.String(), err)
5558
}
5659

60+
// If the run was retrieved through the GetRuns() method, the WorkflowVersionInfo field will be nil
61+
if run.WorkflowVersionInfo == nil {
62+
run, err = client.GetRun(ctx, *run.ID)
63+
if err != nil {
64+
return nil, "", fmt.Errorf("failed to get run details for run %s: %w", run.ID.String(), err)
65+
}
66+
}
5767
version, err := client.GetWorkflowVersion(ctx, *run.WorkflowVersionInfo)
5868
if err != nil {
59-
return nil, fmt.Errorf("could not get workflow version for run %s: %w", run.ID.String(), err)
69+
return nil, "", fmt.Errorf("could not get workflow version for run %s: %w", run.ID.String(), err)
6070
}
6171
subJobs = trickest.LabelSubJobs(subJobs, *version)
6272

63-
matchingSubJobs, err := trickest.FilterSubJobs(subJobs, nodes)
64-
if err != nil {
65-
return nil, fmt.Errorf("no completed node outputs matching your query were found in the run %s: %w", run.ID.String(), err)
73+
matchingSubJobs, unmatchedNodes := trickest.FilterSubJobs(subJobs, nodes)
74+
if len(matchingSubJobs) == 0 {
75+
return nil, "", fmt.Errorf("no completed node outputs matching your query %q were found in the run %s", strings.Join(nodes, ","), run.ID.String())
76+
}
77+
if len(unmatchedNodes) > 0 {
78+
fmt.Fprintf(os.Stderr, "Warning: The following nodes were not found in run %s: %s. Proceeding with the remaining nodes\n", run.ID.String(), strings.Join(unmatchedNodes, ","))
6679
}
6780

6881
runDir, err := filesystem.CreateRunDir(destinationPath, *run)
6982
if err != nil {
70-
return nil, fmt.Errorf("failed to create directory for run %s: %w", run.ID.String(), err)
83+
return nil, "", fmt.Errorf("failed to create directory for run %s: %w", run.ID.String(), err)
7184
}
7285

7386
var allResults []DownloadResult
@@ -84,13 +97,7 @@ func DownloadRunOutput(client *trickest.Client, run *trickest.Run, nodes []strin
8497
allResults = append(allResults, results...)
8598
}
8699

87-
// If all subjobs failed, return an error
88-
if errCount == len(allResults) && len(allResults) > 0 {
89-
return allResults, fmt.Errorf("failed to download outputs for all nodes")
90-
}
91-
92-
// If only some failed, return results but no error
93-
return allResults, nil
100+
return allResults, runDir, nil
94101
}
95102

96103
func downloadSubJobOutput(client *trickest.Client, savePath string, subJob *trickest.SubJob, files []string, runID *uuid.UUID, isModule bool) []DownloadResult {

pkg/config/workflowrunspec.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (s WorkflowRunSpec) GetRuns(ctx context.Context, client *trickest.Client) (
4949
}
5050

5151
limit := 0 // 0 means get all runs
52-
if s.NumberOfRuns > 0 {
52+
if s.NumberOfRuns > 0 && !s.AllRuns {
5353
limit = s.NumberOfRuns
5454
}
5555

pkg/trickest/subjob.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -172,35 +172,36 @@ func LabelSubJobs(subJobs []SubJob, version WorkflowVersion) []SubJob {
172172
}
173173

174174
// FilterSubJobs filters the subjobs based on the identifiers (label or name (node ID))
175-
func FilterSubJobs(subJobs []SubJob, identifiers []string) ([]SubJob, error) {
175+
// Returns matched subjobs and unmatched identifiers
176+
func FilterSubJobs(subJobs []SubJob, identifiers []string) ([]SubJob, []string) {
176177
if len(identifiers) == 0 {
177178
return subJobs, nil
178179
}
179180

180-
var foundNodes []string
181181
var matchingSubJobs []SubJob
182+
matchedIdentifiers := make(map[string]bool)
182183

183184
for _, subJob := range subJobs {
184185
labelExists := slices.Contains(identifiers, subJob.Label)
185186
nameExists := slices.Contains(identifiers, subJob.Name)
186187

187-
if labelExists {
188-
foundNodes = append(foundNodes, subJob.Label)
189-
}
190-
if nameExists {
191-
foundNodes = append(foundNodes, subJob.Name)
192-
}
193-
194188
if labelExists || nameExists {
195189
matchingSubJobs = append(matchingSubJobs, subJob)
190+
if labelExists {
191+
matchedIdentifiers[subJob.Label] = true
192+
}
193+
if nameExists {
194+
matchedIdentifiers[subJob.Name] = true
195+
}
196196
}
197197
}
198198

199+
var unmatchedIdentifiers []string
199200
for _, identifier := range identifiers {
200-
if !slices.Contains(foundNodes, identifier) {
201-
return nil, fmt.Errorf("subjob with name or label %s not found", identifier)
201+
if !matchedIdentifiers[identifier] {
202+
unmatchedIdentifiers = append(unmatchedIdentifiers, identifier)
202203
}
203204
}
204205

205-
return matchingSubJobs, nil
206+
return matchingSubJobs, unmatchedIdentifiers
206207
}

0 commit comments

Comments
 (0)