[Go] SSE와 함께 Pion/WebRTC 사용해 보기
소개
이번에는 Pion/WebRTC를 해보겠습니다.
Pion에는 이미 좋은 예제가 있기 때문에 example-webrtc-applications의 SFU-WebSocket을 기반으로 샘플을 만들겠습니다.
다만 다음과 같은 점을 변경하겠습니다.
지난번 샘플 프로젝트에 WebRTC 기능을 추가합니다.
또한 이 게시물(특히 클라이언트 측)도 참고했습니다.
환경
WebRTC SFU와 연결
지난번에는 서버 측 애플리케이션이 시그널링 용도로만 동작했습니다.
시그널링이 끝난 뒤 클라이언트는 서로 직접 연결되었습니다.
이번에는 클라이언트가 서버 측 애플리케이션에만 연결됩니다.
연결 후 클라이언트는 비디오 트랙과 오디오 트랙을 서버측 애플리케이션으로 보냅니다.
그리고 서버 측 응용 프로그램은 다른 클라이언트의 트랙을 원격 트랙으로 클라이언트에 보냅니다.
샘플
이번에는 샘플 프로젝트를 GitHub에 게시합니다.
클라이언트 측
연결 프로세스는 서버 측 애플리케이션에서 시작됩니다.
따라서 클라이언트 측에서는 오퍼(offer) 및 ICE 후보(candidate) 이벤트만 처리하면 됩니다.
그리고 시작할 때 RTCPeerConnection을 생성합니다.
main.page.ts
...
/**
 * Routes one received message string to the matching handler.
 *
 * The string is parsed as JSON and silently dropped unless it passes
 * checkIsClientMessage. "text" messages are shown in the view; "offer" and
 * "candidate" messages carry a JSON-encoded payload in `data`, which is
 * parsed again and handed to the WebRTC controller. Other events are ignored.
 */
function handleReceivedMessage(value: string) {
  const message = JSON.parse(value);
  if (checkIsClientMessage(message)) {
    if (message.event === "text") {
      view.addReceivedText({ user: message.userName, message: message.data });
    } else if (message.event === "offer") {
      webrtc.handleOffer(JSON.parse(message.data));
    } else if (message.event === "candidate") {
      webrtc.handleCandidate(JSON.parse(message.data));
    }
  }
}
/**
 * Sends the locally created answer to the server as an "answer" message.
 * Nothing is sent while hasAnyTexts(userName) is false.
 */
function sendAnswer(data: RTCSessionDescriptionInit) {
  if (hasAnyTexts(userName)) {
    const payload = JSON.stringify(data);
    sse.sendMessage({ userName, event: "answer", data: payload });
  }
}
/**
 * Sends a local ICE candidate to the server as a "candidate" message.
 * Nothing is sent while hasAnyTexts(userName) is false.
 */
function sendCandidate(data: RTCIceCandidate) {
  if (hasAnyTexts(userName)) {
    const payload = JSON.stringify(data);
    sse.sendMessage({ userName, event: "candidate", data: payload });
  }
}
/**
 * Type guard for messages received from the server-side application.
 *
 * All messages from the server-side application are sent as "ClientMessage",
 * so this verifies the two fields the handlers rely on: a string `event` and
 * a string `data`.
 *
 * Primitive input (numbers, strings, booleans) is rejected before any
 * property access: the `in` operator throws a TypeError on primitives, and
 * JSON.parse can legitimately return them (e.g. for the text `123`).
 *
 * @param value - Any parsed value, typically the result of JSON.parse.
 * @returns true when `value` has string `event` and `data` properties.
 */
