Skip to content

Commit 4222be0

Browse files
authored
Merge pull request #200 from moov-io/expose-producer-errors
stream: unwrap underlying error from sarama.ProducerError
2 parents 321e81a + 501058e commit 4222be0

File tree

4 files changed

+104
-20
lines changed

4 files changed

+104
-20
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ endif
4848
.PHONY: teardown
4949
teardown:
5050
-docker-compose down --remove-orphans
51+
-docker-compose rm -sfv
5152

5253
docker: update
5354
docker build --pull --build-arg VERSION=${VERSION} -t moov/achgateway:${VERSION} -f Dockerfile .

docker-compose.yml

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,53 @@ services:
4747
- "2181:2181"
4848

4949
kafka1:
50-
image: wurstmeister/kafka:2.13-2.6.0
51-
depends_on:
52-
- zookeeper
53-
ports:
54-
- "9092:9092"
55-
environment:
56-
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
57-
KAFKA_CREATE_TOPICS: test1:1:1
58-
PORT_COMMAND: "docker port $$(hostname) 9092/tcp | cut -d: -f2"
59-
HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
50+
image: docker.redpanda.com/vectorized/redpanda:v22.3.21
51+
container_name: kafka1
52+
healthcheck:
53+
{
54+
test: curl -f localhost:9644/v1/status/ready,
55+
interval: 1s,
56+
start_period: 30s,
57+
}
6058
volumes:
61-
- /var/run/docker.sock:/var/run/docker.sock
62-
tmpfs: # Run this mysql in memory as its used for testing
63-
- /tmp/kafka-logs
59+
- redpanda-0:/var/lib/redpanda/data
60+
networks:
61+
- intranet
62+
ports:
63+
- 18081:18081
64+
- 18082:18082
65+
- 19092:19092
66+
- 19644:9644
67+
command:
68+
- redpanda
69+
- start
70+
- --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
71+
- --advertise-kafka-addr internal://kafka1:9092,external://localhost:19092
72+
- --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
73+
- --advertise-pandaproxy-addr internal://kafka1:8082,external://localhost:18082
74+
- --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
75+
- --rpc-addr kafka1:33145
76+
- --advertise-rpc-addr kafka1:33145
77+
- --smp 1
78+
- --memory 1G
79+
- --mode dev-container
80+
- --default-log-level=info
81+
82+
topics:
83+
image: docker.redpanda.com/vectorized/redpanda:v22.3.21
84+
depends_on:
85+
kafka1:
86+
condition: service_healthy
87+
networks:
88+
- intranet
89+
command:
90+
- topic
91+
- --brokers kafka1:9092
92+
- create
93+
- ach.outgoing-files
6494

6595
networks:
6696
intranet: {}
97+
98+
volumes:
99+
redpanda-0: null

internal/incoming/stream/publisher.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,14 @@ func (kp *kafkaProducer) Send(ctx context.Context, m *pubsub.Message) error {
6262
if err != nil {
6363
var producerError sarama.ProducerError
6464
if kp.topic.ErrorAs(err, &producerError) {
65-
return fmt.Errorf("producer error sending message: %w", producerError)
65+
return fmt.Errorf("producer error: %w", producerError.Err)
6666
}
6767
var producerErrors sarama.ProducerErrors
6868
if kp.topic.ErrorAs(err, &producerErrors) {
69-
return fmt.Errorf("producer errors sending message: %w", producerErrors)
69+
if len(producerErrors) > 0 {
70+
return fmt.Errorf("first producer error (of %d) - %w", len(producerErrors), producerErrors[0].Err)
71+
}
72+
return fmt.Errorf("producer errors: %w", producerErrors)
7073
}
7174
return fmt.Errorf("error sending message: %w", err)
7275
}

internal/incoming/stream/publisher_test.go

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,12 @@ package stream
66

77
import (
88
"context"
9+
"errors"
10+
"strings"
911
"testing"
1012

1113
"github.com/moov-io/achgateway/internal/service"
14+
"github.com/moov-io/base/docker"
1215
"github.com/moov-io/base/log"
1316

1417
"github.com/stretchr/testify/require"
@@ -34,7 +37,7 @@ func TestStream(t *testing.T) {
3437
defer sub.Shutdown(ctx)
3538

3639
// quick send and receive
37-
send(ctx, topic, "hello, world")
40+
send(t, ctx, topic, "hello, world")
3841
if msg, err := receive(ctx, sub); err == nil {
3942
if msg != "hello, world" {
4043
t.Errorf("got %q", msg)
@@ -44,19 +47,63 @@ func TestStream(t *testing.T) {
4447
}
4548
}
4649

47-
func send(ctx context.Context, t Publisher, body string) *pubsub.Message {
50+
func TestStreamErrors(t *testing.T) {
51+
if testing.Short() {
52+
t.Skip("-short flag enabled")
53+
}
54+
if !docker.Enabled() {
55+
t.Skip("Docker not enabled")
56+
}
57+
58+
cfg := &service.Config{
59+
Inbound: service.Inbound{
60+
Kafka: &service.KafkaConfig{
61+
Brokers: []string{"localhost:19092"},
62+
Key: "",
63+
Secret: "",
64+
Topic: "test1",
65+
TLS: false,
66+
},
67+
},
68+
}
69+
ctx := context.Background()
70+
logger := log.NewTestLogger()
71+
72+
topic, err := Topic(logger, cfg)
73+
require.NoError(t, err)
74+
defer topic.Shutdown(ctx)
75+
76+
// Produce a message that's too big
77+
msg := &pubsub.Message{
78+
Body: []byte(strings.Repeat("A", 1e9)),
79+
Metadata: make(map[string]string),
80+
}
81+
err = topic.Send(ctx, msg)
82+
require.ErrorContains(t, err, "kafka server: Message was too large, server rejected it to avoid allocation error")
83+
}
84+
85+
func send(t *testing.T, ctx context.Context, topic Publisher, body string) *pubsub.Message {
86+
t.Helper()
87+
4888
msg := &pubsub.Message{
4989
Body: []byte(body),
5090
Metadata: make(map[string]string),
5191
}
52-
t.Send(ctx, msg)
92+
err := topic.Send(ctx, msg)
93+
if err != nil {
94+
t.Error(err)
95+
}
5396
return msg
5497
}
5598

56-
func receive(ctx context.Context, t Subscription) (string, error) {
57-
msg, err := t.Receive(ctx)
99+
func receive(ctx context.Context, sub Subscription) (string, error) {
100+
msg, err := sub.Receive(ctx)
58101
if err != nil {
59102
return "", err
60103
}
104+
if msg == nil {
105+
return "", errors.New("nil Message received")
106+
}
107+
msg.Ack()
61108
return string(msg.Body), nil
62109
}

0 commit comments

Comments
 (0)