Skip to content

Commit 24efece

Browse files
authored
Merge branch 'main' into fern-bot/2025-08-13T04-23Z
2 parents 9eab5e3 + 16a01ba commit 24efece

File tree

2 files changed

+79
-21
lines changed

2 files changed

+79
-21
lines changed

src/hume/empathic_voice/chat/audio/audio_utilities.py

Lines changed: 55 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
# Tsrc/hume/empathic_voice/chat/audio/audio_utilities.pyHIS FILE IS MANUALLY MAINTAINED: see .fernignore
1+
# THIS FILE IS MANUALLY MAINTAINED: see .fernignore
22
"""
33
* WAV/PCM handled with `wave` module
44
* MP3 decoded by shelling out to ffmpeg (`ffmpeg` must be in $PATH)
55
"""
66

77
from __future__ import annotations
88
import asyncio, io, wave, queue, shlex
9-
from typing import TYPE_CHECKING, AsyncIterable, Optional
9+
from typing import TYPE_CHECKING, AsyncIterable, Optional, Callable, Awaitable
1010

1111
_missing: Optional[Exception] = None
1212
try:
@@ -48,40 +48,52 @@ async def play_audio(
4848
blob: bytes,
4949
*,
5050
device: Optional[int] = None,
51-
blocksize = None
51+
blocksize = None,
52+
sample_rate: int = 48000,
5253
) -> None:
5354
async def _one_chunk():
5455
yield blob
55-
await play_audio_streaming(_one_chunk().__aiter__(), device=device, blocksize=blocksize)
56+
await play_audio_streaming(_one_chunk().__aiter__(), device=device, blocksize=blocksize, sample_rate=sample_rate)
5657

5758

5859
async def play_audio_streaming(
5960
chunks: AsyncIterable[bytes],
6061
*,
6162
device: Optional[int] = None,
6263
blocksize: Optional[int] = None,
64+
sample_rate: int = 48000,
65+
on_playback_active: Optional[Callable[[], Awaitable[None]]] = None,
66+
on_playback_idle: Optional[Callable[[], Awaitable[None]]] = None,
6367
) -> None:
6468
_need_deps()
6569
iterator = chunks.__aiter__()
6670
first = await iterator.__anext__()
6771

72+
if on_playback_active:
73+
await on_playback_active()
74+
6875
if _looks_like_mp3(first):
69-
await _stream_mp3(chunks, first, device=device)
76+
await _stream_mp3(chunks, first, device=device, on_playback_active=on_playback_active, on_playback_idle=on_playback_idle)
7077
elif _looks_like_wav(first):
71-
await _stream_wav(chunks, first, device=device)
78+
await _stream_wav(chunks, first, device=device, on_playback_active=on_playback_active, on_playback_idle=on_playback_idle)
7279
else:
7380
async def _reassembled():
7481
yield first
7582
async for chunk in chunks:
7683
yield chunk
77-
await _stream_pcm(_reassembled(), 48000, 1, device=device, blocksize=blocksize)
84+
await _stream_pcm(_reassembled(), sample_rate, 1, device=device, blocksize=blocksize, on_playback_active=on_playback_active, on_playback_idle=on_playback_idle)
85+
86+
if on_playback_idle:
87+
await on_playback_idle()
7888

7989
async def _stream_pcm(
8090
pcm_chunks: AsyncIterable[bytes],
8191
sample_rate: int,
8292
n_channels: int,
8393
device: Optional[int] = None,
8494
blocksize: Optional[int] = _DEFAULT_BLOCKSIZE,
95+
on_playback_active: Optional[Callable[[], Awaitable[None]]] = None,
96+
on_playback_idle: Optional[Callable[[], Awaitable[None]]] = None,
8597
) -> None:
8698
"""Generic PCM player: pulls raw PCM from chunks and plays via sounddevice."""
8799
_need_deps()
@@ -102,16 +114,39 @@ async def feeder():
102114
# consume queue in sounddevice callback
103115
async def player():
104116
buf = b""
117+
finished = False
118+
was_idle = False
105119
def cb(outdata, frames, *_):
106-
nonlocal buf
120+
nonlocal buf, finished, was_idle
107121
need = frames * n_channels * _BYTES_PER_SAMP
108122
while len(buf) < need:
109-
part = pcm_queue.get()
110-
if part is None:
111-
raise sd.CallbackStop
112-
buf += part
113-
outdata[:] = buf[:need]
114-
buf = buf[need:]
123+
try:
124+
part = pcm_queue.get_nowait()
125+
if part is None:
126+
finished = True
127+
break
128+
buf += part
129+
if was_idle and on_playback_active:
130+
was_idle = False
131+
loop.call_soon_threadsafe(lambda: asyncio.create_task(on_playback_active()))
132+
except queue.Empty:
133+
if not was_idle and on_playback_idle:
134+
was_idle = True
135+
loop.call_soon_threadsafe(lambda: asyncio.create_task(on_playback_idle()))
136+
break
137+
138+
if len(buf) >= need:
139+
# We have enough data
140+
outdata[:] = buf[:need]
141+
buf = buf[need:]
142+
elif finished:
143+
# Stream is finished, stop playback
144+
raise sd.CallbackStop
145+
else:
146+
# Not enough data and stream not finished - fill with silence
147+
silence = b'\x00' * (need - len(buf))
148+
outdata[:] = buf + silence
149+
buf = b""
115150

