File tree Expand file tree Collapse file tree 1 file changed +18
-10
lines changed Expand file tree Collapse file tree 1 file changed +18
-10
lines changed Original file line number Diff line number Diff line change @@ -19,6 +19,7 @@ import (
1919 "fmt"
2020 "log/slog"
2121 "strings"
22+ "sync"
2223
2324 "github.com/nvidia/nvsentinel/data-models/pkg/model"
2425 "github.com/nvidia/nvsentinel/store-client/pkg/config"
@@ -887,26 +888,33 @@ type mongoSessionContext struct {
887888}
888889
889890type mongoChangeStreamWatcher struct {
890- watcher * mongoWatcher.ChangeStreamWatcher
891+ watcher * mongoWatcher.ChangeStreamWatcher
892+ eventChan chan Event
893+ initOnce sync.Once
891894}
892895
893896func (w * mongoChangeStreamWatcher ) Start (ctx context.Context ) {
894897 w .watcher .Start (ctx )
895898}
896899
900+ // Events returns the event channel for this watcher.
901+ // The returned channel is initialized once and cached, so multiple calls
902+ // return the same channel. Safe for concurrent use.
897903func (w * mongoChangeStreamWatcher ) Events () <- chan Event {
898- bsonChan := w .watcher .Events ()
899- eventChan := make (chan Event )
904+ w .initOnce .Do (func () {
905+ w .eventChan = make (chan Event )
906+ bsonChan := w .watcher .Events ()
900907
901- go func () {
902- defer close (eventChan )
908+ go func () {
909+ defer close (w . eventChan )
903910
904- for rawEvent := range bsonChan {
905- eventChan <- & mongoEvent {rawEvent : rawEvent }
906- }
907- }()
911+ for rawEvent := range bsonChan {
912+ w .eventChan <- & mongoEvent {rawEvent : rawEvent }
913+ }
914+ }()
915+ })
908916
909- return eventChan
917+ return w . eventChan
910918}
911919
912920func (w * mongoChangeStreamWatcher ) MarkProcessed (ctx context.Context , token []byte ) error {
You can’t perform that action at this time.
0 commit comments