Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
519c5fc
Bump ex_m3u8 to v0.15.4. Properly handle absolute URIs for variant pl…
varsill Oct 8, 2025
103380c
Merge branch 'handle-weird-segment-extensions' into varsill/handle_ab…
varsill Oct 8, 2025
cd15788
Add segment_format option to the client to override segments demuxer …
varsill Oct 8, 2025
82cc8c2
Improve warning message
varsill Oct 8, 2025
ac7cf92
Pass segment_format to reader
varsill Oct 8, 2025
b66a899
Start playing with the last segment
varsill Oct 8, 2025
5cebfc0
Set proper segment number
varsill Oct 8, 2025
4a645f3
Filter out non-segments
varsill Oct 8, 2025
cd72f0d
Handle TDEN
varsill Oct 14, 2025
894ee48
Synchronize ID3 tag with stream
varsill Oct 16, 2025
efc0a92
Fix bug with id3 generation
varsill Oct 27, 2025
d914feb
Refactor TDEN specific code. Update dependency to mpeg_ts to fix buf …
varsill Nov 3, 2025
4c785ff
Format the code
varsill Nov 5, 2025
182469e
Fix credo warnings
varsill Nov 5, 2025
02673d7
Merge branch 'master' into varsill/ultra_low_latency
varsill Nov 6, 2025
3ba5575
Remove h264 parser commited by accident
varsill Nov 6, 2025
7ddf388
Add ultra_low_latency? option to the client
varsill Nov 6, 2025
2d1b92c
Improve option description
varsill Nov 6, 2025
6021a5c
Add test of the ultra low latency mode. Improve description. Make sur…
varsill Nov 6, 2025
c436d26
Improve option description
varsill Nov 6, 2025
9744b31
Fix credo warning
varsill Nov 6, 2025
da157a0
Merge branch 'varsill/ultra_low_latency' into varsill/support_tden
varsill Nov 6, 2025
b573115
Format the code
varsill Nov 6, 2025
9fe0311
Update dependency to mpeg_ts v3
varsill Nov 6, 2025
87aa082
Update lib/ex_hls/demuxing_engine/mpeg_ts.ex
varsill Nov 7, 2025
1a3a369
Adjust MPEG-TS demuxing engine to api of kim_mpeg_ts v3
varsill Nov 13, 2025
66437b7
Fix mpeg_ts demuxer version to v2. Update tests
varsill Nov 13, 2025
f56a314
Use withl
varsill Nov 13, 2025
da1b54e
improve formatting
varsill Nov 13, 2025
c2a37a3
Make sure encoding is equal to 3
varsill Nov 14, 2025
00525f5
Add size specifier
varsill Nov 14, 2025
33817ba
Remove tag
varsill Nov 14, 2025
7ffa36e
Merge branch 'master' into varsill/support_tden
varsill Nov 17, 2025
0529892
Remove redundant should_start_playing clause
varsill Nov 17, 2025
a2d09cc
Merge branch 'varsill/support_tden' into varsill/adjust_to_mpeg_ts_v3
varsill Nov 17, 2025
f8d5067
Fix demuxer test
varsill Nov 17, 2025
fc3160f
Remove unused mock dependency
varsill Nov 17, 2025
3211a4b
Refacotr
varsill Nov 17, 2025
88d3264
Properly read Id3 track
varsill Nov 18, 2025
853248a
Update dependency to kim_mpeg_ts
varsill Nov 18, 2025
c16156e
Format the code
varsill Nov 18, 2025
2ec02c9
Remove sometag
varsill Nov 18, 2025
8c48722
Bump dependency to mpeg_ts v3.3.5
varsill Nov 18, 2025
ca855f3
Move take_packets/2 under public functions
varsill Nov 19, 2025
8965ae2
Apply reviewers suggestions - use Qex
varsill Nov 19, 2025
79d542a
Merge branch 'master' into release/v0.2.0
varsill Nov 19, 2025
bab0bd7
Bump version to v0.2.0
varsill Nov 19, 2025
fa80bc5
Improve DemuxingEngine.MPEGTS.t() typespec
varsill Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `ex_hls` to your list of dependencies in
```elixir
def deps do
[
{:ex_hls, "~> 0.1.5"}
{:ex_hls, "~> 0.2.0"}
]
end
```
Expand Down Expand Up @@ -55,7 +55,7 @@ Now you can get the Elixir stream containing media chunks:
```elixir
stream = ExHLS.Client.generate_stream(client)
Enum.take(stream, 5)
# Returns:
# Returns:
# [
# %ExHLS.Chunk{
# payload: <<220, 0, 76, 97, 118, 99, 54, 49, 46, 51, 46, 49, 48, 48, 0, 66,
Expand Down
106 changes: 59 additions & 47 deletions lib/ex_hls/demuxing_engine/mpeg_ts.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
alias Membrane.{AAC, H264, RemoteStream}
alias MPEG.TS.Demuxer

@enforce_keys [:demuxer, :last_tden_tag]
defstruct @enforce_keys ++ [track_timestamps_data: %{}]
@enforce_keys [:demuxer]
defstruct @enforce_keys ++ [track_timestamps_data: %{}, last_tden_tag: nil, packets_map: %{}]

# using it a boundary expressed in nanoseconds, instead of the usual 90kHz clock ticks,
# generates up to 1/10th of ms error per 26.5 hours of stream which is acceptable in
Expand All @@ -19,7 +19,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do

@type t :: %__MODULE__{
demuxer: Demuxer.t(),
last_tden_tag: String.t() | nil
last_tden_tag: String.t() | nil,
packets_map: %{(track_id :: non_neg_integer()) => Qex.t(MPEG.TS.Demuxer.Container.t())}
}

@impl true
Expand All @@ -28,31 +29,31 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do
# different demuxing engines
def new(_timestamp_offset_ms) do
demuxer = Demuxer.new()

# we need to explicitly override that `waiting_random_access_indicator` as otherwise Demuxer
# discards all the input data
# TODO - figure out how to do it properly
demuxer = %{demuxer | waiting_random_access_indicator: false}

%__MODULE__{demuxer: demuxer, last_tden_tag: nil}
%__MODULE__{demuxer: demuxer}
end

@impl true
def feed!(%__MODULE__{} = demuxing_engine, binary) do
demuxing_engine
|> Map.update!(:demuxer, &Demuxer.push_buffer(&1, binary))
{new_packets, demuxer} = Demuxer.demux(demuxing_engine.demuxer, binary)

packets_map =
Enum.reduce(new_packets, demuxing_engine.packets_map, fn new_packet, packets_map ->
Map.update(packets_map, new_packet.pid, Qex.new([new_packet]), &Qex.push(&1, new_packet))
end)

%{demuxing_engine | demuxer: demuxer, packets_map: packets_map}
end

@impl true
def get_tracks_info(%__MODULE__{} = demuxing_engine) do
with %{streams: streams} <- demuxing_engine.demuxer.pmt do
with %{streams: streams} <- demuxing_engine.demuxer do
tracks_info =
streams
|> Enum.flat_map(fn
{id, %{stream_type: :AAC}} ->
{id, %{stream_type: :AAC_ADTS}} ->
[{id, %RemoteStream{content_format: AAC}}]

{id, %{stream_type: :H264}} ->
{id, %{stream_type: :H264_AVC}} ->
[{id, %RemoteStream{content_format: H264}}]

{id, unsupported_stream_info} ->
Expand Down Expand Up @@ -80,47 +81,56 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do

@impl true
def pop_chunk(%__MODULE__{} = demuxing_engine, track_id) do
with {[packet], demuxer} <- Demuxer.take(demuxing_engine.demuxer, track_id) do
{maybe_tden_tag, demuxer} = maybe_read_tden_tag(demuxer, packet.pts)
tden_tag = maybe_tden_tag || demuxing_engine.last_tden_tag
with {[packet], demuxing_engine} <- take_packets(demuxing_engine, track_id) do
demuxing_engine = maybe_read_tden_tag(demuxing_engine, packet.payload.pts)

{demuxing_engine, packet} =
%{demuxing_engine | demuxer: demuxer, last_tden_tag: tden_tag}
|> handle_possible_timestamps_rollover(track_id, packet)
demuxing_engine |> handle_possible_timestamps_rollover(track_id, packet)

chunk = %ExHLS.Chunk{
payload: packet.data,
pts_ms: packet.pts |> packet_ts_to_millis(),
dts_ms: packet.dts |> packet_ts_to_millis(),
payload: packet.payload.data,
pts_ms: packet.payload.pts |> packet_ts_to_millis(),
dts_ms: packet.payload.dts |> packet_ts_to_millis(),
track_id: track_id,
metadata: %{
discontinuity: packet.discontinuity,
is_aligned: packet.is_aligned,
tden_tag: tden_tag
discontinuity: packet.payload.discontinuity,
is_aligned: packet.payload.is_aligned,
tden_tag: demuxing_engine.last_tden_tag
}
}

{:ok, chunk, demuxing_engine}
else
{[], demuxer} ->
{:error, :empty_track_data, %{demuxing_engine | demuxer: demuxer}}
{[], demuxing_engine} ->
{:error, :empty_track_data, demuxing_engine}
end
end

defp maybe_read_tden_tag(demuxer, packet_pts) do
defp take_packets(demuxing_engine, track_id) do
with {:ok, packets} <- Map.fetch(demuxing_engine.packets_map, track_id),
{{:value, packet}, rest} <- Qex.pop(packets) do
demuxing_engine = put_in(demuxing_engine.packets_map[track_id], rest)
{[packet], demuxing_engine}
else
_other -> {[], demuxing_engine}
end
end

defp maybe_read_tden_tag(demuxing_engine, packet_pts) do
withl no_id3_stream:
{id3_track_id, _stream_description} <-
demuxer.pmt.streams
demuxing_engine.demuxer.streams
|> Enum.find(fn {_pid, stream_description} ->
stream_description.stream_type == :METADATA_IN_PES
end),
no_id3_data: {[id3], demuxer} <- Demuxer.take(demuxer, id3_track_id),
id3_not_in_timerange: true <- id3.pts <= packet_pts do
{parse_tden_tag(id3.data), demuxer}
no_id3_data: {[id3], demuxing_engine} <- take_packets(demuxing_engine, id3_track_id),
id3_not_in_timerange: true <- id3.payload.pts <= packet_pts do
tden_tag = parse_tden_tag(id3.payload.data) || demuxing_engine.last_tden_tag
%{demuxing_engine | last_tden_tag: tden_tag}
else
no_id3_stream: nil -> {nil, demuxer}
no_id3_data: {[], updated_demuxer} -> {nil, updated_demuxer}
id3_not_in_timerange: false -> {nil, demuxer}
no_id3_stream: nil -> demuxing_engine
no_id3_data: {[], updated_demuxing_engine} -> updated_demuxing_engine
id3_not_in_timerange: false -> demuxing_engine
end
end

Expand All @@ -143,7 +153,7 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do

@impl true
def end_stream(%__MODULE__{} = demuxing_engine) do
demuxer = Demuxer.end_of_stream(demuxing_engine.demuxer)
{_flushed, demuxer} = Demuxer.flush(demuxing_engine.demuxer)
%{demuxing_engine | demuxer: demuxer}
end

Expand All @@ -155,32 +165,34 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do

rollovers_offset = rollovers_count * @timestamp_range_size_ns

packet =
packet
payload =
packet.payload
|> Map.update!(:pts, &add_offset_if_not_nil(&1, rollovers_offset))
|> Map.update!(:dts, &add_offset_if_not_nil(&1, rollovers_offset))

{demuxing_engine, packet} =
{demuxing_engine, payload} =
with last_ts when last_ts != nil <- last_dts || last_pts,
true <- last_ts > (packet.dts || packet.pts) do
true <- last_ts > (payload.dts || payload.pts) do
demuxing_engine =
demuxing_engine
|> update_in([:track_timestamps_data, track_id, :rollovers_count], &(&1 + 1))

packet =
packet
payload =
payload
|> Map.update!(:pts, &add_offset_if_not_nil(&1, @timestamp_range_size_ns))
|> Map.update!(:dts, &add_offset_if_not_nil(&1, @timestamp_range_size_ns))

{demuxing_engine, packet}
{demuxing_engine, payload}
else
_other -> {demuxing_engine, packet}
_other -> {demuxing_engine, payload}
end

demuxing_engine =
demuxing_engine
|> put_in([:track_timestamps_data, track_id, :last_pts], packet.pts)
|> put_in([:track_timestamps_data, track_id, :last_dts], packet.dts)
|> put_in([:track_timestamps_data, track_id, :last_pts], payload.pts)
|> put_in([:track_timestamps_data, track_id, :last_dts], payload.dts)

packet = %{packet | payload: payload}

{demuxing_engine, packet}
end
Expand Down
9 changes: 3 additions & 6 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule ExHLS.Mixfile do
use Mix.Project

@version "0.1.5"
@version "0.2.0"
@github_url "https://github.com/membraneframework/ex_hls"

def project do
Expand Down Expand Up @@ -43,13 +43,10 @@ defmodule ExHLS.Mixfile do
{:bunch, "~> 1.6"},
{:membrane_mp4_plugin, "~> 0.36.0"},
{:membrane_h26x_plugin, "~> 0.10.2"},
{:mpeg_ts,
github: "membraneframework-labs/kim_mpeg_ts",
branch: "varsill/fix_pes_optional_header_resolving"},
{:mpeg_ts, "~> 3.3.5"},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:dialyxir, ">= 0.0.0", only: :dev, runtime: false},
{:credo, ">= 0.0.0", only: :dev, runtime: false},
{:mock, "~> 0.3.0", only: :test}
{:credo, ">= 0.0.0", only: :dev, runtime: false}
]
end

Expand Down
4 changes: 1 addition & 3 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"makeup": {:hex, :makeup, "1.2.1", "e90ac1c65589ef354378def3ba19d401e739ee7ee06fb47f94c687016e3713d1", [:mix], [{:nimble_parsec, "~> 1.4", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "d36484867b0bae0fea568d10131197a4c2e47056a6fbe84922bf6ba71c8d17ce"},
"makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"},
"makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"},
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"},
"membrane_cmaf_format": {:hex, :membrane_cmaf_format, "0.7.1", "9ea858faefdcb181cdfa8001be827c35c5f854e9809ad57d7062cff1f0f703fd", [:mix], [], "hexpm", "3c7b4ed2a986e27f6f336d2f19e9442cb31d93b3142fc024c019572faca54a73"},
"membrane_core": {:hex, :membrane_core, "1.2.4", "3f9fc78cef29b69acadd4f959c8ec23cbb1544c26c8e8474589b143ada9a0da2", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0 or ~> 4.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "ec7a77b7ab457267c0243338383365f6ef5ace2686ddc129939e502a58eba546"},
Expand All @@ -32,8 +31,7 @@
"membrane_timestamp_queue": {:hex, :membrane_timestamp_queue, "0.2.2", "1c831b2273d018a6548654aa9f7fa7c4b683f71d96ffe164934ef55f9d11f693", [:mix], [{:heap, "~> 2.0", [hex: :heap, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "7c830e760baaced0988421671cd2c83c7cda8d1bd2b61fd05332711675d1204f"},
"mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"},
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
"mpeg_ts": {:git, "https://github.com/membraneframework-labs/kim_mpeg_ts.git", "c8c770e0e7714c72b3faa7f20088fdbd76f5bade", [branch: "varsill/fix_pes_optional_header_resolving"]},
"mock": {:hex, :mock, "0.3.9", "10e44ad1f5962480c5c9b9fa779c6c63de9bd31997c8e04a853ec990a9d841af", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "9e1b244c4ca2551bb17bb8415eed89e40ee1308e0fbaed0a4fdfe3ec8a4adbd3"},
"mpeg_ts": {:hex, :mpeg_ts, "3.3.5", "b0fd6714753da5ad51e686ddbdb4cf4a4480f43dd6df311c7e2da4359df1960f", [:mix], [], "hexpm", "503bf4f557057efb35433893ffdb8da2be4643b2be161e8721a0daec9825b600"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
Expand Down
68 changes: 27 additions & 41 deletions test/demuxing_engine_mpegts_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
defmodule ExHLS.DemuxingEngine.MPEGTS.Test do
use ExUnit.Case, async: false
import Mock

alias MPEG.TS.Demuxer

test "handling timestamp rollovers" do
timestamp_range = (2 ** 33 * 1_000_000_000) |> div(90_000)
Expand All @@ -16,52 +13,41 @@ defmodule ExHLS.DemuxingEngine.MPEGTS.Test do

%{
og_timestamp: og_timestamp,
pts: rolled_timestamp,
dts: rolled_timestamp,
data: <<>>,
discontinuity: false,
is_aligned: true
payload: %{
pts: rolled_timestamp,
dts: rolled_timestamp,
data: <<>>,
discontinuity: false,
is_aligned: true
}
}
end)

demuxer = %{
waiting_random_access_indicator: nil,
packet_buffers: %{1 => packets, 2 => packets},
pmt: %{streams: %{}}
}

new = fn -> demuxer end

take = fn demuxer, track_id ->
demuxer
|> get_and_update_in(
[:packet_buffers, track_id],
fn [head | tail] -> {[head], tail} end
)
end
demuxing_engine = ExHLS.DemuxingEngine.MPEGTS.new(0)

with_mock Demuxer, new: new, take: take do
demuxing_engine = ExHLS.DemuxingEngine.MPEGTS.new(0)

[1, 2]
|> Enum.reduce(demuxing_engine, fn track_id, demuxing_engine ->
{chunks, demuxing_engine} =
1..200
|> Enum.map_reduce(demuxing_engine, fn _i, demuxing_engine ->
{:ok, chunk, demuxing_engine} =
ExHLS.DemuxingEngine.MPEGTS.pop_chunk(demuxing_engine, track_id)
demuxing_engine = %{
demuxing_engine
| packets_map: %{1 => Qex.new(packets), 2 => Qex.new(packets)}
}

{chunk, demuxing_engine}
end)
[1, 2]
|> Enum.reduce(demuxing_engine, fn track_id, demuxing_engine ->
{chunks, demuxing_engine} =
1..200
|> Enum.map_reduce(demuxing_engine, fn _i, demuxing_engine ->
{:ok, chunk, demuxing_engine} =
ExHLS.DemuxingEngine.MPEGTS.pop_chunk(demuxing_engine, track_id)

Enum.zip(chunks, packets)
|> Enum.each(fn {chunk, packet} ->
assert chunk.pts_ms == div(packet.og_timestamp, 1_000_000)
assert chunk.dts_ms == div(packet.og_timestamp, 1_000_000)
{chunk, demuxing_engine}
end)

demuxing_engine
Enum.zip(chunks, packets)
|> Enum.each(fn {chunk, packet} ->
assert chunk.pts_ms == div(packet.og_timestamp, 1_000_000)
assert chunk.dts_ms == div(packet.og_timestamp, 1_000_000)
end)
end

demuxing_engine
end)
end
end