Skip to content

Commit 80f428a

Browse files
authored
Merge pull request #39 from kapetan-io/thrawn/nil-pointer-issue
Add InMemoryListener integration for daemon testing
2 parents c8d1029 + bcaca21 commit 80f428a

File tree

11 files changed

+438
-7
lines changed

11 files changed

+438
-7
lines changed

.github/workflows/lint.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ on:
1111
- main
1212

1313
env:
14-
GOLANGCI_LINT_VERSION: v1.61.0
14+
GOLANGCI_LINT_VERSION: v2.2.1
1515

1616
jobs:
1717
lint:
@@ -34,7 +34,7 @@ jobs:
3434
run: go mod tidy && git diff --exit-code
3535

3636
- name: golangci-lint
37-
uses: golangci/golangci-lint-action@v3
37+
uses: golangci/golangci-lint-action@v8
3838
with:
3939
version: ${{ env.GOLANGCI_LINT_VERSION }}
4040
skip-cache: true # cache/restore is done by actions/setup-go@v3 step

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ coverage.html
1010
.env
1111
docs/notes
1212
docs/plans
13+
testdata/

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
.DEFAULT_GOAL := build
22
LINT = $(GOPATH)/bin/golangci-lint
3-
LINT_VERSION = v1.64.6
3+
LINT_VERSION = v2.2.1
44
VERSION=$(shell git describe --tags --exact-match 2>/dev/null || echo "dev-build")
55

66
.PHONY: install

client.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/kapetan-io/tackle/clock"
1515
"github.com/kapetan-io/tackle/set"
1616
"google.golang.org/protobuf/proto"
17+
"net"
1718
"net/http"
1819
)
1920

@@ -400,6 +401,24 @@ func WithTLS(tls *tls.Config, address string) ClientConfig {
400401
}
401402
}
402403

404+
// WithConn returns ClientConfig suitable for use with a custom net.Conn connection
405+
func WithConn(conn net.Conn) ClientConfig {
406+
return ClientConfig{
407+
Endpoint: "http://memory",
408+
Client: &http.Client{
409+
Transport: &http.Transport{
410+
Dial: func(network, addr string) (net.Conn, error) {
411+
return conn, nil
412+
},
413+
MaxConnsPerHost: 1,
414+
MaxIdleConns: 1,
415+
MaxIdleConnsPerHost: 1,
416+
IdleConnTimeout: 60 * clock.Second,
417+
},
418+
},
419+
}
420+
}
421+
403422
type ItemsWithIDs interface {
404423
GetId() string
405424
}

common_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ func TestMain(m *testing.M) {
5959
}
6060

6161
goleak.VerifyTestMain(m)
62-
//os.Exit(m.Run())
6362
}
6463

6564
// ---------------------------------------------------------------------

daemon/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ type Config struct {
2525
// single `/queue.produce` request including the size of all fields in the marshalled protobuf.
2626
// The default size is 1MB.
2727
MaxProducePayloadSize int64
28+
29+
// InMemoryListener is true if daemon should ignore ListenAddress and use net.Pipe to listen for
30+
// and handle new connections. When true, calls to Daemon.Client() and Daemon.MustClient() will return
31+
// a new instance of the client bound to the client portion of a net.Pipe. This is useful for testing
32+
// querator where access to the loop back is not allowed, or when using testing/synctest
33+
InMemoryListener bool
2834
}
2935

3036
func (c *Config) ClientTLS() *tls.Config {

daemon/daemon.go

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,11 @@ func (d *Daemon) Start(ctx context.Context) error {
8080
), d.conf.MaxProducePayloadSize, d.conf.Log)
8181
registry.MustRegister(handler)
8282

