Skip to content

Commit 8e2bccf

Browse files
author
Mehak Bindra
committed
streaming logic fix
1 parent 49362b0 commit 8e2bccf

File tree

3 files changed

+87
-35
lines changed

3 files changed

+87
-35
lines changed

packages/apps/src/microsoft/teams/apps/http_stream.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,6 @@ def emit(self, activity: Union[MessageActivityInput, TypingActivityInput, str])
104104
Args:
105105
activity: The activity to emit.
106106
"""
107-
if self._timeout is not None:
108-
self._timeout.cancel()
109-
self._timeout = None
110107

111108
if isinstance(activity, str):
112109
activity = MessageActivityInput(text=activity, type="message")
@@ -115,7 +112,9 @@ def emit(self, activity: Union[MessageActivityInput, TypingActivityInput, str])
115112
# Clear the queue empty event since we just added an item
116113
self._queue_empty_event.clear()
117114

118-
self._timeout = Timeout(0.5, self._flush)
115+
if not self._timeout:
116+
loop = asyncio.get_running_loop()
117+
loop.create_task(self._flush())
119118

120119
def update(self, text: str) -> None:
121120
"""
@@ -171,10 +170,15 @@ async def _flush(self) -> None:
171170
Flush the current activity queue.
172171
"""
173172
# If there are no items in the queue, nothing to flush
174-
async with self._lock:
175-
if not self._queue:
176-
return
173+
if self._lock.locked():
174+
return
175+
176+
await self._lock.acquire()
177177

178+
if not self._queue:
179+
return
180+
181+
try:
178182
if self._timeout is not None:
179183
self._timeout.cancel()
180184
self._timeout = None
@@ -224,6 +228,10 @@ async def _flush(self) -> None:
224228
if self._queue and not self._timeout:
225229
self._timeout = Timeout(0.5, self._flush)
226230

231+
finally:
232+
# Reset flushing flag so future emits can trigger another flush
233+
self._lock.release()
234+
227235
async def _send_activity(self, to_send: TypingActivityInput):
228236
"""
229237
Send an activity to the Teams conversation with the ID.

packages/apps/src/microsoft/teams/apps/utils/timer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def __init__(self, delay: float, callback: TimerCallback) -> None:
2323
self._handle: Optional[asyncio.TimerHandle] = None
2424
self._cancelled: bool = False
2525

26-
loop = asyncio.get_event_loop()
26+
loop = asyncio.get_running_loop()
2727
self._handle = loop.call_later(delay, self._run)
2828

2929
def _run(self) -> None:

packages/apps/tests/test_http_stream.py

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
# pyright: basic
66

77
import asyncio
8-
from datetime import datetime, timedelta
98
from unittest.mock import MagicMock
109

1110
import pytest
@@ -35,13 +34,12 @@ def mock_api_client(self):
3534
client.conversations = mock_conversations
3635

3736
client.send_call_count = 0
38-
client.send_times = []
3937
client.sent_activities = []
4038

4139
async def mock_send(activity):
4240
client.send_call_count += 1
43-
client.send_times.append(datetime.now())
4441
client.sent_activities.append(activity)
42+
await asyncio.sleep(0.05) # Simulate network delay
4543
return SentActivity(id=f"test-id-{client.send_call_count}", activity_params=activity)
4644

4745
client.conversations.activities().create = mock_send
@@ -63,37 +61,45 @@ def http_stream(self, mock_api_client, conversation_reference, mock_logger):
6361
return HttpStream(mock_api_client, conversation_reference, mock_logger)
6462

6563
@pytest.mark.asyncio
66-
async def test_stream_emit_message_flushes_after_500ms(self, mock_api_client, conversation_reference, mock_logger):
67-
"""Test that messages are flushed after 500ms timeout."""
64+
async def test_stream_emit_message_flushes_immediately(self, mock_api_client, conversation_reference, mock_logger):
65+
"""Test that messages are flushed immediately."""
6866

6967
stream = HttpStream(mock_api_client, conversation_reference, mock_logger)
70-
start_time = datetime.now()
71-
7268
stream.emit("Test message")
73-
await asyncio.sleep(0.6) # Wait longer than 500ms timeout
74-
75-
assert mock_api_client.send_call_count > 0, "Should have sent at least one message"
76-
assert any(t >= start_time + timedelta(milliseconds=450) for t in mock_api_client.send_times), (
77-
"Should have waited approximately 500ms before sending"
78-
)
69+
await asyncio.sleep(0.07) # Wait for the flush task to complete
70+
assert mock_api_client.send_call_count == 1
7971

8072
@pytest.mark.asyncio
81-
async def test_stream_multiple_emits_restarts_timer(self, mock_api_client, conversation_reference, mock_logger):
73+
async def test_stream_multiple_emits_timer_check(self, mock_api_client, conversation_reference, mock_logger):
8274
"""Test that multiple emits reset the timer."""
8375

8476
stream = HttpStream(mock_api_client, conversation_reference, mock_logger)
8577

