Skip to content

Commit d5c2082

Browse files
committed
Implement Cascading SFUs
A Client can now subscribe to a FOCI that it isn't connected with
1 parent 2db16e1 commit d5c2082

File tree

5 files changed

+410
-245
lines changed

5 files changed

+410
-245
lines changed

cascade.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
"fmt"
7+
"io/ioutil"
8+
"log"
9+
"net/http"
10+
"strings"
11+
12+
"github.com/pion/webrtc/v3"
13+
)
14+
15+
// Given a FOCI + CallID + DeviceID + Purpose establish a session and Subscribe. Take
16+
// the media from the remote and copy it to a `webrtc.TrackLocal` so we can re-send
17+
func remoteStreamLookup(msg dataChannelMessage) (webrtc.TrackLocal, webrtc.TrackLocal) {
18+
audioTrack, videoTrack := make(chan webrtc.TrackLocal, 1), make(chan webrtc.TrackLocal, 1)
19+
20+
peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{})
21+
if err != nil {
22+
panic(err)
23+
}
24+
25+
peerConnection.OnTrack(func(trackRemote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
26+
trackLocal, err := webrtc.NewTrackLocalStaticRTP(trackRemote.Codec().RTPCodecCapability, trackRemote.ID(), trackRemote.StreamID())
27+
if err != nil {
28+
panic(err)
29+
}
30+
31+
if strings.Contains(trackRemote.Codec().MimeType, "video") {
32+
videoTrack <- trackLocal
33+
} else {
34+
audioTrack <- trackLocal
35+
}
36+
37+
copyRemoteToLocal(trackRemote, trackLocal)
38+
})
39+
40+
dataChannel, err := peerConnection.CreateDataChannel("signaling", nil)
41+
if err != nil {
42+
panic(err)
43+
}
44+
45+
dataChannel.OnOpen(func() {
46+
if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
47+
panic(err)
48+
}
49+
50+
if _, err := peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil {
51+
panic(err)
52+
}
53+
54+
offer, err := peerConnection.CreateOffer(nil)
55+
if err != nil {
56+
panic(err)
57+
}
58+
59+
if err := peerConnection.SetLocalDescription(offer); err != nil {
60+
panic(err)
61+
}
62+
63+
msg.SDP = offer.SDP
64+
marshaled, err := json.Marshal(msg)
65+
if err != nil {
66+
panic(err)
67+
}
68+
69+
if err = dataChannel.SendText(string(marshaled)); err != nil {
70+
panic(err)
71+
}
72+
})
73+
74+
dataChannel.OnMessage(func(m webrtc.DataChannelMessage) {
75+
if !m.IsString {
76+
log.Fatal("Inbound message is not string")
77+
}
78+
79+
cascadedMsg := &dataChannelMessage{}
80+
if err := json.Unmarshal(m.Data, cascadedMsg); err != nil {
81+
log.Fatal(err)
82+
}
83+
84+
switch cascadedMsg.Event {
85+
case "error":
86+
audioTrack <- nil
87+
videoTrack <- nil
88+
case "subscribe":
89+
if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: cascadedMsg.SDP}); err != nil {
90+
panic(err)
91+
}
92+
93+
default:
94+
log.Fatalf("Unknown msg Event type %s", msg.Event)
95+
}
96+
})
97+
98+
offer, err := peerConnection.CreateOffer(nil)
99+
if err != nil {
100+
panic(err)
101+
}
102+
103+
if err := peerConnection.SetLocalDescription(offer); err != nil {
104+
panic(err)
105+
}
106+
107+
resp, err := http.Post("http://"+msg.FOCI+"/createSession", "application/text", bytes.NewBuffer([]byte(offer.SDP)))
108+
if err != nil {
109+
panic(err)
110+
}
111+
defer resp.Body.Close()
112+
113+
if resp.StatusCode != http.StatusOK {
114+
panic(fmt.Sprintf("Got HTTP Status code %d", resp.StatusCode))
115+
}
116+
117+
body, err := ioutil.ReadAll(resp.Body)
118+
if err != nil {
119+
panic(err)
120+
}
121+
122+
if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeAnswer, SDP: string(body)}); err != nil {
123+
panic(err)
124+
}
125+
126+
return <-audioTrack, <-videoTrack
127+
128+
}

