Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 19 additions & 52 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ import (
"log/slog"
"net/http"
"net/http/pprof"
"strings"
"os"
"time"

"gopkg.in/yaml.v3"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
Expand Down Expand Up @@ -45,63 +43,32 @@ func setupInterrupt(ctx context.Context, g *run.Group, log *slog.Logger) {
}

type bucketOpts struct {
storage string
prefix string

// filesystem options
filesystemDirectory string

// s3 options
s3Bucket string
s3Endpoint string
s3AccessKey string
s3SecretKey string
s3Insecure bool

retries int
objStoreConfigFile string
objStoreConfig string
}

func setupBucket(log *slog.Logger, opts bucketOpts) (objstore.Bucket, error) {
prov := objstore.ObjProvider(strings.ToUpper(opts.storage))
cfg := client.BucketConfig{
Type: prov,
Prefix: opts.prefix,
}
var subCfg any
switch prov {
case objstore.FILESYSTEM:
subCfg = struct {
Directory string `yaml:"directory"`
}{
Directory: opts.filesystemDirectory,
}
case objstore.S3:
subCfg = struct {
Bucket string `yaml:"bucket"`
Endpoint string `yaml:"endpoint"`
AccessKey string `yaml:"access_key"`
SecretKey string `yaml:"secret_key"`
MaxRetries int `yaml:"max_retries"`
Insecure bool `yaml:"insecure"`
}{
Bucket: opts.s3Bucket,
Endpoint: opts.s3Endpoint,
AccessKey: opts.s3AccessKey,
SecretKey: opts.s3SecretKey,
Insecure: opts.s3Insecure,
MaxRetries: opts.retries,
var confContentYaml []byte
var err error

// Read from file if provided, otherwise use inline content
if opts.objStoreConfigFile != "" {
confContentYaml, err = os.ReadFile(opts.objStoreConfigFile)
if err != nil {
return nil, fmt.Errorf("unable to read objstore config file: %w", err)
}
default:
return nil, fmt.Errorf("unknown bucket type: %s", prov)
} else if opts.objStoreConfig != "" {
confContentYaml = []byte(opts.objStoreConfig)
} else {
return nil, fmt.Errorf("objstore config is required (use --parquet.objstore-config or --parquet.objstore-config-file)")
}

cfg.Config = subCfg
bytes, err := yaml.Marshal(cfg)
if err != nil {
return nil, fmt.Errorf("unable to marshal bucket config yaml: %w", err)
// If config is empty, return error
if len(confContentYaml) == 0 {
return nil, fmt.Errorf("objstore config is required")
}

bkt, err := client.NewBucket(slogAdapter{log}, bytes, "parquet-gateway", nil)
bkt, err := client.NewBucket(slogAdapter{log}, confContentYaml, "parquet-gateway", nil)
if err != nil {
return nil, fmt.Errorf("unable to create bucket client: %w", err)
}
Expand Down
139 changes: 139 additions & 0 deletions cmd/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

package main

import (
"io"
"log/slog"
"os"
"path/filepath"
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("github.com/baidubce/bce-sdk-go/util/log.NewLogger.func1"))
}

func TestSetupBucketWithConfigFile(t *testing.T) {
t.Run("filesystem config from file", func(tt *testing.T) {
tmpDir := tt.TempDir()
configFile := filepath.Join(tmpDir, "config.yaml")
configContent := `type: FILESYSTEM
config:
directory: ` + tmpDir + `
`
if err := os.WriteFile(configFile, []byte(configContent), 0644); err != nil {
tt.Fatalf("unable to write config file: %v", err)
}

opts := bucketOpts{
objStoreConfigFile: configFile,
}

log := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}))
bkt, err := setupBucket(log, opts)
if err != nil {
tt.Fatalf("unable to setup bucket: %v", err)
}
if bkt == nil {
tt.Fatal("bucket is nil")
}

// Verify it's a filesystem bucket by checking if we can list (empty bucket)
ctx := tt.Context()
if err := bkt.Iter(ctx, "", func(_ string) error {
return nil
}); err != nil {
tt.Fatalf("unable to iterate bucket: %v", err)
}
})

t.Run("filesystem config from inline yaml", func(tt *testing.T) {
tmpDir := tt.TempDir()
configContent := `type: FILESYSTEM
config:
directory: ` + tmpDir + `
`

opts := bucketOpts{
objStoreConfig: configContent,
}

log := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}))
bkt, err := setupBucket(log, opts)
if err != nil {
tt.Fatalf("unable to setup bucket: %v", err)
}
if bkt == nil {
tt.Fatal("bucket is nil")
}

ctx := tt.Context()
if err := bkt.Iter(ctx, "", func(_ string) error {
return nil
}); err != nil {
tt.Fatalf("unable to iterate bucket: %v", err)
}
})

