Skip to content

Commit f1b7283

Browse files
committed
chore: follow-up on upstream changes
Signed-off-by: Bence Csati <[email protected]>
1 parent 419cd55 commit f1b7283

File tree

10 files changed

+185
-124
lines changed

10 files changed

+185
-124
lines changed

README.md

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ Forward is the protocol used by Fluentd to route message between peers.
2222

2323
| Property | Default value | Type | Description |
2424
|---|---|---|---|
25-
| endpoint | | string | **MANDATORY** Target URL to send `Forward` log streams to |
25+
| endpoint.tcp_addr | | string | **MANDATORY** Target URL to send `Forward` log streams to |
26+
| endpoint.validate_tcp_resolution | false | bool | Controls whether to validate the tcp address and fail at startup. |
2627
| connection_timeout | 30s | time.Duration | Maximum amount of time a dial will wait for a connect to complete |
2728
| tls.insecure | true | bool | If set to **true**, the connexion is not secured with TLS. |
2829
| tls.insecure_skip_verify | false | bool | Controls whether the exporter verifies the server's certificate chain and host name. If **true**, any certificate is accepted and any host name. This mode is susceptible to man-in-the-middle attacks |
@@ -34,6 +35,8 @@ Forward is the protocol used by Fluentd to route message between peers.
3435
| tag | "tag" | string | Fluentd tag is a string separated by '.'s (e.g. myapp.access), and is used as the directions for Fluentd's internal routing engine |
3536
| compress_gzip | false | bool | Transparent data compression. You can use this feature to reduce the transferred payload size |
3637
| default_labels_enabled | true | map[string]bool | If omitted then default labels will be added. If one of the labels is omitted then this label will be added |
38+
| kubernetes_metadata.key | | string | KubernetesMetadata includes kubernetes metadata as a nested object. It leverages resources attributes provided by k8sattributesprocessor |
39+
| kubernetes_metadata.include_pod_labels | | bool | Whether pod labels should be added to the nested object |
3740

3841
See the default values in the method `createDefaultConfig()` in [factory.go](factory.go) file.
3942

@@ -42,7 +45,8 @@ Example, for `default_labels_enabled` that will add only the `time` attribute in
4245
```yaml
4346
exporters:
4447
fluentforward:
45-
endpoint: a.new.fluentforward.target:24224
48+
endpoint:
49+
tcp_addr: a.new.fluentforward.target:24224
4650
connection_timeout: 10s
4751
require_ack: true
4852
tag: nginx
@@ -59,7 +63,8 @@ Example with TLS enabled and shared key:
5963
```yaml
6064
exporters:
6165
fluentforward:
62-
endpoint: a.new.fluentforward.target:24224
66+
endpoint:
67+
tcp_addr: a.new.fluentforward.target:24224
6368
connection_timeout: 10s
6469
tls:
6570
insecure: false
@@ -71,7 +76,8 @@ Example with mutual TLS authentication (mTLS):
7176
```yaml
7277
exporters:
7378
fluentforward:
74-
endpoint: a.new.fluentforward.target:24224
79+
endpoint:
80+
tcp_addr: a.new.fluentforward.target:24224
7581
connection_timeout: 10s
7682
tls:
7783
insecure: false
@@ -93,7 +99,8 @@ Example usage:
9399
```yaml
94100
exporters:
95101
fluentforward:
96-
endpoint: a.new.fluentforward.target:24224
102+
endpoint:
103+
tcp_addr: a.new.fluentforward.target:24224
97104
connection_timeout: 10s
98105
retry_on_failure:
99106
enabled: true

