🐘🚀 Update/Insert/Soft Delete from a JSON Payload

A user described their "ETL" use case as follows:
  • The transaction table holds an ID (k1 int in my example) with multiple records (k2 int for the key and v1 int, v2 text for the values in my small example).
  • The application receives a payload in JSON format describing the new data.
  • New IDs are inserted.
  • Existing IDs are updated.
  • Missing IDs are marked as deleted (deleted boolean).
  • A timestamp ts timestamptz is added to record the last change time.

    Their current application uses Python Pandas to retrieve the current records, compare them with the new payload, and write the result back to the database. With a distributed database like YugabyteDB, it is more efficient to do this processing in the database: it scales (you can connect to any node), it runs in a single server-side transaction (which makes transparent retries easier to handle, for example in case of clock skew), and it reduces roundtrips (rows are batched). And thanks to PostgreSQL compatibility, JSON processing and INSERT ... ON CONFLICT ... UPDATE can do the same job in clean, simple SQL.

    Example



    Here is a simple example where I declare my table:

    create table demo (
     k1 int, k2 int, v1 int, v2 text
     , deleted boolean, ts timestamptz
     , primary key ( k1,k2 )
    );
    


    This works in PostgreSQL and YugabyteDB. In YugabyteDB, the primary key defaults to lsm (k1 HASH, k2 ASC), which suits my use case: rows are distributed on the ID, and the records of one ID are colocated.
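
    To make that default visible, the sharding can be spelled out in the DDL. This is a minimal sketch using YugabyteDB's YSQL syntax; the table name demo_explicit_pk is hypothetical, and the HASH/ASC keywords in a primary key are not accepted by PostgreSQL itself.

```sql
-- YugabyteDB (YSQL) only: hash-shard on the ID, range-sort its records.
-- "demo_explicit_pk" is an illustrative name, not part of the original example.
create table demo_explicit_pk (
 k1 int, k2 int, v1 int, v2 text
 , deleted boolean, ts timestamptz
 , primary key ( k1 hash, k2 asc )
);
```

    Declaring it explicitly documents the intent and protects the layout if the cluster's default sharding settings differ.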

    ํ•˜๋‚˜์˜ SQL ์ฟผ๋ฆฌ์—์„œ ์ „์ฒด ๋…ผ๋ฆฌ๋ฅผ ๊ตฌํ˜„ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค. WITH ์ ˆ(์ผ๋ช… CTE - Common Table Expressions)์€ ๊ฐ€๋…์„ฑ์„ ์œ ์ง€ํ•ฉ๋‹ˆ๋‹ค.
  • payload๋Š” $2 ๋•๋ถ„์— ๋ ˆ์ฝ”๋“œ์— jsonb_to_recordset๋กœ ์ „๋‹ฌ๋œ ๋‚ด JSON ํŽ˜์ด๋กœ๋“œ๋ฅผ ์ฝ๊ณ  ID$1 as k1๋ฅผ ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค.
  • to_upsert๋Š” payload๋ฅผ ๋Œ€์ƒ ํ˜•์‹์œผ๋กœ ์ง€์ •ํ•˜๊ณ  is_deleted๋ฅผ false๋กœ ์„ค์ •ํ•˜๊ณ  ํƒ€์ž„์Šคํƒฌํ”„
  • ๋ฅผ ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค.
  • to_soft_delete๋Š” ํ˜„์žฌ ๋ ˆ์ฝ”๋“œ๋ฅผ ๊ฒ€์ƒ‰ํ•˜๊ณ  payload์™€ ๋น„๊ตํ•˜์—ฌ is_deleted๊ฐ€ true
  • ๋กœ ์„ค์ •๋œ ์ผ์‹œ ์‚ญ์ œ๋กœ ํ˜•์‹์„ ์ง€์ •ํ•ฉ๋‹ˆ๋‹ค.
  • ๋งˆ์ง€๋ง‰์œผ๋กœ to_upsert์™€ to_soft_delete์˜ ๊ฒฐํ•ฉ์ด INSERT ... ON CONFLICT ... UPDATE
  • ์™€ ๋ณ‘ํ•ฉ๋ฉ๋‹ˆ๋‹ค.
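
    To see what the first step produces on its own, the payload expression can be run outside the statement. This is a small sketch with literal values standing in for $1 and $2; the sample values are made up for illustration.

```sql
-- Stand-alone version of the "payload" step: 0 plays the role of $1,
-- the JSON literal plays the role of $2.
select 0 as k1, *
from jsonb_to_recordset(
 '[{"k2":1,"v1":10,"v2":"a"},{"k2":2,"v1":20,"v2":"b"}]'::jsonb
) as payload( k2 int, v1 int, v2 text );
-- returns two rows: (0, 1, 10, 'a') and (0, 2, 20, 'b')
```

    The column definition list after "as payload" decides which JSON keys are read and how they are typed; keys that are absent from an array element come back as NULL.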

    Here is the prepared statement:

    prepare etl (int, jsonb) as
    with
    payload as (
     select $1 as k1,* from jsonb_to_recordset($2) as payload( k2 int, v1 int, v2 text )
    ),
    to_upsert as (
     select k1, k2, v1, v2, false as deleted, now() as ts from payload
    ),
    to_soft_delete as (
     select k1, k2, v1, v2, true  as deleted, now() as ts from demo
     where  k1=$1
     and (k1, k2) not in ( select k1, k2 from payload)
    )
    insert into demo select * from to_upsert union select * from to_soft_delete
    on conflict (k1, k2) do
    update set v1=excluded.v1, v2=excluded.v2, deleted=excluded.deleted, ts=excluded.ts
    ;
    


    This could be coded as a stored procedure, but a prepared statement avoids parsing and optimizing the query for each call. It can easily be prepared in the connection pool's initialization command.
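
    For comparison, the stored-procedure alternative might look like the sketch below. The procedure name etl_proc and the parameter names p_k1 and p_payload are hypothetical; the body simply mirrors the prepared statement, with the named parameters replacing $1 and $2.

```sql
-- Sketch of the same logic as a SQL-language procedure.
-- etl_proc, p_k1 and p_payload are illustrative names.
create procedure etl_proc(p_k1 int, p_payload jsonb)
language sql as $$
with
payload as (
 select p_k1 as k1, * from jsonb_to_recordset(p_payload) as payload( k2 int, v1 int, v2 text )
),
to_upsert as (
 select k1, k2, v1, v2, false as deleted, now() as ts from payload
),
to_soft_delete as (
 select k1, k2, v1, v2, true as deleted, now() as ts from demo
 where  k1 = p_k1
 and (k1, k2) not in ( select k1, k2 from payload )
)
insert into demo select * from to_upsert union select * from to_soft_delete
on conflict (k1, k2) do
update set v1=excluded.v1, v2=excluded.v2, deleted=excluded.deleted, ts=excluded.ts;
$$;
```

    It would be invoked as call etl_proc(0, '[...]'::jsonb) with the same arguments as the prepared statement. The rest of this post keeps using the prepared statement.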

    My table is empty. I'll insert a payload for k1=0 with 3 records:

    execute etl(0, $$
    [
    {"k2":1,"v1":1,"v2":"my first insert"},
    {"k2":2,"v1":1,"v2":"my first insert"},
    {"k2":3,"v1":1,"v2":"my first insert"}
    ]
    $$::jsonb);
    


    The 3 rows have been inserted:

    yugabyte=# select * from demo;
     k1 | k2 | v1 |       v2        | deleted |              ts
    ----+----+----+-----------------+---------+-------------------------------
      0 |  1 |  1 | my first insert | f       | 2022-04-29 14:39:08.467744+00
      0 |  2 |  1 | my first insert | f       | 2022-04-29 14:39:08.467744+00
      0 |  3 |  1 | my first insert | f       | 2022-04-29 14:39:08.467744+00
    (3 rows)
    


    Now a new payload for the same k1=0 updates 2 rows, removes another one, and adds a new one:

    execute etl(0,$$
    [
    {"k2":1,"v1":1,"v2":"my update"},
    {"k2":2,"v1":1,"v2":"my update"},
    {"k2":4,"v1":1,"v2":"my second insert"}
    ]
    $$::jsonb);
    


    Here is the result:

    yugabyte=# select * from demo;
     k1 | k2 | v1 |        v2        | deleted |              ts
    ----+----+----+------------------+---------+-------------------------------
      0 |  1 |  1 | my update        | f       | 2022-04-29 14:41:00.109561+00
      0 |  2 |  1 | my update        | f       | 2022-04-29 14:41:00.109561+00
      0 |  3 |  1 | my first insert  | t       | 2022-04-29 14:41:00.109561+00
      0 |  4 |  1 | my second insert | f       | 2022-04-29 14:41:00.109561+00
    (4 rows)
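
    Because removed records stay in the table, readers that only want the current state need to filter on the flag. A minimal sketch, assuming every row was written by the statement above so that deleted is never NULL:

```sql
-- Records of ID 0 that are still present in the latest payload
select k2, v1, v2, ts
from demo
where k1 = 0 and not deleted
order by k2;
```

    With the data shown above, this returns the rows for k2 = 1, 2 and 4 and leaves out the soft-deleted k2 = 3.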
    


    Performance



    Always check the execution plan:

    yugabyte=# explain (costs off, analyze) execute etl(0,$$
    [
    {"k2":1,"v1":1,"v2":"my update"},
    {"k2":2,"v1":1,"v2":"my update"},
    {"k2":4,"v1":1,"v2":"my second insert"}
    ]
    $$::jsonb);
                                                     QUERY PLAN
    ------------------------------------------------------------------------------------------------------------
     Insert on demo (actual time=11.787..11.787 rows=0 loops=1)
       Conflict Resolution: UPDATE
       Conflict Arbiter Indexes: demo_pkey
       Tuples Inserted: 0
       Conflicting Tuples: 4
       CTE payload
         ->  Function Scan on jsonb_to_recordset payload (actual time=0.011..0.012 rows=3 loops=1)
       CTE to_upsert
         ->  CTE Scan on payload payload_1 (actual time=0.013..0.014 rows=3 loops=1)
       CTE to_soft_delete
         ->  Index Scan using demo_pkey on demo demo_1 (actual time=1.079..1.081 rows=1 loops=1)
               Index Cond: (k1 = 0)
               Filter: (NOT (hashed SubPlan 3))
               Rows Removed by Filter: 3
               SubPlan 3
                 ->  CTE Scan on payload payload_2 (actual time=0.001..0.001 rows=3 loops=1)
       ->  HashAggregate (actual time=1.105..1.110 rows=4 loops=1)
             Group Key: to_upsert.k1, to_upsert.k2, to_upsert.v1, to_upsert.v2, to_upsert.deleted, to_upsert.ts
             ->  Append (actual time=0.014..1.099 rows=4 loops=1)
                   ->  CTE Scan on to_upsert (actual time=0.014..0.016 rows=3 loops=1)
                   ->  CTE Scan on to_soft_delete (actual time=1.080..1.082 rows=1 loops=1)
     Planning Time: 0.262 ms
     Execution Time: 11.864 ms
    (23 rows)
    


    This reads only one tablet, with Index Cond: (k1 = 0) in an Index Scan on the primary key, where all rows of that ID are colocated. This is where (k1 HASH, k2 ASC) matters.

    Of course, this can be optimized, for example by not updating rows whose values are unchanged. But keeping the code simple may be the best choice. Optimization depends on the precise use case.
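
    As an illustration of that kind of optimization, the ON CONFLICT clause accepts a WHERE condition that skips the update when nothing changed. The statement name etl_skip_unchanged is hypothetical, and this is only a sketch: whether it pays off depends on how many rows are typically unchanged.

```sql
-- Variant of the etl statement that leaves identical rows untouched.
-- "etl_skip_unchanged" is an illustrative name.
prepare etl_skip_unchanged (int, jsonb) as
with
payload as (
 select $1 as k1,* from jsonb_to_recordset($2) as payload( k2 int, v1 int, v2 text )
),
to_upsert as (
 select k1, k2, v1, v2, false as deleted, now() as ts from payload
),
to_soft_delete as (
 select k1, k2, v1, v2, true  as deleted, now() as ts from demo
 where  k1=$1
 and (k1, k2) not in ( select k1, k2 from payload)
)
insert into demo select * from to_upsert union select * from to_soft_delete
on conflict (k1, k2) do
update set v1=excluded.v1, v2=excluded.v2, deleted=excluded.deleted, ts=excluded.ts
-- IS DISTINCT FROM treats NULLs as comparable values
where (demo.v1, demo.v2, demo.deleted)
      is distinct from (excluded.v1, excluded.v2, excluded.deleted)
;
```

    Note the change in meaning: ts then records the last time a row actually changed, not the last time a payload mentioned it, and rows that are already soft-deleted are no longer re-stamped on every run.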

    This query can also be generated easily from the information schema. You need the column definition (k2 int, v1 int, v2 text), the column list (k1, k2, v1, v2), the key columns (k1, k2), and the set clause (v1=excluded.v1, v2=excluded.v2).
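
    One possible way to derive those fragments is sketched below. It rests on several assumptions that are not in the original: the table lives in schema public, deleted and ts are the bookkeeping columns, k1 is the ID passed as a parameter (so it is left out of the column definition), and the primary key columns appear in table column order.

```sql
-- Sketch: build the text fragments needed to generate the etl statement for "demo".
with cols as (
 select c.column_name::text as col, c.data_type::text as typ
      , c.ordinal_position as pos
      , (k.column_name is not null) as is_key
 from information_schema.columns c
 left join (
  select kcu.column_name
  from information_schema.table_constraints tc
  join information_schema.key_column_usage kcu
    on  kcu.constraint_schema = tc.constraint_schema
    and kcu.constraint_name   = tc.constraint_name
    and kcu.table_name        = tc.table_name
  where tc.table_schema = 'public' and tc.table_name = 'demo'
  and   tc.constraint_type = 'PRIMARY KEY'
 ) k on k.column_name = c.column_name
 where c.table_schema = 'public' and c.table_name = 'demo'
 and   c.column_name not in ('deleted', 'ts')   -- bookkeeping columns (assumption)
)
select
 string_agg(format('%I %s', col, typ), ', ' order by pos)
   filter (where col <> 'k1')                     as column_definition,
 string_agg(format('%I', col), ', ' order by pos) as column_list,
 string_agg(format('%I', col), ', ' order by pos)
   filter (where is_key)                          as key_columns,
 string_agg(format('%I=excluded.%I', col, col), ', ' order by pos)
   filter (where not is_key)                      as set_clause
from cols;
```

    For the demo table this should produce k2 integer, v1 integer, v2 text as the column definition, k1, k2, v1, v2 as the column list, k1, k2 as the key columns, and v1=excluded.v1, v2=excluded.v2 as the set clause, which can then be concatenated into the statement text.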