t.Run("empty config returns error", func(tt *testing.T) {
opts := bucketOpts{}

log := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}))
_, err := setupBucket(log, opts)
if err == nil {
tt.Fatal("expected error for empty config")
}
})

t.Run("invalid config returns error", func(tt *testing.T) {
opts := bucketOpts{
objStoreConfig: "invalid: yaml: content",
}

log := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}))
_, err := setupBucket(log, opts)
if err == nil {
tt.Fatal("expected error for invalid config")
}
})

t.Run("s3 config from file", func(tt *testing.T) {
configFile := filepath.Join(tt.TempDir(), "config.yaml")
configContent := `type: S3
config:
bucket: test-bucket
endpoint: localhost:9000
access_key: minioadmin
secret_key: minioadmin
insecure: true
`
if err := os.WriteFile(configFile, []byte(configContent), 0644); err != nil {
tt.Fatalf("unable to write config file: %v", err)
}

opts := bucketOpts{
objStoreConfigFile: configFile,
}

log := slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelError}))
bkt, err := setupBucket(log, opts)
// S3 bucket creation might fail if minio is not running, but config parsing should work
if err != nil && bkt == nil {
// This is expected if S3 endpoint is not available
return
}
if bkt != nil {
// If bucket was created, verify it's the right type
ctx := tt.Context()
_ = bkt.Iter(ctx, "", func(_ string) error {
return nil
})
}
})
}

20 changes: 4 additions & 16 deletions cmd/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,13 @@ func (opts *conversionOpts) registerFlags(cmd *kingpin.CmdClause) {
}

func (opts *bucketOpts) registerConvertParquetFlags(cmd *kingpin.CmdClause) {
cmd.Flag("parquet.storage.type", "type of storage").Default("filesystem").EnumVar(&opts.storage, "filesystem", "s3")
cmd.Flag("parquet.storage.prefix", "prefix for the storage").Default("").StringVar(&opts.prefix)
cmd.Flag("parquet.storage.filesystem.directory", "directory for filesystem").Default(".data").StringVar(&opts.filesystemDirectory)
cmd.Flag("parquet.storage.s3.bucket", "bucket for s3").Default("").StringVar(&opts.s3Bucket)
cmd.Flag("parquet.storage.s3.endpoint", "endpoint for s3").Default("").StringVar(&opts.s3Endpoint)
cmd.Flag("parquet.storage.s3.access_key", "access key for s3").Default("").Envar("PARQUET_STORAGE_S3_ACCESS_KEY").StringVar(&opts.s3AccessKey)
cmd.Flag("parquet.storage.s3.secret_key", "secret key for s3").Default("").Envar("PARQUET_STORAGE_S3_SECRET_KEY").StringVar(&opts.s3SecretKey)
cmd.Flag("parquet.storage.s3.insecure", "use http").Default("false").BoolVar(&opts.s3Insecure)
cmd.Flag("parquet.objstore-config-file", "YAML file that contains object store configuration for parquet storage. See format details: https://thanos.io/tip/thanos/storage.md/#configuration").StringVar(&opts.objStoreConfigFile)
cmd.Flag("parquet.objstore-config", "Alternative to 'parquet.objstore-config-file'. YAML content for parquet storage configuration.").StringVar(&opts.objStoreConfig)
}