foci.go

Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
package main
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"io"
7+
"log"
8+
"net/http"
9+
"strings"
10+
"sync"
11+
"time"
12+
13+
"github.com/pion/rtcp"
14+
"github.com/pion/webrtc/v3"
15+
)
16+
17+
type streamDetail struct {
18+
callID, deviceID, purpose string
19+
track *webrtc.TrackLocalStaticRTP
20+
}
21+
22+
type setStreamDetails func(newCallID, newDeviceID, newPurpose string)
23+
24+
type foci struct {
25+
name string
26+
streamDetailsMu sync.RWMutex
27+
streamDetails []streamDetail
28+
}
29+
30+
func (f *foci) localStreamLookup(msg dataChannelMessage) (audioTrack, videoTrack webrtc.TrackLocal) {
31+
f.streamDetailsMu.Lock()
32+
defer f.streamDetailsMu.Unlock()
33+
34+
for _, s := range f.streamDetails {
35+
if s.callID == msg.CallID && s.deviceID == msg.DeviceID && s.purpose == msg.Purpose {
36+
if s.track.Kind() == webrtc.RTPCodecTypeAudio {
37+
audioTrack = s.track
38+
} else {
39+
videoTrack = s.track
40+
}
41+
}
42+
}
43+
return
44+
}
45+
46+
func (f *foci) dataChannelHandler(peerConnection *webrtc.PeerConnection, d *webrtc.DataChannel, setPublishDetails setStreamDetails) {
47+
sendError := func(errMsg string) {
48+
marshaled, err := json.Marshal(&dataChannelMessage{
49+
Event: "error",
50+
Message: errMsg,
51+
})
52+
if err != nil {
53+
panic(err)
54+
}
55+
56+
if err = d.SendText(string(marshaled)); err != nil {
57+
panic(err)
58+
}
59+
}
60+
61+
d.OnMessage(func(m webrtc.DataChannelMessage) {
62+
if !m.IsString {
63+
log.Fatal("Inbound message is not string")
64+
}
65+
66+
msg := &dataChannelMessage{}
67+
if err := json.Unmarshal(m.Data, msg); err != nil {
68+
log.Fatal(err)
69+
}
70+
71+
switch msg.Event {
72+
case "publish":
73+
if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{
74+
Type: webrtc.SDPTypeOffer,
75+
SDP: msg.SDP,
76+
}); err != nil {
77+
panic(err)
78+
}
79+
80+
answer, err := peerConnection.CreateAnswer(nil)
81+
if err != nil {
82+
panic(err)
83+
}
84+
85+
if err := peerConnection.SetLocalDescription(answer); err != nil {
86+
panic(err)
87+
}
88+
89+
setPublishDetails(msg.CallID, msg.DeviceID, msg.Purpose)
90+
91+
msg.SDP = answer.SDP
92+
marshaled, err := json.Marshal(msg)
93+
if err != nil {
94+
panic(err)
95+
}
96+
97+
if err = d.SendText(string(marshaled)); err != nil {
98+
panic(err)
99+
}
100+
case "subscribe":
101+
var audioTrack, videoTrack webrtc.TrackLocal
102+
103+
if msg.FOCI == f.name {
104+
audioTrack, videoTrack = f.localStreamLookup(*msg)
105+
} else {
106+
audioTrack, videoTrack = remoteStreamLookup(*msg)
107+
}
108+
109+
if audioTrack == nil && videoTrack == nil {
110+
sendError("No Such Stream")
111+
return
112+
}
113+
114+
if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{
115+
Type: webrtc.SDPTypeOffer,
116+
SDP: msg.SDP,
117+
}); err != nil {
118+
panic(err)
119+
}
120+
121+
if audioTrack != nil {
122+
if _, err := peerConnection.AddTrack(audioTrack); err != nil {
123+
panic(err)
124+
}
125+
}
126+
127+
if videoTrack != nil {
128+
if _, err := peerConnection.AddTrack(videoTrack); err != nil {
129+
panic(err)
130+
}
131+
}
132+
133+
answer, err := peerConnection.CreateAnswer(nil)
134+
if err != nil {
135+
panic(err)
136+
}
137+
138+
if err := peerConnection.SetLocalDescription(answer); err != nil {
139+
panic(err)
140+
}
141+
142+
msg.SDP = answer.SDP
143+
marshaled, err := json.Marshal(msg)
144+
if err != nil {
145+
panic(err)
146+
}
147+
148+
if err = d.SendText(string(marshaled)); err != nil {
149+
panic(err)
150+
}
151+
default:
152+
log.Fatalf("Unknown msg Event type %s", msg.Event)
153+
}
154+
})
155+
}
156+
157+
func (f *foci) handleCreateSession(w http.ResponseWriter, r *http.Request) error {
158+
offer, err := io.ReadAll(r.Body)
159+
if err != nil {
160+
return err
161+
}
162+
163+
peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{})
164+
if err != nil {
165+
return err
166+
}
167+
168+
var (
169+
publishDetailsMu sync.RWMutex
170+
callID, deviceID, purpose string
171+
)
172+
setPublishDetails := func(newCallID, newDeviceID, newPurpose string) {
173+
publishDetailsMu.Lock()
174+
defer publishDetailsMu.Unlock()
175+
176+
callID = newCallID
177+
deviceID = newDeviceID
178+
purpose = newPurpose
179+
}
180+
181+
peerConnection.OnTrack(func(trackRemote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
182+
id := "audio"
183+
if strings.Contains(trackRemote.Codec().MimeType, "video") {
184+
id = "video"
185+
186+
// Send a PLI on an interval so that the publisher is pushing a keyframe every rtcpPLIInterval
187+
go func() {
188+
ticker := time.NewTicker(time.Millisecond * 200)
189+
for range ticker.C {
190+
if errSend := peerConnection.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(trackRemote.SSRC())}}); errSend != nil {
191+
fmt.Println(errSend)
192+
}
193+
}
194+
}()
195+
196+
}
197+
198+
publishDetailsMu.Lock()
199+
f.streamDetailsMu.Lock()
200+
trackLocal, err := webrtc.NewTrackLocalStaticRTP(trackRemote.Codec().RTPCodecCapability, id, fmt.Sprintf("%s-%s-%s", callID, deviceID, purpose))
201+
if err != nil {
202+
panic(err)
203+
}
204+
205+
f.streamDetails = append(f.streamDetails, streamDetail{
206+
callID: callID,
207+
deviceID: deviceID,
208+
purpose: purpose,
209+
track: trackLocal,
210+
})
211+
f.streamDetailsMu.Unlock()
212+
publishDetailsMu.Unlock()
213+
214+
copyRemoteToLocal(trackRemote, trackLocal)
215+
})
216+
217+
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
218+
f.dataChannelHandler(peerConnection, d, setPublishDetails)
219+
})
220+
221+
peerConnection.SetRemoteDescription(webrtc.SessionDescription{
222+
Type: webrtc.SDPTypeOffer,
223+
SDP: string(offer),
224+
})
225+
226+
answer, err := peerConnection.CreateAnswer(nil)
227+
if err != nil {
228+
return err
229+
}
230+
231+
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
232+
if err = peerConnection.SetLocalDescription(answer); err != nil {
233+
return err
234+
}
235+
<-gatherComplete
236+
237+
_, err = fmt.Fprintf(w, peerConnection.LocalDescription().SDP)
238+
return err
239+
}
240+
241+
func copyRemoteToLocal(trackRemote *webrtc.TrackRemote, trackLocal *webrtc.TrackLocalStaticRTP) {
242+
buff := make([]byte, 1500)
243+
for {
244+
i, _, err := trackRemote.Read(buff)
245+
if err != nil {
246+
panic(err)
247+
}
248+
249+
if _, err = trackLocal.Write(buff[:i]); err != nil {
250+
panic(err)
251+
}
252+
}
253+
254+
}

0 commit comments

Comments
 (0)