diff --git a/recognizer/assets/js/room.js b/recognizer/assets/js/room.js index abc812e..0b31d91 100644 --- a/recognizer/assets/js/room.js +++ b/recognizer/assets/js/room.js @@ -12,10 +12,74 @@ const imgscore = document.getElementById('imgscore'); const time = document.getElementById('time'); let localStream; +let audioStrem; let socket; let channel; let pc; +let audioRecorder; +let audioChunks = []; +let audioSendInterval; + +async function startAudioRecording() { + audioChunks = []; + + audioRecorder = new MediaRecorder(audioStrem, { mimeType: 'audio/webm;codecs=opus'}); + + audioRecorder.ondataavailable = (event) => { + if (event.data.size > 0) { + audioChunks.push(event.data); + } + }; + + audioRecorder.onstop = () => { + const audioBlob = new Blob(audioChunks, { type: 'audio/webm;codecs=opus' }); + sendAudio(audioBlob); + audioChunks = []; + }; + + audioRecorder.start(); +} + +function stopAudioRecording() { + if (audioRecorder) { + audioRecorder.stop(); + } +} + +function sendAudio(audioBlob) { + if (!channel || !channel.push) { + console.error('Channel is not initialized or push method is not available. 
Cannot send audio.'); + return; + } + + const reader = new FileReader(); + reader.readAsDataURL(audioBlob); + reader.onloadend = () => { + try { + const base64AudioMessage = reader.result.split(',')[1]; + channel.push('audio_chunk', { audio: base64AudioMessage }); + console.log('📤 Audio sent:', base64AudioMessage.length, 'bytes'); + } catch (error) { + console.error('Error sending audio:', error); + } + }; +} + +function startAudioSendingLoop() { + audioSendInterval = setInterval(() => { + stopAudioRecording(); + startAudioRecording(); + }, 5000); // Send audio every 5 seconds +} + +function stopAudioSendingLoop() { + if (audioSendInterval) { + clearInterval(audioSendInterval); + } +} + + async function connect() { console.log('Connecting'); button.onclick = disconnect; @@ -29,7 +93,16 @@ async function connect() { }, }); + audioStrem = await navigator.mediaDevices.getUserMedia({ + audio: { + echoCancellation: true, + noiseSuppression: true + }, + video: false} + ) + videoPlayer.srcObject = localStream; + startAudioRecording(); socket = new Socket('/socket', {}); socket.connect(); @@ -59,12 +132,15 @@ async function connect() { } }); - channel.on('imgReco', (msg) => { - const pred = msg['predictions'][0]; + channel.on('imgReco', (pred) => { imgpred.innerText = pred['label']; imgscore.innerText = pred['score'].toFixed(3); }); + channel.on('audioTranscription', (msg) => { + audiotranscription.innerText = msg['text'] + }); + channel.on('sessionTime', (msg) => { time.innerText = msg['time']; }); @@ -82,6 +158,8 @@ async function connect() { const offer = await pc.createOffer(); await pc.setLocalDescription(offer); channel.push('signaling', JSON.stringify(offer)); + + startAudioSendingLoop() } function disconnect() { @@ -89,6 +167,8 @@ function disconnect() { localStream.getTracks().forEach((track) => track.stop()); videoPlayer.srcObject = null; + stopAudioSendingLoop(); stopAudioRecording(); audioStrem.getTracks().forEach((track) => track.stop()); + + if (typeof channel !== 'undefined') { channel.leave(); } diff --git a/recognizer/config/config.exs 
b/recognizer/config/config.exs index 848158a..fab4bc4 100644 --- a/recognizer/config/config.exs +++ b/recognizer/config/config.exs @@ -55,6 +55,16 @@ config :nx, default_backend: EXLA.Backend config :recognizer, max_rooms: 5, max_session_time_s: 200 +config :exla, + clients: [ + host: [platform: :host], + cuda: [ + # platform: :cuda, + default_device_id: String.to_integer(System.get_env("DEFAULT_DEVICE_ID", "0")), + memory_fraction: String.to_float(System.get_env("MEMORY_FRACTION", "0.9")) + ] + ] + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{config_env()}.exs" diff --git a/recognizer/lib/recognizer/application.ex b/recognizer/lib/recognizer/application.ex index a636d0e..7363970 100644 --- a/recognizer/lib/recognizer/application.ex +++ b/recognizer/lib/recognizer/application.ex @@ -29,6 +29,11 @@ defmodule Recognizer.Application do name: Recognizer.VideoServing, batch_size: 4, batch_timeout: 100}, + {Nx.Serving, + serving: create_audio_serving(), + name: Recognizer.AudioServing, + batch_size: 4, + batch_timeout: 100}, {Lobby, @max_rooms} ] @@ -56,4 +61,19 @@ defmodule Recognizer.Application do defn_options: [compiler: EXLA] ) end + + defp create_audio_serving() do + # Load the pre-trained model + {:ok, model_info} = Bumblebee.load_model({:hf, "openai/whisper-tiny"}) + {:ok, featurizer} = Bumblebee.load_featurizer({:hf, "openai/whisper-tiny"}) + {:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "openai/whisper-tiny"}) + {:ok, generation_config} = Bumblebee.load_generation_config({:hf, "openai/whisper-tiny"}) + + Bumblebee.Audio.speech_to_text_whisper(model_info, featurizer, tokenizer, generation_config, + compile: [batch_size: 4], + defn_options: [ + compiler: EXLA + ] + ) + end end diff --git a/recognizer/lib/recognizer/room.ex b/recognizer/lib/recognizer/room.ex index 8a478e1..8ddb2ba 100644 --- a/recognizer/lib/recognizer/room.ex +++ 
b/recognizer/lib/recognizer/room.ex @@ -20,6 +20,14 @@ defmodule Recognizer.Room do } ] + @audio_codecs [ + %RTPCodecParameters{ + payload_type: 97, + mime_type: "audio/opus", + clock_rate: 48_000, channels: 2 + } + ] + defp id(room_id), do: {:via, Registry, {Recognizer.RoomRegistry, room_id}} def start_link(room_id) do @@ -34,6 +42,10 @@ defmodule Recognizer.Room do GenServer.cast(id(room_id), {:receive_signaling_msg, msg}) end + def receive_audio_msg(room_id, audio_base64) do + GenServer.cast(id(room_id), {:receive_audio_msg, audio_base64}) + end + def stop(room_id) do GenServer.stop(id(room_id), :shutdown) end @@ -44,6 +56,7 @@ defmodule Recognizer.Room do Process.send_after(self(), :session_time, @session_time_timer_interval_ms) {:ok, video_depayloader} = @video_codecs |> hd() |> Depayloader.new() + {:ok, audio_depayloader} = @audio_codecs |> hd() |> Depayloader.new() {:ok, %{ @@ -53,7 +66,9 @@ task: nil, video_track: nil, video_depayloader: video_depayloader, + audio_depayloader: audio_depayloader, video_decoder: Xav.Decoder.new(:vp8), + audio_decoder: Xav.Decoder.new(:opus), video_buffer: JitterBuffer.new(latency: @jitter_buffer_latency_ms), audio_track: nil, session_start_time: System.monotonic_time(:millisecond) @@ -69,6 +84,7 @@ {:ok, pc} = PeerConnection.start_link( video_codecs: @video_codecs, + audio_codecs: @audio_codecs, ice_port_range: ice_port_range ) @@ -86,8 +102,23 @@ end @impl true - def handle_cast({:receive_signaling_msg, msg}, state) do - case Jason.decode!(msg) do + def handle_cast({:receive_audio_msg, audio_base64}, state) do + file = "/tmp/audio-#{state.id}.webm" + audio_data = Base.decode64!(audio_base64) + + File.write!(file, audio_data) + Task.async(fn -> Nx.Serving.batched_run(Recognizer.AudioServing, {:file, file}) end) + + {:noreply, state} + end + + @impl true + def handle_cast({:receive_signaling_msg, msg}, state) when is_binary(msg) do + 
handle_cast({:receive_signaling_msg, Jason.decode!(msg)}, state) + end + + def handle_cast({:receive_signaling_msg, msg}, state) when is_map(msg) do + case msg do %{"type" => "offer"} = offer -> desc = SessionDescription.from_json(offer) :ok = PeerConnection.set_remote_description(state.pc, desc) @@ -96,6 +127,9 @@ msg = %{"type" => "answer", "sdp" => answer.sdp} send(state.channel, {:signaling, msg}) + %{"type" => "ice", "data" => nil} -> + :ok + %{"type" => "ice", "data" => data} when data != nil -> candidate = ICECandidate.from_json(data) :ok = PeerConnection.add_ice_candidate(state.pc, candidate) @@ -145,6 +179,7 @@ {:ex_webrtc, _pc, {:rtp, track_id, nil, _packet}}, %{audio_track: %{id: track_id}} = state ) do + # FIX IT, never appears # Do something fun with the audio! {:noreply, state} end @@ -191,12 +226,24 @@ end @impl true - def handle_info({_ref, predicitons}, state) do - send(state.channel, {:img_reco, predicitons}) + def handle_info({_ref, %{predictions: [_ | _] = predictions}}, state) do + prediction = Enum.max_by(predictions, & &1.score) + + send(state.channel, {:img_reco, prediction}) state = %{state | task: nil} {:noreply, state} end + def handle_info({_ref, %{chunks: chunks}}, state) do + transcription = + chunks + |> Enum.sort_by(& &1.start_timestamp_seconds) + |> Enum.map_join("\n", & &1.text) + + send(state.channel, {:audio_transcription, %{text: transcription}}) + {:noreply, state} + end + + @impl true def handle_info(_msg, state) do {:noreply, state} diff --git a/recognizer/lib/recognizer_web/channels/room_channel.ex b/recognizer/lib/recognizer_web/channels/room_channel.ex index cbe7cbd..4d015d1 100644 --- a/recognizer/lib/recognizer_web/channels/room_channel.ex +++ b/recognizer/lib/recognizer_web/channels/room_channel.ex @@ -18,6 +18,11 @@ defmodule RecognizerWeb.RoomChannel do {:noreply, socket} end + def handle_in("audio_chunk", 
%{"audio" => audio_base64}, socket) do + :ok = Room.receive_audio_msg(socket.assigns.room_id, audio_base64) + {:noreply, socket} + end + def handle_info({:signaling, msg}, socket) do push(socket, "signaling", msg) {:noreply, socket} @@ -28,6 +33,11 @@ defmodule RecognizerWeb.RoomChannel do {:noreply, socket} end + def handle_info({:audio_transcription, msg}, socket) do + push(socket, "audioTranscription", msg) + {:noreply, socket} + end + def handle_info({:session_time, session_time}, socket) do push(socket, "sessionTime", %{time: session_time}) {:noreply, socket} diff --git a/recognizer/lib/recognizer_web/controllers/room_html/room.html.heex b/recognizer/lib/recognizer_web/controllers/room_html/room.html.heex index 283fecc..c80c4ba 100644 --- a/recognizer/lib/recognizer_web/controllers/room_html/room.html.heex +++ b/recognizer/lib/recognizer_web/controllers/room_html/room.html.heex @@ -1,3 +1,6 @@ +
@@ -30,6 +33,16 @@
+
+
Audio
+
+
+
+