diff --git a/bulk_processor.go b/bulk_processor.go
index f2711f82..631d8dbb 100644
--- a/bulk_processor.go
+++ b/bulk_processor.go
@@ -503,7 +503,7 @@ func (w *bulkWorker) work(ctx context.Context) {
 			w.flushAckC <- struct{}{}
 		}
 		if err != nil {
-			w.p.c.errorf("elastic: bulk processor %q was unable to perform work: %v", w.p.name, err)
+			w.p.c.errorf(ctx, "elastic: bulk processor %q was unable to perform work: %v", w.p.name, err)
 			if !stop {
 				waitForActive := func() {
 					// Add back pressure to prevent Add calls from filling up the request queue
@@ -556,7 +556,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
 	}
 	// notifyFunc will be called if retry fails
 	notifyFunc := func(err error) {
-		w.p.c.errorf("elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
+		w.p.c.errorf(ctx, "elastic: bulk processor %q failed but may retry: %v", w.p.name, err)
 	}
 
 	id := atomic.AddInt64(&w.p.executionId, 1)
@@ -580,7 +580,7 @@ func (w *bulkWorker) commit(ctx context.Context) error {
 	err := RetryNotify(commitFunc, w.p.backoff, notifyFunc)
 	w.updateStats(res)
 	if err != nil {
-		w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)
+		w.p.c.errorf(ctx, "elastic: bulk processor %q failed: %v", w.p.name, err)
 	}
 
 	// Invoke after callback
@@ -599,14 +599,14 @@ func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) {
 	client := w.p.c
 	stopReconnC := w.p.stopReconnC
 
-	w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name)
+	w.p.c.errorf(context.Background(), "elastic: bulk processor %q is waiting for an active connection", w.p.name)
 
 	// loop until a health check finds at least 1 active connection or the reconnection channel is closed
 	for {
 		select {
 		case _, ok := <-stopReconnC:
 			if !ok {
-				w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name)
+				w.p.c.errorf(context.Background(), "elastic: bulk processor %q active connection check interrupted", w.p.name)
 				return
 			}
 		case <-t.C:
diff --git a/client.go b/client.go
index c412b181..12026369 100644
--- a/client.go
+++ b/client.go
@@ -26,7 +26,7 @@ import (
 
 const (
 	// Version is the current version of Elastic.
-	Version = "7.0.29"
+	Version = "7.0.30"
 
 	// DefaultURL is the default endpoint of Elasticsearch on the local machine.
 	// It is used e.g. when initializing a new Client without a specific URL.
@@ -84,6 +84,9 @@ const (
 )
 
 var (
+	// nilByte is used in JSON marshal/unmarshal
+	nilByte = []byte("null")
+
 	// ErrNoClient is raised when no Elasticsearch node is available.
 	ErrNoClient = errors.New("no Elasticsearch node available")
 
@@ -798,7 +801,7 @@ func (c *Client) Start() {
 	c.running = true
 	c.mu.Unlock()
 
-	c.infof("elastic: client started")
+	c.infof(context.Background(), "elastic: client started")
 }
 
 // Stop stops the background processes that the client is running,
@@ -828,27 +831,39 @@ func (c *Client) Stop() {
 	c.running = false
 	c.mu.Unlock()
 
-	c.infof("elastic: client stopped")
+	c.infof(context.Background(), "elastic: client stopped")
 }
 
 // errorf logs to the error log.
-func (c *Client) errorf(format string, args ...interface{}) {
+func (c *Client) errorf(ctx context.Context, format string, args ...interface{}) {
 	if c.errorlog != nil {
-		c.errorlog.Printf(format, args...)
+		if logger, ok := c.errorlog.(LoggerWithContext); ok {
+			logger.PrintfWithContext(ctx, format, args...)
+		} else {
+			c.errorlog.Printf(format, args...)
+		}
 	}
 }
 
 // infof logs informational messages.
-func (c *Client) infof(format string, args ...interface{}) {
+func (c *Client) infof(ctx context.Context, format string, args ...interface{}) {
 	if c.infolog != nil {
-		c.infolog.Printf(format, args...)
+		if logger, ok := c.infolog.(LoggerWithContext); ok {
+			logger.PrintfWithContext(ctx, format, args...)
+		} else {
+			c.infolog.Printf(format, args...)
+		}
 	}
 }
 
 // tracef logs to the trace log.
-func (c *Client) tracef(format string, args ...interface{}) {
+func (c *Client) tracef(ctx context.Context, format string, args ...interface{}) {
 	if c.tracelog != nil {
-		c.tracelog.Printf(format, args...)
+		if logger, ok := c.tracelog.(LoggerWithContext); ok {
+			logger.PrintfWithContext(ctx, format, args...)
+		} else {
+			c.tracelog.Printf(format, args...)
+		}
 	}
 }
 
@@ -857,7 +872,7 @@ func (c *Client) dumpRequest(r *http.Request) {
 	if c.tracelog != nil {
 		out, err := httputil.DumpRequestOut(r, true)
 		if err == nil {
-			c.tracef("%s\n", string(out))
+			c.tracef(r.Context(), "%s\n", string(out))
 		}
 	}
 }
@@ -867,7 +882,7 @@ func (c *Client) dumpResponse(resp *http.Response) {
 	if c.tracelog != nil {
 		out, err := httputil.DumpResponse(resp, true)
 		if err == nil {
-			c.tracef("%s\n", string(out))
+			c.tracef(context.Background(), "%s\n", string(out))
 		}
 	}
 }
@@ -1055,7 +1070,7 @@ func (c *Client) updateConns(conns []*conn) {
 		}
 		if !found {
 			// New connection didn't exist, so add it to our list of new conns.
-			c.infof("elastic: %s joined the cluster", conn.URL())
+			c.infof(context.Background(), "elastic: %s joined the cluster", conn.URL())
 			newConns = append(newConns, conn)
 		}
 	}
@@ -1147,11 +1162,11 @@ func (c *Client) healthcheck(parentCtx context.Context, timeout time.Duration, f
 			// Wait for the Goroutine (or its timeout)
 			select {
 			case <-ctx.Done(): // timeout
-				c.errorf("elastic: %s is dead", conn.URL())
+				c.errorf(ctx, "elastic: %s is dead", conn.URL())
 				conn.MarkAsDead()
 			case err := <-errc:
 				if err != nil {
-					c.errorf("elastic: %s is dead", conn.URL())
+					c.errorf(ctx, "elastic: %s is dead", conn.URL())
 					conn.MarkAsDead()
 					break
 				}
@@ -1159,7 +1174,7 @@ func (c *Client) healthcheck(parentCtx context.Context, timeout time.Duration, f
 					conn.MarkAsAlive()
 				} else {
 					conn.MarkAsDead()
-					c.errorf("elastic: %s is dead [status=%d]", conn.URL(), status)
+					c.errorf(ctx, "elastic: %s is dead [status=%d]", conn.URL(), status)
 				}
 			}
 		}
@@ -1256,7 +1271,7 @@ func (c *Client) next() (*conn, error) {
 	// So we are marking them as alive--if sniffing is disabled.
 	// They'll then be picked up in the next call to PerformRequest.
 	if !c.snifferEnabled {
-		c.errorf("elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
+		c.errorf(context.Background(), "elastic: all %d nodes marked as dead; resurrecting them to prevent deadlock", len(c.conns))
 		for _, conn := range c.conns {
 			conn.MarkAsAlive()
 		}
@@ -1380,13 +1395,13 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
 			continue // try again
 		}
 		if err != nil {
-			c.errorf("elastic: cannot get connection from pool")
+			c.errorf(ctx, "elastic: cannot get connection from pool")
 			return nil, err
 		}
 
 		req, err = NewRequest(opt.Method, conn.URL()+pathWithParams)
 		if err != nil {
-			c.errorf("elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err)
+			c.errorf(ctx, "elastic: cannot create request for %s %s: %v", strings.ToUpper(opt.Method), conn.URL()+pathWithParams, err)
 			return nil, err
 		}
 		if basicAuth {
@@ -1415,7 +1430,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
 		if opt.Body != nil {
 			err = req.SetBody(opt.Body, gzipEnabled)
 			if err != nil {
-				c.errorf("elastic: couldn't set body %+v for request: %v", opt.Body, err)
+				c.errorf(ctx, "elastic: couldn't set body %+v for request: %v", opt.Body, err)
 				return nil, err
 			}
 		}
@@ -1433,12 +1448,12 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
 			n++
 			wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
 			if rerr != nil {
-				c.errorf("elastic: %s is dead", conn.URL())
+				c.errorf(ctx, "elastic: %s is dead", conn.URL())
 				conn.MarkAsDead()
 				return nil, rerr
 			}
 			if !ok {
-				c.errorf("elastic: %s is dead", conn.URL())
+				c.errorf(ctx, "elastic: %s is dead", conn.URL())
 				conn.MarkAsDead()
 				return nil, err
 			}
@@ -1450,7 +1465,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
 			n++
 			wait, ok, rerr := retrier.Retry(ctx, n, (*http.Request)(req), res, err)
 			if rerr != nil {
-				c.errorf("elastic: %s is dead", conn.URL())
+				c.errorf(ctx, "elastic: %s is dead", conn.URL())
 				conn.MarkAsDead()
 				return nil, rerr
 			}
@@ -1473,7 +1488,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
 		if len(res.Header["Warning"]) > 0 {
 			c.deprecationlog((*http.Request)(req), res)
 			for _, warning := range res.Header["Warning"] {
-				c.errorf("Deprecation warning: %s", warning)
+				c.errorf(ctx, "Deprecation warning: %s", warning)
 			}
 		}
 
@@ -1497,7 +1512,7 @@ func (c *Client) PerformRequest(ctx context.Context, opt PerformRequestOptions)
 	}
 
 	duration := time.Now().UTC().Sub(start)
-	c.infof("%s %s [status:%d, request:%.3fs]",
+	c.infof(ctx, "%s %s [status:%d, request:%.3fs]",
 		strings.ToUpper(opt.Method),
 		req.URL,
 		resp.StatusCode,
diff --git a/logger.go b/logger.go
index 095eb4cd..ec9e727e 100644
--- a/logger.go
+++ b/logger.go
@@ -4,7 +4,15 @@
 
 package elastic
 
+import "context"
+
 // Logger specifies the interface for all log operations.
 type Logger interface {
 	Printf(format string, v ...interface{})
 }
+
+// LoggerWithContext extends the Logger interface by a context.
+type LoggerWithContext interface {
+	Logger
+	PrintfWithContext(ctx context.Context, format string, v ...interface{})
+}
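
For reference, below is a minimal sketch of how a caller might adopt the new LoggerWithContext interface. It is not part of the patch: the contextLogger type, the ctxKey key, and the request-ID convention are illustrative assumptions, while SetErrorLog/SetInfoLog are the existing client options for registering loggers.

package main

import (
	"context"
	"log"
	"os"

	"github.com/olivere/elastic/v7"
)

// ctxKey is a hypothetical context key used to carry a request ID.
type ctxKey struct{}

// contextLogger satisfies Logger via Printf and LoggerWithContext via
// PrintfWithContext. The client type-asserts its configured loggers to
// LoggerWithContext at log time (see errorf/infof/tracef in the diff above)
// and falls back to plain Printf otherwise.
type contextLogger struct {
	l *log.Logger
}

func (c *contextLogger) Printf(format string, v ...interface{}) {
	c.l.Printf(format, v...)
}

func (c *contextLogger) PrintfWithContext(ctx context.Context, format string, v ...interface{}) {
	// Prefix the log line with a request ID if the context carries one.
	if id, ok := ctx.Value(ctxKey{}).(string); ok {
		format = "[request-id=" + id + "] " + format
	}
	c.l.Printf(format, v...)
}

func main() {
	logger := &contextLogger{l: log.New(os.Stderr, "", log.LstdFlags)}

	// SetErrorLog and SetInfoLog accept any Logger; no new option is needed
	// to enable the context-aware path.
	client, err := elastic.NewClient(
		elastic.SetErrorLog(logger),
		elastic.SetInfoLog(logger),
	)
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.WithValue(context.Background(), ctxKey{}, "req-1234")
	_, _ = client.ClusterHealth().Do(ctx) // log lines for this request include req-1234
}

As the diff shows, request-scoped paths (PerformRequest, healthcheck, the bulk worker's commit) forward their context, while background paths (Start/Stop, dumpResponse, the reconnect loop) pass context.Background(), so PrintfWithContext implementations should tolerate contexts that lack the expected values.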