실시간 협동 드로잉(섹션 4): Redis PubSub+WebRTC 명령

실시간 통신, 개방형 연결 및 피어 간의 라우팅이 필요한 모든 시스템에서 이러한 문제가 발생할 수 있습니다. 즉, 모든 연결이 단일 서버에서 실행될 수 있는 것은 아닙니다.반대로 우리가 해야 할 일은 시스템을 구축하는 것이다. 이 시스템은 메시지 루트를 임의의 수량의 연결을 유지하는 임의의 수량의 서버에 연결할 수 있다.
이전 글에서 우리는 최근에 연결을 유지하고 열어 사용server sent events으로 재구성했다.그러나 만약에 우리가 다른 웹 서버를 도입하여 부하 균형을 만들면 클라이언트 연결이 서버를 뛰어넘어 접근할 수 없다는 문제에 직면하게 될 것이다.
우리는 공유 통신 서버/집단을 가지고 이 문제를 해결할 수 있으며, 이 서버/집단은 우리를 위해 이 모든 메시지를 처리할 수 있다.이를 위해 우리는 publisher-subscriber 모델을 사용하고 redis 을 이용하여 이 일을 완성할 것이다.

Redis 회사


Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs, geospatial indexes with radius queries and streams. Redis has built-in replication, Lua scripting, LRU eviction, transactions and different levels of on-disk persistence, and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster


