Skip to content

Commit 79abbcb

Browse files
committed
add logs race condition fix
1 parent e68c4fd commit 79abbcb

File tree

2 files changed

+51
-0
lines changed

2 files changed

+51
-0
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,3 +178,12 @@ def _estimate_log_size(self, log: LogData, depth: int = 3) -> int: # pylint: di
178178
queue = new_queue
179179

180180
return size
181+
182+
# Only export the logs once to avoid the race condition of the worker thread and force flush thread
183+
# https://github.com/open-telemetry/opentelemetry-python/issues/3193
184+
# https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L199
185+
def force_flush(self, timeout_millis: Optional[int] = None) -> bool:
186+
if self._shutdown:
187+
return False
188+
self._export(BatchLogExportStrategy.EXPORT_AT_LEAST_ONE_BATCH)
189+
return True

aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/exporter/otlp/aws/logs/test_aws_batch_log_record_processor.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,48 @@ def test_export_single_batch_some_logs_over_size_limit(self, _, __, ___):
231231
expected_size = expected_sizes[index]
232232
self.assertEqual(len(batch), expected_size)
233233

234+
def test_force_flush_returns_false_when_shutdown(self):
235+
"""Tests that force_flush returns False when processor is shutdown"""
236+
self.processor.shutdown()
237+
result = self.processor.force_flush()
238+
239+
# Verify force_flush returns False and no export is called
240+
self.assertFalse(result)
241+
self.mock_exporter.export.assert_not_called()
242+
243+
@patch(
244+
"amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.attach",
245+
return_value=MagicMock(),
246+
)
247+
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.detach")
248+
@patch("amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor.set_value")
249+
def test_force_flush_exports_only_one_batch(self, _, __, ___):
250+
"""Tests that force_flush should try to at least export one batch of logs. Rest of the logs will be dropped"""
251+
# Set max_export_batch_size to 5 to limit batch size
252+
self.processor._max_export_batch_size = 5
253+
self.processor._shutdown = False
254+
255+
# Add 6 logs to queue, after the export there should be 1 log remaining
256+
log_count = 6
257+
test_logs = self.generate_test_log_data(log_body="test message", count=log_count)
258+
259+
for log in test_logs:
260+
self.processor._queue.appendleft(log)
261+
262+
self.assertEqual(len(self.processor._queue), log_count)
263+
264+
result = self.processor.force_flush()
265+
266+
self.assertTrue(result)
267+
# 1 log should remain
268+
self.assertEqual(len(self.processor._queue), 1)
269+
self.mock_exporter.export.assert_called_once()
270+
271+
# Verify only one batch of 5 logs was exported
272+
args, _ = self.mock_exporter.export.call_args
273+
exported_batch = args[0]
274+
self.assertEqual(len(exported_batch), 5)
275+
234276
@staticmethod
235277
def generate_test_log_data(
236278
log_body,

0 commit comments

Comments
 (0)