Skip to content

Commit 6d2d861

Browse files
fix(provider): protect SweepingProvider.wg (#1200)
* fix(provider): protect `SweepingProvider.wg` * fix(provider): conflict resolution (#1199) * fix(ResettableKeystore): race when closing during reset (#1201)
1 parent 7eb605c commit 6d2d861

File tree

1 file changed

+45
-1
lines changed

1 file changed

+45
-1
lines changed

provider/provider.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ type SweepingProvider struct {
150150
cancelCtx context.CancelFunc
151151
closeOnce sync.Once
152152
wg sync.WaitGroup
153+
wgLk sync.RWMutex // Protects wg.Add() from racing with Close()
153154
cleanupFuncs []func() error
154155

155156
peerid peer.ID
@@ -488,7 +489,11 @@ func (s *SweepingProvider) run() {
488489
func (s *SweepingProvider) Close() error {
489490
var err error
490491
s.closeOnce.Do(func() {
492+
// Acquire write lock to prevent new wg.Add() calls during shutdown
493+
s.wgLk.Lock()
491494
close(s.done)
495+
s.wgLk.Unlock()
496+
492497
s.cancelCtx()
493498
s.wg.Wait()
494499
s.approxPrefixLenRunning.Lock()
@@ -1215,7 +1220,14 @@ func (s *SweepingProvider) handleReprovide() {
12151220
// present.
12161221
s.reprovideQueue.Remove(currentPrefix)
12171222

1223+
s.wgLk.RLock()
1224+
if s.closed() {
1225+
s.wgLk.RUnlock()
1226+
return
1227+
}
12181228
s.wg.Add(1)
1229+
s.wgLk.RUnlock()
1230+
12191231
go func() {
12201232
if err := s.workerPool.Acquire(periodicWorker); err == nil {
12211233
s.batchReprovide(currentPrefix)
@@ -1258,7 +1270,14 @@ func (s *SweepingProvider) handleProvide(force, reprovide bool, keys ...mh.Multi
12581270
s.provideQueue.Enqueue(prefixAndKeys.Prefix, prefixAndKeys.Keys...)
12591271
}
12601272

1273+
s.wgLk.RLock()
1274+
if s.closed() {
1275+
s.wgLk.RUnlock()
1276+
return
1277+
}
12611278
s.wg.Add(1)
1279+
s.wgLk.RUnlock()
1280+
12621281
go s.provideLoop()
12631282
}
12641283

@@ -1516,10 +1535,19 @@ func parseReprovideHistoryKey(k string) (time.Time, bitstr.Key, error) {
15161535
// This function is guarded by s.lateReprovideRunning, ensuring the function
15171536
// cannot be called again while it is working on reproviding late regions.
15181537
func (s *SweepingProvider) catchupPendingWork() {
1519-
if s.closed() || !s.lateReprovideRunning.TryLock() {
1538+
if !s.lateReprovideRunning.TryLock() {
1539+
return
1540+
}
1541+
1542+
s.wgLk.RLock()
1543+
if s.closed() {
1544+
s.wgLk.RUnlock()
1545+
s.lateReprovideRunning.Unlock()
15201546
return
15211547
}
15221548
s.wg.Add(2)
1549+
s.wgLk.RUnlock()
1550+
15231551
go func() {
15241552
// Reprovide late regions if any.
15251553
s.reprovideLateRegions()
@@ -1564,7 +1592,15 @@ func (s *SweepingProvider) provideLoop() {
15641592
}
15651593
prefix, keys, ok := s.provideQueue.Dequeue()
15661594
if ok {
1595+
s.wgLk.RLock()
1596+
if s.closed() {
1597+
s.wgLk.RUnlock()
1598+
s.workerPool.Release(burstWorker)
1599+
return
1600+
}
15671601
s.wg.Add(1)
1602+
s.wgLk.RUnlock()
1603+
15681604
go func(prefix bitstr.Key, keys []mh.Multihash) {
15691605
s.batchProvide(prefix, keys)
15701606
s.workerPool.Release(burstWorker)
@@ -1597,7 +1633,15 @@ func (s *SweepingProvider) reprovideLateRegions() {
15971633
}
15981634
prefix, ok := s.reprovideQueue.Dequeue()
15991635
if ok {
1636+
s.wgLk.RLock()
1637+
if s.closed() {
1638+
s.wgLk.RUnlock()
1639+
s.workerPool.Release(burstWorker)
1640+
return
1641+
}
16001642
s.wg.Add(1)
1643+
s.wgLk.RUnlock()
1644+
16011645
go func(prefix bitstr.Key) {
16021646
s.batchReprovide(prefix)
16031647
s.workerPool.Release(burstWorker)

0 commit comments

Comments
 (0)