function checkIsClientMessage(value: unknown): value is ClientMessage {
  if (value == null) {
    return false;
  }
  if (typeof value !== "object" && typeof value !== "function") {
    return false;
  }
  const candidate = value as Record<string, unknown>;
  return typeof candidate["event"] === "string" &&
    typeof candidate["data"] === "string";
}
// Kick off page setup as soon as this script is evaluated.
// NOTE(review): init is not visible in this excerpt; presumably it is defined in the elided part of this file.
init();
webrtc.controller.ts
export class WebRtcController {
/** Local camera/microphone stream; stored once getUserMedia succeeds in init(). */
private webcamStream: MediaStream | null = null;
/** Connection to the server-side application; created in connect(). */
private peerConnection: RTCPeerConnection | null = null;
/** Callback given the answer created in handleOffer(); where it is assigned is not shown in this excerpt. */
private answerSentEvent: ((data: RTCSessionDescriptionInit) => void) | null = null;
/** Callback given each local ICE candidate found by the connection; where it is assigned is not shown in this excerpt. */
private candidateSentEvent: ((data: RTCIceCandidate) => void) | null = null;
/**
 * Starts the local preview and, once the camera/microphone stream is
 * available, opens the peer connection.
 *
 * The "local_video" element is resized to a fixed 320px width (height keeps
 * the video's aspect ratio) when it becomes playable. Any failure in the
 * getUserMedia chain is logged to the console.
 */
public init() {
  const localVideo = document.getElementById("local_video") as HTMLVideoElement;
  const previewWidth = 320;

  const fitPreview = () => {
    const height = localVideo.videoHeight / (localVideo.videoWidth / previewWidth);
    localVideo.setAttribute("width", previewWidth.toString());
    localVideo.setAttribute("height", height.toString());
  };
  localVideo.addEventListener("canplay", fitPreview, false);

  const startWithStream = (stream: MediaStream) => {
    localVideo.srcObject = stream;
    localVideo.play();
    this.webcamStream = stream;
    // The RTCPeerConnection is created only after the local MediaStream exists.
    this.connect();
  };
  const reportFailure = (err: unknown) => console.error(`An error occurred: ${err}`);

  navigator.mediaDevices.getUserMedia({ video: true, audio: true })
    .then(startWithStream)
    .catch(reportFailure);
}
...
/**
 * Applies an offer received from the server and replies with an answer.
 *
 * The steps are chained so that each one starts only after the previous one
 * succeeded: setRemoteDescription -> createAnswer -> setLocalDescription ->
 * answerSentEvent. The answer is therefore only sent once it has really been
 * applied locally, and a failure in any step is logged instead of becoming an
 * unhandled promise rejection.
 *
 * @param data - The offer parsed from the signaling message. The call is a
 *   no-op when it is null/undefined or when no peer connection exists yet.
 */
public handleOffer(data: RTCSessionDescription|null|undefined) {
  const connection = this.peerConnection;
  if (connection == null || data == null) {
    return;
  }
  connection.setRemoteDescription(data)
    .then(() => connection.createAnswer())
    // Pass the answer on after it has been set, so the next step can send it.
    .then(answer => connection.setLocalDescription(answer).then(() => answer))
    .then(answer => {
      if (this.answerSentEvent != null) {
        this.answerSentEvent(answer);
      }
    })
    .catch(err => console.error(`An error occurred: ${err}`));
}
/**
 * Adds an ICE candidate received from the server to the peer connection.
 *
 * addIceCandidate returns a promise; a rejection is logged here so it does
 * not surface as an unhandled promise rejection.
 *
 * @param data - The candidate parsed from the signaling message. The call is
 *   a no-op when it is null/undefined or when no peer connection exists yet.
 */
public handleCandidate(data: RTCIceCandidate|null|undefined) {
  if (this.peerConnection == null || data == null) {
    return;
  }
  this.peerConnection.addIceCandidate(data)
    .catch(err => console.error(`An error occurred: ${err}`));
}
/**
 * Creates the RTCPeerConnection and wires it up.
 *
 * - Each remote video track gets its own <video> element inside
 *   "remote_video_area"; the element is removed again when a track is
 *   removed from its stream. Audio tracks do not create elements.
 * - Every track of the local webcam stream is added to the connection.
 * - Each local ICE candidate is passed to candidateSentEvent, if one is set.
 *
 * Does nothing when the local stream has not been obtained yet.
 */
private connect() {
  if (this.webcamStream == null) {
    return;
  }
  const connection = new RTCPeerConnection({
    iceServers: [
      { urls: `stun:stun.l.google.com:19302` }, // A STUN server
    ],
  });
  this.peerConnection = connection;

  connection.ontrack = (ev) => {
    const [remoteStream] = ev.streams;
    if (remoteStream == null || ev.track.kind === "audio") {
      return;
    }
    const remoteVideo = document.createElement("video");
    remoteVideo.srcObject = remoteStream;
    remoteVideo.autoplay = true;
    remoteVideo.controls = true;
    const videoArea = document.getElementById("remote_video_area") as HTMLElement;
    videoArea.appendChild(remoteVideo);

    ev.track.onmute = () => {
      remoteVideo.play();
    };
    remoteStream.onremovetrack = () => {
      const parent = remoteVideo.parentNode;
      if (parent) {
        parent.removeChild(remoteVideo);
      }
    };
  };

  for (const track of this.webcamStream.getTracks()) {
    if (this.peerConnection == null || this.webcamStream == null) {
      continue;
    }
    this.peerConnection.addTrack(track, this.webcamStream);
  }

  connection.onicecandidate = (ev) => {
    if (ev.candidate != null && this.candidateSentEvent != null) {
      this.candidateSentEvent(ev.candidate);
    }
  };
}
}
서버 측
sseClient.go
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/pion/webrtc/v3"
)
// SSEClient couples one user's SSE response stream with the channels that
// carry events raised by that user's PeerConnection.
type SSEClient struct {
	// candidateFound carries local ICE candidates to be sent to the browser.
	candidateFound chan *webrtc.ICECandidate
	// changeConnectionState carries PeerConnection state changes.
	changeConnectionState chan webrtc.PeerConnectionState
	// addTrack carries remote tracks received from the browser.
	addTrack chan *webrtc.TrackRemote

	// userName identifies the user; signaling messages are matched against it.
	userName string
	// w is the response writer of the open SSE request.
	w http.ResponseWriter
}
// newSSEClient builds the SSEClient for userName writing to w.
// All three event channels are unbuffered, so every send blocks until the
// SSE handler loop receives it.
func newSSEClient(userName string, w http.ResponseWriter) *SSEClient {
	client := new(SSEClient)
	client.userName = userName
	client.w = w
	client.candidateFound = make(chan *webrtc.ICECandidate)
	client.changeConnectionState = make(chan webrtc.PeerConnectionState)
	client.addTrack = make(chan *webrtc.TrackRemote)
	return client
}
// registerSSEClient opens the SSE stream for one user and forwards that
// user's PeerConnection events until the stream ends.
//
// It reads the "user" query parameter, creates the client and its
// PeerConnection, registers them with the hub, and then loops:
//   - local ICE candidates are written to the SSE stream,
//   - received remote tracks are handed to the hub,
//   - a Failed/Closed connection state or a cancelled request ends the loop.
//
// On exit the client is unregistered, the PeerConnection is closed and the
// client's channels are closed.
//
// The response writer must implement http.Flusher; otherwise the request is
// rejected with 500 instead of panicking on the first Flush call.
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
	// Each event must be pushed immediately, which requires flushing.
	flusher, ok := w.(http.Flusher)
	if !ok {
		log.Println("ResponseWriter does not implement http.Flusher")
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}
	userName, err := getParam(r, "user")
	if err != nil {
		log.Println(err.Error())
		fmt.Fprint(w, "The parameters have no names")
		return
	}
	newClient := newSSEClient(userName, w)
	ps, err := NewPeerConnectionState(newClient)
	if err != nil {
		log.Println(err.Error())
		fmt.Fprint(w, "Failed connection")
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	hub.register <- ps

	defer func() {
		hub.unregister <- ps
		if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
			ps.peerConnection.Close()
		}
		// NOTE(review): PeerConnection callbacks send on these channels; a
		// callback that fires after this point would send on a closed
		// channel. Confirm the callbacks cannot run once Close has returned.
		close(newClient.candidateFound)
		close(newClient.changeConnectionState)
		close(newClient.addTrack)
	}()

	for {
		// Handle PeerConnection events and the end of the SSE request.
		select {
		case candidate := <-newClient.candidateFound:
			jsonValue, err := NewCandidateMessageJSON(newClient.userName, candidate)
			if err != nil {
				log.Println(err.Error())
				return
			}
			fmt.Fprintf(w, "data: %s\n\n", jsonValue)
			flusher.Flush()
		case track := <-newClient.addTrack:
			hub.addTrack <- track
		case connectionState := <-newClient.changeConnectionState:
			switch connectionState {
			case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
				return
			}
		case <-r.Context().Done():
			// When the browser calls "es.close()", the request context is
			// cancelled and this loop ends.
			return
		}
	}
}
// sendSSEMessage receives one ClientMessage as a JSON request body and hands
// it to the hub for processing.
//
// The response is always JSON: GetFailed(...) when the body cannot be read or
// decoded, GetSucceeded() once the message has been passed to the hub.
//
// The message is decoded into a value, not into a pointer-to-pointer: with
// the latter a JSON "null" body sets the pointer to nil and dereferencing it
// for the broadcast panics.
func sendSSEMessage(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
	w.Header().Set("Content-Type", "application/json")
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Println(err.Error())
		j, _ := json.Marshal(GetFailed("Failed reading values from body"))
		w.Write(j)
		return
	}
	var message ClientMessage
	if err = json.Unmarshal(body, &message); err != nil {
		log.Println(err.Error())
		j, _ := json.Marshal(GetFailed("Failed converting to ClientMessage"))
		w.Write(j)
		return
	}
	w.WriteHeader(http.StatusOK)
	// Blocks until the hub loop receives the message.
	hub.broadcast <- message
	data, _ := json.Marshal(GetSucceeded())
	w.Write(data)
}
peerConnectionState.go
package main
import (
"github.com/pion/webrtc/v3"
)
// PeerConnectionState pairs a WebRTC PeerConnection with the SSE client it
// belongs to.
type PeerConnectionState struct {
	// peerConnection is the server-side end of the user's WebRTC session.
	peerConnection *webrtc.PeerConnection
	// client is the SSE client that receives this connection's events.
	client *SSEClient
}
// NewPeerConnectionState creates a PeerConnection for client and forwards its
// events to the client's channels.
//
// The connection uses a public STUN server and gets one receive-only
// transceiver each for video and audio. ICE candidates, connection state
// changes and received tracks are sent to client.candidateFound,
// client.changeConnectionState and client.addTrack respectively.
//
// If adding a transceiver fails, the already created PeerConnection is closed
// before the error is returned so it is not leaked.
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
	peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{
					"stun:stun.l.google.com:19302",
				},
			},
		},
	})
	if err != nil {
		return nil, err
	}
	for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
		if _, err := peerConnection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
			Direction: webrtc.RTPTransceiverDirectionRecvonly,
		}); err != nil {
			// Best-effort cleanup; the transceiver error is the one to report.
			_ = peerConnection.Close()
			return nil, err
		}
	}
	// Add event handlers.
	peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
		// A nil candidate is not forwarded.
		if i == nil {
			return
		}
		client.candidateFound <- i
	})
	peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
		// Avoid a panic after the channel has been closed: for the Closed
		// state, first receive from the channel. ok is false once the channel
		// is closed, and then nothing is sent.
		// NOTE(review): while the channel is open and nobody sends on it,
		// this receive blocks — confirm that is intended.
		if p == webrtc.PeerConnectionStateClosed {
			_, ok := <-client.changeConnectionState
			if ok {
				client.changeConnectionState <- p
			}
			return
		}
		client.changeConnectionState <- p
	})
	peerConnection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
		client.addTrack <- t
	})
	return &PeerConnectionState{
		peerConnection: peerConnection,
		client:         client,
	}, nil
}
sseHub.go
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
)
// SSEHub keeps the set of connected clients and the tracks shared between
// them, and receives work through its channels.
type SSEHub struct {
	// clients is the set of registered connections.
	clients map[*PeerConnectionState]bool
	// broadcast carries messages posted by clients.
	broadcast chan ClientMessage
	// register and unregister add and remove connections.
	register   chan *PeerConnectionState
	unregister chan *PeerConnectionState
	// trackLocals maps a track ID to the local track that relays it.
	trackLocals map[string]*webrtc.TrackLocalStaticRTP
	// addTrack carries newly received remote tracks.
	addTrack chan *webrtc.TrackRemote
}
// newSSEHub returns an empty hub with all of its maps and (unbuffered)
// channels initialised.
func newSSEHub() *SSEHub {
	hub := new(SSEHub)
	hub.clients = make(map[*PeerConnectionState]bool)
	hub.trackLocals = make(map[string]*webrtc.TrackLocalStaticRTP)
	hub.broadcast = make(chan ClientMessage)
	hub.register = make(chan *PeerConnectionState)
	hub.unregister = make(chan *PeerConnectionState)
	hub.addTrack = make(chan *webrtc.TrackRemote)
	return hub
}
// run is the hub's event loop. It never returns.
//
// Handled events:
//   - every 3 seconds: request key frames from all senders,
//   - register / unregister: update the client set and re-signal all peers,
//   - addTrack: create a local relay track for a received remote track,
//     re-signal all peers and start copying packets in the background,
//   - broadcast: process a message posted by a client.
//
// The periodic key-frame request is served from this same loop rather than a
// separate goroutine, so it no longer iterates h.clients while register /
// unregister modify that map.
func (h *SSEHub) run() {
	keyFrameTicker := time.NewTicker(time.Second * 3)
	defer keyFrameTicker.Stop()
	for {
		select {
		case <-keyFrameTicker.C:
			dispatchKeyFrame(h)
		case client := <-h.register:
			h.clients[client] = true
			signalPeerConnections(h)
		case client := <-h.unregister:
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				signalPeerConnections(h)
			}
		case track := <-h.addTrack:
			trackLocal, err := webrtc.NewTrackLocalStaticRTP(track.Codec().RTPCodecCapability,
				track.ID(), track.StreamID())
			if err != nil {
				// Skip only this track; returning here would stop the hub
				// and leave every sender on its channels blocked.
				log.Println(err.Error())
				continue
			}
			h.trackLocals[track.ID()] = trackLocal
			signalPeerConnections(h)
			go updateTrackValue(h, track)
		case message := <-h.broadcast:
			handleReceivedMessage(h, message)
		}
	}
}
// updateTrackValue copies RTP packets from a received remote track to the
// local relay track registered under the same ID, until reading or writing
// fails. On exit the relay track is removed from the hub and all peers are
// re-signalled.
//
// The relay track is looked up once instead of on every packet: the lookup is
// loop-invariant, and a missing entry now ends the copy instead of
// dereferencing a nil track.
//
// NOTE(review): the deferred cleanup modifies h.trackLocals and calls
// signalPeerConnections from this goroutine while the hub loop uses the same
// state — confirm how access is meant to be serialised.
func updateTrackValue(h *SSEHub, track *webrtc.TrackRemote) {
	trackID := track.ID()
	defer func() {
		delete(h.trackLocals, trackID)
		signalPeerConnections(h)
	}()

	trackLocal, ok := h.trackLocals[trackID]
	if !ok || trackLocal == nil {
		return
	}

	buf := make([]byte, 1500)
	for {
		n, _, err := track.Read(buf)
		if err != nil {
			return
		}
		if _, err = trackLocal.Write(buf[:n]); err != nil {
			return
		}
	}
}
// handleReceivedMessage processes one message posted by a client.
//
//   - TextEvent: the message is re-encoded as JSON and written to every
//     client's SSE stream.
//   - CandidateEvent: message.Data is decoded as an ICE candidate and added
//     to the PeerConnection of the client(s) whose userName matches.
//   - AnswerEvent: message.Data is decoded as a session description and set
//     as remote description on the matching client(s).
//
// Decoding or WebRTC errors are logged and stop the handling of this message.
// A JSON encoding error is now logged instead of sending an empty payload,
// and a writer that cannot flush is skipped instead of causing a nil panic.
func handleReceivedMessage(h *SSEHub, message ClientMessage) {
	switch message.Event {
	case TextEvent:
		m, err := json.Marshal(message)
		if err != nil {
			log.Println(err)
			return
		}
		jsonText := string(m)
		for client := range h.clients {
			fmt.Fprintf(client.client.w, "data: %s\n\n", jsonText)
			if flusher, ok := client.client.w.(http.Flusher); ok {
				flusher.Flush()
			}
		}
	case CandidateEvent:
		candidate := webrtc.ICECandidateInit{}
		if err := json.Unmarshal([]byte(message.Data), &candidate); err != nil {
			log.Println(err)
			return
		}
		for pc := range h.clients {
			if pc.client.userName == message.UserName {
				if err := pc.peerConnection.AddICECandidate(candidate); err != nil {
					log.Println(err)
					return
				}
			}
		}
	case AnswerEvent:
		answer := webrtc.SessionDescription{}
		if err := json.Unmarshal([]byte(message.Data), &answer); err != nil {
			log.Println(err)
			return
		}
		for pc := range h.clients {
			if pc.client.userName == message.UserName {
				if err := pc.peerConnection.SetRemoteDescription(answer); err != nil {
					log.Println(err)
					return
				}
			}
		}
	}
}
// signalPeerConnections brings every peer up to date with the hub's current
// set of tracks, then requests key frames.
//
// attemptSync is run up to 25 times in a row; it reports true when it had to
// stop early and should be retried. Repeating it rides out transient errors
// such as:
//   InvalidModificationError: invalid proposed signaling state transition: have-local-offer->SetLocal(offer)->have-local-offer
// If all 25 attempts ask for a retry, another round is scheduled in the
// background 3 seconds later and this call returns.
func signalPeerConnections(h *SSEHub) {
	defer dispatchKeyFrame(h)

	const maxSyncAttempts = 25
	for attempt := 0; attempt < maxSyncAttempts; attempt++ {
		if synced := !attemptSync(h); synced {
			return
		}
	}

	// Still not in sync: back off and try again later without blocking the caller.
	go func() {
		time.Sleep(time.Second * 3)
		signalPeerConnections(h)
	}()
}
// attemptSync shares the hub's tracks with every connected peer and sends
// each peer a fresh offer over its SSE stream.
//
// For each peer it removes senders whose track is no longer in the hub, adds
// hub tracks the peer neither sends nor receives yet, creates an offer, sets
// it as local description and writes it to the peer's stream.
//
// It returns true as soon as something prevents finishing (a closed peer was
// dropped from the client map, or a WebRTC/encoding call failed), which
// tells the caller to run it again from the start. It returns false once all
// peers were handled.
func attemptSync(h *SSEHub) bool {
	for ps := range h.clients {
		pc := ps.peerConnection
		if pc.ConnectionState() == webrtc.PeerConnectionStateClosed {
			delete(h.clients, ps)
			// The client map was modified; start over from the beginning.
			return true
		}

		// IDs of tracks this peer already sends or receives.
		knownTrackIDs := make(map[string]bool)

		for _, sender := range pc.GetSenders() {
			track := sender.Track()
			if track == nil {
				continue
			}
			id := track.ID()
			knownTrackIDs[id] = true
			if _, stillShared := h.trackLocals[id]; stillShared {
				continue
			}
			if err := pc.RemoveTrack(sender); err != nil {
				return true
			}
		}

		// Tracks received from this peer are marked as known so they are
		// not sent back to it.
		for _, receiver := range pc.GetReceivers() {
			if track := receiver.Track(); track != nil {
				knownTrackIDs[track.ID()] = true
			}
		}

		for id, local := range h.trackLocals {
			if knownTrackIDs[id] {
				continue
			}
			if _, err := pc.AddTrack(local); err != nil {
				return true
			}
		}

		offer, err := pc.CreateOffer(nil)
		if err != nil {
			return true
		}
		messageJSON, err := NewOfferMessageJSON(ps.client.userName, offer)
		if err != nil {
			return true
		}
		if err = pc.SetLocalDescription(offer); err != nil {
			return true
		}

		out := ps.client.w
		flusher, _ := out.(http.Flusher)
		fmt.Fprintf(out, "data: %s\n\n", messageJSON)
		flusher.Flush()
	}
	return false
}
// dispatchKeyFrame sends a Picture Loss Indication for every track each
// client is receiving. Write errors are deliberately ignored.
func dispatchKeyFrame(h *SSEHub) {
	for ps := range h.clients {
		pc := ps.peerConnection
		for _, receiver := range pc.GetReceivers() {
			track := receiver.Track()
			if track == nil {
				continue
			}
			pli := &rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}
			_ = pc.WriteRTCP([]rtcp.Packet{pli})
		}
	}
}
채널
SSEClient 및 SSEHub에서 채널을 생성합니다.
처음에는 SSEHub에서 메시지를 보낼 수 있도록 SSEClient에 채널을 몇 개 추가해 보았습니다.
하지만 그렇게 하니 WebRTC 연결 후 텍스트 메시지를 보낼 때 애플리케이션이 멈췄습니다.
순환 참조가 원인이라고 생각해서 해당 채널을 제거하고 SSEHub에서 직접 메시지를 보내도록 했습니다.
참고 자료
Reference
이 문제에 관하여([Go] SSE와 함께 Pion/WebRTC를 사용해 보세요), 더 많은 자료는 다음 링크에서 확인할 수 있습니다: https://dev.to/masanori_msl/go-try-pionwebrtc-with-sse-582 텍스트는 자유롭게 공유하거나 복사할 수 있습니다. 다만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)