PostgreSQL 흐름 처리 응용 실천 - 중고 상품 실시 간 분류 (비동기 메시지 notify / listen, 읽 은 후 불 태 우기)...

13558 단어
라벨
PostgreSQL, rule, trigger, 분류, json, udf, 비동기 메시지, listen, notify
배경
중고 상품 은 이벤트, 경성 분류, 광고 등의 활동 이 많 지 않 아 신규 상품 만큼 구 매 나 판매 속도 가 빠 르 지 않다.중고 상품 의 판매 효율 을 높이 기 위해 서 는 분류 전략 을 제공 해 야 한다.
상품 이 새로 증가 하거나 상품 내용 이 변화 할 때 상품 속성 과 정 의 된 규칙 에 따라 실시 간 으로 상품 분류 (양어장, 범위 등) 를 하여 사용자 의 조 회 를 편리 하 게 해 야 한다.
구조 설계
1. 상품 ID, 속성
create table a (      
  id int8 primary key,   --   ID      
  att jsonb   --           
);      

속성 은 JSON 으로 설계 되 었 습 니 다. JSON 안 에는 K - V 의 속성 이 있 고 V 안 에는 배열 이 있 으 며 K 의 값 과 이 속성 을 포함 한 마지막 업데이트 시간 입 니 다.
업데이트 시간 은 merge insert 에 사 용 됩 니 다. 속성 이 변 했 을 때 만 업데이트 되 고 변화 가 없 을 때 업데이트 되 지 않 습 니 다.
그래서 제 이 슨 은 옮 겨 다 니 며 합병 처 리 를 해 야 한다.
JSON 속성 을 합 친 UDF
create or replace function merge_json(jsonb, jsonb) returns jsonb as $$    
  select jsonb_object_agg(key,value) from (    
  select     
    coalesce(a.key, b.key) as key,     
    case     
    when     
    coalesce(jsonb_array_element(a.value,1)::text::timestamp, '1970-01-01'::timestamp)     
    >     
    coalesce(jsonb_array_element(b.value,1)::text::timestamp, '1970-01-01'::timestamp)     
    then a.value    
    else b.value    
    end    
  from jsonb_each($1) a full outer join jsonb_each($2) b using (key)    
  ) t;      
$$ language sql strict ;    
    
    
postgres=# select merge_json('{"price":[10000, "2018-01-01 10:10:11"], "newatt":[120, "2017-01-01 12:22:00"]}',  '{"price":[8880, "2018-01-04 10:10:12"], "count":[100, "2017-01-01 10:10:00"]}');    
                                                       merge_json                                                            
-------------------------------------------------------------------------------------------------------------------------    
 {"count": [100, "2017-01-01 10:10:00"], "price": [8880, "2018-01-04 10:10:12"], "newatt": [120, "2017-01-01 12:22:00"]}    
(1 row)    

트리거 디자인
트리거 안에 분류 규칙 을 정의 합 니 다. 예 를 들 어 여기 서 가격 이 100 이상 인 상품 에 대해 메 시 지 를 뱉 습 니 다.
CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then   --   1,     100,            
     perform pg_notify(      
       'a',    --               
       format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att)   --           
     );      
  -- elsif ... then            
  -- else            
  end if;      
return null;      
end;      
$function$ language plpgsql strict;      

after insert 또는 update 트리거 생 성
create trigger tg1 after insert or update on a for each row execute procedure notify1();      

기타 트리거 (규칙 설계 방법)
본문 미사 용
CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att)  --     JSONB    
  loop    
    --         
    -- if key='price' then ...; end if;    
    -- if key='count' then ...; end if;    
  end loop;    
return null;    
end;    
$function$ language plpgsql strict;      
--          
    
create table tbl_rule (    
  key text,  -- key     
  exp text,  -- value           
  class text,  --   exp ,          
)    
    
CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att)  --     JSONB    
  loop    
    --   tbl_rule        ,      
  end loop;    
return null;    
end;    
$function$ language plpgsql strict;      

규칙 설명
json 속성 쌍 중, value 의 유형 이 많 을 수 있 으 며, 서로 다른 규칙 적 의미 에 대응 할 수 있 습 니 다.
1. 텍스트 라 이 크
2. 배열 IN
3. 등가
4. 수치 범위
5. 시간 범위
잠시 만 요. trigger 의 UDF 에 규칙 을 쓰 면 됩 니 다.
데이터 병합 기록 테스트
insert into a values       
  (1, '{"price":[10000, "2018-01-01 10:10:11"]}')       
  on conflict (id)       
  do update set       
  att = merge_json(a.att, excluded.att)  --      ,     ,      UDF         
  where       
  a.att <> merge_json(a.att, excluded.att);  --          ,         ,   CPU     
    
    
