From ed6df5f0a0f6f3017d498234dcaa80cd2be9ba75 Mon Sep 17 00:00:00 2001 From: Vitalymt Date: Wed, 27 May 2026 17:54:38 +0000 Subject: [PATCH] fix: flush accumulated buffer on EOF without trailing newline When a file doesn't end with a newline character, the last line is silently dropped. The read loop accumulates data in accumBuf until it finds a '\n', but at EOF the remaining data is saved to job.tail without ever being emitted through controller.In(). If the file is never written to again, that data is lost permanently. Fix: emit accumBuf as a final event before calling processEOF when EOF is reached and the buffer is non-empty. Clear both accumBuf and job.tail afterward to prevent re-emission on file truncation. Fixes #912 --- plugin/input/file/worker.go | 8 ++++++++ plugin/input/file/worker_test.go | 11 +++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go index 18bb6738d..96485a104 100644 --- a/plugin/input/file/worker.go +++ b/plugin/input/file/worker.go @@ -202,6 +202,14 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi job.tail = append(job.tail[:0], accumBuf...) job.curOffset += readTotal + // Flush remaining data when EOF is reached and there's no trailing newline. + // Without this, the last line of a file that doesn't end with '\n' is silently dropped. + if isEOFReached && len(accumBuf) > 0 { + job.lastEventSeq = controller.In(sourceID, sourceName, pipeline.NewOffsets(lastOffset+scanned, offsets), accumBuf, isVirgin, metadataInfo) + accumBuf = accumBuf[:0] + job.tail = job.tail[:0] + } + // check if file was truncated. if isEOFReached { err := w.processEOF(file, job, jobProvider, lastOffset+readTotal) diff --git a/plugin/input/file/worker_test.go b/plugin/input/file/worker_test.go index e3defc21a..8a307a56f 100644 --- a/plugin/input/file/worker_test.go +++ b/plugin/input/file/worker_test.go @@ -81,11 +81,18 @@ func TestWorkerWork(t *testing.T) { expData: "abc\n", }, { - name: "should_ok_when_read_1_line_without_newline", + name: "should_emit_last_line_without_trailing_newline", maxEventSize: 1024, inFile: "abc", readBufferSize: 1024, - expData: "", + expData: "abc", + }, + { + name: "should_emit_last_line_among_multiple_without_trailing_newline", + maxEventSize: 1024, + inFile: "line1\nline2\nline3", + readBufferSize: 1024, + expData: "line3", }, } for _, tt := range tests {