8678
stream.emit("First message")
87-
await asyncio.sleep(0.3) # Wait less than 500ms
88-
89-
stream.emit("Second message") # This should reset the timer
90-
await asyncio.sleep(0.3) # Still less than 500ms from second emit
91-
assert mock_api_client.send_call_count == 0, "Should not have sent yet"
92-
await asyncio.sleep(0.3) # Now over 500ms from second emit
93-
assert mock_api_client.send_call_count > 0, "Should have sent messages after timer expired"
79+
stream.emit("Second message")
80+
stream.emit("Third message")
81+
stream.emit("Fourth message")
82+
stream.emit("Fifth message")
83+
stream.emit("Sixth message")
84+
stream.emit("Seventh message")
85+
stream.emit("Eighth message")
86+
stream.emit("Ninth message")
87+
stream.emit("Tenth message")
88+
stream.emit("Eleventh message")
89+
stream.emit("Twelfth message")
90+
91+
await asyncio.sleep(0.07) # Wait for the flush task to complete
92+
assert mock_api_client.send_call_count == 1 # First message should trigger flush immediately
93+
94+
stream.emit("Thirteenth message")
95+
await asyncio.sleep(0.3) # Less than 500ms from first flush
96+
assert mock_api_client.send_call_count == 1, "No new flush should have occurred yet"
97+
98+
await asyncio.sleep(0.3) # Now exceed 500ms from last emit
99+
assert mock_api_client.send_call_count == 2, "Second flush should have occurred"
94100

95101
@pytest.mark.asyncio
96-
async def test_stream_send_timeout_handled_gracefully(self, mock_api_client, conversation_reference, mock_logger):
102+
async def test_stream_error_handled_gracefully(self, mock_api_client, conversation_reference, mock_logger):
97103
"""Test that send timeouts are handled gracefully with retries."""
98104
call_count = 0
99105

@@ -104,14 +110,15 @@ async def mock_send_with_timeout(activity):
104110
raise TimeoutError("Operation timed out")
105111

106112
# Succeed on second attempt
113+
await asyncio.sleep(0.05) # Simulate delay
107114
return SentActivity(id=f"success-after-timeout-{call_count}", activity_params=activity)
108115

109116
mock_api_client.conversations.activities().create = mock_send_with_timeout
110117

111118
stream = HttpStream(mock_api_client, conversation_reference, mock_logger)
112119

113120
stream.emit("Test message with timeout")
114-
await asyncio.sleep(0.8) # Wait for flush and retries
121+
await asyncio.sleep(0.6) # Wait for flush and 1 retry to complete
115122

116123
result = await stream.close()
117124

@@ -148,7 +155,7 @@ async def test_stream_update_status_sends_typing_activity(
148155
stream = HttpStream(mock_api_client, conversation_reference, mock_logger)
149156

150157
stream.update("Thinking...")
151-
await asyncio.sleep(0.6) # Wait for the flush task to complete
158+
await asyncio.sleep(0.07) # Wait for the flush task to complete
152159

153160
assert stream.count > 0 or len(mock_api_client.sent_activities) > 0, "Should have processed the update"
154161
assert stream.sequence >= 2, "Should increment sequence after sending"
@@ -167,10 +174,9 @@ async def test_stream_sequence_of_update_and_emit(self, mock_api_client, convers
167174
stream = HttpStream(mock_api_client, conversation_reference, mock_logger)
168175

169176
stream.update("Preparing response...")
170-
await asyncio.sleep(0.6)
171-
172177
stream.emit("Final response message")
173-
await asyncio.sleep(0.6)
178+
179+
await asyncio.sleep(0.5) # Wait for the flush task to complete
174180

175181
assert len(mock_api_client.sent_activities) >= 2, "Should have sent typing activity and message"
176182

@@ -186,3 +192,41 @@ async def test_stream_sequence_of_update_and_emit(self, mock_api_client, convers
186192

187193
# Sequence numbers should have increased
188194
assert stream.sequence >= 3, "Sequence should increment for both update and emit"
195+
196+
@pytest.mark.asyncio
197+
async def test_stream_concurrent_emits_do_not_flush_simultaneously(
198+
self, mock_api_client, conversation_reference, mock_logger
199+
):
200+
"""
201+
Test that multiple concurrent emits do not allow simultaneous flush execution.
202+
"""
203+
concurrent_entries = 0
204+
max_concurrent_entries = 0
205+
lock = asyncio.Lock()
206+
207+
async def mock_send(activity):
208+
nonlocal concurrent_entries, max_concurrent_entries
209+
async with lock:
210+
concurrent_entries += 1
211+
max_concurrent_entries = max(max_concurrent_entries, concurrent_entries)
212+
await asyncio.sleep(0.05) # simulate delay in sending
213+
async with lock:
214+
concurrent_entries -= 1
215+
return activity
216+
217+
mock_api_client.conversations.activities().create = mock_send
218+
219+
stream = HttpStream(mock_api_client, conversation_reference, mock_logger)
220+
221+
# Schedule multiple emits concurrently
222+
async def emit_task():
223+
stream.emit("Concurrent message")
224+
225+
tasks = [asyncio.create_task(emit_task()) for _ in range(10)]
226+
await asyncio.gather(*tasks)
227+
228+
# Wait for flushes to complete
229+
await asyncio.sleep(0.07)
230+
231+
# Only one flush should have entered the critical section at a time
232+
assert max_concurrent_entries == 1, f"Flush entered concurrently {max_concurrent_entries} times, expected 1"

0 commit comments

Comments
 (0)