func (opts *bucketOpts) registerConvertTSDBFlags(cmd *kingpin.CmdClause) {
cmd.Flag("tsdb.storage.type", "type of storage").Default("filesystem").EnumVar(&opts.storage, "filesystem", "s3")
cmd.Flag("tsdb.storage.prefix", "prefix for the storage").Default("").StringVar(&opts.prefix)
cmd.Flag("tsdb.storage.filesystem.directory", "directory for filesystem").Default(".data").StringVar(&opts.filesystemDirectory)
cmd.Flag("tsdb.storage.s3.bucket", "bucket for s3").Default("").StringVar(&opts.s3Bucket)
cmd.Flag("tsdb.storage.s3.endpoint", "endpoint for s3").Default("").StringVar(&opts.s3Endpoint)
cmd.Flag("tsdb.storage.s3.access_key", "access key for s3").Default("").Envar("TSDB_STORAGE_S3_ACCESS_KEY").StringVar(&opts.s3AccessKey)
cmd.Flag("tsdb.storage.s3.secret_key", "secret key for s3").Default("").Envar("TSDB_STORAGE_S3_SECRET_KEY").StringVar(&opts.s3SecretKey)
cmd.Flag("tsdb.storage.s3.insecure", "use http").Default("false").BoolVar(&opts.s3Insecure)
cmd.Flag("tsdb.objstore-config-file", "YAML file that contains object store configuration for TSDB storage. See format details: https://thanos.io/tip/thanos/storage.md/#configuration").StringVar(&opts.objStoreConfigFile)
cmd.Flag("tsdb.objstore-config", "Alternative to 'tsdb.objstore-config-file'. YAML content for TSDB storage configuration.").StringVar(&opts.objStoreConfig)
}

func (opts *discoveryOpts) registerConvertParquetFlags(cmd *kingpin.CmdClause) {
Expand Down
10 changes: 2 additions & 8 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,8 @@ func (opts *serveOpts) registerFlags(cmd *kingpin.CmdClause) {
}

func (opts *bucketOpts) registerServeFlags(cmd *kingpin.CmdClause) {
cmd.Flag("storage.type", "type of storage").Default("filesystem").EnumVar(&opts.storage, "filesystem", "s3")
cmd.Flag("storage.prefix", "prefix for the storage").Default("").StringVar(&opts.prefix)
cmd.Flag("storage.filesystem.directory", "directory for filesystem").Default(".data").StringVar(&opts.filesystemDirectory)
cmd.Flag("storage.s3.bucket", "bucket for s3").Default("").StringVar(&opts.s3Bucket)
cmd.Flag("storage.s3.endpoint", "endpoint for s3").Default("").StringVar(&opts.s3Endpoint)
cmd.Flag("storage.s3.access_key", "access key for s3").Default("").Envar("STORAGE_S3_ACCESS_KEY").StringVar(&opts.s3AccessKey)
cmd.Flag("storage.s3.secret_key", "secret key for s3").Default("").Envar("STORAGE_S3_SECRET_KEY").StringVar(&opts.s3SecretKey)
cmd.Flag("storage.s3.insecure", "use http").Default("false").BoolVar(&opts.s3Insecure)
cmd.Flag("parquet.objstore-config-file", "YAML file that contains object store configuration. See format details: https://thanos.io/tip/thanos/storage.md/#configuration").StringVar(&opts.objStoreConfigFile)
cmd.Flag("parquet.objstore-config", "Alternative to 'parquet.objstore-config-file'. YAML content for object store configuration.").StringVar(&opts.objStoreConfig)
}

func (opts *tracingOpts) registerServeFlags(cmd *kingpin.CmdClause) {
Expand Down