Skip to content
106 changes: 59 additions & 47 deletions lib/ex_hls/demuxing_engine/mpeg_ts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
alias Membrane.{AAC, H264, RemoteStream}
alias MPEG.TS.Demuxer

@enforce_keys [:demuxer, :last_tden_tag]
defstruct @enforce_keys ++ [track_timestamps_data: %{}]
@enforce_keys [:demuxer]
defstruct @enforce_keys ++ [track_timestamps_data: %{}, last_tden_tag: nil, packets_map: %{}]

# using it a boundary expressed in nanoseconds, instead of the usual 90kHz clock ticks,
# generates up to 1/10th of ms error per 26.5 hours of stream which is acceptable in
Expand All @@ -19,7 +19,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do

@type t :: %__MODULE__{
demuxer: Demuxer.t(),
last_tden_tag: String.t() | nil
last_tden_tag: String.t() | nil,
packets_map: %{(track_id :: non_neg_integer()) => MPEG.TS.Demuxer.Container.t()}
}

@impl true
Expand All @@ -28,31 +29,31 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
# different demuxing engines
def new(_timestamp_offset_ms) do
demuxer = Demuxer.new()

# we need to explicitly override that `waiting_random_access_indicator` as otherwise Demuxer
# discards all the input data
# TODO - figure out how to do it properly
demuxer = %{demuxer | waiting_random_access_indicator: false}

%__MODULE__{demuxer: demuxer, last_tden_tag: nil}
%__MODULE__{demuxer: demuxer}
end

@impl true
def feed!(%__MODULE__{} = demuxing_engine, binary) do
demuxing_engine
|> Map.update!(:demuxer, &Demuxer.push_buffer(&1, binary))
{new_packets, demuxer} = Demuxer.demux(demuxing_engine.demuxer, binary)

packets_map =
Enum.reduce(new_packets, demuxing_engine.packets_map, fn new_packet, packets_map ->
Map.update(packets_map, new_packet.pid, Qex.new([new_packet]), &Qex.push(&1, new_packet))
end)

%{demuxing_engine | demuxer: demuxer, packets_map: packets_map}
end

