From 822798a1e0db2683126ca4bfb9bf5bed98fd43cb Mon Sep 17 00:00:00 2001 From: Jake Bailey <5341706+jakebailey@users.noreply.github.com> Date: Thu, 25 Jun 2026 14:40:49 -0700 Subject: [PATCH 1/6] Use shared FSEvents streams --- internal/fswatch/fsevents_darwin.go | 328 +++++++++++++----- internal/fswatch/fsevents_darwin_ffi.go | 8 +- .../fswatch/fsevents_darwin_shared_test.go | 106 ++++++ 3 files changed, 342 insertions(+), 100 deletions(-) create mode 100644 internal/fswatch/fsevents_darwin_shared_test.go diff --git a/internal/fswatch/fsevents_darwin.go b/internal/fswatch/fsevents_darwin.go index fe99bd1f36b..604da32d97f 100644 --- a/internal/fswatch/fsevents_darwin.go +++ b/internal/fswatch/fsevents_darwin.go @@ -7,6 +7,10 @@ import ( "fmt" "os" "runtime" + "slices" + "sort" + "strings" + "sync" "sync/atomic" "syscall" "unsafe" @@ -31,13 +35,14 @@ import ( // │ fsEventsBackend │ // │ (no event loop; start() just signals readiness) │ // │ │ -// │ subscribe() per directory: │ +// │ subscribe/closeWatch rebuild the shared stream set: │ // │ │ │ // │ ▼ │ // │ ┌─────────────────────────────────────────────────────┐ │ -// │ │ fseventsState │ │ +// │ │ fseventsStream[] │ │ // │ │ │ │ -// │ │ FSEventStream ──► per-stream GCD dispatch queue │ │ +// │ │ each FSEventStream watches up to N paths ────────► │ │ +// │ │ per-stream GCD dispatch queue │ │ // │ │ (UseCFTypes | FileEvents = 0x11) │ │ // │ │ │ │ // │ │ callback fires on GCD thread: │ │ @@ -60,15 +65,14 @@ import ( // that GCD thread in the C calling convention. It never enters Go ABI. // It retains/copies the callback payload and passes it to Go through // eventPipe. -// - One Go goroutine per stream (eventLoop, in fsevents_darwin_ffi.go) +// - One Go goroutine per stream chunk (eventLoop, in fsevents_darwin_ffi.go) // blocks on eventFile.Read(), integrated with Go's netpoll so it parks -// without consuming an OS thread. When woken by the asm callback, it -// calls fsEventsCallback() to classify events and post them to the -// dirWatch's eventList. -// - subscribe/closeWatch and stream lifecycle (startStream/stopStream) -// run on the caller's goroutine under watcherBase.mu. Stream teardown -// uses atomic.Swap on the stream pointer so that only one of -// (Close, callback's deleted-root path) performs cleanup. +// without consuming an OS thread. When woken by the asm callback, it calls +// fsEventsCallback() to classify events and route them to matching +// dirWatch event lists. +// - subscribe/closeWatch rebuild the stream chunks on the caller's goroutine +// under watcherBase.mu. Old streams are swapped out before teardown so a +// callback cannot deadlock against stream teardown while routing events. // // Callback delivery: // dirWatch.notify() posts to the shared process-wide debouncer. After a @@ -79,11 +83,11 @@ import ( // per-subscriber before delivery. // // WatchDirectory flow (caller goroutine): -// subscribe → startStream: create an FSEventStream with -// kFSEventStreamEventIdSinceNow and start it on its own serial GCD -// queue. No directory walk or tree is needed; FSEvents watches -// recursively via the kernel, and event classification uses only the -// flags. +// subscribe/closeWatch snapshots active dirWatches and creates one or more +// FSEventStreams with kFSEventStreamEventIdSinceNow. Each stream receives a +// chunk of physical watch roots. No directory walk or tree is needed; +// FSEvents watches recursively via the kernel, and event classification uses +// only the flags. // // Event classification (fsEventsCallback, on eventLoop goroutine): // Each batch delivers arrays of paths, flags, and event IDs. The flags @@ -154,12 +158,11 @@ type fsEventStreamContext struct { copyDescription uintptr } -// fseventsState. -// -// stream is claimed by Close with an atomic Swap. Root deletion is detected in -// the callback, but teardown is deferred to Close because the assembly callback -// is still waiting on the done pipe while fsEventsCallback runs. type fseventsState struct { + terminated atomic.Bool +} + +type fseventsStream struct { stream atomic.Uintptr cb *streamCallback pinner runtime.Pinner @@ -170,6 +173,10 @@ type fseventsState struct { // fsEventsBackend. type fsEventsBackend struct { watcherBase + + mu sync.Mutex + watches map[*dirWatch]*fseventsState + streams []*fseventsStream } func init() { @@ -177,7 +184,9 @@ func init() { } func newFSEventsBackend() *fsEventsBackend { - b := &fsEventsBackend{} + b := &fsEventsBackend{ + watches: make(map[*dirWatch]*fseventsState), + } b.watcherBase.init(b) return b } @@ -200,9 +209,8 @@ func checkWatcher(w *dirWatch) error { } var ( - errMissingFSEventsState = errors.New("fsevents: missing state") - errStreamCreateNull = errors.New("FSEventStreamCreate returned NULL") - errStreamStartFailed = errors.New("error starting FSEvents stream") + errStreamCreateNull = errors.New("FSEventStreamCreate returned NULL") + errStreamStartFailed = errors.New("error starting FSEvents stream") ) var ( @@ -211,31 +219,93 @@ var ( errFSEventsTooMany = fmt.Errorf("too many events: %w", ErrOverflow) ) -// startStream creates and starts an FSEventStream on its per-stream -// serial dispatch queue. -func (b *fsEventsBackend) startStream(w *dirWatch, since uint64) error { - if err := checkWatcher(w); err != nil { - return err +const fseventsPathsPerStream = 512 + +type fseventsWatchSnapshot struct { + w *dirWatch + state *fseventsState +} + +func (b *fsEventsBackend) activeWatchesLocked() []fseventsWatchSnapshot { + watches := make([]fseventsWatchSnapshot, 0, len(b.watches)) + for w, state := range b.watches { + if state.terminated.Load() { + continue + } + watches = append(watches, fseventsWatchSnapshot{w: w, state: state}) } + slices.SortFunc(watches, func(a, b fseventsWatchSnapshot) int { + return strings.Compare(a.w.physicalDir, b.w.physicalDir) + }) + return watches +} - state, _ := w.state.(*fseventsState) - if state == nil { - return errMissingFSEventsState +func (b *fsEventsBackend) startStreams(watches []fseventsWatchSnapshot) ([]*fseventsStream, error) { + if len(watches) == 0 { + return nil, nil + } + seen := make(map[string]struct{}, len(watches)) + paths := make([]string, 0, len(watches)) + for _, watch := range watches { + path := watch.w.physicalDir + if _, ok := seen[path]; ok { + continue + } + seen[path] = struct{}{} + paths = append(paths, path) + } + sort.Strings(paths) + + streams := make([]*fseventsStream, 0, (len(paths)+fseventsPathsPerStream-1)/fseventsPathsPerStream) + for len(paths) > 0 { + chunkLen := min(len(paths), fseventsPathsPerStream) + stream, err := b.startStream(paths[:chunkLen]) + if err != nil { + stopFSEventsStreams(streams) + return nil, err + } + streams = append(streams, stream) + paths = paths[chunkLen:] + } + return streams, nil +} + +// startStream creates and starts one FSEventStream watching all supplied paths. +func (b *fsEventsBackend) startStream(paths []string) (*fseventsStream, error) { + if len(paths) == 0 { + return nil, nil } - dirCStr := append([]byte(w.physicalDir), 0) - cfDir := cfStringCreate(0, unsafe.Pointer(&dirCStr[0]), cfStringEncodingUTF8) - defer cfRelease(cfDir) + cfStrings := make([]uintptr, 0, len(paths)) + for _, path := range paths { + dirCStr := append([]byte(path), 0) + cfDir := cfStringCreate(0, unsafe.Pointer(&dirCStr[0]), cfStringEncodingUTF8) + if cfDir == 0 { + for _, cfString := range cfStrings { + cfRelease(cfString) + } + return nil, errStreamCreateNull + } + cfStrings = append(cfStrings, cfDir) + } + defer func() { + for _, cfString := range cfStrings { + cfRelease(cfString) + } + }() - pathsToWatch := cfArrayCreate(0, unsafe.Pointer(&cfDir), 1, 0) + pathsToWatch := cfArrayCreate(0, unsafe.Pointer(&cfStrings[0]), len(cfStrings), 0) + if pathsToWatch == 0 { + return nil, errStreamCreateNull + } defer cfRelease(pathsToWatch) - cb, err := newStreamCallback(w) + cb, err := newStreamCallback(b) if err != nil { - return &dirWatchError{err: err, dirWatch: w} + return nil, err } + state := &fseventsStream{cb: cb} state.pinner.Pin(cb) - state.cb = cb ctx := fsEventStreamContext{info: uintptr(unsafe.Pointer(cb))} @@ -244,14 +314,14 @@ func (b *fsEventsBackend) startStream(w *dirWatch, since uint64) error { fsEventsCallbackAsmAddr, unsafe.Pointer(&ctx), pathsToWatch, - since, + eventIDSinceNow, 0.001, ) if stream == 0 { cb.close() state.cb = nil state.pinner.Unpin() - return &dirWatchError{err: errStreamCreateNull, dirWatch: w} + return nil, errStreamCreateNull } fsEventStreamSetDispatchQueue(stream, cb.queue) @@ -261,11 +331,11 @@ func (b *fsEventsBackend) startStream(w *dirWatch, since uint64) error { cb.close() state.cb = nil state.pinner.Unpin() - return &dirWatchError{err: errStreamStartFailed, dirWatch: w} + return nil, errStreamStartFailed } fsEventStreamFlushSync(stream) state.stream.Store(stream) - return nil + return state, nil } // teardownStream performs the full FSEventStream cleanup. Stop and Invalidate @@ -284,10 +354,15 @@ func teardownStream(stream uintptr, cb *streamCallback) { fsEventStreamRelease(stream) } -// stopStream tears down a stream if WatchDirectory successfully started one. // The atomic Swap gates teardown so concurrent or repeated calls are safe: // only the goroutine that observes a non-zero stream performs the cleanup. -func (b *fsEventsBackend) stopStream(state *fseventsState) { +func stopFSEventsStreams(streams []*fseventsStream) { + for _, stream := range streams { + stopFSEventsStream(stream) + } +} + +func stopFSEventsStream(state *fseventsStream) { if state == nil { return } @@ -303,9 +378,35 @@ func (b *fsEventsBackend) stopStream(state *fseventsState) { // subscribe mirrors `fsEventsBackend::subscribe`. func (b *fsEventsBackend) subscribe(w *dirWatch) error { + if err := checkWatcher(w); err != nil { + return err + } + state := &fseventsState{} w.state = state - return b.startStream(w, eventIDSinceNow) + + b.mu.Lock() + b.watches[w] = state + watches := b.activeWatchesLocked() + b.mu.Unlock() + + streams, err := b.startStreams(watches) + if err != nil { + b.mu.Lock() + if b.watches[w] == state { + delete(b.watches, w) + w.state = nil + } + b.mu.Unlock() + return &dirWatchError{err: err, dirWatch: w} + } + + b.mu.Lock() + oldStreams := b.streams + b.streams = streams + b.mu.Unlock() + stopFSEventsStreams(oldStreams) + return nil } // closeWatch mirrors `fsEventsBackend::closeWatch`. @@ -315,7 +416,23 @@ func (b *fsEventsBackend) closeWatch(w *dirWatch) error { if state == nil { return nil } - b.stopStream(state) + state.terminated.Store(true) + + b.mu.Lock() + delete(b.watches, w) + watches := b.activeWatchesLocked() + b.mu.Unlock() + + streams, err := b.startStreams(watches) + if err != nil { + return err + } + + b.mu.Lock() + oldStreams := b.streams + b.streams = streams + b.mu.Unlock() + stopFSEventsStreams(oldStreams) return nil } @@ -344,8 +461,8 @@ func fsEventsCallback(cb *streamCallback, payload *fsEventsCallbackPayload) { paths := payload.paths flags := payload.flags - w := cb.dirWatch - deletedRoot := false + watches := cb.backend.activeWatchesSnapshot() + touched := map[*dirWatch]struct{}{} for i := range numEvents { flag := *(*uint32)(unsafe.Add(nil, flags+i*flagSize)) @@ -361,18 +478,22 @@ func fsEventsCallback(cb *streamCallback, payload *fsEventsCallbackPayload) { isDone := flag&flagHistoryDone != 0 if flag&flagMustScanSubDirs != 0 { + var overflow error switch { case flag&flagUserDropped != 0: - w.events.setError(errFSEventsUserDropped) + overflow = errFSEventsUserDropped case flag&flagKernelDropped != 0: - w.events.setError(errFSEventsKernelDropped) + overflow = errFSEventsKernelDropped default: - w.events.setError(errFSEventsTooMany) + overflow = errFSEventsTooMany + } + for _, watch := range watches { + watch.w.events.setError(overflow) + touched[watch.w] = struct{}{} } } if isDone { - w.notify() break } @@ -380,58 +501,73 @@ func fsEventsCallback(cb *streamCallback, payload *fsEventsCallbackPayload) { continue } - // Skip events for the watched directory itself unless it's been - // removed. fseventsd reports a change on the watched dir when a - // child is added or removed; subscribers observe changes *within* - // the directory, not the dir's own metadata churn. - // (A removal of the dir is still propagated because Watcher - // relies on it to tear down the stream.) rawPath := path - path = w.displayPath(path) - - if path == w.dir && !isRemoved && !isRenamed { - continue - } + pathExists := false + pathExistsKnown := false + + for _, watch := range watches { + w := watch.w + displayPath, ok := fseventsDisplayPath(w, rawPath) + if !ok { + continue + } - switch { - case isRemoved && !isCreated: - // Pure remove, or remove+rename: file is gone. - w.events.remove(path) - if path == w.dir { - deletedRoot = true + // Skip events for the watched directory itself unless it's been + // removed. fseventsd reports a change on the watched dir when a + // child is added or removed; subscribers observe changes *within* + // the directory, not the dir's own metadata churn. + // (A removal of the dir is still propagated because Watcher + // relies on it to tear down the stream.) + if displayPath == w.dir && !isRemoved && !isRenamed { + continue } - case isRenamed || (isRemoved && isCreated): - // Ambiguous: rename could mean moved away (delete) or - // moved in (update); remove+create could mean replaced. - // Stat to check existence. - var st unix.Stat_t - if unix.Lstat(rawPath, &st) != nil { - w.events.remove(path) - if path == w.dir { - deletedRoot = true + + switch { + case isRemoved && !isCreated: + w.events.remove(displayPath) + if displayPath == w.dir { + watch.state.terminated.Store(true) + w.events.setError(fmt.Errorf("%w: watched directory removed", ErrWatchTerminated)) + } + case isRenamed || (isRemoved && isCreated): + if !pathExistsKnown { + var st unix.Stat_t + pathExists = unix.Lstat(rawPath, &st) == nil + pathExistsKnown = true + } + if pathExists { + w.events.update(displayPath) + } else { + w.events.remove(displayPath) + if displayPath == w.dir { + watch.state.terminated.Store(true) + w.events.setError(fmt.Errorf("%w: watched directory removed", ErrWatchTerminated)) + } } - } else { - w.events.update(path) + default: + w.events.update(displayPath) } - default: - // Create, modify, or any other flag combo. - w.events.update(path) + touched[w] = struct{}{} } } - if deletedRoot { - // Surface ErrWatchTerminated alongside the delete event so the - // caller knows no further events will arrive. Stream teardown is - // still deferred to Close. - w.events.setError(fmt.Errorf("%w: watched directory removed", ErrWatchTerminated)) + for w := range touched { + w.notify() } +} - w.notify() +func (b *fsEventsBackend) activeWatchesSnapshot() []fseventsWatchSnapshot { + b.mu.Lock() + defer b.mu.Unlock() + return b.activeWatchesLocked() +} - if deletedRoot { - // The watched root was deleted. Mark the callback as closed so - // future callbacks are no-ops. Stream teardown and pipe cleanup - // are deferred to Close. - cb.closed.Store(true) +func fseventsDisplayPath(w *dirWatch, rawPath string) (string, bool) { + if isInDirectoryOrSelf(w.physicalDir, rawPath) { + return w.displayPath(rawPath), true + } + if w.physicalDir != w.dir && isInDirectoryOrSelf(w.dir, rawPath) { + return rawPath, true } + return "", false } diff --git a/internal/fswatch/fsevents_darwin_ffi.go b/internal/fswatch/fsevents_darwin_ffi.go index cd580789602..cfee33f0e05 100644 --- a/internal/fswatch/fsevents_darwin_ffi.go +++ b/internal/fswatch/fsevents_darwin_ffi.go @@ -75,7 +75,7 @@ import ( // asm: return to FSEvents fsEventsCallback(cb, payload) // classifies events, // frees payload, -// posts to dirWatch.events +// routes to matching dirWatch events // // The assembly callback never enters Go ABI; it stays entirely in C // context. One pipe per stream hands retained/copied callback payloads from @@ -452,7 +452,7 @@ type streamCallback struct { eventFile *os.File queue uintptr // per-stream serial dispatch queue done chan struct{} - dirWatch *dirWatch + backend *fsEventsBackend closed atomic.Bool } @@ -478,7 +478,7 @@ func (p *fsEventsCallbackPayload) close() { // callbacks. The per-stream serial queue serializes this stream's callbacks // and prevents cross-stream head-of-line blocking that a process-wide serial // queue would cause. -func newStreamCallback(w *dirWatch) (*streamCallback, error) { +func newStreamCallback(backend *fsEventsBackend) (*streamCallback, error) { var eventPipe [2]int if err := unix.Pipe(eventPipe[:]); err != nil { return nil, err @@ -500,7 +500,7 @@ func newStreamCallback(w *dirWatch) (*streamCallback, error) { eventFile: os.NewFile(uintptr(eventPipe[0]), "fsevents-event"), queue: queue, done: make(chan struct{}), - dirWatch: w, + backend: backend, } go cb.eventLoop() return cb, nil diff --git a/internal/fswatch/fsevents_darwin_shared_test.go b/internal/fswatch/fsevents_darwin_shared_test.go new file mode 100644 index 00000000000..7a529a555af --- /dev/null +++ b/internal/fswatch/fsevents_darwin_shared_test.go @@ -0,0 +1,106 @@ +//go:build darwin && (amd64 || arm64) + +package fswatch + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" +) + +func newTestFSEventsWatcher(impl **fsEventsBackend) Watcher { + return &watcher{ + name: "fsevents", + factory: func() watcherImpl { + *impl = newFSEventsBackend() + return *impl + }, + } +} + +func TestFSEventsSharedStreamAcrossWatches(t *testing.T) { + t.Parallel() + + var impl *fsEventsBackend + watcherImpl := newTestFSEventsWatcher(&impl) + root := newTmpDir(t) + + var subs []Watch + for i := range 5 { + dir := filepath.Join(root, fmt.Sprintf("dir%d", i)) + if err := os.MkdirAll(dir, 0o755); err != nil { + t.Fatal(err) + } + sub, err := watcherImpl.WatchDirectory(dir, func([]Event, error) {}) + if err != nil { + t.Fatal(err) + } + subs = append(subs, sub) + } + t.Cleanup(func() { + for _, sub := range subs { + _ = sub.Close() + } + }) + + impl.mu.Lock() + streamCount := len(impl.streams) + watchCount := len(impl.watches) + impl.mu.Unlock() + if streamCount != 1 { + t.Fatalf("expected one shared FSEvents stream, got %d", streamCount) + } + if watchCount != len(subs) { + t.Fatalf("expected %d logical watches, got %d", len(subs), watchCount) + } +} + +func TestFSEventsSharedStreamRoutesEvents(t *testing.T) { + t.Parallel() + + var impl *fsEventsBackend + watcherImpl := newTestFSEventsWatcher(&impl) + root := newTmpDir(t) + dirA := filepath.Join(root, "a") + dirB := filepath.Join(root, "b") + if err := os.MkdirAll(dirA, 0o755); err != nil { + t.Fatal(err) + } + if err := os.MkdirAll(dirB, 0o755); err != nil { + t.Fatal(err) + } + + time.Sleep(preSubscribeSleep(watcherImpl)) + recA := newRecorder(t) + recA.watcher = watcherImpl + subA, err := watcherImpl.WatchDirectory(dirA, recA.callback) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = subA.Close() }) + + recB := newRecorder(t) + recB.watcher = watcherImpl + subB, err := watcherImpl.WatchDirectory(dirB, recB.callback) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = subB.Close() }) + time.Sleep(settleSleep(watcherImpl)) + + fileA := filepath.Join(dirA, "file.ts") + if err := os.WriteFile(fileA, []byte("export {}"), 0o644); err != nil { + t.Fatal(err) + } + expectContains(t, recA, EventUpdate, fileA) + assertNoEventsForPath(t, recB.drainQuiet(500*time.Millisecond), fileA, "sibling watch saw event") + + fileB := filepath.Join(dirB, "file.ts") + if err := os.WriteFile(fileB, []byte("export {}"), 0o644); err != nil { + t.Fatal(err) + } + expectContains(t, recB, EventUpdate, fileB) + assertNoEventsForPath(t, recA.drainQuiet(500*time.Millisecond), fileB, "sibling watch saw event") +} From c2640d444c1d529c52955e82d8f93572b573d6dd Mon Sep 17 00:00:00 2001 From: Jake Bailey <5341706+jakebailey@users.noreply.github.com> Date: Thu, 25 Jun 2026 15:16:13 -0700 Subject: [PATCH 2/6] Try single FSEvents stream before chunking --- internal/fswatch/fsevents_darwin.go | 18 +++++++--- .../fswatch/fsevents_darwin_shared_test.go | 33 +++++++++++++++++++ 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/internal/fswatch/fsevents_darwin.go b/internal/fswatch/fsevents_darwin.go index 604da32d97f..a76e5c782ae 100644 --- a/internal/fswatch/fsevents_darwin.go +++ b/internal/fswatch/fsevents_darwin.go @@ -241,6 +241,10 @@ func (b *fsEventsBackend) activeWatchesLocked() []fseventsWatchSnapshot { } func (b *fsEventsBackend) startStreams(watches []fseventsWatchSnapshot) ([]*fseventsStream, error) { + return startFSEventsStreams(watches, b.startStream) +} + +func startFSEventsStreams(watches []fseventsWatchSnapshot, startStream func([]string) (*fseventsStream, error)) ([]*fseventsStream, error) { if len(watches) == 0 { return nil, nil } @@ -256,16 +260,22 @@ func (b *fsEventsBackend) startStreams(watches []fseventsWatchSnapshot) ([]*fsev } sort.Strings(paths) + stream, err := startStream(paths) + if err == nil { + return []*fseventsStream{stream}, nil + } + streams := make([]*fseventsStream, 0, (len(paths)+fseventsPathsPerStream-1)/fseventsPathsPerStream) - for len(paths) > 0 { - chunkLen := min(len(paths), fseventsPathsPerStream) - stream, err := b.startStream(paths[:chunkLen]) + remainingPaths := paths + for len(remainingPaths) > 0 { + chunkLen := min(len(remainingPaths), fseventsPathsPerStream) + stream, err := startStream(remainingPaths[:chunkLen]) if err != nil { stopFSEventsStreams(streams) return nil, err } streams = append(streams, stream) - paths = paths[chunkLen:] + remainingPaths = remainingPaths[chunkLen:] } return streams, nil } diff --git a/internal/fswatch/fsevents_darwin_shared_test.go b/internal/fswatch/fsevents_darwin_shared_test.go index 7a529a555af..456c9b694c8 100644 --- a/internal/fswatch/fsevents_darwin_shared_test.go +++ b/internal/fswatch/fsevents_darwin_shared_test.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "slices" "testing" "time" ) @@ -104,3 +105,35 @@ func TestFSEventsSharedStreamRoutesEvents(t *testing.T) { expectContains(t, recB, EventUpdate, fileB) assertNoEventsForPath(t, recA.drainQuiet(500*time.Millisecond), fileB, "sibling watch saw event") } + +func TestFSEventsSharedStreamFallsBackToChunks(t *testing.T) { + t.Parallel() + + const count = fseventsPathsPerStream*2 + 1 + watches := make([]fseventsWatchSnapshot, 0, count) + for i := range count { + watches = append(watches, fseventsWatchSnapshot{ + w: &dirWatch{physicalDir: fmt.Sprintf("/watch/dir%04d", i)}, + state: &fseventsState{}, + }) + } + + var calls []int + streams, err := startFSEventsStreams(watches, func(paths []string) (*fseventsStream, error) { + calls = append(calls, len(paths)) + if len(calls) == 1 { + return nil, errStreamStartFailed + } + return &fseventsStream{}, nil + }) + if err != nil { + t.Fatal(err) + } + if len(streams) != 3 { + t.Fatalf("expected 3 chunked streams, got %d", len(streams)) + } + wantCalls := []int{count, fseventsPathsPerStream, fseventsPathsPerStream, 1} + if !slices.Equal(calls, wantCalls) { + t.Fatalf("startStream calls = %v, want %v", calls, wantCalls) + } +} From 2f755c6c0108d4f6e479df51fd29ef72b7b6523a Mon Sep 17 00:00:00 2001 From: Jake Bailey <5341706+jakebailey@users.noreply.github.com> Date: Thu, 25 Jun 2026 15:21:33 -0700 Subject: [PATCH 3/6] Document shared FSEvents streams --- internal/fswatch/CHANGES.md | 10 ++++++++++ internal/fswatch/fsevents_darwin.go | 10 ++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/internal/fswatch/CHANGES.md b/internal/fswatch/CHANGES.md index 01ecfd975d5..1d627ace83a 100644 --- a/internal/fswatch/CHANGES.md +++ b/internal/fswatch/CHANGES.md @@ -127,6 +127,16 @@ can't starve event delivery on any of the others. In practice most callers will only ever use one backend (`Default()`), so this mainly matters for processes that mix backends, but the cost of the split is essentially nothing. +### Shared FSEvents streams + +Upstream opens one macOS FSEventStream per subscription. Go's FSEvents backend +shares streams across all logical directory watches in a backend instance. The +fast path attempts one stream containing every active physical watch root; if +that stream cannot be started, the backend retries with bounded path chunks. +Events from shared streams are routed back to matching logical watches by path, +so non-recursive and per-subscriber ignore semantics are preserved while using +far fewer system-wide FSEvents stream slots. + ## New backends **fanotify** (Linux, kernel ≥ 5.13) is the default on Linux when available. It diff --git a/internal/fswatch/fsevents_darwin.go b/internal/fswatch/fsevents_darwin.go index a76e5c782ae..61f71771992 100644 --- a/internal/fswatch/fsevents_darwin.go +++ b/internal/fswatch/fsevents_darwin.go @@ -209,8 +209,10 @@ func checkWatcher(w *dirWatch) error { } var ( - errStreamCreateNull = errors.New("FSEventStreamCreate returned NULL") - errStreamStartFailed = errors.New("error starting FSEvents stream") + errCFStringCreateNull = errors.New("CFStringCreate returned NULL") + errCFArrayCreateNull = errors.New("CFArrayCreate returned NULL") + errStreamCreateNull = errors.New("FSEventStreamCreate returned NULL") + errStreamStartFailed = errors.New("error starting FSEvents stream") ) var ( @@ -294,7 +296,7 @@ func (b *fsEventsBackend) startStream(paths []string) (*fseventsStream, error) { for _, cfString := range cfStrings { cfRelease(cfString) } - return nil, errStreamCreateNull + return nil, errCFStringCreateNull } cfStrings = append(cfStrings, cfDir) } @@ -306,7 +308,7 @@ func (b *fsEventsBackend) startStream(paths []string) (*fseventsStream, error) { pathsToWatch := cfArrayCreate(0, unsafe.Pointer(&cfStrings[0]), len(cfStrings), 0) if pathsToWatch == 0 { - return nil, errStreamCreateNull + return nil, errCFArrayCreateNull } defer cfRelease(pathsToWatch) From e695074b27d313c7cd9fc73836d09561b242602a Mon Sep 17 00:00:00 2001 From: Jake Bailey <5341706+jakebailey@users.noreply.github.com> Date: Thu, 25 Jun 2026 15:38:16 -0700 Subject: [PATCH 4/6] Scope shared FSEvents callbacks to stream paths --- internal/fswatch/fsevents_darwin.go | 34 +++++++++++-------- internal/fswatch/fsevents_darwin_ffi.go | 7 ++-- .../fswatch/fsevents_darwin_shared_test.go | 24 +++++++++++++ 3 files changed, 47 insertions(+), 18 deletions(-) diff --git a/internal/fswatch/fsevents_darwin.go b/internal/fswatch/fsevents_darwin.go index 61f71771992..dffea7919c2 100644 --- a/internal/fswatch/fsevents_darwin.go +++ b/internal/fswatch/fsevents_darwin.go @@ -9,7 +9,6 @@ import ( "runtime" "slices" "sort" - "strings" "sync" "sync/atomic" "syscall" @@ -101,8 +100,9 @@ import ( // flagMustScanSubDirs → ErrOverflow with detail (user/kernel/too-many). // // Root deletion: -// Detected in the callback; cb.closed is set so future callbacks are -// no-ops. Stream teardown is deferred to Close. +// Detected in the callback; the logical watch is marked terminated and +// receives ErrWatchTerminated. The shared stream remains active for other +// watches until the owner closes or reconciles the terminated watch. // --------------------------------------------------------------------------- // ----- FSEvents flag bits (from FSEvents.h) ------------------------------ @@ -166,6 +166,7 @@ type fseventsStream struct { stream atomic.Uintptr cb *streamCallback pinner runtime.Pinner + paths []string } // ----- the watcherImpl ------------------------------------------------------- @@ -236,9 +237,6 @@ func (b *fsEventsBackend) activeWatchesLocked() []fseventsWatchSnapshot { } watches = append(watches, fseventsWatchSnapshot{w: w, state: state}) } - slices.SortFunc(watches, func(a, b fseventsWatchSnapshot) int { - return strings.Compare(a.w.physicalDir, b.w.physicalDir) - }) return watches } @@ -312,11 +310,11 @@ func (b *fsEventsBackend) startStream(paths []string) (*fseventsStream, error) { } defer cfRelease(pathsToWatch) - cb, err := newStreamCallback(b) + cb, err := newStreamCallback(b, paths) if err != nil { return nil, err } - state := &fseventsStream{cb: cb} + state := &fseventsStream{cb: cb, paths: slices.Clone(paths)} state.pinner.Pin(cb) ctx := fsEventStreamContext{info: uintptr(unsafe.Pointer(cb))} @@ -457,10 +455,6 @@ func (b *fsEventsBackend) closeWatch(w *dirWatch) error { func fsEventsCallback(cb *streamCallback, payload *fsEventsCallbackPayload) { defer payload.close() - if cb.closed.Load() { - return - } - const ( flagSize = unsafe.Sizeof(uint32(0)) ) @@ -473,7 +467,7 @@ func fsEventsCallback(cb *streamCallback, payload *fsEventsCallbackPayload) { paths := payload.paths flags := payload.flags - watches := cb.backend.activeWatchesSnapshot() + watches := cb.backend.activeWatchesSnapshot(cb.paths) touched := map[*dirWatch]struct{}{} for i := range numEvents { @@ -568,10 +562,20 @@ func fsEventsCallback(cb *streamCallback, payload *fsEventsCallbackPayload) { } } -func (b *fsEventsBackend) activeWatchesSnapshot() []fseventsWatchSnapshot { +func (b *fsEventsBackend) activeWatchesSnapshot(paths []string) []fseventsWatchSnapshot { b.mu.Lock() defer b.mu.Unlock() - return b.activeWatchesLocked() + watches := b.activeWatchesLocked() + if len(paths) == 0 { + return watches + } + filtered := watches[:0] + for _, watch := range watches { + if slices.Contains(paths, watch.w.physicalDir) { + filtered = append(filtered, watch) + } + } + return filtered } func fseventsDisplayPath(w *dirWatch, rawPath string) (string, bool) { diff --git a/internal/fswatch/fsevents_darwin_ffi.go b/internal/fswatch/fsevents_darwin_ffi.go index cfee33f0e05..33d2b300217 100644 --- a/internal/fswatch/fsevents_darwin_ffi.go +++ b/internal/fswatch/fsevents_darwin_ffi.go @@ -7,7 +7,7 @@ import ( "math" "os" "runtime" - "sync/atomic" + "slices" "syscall" "unsafe" @@ -453,7 +453,7 @@ type streamCallback struct { queue uintptr // per-stream serial dispatch queue done chan struct{} backend *fsEventsBackend - closed atomic.Bool + paths []string } type fsEventsCallbackPayload struct { @@ -478,7 +478,7 @@ func (p *fsEventsCallbackPayload) close() { // callbacks. The per-stream serial queue serializes this stream's callbacks // and prevents cross-stream head-of-line blocking that a process-wide serial // queue would cause. -func newStreamCallback(backend *fsEventsBackend) (*streamCallback, error) { +func newStreamCallback(backend *fsEventsBackend, paths []string) (*streamCallback, error) { var eventPipe [2]int if err := unix.Pipe(eventPipe[:]); err != nil { return nil, err @@ -501,6 +501,7 @@ func newStreamCallback(backend *fsEventsBackend) (*streamCallback, error) { queue: queue, done: make(chan struct{}), backend: backend, + paths: slices.Clone(paths), } go cb.eventLoop() return cb, nil diff --git a/internal/fswatch/fsevents_darwin_shared_test.go b/internal/fswatch/fsevents_darwin_shared_test.go index 456c9b694c8..6cd7edc0c98 100644 --- a/internal/fswatch/fsevents_darwin_shared_test.go +++ b/internal/fswatch/fsevents_darwin_shared_test.go @@ -137,3 +137,27 @@ func TestFSEventsSharedStreamFallsBackToChunks(t *testing.T) { t.Fatalf("startStream calls = %v, want %v", calls, wantCalls) } } + +func TestFSEventsActiveWatchesSnapshotFiltersToStreamPaths(t *testing.T) { + t.Parallel() + + b := newFSEventsBackend() + watchA := &dirWatch{physicalDir: "/watch/a"} + watchB := &dirWatch{physicalDir: "/watch/b"} + watchC := &dirWatch{physicalDir: "/watch/c"} + b.watches[watchA] = &fseventsState{} + b.watches[watchB] = &fseventsState{} + b.watches[watchC] = &fseventsState{} + + got := b.activeWatchesSnapshot([]string{"/watch/a", "/watch/c"}) + gotPaths := make([]string, 0, len(got)) + for _, watch := range got { + gotPaths = append(gotPaths, watch.w.physicalDir) + } + slices.Sort(gotPaths) + + want := []string{"/watch/a", "/watch/c"} + if !slices.Equal(gotPaths, want) { + t.Fatalf("activeWatchesSnapshot paths = %v, want %v", gotPaths, want) + } +} From 6e9ef2a21ba794575ea16db2787cbfaa146b374c Mon Sep 17 00:00:00 2001 From: Jake Bailey <5341706+jakebailey@users.noreply.github.com> Date: Thu, 25 Jun 2026 16:02:19 -0700 Subject: [PATCH 5/6] Scope FSEvents overflow routing --- internal/fswatch/fsevents_darwin.go | 15 +++++++-- .../fswatch/fsevents_darwin_shared_test.go | 31 +++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/internal/fswatch/fsevents_darwin.go b/internal/fswatch/fsevents_darwin.go index dffea7919c2..b7c64be6389 100644 --- a/internal/fswatch/fsevents_darwin.go +++ b/internal/fswatch/fsevents_darwin.go @@ -494,8 +494,10 @@ func fsEventsCallback(cb *streamCallback, payload *fsEventsCallbackPayload) { overflow = errFSEventsTooMany } for _, watch := range watches { - watch.w.events.setError(overflow) - touched[watch.w] = struct{}{} + if fseventsOverflowMatches(watch.w, path) { + watch.w.events.setError(overflow) + touched[watch.w] = struct{}{} + } } } @@ -571,7 +573,7 @@ func (b *fsEventsBackend) activeWatchesSnapshot(paths []string) []fseventsWatchS } filtered := watches[:0] for _, watch := range watches { - if slices.Contains(paths, watch.w.physicalDir) { + if _, ok := slices.BinarySearch(paths, watch.w.physicalDir); ok { filtered = append(filtered, watch) } } @@ -587,3 +589,10 @@ func fseventsDisplayPath(w *dirWatch, rawPath string) (string, bool) { } return "", false } + +func fseventsOverflowMatches(w *dirWatch, rawPath string) bool { + if isInDirectoryOrSelf(w.physicalDir, rawPath) || isInDirectoryOrSelf(rawPath, w.physicalDir) { + return true + } + return w.physicalDir != w.dir && (isInDirectoryOrSelf(w.dir, rawPath) || isInDirectoryOrSelf(rawPath, w.dir)) +} diff --git a/internal/fswatch/fsevents_darwin_shared_test.go b/internal/fswatch/fsevents_darwin_shared_test.go index 6cd7edc0c98..8537e46744e 100644 --- a/internal/fswatch/fsevents_darwin_shared_test.go +++ b/internal/fswatch/fsevents_darwin_shared_test.go @@ -161,3 +161,34 @@ func TestFSEventsActiveWatchesSnapshotFiltersToStreamPaths(t *testing.T) { t.Fatalf("activeWatchesSnapshot paths = %v, want %v", gotPaths, want) } } + +func TestFSEventsOverflowMatchesWatch(t *testing.T) { + t.Parallel() + + w := &dirWatch{ + dir: "/logical/root", + physicalDir: "/physical/root", + } + cases := []struct { + name string + rawPath string + want bool + }{ + {name: "physical root", rawPath: "/physical/root", want: true}, + {name: "physical descendant", rawPath: "/physical/root/sub", want: true}, + {name: "physical ancestor", rawPath: "/physical", want: true}, + {name: "logical root", rawPath: "/logical/root", want: true}, + {name: "logical descendant", rawPath: "/logical/root/sub", want: true}, + {name: "logical ancestor", rawPath: "/logical", want: true}, + {name: "unrelated", rawPath: "/other/root", want: false}, + {name: "sibling prefix", rawPath: "/physical/root2", want: false}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + t.Parallel() + if got := fseventsOverflowMatches(w, c.rawPath); got != c.want { + t.Fatalf("fseventsOverflowMatches(%q) = %v, want %v", c.rawPath, got, c.want) + } + }) + } +} From 3a1423e76166c8fdd901b9669a2d9d4af8b9feb7 Mon Sep 17 00:00:00 2001 From: Jake Bailey <5341706+jakebailey@users.noreply.github.com> Date: Thu, 25 Jun 2026 16:26:48 -0700 Subject: [PATCH 6/6] Route FSEvents callbacks without backend snapshot --- internal/fswatch/fsevents_darwin.go | 51 ++++++++++--------- internal/fswatch/fsevents_darwin_ffi.go | 8 ++- .../fswatch/fsevents_darwin_shared_test.go | 22 +++++--- 3 files changed, 44 insertions(+), 37 deletions(-) diff --git a/internal/fswatch/fsevents_darwin.go b/internal/fswatch/fsevents_darwin.go index b7c64be6389..cc25dad2752 100644 --- a/internal/fswatch/fsevents_darwin.go +++ b/internal/fswatch/fsevents_darwin.go @@ -166,7 +166,6 @@ type fseventsStream struct { stream atomic.Uintptr cb *streamCallback pinner runtime.Pinner - paths []string } // ----- the watcherImpl ------------------------------------------------------- @@ -244,7 +243,7 @@ func (b *fsEventsBackend) startStreams(watches []fseventsWatchSnapshot) ([]*fsev return startFSEventsStreams(watches, b.startStream) } -func startFSEventsStreams(watches []fseventsWatchSnapshot, startStream func([]string) (*fseventsStream, error)) ([]*fseventsStream, error) { +func startFSEventsStreams(watches []fseventsWatchSnapshot, startStream func([]string, []fseventsWatchSnapshot) (*fseventsStream, error)) ([]*fseventsStream, error) { if len(watches) == 0 { return nil, nil } @@ -260,7 +259,7 @@ func startFSEventsStreams(watches []fseventsWatchSnapshot, startStream func([]st } sort.Strings(paths) - stream, err := startStream(paths) + stream, err := startStream(paths, watches) if err == nil { return []*fseventsStream{stream}, nil } @@ -269,7 +268,8 @@ func startFSEventsStreams(watches []fseventsWatchSnapshot, startStream func([]st remainingPaths := paths for len(remainingPaths) > 0 { chunkLen := min(len(remainingPaths), fseventsPathsPerStream) - stream, err := startStream(remainingPaths[:chunkLen]) + chunkPaths := remainingPaths[:chunkLen] + stream, err := startStream(chunkPaths, watchesForFSEventsPaths(watches, chunkPaths)) if err != nil { stopFSEventsStreams(streams) return nil, err @@ -280,8 +280,21 @@ func startFSEventsStreams(watches []fseventsWatchSnapshot, startStream func([]st return streams, nil } +func watchesForFSEventsPaths(watches []fseventsWatchSnapshot, paths []string) []fseventsWatchSnapshot { + if len(paths) == 0 { + return nil + } + filtered := make([]fseventsWatchSnapshot, 0, len(watches)) + for _, watch := range watches { + if _, ok := slices.BinarySearch(paths, watch.w.physicalDir); ok { + filtered = append(filtered, watch) + } + } + return filtered +} + // startStream creates and starts one FSEventStream watching all supplied paths. -func (b *fsEventsBackend) startStream(paths []string) (*fseventsStream, error) { +func (b *fsEventsBackend) startStream(paths []string, watches []fseventsWatchSnapshot) (*fseventsStream, error) { if len(paths) == 0 { return nil, nil } @@ -310,11 +323,11 @@ func (b *fsEventsBackend) startStream(paths []string) (*fseventsStream, error) { } defer cfRelease(pathsToWatch) - cb, err := newStreamCallback(b, paths) + cb, err := newStreamCallback(watches) if err != nil { return nil, err } - state := &fseventsStream{cb: cb, paths: slices.Clone(paths)} + state := &fseventsStream{cb: cb} state.pinner.Pin(cb) ctx := fsEventStreamContext{info: uintptr(unsafe.Pointer(cb))} @@ -467,7 +480,7 @@ func fsEventsCallback(cb *streamCallback, payload *fsEventsCallbackPayload) { paths := payload.paths flags := payload.flags - watches := cb.backend.activeWatchesSnapshot(cb.paths) + watches := cb.watches touched := map[*dirWatch]struct{}{} for i := range numEvents { @@ -494,6 +507,9 @@ func fsEventsCallback(cb *streamCallback, payload *fsEventsCallbackPayload) { overflow = errFSEventsTooMany } for _, watch := range watches { + if watch.state.terminated.Load() { + continue + } if fseventsOverflowMatches(watch.w, path) { watch.w.events.setError(overflow) touched[watch.w] = struct{}{} @@ -514,6 +530,9 @@ func fsEventsCallback(cb *streamCallback, payload *fsEventsCallbackPayload) { pathExistsKnown := false for _, watch := range watches { + if watch.state.terminated.Load() { + continue + } w := watch.w displayPath, ok := fseventsDisplayPath(w, rawPath) if !ok { @@ -564,22 +583,6 @@ func fsEventsCallback(cb *streamCallback, payload *fsEventsCallbackPayload) { } } -func (b *fsEventsBackend) activeWatchesSnapshot(paths []string) []fseventsWatchSnapshot { - b.mu.Lock() - defer b.mu.Unlock() - watches := b.activeWatchesLocked() - if len(paths) == 0 { - return watches - } - filtered := watches[:0] - for _, watch := range watches { - if _, ok := slices.BinarySearch(paths, watch.w.physicalDir); ok { - filtered = append(filtered, watch) - } - } - return filtered -} - func fseventsDisplayPath(w *dirWatch, rawPath string) (string, bool) { if isInDirectoryOrSelf(w.physicalDir, rawPath) { return w.displayPath(rawPath), true diff --git a/internal/fswatch/fsevents_darwin_ffi.go b/internal/fswatch/fsevents_darwin_ffi.go index 33d2b300217..43877c0c8b5 100644 --- a/internal/fswatch/fsevents_darwin_ffi.go +++ b/internal/fswatch/fsevents_darwin_ffi.go @@ -452,8 +452,7 @@ type streamCallback struct { eventFile *os.File queue uintptr // per-stream serial dispatch queue done chan struct{} - backend *fsEventsBackend - paths []string + watches []fseventsWatchSnapshot } type fsEventsCallbackPayload struct { @@ -478,7 +477,7 @@ func (p *fsEventsCallbackPayload) close() { // callbacks. The per-stream serial queue serializes this stream's callbacks // and prevents cross-stream head-of-line blocking that a process-wide serial // queue would cause. -func newStreamCallback(backend *fsEventsBackend, paths []string) (*streamCallback, error) { +func newStreamCallback(watches []fseventsWatchSnapshot) (*streamCallback, error) { var eventPipe [2]int if err := unix.Pipe(eventPipe[:]); err != nil { return nil, err @@ -500,8 +499,7 @@ func newStreamCallback(backend *fsEventsBackend, paths []string) (*streamCallbac eventFile: os.NewFile(uintptr(eventPipe[0]), "fsevents-event"), queue: queue, done: make(chan struct{}), - backend: backend, - paths: slices.Clone(paths), + watches: slices.Clone(watches), } go cb.eventLoop() return cb, nil diff --git a/internal/fswatch/fsevents_darwin_shared_test.go b/internal/fswatch/fsevents_darwin_shared_test.go index 8537e46744e..1f03e9a2f6e 100644 --- a/internal/fswatch/fsevents_darwin_shared_test.go +++ b/internal/fswatch/fsevents_darwin_shared_test.go @@ -119,8 +119,10 @@ func TestFSEventsSharedStreamFallsBackToChunks(t *testing.T) { } var calls []int - streams, err := startFSEventsStreams(watches, func(paths []string) (*fseventsStream, error) { + var watchCalls []int + streams, err := startFSEventsStreams(watches, func(paths []string, streamWatches []fseventsWatchSnapshot) (*fseventsStream, error) { calls = append(calls, len(paths)) + watchCalls = append(watchCalls, len(streamWatches)) if len(calls) == 1 { return nil, errStreamStartFailed } @@ -136,20 +138,24 @@ func TestFSEventsSharedStreamFallsBackToChunks(t *testing.T) { if !slices.Equal(calls, wantCalls) { t.Fatalf("startStream calls = %v, want %v", calls, wantCalls) } + if !slices.Equal(watchCalls, wantCalls) { + t.Fatalf("startStream watch calls = %v, want %v", watchCalls, wantCalls) + } } -func TestFSEventsActiveWatchesSnapshotFiltersToStreamPaths(t *testing.T) { +func TestWatchesForFSEventsPaths(t *testing.T) { t.Parallel() - b := newFSEventsBackend() watchA := &dirWatch{physicalDir: "/watch/a"} watchB := &dirWatch{physicalDir: "/watch/b"} watchC := &dirWatch{physicalDir: "/watch/c"} - b.watches[watchA] = &fseventsState{} - b.watches[watchB] = &fseventsState{} - b.watches[watchC] = &fseventsState{} + watches := []fseventsWatchSnapshot{ + {w: watchA, state: &fseventsState{}}, + {w: watchB, state: &fseventsState{}}, + {w: watchC, state: &fseventsState{}}, + } - got := b.activeWatchesSnapshot([]string{"/watch/a", "/watch/c"}) + got := watchesForFSEventsPaths(watches, []string{"/watch/a", "/watch/c"}) gotPaths := make([]string, 0, len(got)) for _, watch := range got { gotPaths = append(gotPaths, watch.w.physicalDir) @@ -158,7 +164,7 @@ func TestFSEventsActiveWatchesSnapshotFiltersToStreamPaths(t *testing.T) { want := []string{"/watch/a", "/watch/c"} if !slices.Equal(gotPaths, want) { - t.Fatalf("activeWatchesSnapshot paths = %v, want %v", gotPaths, want) + t.Fatalf("watchesForFSEventsPaths = %v, want %v", gotPaths, want) } }