postgres=# insert into a values    
  (1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')    
  on conflict (id)    
  do update set    
  att = merge_json(a.att, excluded.att)  --      ,     ,      UDF       
  where    
  a.att <> merge_json(a.att, excluded.att);   --          ,         ,   CPU    
INSERT 0 1    
    
    
postgres=# select * from a;    
 id |                                     att                                         
----+-----------------------------------------------------------------------------    
  1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(1 row)    

소식 을 감청 하 다
postgres=# listen a;      
LISTEN      
Asynchronous notification "a" with payload "ID:1, ATT:{"price": [10000, "2018-01-01 10:10:19"]}" received from server process with PID 51380.      

https://jdbc.postgresql.org/documentation/head/listennotify.html
기타
상품 을 삭제 하면 DELETE 트리거 를 사용 하여 하류 에 알려 줄 수 있 습 니 다. 예 를 들 어 상품 이 거래 되 었 고 삭제 되 었 습 니 다.
CREATE OR REPLACE FUNCTION notify2() returns trigger      
AS $function$      
declare      
begin      
     perform pg_notify(      
       'a',                                                     --               
       format('CLASS:delete, ID:%s, ATT:%s', OLD.id, OLD.att)   --           
     );      
return null;      
end;      
$function$ language plpgsql strict;      
    
create trigger tg2 after delete on a for each row execute procedure notify2();      

방안 2 - 흐름 식 대량 소비
비동기 메 시 지 를 사용 하 는 방식 으로 연결 이 끊 겼 을 때 다시 연결 한 후 다시 감청 해 야 하 며 연결 이 끊 긴 동안 의 메 시 지 는 버 려 집 니 다.그래서 신뢰성 이 떨 어 집 니 다.
또 비동기 메 시 지 는 한 번 에 몇 개 를 소비 하 는 지 통제 할 수 없고, 특별히 우호 적 이지 도 않다.
그래서 우 리 는 실제로 지구 화 표를 사용 하고 비동기 적 으로 대량으로 소비 하 는 방식 으로 소 비 를 하 는 다른 방법 도 있다.
성능 지표:
CASE
데이터 양
병발 하 다
TPS
평균 응답 시간
흐름 식 처리 - 열람 후 소각 - 소비
10 억, 395.2 만 줄 / s 소비
56
3952
14 밀리초
구 조 는 앞의 예 를 따른다.
1. 결과 표 한 장 추가 (표 한 장 을 새로 늘 릴 수도 있 고 업 무량 을 볼 수도 있 으 며 보통 한 장 이면 충분 합 니 다).
2. 트리거 내용 을 수정 하고 notify 를 작성 표 로 바 꿉 니 다.
3. 클 라 이언 트 를 수정 하여 감청 채널 을 비동기 소비 SQL 로 변경 합 니 다.
DEMO
1. 결과 표 추가
create table t_result(id serial8 primary key, class text, content text);    

2. 트리거 안에 분류 규칙 을 정의 합 니 다. 예 를 들 어 여기 서 가격 이 100 이상 인 상품 에 대해 정 보 를 결과 표 에 뱉 습 니 다.
CREATE OR REPLACE FUNCTION notify1() returns trigger      
AS $function$      
declare      
begin      
  if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then   --   1,     100,           
     insert into t_result(class,content) values (    
       'a',    --       
       format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att)   --           
     );    
  -- elsif ... then            
  -- else            
  end if;      
return null;      
end;      
$function$ language plpgsql strict;      

3. after insert 또는 update 트리거 생 성
create trigger tg1 after insert or update on a for each row execute procedure notify1();      

4. 데이터 통합 기록 테스트
insert into a values       
  (1, '{"price":[10000, "2018-01-01 10:10:11"]}')       
  on conflict (id)       
  do update set       
  att = merge_json(a.att, excluded.att)  --      ,     ,      UDF         
  where       
  a.att <> merge_json(a.att, excluded.att);   --          ,         ,   CPU    
    
    
