Skip to content

Commit 10f8399

Browse files
authored
feat: 添加 MultipleBytes,并且实现多协程安全读写 (#272)
1 parent 97490cd commit 10f8399

File tree

4 files changed

+519
-0
lines changed

4 files changed

+519
-0
lines changed

iox/concurrent_multiple_bytes.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2021 ecodeclub
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package iox
16+
17+
import (
18+
"sync"
19+
)
20+
21+
// ConcurrentMultipleBytes 是 MultipleBytes 的线程安全装饰器
22+
type ConcurrentMultipleBytes struct {
23+
mb *MultipleBytes
24+
lock sync.Mutex
25+
}
26+
27+
// NewConcurrentMultipleBytes 创建一个新的线程安全的 MultipleBytes 实例
28+
// sliceCount 参数用于预分配内部切片数组的容量
29+
func NewConcurrentMultipleBytes(sliceCount int) *ConcurrentMultipleBytes {
30+
return &ConcurrentMultipleBytes{
31+
mb: NewMultipleBytes(sliceCount),
32+
}
33+
}
34+
35+
// Read 实现 io.Reader 接口
36+
// 从当前位置读取数据到 p 中,如果没有数据可读返回 io.EOF
37+
func (c *ConcurrentMultipleBytes) Read(p []byte) (n int, err error) {
38+
c.lock.Lock()
39+
defer c.lock.Unlock()
40+
return c.mb.Read(p)
41+
}
42+
43+
// Write 实现 io.Writer 接口
44+
// 将 p 中的数据写入到内部缓冲区
45+
func (c *ConcurrentMultipleBytes) Write(p []byte) (n int, err error) {
46+
c.lock.Lock()
47+
defer c.lock.Unlock()
48+
return c.mb.Write(p)
49+
}
50+
51+
// Reset 重置读取位置到开始处
52+
func (c *ConcurrentMultipleBytes) Reset() {
53+
c.lock.Lock()
54+
defer c.lock.Unlock()
55+
c.mb.Reset()
56+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright 2021 ecodeclub
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package iox
16+
17+
import (
18+
"io"
19+
"sync"
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
)
24+
25+
func TestConcurrentMultipleBytes(t *testing.T) {
26+
t.Run("基本读写功能", func(t *testing.T) {
27+
cmb := NewConcurrentMultipleBytes(2)
28+
data := []byte{1, 2, 3, 4}
29+
30+
// 写入数据
31+
n, err := cmb.Write(data)
32+
assert.Equal(t, len(data), n)
33+
assert.Nil(t, err)
34+
35+
// 读取数据
36+
read := make([]byte, 4)
37+
n, err = cmb.Read(read)
38+
assert.Equal(t, len(data), n)
39+
assert.Nil(t, err)
40+
assert.Equal(t, data, read[:n])
41+
})
42+
43+
t.Run("并发读写", func(t *testing.T) {
44+
cmb := NewConcurrentMultipleBytes(3)
45+
var wg sync.WaitGroup
46+
47+
// 并发写入
48+
for i := 0; i < 3; i++ {
49+
wg.Add(1)
50+
go func(val byte) {
51+
defer wg.Done()
52+
n, err := cmb.Write([]byte{val})
53+
assert.Equal(t, 1, n)
54+
assert.Nil(t, err)
55+
}(byte(i + 1))
56+
}
57+
wg.Wait()
58+
59+
// 并发读取
60+
results := make([][]byte, 3)
61+
for i := 0; i < 3; i++ {
62+
wg.Add(1)
63+
go func(idx int) {
64+
defer wg.Done()
65+
read := make([]byte, 1)
66+
n, err := cmb.Read(read)
67+
if err != nil && err != io.EOF {
68+
assert.Nil(t, err)
69+
return
70+
}
71+
results[idx] = read[:n]
72+
}(i)
73+
}
74+
wg.Wait()
75+
76+
// 验证总读取字节数
77+
total := 0
78+
for _, res := range results {
79+
total += len(res)
80+
}
81+
assert.Equal(t, 3, total)
82+
})
83+
84+
t.Run("边界场景", func(t *testing.T) {
85+
cmb := NewConcurrentMultipleBytes(1)
86+
87+
// 空切片写入
88+
n, err := cmb.Write([]byte{})
89+
assert.Equal(t, 0, n)
90+
assert.Nil(t, err)
91+
92+
// 空切片读取
93+
read := make([]byte, 1)
94+
n, err = cmb.Read(read)
95+
assert.Equal(t, 0, n)
96+
assert.Equal(t, io.EOF, err)
97+
})
98+
99+
t.Run("Reset功能", func(t *testing.T) {
100+
cmb := NewConcurrentMultipleBytes(1)
101+
data := []byte{1, 2}
102+
103+
// 写入数据
104+
n, err := cmb.Write(data)
105+
assert.Equal(t, len(data), n)
106+
assert.Nil(t, err)
107+
108+
// 读取一部分
109+
read := make([]byte, 1)
110+
n, err = cmb.Read(read)
111+
assert.Equal(t, 1, n)
112+
assert.Nil(t, err)
113+
114+
// 重置
115+
cmb.Reset()
116+
117+
// 重新读取
118+
read = make([]byte, 2)
119+
n, err = cmb.Read(read)
120+
assert.Equal(t, 2, n)
121+
assert.Nil(t, err)
122+
assert.Equal(t, data, read[:n])
123+
})
124+
}

iox/multiple_bytes.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2021 ecodeclub
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package iox
16+
17+
import (
18+
"io"
19+
)
20+
21+
// MultipleBytes 是一个实现了 io.Reader 和 io.Writer 接口的结构体
22+
// 它可以安全地在多个 goroutine 之间共享
23+
type MultipleBytes struct {
24+
data [][]byte
25+
idx1 int // 第几个切片
26+
idx2 int // data[idx1] 中的下标
27+
}
28+
29+
// NewMultipleBytes 创建一个新的 MultipleBytes 实例
30+
// sliceCount 参数用于预分配内部切片数组的容量
31+
func NewMultipleBytes(sliceCount int) *MultipleBytes {
32+
return &MultipleBytes{
33+
data: make([][]byte, 0, sliceCount),
34+
}
35+
}
36+
37+
// Read 实现 io.Reader 接口
38+
// 从当前位置读取数据到 p 中,如果没有数据可读返回 io.EOF
39+
func (m *MultipleBytes) Read(p []byte) (n int, err error) {
40+
// 如果没有数据或者已经读完了所有数据
41+
if len(m.data) == 0 || (m.idx1 >= len(m.data)) {
42+
return 0, io.EOF
43+
}
44+
45+
totalRead := 0
46+
for m.idx1 < len(m.data) {
47+
currentSlice := m.data[m.idx1]
48+
remaining := len(currentSlice) - m.idx2
49+
if remaining <= 0 {
50+
m.idx1++
51+
m.idx2 = 0
52+
continue
53+
}
54+
55+
toRead := len(p) - totalRead
56+
if toRead <= 0 {
57+
break
58+
}
59+
60+
if remaining > toRead {
61+
n = copy(p[totalRead:], currentSlice[m.idx2:m.idx2+toRead])
62+
m.idx2 += n
63+
} else {
64+
n = copy(p[totalRead:], currentSlice[m.idx2:])
65+
m.idx1++
66+
m.idx2 = 0
67+
}
68+
totalRead += n
69+
}
70+
71+
return totalRead, nil
72+
}
73+
74+
// Write 实现 io.Writer 接口
75+
// 将 p 中的数据写入到内部缓冲区
76+
func (m *MultipleBytes) Write(p []byte) (n int, err error) {
77+
if len(p) == 0 {
78+
return 0, nil
79+
}
80+
81+
// 直接将输入切片追加到内部存储
82+
m.data = append(m.data, p)
83+
84+
return len(p), nil
85+
}
86+
87+
// Reset 重置读取位置到开始处
88+
func (m *MultipleBytes) Reset() {
89+
m.idx1 = 0
90+
m.idx2 = 0
91+
}

0 commit comments

Comments
 (0)