Skip to content

Commit 501058e

Browse files
committed
stream: unwrap underlying error from sarama.ProducerError
1 parent 9f5ec2e commit 501058e

File tree

2 files changed

+57
-7
lines changed

2 files changed

+57
-7
lines changed

internal/incoming/stream/publisher.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,14 @@ func (kp *kafkaProducer) Send(ctx context.Context, m *pubsub.Message) error {
6262
if err != nil {
6363
var producerError sarama.ProducerError
6464
if kp.topic.ErrorAs(err, &producerError) {
65-
return fmt.Errorf("producer error sending message: %w", producerError)
65+
return fmt.Errorf("producer error: %w", producerError.Err)
6666
}
6767
var producerErrors sarama.ProducerErrors
6868
if kp.topic.ErrorAs(err, &producerErrors) {
69-
return fmt.Errorf("producer errors sending message: %w", producerErrors)
69+
if len(producerErrors) > 0 {
70+
return fmt.Errorf("first producer error (of %d) - %w", len(producerErrors), producerErrors[0].Err)
71+
}
72+
return fmt.Errorf("producer errors: %w", producerErrors)
7073
}
7174
return fmt.Errorf("error sending message: %w", err)
7275
}

internal/incoming/stream/publisher_test.go

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ package stream
66

77
import (
88
"context"
9+
"errors"
10+
"strings"
911
"testing"
1012

1113
"github.com/moov-io/achgateway/internal/service"
14+
"github.com/moov-io/base/docker"
1215
"github.com/moov-io/base/log"
1316

1417
"github.com/stretchr/testify/require"
@@ -34,7 +37,7 @@ func TestStream(t *testing.T) {
3437
defer sub.Shutdown(ctx)
3538

3639
// quick send and receive
37-
send(ctx, topic, "hello, world")
40+
send(t, ctx, topic, "hello, world")
3841
if msg, err := receive(ctx, sub); err == nil {
3942
if msg != "hello, world" {
4043
t.Errorf("got %q", msg)
@@ -44,19 +47,63 @@ func TestStream(t *testing.T) {
4447
}
4548
}
4649

47-
func send(ctx context.Context, t Publisher, body string) *pubsub.Message {
50+
func TestStreamErrors(t *testing.T) {
51+
if testing.Short() {
52+
t.Skip("-short flag enabled")
53+
}
54+
if !docker.Enabled() {
55+
t.Skip("Docker not enabled")
56+
}
57+
58+
cfg := &service.Config{
59+
Inbound: service.Inbound{
60+
Kafka: &service.KafkaConfig{
61+
Brokers: []string{"localhost:19092"},
62+
Key: "",
63+
Secret: "",
64+
Topic: "test1",
65+
TLS: false,
66+
},
67+
},
68+
}
69+
ctx := context.Background()
70+
logger := log.NewTestLogger()
71+
72+
topic, err := Topic(logger, cfg)
73+
require.NoError(t, err)
74+
defer topic.Shutdown(ctx)
75+
76+
// Produce a message that's too big
77+
msg := &pubsub.Message{
78+
Body: []byte(strings.Repeat("A", 1e9)),
79+
Metadata: make(map[string]string),
80+
}
81+
err = topic.Send(ctx, msg)
82+
require.ErrorContains(t, err, "kafka server: Message was too large, server rejected it to avoid allocation error")
83+
}
84+
85+
func send(t *testing.T, ctx context.Context, topic Publisher, body string) *pubsub.Message {
86+
t.Helper()
87+
4888
msg := &pubsub.Message{
4989
Body: []byte(body),
5090
Metadata: make(map[string]string),
5191
}
52-
t.Send(ctx, msg)
92+
err := topic.Send(ctx, msg)
93+
if err != nil {
94+
t.Error(err)
95+
}
5396
return msg
5497
}
5598

56-
func receive(ctx context.Context, t Subscription) (string, error) {
57-
msg, err := t.Receive(ctx)
99+
func receive(ctx context.Context, sub Subscription) (string, error) {
100+
msg, err := sub.Receive(ctx)
58101
if err != nil {
59102
return "", err
60103
}
104+
if msg == nil {
105+
return "", errors.New("nil Message received")
106+
}
107+
msg.Ack()
61108
return string(msg.Body), nil
62109
}

0 commit comments

Comments
 (0)