[이동] SSE와 함께 Pion/WebRTC를 사용해 보세요

82384 단어 webrtcpiongo

소개



이번에는 Pion/WebRTC를 해보겠습니다.
  • pion/webrtc - GitHub

  • Pion에는 이미 좋은 예제가 있기 때문에 example-webrtc-applications의 SFU-WebSocket을 기반으로 샘플을 만들겠습니다.
  • example-webrtc-applications/sfu-ws - pion/example-webrtc-applications - GitHub

  • 이 점을 변경해 보겠습니다.
  • 신호에 SSE(Server-Sent Events) 사용
  • 수동으로 연결 시작

  • 마지막 샘플 프로젝트에 WebRTC 기능을 추가하겠습니다.


  • 그리고 나는 또한 이 게시물(특히 클라이언트 측)을 참조합니다.


  • 환경


  • Go ver.go1.18.2 windows/amd64
  • Node.js 버전 18.1.0

  • WebRTC SFU와 연결



    , 서버 측 응용 프로그램은 방금 신호를 위해 작동했습니다.
    신호를 보낸 후 클라이언트는 서로 직접 연결되었습니다.



    이번에는 서버 측 애플리케이션에만 연결됩니다.
    연결 후 클라이언트는 비디오 트랙과 오디오 트랙을 서버측 애플리케이션으로 보냅니다.
    그리고 서버 측 응용 프로그램은 다른 클라이언트의 트랙을 원격 트랙으로 클라이언트에 보냅니다.



    샘플



    이번에는 샘플 프로젝트를 GitHub에 게시합니다.
  • webappsample - GitHub

  • 고객 입장에서



    연결 프로세스는 서버 측 애플리케이션에서 시작되기 때문입니다.

    따라서 클라이언트 측에서는 오퍼 및 ICE 후보 이벤트만 처리하면 됩니다.
    그리고 시작할 때 RTCPeerConnection을 생성합니다.

    main.page.ts




    ...
    function handleReceivedMessage(value: string) {
        const message = JSON.parse(value);
        if(!checkIsClientMessage(message)) {
            return;
        }
        switch(message.event) {
            case "text":
                view.addReceivedText({ user: message.userName, message: message.data });
                break;
            case "offer":
                webrtc.handleOffer(JSON.parse(message.data));
                break;
            case "candidate":
                webrtc.handleCandidate(JSON.parse(message.data));
                break;
        }
    }
    function sendAnswer(data: RTCSessionDescriptionInit) {
        if(!hasAnyTexts(userName)) {
            return;
        }
        sse.sendMessage({userName, event: "answer", data: JSON.stringify(data)});
    }
    function sendCandidate(data: RTCIceCandidate) {
        if(!hasAnyTexts(userName)) {
            return;
        }
        sse.sendMessage({userName, event: "candidate", data: JSON.stringify(data)});
    }
    function checkIsClientMessage(value: any): value is ClientMessage {
        // All messages from the server-side application are sent as "ClientMessage".
        if(value == null) {
            return false;
        }
        if(("event" in value &&
            typeof value["event"] === "string") === false) {
            return false;
        }
        if(("data" in value &&
            typeof value["data"] === "string") === false) {
            return false;
        }
        return true;
    }
    init();
    


    webrtc.controller.ts




    export class WebRtcController {
        private webcamStream: MediaStream|null = null; 
        private peerConnection: RTCPeerConnection|null = null;
        private answerSentEvent: ((data: RTCSessionDescriptionInit) => void)|null = null;
        private candidateSentEvent: ((data: RTCIceCandidate) => void)|null = null;
        public init() {
            const localVideo = document.getElementById("local_video") as HTMLVideoElement;
            localVideo.addEventListener("canplay", () => {
                const width = 320;
                const height = localVideo.videoHeight / (localVideo.videoWidth/width);          
                localVideo.setAttribute("width", width.toString());
                localVideo.setAttribute("height", height.toString());
              }, false);
            navigator.mediaDevices.getUserMedia({ video: true, audio: true })
              .then(stream => {
                  localVideo.srcObject = stream;
                  localVideo.play();
                  this.webcamStream = stream;
                  // create a RTCPeerConnection after getting local MediaStream
                  this.connect();
              })
              .catch(err => console.error(`An error occurred: ${err}`));
        }
    ...
        /** handle received offer and send answer */
        public handleOffer(data: RTCSessionDescription|null|undefined) {
            if(this.peerConnection == null ||
                    data == null) {
                return;
            }
            this.peerConnection.setRemoteDescription(data);
            this.peerConnection.createAnswer()
                .then(answer => {
                    if(this.peerConnection != null) {
                        this.peerConnection.setLocalDescription(answer);
                    }
                    if(this.answerSentEvent != null) {
                        this.answerSentEvent(answer);
                    }
                });
        }
        /** add ICE Candidate */
        public handleCandidate(data: RTCIceCandidate|null|undefined) {
            if(this.peerConnection == null ||
                data == null) {
                return;
            }
            this.peerConnection.addIceCandidate(data);
        }
        private connect() {
            if(this.webcamStream == null) {
                return;
            }
            this.peerConnection = new RTCPeerConnection({
                iceServers: [{
                    urls: `stun:stun.l.google.com:19302`,  // A STUN server              
                }]
            });
            // Add remote video tracks as new video elements.
            this.peerConnection.ontrack = (ev) => {
                if (ev.track.kind === "audio" ||
                    ev.streams[0] == null) {
                  return;
                }    
                const remoteVideo = document.createElement("video");
                remoteVideo.srcObject = ev.streams[0];
                remoteVideo.autoplay = true;
                remoteVideo.controls = true;
                const videoArea = document.getElementById("remote_video_area") as HTMLElement;
                videoArea.appendChild(remoteVideo);
                ev.track.onmute = () => {
                    remoteVideo.play();
                };
                ev.streams[0].onremovetrack = () => {
                  if (remoteVideo.parentNode) {
                    remoteVideo.parentNode.removeChild(remoteVideo);
                  }
                };
              };
            this.webcamStream.getTracks().forEach(track => {
                if(this.peerConnection == null ||
                    this.webcamStream == null) {
                    return;
                }
                this.peerConnection.addTrack(track, this.webcamStream)
            });
            this.peerConnection.onicecandidate = ev => {
                if (ev.candidate == null ||
                    this.candidateSentEvent == null) {
                  return;
                }
                this.candidateSentEvent(ev.candidate);
            };
        }   
    }
    


    서버 측



    sseClient.go




    package main
    
    import (
        "encoding/json"
        "fmt"
        "io/ioutil"
        "log"
        "net/http"
    
        "github.com/pion/webrtc/v3"
    )
    
    type SSEClient struct {
        candidateFound        chan *webrtc.ICECandidate
        changeConnectionState chan webrtc.PeerConnectionState
        addTrack              chan *webrtc.TrackRemote
        userName              string
        w                     http.ResponseWriter
    }
    
    func newSSEClient(userName string, w http.ResponseWriter) *SSEClient {
        return &SSEClient{
            candidateFound:        make(chan *webrtc.ICECandidate),
            changeConnectionState: make(chan webrtc.PeerConnectionState),
            addTrack:              make(chan *webrtc.TrackRemote),
            userName:              userName,
            w:                     w,
        }
    }
    
    func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
        userName, err := getParam(r, "user")
        if err != nil {
            log.Println(err.Error())
            fmt.Fprint(w, "The parameters have no names")
            return
        }
        newClient := newSSEClient(userName, w)
        ps, err := NewPeerConnectionState(newClient)
        if err != nil {
            log.Println(err.Error())
            fmt.Fprint(w, "Failed connection")
            return
        }
        w.Header().Set("Content-Type", "text/event-stream")
        w.Header().Set("Cache-Control", "no-cache")
        w.Header().Set("Connection", "keep-alive")
    
        hub.register <- ps
    
        // For pushing data to clients, I call "flusher.Flush()"
        flusher, _ := w.(http.Flusher)
        defer func() {
            hub.unregister <- ps
            if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
                ps.peerConnection.Close()
            }
            close(newClient.candidateFound)
            close(newClient.changeConnectionState)
            close(newClient.addTrack)
        }()
        for {
            // handle PeerConnection events and close SSE event.
            select {
            case candidate := <-newClient.candidateFound:
                jsonValue, err := NewCandidateMessageJSON(newClient.userName, candidate)
                if err != nil {
                    log.Println(err.Error())
                    return
                }
                fmt.Fprintf(w, "data: %s\n\n", jsonValue)
                flusher.Flush()
            case track := <-newClient.addTrack:
                hub.addTrack <- track
            case connectionState := <-newClient.changeConnectionState:
                switch connectionState {
                case webrtc.PeerConnectionStateFailed:
                    return
                case webrtc.PeerConnectionStateClosed:
                    return
                }
            case <-r.Context().Done():
                // when "es.close()" is called, this loop operation will be ended.
                return
            }
        }
    }
    func sendSSEMessage(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
        w.Header().Set("Content-Type", "application/json")
        body, err := ioutil.ReadAll(r.Body)
    
        if err != nil {
            log.Println(err.Error())
            j, _ := json.Marshal(GetFailed("Failed reading values from body"))
            w.Write(j)
            return
        }
        message := &ClientMessage{}
        err = json.Unmarshal(body, &message)
        if err != nil {
            log.Println(err.Error())
            j, _ := json.Marshal(GetFailed("Failed converting to ClientMessage"))
            w.Write(j)
            return
        }
        w.WriteHeader(200)
        hub.broadcast <- *message
        data, _ := json.Marshal(GetSucceeded())
        w.Write(data)
    }
    


    peerConnectionState.go




    package main
    
    import (
        "github.com/pion/webrtc/v3"
    )
    
    type PeerConnectionState struct {
        peerConnection *webrtc.PeerConnection
        client         *SSEClient
    }
    
    func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
        peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{
            ICEServers: []webrtc.ICEServer{
                {
                    URLs: []string{
                        "stun:stun.l.google.com:19302",
                    },
                },
            },
        })
        if err != nil {
            return nil, err
        }
        for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
            if _, err := peerConnection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
                Direction: webrtc.RTPTransceiverDirectionRecvonly,
            }); err != nil {
                return nil, err
            }
        }
        // Add event handlers.
        peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
            if i == nil {
                return
            }
            client.candidateFound <- i
        })
        peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
            // avoid panic after closing channel
            if p == webrtc.PeerConnectionStateClosed {
                _, ok := <-client.changeConnectionState
                if ok {
                    client.changeConnectionState <- p
                }
                return
            }
            client.changeConnectionState <- p
        })
        peerConnection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
            client.addTrack <- t
        })
    
        return &PeerConnectionState{
            peerConnection: peerConnection,
            client:         client,
        }, nil
    }
    


    sseHub.go




    package main
    
    import (
        "encoding/json"
        "fmt"
        "log"
        "net/http"
        "time"
    
        "github.com/pion/rtcp"
        "github.com/pion/webrtc/v3"
    )
    
    type SSEHub struct {
        clients     map[*PeerConnectionState]bool
        broadcast   chan ClientMessage
        register    chan *PeerConnectionState
        unregister  chan *PeerConnectionState
        trackLocals map[string]*webrtc.TrackLocalStaticRTP
        addTrack    chan *webrtc.TrackRemote
    }
    
    func newSSEHub() *SSEHub {
        return &SSEHub{
            clients:     make(map[*PeerConnectionState]bool),
            broadcast:   make(chan ClientMessage),
            register:    make(chan *PeerConnectionState),
            unregister:  make(chan *PeerConnectionState),
            trackLocals: map[string]*webrtc.TrackLocalStaticRTP{},
            addTrack:    make(chan *webrtc.TrackRemote),
        }
    }
    func (h *SSEHub) run() {
        go func() {
            for range time.NewTicker(time.Second * 3).C {
                dispatchKeyFrame(h)
            }
        }()
        for {
            select {
            case client := <-h.register:
                h.clients[client] = true
                signalPeerConnections(h)
            case client := <-h.unregister:
                if _, ok := h.clients[client]; ok {
                    delete(h.clients, client)
                    signalPeerConnections(h)
                }
            case track := <-h.addTrack:
                trackLocal, err := webrtc.NewTrackLocalStaticRTP(track.Codec().RTPCodecCapability,
                    track.ID(), track.StreamID())
                if err != nil {
                    log.Println(err.Error())
                    return
                }
                h.trackLocals[track.ID()] = trackLocal
                signalPeerConnections(h)
                go updateTrackValue(h, track)
    
            case message := <-h.broadcast:
                handleReceivedMessage(h, message)
            }
        }
    }
    func updateTrackValue(h *SSEHub, track *webrtc.TrackRemote) {
        defer func() {
            delete(h.trackLocals, track.ID())
            signalPeerConnections(h)
        }()
    
        buf := make([]byte, 1500)
    
        for {
            i, _, err := track.Read(buf)
            if err != nil {
                return
            }
            if _, err = h.trackLocals[track.ID()].Write(buf[:i]); err != nil {
                return
            }
        }
    }
    func handleReceivedMessage(h *SSEHub, message ClientMessage) {
        switch message.Event {
        case TextEvent:
            m, _ := json.Marshal(message)
            jsonText := string(m)
    
            for client := range h.clients {
                flusher, _ := client.client.w.(http.Flusher)
    
                fmt.Fprintf(client.client.w, "data: %s\n\n", jsonText)
                flusher.Flush()
            }
        case CandidateEvent:
            candidate := webrtc.ICECandidateInit{}
            if err := json.Unmarshal([]byte(message.Data), &candidate); err != nil {
                log.Println(err)
                return
            }
            for pc := range h.clients {
                if pc.client.userName == message.UserName {
                    if err := pc.peerConnection.AddICECandidate(candidate); err != nil {
                        log.Println(err)
                        return
                    }
                }
            }
        case AnswerEvent:
            answer := webrtc.SessionDescription{}
            if err := json.Unmarshal([]byte(message.Data), &answer); err != nil {
                log.Println(err)
                return
            }
            for pc := range h.clients {
                if pc.client.userName == message.UserName {
                    if err := pc.peerConnection.SetRemoteDescription(answer); err != nil {
                        log.Println(err)
                        return
                    }
                }
            }
    
        }
    }
    func signalPeerConnections(h *SSEHub) {
        defer func() {
            dispatchKeyFrame(h)
        }()
        for syncAttempt := 0; ; syncAttempt++ {
            if syncAttempt == 25 {
                // Release the lock and attempt a sync in 3 seconds. We might be blocking a RemoveTrack or AddTrack
                go func() {
                    time.Sleep(time.Second * 3)
                    signalPeerConnections(h)
                }()
                return
            }
            // For ignoring errors like below, execute attemptSync multiple times.
            // InvalidModificationError: invalid proposed signaling state transition: have-local-offer->SetLocal(offer)->have-local-offer
            if !attemptSync(h) {
                break
            }
        }
    }
    // Share received tracks to all connected peers.
    func attemptSync(h *SSEHub) bool {
        for ps := range h.clients {
            if ps.peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed {
                delete(h.clients, ps)
                // We modified the slice, start from the beginning
                return true
            }
            existingSenders := map[string]bool{}
    
            for _, sender := range ps.peerConnection.GetSenders() {
                if sender.Track() == nil {
                    continue
                }
                existingSenders[sender.Track().ID()] = true
    
                if _, ok := h.trackLocals[sender.Track().ID()]; !ok {
                    if err := ps.peerConnection.RemoveTrack(sender); err != nil {
                        return true
                    }
                }
            }
            for _, receiver := range ps.peerConnection.GetReceivers() {
                if receiver.Track() == nil {
                    continue
                }
                existingSenders[receiver.Track().ID()] = true
            }
            for trackID := range h.trackLocals {
                if _, ok := existingSenders[trackID]; !ok {
                    if _, err := ps.peerConnection.AddTrack(h.trackLocals[trackID]); err != nil {
                        return true
                    }
                }
            }
    
            offer, err := ps.peerConnection.CreateOffer(nil)
            if err != nil {
                return true
            }
            messageJSON, err := NewOfferMessageJSON(ps.client.userName, offer)
            if err != nil {
                return true
            }
    
            if err = ps.peerConnection.SetLocalDescription(offer); err != nil {
                return true
            }
            flusher, _ := ps.client.w.(http.Flusher)
    
            fmt.Fprintf(ps.client.w, "data: %s\n\n", messageJSON)
            flusher.Flush()
        }
        return false
    }
    func dispatchKeyFrame(h *SSEHub) {
        for ps := range h.clients {
            for _, receiver := range ps.peerConnection.GetReceivers() {
                if receiver.Track() == nil {
                    continue
                }
    
                _ = ps.peerConnection.WriteRTCP([]rtcp.Packet{
                    &rtcp.PictureLossIndication{
                        MediaSSRC: uint32(receiver.Track().SSRC()),
                    },
                })
            }
        }
    }
    


    채널



    SSEClient 및 SSEHub에서 채널을 생성합니다.
    먼저 SSEHub에서 메시지를 보내기 위해 SSEClient에 일부 채널을 추가해 보았습니다.

    하지만 그렇게 하면 WebRTC에 연결한 후 문자 메시지를 보낼 때 응용 프로그램이 중단됩니다.
    순환 참조가 원인이라고 생각하기 때문에 이러한 채널을 제거하고 SSEHub에서 메시지를 보냅니다.

    자원


  • pion/webrtc - GitHub
  • Pion
  • example-webrtc-applications/sfu-ws - pion/example-webrtc-applications - GitHub
  • WebRTC For The Curious
  • 実用 Go言語――システム開発の現場で知っておきたいアドバイス
  • 좋은 웹페이지 즐겨찾기