Redis는 믿을 수 없는 프로젝트입니다. extremely fast 그리고 가장 적은 cpu를 사용합니다.버전 1 이래로 이 프로젝트는 줄곧 뒤로 호환되는 예술품이다.유지보수 요원들은 여러 해 동안 이 프로젝트를 만들었고 이를 정말 믿기지 않는 것으로 만들었다.Redis는 거의 모든 데이터 구조를 최고의 기능과 조작으로 지원합니다.
  • string manipulation
  • hashes
  • hyperloglog
  • sets
  • sorted sets
  • geospatial indexes
  • streams
  • pubsub
  • 그것은 심지어 지원된다 cluster.last-write-wins CRDT로 사용할 수도 있습니다.내 업무에서, 우리는 대기열, 초로그, 정렬 집합, 캐시의 모든 기능을 사용했다.이전의 프로젝트에서 나는redis를 사용하여 roshi모델을 사용하여 click stream시스템을 구축한 적이 있다.

    활성 구매 Redis PubSub


    우리는 redis라는 작은 기능을 사용하여 서버의 연결 사이에서 메시지를 공유할 것입니다.redis 서버 설정이 있다고 가정하십시오.우리 그림 프로그램의 의존 항목으로 redis 을 추가해야 합니다.
    npm install --save redis bluebird
    
    우리는 bluebird 을 사용하여 모든redis 클라이언트 기능을 실현할 것이다.이것은 우리가 대량의 리셋이 아니라 pubsub 로 코드를 작성하는 데 도움을 줄 것이다.

    모든 것을 약속하다 / SSE 연결 및 Redis 가입


    생각해 보면, 우리의express 서버는 메모리에 연결과 채널의 캐시만 저장합니다.우리는 먼저 /connect 함수를 업데이트하여redispubsub 클라이언트로부터 받은 메시지를 구독할 것입니다.이를 위해 클라이언트 생성 코드를 업데이트하고 추가합니다redis.createClient.그리고 redis.subscribe('messages:' + client.id) 구독을 통해 받은 특정 고객 id의 메시지입니다.redis.on('message', (channel, message) => ...)를 통해 메시지를 받을 때마다 서버에서 보낸 이벤트 흐름으로 간단하게 보낼 수 있습니다.
    var redis = require('redis');
    var bluebird = require('bluebird');
    bluebird.promisifyAll(redis);
    
    app.get('/connect', auth, (req,res) => {
        if (req.headers.accept !== 'text/event-stream') {
            return res.sendStatus(404);
        }
    
        // write the event stream headers
        res.setHeader('Cache-Control', 'no-cache');
        res.setHeader('Content-Type', 'text/event-stream');
        res.setHeader("Access-Control-Allow-Origin", "*");
        res.flushHeaders();
    
        // setup a client
        let client = {
            id: req.user.id,
            user: req.user,
            redis: redis.createClient(),
            emit: (event, data) => {
                res.write(`id: ${uuid.v4()}\n`);
                res.write(`event: ${event}\n`);
                res.write(`data: ${JSON.stringify(data)}\n\n`);
            }
        };
    
        // cache the current connection until it disconnects
        clients[client.id] = client;
    
        // subscribe to redis events for user
        client.redis.on('message', (channel, message) => {
            let msg = JSON.parse(message);
            client.emit(msg.event, msg.data);
        });
        client.redis.subscribe(`messages:${client.id}`);
    
        // emit the connected state
        client.emit('connected', { user: req.user });
    
        // ping to the client every so often
        setInterval(() => {
            client.emit('ping');
        }, 10000);
    
        req.on('close', () => {
            disconnected(client);
        });
    });
    
    또한 10초 간격으로 클라이언트ping에 간격을 추가했습니다.이것은 완전히 필요하지 않을 수도 있지만, 내가 그것을 추가한 것은 우리의 연결 상태가 어떠한 이유로도 의외로 중단되지 않도록 하기 위해서이다.

    비동기식 / 대기 등가 가입, 등가 신령


    우리가 유일하게 변경해야 할 다른 기능은 한 쌍방이 방에 가입할 때, 한 쌍방이 다른 쌍방으로 신호 메시지를 보낼 때, 그리고 한 쌍방이 서버와 연결을 끊을 때이다.auth,:roomId와 같은 다른 함수는 변하지 않습니다.아래의 연결 함수를 업데이트합시다.주의, 이 클라이언트는 서버와 유니버설 redis 통신을 하는 redis 클라이언트를 추적해야 합니다.
    const redisClient = redis.createClient();
    
    app.post('/:roomId/join', auth, async (req, res) => {
        let roomId = req.params.roomId;
    
        await redisClient.saddAsync(`${req.user.id}:channels`, roomId);
    
        let peerIds = await redisClient.smembersAsync(`channels:${roomId}`);
        peerIds.forEach(peerId => {
            redisClient.publish(`messages:${peerId}`, JSON.stringify({
                event: 'add-peer',
                data: {
                    peer: req.user,
                    roomId,
                    offer: false
                }
            }));
            redisClient.publish(`messages:${req.user.id}`, JSON.stringify({
                event: 'add-peer',
                data: {
                    peer: { id: peerId },
                    roomId,
                    offer: true
                }
            }));
        });
    
        await redisClient.saddAsync(`channels:${roomId}`, req.user.id);
        return res.sendStatus(200);
    });
    
    특정 RoomId의 사용자를 추적하기 위해서, 우리는 를 사용하고 현재 사용자의 채널 집합에 RoomId를 추가할 것입니다.다음은 channels:{roomId} 의 구성원을 찾고 등가 ID를 교체합니다.모든 등가 id에 대해 우리는 현재 사용자가 가입한 등가로 메시지를 효과적으로 전달하고 등가 id를 요청으로 연결합니다.사용자마지막으로, 우리는 우리의 요구를 추가했다.사용자가 redis에서 설정합니다channels:{roomId}.
    다음은 중계 코드를 업데이트합시다.이것은 더욱 간단할 것이다. 왜냐하면 우리가 해야 할 일은 이 대등 id에 메시지를 발표하는 것뿐이기 때문이다.
    app.post('/relay/:peerId/:event', auth, (req, res) => {
        let peerId = req.params.peerId;
        let msg = {
            event: req.params.event,
            data: {
                peer: req.user,
                data: req.body
            }
        };
        redisClient.publish(`messages:${peerId}`, JSON.stringify(msg));
        return res.sendStatus(200);
    });
    

    분리


    Disconnect는 사용자가 있는 방을 정리하고 이 방을 돌아다니며 이 방의 대등점 목록을 얻어야 하기 때문에 더욱 복잡합니다. 그리고 이 방의 모든 대등점에 대등점이 연결이 끊어졌다는 신호를 보내야 합니다.
    async function disconnected(client) {
        delete clients[client.id];
        await redisClient.delAsync(`messages:${client.id}`);
    
        let roomIds = await redisClient.smembersAsync(`${client.id}:channels`);
        await redisClient.delAsync(`${client.id}:channels`);
    
        await Promise.all(roomIds.map(async roomId => {
            await redisClient.sremAsync(`channels:${roomId}`, client.id);
            let peerIds = await redisClient.smembersAsync(`channels:${roomId}`);
            let msg = JSON.stringify({
                event: 'remove-peer',
                data: {
                    peer: client.user,
                    roomId: roomId
                }
            });
            await Promise.all(peerIds.forEach(async peerId => {
                if (peerId !== client.id) {
                    await redisClient.publish(`messages:${peerId}`, msg);
                }
            }));
        }));
    }
    
    redis sets
    성공

    결론


    현재 우리는 Redis PubSub에 대한 지원을 추가했습니다. 우리는 서비스를 임의의 서버 노드로 확장할 수 있습니다. (서로 통신할 수 있는 Redis 서버가 있다면)모든 노드 프로세스의 연결은 열려 있는 상태를 유지하고, 메시지와 채널 통신은redis 루트를 통해 모든 메시지가 적당한 서버를 통해 전송되는 이벤트 흐름을 확보합니다.
    관심 가져주셔서 감사합니다!
    건배!🍻

    코드


    이 시리즈의 코드에 관심이 있다면 GitHub에서 내 저장소를 확인하십시오.

    다시 한 번 감사합니다!

    좋은 웹페이지 즐겨찾기