diff --git a/README.md b/README.md index ce9187c..03971c8 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ The package can be installed by adding `ex_hls` to your list of dependencies in ```elixir def deps do [ - {:ex_hls, "~> 0.1.5"} + {:ex_hls, "~> 0.2.0"} ] end ``` @@ -55,7 +55,7 @@ Now you can get the Elixir stream containing media chunks: ```elixir stream = ExHLS.Client.generate_stream(client) Enum.take(stream, 5) -# Returns: +# Returns: # [ # %ExHLS.Chunk{ # payload: <<220, 0, 76, 97, 118, 99, 54, 49, 46, 51, 46, 49, 48, 48, 0, 66, diff --git a/lib/ex_hls/demuxing_engine/mpeg_ts.ex b/lib/ex_hls/demuxing_engine/mpeg_ts.ex index 710f131..ef133e8 100644 --- a/lib/ex_hls/demuxing_engine/mpeg_ts.ex +++ b/lib/ex_hls/demuxing_engine/mpeg_ts.ex @@ -9,8 +9,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do alias Membrane.{AAC, H264, RemoteStream} alias MPEG.TS.Demuxer - @enforce_keys [:demuxer, :last_tden_tag] - defstruct @enforce_keys ++ [track_timestamps_data: %{}] + @enforce_keys [:demuxer] + defstruct @enforce_keys ++ [track_timestamps_data: %{}, last_tden_tag: nil, packets_map: %{}] # using it a boundary expressed in nanoseconds, instead of the usual 90kHz clock ticks, # generates up to 1/10th of ms error per 26.5 hours of stream which is acceptable in @@ -19,7 +19,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do @type t :: %__MODULE__{ demuxer: Demuxer.t(), - last_tden_tag: String.t() | nil + last_tden_tag: String.t() | nil, + packets_map: %{(track_id :: non_neg_integer()) => Qex.t(MPEG.TS.Demuxer.Container.t())} } @impl true @@ -28,31 +29,31 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do # different demuxing engines def new(_timestamp_offset_ms) do demuxer = Demuxer.new() - - # we need to explicitly override that `waiting_random_access_indicator` as otherwise Demuxer - # discards all the input data - # TODO - figure out how to do it properly - demuxer = %{demuxer | waiting_random_access_indicator: false} - - %__MODULE__{demuxer: demuxer, last_tden_tag: nil} + %__MODULE__{demuxer: demuxer} end @impl true def feed!(%__MODULE__{} = demuxing_engine, binary) do - demuxing_engine - |> Map.update!(:demuxer, &Demuxer.push_buffer(&1, binary)) + {new_packets, demuxer} = Demuxer.demux(demuxing_engine.demuxer, binary) + + packets_map = + Enum.reduce(new_packets, demuxing_engine.packets_map, fn new_packet, packets_map -> + Map.update(packets_map, new_packet.pid, Qex.new([new_packet]), &Qex.push(&1, new_packet)) + end) + + %{demuxing_engine | demuxer: demuxer, packets_map: packets_map} end @impl true def get_tracks_info(%__MODULE__{} = demuxing_engine) do - with %{streams: streams} <- demuxing_engine.demuxer.pmt do + with %{streams: streams} <- demuxing_engine.demuxer do tracks_info = streams |> Enum.flat_map(fn - {id, %{stream_type: :AAC}} -> + {id, %{stream_type: :AAC_ADTS}} -> [{id, %RemoteStream{content_format: AAC}}] - {id, %{stream_type: :H264}} -> + {id, %{stream_type: :H264_AVC}} -> [{id, %RemoteStream{content_format: H264}}] {id, unsupported_stream_info} -> @@ -80,47 +81,56 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do @impl true def pop_chunk(%__MODULE__{} = demuxing_engine, track_id) do - with {[packet], demuxer} <- Demuxer.take(demuxing_engine.demuxer, track_id) do - {maybe_tden_tag, demuxer} = maybe_read_tden_tag(demuxer, packet.pts) - tden_tag = maybe_tden_tag || demuxing_engine.last_tden_tag + with {[packet], demuxing_engine} <- take_packets(demuxing_engine, track_id) do + demuxing_engine = maybe_read_tden_tag(demuxing_engine, packet.payload.pts) {demuxing_engine, packet} = - %{demuxing_engine | demuxer: demuxer, last_tden_tag: tden_tag} - |> handle_possible_timestamps_rollover(track_id, packet) + demuxing_engine |> handle_possible_timestamps_rollover(track_id, packet) chunk = %ExHLS.Chunk{ - payload: packet.data, - pts_ms: packet.pts |> packet_ts_to_millis(), - dts_ms: packet.dts |> packet_ts_to_millis(), + payload: packet.payload.data, + pts_ms: packet.payload.pts |> packet_ts_to_millis(), + dts_ms: packet.payload.dts |> packet_ts_to_millis(), track_id: track_id, metadata: %{ - discontinuity: packet.discontinuity, - is_aligned: packet.is_aligned, - tden_tag: tden_tag + discontinuity: packet.payload.discontinuity, + is_aligned: packet.payload.is_aligned, + tden_tag: demuxing_engine.last_tden_tag } } {:ok, chunk, demuxing_engine} else - {[], demuxer} -> - {:error, :empty_track_data, %{demuxing_engine | demuxer: demuxer}} + {[], demuxing_engine} -> + {:error, :empty_track_data, demuxing_engine} end end - defp maybe_read_tden_tag(demuxer, packet_pts) do + defp take_packets(demuxing_engine, track_id) do + with {:ok, packets} <- Map.fetch(demuxing_engine.packets_map, track_id), + {{:value, packet}, rest} <- Qex.pop(packets) do + demuxing_engine = put_in(demuxing_engine.packets_map[track_id], rest) + {[packet], demuxing_engine} + else + _other -> {[], demuxing_engine} + end + end + + defp maybe_read_tden_tag(demuxing_engine, packet_pts) do withl no_id3_stream: {id3_track_id, _stream_description} <- - demuxer.pmt.streams + demuxing_engine.demuxer.streams |> Enum.find(fn {_pid, stream_description} -> stream_description.stream_type == :METADATA_IN_PES end), - no_id3_data: {[id3], demuxer} <- Demuxer.take(demuxer, id3_track_id), - id3_not_in_timerange: true <- id3.pts <= packet_pts do - {parse_tden_tag(id3.data), demuxer} + no_id3_data: {[id3], demuxing_engine} <- take_packets(demuxing_engine, id3_track_id), + id3_not_in_timerange: true <- id3.payload.pts <= packet_pts do + tden_tag = parse_tden_tag(id3.payload.data) || demuxing_engine.last_tden_tag + %{demuxing_engine | last_tden_tag: tden_tag} else - no_id3_stream: nil -> {nil, demuxer} - no_id3_data: {[], updated_demuxer} -> {nil, updated_demuxer} - id3_not_in_timerange: false -> {nil, demuxer} + no_id3_stream: nil -> demuxing_engine + no_id3_data: {[], updated_demuxing_engine} -> updated_demuxing_engine + id3_not_in_timerange: false -> demuxing_engine end end @@ -143,7 +153,7 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do @impl true def end_stream(%__MODULE__{} = demuxing_engine) do - demuxer = Demuxer.end_of_stream(demuxing_engine.demuxer) + {_flushed, demuxer} = Demuxer.flush(demuxing_engine.demuxer) %{demuxing_engine | demuxer: demuxer} end @@ -155,32 +165,34 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do rollovers_offset = rollovers_count * @timestamp_range_size_ns - packet = - packet + payload = + packet.payload |> Map.update!(:pts, &add_offset_if_not_nil(&1, rollovers_offset)) |> Map.update!(:dts, &add_offset_if_not_nil(&1, rollovers_offset)) - {demuxing_engine, packet} = + {demuxing_engine, payload} = with last_ts when last_ts != nil <- last_dts || last_pts, - true <- last_ts > (packet.dts || packet.pts) do + true <- last_ts > (payload.dts || payload.pts) do demuxing_engine = demuxing_engine |> update_in([:track_timestamps_data, track_id, :rollovers_count], &(&1 + 1)) - packet = - packet + payload = + payload |> Map.update!(:pts, &add_offset_if_not_nil(&1, @timestamp_range_size_ns)) |> Map.update!(:dts, &add_offset_if_not_nil(&1, @timestamp_range_size_ns)) - {demuxing_engine, packet} + {demuxing_engine, payload} else - _other -> {demuxing_engine, packet} + _other -> {demuxing_engine, payload} end demuxing_engine = demuxing_engine - |> put_in([:track_timestamps_data, track_id, :last_pts], packet.pts) - |> put_in([:track_timestamps_data, track_id, :last_dts], packet.dts) + |> put_in([:track_timestamps_data, track_id, :last_pts], payload.pts) + |> put_in([:track_timestamps_data, track_id, :last_dts], payload.dts) + + packet = %{packet | payload: payload} {demuxing_engine, packet} end diff --git a/mix.exs b/mix.exs index ace020b..0f17913 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule ExHLS.Mixfile do use Mix.Project - @version "0.1.5" + @version "0.2.0" @github_url "https://github.com/membraneframework/ex_hls" def project do @@ -43,13 +43,10 @@ defmodule ExHLS.Mixfile do {:bunch, "~> 1.6"}, {:membrane_mp4_plugin, "~> 0.36.0"}, {:membrane_h26x_plugin, "~> 0.10.2"}, - {:mpeg_ts, - github: "membraneframework-labs/kim_mpeg_ts", - branch: "varsill/fix_pes_optional_header_resolving"}, + {:mpeg_ts, "~> 3.3.5"}, {:ex_doc, ">= 0.0.0", only: :dev, runtime: false}, {:dialyxir, ">= 0.0.0", only: :dev, runtime: false}, - {:credo, ">= 0.0.0", only: :dev, runtime: false}, - {:mock, "~> 0.3.0", only: :test} + {:credo, ">= 0.0.0", only: :dev, runtime: false} ] end diff --git a/mix.lock b/mix.lock index 3907203..00d97cd 100644 --- a/mix.lock +++ b/mix.lock @@ -18,7 +18,6 @@ "makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"}, "makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"}, "makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"}, - "meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"}, "membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"}, "membrane_cmaf_format": {:hex, :membrane_cmaf_format, "0.7.1", "9ea858faefdcb181cdfa8001be827c35c5f854e9809ad57d7062cff1f0f703fd", [:mix], [], "hexpm", "3c7b4ed2a986e27f6f336d2f19e9442cb31d93b3142fc024c019572faca54a73"}, "membrane_core": {:hex, :membrane_core, "1.2.4", "3f9fc78cef29b69acadd4f959c8ec23cbb1544c26c8e8474589b143ada9a0da2", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ec7a77b7ab457267c0243338383365f6ef5ace2686ddc129939e502a58eba546"}, @@ -32,8 +31,7 @@ "membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"}, "mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"}, "mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"}, -"mpeg_ts": {:git, "https://github.com/membraneframework-labs/kim_mpeg_ts.git", "c8c770e0e7714c72b3faa7f20088fdbd76f5bade", [branch: "varsill/fix_pes_optional_header_resolving"]}, - "mock": {:hex, :mock, "0.3.9", "10e44ad1f5962480c5c9b9fa779c6c63de9bd31997c8e04a853ec990a9d841af", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "9e1b244c4ca2551bb17bb8415eed89e40ee1308e0fbaed0a4fdfe3ec8a4adbd3"}, + "mpeg_ts": {:hex, :mpeg_ts, "3.3.5", "b0fd6714753da5ad51e686ddbdb4cf4a4480f43dd6df311c7e2da4359df1960f", [:mix], [], "hexpm", "503bf4f557057efb35433893ffdb8da2be4643b2be161e8721a0daec9825b600"}, "nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"}, "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, diff --git a/test/demuxing_engine_mpegts_test.exs b/test/demuxing_engine_mpegts_test.exs index db73d07..50c2181 100644 --- a/test/demuxing_engine_mpegts_test.exs +++ b/test/demuxing_engine_mpegts_test.exs @@ -1,8 +1,5 @@ defmodule ExHLS.DemuxingEngine.MPEGTS.Test do use ExUnit.Case, async: false - import Mock - - alias MPEG.TS.Demuxer test "handling timestamp rollovers" do timestamp_range = (2 ** 33 * 1_000_000_000) |> div(90_000) @@ -16,52 +13,41 @@ defmodule ExHLS.DemuxingEngine.MPEGTS.Test do %{ og_timestamp: og_timestamp, - pts: rolled_timestamp, - dts: rolled_timestamp, - data: <<>>, - discontinuity: false, - is_aligned: true + payload: %{ + pts: rolled_timestamp, + dts: rolled_timestamp, + data: <<>>, + discontinuity: false, + is_aligned: true + } } end) - demuxer = %{ - waiting_random_access_indicator: nil, - packet_buffers: %{1 => packets, 2 => packets}, - pmt: %{streams: %{}} - } - - new = fn -> demuxer end - - take = fn demuxer, track_id -> - demuxer - |> get_and_update_in( - [:packet_buffers, track_id], - fn [head | tail] -> {[head], tail} end - ) - end + demuxing_engine = ExHLS.DemuxingEngine.MPEGTS.new(0) - with_mock Demuxer, new: new, take: take do - demuxing_engine = ExHLS.DemuxingEngine.MPEGTS.new(0) - - [1, 2] - |> Enum.reduce(demuxing_engine, fn track_id, demuxing_engine -> - {chunks, demuxing_engine} = - 1..200 - |> Enum.map_reduce(demuxing_engine, fn _i, demuxing_engine -> - {:ok, chunk, demuxing_engine} = - ExHLS.DemuxingEngine.MPEGTS.pop_chunk(demuxing_engine, track_id) + demuxing_engine = %{ + demuxing_engine + | packets_map: %{1 => Qex.new(packets), 2 => Qex.new(packets)} + } - {chunk, demuxing_engine} - end) + [1, 2] + |> Enum.reduce(demuxing_engine, fn track_id, demuxing_engine -> + {chunks, demuxing_engine} = + 1..200 + |> Enum.map_reduce(demuxing_engine, fn _i, demuxing_engine -> + {:ok, chunk, demuxing_engine} = + ExHLS.DemuxingEngine.MPEGTS.pop_chunk(demuxing_engine, track_id) - Enum.zip(chunks, packets) - |> Enum.each(fn {chunk, packet} -> - assert chunk.pts_ms == div(packet.og_timestamp, 1_000_000) - assert chunk.dts_ms == div(packet.og_timestamp, 1_000_000) + {chunk, demuxing_engine} end) - demuxing_engine + Enum.zip(chunks, packets) + |> Enum.each(fn {chunk, packet} -> + assert chunk.pts_ms == div(packet.og_timestamp, 1_000_000) + assert chunk.dts_ms == div(packet.og_timestamp, 1_000_000) end) - end + + demuxing_engine + end) end end