Skip to content

Commit f904b78

Browse files
Merge pull request #10 from ibuildthecloud/master
Fix pessimestic update revision check
2 parents 7fc7fb0 + aab49b4 commit f904b78

File tree

4 files changed

+34
-23
lines changed

4 files changed

+34
-23
lines changed

pkg/drivers/generic/generic.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,8 +294,11 @@ func (d *Generic) CurrentRevision(ctx context.Context) (int64, error) {
294294
return id, err
295295
}
296296

297-
func (d *Generic) After(ctx context.Context, prefix string, rev int64) (*sql.Rows, error) {
297+
func (d *Generic) After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error) {
298298
sql := d.AfterSQL
299+
if limit > 0 {
300+
sql = fmt.Sprintf("%s LIMIT %d", sql, limit)
301+
}
299302
return d.query(ctx, sql, prefix, rev)
300303
}
301304

pkg/logstructured/logstructured.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ type Log interface {
1313
Start(ctx context.Context) error
1414
CurrentRevision(ctx context.Context) (int64, error)
1515
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeletes bool) (int64, []*server.Event, error)
16-
After(ctx context.Context, prefix string, revision int64) (int64, []*server.Event, error)
16+
After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error)
1717
Watch(ctx context.Context, prefix string) <-chan []*server.Event
1818
Count(ctx context.Context, prefix string) (int64, int64, error)
1919
Append(ctx context.Context, event *server.Event) (int64, error)
@@ -185,10 +185,14 @@ func (l *LogStructured) Count(ctx context.Context, prefix string) (revRet int64,
185185
func (l *LogStructured) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) {
186186
defer func() {
187187
l.adjustRevision(ctx, &revRet)
188-
logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kv=%v, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRet != nil, updateRet, errRet)
188+
kvRev := int64(0)
189+
if kvRet != nil {
190+
kvRev = kvRet.ModRevision
191+
}
192+
logrus.Debugf("UPDATE %s, value=%d, rev=%d, lease=%v => rev=%d, kvrev=%d, updated=%v, err=%v", key, len(value), revision, lease, revRet, kvRev, updateRet, errRet)
189193
}()
190194

191-
rev, event, err := l.get(ctx, key, revision, false)
195+
rev, event, err := l.get(ctx, key, 0, false)
192196
if err != nil {
193197
return 0, nil, false, err
194198
}
@@ -295,7 +299,7 @@ func (l *LogStructured) Watch(ctx context.Context, prefix string, revision int64
295299

296300
result := make(chan []*server.Event, 100)
297301

298-
rev, kvs, err := l.log.After(ctx, prefix, revision)
302+
rev, kvs, err := l.log.After(ctx, prefix, revision, 0)
299303
if err != nil {
300304
logrus.Errorf("failed to list %s for revision %d", prefix, revision)
301305
cancel()

pkg/logstructured/sqllog/sql.go

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type Dialect interface {
3131
List(ctx context.Context, prefix, startKey string, limit, revision int64, includeDeleted bool) (*sql.Rows, error)
3232
Count(ctx context.Context, prefix string) (int64, int64, error)
3333
CurrentRevision(ctx context.Context) (int64, error)
34-
After(ctx context.Context, prefix string, rev int64) (*sql.Rows, error)
34+
After(ctx context.Context, prefix string, rev, limit int64) (*sql.Rows, error)
3535
Insert(ctx context.Context, key string, create, delete bool, createRevision, previousRevision int64, ttl int64, value, prevValue []byte) (int64, error)
3636
GetRevision(ctx context.Context, revision int64) (*sql.Rows, error)
3737
DeleteRevision(ctx context.Context, revision int64) error
@@ -152,12 +152,12 @@ func (s *SQLLog) CurrentRevision(ctx context.Context) (int64, error) {
152152
return s.d.CurrentRevision(ctx)
153153
}
154154

155-
func (s *SQLLog) After(ctx context.Context, prefix string, revision int64) (int64, []*server.Event, error) {
155+
func (s *SQLLog) After(ctx context.Context, prefix string, revision, limit int64) (int64, []*server.Event, error) {
156156
if strings.HasSuffix(prefix, "/") {
157157
prefix += "%"
158158
}
159159

160-
rows, err := s.d.After(ctx, prefix, revision)
160+
rows, err := s.d.After(ctx, prefix, revision, limit)
161161
if err != nil {
162162
return 0, nil, err
163163
}
@@ -280,27 +280,31 @@ func (s *SQLLog) startWatch() (chan interface{}, error) {
280280

281281
func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
282282
var (
283-
last = pollStart
284-
skip int64
285-
skipTime time.Time
283+
last = pollStart
284+
skip int64
285+
skipTime time.Time
286+
waitForMore = true
286287
)
287288

288289
wait := time.NewTicker(time.Second)
289290
defer wait.Stop()
290291
defer close(result)
291292

292293
for {
293-
select {
294-
case <-s.ctx.Done():
295-
return
296-
case check := <-s.notify:
297-
if check <= last {
298-
continue
294+
if waitForMore {
295+
select {
296+
case <-s.ctx.Done():
297+
return
298+
case check := <-s.notify:
299+
if check <= last {
300+
continue
301+
}
302+
case <-wait.C:
299303
}
300-
case <-wait.C:
301304
}
305+
waitForMore = true
302306

303-
rows, err := s.d.After(s.ctx, "%", last)
307+
rows, err := s.d.After(s.ctx, "%", last, 500)
304308
if err != nil {
305309
logrus.Errorf("fail to list latest changes: %v", err)
306310
continue
@@ -316,6 +320,8 @@ func (s *SQLLog) poll(result chan interface{}, pollStart int64) {
316320
continue
317321
}
318322

323+
waitForMore = len(events) < 100
324+
319325
rev := last
320326
var (
321327
sequential []*server.Event

pkg/server/server.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,8 @@ func toKVs(kvs ...*KeyValue) []*mvccpb.KeyValue {
104104
ret := make([]*mvccpb.KeyValue, 0, len(kvs))
105105
for _, kv := range kvs {
106106
newKV := toKV(kv)
107-
if newKV == nil {
108-
fmt.Println("HIHIHIH")
109-
} else {
110-
ret = append(ret, toKV(kv))
107+
if newKV != nil {
108+
ret = append(ret, newKV)
111109
}
112110
}
113111
return ret

0 commit comments

Comments
 (0)