diff --git a/go.mod b/go.mod index cd9f656e3f7d..4556a7401de7 100644 --- a/go.mod +++ b/go.mod @@ -187,10 +187,9 @@ require ( github.com/ajg/form v1.5.1 // indirect github.com/andybalholm/brotli v1.1.0 // indirect github.com/awalterschulze/gographviz v0.0.0-20200901124122-0eecad45bd71 // indirect - github.com/aws/aws-sdk-go v1.47.11 // indirect github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect - github.com/aws/aws-sdk-go-v2/config v1.29.9 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.17.62 // indirect + github.com/aws/aws-sdk-go-v2/config v1.29.9 + github.com/aws/aws-sdk-go-v2/credentials v1.17.62 github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 // indirect @@ -199,7 +198,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/ecrpublic v1.13.8 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.25.1 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.33.17 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.17 github.com/aws/smithy-go v1.22.2 // indirect github.com/awslabs/amazon-ecr-credential-helper/ecr-login v0.0.0-20220706184558-ce46abcd012b // indirect github.com/beorn7/perks v1.0.1 // indirect diff --git a/go.sum b/go.sum index 3310b3af500f..925b72ab914f 100644 --- a/go.sum +++ b/go.sum @@ -137,8 +137,6 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY github.com/awalterschulze/gographviz v0.0.0-20200901124122-0eecad45bd71 h1:m3N1Fv5vE5IcxuTOGFGGV0grrVFHV8UY2SV0wSBXAC8= github.com/awalterschulze/gographviz v0.0.0-20200901124122-0eecad45bd71/go.mod h1:/ynarkO/43wP/JM2Okn61e8WFMtdbtA8he7GJxW+SFM= github.com/aws/aws-sdk-go v1.45.1/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= -github.com/aws/aws-sdk-go v1.47.11 h1:Dol+MA+hQblbnXUI3Vk9qvoekU6O1uDEuAItezjiWNQ= -github.com/aws/aws-sdk-go v1.47.11/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/aws/aws-sdk-go-v2 v1.16.7/go.mod h1:6CpKuLXg2w7If3ABZCl/qZ6rEgwtjZTn4eAf4RcEyuw= github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM= github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= diff --git a/version.go b/version.go index a4cae5894039..698a8ce539d3 100644 --- a/version.go +++ b/version.go @@ -28,7 +28,7 @@ func ImageTag() string { // GetVersion returns the version information func GetVersion() wfv1.Version { - var versionStr = "v3.5.14-atlan-1.2.8" + var versionStr = "v3.5.14-atlan-1.3" return wfv1.Version{ Version: versionStr, BuildDate: buildDate, diff --git a/workflow/artifacts/s3/errors.go b/workflow/artifacts/s3/errors.go index eafe6f1810e9..43b17b03e1c1 100644 --- a/workflow/artifacts/s3/errors.go +++ b/workflow/artifacts/s3/errors.go @@ -1,7 +1,6 @@ package s3 import ( - argos3 "github.com/argoproj/pkg/s3" log "github.com/sirupsen/logrus" "github.com/argoproj/argo-workflows/v3/util/errors" @@ -27,7 +26,7 @@ func isTransientS3Err(err error) bool { return false } for _, transientErrCode := range s3TransientErrorCodes { - if argos3.IsS3ErrCode(err, transientErrCode) { + if IsS3ErrCode(err, transientErrCode) { log.Errorf("Transient S3 error: %v", err) return true } diff --git a/workflow/artifacts/s3/s3.go b/workflow/artifacts/s3/s3.go index 3255bd0eb874..fa41b9192d4b 100644 --- a/workflow/artifacts/s3/s3.go +++ b/workflow/artifacts/s3/s3.go @@ -3,14 +3,25 @@ package s3 import ( "context" "crypto/x509" + "encoding/base64" + "encoding/json" "errors" "fmt" "io" + "net/http" "os" + "path" + "path/filepath" "strings" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials/stscreds" + "github.com/aws/aws-sdk-go-v2/service/sts" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/minio/minio-go/v7/pkg/encrypt" + "github.com/minio/minio-go/v7/pkg/sse" + "github.com/argoproj/pkg/file" - argos3 "github.com/argoproj/pkg/s3" "github.com/minio/minio-go/v7" log "github.com/sirupsen/logrus" "k8s.io/client-go/util/retry" @@ -23,6 +34,85 @@ import ( executorretry "github.com/argoproj/argo-workflows/v3/workflow/executor/retry" ) +const nullIAMEndpoint = "" + +type S3Client interface { + // PutFile puts a single file to a bucket at the specified key + PutFile(bucket, key, path string) error + + // PutDirectory puts a complete directory into a bucket key prefix, with each file in the directory + // a separate key in the bucket. + PutDirectory(bucket, key, path string) error + + // GetFile downloads a file to a local file path + GetFile(bucket, key, path string) error + + // OpenFile opens a file for much lower disk and memory usage that GetFile + OpenFile(bucket, key string) (io.ReadCloser, error) + + // KeyExists checks if object exists (and if we have permission to access) + KeyExists(bucket, key string) (bool, error) + + // Delete deletes the key from the bucket + Delete(bucket, key string) error + + // GetDirectory downloads a directory to a local file path + GetDirectory(bucket, key, path string) error + + // ListDirectory list the contents of a directory/bucket + ListDirectory(bucket, keyPrefix string) ([]string, error) + + // IsDirectory tests if the key is acting like an s3 directory + IsDirectory(bucket, key string) (bool, error) + + // BucketExists returns whether a bucket exists + BucketExists(bucket string) (bool, error) + + // MakeBucket creates a bucket with name bucketName and options opts + MakeBucket(bucketName string, opts minio.MakeBucketOptions) error +} + +type EncryptOpts struct { + KmsKeyId string + KmsEncryptionContext string + Enabled bool + ServerSideCustomerKey string +} + +// AddressingStyle is a type of bucket (and also its content) addressing used by the S3 client and supported by the server +type AddressingStyle int + +const ( + AutoDetectStyle AddressingStyle = iota + PathStyle + VirtualHostedStyle +) + +type S3ClientOpts struct { + Endpoint string + AddressingStyle AddressingStyle + Region string + Secure bool + Transport http.RoundTripper + AccessKey string + SecretKey string + SessionToken string + Trace bool + RoleARN string + RoleSessionName string + UseSDKCreds bool + EncryptOpts EncryptOpts + SendContentMd5 bool +} + +type s3client struct { + S3ClientOpts + minioClient *minio.Client + ctx context.Context +} + +var _ S3Client = &s3client{} + // ArtifactDriver is a driver for AWS S3 type ArtifactDriver struct { Endpoint string @@ -31,6 +121,7 @@ type ArtifactDriver struct { TrustedCA string AccessKey string SecretKey string + SessionToken string RoleARN string UseSDKCreds bool Context context.Context @@ -43,17 +134,18 @@ type ArtifactDriver struct { var _ artifactscommon.ArtifactDriver = &ArtifactDriver{} // newS3Client instantiates a new S3 client object. -func (s3Driver *ArtifactDriver) newS3Client(ctx context.Context) (argos3.S3Client, error) { - opts := argos3.S3ClientOpts{ - Endpoint: s3Driver.Endpoint, - Region: s3Driver.Region, - Secure: s3Driver.Secure, - AccessKey: s3Driver.AccessKey, - SecretKey: s3Driver.SecretKey, - RoleARN: s3Driver.RoleARN, - Trace: os.Getenv(common.EnvVarArgoTrace) == "1", - UseSDKCreds: s3Driver.UseSDKCreds, - EncryptOpts: argos3.EncryptOpts{ +func (s3Driver *ArtifactDriver) newS3Client(ctx context.Context) (S3Client, error) { + opts := S3ClientOpts{ + Endpoint: s3Driver.Endpoint, + Region: s3Driver.Region, + Secure: s3Driver.Secure, + AccessKey: s3Driver.AccessKey, + SecretKey: s3Driver.SecretKey, + SessionToken: s3Driver.SessionToken, + RoleARN: s3Driver.RoleARN, + Trace: os.Getenv(common.EnvVarArgoTrace) == "1", + UseSDKCreds: s3Driver.UseSDKCreds, + EncryptOpts: EncryptOpts{ KmsKeyId: s3Driver.KmsKeyId, KmsEncryptionContext: s3Driver.KmsEncryptionContext, Enabled: s3Driver.EnableEncryption, @@ -61,7 +153,7 @@ func (s3Driver *ArtifactDriver) newS3Client(ctx context.Context) (argos3.S3Clien }, } - if tr, err := argos3.GetDefaultTransport(opts); err == nil { + if tr, err := GetDefaultTransport(opts); err == nil { if s3Driver.Secure && s3Driver.TrustedCA != "" { // Trust only the provided root CA pool := x509.NewCertPool() @@ -71,7 +163,7 @@ func (s3Driver *ArtifactDriver) newS3Client(ctx context.Context) (argos3.S3Clien opts.Transport = tr } - return argos3.NewS3Client(ctx, opts) + return NewS3Client(ctx, opts) } // Load downloads artifacts from S3 compliant storage @@ -95,12 +187,12 @@ func (s3Driver *ArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) // loadS3Artifact downloads artifacts from an S3 compliant storage // returns true if the download is completed or can't be retried (non-transient error) // returns false if it can be retried (transient error) -func loadS3Artifact(s3cli argos3.S3Client, inputArtifact *wfv1.Artifact, path string) (bool, error) { +func loadS3Artifact(s3cli S3Client, inputArtifact *wfv1.Artifact, path string) (bool, error) { origErr := s3cli.GetFile(inputArtifact.S3.Bucket, inputArtifact.S3.Key, path) if origErr == nil { return true, nil } - if !argos3.IsS3ErrCode(origErr, "NoSuchKey") { + if !IsS3ErrCode(origErr, "NoSuchKey") { return !isTransientS3Err(origErr), fmt.Errorf("failed to get file: %v", origErr) } // If we get here, the error was a NoSuchKey. The key might be an s3 "directory" @@ -131,12 +223,12 @@ func (s3Driver *ArtifactDriver) OpenStream(inputArtifact *wfv1.Artifact) (io.Rea } -func streamS3Artifact(s3cli argos3.S3Client, inputArtifact *wfv1.Artifact) (io.ReadCloser, error) { +func streamS3Artifact(s3cli S3Client, inputArtifact *wfv1.Artifact) (io.ReadCloser, error) { stream, origErr := s3cli.OpenFile(inputArtifact.S3.Bucket, inputArtifact.S3.Key) if origErr == nil { return stream, nil } - if !argos3.IsS3ErrCode(origErr, "NoSuchKey") { + if !IsS3ErrCode(origErr, "NoSuchKey") { return nil, fmt.Errorf("failed to get file: %v", origErr) } // If we get here, the error was a NoSuchKey. The key might be an s3 "directory" @@ -206,7 +298,7 @@ func (s3Driver *ArtifactDriver) Delete(artifact *wfv1.Artifact) error { // saveS3Artifact uploads artifacts to an S3 compliant storage // returns true if the upload is completed or can't be retried (non-transient error) // returns false if it can be retried (transient error) -func saveS3Artifact(s3cli argos3.S3Client, path string, outputArtifact *wfv1.Artifact) (bool, error) { +func saveS3Artifact(s3cli S3Client, path string, outputArtifact *wfv1.Artifact) (bool, error) { isDir, err := file.IsDirectory(path) if err != nil { return true, fmt.Errorf("failed to test if %s is a directory: %v", path, err) @@ -271,7 +363,7 @@ func (s3Driver *ArtifactDriver) ListObjects(artifact *wfv1.Artifact) ([]string, // listObjects returns the files inside the directory represented by the Artifact // returns true if success or can't be retried (non-transient error) // returns false if it can be retried (transient error) -func listObjects(s3cli argos3.S3Client, artifact *wfv1.Artifact) (bool, []string, error) { +func listObjects(s3cli S3Client, artifact *wfv1.Artifact) (bool, []string, error) { var files []string files, err := s3cli.ListDirectory(artifact.S3.Bucket, artifact.S3.Key) if err != nil { @@ -298,3 +390,427 @@ func (s3Driver *ArtifactDriver) IsDirectory(artifact *wfv1.Artifact) (bool, erro } return s3cli.IsDirectory(artifact.S3.Bucket, artifact.S3.Key) } + +// Get AWS credentials based on default order from aws SDK +func GetAWSCredentials(opts S3ClientOpts) (*credentials.Credentials, error) { + ctx := context.Background() + cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(opts.Region)) + if err != nil { + return nil, err + } + + value, err := cfg.Credentials.Retrieve(ctx) + if err != nil { + return nil, err + } + return credentials.NewStaticV4(value.AccessKeyID, value.SecretAccessKey, value.SessionToken), nil +} + +// GetAssumeRoleCredentials gets Assumed role credentials +func GetAssumeRoleCredentials(opts S3ClientOpts) (*credentials.Credentials, error) { + ctx := context.Background() + cfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return nil, err + } + client := sts.NewFromConfig(cfg) + + // Create the credentials from AssumeRoleProvider to assume the role + // referenced by the "myRoleARN" ARN. Prompt for MFA token from stdin. + + creds := stscreds.NewAssumeRoleProvider(client, opts.RoleARN) + value, err := creds.Retrieve(ctx) + if err != nil { + return nil, err + } + return credentials.NewStaticV4(value.AccessKeyID, value.SecretAccessKey, value.SessionToken), nil +} + +func GetCredentials(opts S3ClientOpts) (*credentials.Credentials, error) { + if opts.AccessKey != "" && opts.SecretKey != "" { + if opts.SessionToken != "" { + log.WithField("endpoint", opts.Endpoint).Info("Creating minio client using ephemeral credentials") + return credentials.NewStaticV4(opts.AccessKey, opts.SecretKey, opts.SessionToken), nil + } else { + log.WithField("endpoint", opts.Endpoint).Info("Creating minio client using static credentials") + return credentials.NewStaticV4(opts.AccessKey, opts.SecretKey, ""), nil + } + } else if opts.RoleARN != "" { + log.WithField("roleArn", opts.RoleARN).Info("Creating minio client using assumed-role credentials") + return GetAssumeRoleCredentials(opts) + } else if opts.UseSDKCreds { + log.Info("Creating minio client using AWS SDK credentials") + return GetAWSCredentials(opts) + } else { + log.Info("Creating minio client using IAM role") + return credentials.NewIAM(nullIAMEndpoint), nil + } +} + +// GetDefaultTransport returns minio's default transport +func GetDefaultTransport(opts S3ClientOpts) (*http.Transport, error) { + return minio.DefaultTransport(opts.Secure) +} + +// NewS3Client instantiates a new S3 client object backed +func NewS3Client(ctx context.Context, opts S3ClientOpts) (S3Client, error) { + s3cli := s3client{ + S3ClientOpts: opts, + } + s3cli.AccessKey = strings.TrimSpace(s3cli.AccessKey) + s3cli.SecretKey = strings.TrimSpace(s3cli.SecretKey) + var minioClient *minio.Client + var err error + + credentials, err := GetCredentials(opts) + if err != nil { + return nil, err + } + + var bucketLookupType minio.BucketLookupType + switch s3cli.AddressingStyle { + case PathStyle: + bucketLookupType = minio.BucketLookupPath + case VirtualHostedStyle: + bucketLookupType = minio.BucketLookupDNS + default: + bucketLookupType = minio.BucketLookupAuto + } + minioOpts := &minio.Options{Creds: credentials, Secure: s3cli.Secure, Transport: opts.Transport, Region: s3cli.Region, BucketLookup: bucketLookupType} + minioClient, err = minio.New(s3cli.Endpoint, minioOpts) + if err != nil { + return nil, err + } + if opts.Trace { + minioClient.TraceOn(log.StandardLogger().Out) + } + + if opts.EncryptOpts.KmsKeyId != "" && opts.EncryptOpts.ServerSideCustomerKey != "" { + return nil, fmt.Errorf("EncryptOpts.KmsKeyId and EncryptOpts.SSECPassword cannot be set together") + } + + if opts.EncryptOpts.ServerSideCustomerKey != "" && !opts.Secure { + return nil, fmt.Errorf("Secure must be set if EncryptOpts.SSECPassword is set") + } + + s3cli.ctx = ctx + s3cli.minioClient = minioClient + + return &s3cli, nil +} + +// PutFile puts a single file to a bucket at the specified key +func (s *s3client) PutFile(bucket, key, path string) error { + log.WithFields(log.Fields{"endpoint": s.Endpoint, "bucket": bucket, "key": key, "path": path}).Info("Saving file to s3") + // NOTE: minio will detect proper mime-type based on file extension + + encOpts, err := s.EncryptOpts.buildServerSideEnc(bucket, key) + + if err != nil { + return err + } + + _, err = s.minioClient.FPutObject(s.ctx, bucket, key, path, minio.PutObjectOptions{SendContentMd5: s.SendContentMd5, ServerSideEncryption: encOpts}) + if err != nil { + return err + } + return nil +} + +func (s *s3client) BucketExists(bucketName string) (bool, error) { + log.WithField("bucket", bucketName).Info("Checking if bucket exists") + result, err := s.minioClient.BucketExists(s.ctx, bucketName) + return result, err +} + +func (s *s3client) MakeBucket(bucketName string, opts minio.MakeBucketOptions) error { + log.WithFields(log.Fields{"bucket": bucketName, "region": opts.Region, "objectLocking": opts.ObjectLocking}).Info("Creating bucket") + err := s.minioClient.MakeBucket(s.ctx, bucketName, opts) + + if err != nil { + return err + } + + err = s.setBucketEnc(bucketName) + return err +} + +type uploadTask struct { + key string + path string +} + +func generatePutTasks(keyPrefix, rootPath string) chan uploadTask { + rootPath = filepath.Clean(rootPath) + string(os.PathSeparator) + uploadTasks := make(chan uploadTask) + go func() { + _ = filepath.Walk(rootPath, func(localPath string, fi os.FileInfo, err error) error { + if err != nil { + log.WithFields(log.Fields{"localPath": localPath}).Error("Failed to walk artifacts path", err) + return err + } + relPath := strings.TrimPrefix(localPath, rootPath) + if fi.IsDir() { + return nil + } + if fi.Mode()&os.ModeSymlink != 0 { + return nil + } + t := uploadTask{ + key: path.Join(keyPrefix, relPath), + path: localPath, + } + uploadTasks <- t + return nil + }) + close(uploadTasks) + }() + return uploadTasks +} + +// PutDirectory puts a complete directory into a bucket key prefix, with each file in the directory +// a separate key in the bucket. +func (s *s3client) PutDirectory(bucket, key, path string) error { + for putTask := range generatePutTasks(key, path) { + err := s.PutFile(bucket, putTask.key, putTask.path) + if err != nil { + return err + } + } + return nil +} + +// GetFile downloads a file to a local file path +func (s *s3client) GetFile(bucket, key, path string) error { + log.WithFields(log.Fields{"endpoint": s.Endpoint, "bucket": bucket, "key": key, "path": path}).Info("Getting file from s3") + + encOpts, err := s.EncryptOpts.buildServerSideEnc(bucket, key) + if err != nil { + return err + } + + err = s.minioClient.FGetObject(s.ctx, bucket, key, path, minio.GetObjectOptions{ServerSideEncryption: encOpts}) + if err != nil { + return err + } + return nil +} + +// OpenFile opens a file for reading +func (s *s3client) OpenFile(bucket, key string) (io.ReadCloser, error) { + log.WithFields(log.Fields{"endpoint": s.Endpoint, "bucket": bucket, "key": key}).Info("Opening file from s3") + + encOpts, err := s.EncryptOpts.buildServerSideEnc(bucket, key) + if err != nil { + return nil, err + } + f, err := s.minioClient.GetObject(s.ctx, bucket, key, minio.GetObjectOptions{ServerSideEncryption: encOpts}) + if err != nil { + return nil, err + } + // the call above doesn't return an error in the case that the key doesn't exist, but by calling Stat() it will + _, err = f.Stat() + if err != nil { + return nil, err + } + return f, nil +} + +// checks if object exists (and if we have permission to access) +func (s *s3client) KeyExists(bucket, key string) (bool, error) { + log.WithFields(log.Fields{"endpoint": s.Endpoint, "bucket": bucket, "key": key}).Info("Checking key exists from s3") + + encOpts, err := s.EncryptOpts.buildServerSideEnc(bucket, key) + if err != nil { + return false, err + } + + _, err = s.minioClient.StatObject(s.ctx, bucket, key, minio.StatObjectOptions{ServerSideEncryption: encOpts}) + if err == nil { + return true, nil + } + if IsS3ErrCode(err, "NoSuchKey") { + return false, nil + } + + return false, err +} + +func (s *s3client) Delete(bucket, key string) error { + log.WithFields(log.Fields{"endpoint": s.Endpoint, "bucket": bucket, "key": key}).Info("Deleting object from s3") + return s.minioClient.RemoveObject(s.ctx, bucket, key, minio.RemoveObjectOptions{}) +} + +// GetDirectory downloads a s3 directory to a local path +func (s *s3client) GetDirectory(bucket, keyPrefix, path string) error { + log.WithFields(log.Fields{"endpoint": s.Endpoint, "bucket": bucket, "key": keyPrefix, "path": path}).Info("Getting directory from s3") + keys, err := s.ListDirectory(bucket, keyPrefix) + if err != nil { + return err + } + + for _, objKey := range keys { + relKeyPath := strings.TrimPrefix(objKey, keyPrefix) + localPath := filepath.Join(path, relKeyPath) + + encOpts, err := s.EncryptOpts.buildServerSideEnc(bucket, objKey) + if err != nil { + return err + } + + err = s.minioClient.FGetObject(s.ctx, bucket, objKey, localPath, minio.GetObjectOptions{ServerSideEncryption: encOpts}) + if err != nil { + return err + } + } + return nil +} + +// IsDirectory tests if the key is acting like a s3 directory. This just means it has at least one +// object which is prefixed with the given key +func (s *s3client) IsDirectory(bucket, keyPrefix string) (bool, error) { + doneCh := make(chan struct{}) + defer close(doneCh) + + if keyPrefix != "" { + keyPrefix = filepath.Clean(keyPrefix) + "/" + if os.PathSeparator == '\\' { + keyPrefix = strings.ReplaceAll(keyPrefix, "\\", "/") + } + } + + listOpts := minio.ListObjectsOptions{ + Prefix: keyPrefix, + Recursive: false, + } + objCh := s.minioClient.ListObjects(s.ctx, bucket, listOpts) + for obj := range objCh { + if obj.Err != nil { + return false, obj.Err + } else { + return true, nil + } + } + return false, nil +} + +func (s *s3client) ListDirectory(bucket, keyPrefix string) ([]string, error) { + log.WithFields(log.Fields{"endpoint": s.Endpoint, "bucket": bucket, "key": keyPrefix}).Info("Listing directory from s3") + + if keyPrefix != "" { + keyPrefix = filepath.Clean(keyPrefix) + "/" + if os.PathSeparator == '\\' { + keyPrefix = strings.ReplaceAll(keyPrefix, "\\", "/") + } + } + + doneCh := make(chan struct{}) + defer close(doneCh) + listOpts := minio.ListObjectsOptions{ + Prefix: keyPrefix, + Recursive: true, + } + var out []string + objCh := s.minioClient.ListObjects(s.ctx, bucket, listOpts) + for obj := range objCh { + if obj.Err != nil { + return nil, obj.Err + } + if strings.HasSuffix(obj.Key, "/") { + // When a dir is created through AWS S3 console, a nameless obj will be created + // automatically, its key will be {dir_name} + "/". This obj does not display in the + // console, but you can see it when using aws cli. + // If obj.Key ends with "/" means it's a dir obj, we need to skip it, otherwise it + // will be downloaded as a regular file with the same name as the dir, and it will + // creates error when downloading the files under the dir. + continue + } + out = append(out, obj.Key) + } + return out, nil +} + +// IsS3ErrCode returns if the supplied error is of a specific S3 error code +func IsS3ErrCode(err error, code string) bool { + var minioErr minio.ErrorResponse + if errors.As(err, &minioErr) { + return minioErr.Code == code + } + return false +} + +// setBucketEnc sets the encryption options on a bucket +func (s *s3client) setBucketEnc(bucketName string) error { + if !s.EncryptOpts.Enabled { + return nil + } + + var config *sse.Configuration + if s.EncryptOpts.KmsKeyId != "" { + config = sse.NewConfigurationSSEKMS(s.EncryptOpts.KmsKeyId) + } else { + config = sse.NewConfigurationSSES3() + } + + log.WithFields(log.Fields{"KmsKeyId": s.EncryptOpts.KmsKeyId, "bucketName": bucketName}).Info("Setting Bucket Encryption") + err := s.minioClient.SetBucketEncryption(s.ctx, bucketName, config) + return err +} + +// buildServerSideEnc creates the minio encryption options when putting encrypted items in a bucket +func (e *EncryptOpts) buildServerSideEnc(bucket, key string) (encrypt.ServerSide, error) { + if e == nil || !e.Enabled { + return nil, nil + } + + if e.ServerSideCustomerKey != "" { + encryption := encrypt.DefaultPBKDF([]byte(e.ServerSideCustomerKey), []byte(bucket+key)) + + return encryption, nil + } + + if e.KmsKeyId != "" { + encryptionCtx, err := parseKMSEncCntx(e.KmsEncryptionContext) + + if err != nil { + return nil, fmt.Errorf("failed to parse KMS encryption context: %w", err) + } + + if encryptionCtx == nil { + // To overcome a limitation in Minio which checks interface{} == nil. + kms, err := encrypt.NewSSEKMS(e.KmsKeyId, nil) + + if err != nil { + return nil, err + } + + return kms, nil + } + + kms, err := encrypt.NewSSEKMS(e.KmsKeyId, encryptionCtx) + + if err != nil { + return nil, err + } + + return kms, nil + } + + return encrypt.NewSSE(), nil +} + +// parseKMSEncCntx validates if kmsEncCntx is a valid JSON +func parseKMSEncCntx(kmsEncCntx string) (*string, error) { + if kmsEncCntx == "" { + return nil, nil + } + + jsonKMSEncryptionContext, err := json.Marshal(json.RawMessage(kmsEncCntx)) + if err != nil { + return nil, fmt.Errorf("failed to marshal KMS encryption context: %w", err) + } + + parsedKMSEncryptionContext := base64.StdEncoding.EncodeToString(jsonKMSEncryptionContext) + + return &parsedKMSEncryptionContext, nil +} diff --git a/workflow/artifacts/s3/s3_test.go b/workflow/artifacts/s3/s3_test.go index a59ac718cbfe..fd04954e51d3 100644 --- a/workflow/artifacts/s3/s3_test.go +++ b/workflow/artifacts/s3/s3_test.go @@ -2,15 +2,17 @@ package s3 import ( "bytes" + "context" "io" + "net/http" "os" "path/filepath" "strings" "testing" - argos3 "github.com/argoproj/pkg/s3" "github.com/minio/minio-go/v7" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) @@ -24,7 +26,7 @@ type mockS3Client struct { mockedErrs map[string]error } -func newMockS3Client(files map[string][]string, mockedErrs map[string]error) argos3.S3Client { +func newMockS3Client(files map[string][]string, mockedErrs map[string]error) S3Client { return &mockS3Client{ files: files, mockedErrs: mockedErrs, @@ -127,7 +129,7 @@ func (s *mockS3Client) MakeBucket(bucketName string, opts minio.MakeBucketOption func TestOpenStreamS3Artifact(t *testing.T) { tests := map[string]struct { - s3client argos3.S3Client + s3client S3Client bucket string key string localPath string @@ -247,7 +249,7 @@ func (s *mockS3Client) Delete(bucket, key string) error { func TestLoadS3Artifact(t *testing.T) { tests := map[string]struct { - s3client argos3.S3Client + s3client S3Client bucket string key string localPath string @@ -413,7 +415,7 @@ func TestSaveS3Artifact(t *testing.T) { } tests := map[string]struct { - s3client argos3.S3Client + s3client S3Client bucket string key string localPath string @@ -542,7 +544,7 @@ func TestSaveS3Artifact(t *testing.T) { func TestListObjects(t *testing.T) { tests := map[string]struct { - s3client argos3.S3Client + s3client S3Client bucket string key string expectedSuccess bool @@ -619,3 +621,116 @@ func TestListObjects(t *testing.T) { } _ = os.Unsetenv(transientEnvVarKey) } + +// TestNewS3Client tests the s3 constructor +func TestNewS3Client(t *testing.T) { + opts := S3ClientOpts{ + Endpoint: "foo.com", + Region: "us-south-3", + Secure: false, + Transport: http.DefaultTransport, + AccessKey: "key", + SecretKey: "secret", + SessionToken: "", + Trace: true, + RoleARN: "", + RoleSessionName: "", + UseSDKCreds: false, + EncryptOpts: EncryptOpts{Enabled: true, ServerSideCustomerKey: "", KmsKeyId: "", KmsEncryptionContext: ""}, + } + s3If, err := NewS3Client(context.Background(), opts) + require.NoError(t, err) + s3cli := s3If.(*s3client) + assert.Equal(t, opts.Endpoint, s3cli.Endpoint) + assert.Equal(t, opts.Region, s3cli.Region) + assert.Equal(t, opts.Secure, s3cli.Secure) + assert.Equal(t, opts.Transport, s3cli.Transport) + assert.Equal(t, opts.AccessKey, s3cli.AccessKey) + assert.Equal(t, opts.SessionToken, s3cli.SessionToken) + assert.Equal(t, opts.Trace, s3cli.Trace) + assert.Equal(t, opts.EncryptOpts, s3cli.EncryptOpts) + assert.Equal(t, opts.AddressingStyle, s3cli.AddressingStyle) + // s3cli.minioClient. + // s3client.minioClient +} + +// TestNewS3Client tests the S3 constructor using ephemeral credentials +func TestNewS3ClientEphemeral(t *testing.T) { + opts := S3ClientOpts{ + Endpoint: "foo.com", + Region: "us-south-3", + AccessKey: "key", + SecretKey: "secret", + SessionToken: "sessionToken", + } + s3If, err := NewS3Client(context.Background(), opts) + require.NoError(t, err) + s3cli := s3If.(*s3client) + assert.Equal(t, opts.Endpoint, s3cli.Endpoint) + assert.Equal(t, opts.Region, s3cli.Region) + assert.Equal(t, opts.AccessKey, s3cli.AccessKey) + assert.Equal(t, opts.SecretKey, s3cli.SecretKey) + assert.Equal(t, opts.SessionToken, s3cli.SessionToken) +} + +// TestNewS3Client tests the s3 constructor +func TestNewS3ClientWithDiff(t *testing.T) { + t.Run("IAMRole", func(t *testing.T) { + opts := S3ClientOpts{ + Endpoint: "foo.com", + Region: "us-south-3", + Secure: false, + Trace: true, + } + s3If, err := NewS3Client(context.Background(), opts) + require.NoError(t, err) + s3cli := s3If.(*s3client) + assert.Equal(t, opts.Endpoint, s3cli.Endpoint) + assert.Equal(t, opts.Region, s3cli.Region) + assert.Equal(t, opts.Trace, s3cli.Trace) + assert.Equal(t, opts.Endpoint, s3cli.minioClient.EndpointURL().Host) + }) + t.Run("AssumeIAMRole", func(t *testing.T) { + t.SkipNow() + opts := S3ClientOpts{ + Endpoint: "foo.com", + Region: "us-south-3", + Secure: false, + Trace: true, + RoleARN: "01234567890123456789", + } + s3If, err := NewS3Client(context.Background(), opts) + require.NoError(t, err) + s3cli := s3If.(*s3client) + assert.Equal(t, opts.Endpoint, s3cli.Endpoint) + assert.Equal(t, opts.Region, s3cli.Region) + assert.Equal(t, opts.Trace, s3cli.Trace) + assert.Equal(t, opts.Endpoint, s3cli.minioClient.EndpointURL().Host) + }) +} + +func TestDisallowedComboOptions(t *testing.T) { + t.Run("KMS and SSEC", func(t *testing.T) { + opts := S3ClientOpts{ + Endpoint: "foo.com", + Region: "us-south-3", + Secure: true, + Trace: true, + EncryptOpts: EncryptOpts{Enabled: true, ServerSideCustomerKey: "PASSWORD", KmsKeyId: "00000000-0000-0000-0000-000000000000", KmsEncryptionContext: ""}, + } + _, err := NewS3Client(context.Background(), opts) + assert.Error(t, err) + }) + + t.Run("SSEC and InSecure", func(t *testing.T) { + opts := S3ClientOpts{ + Endpoint: "foo.com", + Region: "us-south-3", + Secure: false, + Trace: true, + EncryptOpts: EncryptOpts{Enabled: true, ServerSideCustomerKey: "PASSWORD", KmsKeyId: "", KmsEncryptionContext: ""}, + } + _, err := NewS3Client(context.Background(), opts) + assert.Error(t, err) + }) +} diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index a00a32ed1495..6539a0fc91f7 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -813,7 +813,15 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool { woc := newWorkflowOperationCtx(wf, wfc) - if (!woc.GetShutdownStrategy().Enabled() || woc.GetShutdownStrategy() != wfv1.ShutdownStrategyTerminate) && !wfc.throttler.Admit(key.(string)) && woc.wf.Status.Phase != wfv1.WorkflowRunning { + bypassParallelism := "false" + if annotation := woc.wf.GetAnnotations(); annotation != nil { + val, ok := annotation["bypass-parallelism"] + if ok { + bypassParallelism = val + } + } + + if (!woc.GetShutdownStrategy().Enabled() || woc.GetShutdownStrategy() != wfv1.ShutdownStrategyTerminate) && !wfc.throttler.Admit(key.(string)) && bypassParallelism != "true" && woc.wf.Status.Phase != wfv1.WorkflowRunning { log.WithField("key", key).Info("Workflow processing has been postponed due to max parallelism limit") if woc.wf.Status.Phase == wfv1.WorkflowUnknown { woc.markWorkflowPhase(ctx, wfv1.WorkflowPending, "Workflow processing has been postponed because too many workflows are already running") diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index 3d7b7e796b8f..c62066b41f15 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -704,6 +704,77 @@ spec: } } +func TestParallelismBypass(t *testing.T) { + for tt, f := range map[string]func(controller *WorkflowController){ + "Parallelism": func(x *WorkflowController) { + x.Config.Parallelism = 1 + }, + "NamespaceParallelism": func(x *WorkflowController) { + x.Config.NamespaceParallelism = 1 + }, + } { + t.Run(tt, func(t *testing.T) { + cancel, controller := newController( + wfv1.MustUnmarshalWorkflow(` +metadata: + name: my-wfb-0 +spec: + entrypoint: main + templates: + - name: main + container: + image: my-image +`), + wfv1.MustUnmarshalWorkflow(` +metadata: + name: my-wfb-1 + annotations: + bypass-parallelism: "true" +spec: + entrypoint: main + templates: + - name: main + container: + image: my-image +`), + wfv1.MustUnmarshalWorkflow(` +metadata: + name: my-wfb-2 +spec: + entrypoint: main + templates: + - name: main + container: + image: my-image +`), + f, + ) + defer cancel() + ctx := context.Background() + assert.True(t, controller.processNextItem(ctx)) + assert.True(t, controller.processNextItem(ctx)) + assert.True(t, controller.processNextItem(ctx)) + + expectWorkflow(ctx, controller, "my-wfb-0", func(wf *wfv1.Workflow) { + if assert.NotNil(t, wf) { + assert.Equal(t, wfv1.WorkflowRunning, wf.Status.Phase) + } + }) + expectWorkflow(ctx, controller, "my-wfb-1", func(wf *wfv1.Workflow) { + if assert.NotNil(t, wf) { + assert.Equal(t, wfv1.WorkflowRunning, wf.Status.Phase) + } + }) + expectWorkflow(ctx, controller, "my-wfb-2", func(wf *wfv1.Workflow) { + if assert.NotNil(t, wf) { + assert.Equal(t, wfv1.WorkflowPending, wf.Status.Phase) + assert.Equal(t, "Workflow processing has been postponed because too many workflows are already running", wf.Status.Message) + } + }) + }) + } +} + func TestWorkflowController_archivedWorkflowGarbageCollector(t *testing.T) { cancel, controller := newController() defer cancel()