Spring을 사용한 RTP 에이전트 구축

25231 단어 rtpspringjava

Originally published here.


나는 줄곧 오디오/영상 흐름이 어떻게 작동하는지 깊이 이해하고 있다.이 세상의 일부분은 RTP다.그것은 RFC 3550에서 정의된 것이다. 나는 그것을 읽은 적이 있지만, 그것들이 나를 위해 일하는 것을 보고 나서야 비로소 그것들을 진정으로 이해하게 되었다. 그러므로 우리는 이렇게 하자.이것이 바로 우리가 건설하고 있는 것이다.
  • ffmpeg가 오디오 파일
  • 에서 RTP 스트림 생성
  • RTP 수신기는 이 흐름을 사용하여 웹소켓 흐름을 만들어 웹소켓 서버에 전송
  • 웹소켓 서버 사용 흐름 및 파일에 쓰기
  • 이 실험의 코드는 https://github.com/lucaspin/spring-replication-proxy에서 찾을 수 있다.

    RTP 헤드 구조


    다음은 RTP 헤드 구조입니다RFC.
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |V=2|P|X|  CC   |M|     PT      |       sequence number         |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |                           timestamp                           |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    |           synchronization source (SSRC) identifier            |
    +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
    |            contributing source (CSRC) identifiers             |
    |                             ....                              |
    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
    
    여기에는 몇 가지 주의가 필요합니다.
  • version (V): 패킷의 처음 두 바이트입니다.이는 항상 2이므로 RTP의 새로운 버전이 제시되기 전까지는 이렇습니다.생각할 점은 비트 2개만 사용했기 때문에 버전 3만 존재할 수 있는가?
  • contributing sources count (CC): 헤더의 CSRC 표지부 부분과 함께 데이터 패키지에 몇 개의 표지부가 있는지 정의합니다.4자리만 사용할 수 있으므로 최대 15개의 CSC 식별자를 사용할 수 있습니다.
  • payload type (PT): 이 패키지의 미디어에 사용되는 형식입니다.이것은 설정 파일의 상하문에서만 의미가 있습니다.RFC 3551은 좋은 예를 정의합니다.구성 파일은 어떤 값이 어떤 형식에 비치는지 정의합니다.유효한 하중 유형은 정적 또는 동적일 수 있습니다.정적 유효 부하 형식에 대해 발송자가 사용하는 설정 파일만 알면 됩니다.동적 부하 형식에 대해 설정 파일과 다른 정보를 필요로 합니다.일반적으로 SDP의 설명은'다른 것'이다.
  • sequence number: 발송자는 발송하기 전에 모든 데이터 패키지에 서열 번호를 놓아서 수신자가 정확한 순서에 따라 미디어 데이터 패키지를 재조합할 수 있도록 한다.이따가 왜 그게 중요한지 들으실 거예요.
  • timestamp: 왜 시퀀스 번호와 시간 스탬프가 필요합니까?
  • SSRC identifier: 동기화 소스;서로 다른 출처에서 온 미디어 흐름을 구분하는 유일한 id입니다.
  • CSRC identifiers: 공헌원천;RTP 번역자가 같은 미디어 흐름에 혼합된 원시 원본을 보존하는 방법
  • P(padding),X(extension)와M(marker) 필드는 패키지 구조의 변화를 지시하는 표지로서 현재는 우리에게 중요하지 않다.따라서 RTP 헤더는 다음과 같이 정의할 수 있습니다.
    public class RTPPacket {
        private final int version;
        private final boolean padding;
        private final boolean extension;
        private final int contributingSourcesCount;
        private final boolean marker;
        private final int payloadType;
        private final int sequenceNumber;
        private final int timestamp;
        private final int synchronizationSourceId;
        private final byte[] payload;
    }
    

    헤더 확인


    만약 나와 마찬가지로 바이너리를 유창하게 말하지 못한다면, 필요한 정보를 얻기 위해 위치를 어떻게 조작하는지에 대한 작은 힌트가 필요하다.
    예를 들어, 우리는 데이터 패키지의 버전을 얻으려고 한다.우리는 이 필드에서 패키지의 첫 번째 바이트의 두 번째 바이트를 찾는 것을 안다.그러나 첫 번째 바이트에는 세 개의 다른 필드가 있다.그럼 저희가 어떻게 이 버전을 얻을 수 있을까요?AND 및 Shift를 사용하여 비트별 작업을 수행합니다.
    첫 번째 바이트가 10010010라고 가정해 보세요.맨 왼쪽의 두 자리만 얻기 위해서, 우리는 다른 여섯 자리를 지워야 한다.이를 위해 AND 작업을 사용할 수 있습니다. 여기서 삭제할 위치와 같은 위치의 위치는 0입니다.
             10010010
         AND 11000000
         ------------
             10000000
    
    기억해라: 0을 더한 결과는 모두 0이다.지금 우리는 그것들을 0으로 돌리지만, 우리가 원하는 위치는 여전히 가장 왼쪽 위치에 있다.이제 오른쪽으로 6번 이동합니다.
             10000000
             --------
        (1x) 01000000
        (2x) 00100000
        (3x) 00010000
        (4x) 00001000
        (5x) 00000100
        (6x) 00000010
    
    지금 우리는00000010, 혹은 신기한 십진법 중의 2가 있다.이 아이디어를 고려하여 바이트 배열을 RTPPacket 객체로 변환하는 ParsePacket () 방법을 만듭니다.
    public static RTPPacket parsePacket(byte[] packet) {
        return RTPPacket.builder()
            .version((packet[0] & 0b11000000) >>> 6)
            .padding(((packet[0] & 0b00100000) >> 5) == 1)
            .extension(((packet[0] & 0b00010000) >> 4) == 1)
            .contributingSourcesCount(packet[0] & 0b00001111)
            .marker(((packet[1] & 0b10000000) >> 7) == 1)
            .payloadType(packet[1] & 0b01111111)
            .sequenceNumber(ByteBuffer.wrap(packet, 2, 2).getShort())
            .timestamp(ByteBuffer.wrap(packet, 4, 4).getInt())
            .synchronizationSourceId(ByteBuffer.wrap(packet, 8, 4).getInt())
            .payload(Arrays.copyOfRange(packet, 12, packet.length))
            .build();
    }
    

    UDP 인바운드 수신기


    RTP 이해에만 관심이 있으므로 Spring이 UDP 복잡성을 처리합니다.Spring has support for TCP and UDP 포트 1111에 UDP 인바운드 수신기를 생성하는 방법은 다음과 같습니다.
    IntegrationFlows.from(new UnicastReceivingChannelAdapter(11111))
        .handle(new RTPMessageHandler(rtpManager))
        .get();
    
    RTPMessageHandler 클래스 확장Spring’s AbstractMessageHandler
  • RTP 패킷 확인
  • RTP 관리자에게 전달
  • public class RTPMessageHandler extends AbstractMessageHandler {
        private final RTPManager rtpManager;
    
        @Override
        protected void handleMessageInternal(Message<?> message) {
            RTPPacket packet = parsePacket((byte[]) message.getPayload());
            rtpManager.onPacketReceived(packet);
        }
    }
    
    RTPManager UDP를 사용하는 RTP 수신기의 매우 기본적인 부분인 패킷 재배열을 책임진다.

    패킷 순서재정리


    RTP는 TCP(일반적이지 않음) 또는 UDP(일반적)를 통해 사용할 수 있습니다.전송 프로토콜이 제공할 수 있는 전달 순서의 정확성에 의존하지 않기 때문에 데이터 패키지를 정렬하는 방법이 필요하다.RTP 헤드에 일련 번호가 있는 이유입니다.미디어 흐름을 재생하는 RTP 수신기는 수신된 패키지를 다시 정렬해야 합니다. 그렇지 않으면 미디어의 소리/외관이 좋지 않습니다.우리의 예시에서, 우리는 그것을 재생하지 않지만, 웹소켓 사용자는 재생할 수 있기 때문에, 우리는 그것들을 위해 데이터 패키지를 다시 정렬해야 한다.RTPManager 두 가지 일을 맡는다.
  • 웹소켓
  • 을 통해 패키지를 보내기 전에 패키지를 다시 정렬합니다
  • SSRC id별로 전송된 패킷 그룹화
  • public class RTPManager {
        private final Map<Integer, SyncSourceStatus> syncSources = new HashMap<>();
    
        public synchronized void onPacketReceived(RTPPacket packet) {
            if (syncSources.containsKey(packet.getSynchronizationSourceId())) {
                SyncSourceStatus status = syncSources.get(packet.getSynchronizationSourceId());
                synchronized (status.getLock()) {
                    status.addPacket(packet);
                    if (status.getPackets().size() > MAX_PACKETS_BEFORE_FLUSHING) {
                        status.flush();
                    }
                }
            } else {
                syncSources.put(packet.getSynchronizationSourceId(), SyncSourceStatus.builder()
                        .syncSourceId(packet.getSynchronizationSourceId())
                        .packets(new ArrayList<>(List.of(packet)))
                        .webSocket(initializeSocket())
                        .lock(new Object())
                        .build());
            }
        }
    }
    
    여기서 나는 SyncSourceStatus라는 다른 종류를 사용하여 내가 이 일을 완성하는 것을 돕는다.
    static class SyncSourceStatus {
        private int syncSourceId;
        private List<RTPPacket> packets;
        private Socket webSocket;
        private final Object lock;
    
        public void addPacket(RTPPacket packet) {
            packets.add(packet);
        }
    
        public void flush() {
            packets = packets.stream().sorted().collect(Collectors.toList());
            packets.forEach(packet -> webSocket.send(packet.getPayload()));
            packets = new ArrayList<>();
        }
    }
    
    SyncSourceStatus.flush() 웹소켓을 통해 패키지를 보내기 전에 패키지를 정렬하기 때문에 RTPPacket실현Comparable해야 합니다.
    public class RTPPacket implements Comparable<RTPPacket> {
        // Fields and getters
    
        @Override
        public int compareTo(RTPPacket o) {
            return Integer.compare(getSequenceNumber(), o.getSequenceNumber());
        }
    }
    

    모든 것을 꺼내다


    WebSocket 서버와 클라이언트에 대해서는 socket을 사용합니다.이오.Google 서버는 웹소켓 흐름의 모든 내용을 가져와 파일에 저장합니다.
    const http = require('http').Server();
    const fs = require('fs');
    const io = require('socket.io')(http);
    
    io.on('connection', (socket) => {
      const wstream = fs.createWriteStream('/tmp/audio-from-ffmpeg');
    
      socket.on('disconnect', () => {
        wstream.end();
      });
    
      socket.on('message', msg => {
        wstream.write(Buffer.from(msg));
      });
    });
    
    http.listen(4010, () => {
      console.log('listening on *:4010');
    });
    
    ffmpeg를 사용하여 RTP 스트림을 생성하고 RTP 수신기는 다음과 같은 스트림을 사용합니다.
    ffmpeg \
        -re \
        -i media/pcm_s16le-44100hz-s16-10s.wav \
        -c:a copy \
        -f rtp \
        "rtp://127.0.0.1:11111"
    
    ffmpeg 명령이 종료되면 서버 쓰기 /tmp/audio-from-ffmpeg 의 오디오를 확인하고 재생이 잘 되는지 확인할 수 있습니다.

    순서재정리 또는 순서재정리 안 함


    패킷 순서재정리의 중요성을 설명하려면 다음과 같이 하십시오.
  • 패킷을 통해 복제된 미디어 스트림 순서 재정리: https://lucaspin.github.io/public/media/reordering-yes.ogg
  • 패킷을 순서재정리하지 않고 복제하는 미디어 흐름: https://lucaspin.github.io/public/media/reordering-no.ogg
  • 차이가 많이 나죠?RTP 수신기는 도착한 패킷을 전송한 순서대로 배치하는 방법이 필요하기 때문에 일련 번호 필드가 필요한 이유입니다.
    오늘 여기까지 읽어주셔서 감사합니다!

    좋은 웹페이지 즐겨찾기