116151
with sd.RawOutputStream(
117152
samplerate=sample_rate,
@@ -129,6 +164,8 @@ async def _stream_wav(
129164
chunks: AsyncIterable[bytes],
130165
first: bytes,
131166
device: Optional[int] = None,
167+
on_playback_active: Optional[Callable[[], Awaitable[None]]] = None,
168+
on_playback_idle: Optional[Callable[[], Awaitable[None]]] = None,
132169
) -> None:
133170
# build header + ensure we have 44 bytes
134171
header = bytearray(first)
@@ -143,12 +180,14 @@ async def pcm_gen():
143180
async for c in iterator:
144181
yield c
145182

146-
await _stream_pcm(pcm_gen(), sample_rate, n_channels, device=device)
183+
await _stream_pcm(pcm_gen(), sample_rate, n_channels, device=device, on_playback_active=on_playback_active, on_playback_idle=on_playback_idle)
147184

148185
async def _stream_mp3(
149186
chunks: AsyncIterable[bytes],
150187
first: bytes,
151188
device: Optional[int] = None,
189+
on_playback_active: Optional[Callable[[], Awaitable[None]]] = None,
190+
on_playback_idle: Optional[Callable[[], Awaitable[None]]] = None,
152191
) -> None:
153192
cmd = (
154193
"ffmpeg -hide_banner -loglevel error -i pipe:0 "
@@ -184,4 +223,4 @@ async def pcm_generator():
184223
await feed_task
185224
await proc.wait()
186225

187-
await _stream_pcm(pcm_generator(), 48_000, 2, device=device)
226+
await _stream_pcm(pcm_generator(), 48_000, 2, device=device, on_playback_active=on_playback_active, on_playback_idle=on_playback_idle)

src/hume/empathic_voice/chat/audio/chat_client.py

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from dataclasses import dataclass
77
from typing import AsyncIterable
88

9-
from hume.empathic_voice.chat.audio.audio_utilities import play_audio
9+
from hume.empathic_voice.chat.audio.audio_utilities import play_audio_streaming
1010
from hume.empathic_voice.chat.audio.microphone_sender import Sender
1111
from hume.empathic_voice.chat.socket_client import ChatWebsocketConnection
1212

@@ -30,10 +30,29 @@ def new(cls, *, sender: Sender, byte_strs: AsyncIterable[bytes]) -> "ChatClient"
3030
return cls(sender=sender, byte_strs=byte_strs)
3131

3232
async def _play(self) -> None:
33-
async for byte_str in self.byte_strs:
34-
await self.sender.on_audio_begin()
35-
await play_audio(byte_str)
36-
await self.sender.on_audio_end()
33+
async def iterable() -> AsyncIterable[bytes]:
34+
first = True
35+
async for byte_str in self.byte_strs:
36+
# Each chunk of audio data sent from EVI is a .wav
37+
# file. We want to concatenate these as one long .wav
38+
# stream rather than playing each individual .wav file
39+
# and starting and stopping the audio player for each
40+
# chunk.
41+
#
42+
# Every .wav file starts with a 44 byte header that
43+
# declares metadata like the sample rate and the number
44+
# of channels. We assume that the first .wav header
45+
# applies to the entire stream, so for all but the
46+
# first chunk we skip the first 44 bytes.
47+
if not first:
48+
byte_str = byte_str[44:]
49+
yield byte_str
50+
first = False
51+
await play_audio_streaming(
52+
iterable(),
53+
on_playback_active=self.sender.on_audio_begin,
54+
on_playback_idle=self.sender.on_audio_end
55+
)
3756

3857
async def run(self, *, socket: ChatWebsocketConnection) -> None:
3958
"""Run the chat client.

0 commit comments

Comments
 (0)