diff --git a/bulk_processor.go b/bulk_processor.go index f2711f82..798b2311 100644 --- a/bulk_processor.go +++ b/bulk_processor.go @@ -580,6 +580,14 @@ func (w *bulkWorker) commit(ctx context.Context) error { err := RetryNotify(commitFunc, w.p.backoff, notifyFunc) w.updateStats(res) if err != nil { + // After exhausting all retry attempts, the worker must clear the requests + // for the next round. This is important when backoff is disabled or limited + // to a number of rounds, and the aftermath is a failure because the + // commitFunc (and consequently the underlying BulkService.Do call) will not + // clear the requests from the worker. Without this, the same requests will + // be used on the next round of execution of the worker. + w.service.Reset() + w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err) } diff --git a/bulk_processor_test.go b/bulk_processor_test.go index 19ce060b..66316c91 100644 --- a/bulk_processor_test.go +++ b/bulk_processor_test.go @@ -8,6 +8,8 @@ import ( "context" "fmt" "math/rand" + "net/http" + "net/http/httptest" "reflect" "sync/atomic" "testing" @@ -335,6 +337,60 @@ func TestBulkProcessorFlush(t *testing.T) { } } +func TestBulkWorker_commit_clearFailedRequests(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + defer server.Close() + + client, err := NewClient( + SetURL(server.URL), + SetSniff(false), + SetHealthcheck(false), + SetHttpClient(tooManyHTTPClient{}), + ) + if err != nil { + t.Fatal(err) + } + + var calls int + + bulkProcessor, err := NewBulkProcessorService(client). + BulkActions(-1). + BulkSize(-1). + FlushInterval(0). + RetryItemStatusCodes(). + Backoff(StopBackoff{}). + After(BulkAfterFunc(func(executionId int64, requests []BulkableRequest, response *BulkResponse, err error) { + calls++ + if calls == 1 { + if len(requests) != 10 { + t.Errorf("expected 10 requests; got: %d", len(requests)) + } + } else if len(requests) > 0 { + t.Errorf("expected 0 requests; got: %d", len(requests)) + } + })).Do(context.Background()) + + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 10; i++ { + bulkProcessor.Add(NewBulkIndexRequest()) + } + + // first flush should process 10 items + if err := bulkProcessor.Flush(); err != nil { + t.Fatal(err) + } + // second flush should process none (even if the first flush failed) + if err := bulkProcessor.Flush(); err != nil { + t.Fatal(err) + } + if err := bulkProcessor.Close(); err != nil { + t.Fatal(err) + } +} + // -- Helper -- func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) { @@ -427,3 +483,12 @@ func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) { t.Fatalf("expected %d documents; got: %d", numDocs, count) } } + +type tooManyHTTPClient struct { +} + +func (t tooManyHTTPClient) Do(r *http.Request) (*http.Response, error) { + recorder := httptest.NewRecorder() + recorder.WriteHeader(http.StatusTooManyRequests) + return recorder.Result(), nil +}