83-
if d.conf.ServerTLS() != nil {
83+
if d.conf.InMemoryListener {
84+
if err := d.spawnInMemory(ctx, handler); err != nil {
85+
return err
86+
}
87+
} else if d.conf.ServerTLS() != nil {
8488
if err := d.spawnHTTPS(ctx, handler); err != nil {
8589
return err
8690
}
@@ -114,12 +118,34 @@ func (d *Daemon) Service() *querator.Service {
114118
func (d *Daemon) MustClient() *querator.Client {
115119
c, err := d.Client()
116120
if err != nil {
117-
panic(fmt.Sprintf("[%s] failed to init daemon client - '%d'", d.conf.InstanceID, err))
121+
panic(fmt.Sprintf("[%s] failed to init daemon client - '%s'", d.conf.InstanceID, err))
118122
}
119123
return c
120124
}
121125

122126
func (d *Daemon) Client() (*querator.Client, error) {
127+
if d.conf.InMemoryListener {
128+
// Create a new net.Pipe for each client connection
129+
clientConn, serverConn := net.Pipe()
130+
131+
// Serve the server side of the pipe through the InMemoryListener
132+
if inMemListener, ok := d.Listener.(*InMemoryListener); ok {
133+
if err := inMemListener.ServeConn(serverConn); err != nil {
134+
_ = clientConn.Close()
135+
_ = serverConn.Close()
136+
return nil, fmt.Errorf("failed to serve connection: %w", err)
137+
}
138+
} else {
139+
_ = clientConn.Close()
140+
_ = serverConn.Close()
141+
return nil, fmt.Errorf("InMemoryListener is enabled but listener is not of type *InMemoryListener")
142+
}
143+
144+
// Create a new client using the client side of the pipe
145+
return querator.NewClient(querator.WithConn(clientConn))
146+
}
147+
148+
// Original logic for non-InMemoryListener clients
123149
var err error
124150
if d.client != nil {
125151
return d.client, nil
@@ -201,3 +227,27 @@ func (d *Daemon) spawnHTTP(ctx context.Context, h http.Handler) error {
201227
d.servers = append(d.servers, srv)
202228
return nil
203229
}
230+
231+
func (d *Daemon) spawnInMemory(ctx context.Context, h http.Handler) error {
232+
srv := &http.Server{
233+
ErrorLog: slog.NewLogLogger(d.conf.Log.Handler(), slog.LevelError),
234+
Handler: h,
235+
}
236+
237+
d.Listener = NewInMemoryListener()
238+
srv.Addr = d.Listener.Addr().String()
239+
240+
d.wg.Add(1)
241+
go func() {
242+
defer d.wg.Done()
243+
if err := srv.Serve(d.Listener); err != nil {
244+
if !errors.Is(err, http.ErrServerClosed) {
245+
d.conf.Log.Error("while starting InMemory server", "error", err)
246+
}
247+
}
248+
}()
249+
250+
d.conf.Log.Info("InMemory Server Started", "address", d.Listener.Addr().String())
251+
d.servers = append(d.servers, srv)
252+
return nil
253+
}

daemon/listener.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package daemon
2+
3+
import (
4+
"io"
5+
"net"
6+
"sync/atomic"
7+
)
8+
9+
// InMemoryListener is intended to be used with testing/synctest
10+
type InMemoryListener struct {
11+
closed chan struct{}
12+
connCh chan net.Conn
13+
isClosed atomic.Bool
14+
}
15+
16+
func NewInMemoryListener() *InMemoryListener {
17+
return &InMemoryListener{
18+
connCh: make(chan net.Conn),
19+
closed: make(chan struct{}),
20+
}
21+
}
22+
23+
func (l *InMemoryListener) ServeConn(conn net.Conn) error {
24+
if l.isClosed.Load() {
25+
return net.ErrClosed
26+
}
27+
28+
l.connCh <- conn
29+
return nil
30+
}
31+
32+
func (l *InMemoryListener) Accept() (net.Conn, error) {
33+
select {
34+
case c := <-l.connCh:
35+
return c, nil
36+
case <-l.closed:
37+
return nil, io.EOF
38+
}
39+
}
40+
41+
func (l *InMemoryListener) Close() error {
42+
if !l.isClosed.CompareAndSwap(false, true) {
43+
return nil
44+
}
45+
close(l.closed)
46+
return nil
47+
}
48+
49+
func (l *InMemoryListener) Addr() net.Addr {
50+
return memAddr("memory-listener")
51+
}
52+
53+
type memAddr string
54+
55+
func (a memAddr) Network() string { return string(a) }
56+
func (a memAddr) String() string { return string(a) }

daemon/listener_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package daemon_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/kapetan-io/querator/daemon"
7+
pb "github.com/kapetan-io/querator/proto"
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
"io"
11+
"net"
12+
"net/http"
13+
"strings"
14+
"sync"
15+
"testing"
16+
"time"
17+
)
18+
19+
func TestInMemoryListener(t *testing.T) {
20+
listener := daemon.NewInMemoryListener()
21+
server := &http.Server{
22+
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
23+
_, _ = fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
24+
}),
25+
}
26+
go func() { _ = server.Serve(listener) }()
27+
defer func() { _ = server.Close() }()
28+
29+
clientCount := 3
30+
var wg sync.WaitGroup
31+
for i := 0; i < clientCount; i++ {
32+
wg.Add(1)
33+
go func(id int) {
34+
defer wg.Done()
35+
// Each client gets its own net.Pipe
36+
serverConn, clientConn := net.Pipe()
37+
_ = listener.ServeConn(serverConn)
38+
39+
// Custom DialContext returns the clientConn for this request
40+
dialOnce := sync.Once{}
41+
42+
client := &http.Client{
43+
Transport: &http.Transport{
44+
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
45+
var c net.Conn
46+
dialOnce.Do(func() { c = clientConn })
47+
return c, nil
48+
},
49+
},
50+
Timeout: 2 * time.Second,
51+
}
52+
53+
url := fmt.Sprintf("http://inmemory/client%d", id)
54+
resp, err := client.Get(url)
55+
if err != nil {
56+
t.Errorf("client %d error: %v", id, err)
57+
return
58+
}
59+
defer func() { _ = resp.Body.Close() }()
60+
body, _ := io.ReadAll(resp.Body)
61+
expected := fmt.Sprintf("Hello, client%d!", id)
62+
if !strings.Contains(string(body), expected) {
63+
t.Errorf("client %d got unexpected body: %q", id, body)
64+
}
65+
}(i)
66+
}
67+
wg.Wait()
68+
_ = listener.Close()
69+
}
70+
71+
func TestDaemonInMemoryListener(t *testing.T) {
72+
ctx := context.Background()
73+
74+
// Create daemon with InMemoryListener enabled
75+
d, err := daemon.NewDaemon(ctx, daemon.Config{
76+
InMemoryListener: true,
77+
})
78+
require.NoError(t, err)
79+
defer func() { _ = d.Shutdown(ctx) }()
80+
81+
// Get a client - should create a new net.Pipe connection
82+
{
83+
client, err := d.Client()
84+
require.NoError(t, err)
85+
86+
// Test QueuesList to verify the connection works
87+
var resp pb.QueuesListResponse
88+
err = client.QueuesList(ctx, &resp, nil)
89+
require.NoError(t, err)
90+
91+
// Verify we got an empty list
92+
assert.Nil(t, resp.Items)
93+
94+
// Should work a second time also
95+
err = client.QueuesList(ctx, &resp, nil)
96+
require.NoError(t, err)
97+
98+
// Verify we got an empty list
99+
assert.Nil(t, resp.Items)
100+
}
101+
102+
// Test getting multiple clients - each should work independently
103+
{
104+
client, err := d.Client()
105+
require.NoError(t, err)
106+
107+
// Test QueuesList to verify the connection works
108+
var resp pb.QueuesListResponse
109+
err = client.QueuesList(ctx, &resp, nil)
110+
require.NoError(t, err)
111+
112+
// Verify we got an empty list
113+
assert.Nil(t, resp.Items)
114+
}
115+
}

0 commit comments

Comments
 (0)