postgres=# insert into a values    
  (1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')    
  on conflict (id)    
  do update set    
  att = merge_json(a.att, excluded.att)  --      ,     ,      UDF       
  where    
  a.att <> merge_json(a.att, excluded.att); --          ,         ,   CPU     
    
INSERT 0 1    
    
postgres=# select * from a;    
 id |                                     att                                         
----+-----------------------------------------------------------------------------    
  1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(1 row)    

5. 비동기 대량 소비 결과 표 의 내용 (열람 후 소각)
with a as (delete from t_result where ctid= any(array(     
  select ctid from t_result order by id limit 10 for update skip locked  --       ,      ,              
)) returning *)    
select * from a;    
 id | class |                                                 content                                                     
----+-------+---------------------------------------------------------------------------------------------------------    
  1 | a     | CLASS:high price, ID:1, ATT:{"price": [10000, "2018-01-01 10:10:11"]}    
  2 | a     | CLASS:high price, ID:1, ATT:{"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}    
(2 rows)    
    
    
    ,    ,             
    
postgres=# select * from t_result;    
 id | class | content     
----+-------+---------    
(0 rows)    

프로젝트 2 속 - row 급 트리거 대신 statement 급 트리거 사용
왜 row 급 트리거 대신 statement 급 트리거 를 사용 하 는 것 을 권장 합 니까? 참고:
< PostgreSQL 대량, 단일 쓰기 - row, statement 트리거 (중간 표), CTE 몇 가지 용법 성능 비교 >
트리거 함수 수정 은 다음 과 같 습 니 다.
CREATE OR REPLACE FUNCTION notify1() returns trigger        
AS $function$        
declare        
begin        
  --   1  
  insert into t_result(class,content) select   
    'a',    --       
    format('CLASS:high price, ID:%s, ATT:%s', id, att)   --          
  from new_table   
  where jsonb_array_element(att->'price', 0)::text::float8 > 100;    --   1,     100,         
    
  --       
  -- insert into t_result(class,content) select   
  -- ......  
  --   from new_table   
  -- where ...  --   n  
    
  return null;        
end;        
$function$ language plpgsql strict;      

트리거 수정 은 다음 과 같 습 니 다.
create trigger tg1 after insert on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();      
create trigger tg2 after update on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();      
postgres=# \d a  
                 Table "public.a"  
 Column |  Type   | Collation | Nullable | Default   
--------+---------+-----------+----------+---------  
 id     | integer |           | not null |   
 att    | jsonb   |           |          |   
Indexes:  
    "pk" PRIMARY KEY, btree (id)  
Triggers:  
    tg1 AFTER INSERT ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()  
    tg2 AFTER UPDATE ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()  

작은 매듭
비동기 메시지, UDF, 규칙 또는 트리거 를 사용 하여 실시 간 계산 문 제 를 매우 경 량 화 했다.
그러나 비동기 메 시 지 는 메 시 지 를 잃 어 버 릴 수 있 습 니 다. 예 를 들 어 감청 연결 이 끊 긴 후 다시 연결 할 때 감청 을 다시 시작 하고 연결 을 중단 할 때 소비 되 지 않 은 메 시 지 는 더 이상 소비 되 지 않 기 때문에 메 시 지 를 잃 어 버 린 셈 입 니 다.
개선 방법:
1. 메 시 지 를 잃 어 버 리 지 않도록 하려 면 notify 를 INSERT 로 바 꾸 고 결 과 를 미리 정 의 된 결과 표 에 기록 하 며 논리 적 DECODE 방식 으로 이 결과 표 와 관련 된 logical decode 정 보 를 분석 하여 변 화 량 을 얻 을 수 있 습 니 다. 아래 를 참고 하 십시오.
< PostgreSQL pg recvlogical 과 test decoding 사용자 정의, source table filter 지원, kafka, es 연결 등 >
2. 열람 후 소각 하 는 방법 을 사용 하여 본 방안 과 유사 2.
《 아 리 클 라 우 드 RDS PostgreSQL varbitx 실천 - 스 트림 태그 (읽 은 후 분류 식 대량 계산) - 조 급, 임 의 태그 링 사람, 밀리초 응답 》
《 HTAP 데이터베이스 PostgreSQL 장면 과 성능 테스트 의 32 - (OLTP) 고 삼 키 기 데이터 출입 (퇴적, 청소, 색인 필요 없 음) - 읽 은 후 소각 (JSON + 함수 흐름 식 계산) 》
《 HTAP 데이터베이스 PostgreSQL 장면 과 성능 테스트 의 31 - (OLTP) 고 삼투 데이터 드 나 들 기 (퇴적, 스 캔, 색인 필요 없 음) - 읽 은 후 소각 (읽 기 및 쓰기 대 삼투 및 측정) 》
《 HTAP 데이터베이스 PostgreSQL 장면 과 성능 테스트 의 27 - (OLTP) 사물 인터넷 - FEED 로그, 흐름 처리 와 열람 후 소각 (CTE) 》
< PostgreSQL 에서 update | delete limit - CTID 스 캔 실천 실현 (고 효능 후 소각) >
레 퍼 런 스
https://www.postgresql.org/docs/11/static/functions-json.html
https://www.postgresql.org/docs/11/static/datatype-json.html
https://jdbc.postgresql.org/documentation/head/listennotify.html
https://www.postgresql.org/docs/11/static/sql-notify.html
https://www.postgresql.org/docs/11/static/sql-listen.html
https://www.postgresql.org/docs/11/static/sql-unlisten.html
https://www.postgresql.org/docs/11/static/libpq-notify.html
https://www.postgresql.org/docs/11/static/sql-notify.html#id-1.9.3.157.7.5
https://www.postgresql.org/docs/11/static/functions-info.html
https://www.postgresql.org/docs/11/static/plpgsql-trigger.html
https://github.com/impossibl/pgjdbc-ng
https://www.openmakesoftware.com/postgresql-listen-notify-events-example/

좋은 웹페이지 즐겨찾기