Skip to content

WIP chore: audio recognition (opus) #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 87 additions & 5 deletions recognizer/assets/js/room.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,97 @@ const imgscore = document.getElementById('imgscore');
const time = document.getElementById('time');

let localStream;
let audioStrem;
let socket;
let channel;
let pc;

let audioRecorder;
let audioChunks = [];
let audioSendInterval;

// Starts a fresh MediaRecorder on the microphone stream. When the recorder
// is stopped, everything it captured is packaged into one webm/opus blob
// and shipped to the server via sendAudio().
async function startAudioRecording() {
  const mimeType = 'audio/webm;codecs=opus';

  // Each recorder instance gets its OWN chunk list. The previous code shared
  // the module-level `audioChunks` array across recorders, so the async
  // `onstop` of recorder N could fire after recorder N+1 had already reset
  // the array, silently dropping the captured audio.
  const chunks = [];
  audioChunks = chunks; // keep the module-level alias for debugging

  // NOTE(review): `audioStrem` (sic) is the mic-only stream captured in
  // connect(); the constructor throws if it is still undefined.
  audioRecorder = new MediaRecorder(audioStrem, { mimeType });

  audioRecorder.ondataavailable = (event) => {
    if (event.data.size > 0) {
      chunks.push(event.data);
    }
  };

  audioRecorder.onstop = () => {
    // Only this recorder's local chunks are read here, so a concurrent
    // startAudioRecording() call cannot clobber them. Skip empty captures.
    if (chunks.length > 0) {
      sendAudio(new Blob(chunks, { type: mimeType }));
    }
  };

  audioRecorder.start();
}

// Stops the active MediaRecorder, which triggers its `onstop` handler and
// flushes the captured audio to the server.
// Guards against the InvalidStateError that MediaRecorder.stop() throws
// when the recorder exists but is already inactive (e.g. stop called twice).
function stopAudioRecording() {
  if (audioRecorder && audioRecorder.state !== 'inactive') {
    audioRecorder.stop();
  }
}

// Base64-encodes the recorded audio blob and pushes it to the Phoenix
// channel as an "audio_chunk" event. No-op (with an error log) when the
// channel is not ready.
function sendAudio(audioBlob) {
  if (!channel || !channel.push) {
    console.error('Channel is not initialized or push method is not available. Cannot send audio.');
    return;
  }

  const reader = new FileReader();

  // Register handlers BEFORE starting the read so no event can be missed,
  // and use `onload` (success only) instead of `onloadend`, which also
  // fires after a failed read with a null result.
  reader.onload = () => {
    try {
      // reader.result is a data URL ("data:<mime>;base64,<payload>");
      // keep only the base64 payload after the comma.
      const base64AudioMessage = reader.result.split(',')[1];
      channel.push('audio_chunk', { audio: base64AudioMessage });
      console.log('📤 Audio sent:', base64AudioMessage.length, 'bytes');
    } catch (error) {
      console.error('Error sending audio:', error);
    }
  };
  reader.onerror = () => {
    // Previously a failed blob read was silently ignored.
    console.error('Error reading audio blob:', reader.error);
  };

  reader.readAsDataURL(audioBlob);
}

// Restarts the recorder once per second so each ~1 s of captured audio is
// flushed to the server as its own chunk.
function startAudioSendingLoop() {
  // Avoid stacking intervals if this is ever called more than once; the
  // old code leaked the previous timer, doubling the restart rate.
  stopAudioSendingLoop();

  audioSendInterval = setInterval(() => {
    stopAudioRecording(); // fires onstop -> sendAudio with the last second
    startAudioRecording(); // immediately begin capturing the next second
  }, 1000); // send audio every 1 second
}

// Cancels the periodic record/flush timer started by startAudioSendingLoop().
function stopAudioSendingLoop() {
  if (audioSendInterval) {
    clearInterval(audioSendInterval);
    // Drop the stale timer id so later truthiness checks don't see a
    // handle that has already been cleared.
    audioSendInterval = null;
  }
}


async function connect() {
console.log('Connecting');
button.onclick = disconnect;

localStream = await navigator.mediaDevices.getUserMedia({
audio: true,
audio: false,
video: {
width: { ideal: 320 },
height: { ideal: 160 },
frameRate: { ideal: 15 },
},
});

audioStrem = await navigator.mediaDevices.getUserMedia({
audio: {
echoCancellation: true,
noiseSuppression: true
},
video: false}
)

videoPlayer.srcObject = localStream;
startAudioRecording();

socket = new Socket('/socket', {});
socket.connect();
Expand Down Expand Up @@ -60,9 +133,15 @@ async function connect() {
});

channel.on('imgReco', (msg) => {
const pred = msg['predictions'][0];
imgpred.innerText = pred['label'];
imgscore.innerText = pred['score'].toFixed(3);
const pred = msg['chunks'];

if (Array.isArray(pred) && pred.length > 0) {
imgpred.innerText = pred[0].text
} else {
imgpred.innerText = ''
}

imgscore.innerText = ''
});