@impl true
def get_tracks_info(%__MODULE__{} = demuxing_engine) do
with %{streams: streams} <- demuxing_engine.demuxer.pmt do
with %{streams: streams} <- demuxing_engine.demuxer do
tracks_info =
streams
|> Enum.flat_map(fn
{id, %{stream_type: :AAC}} ->
{id, %{stream_type: :AAC_ADTS}} ->
[{id, %RemoteStream{content_format: AAC}}]

{id, %{stream_type: :H264}} ->
{id, %{stream_type: :H264_AVC}} ->
[{id, %RemoteStream{content_format: H264}}]

{id, unsupported_stream_info} ->
Expand Down Expand Up @@ -80,47 +81,56 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do

@impl true
def pop_chunk(%__MODULE__{} = demuxing_engine, track_id) do
with {[packet], demuxer} <- Demuxer.take(demuxing_engine.demuxer, track_id) do
{maybe_tden_tag, demuxer} = maybe_read_tden_tag(demuxer, packet.pts)
tden_tag = maybe_tden_tag || demuxing_engine.last_tden_tag
with {[packet], demuxing_engine} <- take_packets(demuxing_engine, track_id) do
demuxing_engine = maybe_read_tden_tag(demuxing_engine, packet.payload.pts)

{demuxing_engine, packet} =
%{demuxing_engine | demuxer: demuxer, last_tden_tag: tden_tag}
|> handle_possible_timestamps_rollover(track_id, packet)
demuxing_engine |> handle_possible_timestamps_rollover(track_id, packet)

chunk = %ExHLS.Chunk{
payload: packet.data,
pts_ms: packet.pts |> packet_ts_to_millis(),
dts_ms: packet.dts |> packet_ts_to_millis(),
payload: packet.payload.data,
pts_ms: packet.payload.pts |> packet_ts_to_millis(),
dts_ms: packet.payload.dts |> packet_ts_to_millis(),
track_id: track_id,
metadata: %{
discontinuity: packet.discontinuity,
is_aligned: packet.is_aligned,
tden_tag: tden_tag
discontinuity: packet.payload.discontinuity,
is_aligned: packet.payload.is_aligned,
tden_tag: demuxing_engine.last_tden_tag
}
}

{:ok, chunk, demuxing_engine}
else
{[], demuxer} ->
{:error, :empty_track_data, %{demuxing_engine | demuxer: demuxer}}
{[], demuxing_engine} ->
{:error, :empty_track_data, demuxing_engine}
end
end

defp maybe_read_tden_tag(demuxer, packet_pts) do
defp take_packets(demuxing_engine, track_id) do
with {:ok, packets} <- Map.fetch(demuxing_engine.packets_map, track_id),
{{:value, packet}, rest} <- Qex.pop(packets) do
demuxing_engine = put_in(demuxing_engine.packets_map[track_id], rest)
{[packet], demuxing_engine}
else
_other -> {[], demuxing_engine}
Comment on lines +110 to +115
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<3

end
end

defp maybe_read_tden_tag(demuxing_engine, packet_pts) do
withl no_id3_stream:
{id3_track_id, _stream_description} <-
demuxer.pmt.streams
demuxing_engine.demuxer.streams
|> Enum.find(fn {_pid, stream_description} ->
stream_description.stream_type == :METADATA_IN_PES
end),
no_id3_data: {[id3], demuxer} <- Demuxer.take(demuxer, id3_track_id),
id3_not_in_timerange: true <- id3.pts <= packet_pts do
{parse_tden_tag(id3.data), demuxer}
no_id3_data: {[id3], demuxing_engine} <- take_packets(demuxing_engine, id3_track_id),
id3_not_in_timerange: true <- id3.payload.pts <= packet_pts do
tden_tag = parse_tden_tag(id3.payload.data) || demuxing_engine.last_tden_tag
%{demuxing_engine | last_tden_tag: tden_tag}
else
no_id3_stream: nil -> {nil, demuxer}
no_id3_data: {[], updated_demuxer} -> {nil, updated_demuxer}
id3_not_in_timerange: false -> {nil, demuxer}
no_id3_stream: nil -> demuxing_engine
no_id3_data: {[], updated_demuxing_engine} -> updated_demuxing_engine
id3_not_in_timerange: false -> demuxing_engine
end
end

Expand All @@ -143,7 +153,7 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do

@impl true
def end_stream(%__MODULE__{} = demuxing_engine) do
demuxer = Demuxer.end_of_stream(demuxing_engine.demuxer)
{_flushed, demuxer} = Demuxer.flush(demuxing_engine.demuxer)
%{demuxing_engine | demuxer: demuxer}
end

Expand All @@ -155,32 +165,34 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do

rollovers_offset = rollovers_count * @timestamp_range_size_ns

packet =
packet
payload =
packet.payload
|> Map.update!(:pts, &add_offset_if_not_nil(&1, rollovers_offset))
|> Map.update!(:dts, &add_offset_if_not_nil(&1, rollovers_offset))

{demuxing_engine, packet} =
{demuxing_engine, payload} =
with last_ts when last_ts != nil <- last_dts || last_pts,
true <- last_ts > (packet.dts || packet.pts) do
true <- last_ts > (payload.dts || payload.pts) do
demuxing_engine =
demuxing_engine
|> update_in([:track_timestamps_data, track_id, :rollovers_count], &(&1 + 1))

packet =
packet
payload =
payload
|> Map.update!(:pts, &add_offset_if_not_nil(&1, @timestamp_range_size_ns))
|> Map.update!(:dts, &add_offset_if_not_nil(&1, @timestamp_range_size_ns))

{demuxing_engine, packet}
{demuxing_engine, payload}
else
_other -> {demuxing_engine, packet}
_other -> {demuxing_engine, payload}
end

demuxing_engine =
demuxing_engine
|> put_in([:track_timestamps_data, track_id, :last_pts], packet.pts)
|> put_in([:track_timestamps_data, track_id, :last_dts], packet.dts)
|> put_in([:track_timestamps_data, track_id, :last_pts], payload.pts)
|> put_in([:track_timestamps_data, track_id, :last_dts], payload.dts)

packet = %{packet | payload: payload}

{demuxing_engine, packet}
end
Expand Down
7 changes: 2 additions & 5 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,10 @@ defmodule ExHLS.Mixfile do
{:bunch, "~> 1.6"},
{:membrane_mp4_plugin, "~> 0.36.0"},
{:membrane_h26x_plugin, "~> 0.10.2"},
{:mpeg_ts,
github: "membraneframework-labs/kim_mpeg_ts",
branch: "varsill/fix_pes_optional_header_resolving"},
{:mpeg_ts, "~> 3.3.5"},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:dialyxir, ">= 0.0.0", only: :dev, runtime: false},
{:credo, ">= 0.0.0", only: :dev, runtime: false},
{:mock, "~> 0.3.0", only: :test}
{:credo, ">= 0.0.0", only: :dev, runtime: false}
]
end

