Postgres의 비동기 알림

저는 Postgres에 매료되었습니다. 그것에 대해 더 많이 배울수록 제가 아직도 모르는 것이 얼마나 많은지 더 많이 깨닫게 됩니다. 최근에 나는 asynchronous communication capabilities 을 발견했는데, 분명히 오랫동안 주변에 있었습니다 ¯\(ツ)/¯

이 항목과 관련된 두 가지 가장 흥미로운 명령인 NOTIFY LISTEN 을 살펴보겠습니다. 다음은 문서에서 설명하는 내용입니다.

NOTIFY provides a simple interprocess communication mechanism for a collection of processes accessing the same PostgreSQL database. A payload string can be sent along with the notification, and higher-level mechanisms for passing structured data can be built by using tables in the database to pass additional data from notifier to listener(s).

Whenever the command NOTIFY channel is invoked, either by this session or another one connected to the same database, all the sessions currently listening on that notification channel are notified, and each will in turn notify its connected client application.

LISTEN registers the current session as a listener on the notification channel named channel. If the current session is already registered as a listener for this notification channel, nothing is done.



데이터베이스 수준에서 게시-구독하는 것처럼 들립니다. 흥미롭습니다! 여러 가지를 시도해 보고 코드를 작성함으로써 가장 잘 배울 수 있으므로 본격적으로 살펴보겠습니다.

알림을 위한 Postgres 설정



테스트 목적으로 기본 키를 제외하고 주문한 사람을 식별하기 위한 이메일 주소와 총 주문 금액을 센트 단위로 저장하기 위한 orders 필드를 포함하는 지나치게 단순화된 bigint 테이블을 생성해 보겠습니다.

CREATE TABLE orders (
  id SERIAL PRIMARY KEY,  
  email TEXT NOT NULL,
  total BIGINT NOT NULL
);


다음으로 트리거를 반환하는 함수를 정의해야 합니다.

CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$
  DECLARE 
    record RECORD;  
    payload JSON;
  BEGIN
    IF (TG_OP = 'DELETE') THEN
      record = OLD;
    ELSE
      record = NEW;
    END IF;

    payload = json_build_object('table', TG_TABLE_NAME, 
                                'action', TG_OP, 
                                'data', row_to_json(record));

    PERFORM pg_notify('events', payload::text);

    RETURN NULL; 
  END;    
$$ LANGUAGE plpgsql;


위의 내용은 매우 간단합니다.
  • 나중에 사용하기 위해 일부 변수를 선언합니다.
  • 직렬화할 행의 버전을 결정하려면 TG_OP special variable을 켭니다.
  • json_build_object and row_to_json 을 사용하여 알림 페이로드를 생성합니다.
  • pg_notify 을 사용하여 events 채널에서 메시지를 브로드캐스트합니다.
  • NULL 트리거이므로 AFTER를 반환합니다.

  • 이제 notify_order_event 테이블에서 CRUD 작업을 수행한 후 이 함수를 호출하는 orders 트리거를 만들 수 있습니다.

    CREATE TRIGGER notify_order_event
    AFTER INSERT OR UPDATE OR DELETE ON orders
      FOR EACH ROW EXECUTE PROCEDURE notify_event();
    


    이를 통해 이제 이벤트를 수신할 수 있습니다. events 채널의 알림에 관심이 있음을 Postgres에 알립니다.

    LISTEN events;
    


    이제 레코드를 삽입, 업데이트 또는 삭제할 때마다 알림을 받게 됩니다.

    INSERT into orders (email, total) VALUES ('[email protected]', 10000);
    INSERT 0 1
    Asynchronous notification "events" with payload "{"table" : "orders", "action" : "INSERT", "data" : {"id":1,"email":"test@example.com","total":10000}}" received from server process with PID 5315.
    


    좋습니다. 방금 첫 번째 비동기 알림을 받았지만 동일한 세션psql 내에서 특히 유용하지 않으므로 다른 리스너를 추가하겠습니다.

    다른 프로세스에서 듣기



    다음 예에서는 Jeremy Evan의 우수Sequel gem를 사용합니다.

    require 'sequel'
    
    DB = Sequel.connect('postgres://user@localhost/notify-test')
    
    puts 'Listening for DB events...'
    DB.listen(:events, loop: true) do |_channel, _pid, payload| 
      puts payload
    end
    


    위의 코드는 먼저 데이터베이스에 연결한 다음 Sequel::Postgres::Database#listen 을 사용하여 루프에서 이벤트를 수신합니다.

    이 스크립트를 시작하고 데이터베이스에 레코드를 삽입하면 JSON 페이로드가 콘솔에 출력됩니다.

    → ruby test.rb
    Listening for DB events...
    {"table" : "orders", "action" : "INSERT", "data" : {"id":2,"email":"[email protected]","total":10000}}
    


    훌륭하지만 여전히 그다지 유용하지는 않습니다. 이 알림을 웹 브라우저에 전달하여 대시보드 같은 것을 만들 수 있다면 좋지 않을까요? 웹 소켓을 입력하십시오.

    프런트엔드 연결



    이것은 완전한 앱이 아니라 간단한 데모일 뿐이므로 Rails + ActionCable이 과할 것 같아서 대신 Rack을 사용하여 매우 간단한 Faye 앱을 작성하기로 결정했습니다.

    require 'faye/websocket'
    require 'sequel'
    
    DB = Sequel.connect('postgres://user@localhost/notify-test')
    
    App = lambda do |env|
      ws = Faye::WebSocket.new(env)
    
      DB.listen(:events, loop: true) do |_channel, _pid, payload|
        ws.send(payload)
        ws.rack_response
      end
    end
    


    모든 Rack "앱"과 마찬가지로 이것은 call 를 지원하는 객체일 뿐입니다. 이 경우에는 a lambda 입니다. 여기에서 Postgres에서 받은 메시지를 브라우저로 전달하는 데 사용하는 웹 소켓을 만듭니다. 거기에서 그것들을 받으려면 페이로드를 콘솔에 기록하는 Websocket 처리기로 onmessage 을 인스턴스화하기만 하면 됩니다.

    socket = new WebSocket("ws://localhost:9292")
    socket.onmessage = (event) => console.log(event.data)
    


    이제 orders 테이블에서 새 레코드를 생성, 수정 또는 삭제할 때마다 브라우저에 알림이 표시됩니다.



    요약



    Postgres' LISTEN NOTIFY 은 게시-구독 패턴을 통해 손쉬운 프로세스 간 통신을 제공합니다. ETL을 통한 로깅에서 실시간 대시보드에 이르기까지 이 메커니즘에 대한 많은 잠재적 사용 사례가 있습니다.

    좋은 웹페이지 즐겨찾기