Skip to content

Commit 4e04255

Browse files
Merge pull request #11 from ibuildthecloud/master
Make revision consistent for empty sets
2 parents f904b78 + a18dbb9 commit 4e04255

File tree

1 file changed

+26
-4
lines changed

1 file changed

+26
-4
lines changed

pkg/logstructured/logstructured.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,15 +155,24 @@ func (l *LogStructured) Delete(ctx context.Context, key string, revision int64)
155155

156156
func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit, revision int64) (revRet int64, kvRet []*server.KeyValue, errRet error) {
157157
defer func() {
158-
l.adjustRevision(ctx, &revRet)
159158
logrus.Debugf("LIST %s, start=%s, limit=%d, rev=%d => rev=%d, kvs=%d, err=%v", prefix, startKey, limit, revision, revRet, len(kvRet), errRet)
160159
}()
161160

162161
rev, events, err := l.log.List(ctx, prefix, startKey, limit, revision, false)
163162
if err != nil {
164163
return 0, nil, err
165164
}
166-
if revision != 0 {
165+
if revision == 0 && len(events) == 0 {
166+
// if no revision is requested and no events are returned, then
167+
// get the current revision and relist. Relist is required because
168+
// between now and getting the current revision something could have
169+
// been created.
170+
currentRev, err := l.log.CurrentRevision(ctx)
171+
if err != nil {
172+
return 0, nil, err
173+
}
174+
return l.List(ctx, prefix, startKey, limit, currentRev)
175+
} else if revision != 0 {
167176
rev = revision
168177
}
169178

@@ -176,10 +185,23 @@ func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit
176185

177186
func (l *LogStructured) Count(ctx context.Context, prefix string) (revRet int64, count int64, err error) {
178187
defer func() {
179-
l.adjustRevision(ctx, &revRet)
180188
logrus.Debugf("COUNT %s => rev=%d, count=%d, err=%v", prefix, revRet, count, err)
181189
}()
182-
return l.log.Count(ctx, prefix)
190+
rev, count, err := l.log.Count(ctx, prefix)
191+
if err != nil {
192+
return 0, 0, err
193+
}
194+
195+
if count == 0 {
196+
// if count is zero, then so is revision, so now get the current revision and re-count at that revision
197+
currentRev, err := l.log.CurrentRevision(ctx)
198+
if err != nil {
199+
return 0, 0, err
200+
}
201+
rev, rows, err := l.List(ctx, prefix, prefix, 1000, currentRev)
202+
return rev, int64(len(rows)), err
203+
}
204+
return rev, count, nil
183205
}
184206

185207
func (l *LogStructured) Update(ctx context.Context, key string, value []byte, revision, lease int64) (revRet int64, kvRet *server.KeyValue, updateRet bool, errRet error) {

0 commit comments

Comments
 (0)