Skip to content

Commit c238446

Browse files
committed
Rewrited buffer to enable repeating read & added tests (#11)
* Added test for slow data source * Added * Fixed slow reader test * Fixed slowReader.Read * Removed outdated comments from readBuffer
1 parent 031e035 commit c238446

File tree

2 files changed

+86
-15
lines changed

2 files changed

+86
-15
lines changed

async_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package json
2+
3+
import (
4+
"encoding/json"
5+
"github.com/stretchr/testify/assert"
6+
"github.com/stretchr/testify/require"
7+
"io"
8+
"testing"
9+
"time"
10+
)
11+
12+
var _ io.Reader = &slowReader{}
13+
14+
type slowReader struct {
15+
src []byte
16+
index int
17+
len int
18+
pause time.Duration
19+
}
20+
21+
func newSlowReader(src []byte, pause time.Duration) *slowReader {
22+
return &slowReader{
23+
src: src,
24+
len: 1,
25+
pause: pause,
26+
}
27+
}
28+
29+
func (s *slowReader) Work() {
30+
for i := s.len; i <= len(s.src); i++ {
31+
time.Sleep(s.pause)
32+
s.len = i
33+
}
34+
}
35+
36+
func (s *slowReader) Read(p []byte) (int, error) {
37+
n := len(p)
38+
readerLen := s.len
39+
if s.index == len(s.src) {
40+
return 0, io.EOF
41+
}
42+
if s.index+n <= readerLen {
43+
copy(p, s.src[s.index:s.index+n])
44+
s.index += n
45+
return n, nil
46+
}
47+
length := readerLen - s.index
48+
copy(p, s.src[s.index:readerLen])
49+
s.index = readerLen
50+
return length, nil
51+
}
52+
53+
func TestUnmarshal_SlowReader(t *testing.T) {
54+
initBig()
55+
data := newSlowReader(jsonBig, time.Microsecond)
56+
res := new(map[string]interface{})
57+
expected := new(map[string]interface{})
58+
require.NoError(t, json.Unmarshal(jsonBig, expected))
59+
go data.Work()
60+
require.NoError(t, Unmarshal(data, res))
61+
assert.Equal(t, expected, res)
62+
}

internal/readbuffer/readbuffer.go

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
11
package readbuffer
22

3-
import "io"
3+
import (
4+
"io"
5+
"time"
6+
)
47

5-
const readBufSize = 1 << 10
8+
const (
9+
readBufSize = 1 << 10
10+
readTimeout = 100 * time.Millisecond
11+
)
612

713
type ReadBuffer struct {
814
buf []byte
@@ -24,21 +30,24 @@ func (r *ReadBuffer) Get(n int) ([]byte, error) {
2430
r.index += n
2531
return res, nil
2632
}
27-
got := make([]byte, r.len-r.index)
28-
copy(got, r.buf[r.index:r.len])
33+
res := make([]byte, r.len-r.index)
34+
copy(res, r.buf[r.index:r.len])
2935
r.index = r.len
30-
n -= len(got)
31-
if err := r.load(); err != nil {
32-
return got, err
33-
}
34-
if r.len-r.index >= n {
35-
res := append(got, r.buf[r.index:r.index+n]...)
36-
r.index += n
37-
return res, nil
36+
n -= len(res)
37+
var err error
38+
for err = r.load(); err == nil; err = r.load() {
39+
if r.len-r.index >= n {
40+
res = append(res, r.buf[r.index:r.index+n]...)
41+
r.index += n
42+
return res, nil
43+
}
44+
45+
res = append(res, r.buf[r.index:r.len]...)
46+
n -= r.len - r.index
47+
r.index = r.len
48+
time.Sleep(readTimeout)
3849
}
39-
res := append(got, r.buf[r.index:r.len]...)
40-
r.index = r.len
41-
return res, nil
50+
return res, err
4251
}
4352

4453
func (r *ReadBuffer) load() error {

0 commit comments

Comments
 (0)