
Commit 4add79b

SQLITE Contention: Add retries and enable WAL mode (#355)
* Using SQLITE's busy timeout didn't fix the issue with busy errors preventing sessions from being updated and learning from occurring.
* Enabling WAL mode seems to have made a huge difference.
* WAL uses a separate file to log edits and allows concurrent reads and writes.
* Add logic at the application layer to retry the updates.
* Use an atomic counter to track the number of concurrent updates to try to confirm we don't have multiple concurrent writes.
* To better track issues with Analyzer lag, start tracking the timestamp of the last processed log message.
* Add a Gauge metric to report Analyzer lag.
1 parent 2ddd974 commit 4add79b
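
The fix that matters most here is at the database layer: raising SQLite's busy timeout and switching the journal to write-ahead logging so readers no longer block the writer. The commit applies the pragmas inside NewSessionsManager (see session_manager.go below); as a rough standalone sketch of the same idea — the driver and file name here are illustrative, not necessarily what Foyle uses — it looks like this:

package main

import (
	"database/sql"
	"log"

	_ "github.com/mattn/go-sqlite3" // driver choice is illustrative; the commit works against an existing *sql.DB
)

func main() {
	db, err := sql.Open("sqlite3", "sessions.db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Ask SQLite to wait up to 10s for a lock instead of failing immediately with SQLITE_BUSY.
	if _, err := db.Exec("PRAGMA busy_timeout = 10000;"); err != nil {
		log.Fatal(err)
	}

	// Switch to write-ahead logging so readers don't block the single writer.
	// The pragma returns the resulting journal mode as a row.
	var mode string
	if err := db.QueryRow("PRAGMA journal_mode=WAL;").Scan(&mode); err != nil {
		log.Fatal(err)
	}
	log.Printf("journal_mode=%s", mode) // expect "wal"
}

Because journal_mode is a property of the database file, the WAL setting persists across restarts; busy_timeout, by contrast, is per-connection and has to be reissued each time the database is opened.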

File tree: 18 files changed (+533 −1265 lines)


app/pkg/analyze/analyzer.go

Lines changed: 26 additions & 6 deletions
@@ -11,6 +11,9 @@ import (
 	"sync"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	"github.com/jlewi/foyle/app/pkg/logs/matchers"
 
 	"github.com/jlewi/foyle/app/pkg/runme/converters"
@@ -34,6 +37,13 @@ import (
 	"k8s.io/client-go/util/workqueue"
 )
 
+var (
+	lagGauge = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "analyzer_logs_lag_seconds",                                             // Metric name
+		Help: "Lag in seconds between the current time and last processed log entry", // Metric description
+	})
+)
+
 const (
 	// traceField is the field that contains the traceId in a log entry. We use this to identify processing related
 	// to a particular trace. We don't use the field "traceId" because these log entries aren't actually part of the
@@ -264,6 +274,8 @@ func (a *Analyzer) processLogFile(ctx context.Context, path string) error {
 
 	traceIDs := make(map[string]bool)
 
+	lastLogTime := time.Time{}
+
 	// We read the lines line by line. We keep track of all the traceIDs mentioned in those lines. We
 	// Then do a combineAndCheckpoint for all the traceIDs mentioned. Lines are also persisted in a KV store
 	// keyed by traceID. So if on the next iteration we get a new line for a given traceId and need to reprocess
@@ -275,6 +287,8 @@
 			continue
 		}
 
+		lastLogTime = entry.Time()
+
 		// Add the entry to a session if it should be.
 		a.sessBuilder.processLogEntry(entry, a.learnNotifier)
 
@@ -312,7 +326,7 @@
 	}
 
 	// Now run a combineAndCheckpoint
-	a.combineAndCheckpoint(ctx, path, offset, traceIDs)
+	a.combineAndCheckpoint(ctx, path, offset, lastLogTime, traceIDs)
 
 	// If we are shutting down we don't want to keep processing the file.
 	// By aborting shutdown here as opposed to here we are blocking shutdown for as least as long it takes
@@ -326,7 +340,7 @@
 
 // combineAndCheckpoint runs a combine operation for all the traceIDs listed in the map.
 // Progress is then checkpointed.
-func (a *Analyzer) combineAndCheckpoint(ctx context.Context, path string, offset int64, traceIDs map[string]bool) {
+func (a *Analyzer) combineAndCheckpoint(ctx context.Context, path string, offset int64, lastLogTime time.Time, traceIDs map[string]bool) {
 	log := logs.FromContext(ctx)
 	// Combine the entries for each trace that we saw.
 	// N.B. We could potentially make this more efficient by checking if the log message is the final message
@@ -337,7 +351,11 @@ func (a *Analyzer) combineAndCheckpoint(ctx context.Context, path string, offset
 		}
 	}
 	// Update the offset
-	a.setLogFileOffset(path, offset)
+	a.setLogFileOffset(path, offset, lastLogTime)
+
+	// Update the lag
+	lag := time.Since(lastLogTime)
+	lagGauge.Set(lag.Seconds())
 }
 
 func (a *Analyzer) GetWatermark() *logspb.LogsWaterMark {
@@ -362,13 +380,15 @@ func (a *Analyzer) getLogFileOffset(path string) int64 {
 	return a.logFileOffsets.Offset
 }
 
-func (a *Analyzer) setLogFileOffset(path string, offset int64) {
+func (a *Analyzer) setLogFileOffset(path string, offset int64, lastTimestamp time.Time) {
 	a.mu.Lock()
 	defer a.mu.Unlock()
+	lastTime := timestamppb.New(lastTimestamp)
 	oldWatermark := a.logFileOffsets
 	a.logFileOffsets = &logspb.LogsWaterMark{
-		File:   path,
-		Offset: offset,
+		File:             path,
+		Offset:           offset,
+		LastLogTimestamp: lastTime,
 	}
 
 	log := logs.NewLogger()
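
The new analyzer_logs_lag_seconds gauge is registered through promauto, which attaches it to the default Prometheus registry. The diff doesn't show how Foyle serves its metrics endpoint, but a gauge registered this way is typically exposed with promhttp; a minimal, self-contained sketch (the HTTP wiring and port are assumptions, not Foyle's actual setup):

package main

import (
	"log"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Same pattern as the analyzer's lag gauge: promauto registers it with the default registry.
var lagGauge = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "analyzer_logs_lag_seconds",
	Help: "Lag in seconds between the current time and last processed log entry",
})

func main() {
	// Pretend the last processed log entry was 90 seconds ago.
	lastLogTime := time.Now().Add(-90 * time.Second)
	lagGauge.Set(time.Since(lastLogTime).Seconds())

	// Scraping GET /metrics now reports analyzer_logs_lag_seconds ≈ 90.
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":2112", nil))
}

Once scraped, the lag can be graphed and alerted on rather than inferred from log messages.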

app/pkg/analyze/crud.go

Lines changed: 12 additions & 0 deletions
@@ -3,6 +3,7 @@ package analyze
 import (
 	"context"
 	"sort"
+	"time"
 
 	"connectrpc.com/connect"
 	"github.com/jlewi/foyle/app/pkg/logs"
@@ -22,14 +23,21 @@ type CrudHandler struct {
 	blocksDB *pebble.DB
 	tracesDB *pebble.DB
 	analyzer *Analyzer
+	location *time.Location
 }
 
 func NewCrudHandler(cfg config.Config, blocksDB *pebble.DB, tracesDB *pebble.DB, analyzer *Analyzer) (*CrudHandler, error) {
+	// Load the PST time zone
+	location, err := time.LoadLocation("America/Los_Angeles")
+	if err != nil {
+		return nil, errors.Wrap(err, "Failed to load location America/Los_Angeles")
+	}
 	return &CrudHandler{
 		cfg:      cfg,
 		blocksDB: blocksDB,
 		tracesDB: tracesDB,
 		analyzer: analyzer,
+		location: location,
 	}, nil
 }
 
@@ -112,5 +120,9 @@ func (h *CrudHandler) Status(ctx context.Context, request *connect.Request[logsp
 		Watermark: h.analyzer.GetWatermark(),
 	}
 
+	lastTime := response.GetWatermark().GetLastLogTimestamp().AsTime()
+	lastTime = lastTime.In(h.location)
+	formattedTime := lastTime.Format("2006-01-02 3:04:05 PM MST")
+	response.LastLogTimestampHuman = formattedTime
 	return connect.NewResponse(response), nil
 }
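
The Status handler now renders the watermark's protobuf timestamp as a human-readable Pacific-time string using Go's reference-time layout. A small standalone sketch of the same conversion (the timestamp value below is made up for illustration):

package main

import (
	"fmt"
	"log"
	"time"

	"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
	location, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		log.Fatal(err)
	}

	// Stand-in for response.GetWatermark().GetLastLogTimestamp().
	ts := timestamppb.New(time.Date(2024, 10, 8, 21, 30, 5, 0, time.UTC))

	lastTime := ts.AsTime().In(location)
	// Go's reference time layout: 2006-01-02 3:04:05 PM MST.
	fmt.Println(lastTime.Format("2006-01-02 3:04:05 PM MST")) // 2024-10-08 2:30:05 PM PDT
}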

app/pkg/analyze/session_manager.go

Lines changed: 126 additions & 71 deletions
@@ -7,6 +7,10 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
+	"sync/atomic"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
 
 	"github.com/go-logr/zapr"
 	"go.uber.org/zap"
@@ -47,6 +51,11 @@
 		Name: "sqlite_busy",
 		Help: "Number of operations that failed because sqlite was busy",
 	})
+
+	activeUpdatesGauge = promauto.NewGauge(prometheus.GaugeOpts{
+		Name: "session_manager_active_updates", // Metric name
+		Help: "Number of active updates",       // Metric description
+	})
 )
 
 // GetDDL return the DDL for the database.
@@ -63,6 +72,9 @@ type SessionUpdater func(session *logspb.Session) error
 type SessionsManager struct {
 	queries *fsql.Queries
 	db      *sql.DB
+	// Keep track of the number of concurrent calls to Update.
+	// This is intended to try to track down why SQLITE_BUSY errors is so frequent.
+	activeUpdates atomic.Int32
 }
 
 func NewSessionsManager(db *sql.DB) (*SessionsManager, error) {
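
The activeUpdates counter is the instrumentation half of the change: every call to Update increments it on entry and decrements it on exit, so overlapping calls can be correlated with SQLITE_BUSY errors. A toy sketch of that enter/exit pattern in isolation (the Prometheus gauge is omitted; only the atomic counter is shown):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Same pattern as SessionsManager.activeUpdates: count overlapping calls so
// concurrency can be correlated with contention errors.
var activeUpdates atomic.Int32

func update(id int) {
	numActive := activeUpdates.Add(1)
	defer activeUpdates.Add(-1)

	if numActive > 1 {
		fmt.Printf("update %d sees %d concurrent updates\n", id, numActive)
	}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			update(id)
		}(i)
	}
	wg.Wait()
}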
@@ -81,6 +93,20 @@ func NewSessionsManager(db *sql.DB) (*SessionsManager, error) {
 	log := zapr.NewLogger(zap.L())
 	log.Info("sqlite busy_timeout set", "timeout", 5000)
 
+	if _, err := db.Exec("PRAGMA busy_timeout = 10000;"); err != nil {
+		return nil, errors.Wrapf(err, "Failed to set busy timeout for the database")
+	}
+
+	// Activate WAL mode. This hopefully helps with SQLITE_BUSY errors and contention by using a separate file
+	// to log writes.
+	// https://www.sqlite.org/wal.html#:~:text=One%20has%20merely%20to%20run,set%20on%20any%20one%20connection.
+	// This mode is supposedly persistent the next time the application opens it will still be doing this
+	output, err := db.Exec("PRAGMA journal_mode=WAL;")
+	log.Info("Set journal mode to WAL", "output", output)
+	if err != nil {
+		return nil, errors.Wrapf(err, "Failed to set journal mode to WAL")
+	}
+
 	// Create the dbtx from the actual database
 	queries := fsql.New(db)
 
@@ -117,98 +143,127 @@ func (db *SessionsManager) Get(ctx context.Context, contextID string) (*logspb.S
 // inserted if the updateFunc returns nil. If the session already exists then the session is passed to updateFunc
 // and the updated value is then written to the database
 func (db *SessionsManager) Update(ctx context.Context, contextID string, updateFunc SessionUpdater) error {
+	// Increment the counter when entering the function
+	numActive := db.activeUpdates.Add(1)
+	defer func() {
+		// Decrement the counter when leaving the function
+		value := db.activeUpdates.Add(-1)
+		activeUpdatesGauge.Set(float64(value))
+	}()
+
 	log := logs.FromContext(ctx)
 	if contextID == "" {
 		return errors.WithStack(errors.New("contextID must be non-empty"))
 	}
 	log = log.WithValues("contextId", contextID)
 
-	sessCounter.WithLabelValues("start").Inc()
-
-	tx, err := db.db.BeginTx(ctx, &sql.TxOptions{})
-	if err != nil {
-		sessCounter.WithLabelValues("failedstart").Inc()
-		return errors.Wrapf(err, "Failed to start transaction")
+	// Intended to track whether SQLITE_BUSY errors are correlated with the number of concurrent calls to Update.
+	if numActive > 1 {
+		log.Info("Concurrent Session Updates", "numActive", numActive)
 	}
 
-	err = func() error {
-		queries := db.queries.WithTx(tx)
-		// Read the record
-		sessRow, err := queries.GetSession(ctx, contextID)
+	activeUpdatesGauge.Set(float64(numActive))
+	sessCounter.WithLabelValues("start").Inc()
 
-		// If the session doesn't exist then we do nothing because session is initializeed to empty session
-		session := &logspb.Session{
-			ContextId: contextID,
-		}
-		if err != nil {
-			logDBErrors(ctx, err)
-			if err != sql.ErrNoRows {
-				sessCounter.WithLabelValues("failedget").Inc()
-				return errors.Wrapf(err, "Failed to get session with id %v", contextID)
+	// Wrap the updates in a retry loop. This is intended to deal with SQLITE_BUSY errors and other possible sources
+	// of contention
+	b := backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(5*time.Minute), backoff.WithMaxInterval(30*time.Second))
+	for {
+		err := func() error {
+			tx, err := db.db.BeginTx(ctx, &sql.TxOptions{})
+			if err != nil {
+				// See https://go.dev/doc/database/execute-transactions We do not to issue a rollback if BeginTx fails
+				sessCounter.WithLabelValues("failedstart").Inc()
+				return errors.Wrapf(err, "Failed to start transaction")
 			}
-			// ErrNoRows means the session doesn't exist so we just continue with the empty session
-		} else {
-			// Deserialize the proto
-			if err := proto.Unmarshal(sessRow.Proto, session); err != nil {
-				return errors.Wrapf(err, "Failed to deserialize session")
+
+			// Ensure Rollback gets called.
+			// This is a null op if the transaction has already been committed or rolled back.
+			defer func() {
+				if err := tx.Rollback(); err != nil {
+					log.Error(err, "Failed to rollback transaction")
+				}
+			}()
+
+			queries := db.queries.WithTx(tx)
+			// Read the record
+			sessRow, err := queries.GetSession(ctx, contextID)
+
+			// If the session doesn't exist then we do nothing because session is initializeed to empty session
+			session := &logspb.Session{
+				ContextId: contextID,
+			}
+			if err != nil {
+				logDBErrors(ctx, err)
+				if err != sql.ErrNoRows {
+					sessCounter.WithLabelValues("failedget").Inc()
+					return errors.Wrapf(err, "Failed to get session with id %v", contextID)
+				}
+				// ErrNoRows means the session doesn't exist so we just continue with the empty session
+			} else {
+				// Deserialize the proto
+				if err := proto.Unmarshal(sessRow.Proto, session); err != nil {
+					return errors.Wrapf(err, "Failed to deserialize session")
+				}
 			}
-		}
 
-		sessCounter.WithLabelValues("callupdatefunc").Inc()
+			sessCounter.WithLabelValues("callupdatefunc").Inc()
 
-		if err := updateFunc(session); err != nil {
-			return errors.Wrapf(err, "Failed to update session")
-		}
+			if err := updateFunc(session); err != nil {
+				return errors.Wrapf(err, "Failed to update session")
+			}
 
-		newRow, err := protoToRow(session)
-		if err != nil {
-			return errors.Wrapf(err, "Failed to convert session proto to table row")
-		}
+			newRow, err := protoToRow(session)
+			if err != nil {
+				return errors.Wrapf(err, "Failed to convert session proto to table row")
+			}
 
-		if newRow.Contextid != contextID {
-			return errors.WithStack(errors.Errorf("contextID in session doesn't match contextID. Update was called with contextID: %v but session has contextID: %v", contextID, newRow.Contextid))
-		}
+			if newRow.Contextid != contextID {
+				return errors.WithStack(errors.Errorf("contextID in session doesn't match contextID. Update was called with contextID: %v but session has contextID: %v", contextID, newRow.Contextid))
+			}
 
-		update := fsql.UpdateSessionParams{
-			Contextid:         contextID,
-			Proto:             newRow.Proto,
-			Starttime:         newRow.Starttime,
-			Endtime:           newRow.Endtime,
-			Selectedid:        newRow.Selectedid,
-			Selectedkind:      newRow.Selectedkind,
-			TotalInputTokens:  newRow.TotalInputTokens,
-			TotalOutputTokens: newRow.TotalOutputTokens,
-			NumGenerateTraces: newRow.NumGenerateTraces,
-		}
+			update := fsql.UpdateSessionParams{
+				Contextid:         contextID,
+				Proto:             newRow.Proto,
+				Starttime:         newRow.Starttime,
+				Endtime:           newRow.Endtime,
+				Selectedid:        newRow.Selectedid,
+				Selectedkind:      newRow.Selectedkind,
+				TotalInputTokens:  newRow.TotalInputTokens,
+				TotalOutputTokens: newRow.TotalOutputTokens,
+				NumGenerateTraces: newRow.NumGenerateTraces,
+			}
 
-		sessCounter.WithLabelValues("callupdatesession").Inc()
-		if err := queries.UpdateSession(ctx, update); err != nil {
-			logDBErrors(ctx, err)
-			return errors.Wrapf(err, "Failed to update session")
-		}
-		return nil
-	}()
+			sessCounter.WithLabelValues("callupdatesession").Inc()
+			if err := queries.UpdateSession(ctx, update); err != nil {
+				logDBErrors(ctx, err)
+				return errors.Wrapf(err, "Failed to update session")
+			}
 
-	if err == nil {
-		if err := tx.Commit(); err != nil {
-			logDBErrors(ctx, err)
-			log.Error(err, "Failed to commit transaction")
-			sessCounter.WithLabelValues("commitfail").Inc()
-			return errors.Wrapf(err, "Failed to commit transaction")
+			if err := tx.Commit(); err != nil {
+				logDBErrors(ctx, err)
+				log.Error(err, "Failed to commit transaction")
+				sessCounter.WithLabelValues("commitfail").Inc()
+				return errors.Wrapf(err, "Failed to commit transaction")
+			}
+			sessCounter.WithLabelValues("success").Inc()
+			return nil
+		}()
+
+		if err == nil {
+			sessCounter.WithLabelValues("done").Inc()
+			return nil
 		}
-		sessCounter.WithLabelValues("success").Inc()
-	} else {
-		logDBErrors(ctx, err)
-		sessCounter.WithLabelValues("fail").Inc()
-		log.Error(err, "Failed to update session")
-		if txErr := tx.Rollback(); txErr != nil {
-			log.Error(txErr, "Failed to rollback transaction")
+
+		wait := b.NextBackOff()
+		if wait == backoff.Stop {
+			sessCounter.WithLabelValues("done").Inc()
+			err := errors.Errorf("Failed to update session for contextId %s", contextID)
+			log.Error(err, "Failed to update session")
+			return err
 		}
-		return err
+		time.Sleep(wait)
 	}
-
-	sessCounter.WithLabelValues("done").Inc()
-	return nil
 }
 
 func (m *SessionsManager) GetSession(ctx context.Context, request *connect.Request[logspb.GetSessionRequest]) (*connect.Response[logspb.GetSessionResponse], error) {
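
Distilled from the hunk above, the new control flow of Update is: attempt the whole read-modify-write in a closure that owns its own transaction, and if the attempt fails, sleep according to an exponential backoff policy and try again until the policy gives up. A stripped-down sketch of that loop, with tryUpdate standing in for the transactional closure (the simulated failures are purely illustrative):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// tryUpdate stands in for one attempt at the transactional read-modify-write;
// in the real code this is the closure that begins a transaction, applies
// updateFunc, and commits.
func tryUpdate(attempt int) error {
	if attempt < 3 {
		return errors.New("database is locked (SQLITE_BUSY)")
	}
	return nil
}

func main() {
	// Same shape as the new SessionsManager.Update: retry with exponential
	// backoff until success or until the backoff policy says to stop.
	b := backoff.NewExponentialBackOff(
		backoff.WithMaxElapsedTime(5*time.Minute),
		backoff.WithMaxInterval(30*time.Second),
	)

	attempt := 0
	for {
		attempt++
		err := tryUpdate(attempt)
		if err == nil {
			fmt.Printf("succeeded on attempt %d\n", attempt)
			return
		}

		wait := b.NextBackOff()
		if wait == backoff.Stop {
			fmt.Printf("giving up after %d attempts: %v\n", attempt, err)
			return
		}
		time.Sleep(wait)
	}
}

Note that with this structure updateFunc can run more than once per Update call: each retry re-reads the session inside a fresh transaction and reapplies the update to the value it just read.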
