PostgreSQL 흐름 처리 응용 실천 - 중고 상품 실시 간 분류 (비동기 메시지 notify / listen, 읽 은 후 불 태 우기)...
PostgreSQL, rule, trigger, 분류, json, udf, 비동기 메시지, listen, notify
배경
중고 상품 은 이벤트, 경성 분류, 광고 등의 활동 이 많 지 않 아 신규 상품 만큼 구 매 나 판매 속도 가 빠 르 지 않다.중고 상품 의 판매 효율 을 높이 기 위해 서 는 분류 전략 을 제공 해 야 한다.
상품 이 새로 증가 하거나 상품 내용 이 변화 할 때 상품 속성 과 정 의 된 규칙 에 따라 실시 간 으로 상품 분류 (양어장, 범위 등) 를 하여 사용자 의 조 회 를 편리 하 게 해 야 한다.
구조 설계
1. 상품 ID, 속성
create table a (
id int8 primary key, -- ID
att jsonb --
);
속성 은 JSON 으로 설계 되 었 습 니 다. JSON 안 에는 K - V 의 속성 이 있 고 V 안 에는 배열 이 있 으 며 K 의 값 과 이 속성 을 포함 한 마지막 업데이트 시간 입 니 다.
업데이트 시간 은 merge insert 에 사 용 됩 니 다. 속성 이 변 했 을 때 만 업데이트 되 고 변화 가 없 을 때 업데이트 되 지 않 습 니 다.
그래서 제 이 슨 은 옮 겨 다 니 며 합병 처 리 를 해 야 한다.
JSON 속성 을 합 친 UDF
create or replace function merge_json(jsonb, jsonb) returns jsonb as $$
select jsonb_object_agg(key,value) from (
select
coalesce(a.key, b.key) as key,
case
when
coalesce(jsonb_array_element(a.value,1)::text::timestamp, '1970-01-01'::timestamp)
>
coalesce(jsonb_array_element(b.value,1)::text::timestamp, '1970-01-01'::timestamp)
then a.value
else b.value
end
from jsonb_each($1) a full outer join jsonb_each($2) b using (key)
) t;
$$ language sql strict ;
postgres=# select merge_json('{"price":[10000, "2018-01-01 10:10:11"], "newatt":[120, "2017-01-01 12:22:00"]}', '{"price":[8880, "2018-01-04 10:10:12"], "count":[100, "2017-01-01 10:10:00"]}');
merge_json
-------------------------------------------------------------------------------------------------------------------------
{"count": [100, "2017-01-01 10:10:00"], "price": [8880, "2018-01-04 10:10:12"], "newatt": [120, "2017-01-01 12:22:00"]}
(1 row)
트리거 디자인
트리거 안에 분류 규칙 을 정의 합 니 다. 예 를 들 어 여기 서 가격 이 100 이상 인 상품 에 대해 메 시 지 를 뱉 습 니 다.
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then -- 1, 100,
perform pg_notify(
'a', --
format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att) --
);
-- elsif ... then
-- else
end if;
return null;
end;
$function$ language plpgsql strict;
after insert 또는 update 트리거 생 성
create trigger tg1 after insert or update on a for each row execute procedure notify1();
기타 트리거 (규칙 설계 방법)
본문 미사 용
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att) -- JSONB
loop
--
-- if key='price' then ...; end if;
-- if key='count' then ...; end if;
end loop;
return null;
end;
$function$ language plpgsql strict;
--
create table tbl_rule (
key text, -- key
exp text, -- value
class text, -- exp ,
)
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
for key,value in select key, jsonb_array_element(value, 0)::text from jsonb_each(NEW.att) -- JSONB
loop
-- tbl_rule ,
end loop;
return null;
end;
$function$ language plpgsql strict;
규칙 설명
json 속성 쌍 중, value 의 유형 이 많 을 수 있 으 며, 서로 다른 규칙 적 의미 에 대응 할 수 있 습 니 다.
1. 텍스트 라 이 크
2. 배열 IN
3. 등가
4. 수치 범위
5. 시간 범위
잠시 만 요. trigger 의 UDF 에 규칙 을 쓰 면 됩 니 다.
데이터 병합 기록 테스트
insert into a values
(1, '{"price":[10000, "2018-01-01 10:10:11"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- , , UDF
where
a.att <> merge_json(a.att, excluded.att); -- , , CPU
postgres=# insert into a values
(1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- , , UDF
where
a.att <> merge_json(a.att, excluded.att); -- , , CPU
INSERT 0 1
postgres=# select * from a;
id | att
----+-----------------------------------------------------------------------------
1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}
(1 row)
소식 을 감청 하 다
postgres=# listen a;
LISTEN
Asynchronous notification "a" with payload "ID:1, ATT:{"price": [10000, "2018-01-01 10:10:19"]}" received from server process with PID 51380.
https://jdbc.postgresql.org/documentation/head/listennotify.html
기타
상품 을 삭제 하면 DELETE 트리거 를 사용 하여 하류 에 알려 줄 수 있 습 니 다. 예 를 들 어 상품 이 거래 되 었 고 삭제 되 었 습 니 다.
CREATE OR REPLACE FUNCTION notify2() returns trigger
AS $function$
declare
begin
perform pg_notify(
'a', --
format('CLASS:delete, ID:%s, ATT:%s', OLD.id, OLD.att) --
);
return null;
end;
$function$ language plpgsql strict;
create trigger tg2 after delete on a for each row execute procedure notify2();
방안 2 - 흐름 식 대량 소비
비동기 메 시 지 를 사용 하 는 방식 으로 연결 이 끊 겼 을 때 다시 연결 한 후 다시 감청 해 야 하 며 연결 이 끊 긴 동안 의 메 시 지 는 버 려 집 니 다.그래서 신뢰성 이 떨 어 집 니 다.
또 비동기 메 시 지 는 한 번 에 몇 개 를 소비 하 는 지 통제 할 수 없고, 특별히 우호 적 이지 도 않다.
그래서 우 리 는 실제로 지구 화 표를 사용 하고 비동기 적 으로 대량으로 소비 하 는 방식 으로 소 비 를 하 는 다른 방법 도 있다.
성능 지표:
CASE
데이터 양
병발 하 다
TPS
평균 응답 시간
흐름 식 처리 - 열람 후 소각 - 소비
10 억, 395.2 만 줄 / s 소비
56
3952
14 밀리초
구 조 는 앞의 예 를 따른다.
1. 결과 표 한 장 추가 (표 한 장 을 새로 늘 릴 수도 있 고 업 무량 을 볼 수도 있 으 며 보통 한 장 이면 충분 합 니 다).
2. 트리거 내용 을 수정 하고 notify 를 작성 표 로 바 꿉 니 다.
3. 클 라 이언 트 를 수정 하여 감청 채널 을 비동기 소비 SQL 로 변경 합 니 다.
DEMO
1. 결과 표 추가
create table t_result(id serial8 primary key, class text, content text);
2. 트리거 안에 분류 규칙 을 정의 합 니 다. 예 를 들 어 여기 서 가격 이 100 이상 인 상품 에 대해 정 보 를 결과 표 에 뱉 습 니 다.
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
if jsonb_array_element(NEW.att->'price', 0)::text::float8 > 100 then -- 1, 100,
insert into t_result(class,content) values (
'a', --
format('CLASS:high price, ID:%s, ATT:%s', NEW.id, NEW.att) --
);
-- elsif ... then
-- else
end if;
return null;
end;
$function$ language plpgsql strict;
3. after insert 또는 update 트리거 생 성
create trigger tg1 after insert or update on a for each row execute procedure notify1();
4. 데이터 통합 기록 테스트
insert into a values
(1, '{"price":[10000, "2018-01-01 10:10:11"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- , , UDF
where
a.att <> merge_json(a.att, excluded.att); -- , , CPU
postgres=# insert into a values
(1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}')
on conflict (id)
do update set
att = merge_json(a.att, excluded.att) -- , , UDF
where
a.att <> merge_json(a.att, excluded.att); -- , , CPU
INSERT 0 1
postgres=# select * from a;
id | att
----+-----------------------------------------------------------------------------
1 | {"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}
(1 row)
5. 비동기 대량 소비 결과 표 의 내용 (열람 후 소각)
with a as (delete from t_result where ctid= any(array(
select ctid from t_result order by id limit 10 for update skip locked -- , ,
)) returning *)
select * from a;
id | class | content
----+-------+---------------------------------------------------------------------------------------------------------
1 | a | CLASS:high price, ID:1, ATT:{"price": [10000, "2018-01-01 10:10:11"]}
2 | a | CLASS:high price, ID:1, ATT:{"price": [1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}
(2 rows)
, ,
postgres=# select * from t_result;
id | class | content
----+-------+---------
(0 rows)
프로젝트 2 속 - row 급 트리거 대신 statement 급 트리거 사용
왜 row 급 트리거 대신 statement 급 트리거 를 사용 하 는 것 을 권장 합 니까? 참고:
< PostgreSQL 대량, 단일 쓰기 - row, statement 트리거 (중간 표), CTE 몇 가지 용법 성능 비교 >
트리거 함수 수정 은 다음 과 같 습 니 다.
CREATE OR REPLACE FUNCTION notify1() returns trigger
AS $function$
declare
begin
-- 1
insert into t_result(class,content) select
'a', --
format('CLASS:high price, ID:%s, ATT:%s', id, att) --
from new_table
where jsonb_array_element(att->'price', 0)::text::float8 > 100; -- 1, 100,
--
-- insert into t_result(class,content) select
-- ......
-- from new_table
-- where ... -- n
return null;
end;
$function$ language plpgsql strict;
트리거 수정 은 다음 과 같 습 니 다.
create trigger tg1 after insert on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();
create trigger tg2 after update on a REFERENCING NEW TABLE AS new_table for each STATEMENT execute procedure notify1();
postgres=# \d a
Table "public.a"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+---------
id | integer | | not null |
att | jsonb | | |
Indexes:
"pk" PRIMARY KEY, btree (id)
Triggers:
tg1 AFTER INSERT ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()
tg2 AFTER UPDATE ON a REFERENCING NEW TABLE AS new_table FOR EACH STATEMENT EXECUTE PROCEDURE notify1()
작은 매듭
비동기 메시지, UDF, 규칙 또는 트리거 를 사용 하여 실시 간 계산 문 제 를 매우 경 량 화 했다.
그러나 비동기 메 시 지 는 메 시 지 를 잃 어 버 릴 수 있 습 니 다. 예 를 들 어 감청 연결 이 끊 긴 후 다시 연결 할 때 감청 을 다시 시작 하고 연결 을 중단 할 때 소비 되 지 않 은 메 시 지 는 더 이상 소비 되 지 않 기 때문에 메 시 지 를 잃 어 버 린 셈 입 니 다.
개선 방법:
1. 메 시 지 를 잃 어 버 리 지 않도록 하려 면 notify 를 INSERT 로 바 꾸 고 결 과 를 미리 정 의 된 결과 표 에 기록 하 며 논리 적 DECODE 방식 으로 이 결과 표 와 관련 된 logical decode 정 보 를 분석 하여 변 화 량 을 얻 을 수 있 습 니 다. 아래 를 참고 하 십시오.
< PostgreSQL pg recvlogical 과 test decoding 사용자 정의, source table filter 지원, kafka, es 연결 등 >
2. 열람 후 소각 하 는 방법 을 사용 하여 본 방안 과 유사 2.
《 아 리 클 라 우 드 RDS PostgreSQL varbitx 실천 - 스 트림 태그 (읽 은 후 분류 식 대량 계산) - 조 급, 임 의 태그 링 사람, 밀리초 응답 》
《 HTAP 데이터베이스 PostgreSQL 장면 과 성능 테스트 의 32 - (OLTP) 고 삼 키 기 데이터 출입 (퇴적, 청소, 색인 필요 없 음) - 읽 은 후 소각 (JSON + 함수 흐름 식 계산) 》
《 HTAP 데이터베이스 PostgreSQL 장면 과 성능 테스트 의 31 - (OLTP) 고 삼투 데이터 드 나 들 기 (퇴적, 스 캔, 색인 필요 없 음) - 읽 은 후 소각 (읽 기 및 쓰기 대 삼투 및 측정) 》
《 HTAP 데이터베이스 PostgreSQL 장면 과 성능 테스트 의 27 - (OLTP) 사물 인터넷 - FEED 로그, 흐름 처리 와 열람 후 소각 (CTE) 》
< PostgreSQL 에서 update | delete limit - CTID 스 캔 실천 실현 (고 효능 후 소각) >
레 퍼 런 스
https://www.postgresql.org/docs/11/static/functions-json.html
https://www.postgresql.org/docs/11/static/datatype-json.html
https://jdbc.postgresql.org/documentation/head/listennotify.html
https://www.postgresql.org/docs/11/static/sql-notify.html
https://www.postgresql.org/docs/11/static/sql-listen.html
https://www.postgresql.org/docs/11/static/sql-unlisten.html
https://www.postgresql.org/docs/11/static/libpq-notify.html
https://www.postgresql.org/docs/11/static/sql-notify.html#id-1.9.3.157.7.5
https://www.postgresql.org/docs/11/static/functions-info.html
https://www.postgresql.org/docs/11/static/plpgsql-trigger.html
https://github.com/impossibl/pgjdbc-ng
https://www.openmakesoftware.com/postgresql-listen-notify-events-example/
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.