Skip to content

Commit 0385a45

Browse files
Merge pull request #24 from ibuildthecloud/master
Dedup compact_rev_key on start
2 parents bb03967 + 118e08f commit 0385a45

File tree

2 files changed

+51
-12
lines changed

2 files changed

+51
-12
lines changed

pkg/drivers/generic/generic.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -255,18 +255,7 @@ func (d *Generic) GetCompactRevision(ctx context.Context) (int64, error) {
255255
}
256256

257257
func (d *Generic) SetCompactRevision(ctx context.Context, revision int64) error {
258-
result, err := d.execute(ctx, d.UpdateCompactSQL, revision)
259-
if err != nil {
260-
return err
261-
}
262-
num, err := result.RowsAffected()
263-
if err != nil {
264-
return err
265-
}
266-
if num != 0 {
267-
return nil
268-
}
269-
_, err = d.Insert(ctx, "compact_rev_key", false, false, 0, revision, 0, []byte(""), nil)
258+
_, err := d.execute(ctx, d.UpdateCompactSQL, revision)
270259
return err
271260
}
272261

pkg/logstructured/sqllog/sql.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,52 @@ func (s *SQLLog) Start(ctx context.Context) (err error) {
4646
return
4747
}
4848

49+
func (s *SQLLog) compactStart(ctx context.Context) error {
50+
rows, err := s.d.After(ctx, "compact_rev_key", 0, 0)
51+
if err != nil {
52+
return err
53+
}
54+
55+
_, _, events, err := RowsToEvents(rows)
56+
if err != nil {
57+
return err
58+
}
59+
60+
if len(events) == 0 {
61+
_, err := s.Append(ctx, &server.Event{
62+
Create: true,
63+
KV: &server.KeyValue{
64+
Key: "compact_rev_key",
65+
Value: []byte(""),
66+
},
67+
})
68+
return err
69+
} else if len(events) == 1 {
70+
return nil
71+
}
72+
73+
// this is to work around a bug in which we ended up with two compact_rev_key rows
74+
maxRev := int64(0)
75+
maxID := int64(0)
76+
for _, event := range events {
77+
if event.PrevKV != nil && event.PrevKV.ModRevision > maxRev {
78+
maxRev = event.PrevKV.ModRevision
79+
maxID = event.KV.ModRevision
80+
}
81+
}
82+
83+
for _, event := range events {
84+
if event.KV.ModRevision == maxID {
85+
continue
86+
}
87+
if err := s.d.DeleteRevision(ctx, event.KV.ModRevision); err != nil {
88+
return err
89+
}
90+
}
91+
92+
return nil
93+
}
94+
4995
func (s *SQLLog) compact() {
5096
var (
5197
nextEnd int64
@@ -277,6 +323,10 @@ func filter(events interface{}, checkPrefix bool, prefix string) ([]*server.Even
277323
}
278324

279325
func (s *SQLLog) startWatch() (chan interface{}, error) {
326+
if err := s.compactStart(s.ctx); err != nil {
327+
return nil, err
328+
}
329+
280330
pollStart, err := s.d.GetCompactRevision(s.ctx)
281331
if err != nil {
282332
return nil, err

0 commit comments

Comments
 (0)