Skip to content

Commit e4c63de

Browse files
committed
use sync.Once - Events should return the same event channel
Signed-off-by: Davanum Srinivas <[email protected]>
1 parent df96b80 commit e4c63de

File tree

3 files changed

+45
-32
lines changed

3 files changed

+45
-32
lines changed

store-client/pkg/client/mongodb_client.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"log/slog"
2121
"strings"
22+
"sync"
2223

2324
"github.com/nvidia/nvsentinel/data-models/pkg/model"
2425
"github.com/nvidia/nvsentinel/store-client/pkg/config"
@@ -887,26 +888,33 @@ type mongoSessionContext struct {
887888
}
888889

889890
type mongoChangeStreamWatcher struct {
890-
watcher *mongoWatcher.ChangeStreamWatcher
891+
watcher *mongoWatcher.ChangeStreamWatcher
892+
eventChan chan Event
893+
initOnce sync.Once
891894
}
892895

893896
func (w *mongoChangeStreamWatcher) Start(ctx context.Context) {
894897
w.watcher.Start(ctx)
895898
}
896899

900+
// Events returns the event channel for this watcher.
901+
// The returned channel is initialized once and cached, so multiple calls
902+
// return the same channel. Safe for concurrent use.
897903
func (w *mongoChangeStreamWatcher) Events() <-chan Event {
898-
bsonChan := w.watcher.Events()
899-
eventChan := make(chan Event)
904+
w.initOnce.Do(func() {
905+
w.eventChan = make(chan Event)
906+
bsonChan := w.watcher.Events()
900907

901-
go func() {
902-
defer close(eventChan)
908+
go func() {
909+
defer close(w.eventChan)
903910

904-
for rawEvent := range bsonChan {
905-
eventChan <- &mongoEvent{rawEvent: rawEvent}
906-
}
907-
}()
911+
for rawEvent := range bsonChan {
912+
w.eventChan <- &mongoEvent{rawEvent: rawEvent}
913+
}
914+
}()
915+
})
908916

909-
return eventChan
917+
return w.eventChan
910918
}
911919

912920
func (w *mongoChangeStreamWatcher) MarkProcessed(ctx context.Context, token []byte) error {

store-client/pkg/datastore/providers/mongodb/adapter.go

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"log/slog"
2121
"os"
2222
"path/filepath"
23+
"sync"
2324

2425
"github.com/nvidia/nvsentinel/store-client/pkg/client"
2526
"github.com/nvidia/nvsentinel/store-client/pkg/config"
@@ -164,7 +165,9 @@ func (a *AdaptedMongoStore) CreateChangeStreamWatcher(ctx context.Context, clien
164165

165166
// AdaptedChangeStreamWatcher adapts our existing change stream watcher to the new interface
166167
type AdaptedChangeStreamWatcher struct {
167-
watcher client.ChangeStreamWatcher
168+
watcher client.ChangeStreamWatcher
169+
eventChan chan datastore.EventWithToken
170+
initOnce sync.Once
168171
}
169172

170173
// NewAdaptedChangeStreamWatcher creates a new adapted change stream watcher
@@ -173,33 +176,36 @@ func NewAdaptedChangeStreamWatcher(watcher client.ChangeStreamWatcher) datastore
173176
}
174177

175178
// Events returns the events channel
179+
// CRITICAL FIX: Only create the channel and goroutine ONCE to prevent event loss
176180
func (a *AdaptedChangeStreamWatcher) Events() <-chan datastore.EventWithToken {
177-
eventChan := make(chan datastore.EventWithToken)
181+
a.initOnce.Do(func() {
182+
a.eventChan = make(chan datastore.EventWithToken)
178183

179-
go func() {
180-
defer close(eventChan)
184+
go func() {
185+
defer close(a.eventChan)
181186

182-
for event := range a.watcher.Events() {
183-
// Convert from our existing Event interface to the new EventWithToken
184-
eventMap := make(map[string]interface{})
187+
for event := range a.watcher.Events() {
188+
// Convert from our existing Event interface to the new EventWithToken
189+
eventMap := make(map[string]interface{})
185190

186-
// Extract the event data
187-
if err := event.UnmarshalDocument(&eventMap); err != nil {
188-
slog.Error("Failed to unmarshal event", "error", err)
189-
continue
190-
}
191+
// Extract the event data
192+
if err := event.UnmarshalDocument(&eventMap); err != nil {
193+
slog.Error("Failed to unmarshal event", "error", err)
194+
continue
195+
}
191196

192-
// Create EventWithToken
193-
eventWithToken := datastore.EventWithToken{
194-
Event: eventMap,
195-
ResumeToken: []byte(""), // We'll need to extract the resume token properly
196-
}
197+
// Create EventWithToken
198+
eventWithToken := datastore.EventWithToken{
199+
Event: eventMap,
200+
ResumeToken: []byte(""), // We'll need to extract the resume token properly
201+
}
197202

198-
eventChan <- eventWithToken
199-
}
200-
}()
203+
a.eventChan <- eventWithToken
204+
}
205+
}()
206+
})
201207

202-
return eventChan
208+
return a.eventChan
203209
}
204210

205211
// Start starts the watcher

tests/helpers/health_events_analyzer.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919
"math/rand"
2020
"testing"
21-
"time"
2221

2322
pb "github.com/nvidia/nvsentinel/data-models/pkg/protos"
2423
"github.com/stretchr/testify/require"

0 commit comments

Comments
 (0)