Skip to content

Commit bdc0bed

Browse files
Merge pull request #8 from ibuildthecloud/master
Ensure that polling doesn't skip initial rows
2 parents 28a2eeb + 87d73e3 commit bdc0bed

File tree

3 files changed

+21
-18
lines changed

3 files changed

+21
-18
lines changed

pkg/client/client.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package client
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"time"
78

@@ -14,6 +15,10 @@ type Value struct {
1415
Modified int64
1516
}
1617

18+
var (
19+
ErrNotFound = errors.New("etcdwrapper: key not found")
20+
)
21+
1722
type Client interface {
1823
Get(ctx context.Context, key string) (Value, error)
1924
Put(ctx context.Context, key string, value []byte) error
@@ -59,7 +64,7 @@ func (c *client) Get(ctx context.Context, key string) (Value, error) {
5964
}, nil
6065
}
6166

62-
return Value{}, nil
67+
return Value{}, ErrNotFound
6368
}
6469

6570
func (c *client) Put(ctx context.Context, key string, value []byte) error {

pkg/drivers/generic/generic.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,9 @@ func (d *Generic) execute(ctx context.Context, sql string, args ...interface{})
199199
wait := strategy.Backoff(backoff.Linear(100 + time.Millisecond))
200200
for i := uint(0); i < 20; i++ {
201201
if i > 2 {
202-
logrus.Infof("EXEC (%d) %v : %s", i, args, Stripped(sql))
202+
logrus.Debugf("EXEC (try: %d) %v : %s", i, args, Stripped(sql))
203203
} else {
204-
logrus.Tracef("EXEC (%d) %v : %s", i, args, Stripped(sql))
204+
logrus.Tracef("EXEC (try: %d) %v : %s", i, args, Stripped(sql))
205205
}
206206
result, err = d.DB.ExecContext(ctx, sql, args...)
207207
if err != nil && d.Retry != nil && d.Retry(err) {

pkg/logstructured/sqllog/sql.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,9 @@ type Dialect interface {
4141
IsFill(key string) bool
4242
}
4343

44-
func (s *SQLLog) Start(ctx context.Context) error {
44+
func (s *SQLLog) Start(ctx context.Context) (err error) {
4545
s.ctx = ctx
46-
go s.compact()
47-
return nil
46+
return
4847
}
4948

5049
func (s *SQLLog) compact() {
@@ -266,14 +265,22 @@ func filter(events interface{}, checkPrefix bool, prefix string) ([]*server.Even
266265
}
267266

268267
func (s *SQLLog) startWatch() (chan interface{}, error) {
268+
pollStart, err := s.d.GetCompactRevision(s.ctx)
269+
if err != nil {
270+
return nil, err
271+
}
272+
269273
c := make(chan interface{})
270-
go s.poll(c)
274+
// start compaction and polling at the same time to watch starts
275+
// at the oldest revision, but compaction doesn't create gaps
276+
go s.compact()
277+
go s.poll(c, pollStart)
271278
return c, nil
272279
}
273280

274-
func (s *SQLLog) poll(result chan interface{}) {
281+
func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
275282
var (
276-
last int64
283+
last = pollStart
277284
skip int64
278285
skipTime time.Time
279286
)
@@ -293,15 +300,6 @@ func (s *SQLLog) poll(result chan interface{}) {
293300
case <-wait.C:
294301
}
295302

296-
if last == 0 {
297-
if currentRev, err := s.CurrentRevision(s.ctx); err != nil {
298-
logrus.Errorf("failed to find current revision: %v", err)
299-
continue
300-
} else {
301-
last = currentRev
302-
}
303-
}
304-
305303
rows, err := s.d.After(s.ctx, "%", last)
306304
if err != nil {
307305
logrus.Errorf("fail to list latest changes: %v", err)

0 commit comments

Comments
 (0)