Expand Down
4 changes: 1 addition & 3 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"},
"makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"},
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"},
"membrane_cmaf_format": {:hex, :membrane_cmaf_format, "0.7.1", "9ea858faefdcb181cdfa8001be827c35c5f854e9809ad57d7062cff1f0f703fd", [:mix], [], "hexpm", "3c7b4ed2a986e27f6f336d2f19e9442cb31d93b3142fc024c019572faca54a73"},
"membrane_core": {:hex, :membrane_core, "1.2.4", "3f9fc78cef29b69acadd4f959c8ec23cbb1544c26c8e8474589b143ada9a0da2", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ec7a77b7ab457267c0243338383365f6ef5ace2686ddc129939e502a58eba546"},
Expand All @@ -32,8 +31,7 @@
"membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"},
"mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"},
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
"mpeg_ts": {:git, "https://github.com/membraneframework-labs/kim_mpeg_ts.git", "c8c770e0e7714c72b3faa7f20088fdbd76f5bade", [branch: "varsill/fix_pes_optional_header_resolving"]},
"mock": {:hex, :mock, "0.3.9", "10e44ad1f5962480c5c9b9fa779c6c63de9bd31997c8e04a853ec990a9d841af", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "9e1b244c4ca2551bb17bb8415eed89e40ee1308e0fbaed0a4fdfe3ec8a4adbd3"},
"mpeg_ts": {:hex, :mpeg_ts, "3.3.5", "b0fd6714753da5ad51e686ddbdb4cf4a4480f43dd6df311c7e2da4359df1960f", [:mix], [], "hexpm", "503bf4f557057efb35433893ffdb8da2be4643b2be161e8721a0daec9825b600"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
Expand Down
68 changes: 27 additions & 41 deletions test/demuxing_engine_mpegts_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
defmodule ExHLS.DemuxingEngine.MPEGTS.Test do
use ExUnit.Case, async: false
import Mock

alias MPEG.TS.Demuxer

test "handling timestamp rollovers" do
timestamp_range = (2 ** 33 * 1_000_000_000) |> div(90_000)
Expand All @@ -16,52 +13,41 @@ defmodule ExHLS.DemuxingEngine.MPEGTS.Test do

%{
og_timestamp: og_timestamp,
pts: rolled_timestamp,
dts: rolled_timestamp,
data: <<>>,
discontinuity: false,
is_aligned: true
payload: %{
pts: rolled_timestamp,
dts: rolled_timestamp,
data: <<>>,
discontinuity: false,
is_aligned: true
}
}
end)

demuxer = %{
waiting_random_access_indicator: nil,
packet_buffers: %{1 => packets, 2 => packets},
pmt: %{streams: %{}}
}

new = fn -> demuxer end

take = fn demuxer, track_id ->
demuxer
|> get_and_update_in(
[:packet_buffers, track_id],
fn [head | tail] -> {[head], tail} end
)
end
demuxing_engine = ExHLS.DemuxingEngine.MPEGTS.new(0)

with_mock Demuxer, new: new, take: take do
demuxing_engine = ExHLS.DemuxingEngine.MPEGTS.new(0)

[1, 2]
|> Enum.reduce(demuxing_engine, fn track_id, demuxing_engine ->
{chunks, demuxing_engine} =
1..200
|> Enum.map_reduce(demuxing_engine, fn _i, demuxing_engine ->
{:ok, chunk, demuxing_engine} =
ExHLS.DemuxingEngine.MPEGTS.pop_chunk(demuxing_engine, track_id)
demuxing_engine = %{
demuxing_engine
| packets_map: %{1 => Qex.new(packets), 2 => Qex.new(packets)}
}

{chunk, demuxing_engine}
end)
[1, 2]
|> Enum.reduce(demuxing_engine, fn track_id, demuxing_engine ->
{chunks, demuxing_engine} =
1..200
|> Enum.map_reduce(demuxing_engine, fn _i, demuxing_engine ->
{:ok, chunk, demuxing_engine} =
ExHLS.DemuxingEngine.MPEGTS.pop_chunk(demuxing_engine, track_id)

Enum.zip(chunks, packets)
|> Enum.each(fn {chunk, packet} ->
assert chunk.pts_ms == div(packet.og_timestamp, 1_000_000)
assert chunk.dts_ms == div(packet.og_timestamp, 1_000_000)
{chunk, demuxing_engine}
end)

demuxing_engine
Enum.zip(chunks, packets)
|> Enum.each(fn {chunk, packet} ->
assert chunk.pts_ms == div(packet.og_timestamp, 1_000_000)
assert chunk.dts_ms == div(packet.og_timestamp, 1_000_000)
end)
end

demuxing_engine
end)
end
end