channel.on('sessionTime', (msg) => {
Expand All @@ -76,19 +155,22 @@ async function connect() {
JSON.stringify({ type: 'ice', data: ev.candidate })
);
};
pc.addTrack(localStream.getAudioTracks()[0]);
pc.addTrack(localStream.getVideoTracks()[0]);

const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
channel.push('signaling', JSON.stringify(offer));

startAudioSendingLoop()
}

function disconnect() {
console.log('Disconnecting');
localStream.getTracks().forEach((track) => track.stop());
videoPlayer.srcObject = null;

stopAudioSendingLoop()

if (typeof channel !== 'undefined') {
channel.leave();
}
Expand Down
20 changes: 20 additions & 0 deletions recognizer/lib/recognizer/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ defmodule Recognizer.Application do
name: Recognizer.VideoServing,
batch_size: 4,
batch_timeout: 100},
{Nx.Serving,
serving: create_audio_serving(),
name: Recognizer.AudioServing,
batch_size: 4,
batch_timeout: 100},
{Lobby, @max_rooms}
]

Expand Down Expand Up @@ -56,4 +61,19 @@ defmodule Recognizer.Application do
defn_options: [compiler: EXLA]
)
end

# Builds the Whisper speech-to-text `Nx.Serving` registered under
# `Recognizer.AudioServing` in the supervision tree.
defp create_audio_serving() do
  # All pre-trained artifacts come from the same Hugging Face repository.
  repo = {:hf, "openai/whisper-tiny"}

  {:ok, model_info} = Bumblebee.load_model(repo)
  {:ok, featurizer} = Bumblebee.load_featurizer(repo)
  {:ok, tokenizer} = Bumblebee.load_tokenizer(repo)
  {:ok, generation_config} = Bumblebee.load_generation_config(repo)

  Bumblebee.Audio.speech_to_text_whisper(
    model_info,
    featurizer,
    tokenizer,
    generation_config,
    compile: [batch_size: 4],
    defn_options: [compiler: EXLA]
  )
end
end
61 changes: 53 additions & 8 deletions recognizer/lib/recognizer/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Recognizer.Room do

@max_session_time_s Application.compile_env!(:recognizer, :max_session_time_s)
@session_time_timer_interval_ms 1_000
@jitter_buffer_latency_ms 50
@jitter_buffer_latency_ms 100

@video_codecs [
%RTPCodecParameters{
Expand All @@ -20,6 +20,14 @@ defmodule Recognizer.Room do
}
]

@audio_codecs [
%RTPCodecParameters{
payload_type: 97,
mime_type: "audio/opus",
clock_rate: 48_000
}
]

defp id(room_id), do: {:via, Registry, {Recognizer.RoomRegistry, room_id}}

def start_link(room_id) do
Expand All @@ -34,6 +42,10 @@ defmodule Recognizer.Room do
GenServer.cast(id(room_id), {:receive_signaling_msg, msg})
end

def receive_audio_msg(room_id, audio_base64) do
GenServer.cast(id(room_id), {:receive_audio_msg, audio_base64})
end

def stop(room_id) do
GenServer.stop(id(room_id), :shutdown)
end
Expand All @@ -44,6 +56,7 @@ defmodule Recognizer.Room do
Process.send_after(self(), :session_time, @session_time_timer_interval_ms)

{:ok, video_depayloader} = @video_codecs |> hd() |> Depayloader.new()
{:ok, audio_depayloader} = @audio_codecs |> hd() |> Depayloader.new()