config.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@ import (
1616

1717
// TCPClientSettings defines common settings for a TCP client.
1818
type TCPClientSettings struct {
19-
// The target endpoint URI to send data to (e.g.: some.url:24224).
20-
Endpoint string `mapstructure:"endpoint"`
19+
// Endpoint to send logs to.
20+
Endpoint `mapstructure:"endpoint"`
2121

2222
// Connection Timeout parameter configures `net.Dialer`.
2323
ConnectionTimeout time.Duration `mapstructure:"connection_timeout"`
@@ -33,10 +33,6 @@ type TCPClientSettings struct {
3333
type Config struct {
3434
TCPClientSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
3535

36-
// SkipFailOnInvalidTCPEndpoint controls whether to fail if the endpoint is invalid.
37-
// This is useful for cases where the collector is started before the endpoint becomes available.
38-
SkipFailOnInvalidTCPEndpoint bool `mapstructure:"skip_fail_on_invalid_tcp_endpoint"`
39-
4036
// RequireAck enables the acknowledgement feature.
4137
RequireAck bool `mapstructure:"require_ack"`
4238

@@ -76,6 +72,13 @@ type Config struct {
7672
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
7773
}
7874

75+
type Endpoint struct {
76+
// TCPAddr is the address of the server to connect to.
77+
TCPAddr string `mapstructure:"tcp_addr"`
78+
// Controls whether to validate the tcp address.
79+
ValidateTCPResolution bool `mapstructure:"validate_tcp_resolution"`
80+
}
81+
7982
type KubernetesMetadata struct {
8083
Key string `mapstructure:"key"`
8184
IncludePodLabels bool `mapstructure:"include_pod_labels"`
@@ -89,10 +92,10 @@ func (config *Config) Validate() error {
8992
return fmt.Errorf("queue settings has invalid configuration: %w", err)
9093
}
9194

92-
// Resolve TCP address just to ensure that it is a valid one. It is better
93-
// to fail here than at when the exporter is started.
94-
if !config.SkipFailOnInvalidTCPEndpoint {
95-
if _, err := net.ResolveTCPAddr("tcp", config.Endpoint); err != nil {
95+
if config.TCPClientSettings.Endpoint.ValidateTCPResolution {
96+
// Resolve TCP address just to ensure that it is a valid one. It is better
97+
// to fail here than at when the exporter is started.
98+
if _, err := net.ResolveTCPAddr("tcp", config.Endpoint.TCPAddr); err != nil {
9699
return fmt.Errorf("exporter has an invalid TCP endpoint: %w", err)
97100
}
98101
}

config_test.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ func TestLoadConfigNewExporter(t *testing.T) {
3535
id: component.NewIDWithName(metadata.Type, "allsettings"),
3636
expected: &Config{
3737
TCPClientSettings: TCPClientSettings{
38-
Endpoint: validEndpoint,
38+
Endpoint: Endpoint{
39+
TCPAddr: validEndpoint,
40+
ValidateTCPResolution: false,
41+
},
3942
ConnectionTimeout: time.Second * 30,
4043
ClientConfig: configtls.ClientConfig{
4144
Insecure: true,
@@ -97,35 +100,48 @@ func TestConfigValidate(t *testing.T) {
97100
}{
98101
{
99102
desc: "QueueSettings are invalid",
100-
cfg: &Config{QueueConfig: exporterhelper.QueueConfig{QueueSize: -1, Enabled: true}},
101-
err: fmt.Errorf("queue settings has invalid configuration"),
103+
cfg: &Config{
104+
QueueConfig: exporterhelper.QueueConfig{
105+
QueueSize: -1,
106+
Enabled: true,
107+
},
108+
},
109+
err: fmt.Errorf("queue settings has invalid configuration"),
102110
},
103111
{
104112
desc: "Endpoint is invalid",
105113
cfg: &Config{
106114
TCPClientSettings: TCPClientSettings{
107-
Endpoint: "http://localhost:24224",
115+
Endpoint: Endpoint{
116+
TCPAddr: "http://localhost:24224",
117+
ValidateTCPResolution: true,
118+
},
108119
ConnectionTimeout: time.Second * 30,
109120
},
110121
},
111122
err: fmt.Errorf("exporter has an invalid TCP endpoint: address http://localhost:24224: too many colons in address"),
112123
},
113124
{
114-
desc: "Endpoint is invalid but SkipFailOnInvalidTCPEndpoint is false",
125+
desc: "Endpoint is invalid with ValidateTCPResolution false throw no error",
115126
cfg: &Config{
116127
TCPClientSettings: TCPClientSettings{
117-
Endpoint: "http://localhost:24224",
128+
Endpoint: Endpoint{
129+
TCPAddr: "http://localhost:24224",
130+
ValidateTCPResolution: false,
131+
},
118132
ConnectionTimeout: time.Second * 30,
119133
},
120-
SkipFailOnInvalidTCPEndpoint: true,
121134
},
122135
err: nil,
123136
},
124137
{
125138
desc: "Config is valid",
126139
cfg: &Config{
127140
TCPClientSettings: TCPClientSettings{
128-
Endpoint: validEndpoint,
141+
Endpoint: Endpoint{
142+
TCPAddr: validEndpoint,
143+
ValidateTCPResolution: true,
144+
},
129145
ConnectionTimeout: time.Second * 30,
130146
},
131147
},

exporter.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (f *fluentforwardExporter) start(ctx context.Context, host component.Host)
4242
return err
4343
}
4444
connFactory := &fclient.ConnFactory{
45-
Address: f.config.Endpoint,
45+
Address: f.config.Endpoint.TCPAddr,
4646
Timeout: f.config.ConnectionTimeout,
4747
TLSConfig: tlsConfig,
4848
}
@@ -70,14 +70,14 @@ func (f *fluentforwardExporter) stop(context.Context) (err error) {
7070
// connectForward connects to the Fluent Forward endpoint and keep running otel even if the connection is failing
7171
func (f *fluentforwardExporter) connectForward() {
7272
if err := f.client.Connect(); err != nil {
73-
f.settings.Logger.Error(fmt.Sprintf("Failed to connect to the endpoint %s", f.config.Endpoint))
73+
f.settings.Logger.Error(fmt.Sprintf("Failed to connect to the endpoint %s", f.config.Endpoint.TCPAddr))
7474
return
7575
}
76-
f.settings.Logger.Info(fmt.Sprintf("Successfull connection to the endpoint %s", f.config.Endpoint))
76+
f.settings.Logger.Info(fmt.Sprintf("Successfull connection to the endpoint %s", f.config.Endpoint.TCPAddr))
7777

7878
if f.config.SharedKey != "" {
7979
if err := f.client.Handshake(); err != nil {
80-
f.settings.Logger.Error(fmt.Sprintf("Failed shared key handshake with the endpoint %s", f.config.Endpoint))
80+
f.settings.Logger.Error(fmt.Sprintf("Failed shared key handshake with the endpoint %s", f.config.Endpoint.TCPAddr))
8181
return
8282
}
8383
f.settings.Logger.Info("Successfull shared key handshake with the endpoint")
@@ -182,7 +182,7 @@ func (f *fluentforwardExporter) send(sendMethod sendFunc, entries []protocol.Ent
182182
if errr := f.client.Disconnect(); errr != nil {
183183
return errr
184184
}
185-
f.settings.Logger.Warn(fmt.Sprintf("Failed to send data to the endpoint %s, trying to reconnect", f.config.Endpoint))
185+
f.settings.Logger.Warn(fmt.Sprintf("Failed to send data to the endpoint %s, trying to reconnect", f.config.Endpoint.TCPAddr))
186186
f.connectForward()
187187
err = sendMethod(f.config.Tag, entries)
188188
if err != nil {

exporter_test.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ import (
1818
func TestNewExporter(t *testing.T) {
1919
config := &Config{
2020
TCPClientSettings: TCPClientSettings{
21-
Endpoint: validEndpoint,
21+
Endpoint: Endpoint{
22+
TCPAddr: validEndpoint,
23+
},
2224
ConnectionTimeout: time.Second * 30,
2325
},
2426
}
@@ -37,7 +39,9 @@ func TestNewExporter(t *testing.T) {
3739
func TestStart(t *testing.T) {
3840
config := &Config{
3941
TCPClientSettings: TCPClientSettings{
40-
Endpoint: validEndpoint,
42+
Endpoint: Endpoint{
43+
TCPAddr: validEndpoint,
44+
},
4145
ConnectionTimeout: time.Second * 30,
4246
},
4347
}
@@ -62,7 +66,10 @@ func TestStartInvalidEndpointErrorLog(t *testing.T) {
6266

6367
config := &Config{
6468
TCPClientSettings: TCPClientSettings{
65-
Endpoint: "invalidEndpoint",
69+
Endpoint: Endpoint{
70+
TCPAddr: "invalidEndpoint",
71+
ValidateTCPResolution: true,
72+
},
6673
ConnectionTimeout: time.Second * 30,
6774
},
6875
}

factory.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ func NewFactory() exporter.Factory {
3131
func createDefaultConfig() component.Config {
3232
return &Config{
3333
TCPClientSettings: TCPClientSettings{
34-
Endpoint: "localhost:24224",
34+
Endpoint: Endpoint{
35+
TCPAddr: "localhost:24224",
36+
ValidateTCPResolution: false,
37+
},
3538
ConnectionTimeout: time.Second * 30,
3639
ClientConfig: configtls.ClientConfig{
3740
Insecure: true,

factory_test.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ func TestNewExporterMinimalConfig(t *testing.T) {
3030
t.Run("with valid config", func(t *testing.T) {
3131
config := &Config{
3232
TCPClientSettings: TCPClientSettings{
33-
Endpoint: validEndpoint,
33+
Endpoint: Endpoint{
34+
TCPAddr: validEndpoint,
35+
},
3436
ConnectionTimeout: time.Second * 30,
3537
},
3638
}
@@ -43,7 +45,10 @@ func TestNewExporterFullConfig(t *testing.T) {
4345
t.Run("with valid config", func(t *testing.T) {
4446
config := &Config{
4547
TCPClientSettings: TCPClientSettings{
46-
Endpoint: validEndpoint,
48+
Endpoint: Endpoint{
49+
TCPAddr: validEndpoint,
50+
ValidateTCPResolution: true,
51+
},
4752
ConnectionTimeout: time.Second * 30,
4853
ClientConfig: configtls.ClientConfig{
4954
Insecure: true,
@@ -82,7 +87,9 @@ func TestNewExporterFullConfig(t *testing.T) {
8287
func TestStartAlwaysReturnsNil(t *testing.T) {
8388
config := &Config{
8489
TCPClientSettings: TCPClientSettings{
85-
Endpoint: validEndpoint,
90+
Endpoint: Endpoint{
91+
TCPAddr: validEndpoint,
92+
},
8693
ConnectionTimeout: time.Second * 30,
8794
},
8895
}
@@ -94,7 +101,9 @@ func TestStartAlwaysReturnsNil(t *testing.T) {
94101
func TestStopAlwaysReturnsNil(t *testing.T) {
95102
config := &Config{
96103
TCPClientSettings: TCPClientSettings{
97-
Endpoint: validEndpoint,
104+
Endpoint: Endpoint{
105+
TCPAddr: validEndpoint,
106+
},
98107
ConnectionTimeout: time.Second * 30,
99108
},
100109
}

go.mod

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,55 +7,58 @@ toolchain go1.23.2
77
require (
88
github.com/IBM/fluent-forward-go v0.2.2
99
github.com/cenkalti/backoff/v4 v4.3.0
10-
github.com/stretchr/testify v1.9.0
11-
go.opentelemetry.io/collector/component v0.112.0
12-
go.opentelemetry.io/collector/config/configretry v1.18.0
13-
go.opentelemetry.io/collector/config/configtls v1.18.0
14-
go.opentelemetry.io/collector/confmap v1.18.0
15-
go.opentelemetry.io/collector/exporter v0.112.0
16-
go.opentelemetry.io/collector/pdata v1.18.0
17-
go.opentelemetry.io/otel/metric v1.31.0
18-
go.opentelemetry.io/otel/trace v1.31.0
10+
github.com/stretchr/testify v1.10.0
11+
go.opentelemetry.io/collector/component v0.115.0
12+
go.opentelemetry.io/collector/component/componenttest v0.115.0
13+
go.opentelemetry.io/collector/config/configretry v1.21.0
14+
go.opentelemetry.io/collector/config/configtls v1.21.0
15+
go.opentelemetry.io/collector/confmap v1.21.0
16+
go.opentelemetry.io/collector/exporter v0.115.0
17+
go.opentelemetry.io/collector/pdata v1.21.0
18+
go.opentelemetry.io/otel/metric v1.32.0
19+
go.opentelemetry.io/otel/trace v1.32.0
1920
go.uber.org/zap v1.27.0
2021
)
2122

2223
require (
2324
github.com/davecgh/go-spew v1.1.1 // indirect
24-
github.com/fsnotify/fsnotify v1.7.0 // indirect
25+
github.com/fsnotify/fsnotify v1.8.0 // indirect
2526
github.com/go-logr/logr v1.4.2 // indirect
2627
github.com/go-logr/stdr v1.2.2 // indirect
2728
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
2829
github.com/gogo/protobuf v1.3.2 // indirect
2930
github.com/google/uuid v1.6.0 // indirect
3031
github.com/gorilla/websocket v1.4.2 // indirect
32+
github.com/hashicorp/go-version v1.7.0 // indirect
3133
github.com/json-iterator/go v1.1.12 // indirect
3234
github.com/knadh/koanf/maps v0.1.1 // indirect
3335
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
34-
github.com/knadh/koanf/v2 v2.1.1 // indirect
36+
github.com/knadh/koanf/v2 v2.1.2 // indirect
3537
github.com/mitchellh/copystructure v1.2.0 // indirect
3638
github.com/mitchellh/reflectwalk v1.0.2 // indirect
3739
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
3840
github.com/modern-go/reflect2 v1.0.2 // indirect
3941
github.com/philhofer/fwd v1.1.1 // indirect
4042
github.com/pmezard/go-difflib v1.0.0 // indirect
4143
github.com/tinylib/msgp v1.1.6 // indirect
42-
go.opentelemetry.io/collector/config/configopaque v1.18.0 // indirect
43-
go.opentelemetry.io/collector/config/configtelemetry v0.112.0 // indirect
44-
go.opentelemetry.io/collector/consumer v0.112.0 // indirect
45-
go.opentelemetry.io/collector/consumer/consumererror v0.112.0 // indirect
46-
go.opentelemetry.io/collector/extension v0.112.0 // indirect
47-
go.opentelemetry.io/collector/extension/experimental/storage v0.112.0 // indirect
48-
go.opentelemetry.io/collector/pdata/pprofile v0.112.0 // indirect
49-
go.opentelemetry.io/collector/pipeline v0.112.0 // indirect
50-
go.opentelemetry.io/otel v1.31.0 // indirect
51-
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
52-
go.opentelemetry.io/otel/sdk/metric v1.31.0 // indirect
44+
go.opentelemetry.io/collector/config/configopaque v1.21.0 // indirect
45+
go.opentelemetry.io/collector/config/configtelemetry v0.115.0 // indirect
46+
go.opentelemetry.io/collector/consumer v1.21.0 // indirect
47+
go.opentelemetry.io/collector/consumer/consumererror v0.115.0 // indirect
48+
go.opentelemetry.io/collector/extension v0.115.0 // indirect
49+
go.opentelemetry.io/collector/extension/experimental/storage v0.115.0 // indirect
50+
go.opentelemetry.io/collector/featuregate v1.21.0 // indirect
51+
go.opentelemetry.io/collector/pdata/pprofile v0.115.0 // indirect
52+
go.opentelemetry.io/collector/pipeline v0.115.0 // indirect
53+
go.opentelemetry.io/otel v1.32.0 // indirect
54+
go.opentelemetry.io/otel/sdk v1.32.0 // indirect
55+
go.opentelemetry.io/otel/sdk/metric v1.32.0 // indirect
5356
go.uber.org/multierr v1.11.0 // indirect
5457
golang.org/x/net v0.28.0 // indirect
55-
golang.org/x/sys v0.26.0 // indirect
58+
golang.org/x/sys v0.27.0 // indirect
5659
golang.org/x/text v0.17.0 // indirect
5760
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
5861
google.golang.org/grpc v1.67.1 // indirect
59-
google.golang.org/protobuf v1.35.1 // indirect
62+
google.golang.org/protobuf v1.35.2 // indirect
6063
gopkg.in/yaml.v3 v3.0.1 // indirect
6164
)

0 commit comments

Comments
 (0)