Skip to content

Commit 5f533c9

Browse files
fifth-monthflycash
andauthored
syncx: sync.Cond的超时等待版,Cond.WaitWithContext(ctx) (#192)
--------- Signed-off-by: Ming Deng <[email protected]> Co-authored-by: Ming Deng <[email protected]>
1 parent b656686 commit 5f533c9

File tree

4 files changed

+898
-0
lines changed

4 files changed

+898
-0
lines changed

.CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# 开发中
2+
- [syncx: sync.Cond的超时等待版,Cond.WaitWithContext(ctx)](https://github.com/ecodeclub/ekit/pull/192)
23
- [copier: ReflectCopier copier支持类型转换](https://github.com/ecodeclub/ekit/issues/197)
34
- [mapx: TreeMap 添加 Keys 和 Values 方法](https://github.com/ecodeclub/ekit/pull/181)
45
- [mapx: 修正 HashMap 中使用泛型不当的地方](https://github.com/ecodeclub/ekit/pull/186)

syncx/cond.go

Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
// Copyright 2021 ecodeclub
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package syncx
16+
17+
import (
18+
"context"
19+
"sync"
20+
"sync/atomic"
21+
"unsafe"
22+
)
23+
24+
// Cond 实现了一个条件变量,是等待或宣布一个事件发生的goroutines的汇合点。
25+
//
26+
// 在改变条件和调用Wait方法的时候,Cond 关联的锁对象 L (*Mutex 或者 *RWMutex)必须被加锁,
27+
//
28+
// 在Go内存模型的术语中,Cond 保证 Broadcast或Signal的调用 同步于 因此而解除阻塞的 Wait 之前。
29+
//
30+
// 绝大多数简单用例, 最好使用 channels 而不是 Cond
31+
// (Broadcast 对应于关闭一个 channel, Signal 对应于给一个 channel 发送消息).
32+
type Cond struct {
33+
noCopy noCopy
34+
// L 在观察或改变条件时被加锁
35+
L sync.Locker
36+
notifyList *notifyList
37+
// 用于指向自身的指针,可以用于检测是否被复制使用
38+
checker unsafe.Pointer
39+
// 用于初始化notifyList
40+
once sync.Once
41+
}
42+
43+
// NewCond 返回 关联了 l 的新 Cond .
44+
func NewCond(l sync.Locker) *Cond {
45+
return &Cond{L: l}
46+
}
47+
48+
// Wait 自动解锁 c.L 并挂起当前调用的 goroutine. 在恢复执行之后 Wait 在返回前将加 c.L 锁成功.
49+
// 和其它系统不一样, 除非调用 Broadcast 或 Signal 或者 ctx 超时了,否则 Wait 不会返回.
50+
//
51+
// 成功唤醒时, 返回 nil. 超时失败时, 返回ctx.Err().
52+
// 如果 ctx 超时了, Wait 可能依旧执行成功返回 nil.
53+
//
54+
// 在 Wait 第一次继续执行时,因为 c.L 没有加锁, 当 Wait 返回的时候,调用者通常不能假设条件是真的
55+
// 相反, caller 应该在循环中调用 Wait:
56+
//
57+
// c.L.Lock()
58+
// for !condition() {
59+
// if err := c.Wait(ctx); err != nil {
60+
// // 超时唤醒了,并不是被正常唤醒的,可以做一些超时的处理
61+
// }
62+
// }
63+
// ... condition 满足了,do work ...
64+
// c.L.Unlock()
65+
func (c *Cond) Wait(ctx context.Context) error {
66+
c.checkCopy()
67+
c.checkFirstUse()
68+
t := c.notifyList.add() // 解锁前,将等待的对象放入链表中
69+
c.L.Unlock() // 一定是在等待对象放入链表后再解锁,避免刚解锁就发生协程切换,执行了signal后,再换回来导致永远阻塞
70+
defer c.L.Lock()
71+
return c.notifyList.wait(ctx, t)
72+
}
73+
74+
// Signal 唤醒一个等待在 c 上的goroutine.
75+
//
76+
// 调用时,caller 可以持有也可以不持有 c.L 锁
77+
//
78+
// Signal() 不影响 goroutine 调度的优先级; 如果其它的 goroutines
79+
// 尝试着锁定 c.L, 它们可能在 "waiting" goroutine 之前被唤醒.
80+
func (c *Cond) Signal() {
81+
c.checkCopy()
82+
c.checkFirstUse()
83+
c.notifyList.notifyOne()
84+
}
85+
86+
// Broadcast 唤醒所有等待在 c 上的goroutine.
87+
//
88+
// 调用时,caller 可以持有也可以不持有 c.L 锁
89+
func (c *Cond) Broadcast() {
90+
c.checkCopy()
91+
c.checkFirstUse()
92+
c.notifyList.notifyAll()
93+
}
94+
95+
// checkCopy 检查是否被拷贝使用
96+
func (c *Cond) checkCopy() {
97+
// 判断checker保存的指针是否等于当前的指针(初始化时,并没有初始化checker的值,所以也会出现不相等)
98+
if c.checker != unsafe.Pointer(c) &&
99+
// 由于初次初始化时,c.checker为0值,所以顺便进行一次原子替换,辅助初始化
100+
!atomic.CompareAndSwapPointer(&c.checker, nil, unsafe.Pointer(c)) &&
101+
// 再度检查checker保留指针是否等于当前指针
102+
c.checker != unsafe.Pointer(c) {
103+
panic("syncx.Cond is copied")
104+
}
105+
}
106+
107+
// checkFirstUse 用于初始化notifyList
108+
func (c *Cond) checkFirstUse() {
109+
c.once.Do(func() {
110+
if c.notifyList == nil {
111+
c.notifyList = newNotifyList()
112+
}
113+
})
114+
}
115+
116+
// notifyList 是一个简单的 runtime_notifyList 实现,但增强了 wait 方法
117+
type notifyList struct {
118+
mu sync.Mutex
119+
list *chanList
120+
}
121+
122+
func newNotifyList() *notifyList {
123+
return &notifyList{
124+
mu: sync.Mutex{},
125+
list: newChanList(),
126+
}
127+
}
128+
129+
func (l *notifyList) add() *node {
130+
l.mu.Lock()
131+
defer l.mu.Unlock()
132+
el := l.list.alloc()
133+
l.list.pushBack(el)
134+
return el
135+
}
136+
137+
func (l *notifyList) wait(ctx context.Context, elem *node) error {
138+
ch := elem.Value
139+
// 回收ch,超时时,因为没有被使用过,直接复用
140+
// 正常唤醒时,由于被放入了一条消息,但被取出来了一次,所以elem中的ch可以重复使用
141+
// 由于ch是挂在elem上的,所以elem在ch被回收之前,不可以被错误回收,所以必须在这里进行回收
142+
defer l.list.free(elem)
143+
select { // 由于会随机选择一条,在超时和通知同时存在的话,如果通知先行,则没有影响,如果超时的同时,又来了通知
144+
case <-ctx.Done(): // 进了超时分支
145+
l.mu.Lock()
146+
defer l.mu.Unlock()
147+
select {
148+
// double check: 检查是否在加锁前,刚好被正常通知了,
149+
// 如果取到数据,代表收到了信号了,ch也因为被取了一次消息,意味着可以再次复用
150+
// 转移信号到下一个
151+
// 如果有下一个等待的,就唤醒它
152+
case <-ch:
153+
if l.list.len() != 0 {
154+
l.notifyNext()
155+
}
156+
// 如果取不到数据,代表不可能被正常唤醒了,ch也意味着没有被使用,可以从队列移除等待对象
157+
default:
158+
l.list.remove(elem)
159+
}
160+
return ctx.Err()
161+
case <-ch: // 如果取到数据,代表被正常唤醒了,ch也因为被取了一次消息,意味着可以再次复用
162+
return nil
163+
}
164+
}
165+
166+
func (l *notifyList) notifyOne() {
167+
l.mu.Lock()
168+
defer l.mu.Unlock()
169+
if l.list.len() == 0 {
170+
return
171+
}
172+
l.notifyNext()
173+
}
174+
175+
func (l *notifyList) notifyNext() {
176+
front := l.list.front()
177+
ch := front.Value
178+
l.list.remove(front)
179+
ch <- struct{}{}
180+
}
181+
182+
func (l *notifyList) notifyAll() {
183+
l.mu.Lock()
184+
defer l.mu.Unlock()
185+
for l.list.len() != 0 {
186+
l.notifyNext()
187+
}
188+
}
189+
190+
// node 保存chan的链表元素
191+
type node struct {
192+
prev *node
193+
next *node
194+
Value chan struct{}
195+
}
196+
197+
// chanList 用于存放保存channel的一个双链表, 带复用元素的功能
198+
type chanList struct {
199+
// 哨兵元素,方便处理元素个数为0的情况
200+
sentinel *node
201+
size int
202+
pool *sync.Pool
203+
}
204+
205+
func newChanList() *chanList {
206+
sentinel := &node{}
207+
sentinel.prev = sentinel
208+
sentinel.next = sentinel
209+
return &chanList{
210+
sentinel: sentinel,
211+
size: 0,
212+
pool: &sync.Pool{
213+
New: func() any {
214+
return &node{
215+
Value: make(chan struct{}, 1),
216+
}
217+
},
218+
},
219+
}
220+
}
221+
222+
// len 获取链表长度
223+
func (l *chanList) len() int {
224+
return l.size
225+
}
226+
227+
// front 获取队首元素
228+
func (l *chanList) front() *node {
229+
return l.sentinel.next
230+
}
231+
232+
// alloc 申请新的元素,包含复用的chan
233+
func (l *chanList) alloc() *node {
234+
elem := l.pool.Get().(*node)
235+
return elem
236+
}
237+
238+
// pushBack 追加元素到队尾
239+
func (l *chanList) pushBack(elem *node) {
240+
elem.next = l.sentinel
241+
elem.prev = l.sentinel.prev
242+
l.sentinel.prev.next = elem
243+
l.sentinel.prev = elem
244+
l.size++
245+
}
246+
247+
// remove 元素移除时,还不能回收该元素,避免元素上的chan被错误覆盖
248+
func (l *chanList) remove(elem *node) {
249+
elem.prev.next = elem.next
250+
elem.next.prev = elem.prev
251+
elem.prev = nil
252+
elem.next = nil
253+
l.size--
254+
}
255+
256+
// free 回收该元素,用于下次alloc获取时复用,避免再次分配
257+
func (l *chanList) free(elem *node) {
258+
l.pool.Put(elem)
259+
}
260+
261+
// 用于静态代码检查复制的问题
262+
type noCopy struct{}
263+
264+
func (*noCopy) Lock() {}
265+
func (*noCopy) Unlock() {}

0 commit comments

Comments
 (0)