diff --git a/cmd/config.go b/cmd/config.go index f95b539..f430b9c 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -10,11 +10,9 @@ import ( "log/slog" "net/http" "net/http/pprof" - "strings" + "os" "time" - "gopkg.in/yaml.v3" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/jaeger" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" @@ -45,63 +43,32 @@ func setupInterrupt(ctx context.Context, g *run.Group, log *slog.Logger) { } type bucketOpts struct { - storage string - prefix string - - // filesystem options - filesystemDirectory string - - // s3 options - s3Bucket string - s3Endpoint string - s3AccessKey string - s3SecretKey string - s3Insecure bool - - retries int + objStoreConfigFile string + objStoreConfig string } func setupBucket(log *slog.Logger, opts bucketOpts) (objstore.Bucket, error) { - prov := objstore.ObjProvider(strings.ToUpper(opts.storage)) - cfg := client.BucketConfig{ - Type: prov, - Prefix: opts.prefix, - } - var subCfg any - switch prov { - case objstore.FILESYSTEM: - subCfg = struct { - Directory string `yaml:"directory"` - }{ - Directory: opts.filesystemDirectory, - } - case objstore.S3: - subCfg = struct { - Bucket string `yaml:"bucket"` - Endpoint string `yaml:"endpoint"` - AccessKey string `yaml:"access_key"` - SecretKey string `yaml:"secret_key"` - MaxRetries int `yaml:"max_retries"` - Insecure bool `yaml:"insecure"` - }{ - Bucket: opts.s3Bucket, - Endpoint: opts.s3Endpoint, - AccessKey: opts.s3AccessKey, - SecretKey: opts.s3SecretKey, - Insecure: opts.s3Insecure, - MaxRetries: opts.retries, + var confContentYaml []byte + var err error + + // Read from file if provided, otherwise use inline content + if opts.objStoreConfigFile != "" { + confContentYaml, err = os.ReadFile(opts.objStoreConfigFile) + if err != nil { + return nil, fmt.Errorf("unable to read objstore config file: %w", err) } - default: - return nil, fmt.Errorf("unknown bucket type: %s", prov) + } else if opts.objStoreConfig != "" { + confContentYaml = []byte(opts.objStoreConfig) + } else { + return nil, fmt.Errorf("objstore config is required (use --parquet.objstore-config or --parquet.objstore-config-file)") } - cfg.Config = subCfg - bytes, err := yaml.Marshal(cfg) - if err != nil { - return nil, fmt.Errorf("unable to marshal bucket config yaml: %w", err) + // If config is empty, return error + if len(confContentYaml) == 0 { + return nil, fmt.Errorf("objstore config is required") } - bkt, err := client.NewBucket(slogAdapter{log}, bytes, "parquet-gateway", nil) + bkt, err := client.NewBucket(slogAdapter{log}, confContentYaml, "parquet-gateway", nil) if err != nil { return nil, fmt.Errorf("unable to create bucket client: %w", err) } diff --git a/cmd/config_test.go b/cmd/config_test.go new file mode 100644 index 0000000..8e1dae5 --- /dev/null +++ b/cmd/config_test.go @@ -0,0 +1,139 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +package main + +import ( + "io" + "log/slog" + "os" + "path/filepath" + "testing" + + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("github.com/baidubce/bce-sdk-go/util/log.NewLogger.func1")) +} + +func TestSetupBucketWithConfigFile(t *testing.T) { + t.Run("filesystem config from file", func(tt *testing.T) { + tmpDir := tt.TempDir() + configFile := filepath.Join(tmpDir, "config.yaml") + configContent := `type: FILESYSTEM +config: + directory: ` + tmpDir + ` +` + if err := os.WriteFile(configFile, []byte(configContent), 0644); err != nil { + tt.Fatalf("unable to write config file: %v", err) + } + + opts := bucketOpts{ + objStoreConfigFile: configFile, + } + + log := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError})) + bkt, err := setupBucket(log, opts) + if err != nil { + tt.Fatalf("unable to setup bucket: %v", err) + } + if bkt == nil { + tt.Fatal("bucket is nil") + } + + // Verify it's a filesystem bucket by checking if we can list (empty bucket) + ctx := tt.Context() + if err := bkt.Iter(ctx, "", func(_ string) error { + return nil + }); err != nil { + tt.Fatalf("unable to iterate bucket: %v", err) + } + }) + + t.Run("filesystem config from inline yaml", func(tt *testing.T) { + tmpDir := tt.TempDir() + configContent := `type: FILESYSTEM +config: + directory: ` + tmpDir + ` +` + + opts := bucketOpts{ + objStoreConfig: configContent, + } + + log := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError})) + bkt, err := setupBucket(log, opts) + if err != nil { + tt.Fatalf("unable to setup bucket: %v", err) + } + if bkt == nil { + tt.Fatal("bucket is nil") + } + + ctx := tt.Context() + if err := bkt.Iter(ctx, "", func(_ string) error { + return nil + }); err != nil { + tt.Fatalf("unable to iterate bucket: %v", err) + } + }) + + t.Run("empty config returns error", func(tt *testing.T) { + opts := bucketOpts{} + + log := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError})) + _, err := setupBucket(log, opts) + if err == nil { + tt.Fatal("expected error for empty config") + } + }) + + t.Run("invalid config returns error", func(tt *testing.T) { + opts := bucketOpts{ + objStoreConfig: "invalid: yaml: content", + } + + log := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError})) + _, err := setupBucket(log, opts) + if err == nil { + tt.Fatal("expected error for invalid config") + } + }) + + t.Run("s3 config from file", func(tt *testing.T) { + configFile := filepath.Join(tt.TempDir(), "config.yaml") + configContent := `type: S3 +config: + bucket: test-bucket + endpoint: localhost:9000 + access_key: minioadmin + secret_key: minioadmin + insecure: true +` + if err := os.WriteFile(configFile, []byte(configContent), 0644); err != nil { + tt.Fatalf("unable to write config file: %v", err) + } + + opts := bucketOpts{ + objStoreConfigFile: configFile, + } + + log := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError})) + bkt, err := setupBucket(log, opts) + // S3 bucket creation might fail if minio is not running, but config parsing should work + if err != nil && bkt == nil { + // This is expected if S3 endpoint is not available + return + } + if bkt != nil { + // If bucket was created, verify it's the right type + ctx := tt.Context() + _ = bkt.Iter(ctx, "", func(_ string) error { + return nil + }) + } + }) +} + diff --git a/cmd/convert.go b/cmd/convert.go index 649f565..8a4ddf6 100644 --- a/cmd/convert.go +++ b/cmd/convert.go @@ -85,25 +85,13 @@ func (opts *conversionOpts) registerFlags(cmd *kingpin.CmdClause) { } func (opts *bucketOpts) registerConvertParquetFlags(cmd *kingpin.CmdClause) { - cmd.Flag("parquet.storage.type", "type of storage").Default("filesystem").EnumVar(&opts.storage, "filesystem", "s3") - cmd.Flag("parquet.storage.prefix", "prefix for the storage").Default("").StringVar(&opts.prefix) - cmd.Flag("parquet.storage.filesystem.directory", "directory for filesystem").Default(".data").StringVar(&opts.filesystemDirectory) - cmd.Flag("parquet.storage.s3.bucket", "bucket for s3").Default("").StringVar(&opts.s3Bucket) - cmd.Flag("parquet.storage.s3.endpoint", "endpoint for s3").Default("").StringVar(&opts.s3Endpoint) - cmd.Flag("parquet.storage.s3.access_key", "access key for s3").Default("").Envar("PARQUET_STORAGE_S3_ACCESS_KEY").StringVar(&opts.s3AccessKey) - cmd.Flag("parquet.storage.s3.secret_key", "secret key for s3").Default("").Envar("PARQUET_STORAGE_S3_SECRET_KEY").StringVar(&opts.s3SecretKey) - cmd.Flag("parquet.storage.s3.insecure", "use http").Default("false").BoolVar(&opts.s3Insecure) + cmd.Flag("parquet.objstore-config-file", "YAML file that contains object store configuration for parquet storage. See format details: https://thanos.io/tip/thanos/storage.md/#configuration").StringVar(&opts.objStoreConfigFile) + cmd.Flag("parquet.objstore-config", "Alternative to 'parquet.objstore-config-file'. YAML content for parquet storage configuration.").StringVar(&opts.objStoreConfig) } func (opts *bucketOpts) registerConvertTSDBFlags(cmd *kingpin.CmdClause) { - cmd.Flag("tsdb.storage.type", "type of storage").Default("filesystem").EnumVar(&opts.storage, "filesystem", "s3") - cmd.Flag("tsdb.storage.prefix", "prefix for the storage").Default("").StringVar(&opts.prefix) - cmd.Flag("tsdb.storage.filesystem.directory", "directory for filesystem").Default(".data").StringVar(&opts.filesystemDirectory) - cmd.Flag("tsdb.storage.s3.bucket", "bucket for s3").Default("").StringVar(&opts.s3Bucket) - cmd.Flag("tsdb.storage.s3.endpoint", "endpoint for s3").Default("").StringVar(&opts.s3Endpoint) - cmd.Flag("tsdb.storage.s3.access_key", "access key for s3").Default("").Envar("TSDB_STORAGE_S3_ACCESS_KEY").StringVar(&opts.s3AccessKey) - cmd.Flag("tsdb.storage.s3.secret_key", "secret key for s3").Default("").Envar("TSDB_STORAGE_S3_SECRET_KEY").StringVar(&opts.s3SecretKey) - cmd.Flag("tsdb.storage.s3.insecure", "use http").Default("false").BoolVar(&opts.s3Insecure) + cmd.Flag("tsdb.objstore-config-file", "YAML file that contains object store configuration for TSDB storage. See format details: https://thanos.io/tip/thanos/storage.md/#configuration").StringVar(&opts.objStoreConfigFile) + cmd.Flag("tsdb.objstore-config", "Alternative to 'tsdb.objstore-config-file'. YAML content for TSDB storage configuration.").StringVar(&opts.objStoreConfig) } func (opts *discoveryOpts) registerConvertParquetFlags(cmd *kingpin.CmdClause) { diff --git a/cmd/serve.go b/cmd/serve.go index 1e38d6d..6d9618d 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -65,14 +65,8 @@ func (opts *serveOpts) registerFlags(cmd *kingpin.CmdClause) { } func (opts *bucketOpts) registerServeFlags(cmd *kingpin.CmdClause) { - cmd.Flag("storage.type", "type of storage").Default("filesystem").EnumVar(&opts.storage, "filesystem", "s3") - cmd.Flag("storage.prefix", "prefix for the storage").Default("").StringVar(&opts.prefix) - cmd.Flag("storage.filesystem.directory", "directory for filesystem").Default(".data").StringVar(&opts.filesystemDirectory) - cmd.Flag("storage.s3.bucket", "bucket for s3").Default("").StringVar(&opts.s3Bucket) - cmd.Flag("storage.s3.endpoint", "endpoint for s3").Default("").StringVar(&opts.s3Endpoint) - cmd.Flag("storage.s3.access_key", "access key for s3").Default("").Envar("STORAGE_S3_ACCESS_KEY").StringVar(&opts.s3AccessKey) - cmd.Flag("storage.s3.secret_key", "secret key for s3").Default("").Envar("STORAGE_S3_SECRET_KEY").StringVar(&opts.s3SecretKey) - cmd.Flag("storage.s3.insecure", "use http").Default("false").BoolVar(&opts.s3Insecure) + cmd.Flag("parquet.objstore-config-file", "YAML file that contains object store configuration. See format details: https://thanos.io/tip/thanos/storage.md/#configuration").StringVar(&opts.objStoreConfigFile) + cmd.Flag("parquet.objstore-config", "Alternative to 'parquet.objstore-config-file'. YAML content for object store configuration.").StringVar(&opts.objStoreConfig) } func (opts *tracingOpts) registerServeFlags(cmd *kingpin.CmdClause) {