As explained in the [Architecture chapter](08_RTSP_Architecture.md), the pipeline will consist of an RTSP Source and an HLS Sink.

The initial pipeline will consist of the `RTSP Source`, which will start establishing the connection with the RTSP server, and the `HLS Sink Bin`. For now we won't connect these elements in any way, since we don't yet know which tracks we'll receive from the RTSP server we're connecting to.

##### lib/pipeline.ex
```elixir
@impl true
def handle_init(_context, options) do
  spec = [
    child(:source, %Membrane.RTSP.Source{
      transport: {:udp, options.port, options.port + 5},
      allowed_media_types: [:video, :audio],
      stream_uri: options.stream_url,
      on_connection_closed: :send_eos
    }),
    child(:hls, %Membrane.HTTPAdaptiveStream.SinkBin{
      target_window_duration: Membrane.Time.seconds(120),
      manifest_module: Membrane.HTTPAdaptiveStream.HLS,
      storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{
        directory: options.output_path
      }
    })
  ]

  {[spec: spec], %{parent_pid: options.parent_pid}}
end
```

Once we receive the `{:set_up_tracks, tracks}` notification from the source, we know which tracks have been set up during connection establishment and what streams we should expect. First we filter these tracks so that we keep at most one video track and one audio track. Then we create specs that connect the output pads of the source to the input pads of the sink appropriately: audio to audio and video to video.

##### lib/pipeline.ex
```elixir
@impl true
def handle_child_notification({:set_up_tracks, tracks}, :source, _ctx, state) do
  track_specs =
    tracks
    |> Enum.uniq_by(& &1.type)
    |> Enum.filter(&(&1.type in [:audio, :video]))
    |> Enum.map(fn track ->
      encoding =
        case track do
          %{type: :audio} -> :AAC
          %{type: :video} -> :H264
        end

      get_child(:source)
      |> via_out(Pad.ref(:output, track.control_path))
      |> via_in(:input,
        options: [encoding: encoding, segment_duration: Membrane.Time.seconds(4)]
      )
      |> get_child(:hls)
    end)

  {[spec: track_specs], state}
end
```

By doing this, we are prepared to receive the streams when a `PLAY` request is eventually sent by the source and the server starts streaming.
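
For completeness, a sketch of how such a pipeline might be started from application code. This is a hypothetical example, not part of the tutorial's source: the module name `Pipeline` and all option values (port, stream URL, output directory) are assumptions, and the output directory is assumed to already exist.

```elixir
# Hypothetical usage sketch - module name and option values are assumptions.
# Membrane.Pipeline.start_link/2 spawns the pipeline under its own supervisor
# and invokes our handle_init/2 callback with the options map we pass here.
{:ok, _supervisor, _pipeline} =
  Membrane.Pipeline.start_link(Pipeline, %{
    port: 20_000,
    stream_url: "rtsp://localhost:8554/stream",
    output_path: "hls_output",
    parent_pid: self()
  })
```

Passing `self()` as `parent_pid` lets the pipeline send messages (for example, about playback progress or errors) back to the process that started it.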