websocket 및 grpc 대기 시간 비교 메모

19237 단어 websocketgRPC

배경



음성을 클라이언트에서 서버로 스트리밍한다는 이슈가 있어 조사를 하고 있었습니다.
스트리밍의 방법으로서 websocket 과 grpc 를 후보로 하고 있어 이번은, 처리의 레이턴시로 각각을 비교해 보았으므로 그 메모입니다.

실험



절차


  • 클라이언트에서 의사 음성 보내기
  • 100ms마다 길이 3200의 바이트 배열을 보낸다
  • 실제 내용에는 생성된 타임스탬프가 포함됩니다

  • 서버는 전송 된 바이트 배열을 문자로 반환합니다.
  • 클라이언트에서 반환 된 데이터의 타임 스탬프를보고 왕복에 몇 초가 걸렸는지보십시오.

    환경


  • 클라이언트: 회사의 Macbook Pro
  • 수는 1대

  • 서버: AWS@TokyoRegion

  • 결과



    우선 GRPC/WebSocket 각각 30 초 측정했습니다. (GRPC/WebSocket)
    결과 WebSocket만 일부 시간에 묘하게 시간이 걸렸기 때문에 그 후 각각 60초 다시 했습니다. (GRPC2/WebSocket2)


    GRPC/WebSocket 각각 늦어진 시간을 잘라내어 그래프화하면 아래와 같이 됩니다.


    이 상태의 평균은
  • GRPC: 0.012sec
  • WebSocket: 0.016sec

  • 정도인 것 같습니다.
    별로 큰 데이터로 실험하지 않았거나 여러 연결을 했을 때 어떻게 될지 실험은 할 수 없지만, 이 실험 설정의 경우는 전체적으로 GRPC 쪽이 다소 빠르고, 또한 대기 시간의 진폭도 마음 밖에 GRPC 쪽이 안정되어 있는 것처럼 보입니다.

    실험에 사용한 코드



    (을 적절히 공개할 수 있도록 깎은 것)
    MicrophoneStream 은 Google Speech to Text 코드 상당의 것입니다.
    그렇다고 해도 0.1초에 1회 처리가 돌아 오는 것만으로 좋기 때문에 time.sleep(0.1) 를 루프 안에 넣는 것만으로도 좋을지도 모릅니다.

    GRPC



    client
    import os
    import time
    import grpc
    from myproject.grpc import voice_rpc_pb2, voice_rpc_pb2_grpc
    
    HOST = 'XXXXXX'
    PORT = 50051
    
    
    class VoiceClient:
        def data_generator(self):
            sample_rate = 16000
            chunk = int(sample_rate / 10)  # 100ms
            with MicrophoneStream(sample_rate, chunk) as stream:
                for wave in stream.generator():
                    wave = str(time.time())
                    wave = wave + '|' + '-' * (3200 - len(wave) - 1)
                    wave = wave.encode('utf-8')
                    yield voice_rpc_pb2.SendVoiceRequest(wave=wave)
    
        def run(self):
            with grpc.insecure_channel('{}:{}'.format(HOST, PORT)) as channel:
                stub = voice_rpc_pb2_grpc.VoiceRPCStub(channel)
                response_iter = stub.SendVoice(self.data_generator())
                for response in response_iter:
                    send_time = float(response.text.split('|')[0])
                    print(time.time() - send_time)
    
    
    if __name__ == '__main__':
        VoiceClient().run()
    
    import time
    import grpc
    from concurrent import futures
    from myproject.grpc import voice_rpc_pb2, voice_rpc_pb2_grpc
    
    _ONE_DAY_IN_SECONDS = 60 * 60 * 24
    
    
    class VoiceRPC(voice_rpc_pb2_grpc.VoiceRPCServicer):
        def SendVoice(self, request_iterator, context):
            for request in request_iterator:
                wave = request.wave
                yield voice_rpc_pb2.SendVoiceResponse(
                    text=wave.decode('utf-8'),
                )
    
    def serve():
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        voice_rpc_pb2_grpc.add_VoiceRPCServicer_to_server(VoiceRPC(), server)
        server.add_insecure_port('[::]:50051')
        server.start()
        try:
            while True:
                time.sleep(_ONE_DAY_IN_SECONDS)
        except KeyboardInterrupt:
            server.stop(0)
    
    
    if __name__ == '__main__':
        serve()
    

    WebSocket



    client
    import websocket
    import time
    from multiprocessing import Process
    
    URL = 'ws://XXXXX:12345/voice'
    
    class VoiceClient:
        def run(self):
            self.ws = websocket.WebSocketApp(
                URL,
                on_open=self.on_open,
                on_message=self.on_message,
            )
    
            try:
                self.ws.run_forever()
            except KeyboardInterrupt:
                self.ws.close()
    
        def listen_voice(self):
            sample_rate = 16000
            chunk = int(sample_rate / 10)  # 100ms
            with MicrophoneStream(sample_rate, chunk) as stream:
                for voice_data in stream.generator():
                    wave = str(time.time())
                    wave = wave + '|' + '-' * (3200 - len(wave) - 1)
                    wave = wave.encode('utf-8')
                    self.ws.send(wave, opcode=websocket.ABNF.OPCODE_BINARY)
    
        def on_open(self):
            self.listen_process = Process(target=self.listen_voice)
            self.listen_process.start()
    
        def on_message(self, message):
            send_time = float(message.split('|')[0])
            print(time.time() - send_time)
    
    
    if __name__ == '__main__':
        VoiceClient().run()
    

    server
    from flask import Flask
    from flask_sockets import Sockets
    from gevent import pywsgi
    from geventwebsocket.handler import WebSocketHandler
    
    
    if __name__ == '__main__':
        app = Flask(__name__)
        sockets = Sockets(app)
    
    
    @sockets.route('/voice')
    def voice(ws):
        while not ws.closed:
            wave = ws.receive()
            ws.send(wave.decode('utf-8'))
    
    
    if __name__ == '__main__':
        wsgiserver = pywsgi.WSGIServer(('0.0.0.0', 12345), app, handler_class=WebSocketHandler)
        wsgiserver.serve_forever()
    

    좋은 웹페이지 즐겨찾기