-
Notifications
You must be signed in to change notification settings - Fork 0
Adjust to mpeg_ts v3 API #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
9fe0311
1a3a369
a2d09cc
f8d5067
fc3160f
3211a4b
88d3264
853248a
c16156e
2ec02c9
8c48722
ca855f3
8965ae2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,8 +9,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do | |
| alias Membrane.{AAC, H264, RemoteStream} | ||
| alias MPEG.TS.Demuxer | ||
|
|
||
| @enforce_keys [:demuxer, :last_tden_tag] | ||
| defstruct @enforce_keys ++ [track_timestamps_data: %{}] | ||
| @enforce_keys [:demuxer] | ||
| defstruct @enforce_keys ++ [track_timestamps_data: %{}, last_tden_tag: nil, packets_map: %{}] | ||
|
|
||
| # using it a boundary expressed in nanoseconds, instead of the usual 90kHz clock ticks, | ||
| # generates up to 1/10th of ms error per 26.5 hours of stream which is acceptable in | ||
|
|
@@ -19,7 +19,8 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do | |
|
|
||
| @type t :: %__MODULE__{ | ||
| demuxer: Demuxer.t(), | ||
| last_tden_tag: String.t() | nil | ||
| last_tden_tag: String.t() | nil, | ||
| packets_map: %{(track_id :: non_neg_integer()) => MPEG.TS.Demuxer.Container.t()} | ||
| } | ||
|
|
||
| @impl true | ||
|
|
@@ -28,31 +29,31 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do | |
| # different demuxing engines | ||
| def new(_timestamp_offset_ms) do | ||
| demuxer = Demuxer.new() | ||
|
|
||
| # we need to explicitly override that `waiting_random_access_indicator` as otherwise Demuxer | ||
| # discards all the input data | ||
| # TODO - figure out how to do it properly | ||
| demuxer = %{demuxer | waiting_random_access_indicator: false} | ||
|
|
||
| %__MODULE__{demuxer: demuxer, last_tden_tag: nil} | ||
| %__MODULE__{demuxer: demuxer} | ||
| end | ||
|
|
||
| @impl true | ||
| def feed!(%__MODULE__{} = demuxing_engine, binary) do | ||
| demuxing_engine | ||
| |> Map.update!(:demuxer, &Demuxer.push_buffer(&1, binary)) | ||
| {new_packets, demuxer} = Demuxer.demux(demuxing_engine.demuxer, binary) | ||
|
|
||
| packets_map = | ||
| Enum.reduce(new_packets, demuxing_engine.packets_map, fn new_packet, packets_map -> | ||
| Map.update(packets_map, new_packet.pid, [new_packet], &(&1 ++ [new_packet])) | ||
| end) | ||
|
|
||
| %{demuxing_engine | demuxer: demuxer, packets_map: packets_map} | ||
| end | ||
|
|
||
| @impl true | ||
| def get_tracks_info(%__MODULE__{} = demuxing_engine) do | ||
| with %{streams: streams} <- demuxing_engine.demuxer.pmt do | ||
| with %{streams: streams} <- demuxing_engine.demuxer do | ||
| tracks_info = | ||
| streams | ||
| |> Enum.flat_map(fn | ||
| {id, %{stream_type: :AAC}} -> | ||
| {id, %{stream_type: :AAC_ADTS}} -> | ||
| [{id, %RemoteStream{content_format: AAC}}] | ||
|
|
||
| {id, %{stream_type: :H264}} -> | ||
| {id, %{stream_type: :H264_AVC}} -> | ||
| [{id, %RemoteStream{content_format: H264}}] | ||
|
|
||
| {id, unsupported_stream_info} -> | ||
|
|
@@ -80,47 +81,58 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do | |
|
|
||
| @impl true | ||
| def pop_chunk(%__MODULE__{} = demuxing_engine, track_id) do | ||
| with {[packet], demuxer} <- Demuxer.take(demuxing_engine.demuxer, track_id) do | ||
| {maybe_tden_tag, demuxer} = maybe_read_tden_tag(demuxer, packet.pts) | ||
| with {[packet], demuxing_engine} <- take_packets(demuxing_engine, track_id) do | ||
| {maybe_tden_tag, demuxing_engine} = maybe_read_tden_tag(demuxing_engine, packet.payload.pts) | ||
| tden_tag = maybe_tden_tag || demuxing_engine.last_tden_tag | ||
|
||
|
|
||
| {demuxing_engine, packet} = | ||
| %{demuxing_engine | demuxer: demuxer, last_tden_tag: tden_tag} | ||
| %{demuxing_engine | last_tden_tag: tden_tag} | ||
| |> handle_possible_timestamps_rollover(track_id, packet) | ||
|
|
||
| chunk = %ExHLS.Chunk{ | ||
| payload: packet.data, | ||
| pts_ms: packet.pts |> packet_ts_to_millis(), | ||
| dts_ms: packet.dts |> packet_ts_to_millis(), | ||
| payload: packet.payload.data, | ||
| pts_ms: packet.payload.pts |> packet_ts_to_millis(), | ||
| dts_ms: packet.payload.dts |> packet_ts_to_millis(), | ||
| track_id: track_id, | ||
| metadata: %{ | ||
| discontinuity: packet.discontinuity, | ||
| is_aligned: packet.is_aligned, | ||
| discontinuity: packet.payload.discontinuity, | ||
| is_aligned: packet.payload.is_aligned, | ||
| tden_tag: tden_tag | ||
| } | ||
| } | ||
|
|
||
| {:ok, chunk, demuxing_engine} | ||
| else | ||
| {[], demuxer} -> | ||
| {:error, :empty_track_data, %{demuxing_engine | demuxer: demuxer}} | ||
| {[], demuxing_engine} -> | ||
| {:error, :empty_track_data, demuxing_engine} | ||
| end | ||
| end | ||
|
|
||
| defp maybe_read_tden_tag(demuxer, packet_pts) do | ||
| defp take_packets(demuxing_engine, track_id) do | ||
| case Map.get(demuxing_engine.packets_map, track_id) do | ||
| [packet | rest] -> | ||
| demuxing_engine = put_in(demuxing_engine.packets_map[track_id], rest) | ||
| {[packet], demuxing_engine} | ||
|
|
||
| _other -> | ||
| {[], demuxing_engine} | ||
| end | ||
|
||
| end | ||
|
|
||
| defp maybe_read_tden_tag(demuxing_engine, packet_pts) do | ||
| withl no_id3_stream: | ||
| {id3_track_id, _stream_description} <- | ||
| demuxer.pmt.streams | ||
| demuxing_engine.demuxer.streams | ||
| |> Enum.find(fn {_pid, stream_description} -> | ||
| stream_description.stream_type == :METADATA_IN_PES | ||
| end), | ||
| no_id3_data: {[id3], demuxer} <- Demuxer.take(demuxer, id3_track_id), | ||
| id3_not_in_timerange: true <- id3.pts <= packet_pts do | ||
| {parse_tden_tag(id3.data), demuxer} | ||
| no_id3_data: {[id3], demuxing_engine} <- take_packets(demuxing_engine, id3_track_id), | ||
| id3_not_in_timerange: true <- id3.payload.pts <= packet_pts do | ||
| {parse_tden_tag(id3.payload.data), demuxing_engine} | ||
| else | ||
| no_id3_stream: nil -> {nil, demuxer} | ||
| no_id3_data: {[], updated_demuxer} -> {nil, updated_demuxer} | ||
| id3_not_in_timerange: false -> {nil, demuxer} | ||
| no_id3_stream: nil -> {nil, demuxing_engine} | ||
| no_id3_data: {[], updated_demuxing_engine} -> {nil, updated_demuxing_engine} | ||
| id3_not_in_timerange: false -> {nil, demuxing_engine} | ||
| end | ||
| end | ||
|
|
||
|
|
@@ -143,7 +155,7 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do | |
|
|
||
| @impl true | ||
| def end_stream(%__MODULE__{} = demuxing_engine) do | ||
| demuxer = Demuxer.end_of_stream(demuxing_engine.demuxer) | ||
| {_flushed, demuxer} = Demuxer.flush(demuxing_engine.demuxer) | ||
| %{demuxing_engine | demuxer: demuxer} | ||
| end | ||
|
|
||
|
|
@@ -155,32 +167,34 @@ defmodule ExHLS.DemuxingEngine.MPEGTS do | |
|
|
||
| rollovers_offset = rollovers_count * @timestamp_range_size_ns | ||
|
|
||
| packet = | ||
| packet | ||
| payload = | ||
| packet.payload | ||
| |> Map.update!(:pts, &add_offset_if_not_nil(&1, rollovers_offset)) | ||
| |> Map.update!(:dts, &add_offset_if_not_nil(&1, rollovers_offset)) | ||
|
|
||
| {demuxing_engine, packet} = | ||
| {demuxing_engine, payload} = | ||
| with last_ts when last_ts != nil <- last_dts || last_pts, | ||
| true <- last_ts > (packet.dts || packet.pts) do | ||
| true <- last_ts > (payload.dts || payload.pts) do | ||
| demuxing_engine = | ||
| demuxing_engine | ||
| |> update_in([:track_timestamps_data, track_id, :rollovers_count], &(&1 + 1)) | ||
|
|
||
| packet = | ||
| packet | ||
| payload = | ||
| payload | ||
| |> Map.update!(:pts, &add_offset_if_not_nil(&1, @timestamp_range_size_ns)) | ||
| |> Map.update!(:dts, &add_offset_if_not_nil(&1, @timestamp_range_size_ns)) | ||
|
|
||
| {demuxing_engine, packet} | ||
| {demuxing_engine, payload} | ||
| else | ||
| _other -> {demuxing_engine, packet} | ||
| _other -> {demuxing_engine, payload} | ||
| end | ||
|
|
||
| demuxing_engine = | ||
| demuxing_engine | ||
| |> put_in([:track_timestamps_data, track_id, :last_pts], packet.pts) | ||
| |> put_in([:track_timestamps_data, track_id, :last_dts], packet.dts) | ||
| |> put_in([:track_timestamps_data, track_id, :last_pts], payload.pts) | ||
| |> put_in([:track_timestamps_data, track_id, :last_dts], payload.dts) | ||
|
|
||
| packet = %{packet | payload: payload} | ||
|
|
||
| {demuxing_engine, packet} | ||
| end | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use
QexThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea, done