{:ok,
%{
Expand All @@ -53,7 +66,9 @@ defmodule Recognizer.Room do
task: nil,
video_track: nil,
video_depayloader: video_depayloader,
audio_depayloader: audio_depayloader,
video_decoder: Xav.Decoder.new(:vp8),
audio_decoder: Xav.Decoder.new(:opus),
video_buffer: JitterBuffer.new(latency: @jitter_buffer_latency_ms),
audio_track: nil,
session_start_time: System.monotonic_time(:millisecond)
Expand All @@ -69,6 +84,7 @@ defmodule Recognizer.Room do
{:ok, pc} =
PeerConnection.start_link(
video_codecs: @video_codecs,
audio_codecs: @audio_codecs,
ice_port_range: ice_port_range
)

Expand All @@ -86,8 +102,27 @@ defmodule Recognizer.Room do
end

@impl true
def handle_cast({:receive_signaling_msg, msg}, state) do
case Jason.decode!(msg) do
def handle_cast({:receive_audio_msg, audio_base64}, state) do
  # The browser ships ~1 s MediaRecorder chunks base64-encoded over the
  # channel; decode back to binary before handing to the audio decoder.
  audio_data = Base.decode64!(audio_base64)

  # NOTE(review): MediaRecorder emits WebM *container* data while the :opus
  # decoder presumably expects raw Opus frames — confirm the decoder accepts
  # this payload, otherwise every chunk will fail to decode.
  case Xav.Decoder.decode(state.audio_decoder, audio_data) do
    {:ok, frame} ->
      tensor = Xav.Frame.to_nx(frame)
      # Fire-and-forget: the serving result arrives as a process message
      # picked up by the `handle_info({_ref, predictions}, state)` clause.
      Task.async(fn -> Nx.Serving.batched_run(Recognizer.AudioServing, tensor) end)

    :ok ->
      # Decoder consumed the data without emitting a frame yet; previously
      # this (and any error other than :no_keyframe) crashed the room with
      # a WithClauseError.
      :ok

    {:error, :no_keyframe} ->
      Logger.warning("Couldn't decode audio frame - missing keyframe!")

    {:error, reason} ->
      Logger.warning("Couldn't decode audio chunk: #{inspect(reason)}")
  end

  {:noreply, state}
end

@impl true
# JSON-encoded signaling payloads are decoded once, then re-dispatched to
# the map-handling clause so all message shapes share one code path.
def handle_cast({:receive_signaling_msg, msg}, state) when is_binary(msg) do
  decoded = Jason.decode!(msg)
  handle_cast({:receive_signaling_msg, decoded}, state)
end

def handle_cast({:receive_signaling_msg, msg}, state) when is_map(msg) do
case msg do
%{"type" => "offer"} = offer ->
desc = SessionDescription.from_json(offer)
:ok = PeerConnection.set_remote_description(state.pc, desc)
Expand All @@ -96,6 +131,9 @@ defmodule Recognizer.Room do
msg = %{"type" => "answer", "sdp" => answer.sdp}
send(state.channel, {:signaling, msg})

%{"type" => "ice", "data" => nil} ->
:ok

%{"type" => "ice", "data" => data} when data != nil ->
candidate = ICECandidate.from_json(data)
:ok = PeerConnection.add_ice_candidate(state.pc, candidate)
Expand Down Expand Up @@ -142,9 +180,13 @@ defmodule Recognizer.Room do

@impl true
def handle_info(
      {:ex_webrtc, _pc, {:rtp, track_id, nil, packet}},
      %{audio_track: %{id: track_id}} = state
    ) do
  # FIXME: this clause is believed never to fire — per the original "never
  # appears" note, the client captures the microphone separately and ships
  # audio over the channel ("audio_chunk") instead of adding an audio track
  # to the peer connection, so no audio RTP reaches this process.
  # Debug output left in place until the WebRTC audio path is wired up.
  packet |> IO.inspect(label: :audio_packet)

  # Do something fun with the audio!
  {:noreply, state}
end
Expand Down Expand Up @@ -192,6 +234,7 @@ defmodule Recognizer.Room do

@impl true
def handle_info({_ref, predicitons}, state) do
# predicitons |> IO.inspect(label: :predicitons)
send(state.channel, {:img_reco, predicitons})
state = %{state | task: nil}
{:noreply, state}
Expand All @@ -211,15 +254,15 @@ defmodule Recognizer.Room do
end

defp handle_packet(packet, state) do
{frame, depayloader} = Depayloader.depayload(state.video_depayloader, packet)
state = %{state | video_depayloader: depayloader}
{frame, depayloader} = Depayloader.depayload(state.audio_depayloader, packet)
state = %{state | audio_depayloader: depayloader}

with false <- is_nil(frame),
# decoder needs to decode every frame, no matter we are going to process it or not
{:ok, frame} <- Xav.Decoder.decode(state.video_decoder, frame),
{:ok, frame} <- Xav.Decoder.decode(state.audio_decoder, frame),
true <- is_nil(state.task) do
tensor = Xav.Frame.to_nx(frame)
task = Task.async(fn -> Nx.Serving.batched_run(Recognizer.VideoServing, tensor) end)
task = Task.async(fn -> Nx.Serving.batched_run(Recognizer.AudioServing, tensor) end)
%{state | task: task}
else
other when other in [:ok, true, false] ->
Expand All @@ -229,5 +272,7 @@ defmodule Recognizer.Room do
Logger.warning("Couldn't decode video frame - missing keyframe!")
state
end

state
end
end
5 changes: 5 additions & 0 deletions recognizer/lib/recognizer_web/channels/room_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ defmodule RecognizerWeb.RoomChannel do
{:noreply, socket}
end

# Relays a base64-encoded audio chunk from the client straight to its room
# process; the cast is asserted to return :ok before acknowledging.
def handle_in("audio_chunk", %{"audio" => encoded_audio}, socket) do
  :ok = Room.receive_audio_msg(socket.assigns.room_id, encoded_audio)
  {:noreply, socket}
end

def handle_info({:signaling, msg}, socket) do
push(socket, "signaling", msg)
{:noreply, socket}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
<script>
window.roomId = "<%= @conn.params["room_id"] %>";
</script>
<div id="room" phx-hook="Room">
<video id="videoPlayer" class="w-full rounded-xl" autoplay muted controls></video>

Expand Down