🎄 Twelve Days of SMT 🎄 - Day 8: TimestampConverter

The TimestampConverter Single Message Transform lets you work with timestamp fields in Kafka messages. You can convert a string into a native Timestamp type (or Date or Time), as well as into a Unix epoch, and vice versa.

์ด๊ฒƒ์€ Kafka์— ์ˆ˜์ง‘๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ํƒ€์ž„์Šคํƒฌํ”„๋กœ ์˜ฌ๋ฐ”๋ฅด๊ฒŒ ์ €์žฅ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•˜๋Š” ๋ฐ ๋งค์šฐ ์œ ์šฉํ•˜๋ฉฐ(์žˆ๋Š” ๊ฒฝ์šฐ) ํƒ€์ž„์Šคํƒฌํ”„๋ฅผ ์„ ํƒํ•œ ๋ฌธ์ž์—ด ํ˜•์‹์œผ๋กœ ์‹ฑํฌ ์ปค๋„ฅํ„ฐ์— ์“ธ ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.
TimestampConverter๋Š” ์„ธ ๊ฐœ์˜ ์ธ์ˆ˜๋ฅผ ์ทจํ•ฉ๋‹ˆ๋‹ค. ํƒ€์ž„์Šคํƒฌํ”„๊ฐ€ ์žˆ๋Š” ํ•„๋“œ์˜ ์ด๋ฆ„, ๋ณ€ํ™˜ํ•˜๋ ค๋Š” ๋ฐ์ดํ„ฐ ์œ ํ˜•, ๊ตฌ๋ฌธ ๋ถ„์„ํ•  ํƒ€์ž„์Šคํƒฌํ”„ ํ˜•์‹(๋ฌธ์ž์—ด ๋ฐ target.type์„ ์ฝ๋Š” ๊ฒฝ์šฐ: unix/timestamp/date/time ) ๋˜๋Š” ์“ฐ๊ธฐ(๋ฌธ์ž์—ด์„ ์ž‘์„ฑ ์ค‘์ด๊ณ  target.type๊ฐ€ string์ธ ๊ฒฝ์šฐ). ํƒ€์ž„์Šคํƒฌํ”„ ํ˜•์‹์€ SimpleDateFormat ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•ฉ๋‹ˆ๋‹ค.

"transforms" : "convertTS",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Timestamp"

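To see what the format pattern does on its own, here is a minimal Java sketch. It parses one of the txn_date values from the demo with the pattern EEE MMM dd HH:mm:ss zzz yyyy (the one used in the configs in this article) and returns epoch milliseconds. It illustrates SimpleDateFormat only and is not the SMT's own code. The class name ParseTxnDate is made up. Locale.ENGLISH is pinned as an assumption so that the English day and month names parse regardless of the JVM's default locale.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

public class ParseTxnDate {
    // Same pattern as transforms.convertTS.format in the configs in this article.
    static final String PATTERN = "EEE MMM dd HH:mm:ss zzz yyyy";

    // Parses a txn_date-style string and returns milliseconds since the Unix epoch.
    static long parseToEpochMillis(String text) {
        SimpleDateFormat fmt = new SimpleDateFormat(PATTERN, Locale.ENGLISH);
        // Fallback zone only: a zone parsed from the input (e.g. "GMT") takes precedence.
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return fmt.parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Cannot parse '" + text + "' with " + PATTERN, e);
        }
    }

    public static void main(String[] args) {
        // Prints 1607620019000, i.e. 2020-12-10T17:06:59Z
        System.out.println(parseToEpochMillis("Thu Dec 10 17:06:59 GMT 2020"));
    }
}
```

If a value doesn't match the pattern (say, a different month-name language), parsing fails rather than producing a wrong date. That is usually what you want from a converter.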

👾 Demo code

๋ฌธ์ž์—ด ๊ธฐ๋ฐ˜ ํƒ€์ž„์Šคํƒฌํ”„๋ฅผ ํƒ€์ž„์Šคํƒฌํ”„๋กœ ๋ณ€๊ฒฝ



ํŽ˜์ด๋กœ๋“œ์—์„œ txn_date ํ•„๋“œ๋Š” ํƒ€์ž„์Šคํƒฌํ”„์ฒ˜๋Ÿผ ๋ณด์ž…๋‹ˆ๋‹ค.

$ docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day8-transactions -C -c5 -o-5 -u -q -J | \
  jq '.payload.Gen0.txn_date.string'

"Thu Dec 10 17:06:59 GMT 2020"
"Sat Dec 05 16:39:40 GMT 2020"
"Sat Dec 05 21:43:46 GMT 2020"
"Sun Dec 13 20:30:21 GMT 2020"
"Wed Dec 09 06:18:31 GMT 2020"


But if we look at the actual schema for the value (it's serialised as Avro, but the same would apply to Protobuf or JSON Schema), we can see that it may look like a timestamp and quack like a timestamp. It is in fact not a timestamp per se but a string:

$ curl -s "http://localhost:8081/subjects/day8-transactions-value/versions/latest" | jq '.schema|fromjson[]'

"null"
{
  "type": "record",
  "name": "Gen0",
  "namespace": "io.mdrogalis",
  "fields": [
[…]
    {
      "name": "txn_date",
      "type": [
        "null",
        "string"
      ]
[…]


This means that when a consumer such as a Kafka Connect sink uses the data, it is still handled as a string, and the target object usually inherits that type. Here's an example with the JDBC sink connector (🎥 Kafka Connect in Action: JDBC Sink (👾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (👾 demo code)):

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-00/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password": "mysqlpw",
          "topics" : "day8-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true"
        }'



mysql> describe `day8-transactions`;
+------------------+------+------+-----+---------+-------+
| Field            | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| customer_remarks | text | YES  |     | NULL    |       |
| item             | text | YES  |     | NULL    |       |
| cost             | text | YES  |     | NULL    |       |
| card_type        | text | YES  |     | NULL    |       |
| txn_date         | text | YES  |     | NULL    |       |
+------------------+------+------+-----+---------+-------+
5 rows in set (0.00 sec)


txn_date is a text field. That is no use to anyone who wants to use it as the timestamp it really is.

์—ฌ๊ธฐ์—์„œ TimestampConverter ์ด ๋‚˜์˜ต๋‹ˆ๋‹ค. ์ด ์˜ˆ์—์„œ๋Š” ์ œ๊ณต๋œ ๋‚ ์งœ ๋ฐ ์‹œ๊ฐ„ ํ˜•์‹ ๋ฌธ์ž์—ด์„ ์‚ฌ์šฉํ•˜์—ฌ Kafka Connect๋ฅผ ํ†ต๊ณผํ•  ๋•Œ ๋ฌธ์ž์—ด์„ ํƒ€์ž„์Šคํƒฌํ”„๋กœ ์บ์ŠคํŒ…ํ•ฉ๋‹ˆ๋‹ค. ๋˜ํ•œ ์ด๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์—ํฌํฌ ํƒ€์ž„์Šคํƒฌํ”„ ๊ฐ’ ๊ฐ„์— ๋ณ€ํ™˜ํ•˜๊ณ  ๋ฌธ์ž์—ด, ์—ํฌํฌ, ๋‚ ์งœ ๋˜๋Š” ์‹œ๊ฐ„(์‹ค์ œ ํƒ€์ž„์Šคํƒฌํ”„๋„ ํฌํ•จ)์„ ๋Œ€์ƒ์œผ๋กœ ์ง€์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-01/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day8-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "convertTS,changeTopic",
          "transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.convertTS.field" : "txn_date",
          "transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.convertTS.target.type": "Timestamp",
          "transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTopic.regex" : "(.*)",
          "transforms.changeTopic.replacement": "$1_withTS"
        }'
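The changeTopic transform in this config uses RegexRouter so that each connector writes to its own table. As I understand it, RegexRouter only rewrites the topic when the regex matches the whole topic name, and then applies the replacement once. The Java sketch below mimics that with Matcher.matches() and replaceFirst(). The class and method names are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TopicRename {
    // Rewrites the topic only if the regex matches the entire name; otherwise returns it unchanged.
    static String route(String topic, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        if (m.matches()) {
            // replaceFirst resets the matcher, so its first match is the whole topic name.
            return m.replaceFirst(replacement);
        }
        return topic;
    }

    public static void main(String[] args) {
        // Prints day8-transactions_withTS
        System.out.println(route("day8-transactions", "(.*)", "$1_withTS"));
    }
}
```

A single replacement matters with a pattern like (.*). For example, "day8-transactions".replaceAll("(.*)", "$1_withTS") also matches the empty string at the end of the name and yields day8-transactions_withTS_withTS.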


๋‹ค์Œ์€ MySQL์˜ ๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”์ž…๋‹ˆ๋‹ค.

mysql> describe `day8-transactions_withTS`;
+------------------+-------------+------+-----+---------+-------+
| Field            | Type        | Null | Key | Default | Extra |
+------------------+-------------+------+-----+---------+-------+
| customer_remarks | text        | YES  |     | NULL    |       |
| item             | text        | YES  |     | NULL    |       |
| cost             | text        | YES  |     | NULL    |       |
| card_type        | text        | YES  |     | NULL    |       |
| txn_date         | datetime(3) | YES  |     | NULL    |       |
+------------------+-------------+------+-----+---------+-------+
5 rows in set (0.00 sec)


์œ„์—์„œ ์–ธ๊ธ‰ํ–ˆ๋“ฏ์ด target.type ๋ฅผ ๋ณ€๊ฒฝํ•˜์—ฌ ํƒ€์ž„์Šคํƒฌํ”„์˜ ๋‚ ์งœ ๋˜๋Š” ์‹œ๊ฐ„ ๊ตฌ์„ฑ ์š”์†Œ๋งŒ ์ถ”์ถœํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

Date only




curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-02/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day8-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "convertTS,changeTopic",
          "transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.convertTS.field" : "txn_date",
          "transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.convertTS.target.type": "Date",
          "transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTopic.regex" : "(.*)",
          "transforms.changeTopic.replacement": "$1_withDate"
        }'


MySQL์˜ ๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”:

mysql> describe `day8-transactions_withDate`;
+------------------+------+------+-----+---------+-------+
| Field            | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| customer_remarks | text | YES  |     | NULL    |       |
| item             | text | YES  |     | NULL    |       |
| cost             | text | YES  |     | NULL    |       |
| card_type        | text | YES  |     | NULL    |       |
| txn_date         | date | YES  |     | NULL    |       |
+------------------+------+------+-----+---------+-------+
5 rows in set (0.01 sec)



mysql> select txn_date from `day8-transactions_withDate` LIMIT 5;
+------------+
| txn_date   |
+------------+
| 2020-01-04 |
| 2020-01-04 |
| 2019-12-29 |
| 2020-01-01 |
| 2019-12-29 |
+------------+
5 rows in set (0.00 sec)


Time only




curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-03/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day8-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "convertTS,changeTopic",
          "transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.convertTS.field" : "txn_date",
          "transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.convertTS.target.type": "Time",
          "transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTopic.regex" : "(.*)",
          "transforms.changeTopic.replacement": "$1_withTime"
        }'


MySQL์˜ ๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”:

mysql> describe `day8-transactions_withTime`;
+------------------+---------+------+-----+---------+-------+
| Field            | Type    | Null | Key | Default | Extra |
+------------------+---------+------+-----+---------+-------+
| customer_remarks | text    | YES  |     | NULL    |       |
| item             | text    | YES  |     | NULL    |       |
| cost             | text    | YES  |     | NULL    |       |
| card_type        | text    | YES  |     | NULL    |       |
| txn_date         | time(3) | YES  |     | NULL    |       |
+------------------+---------+------+-----+---------+-------+
5 rows in set (0.00 sec)



mysql> select txn_date from `day8-transactions_withTime` LIMIT 5;
+--------------+
| txn_date     |
+--------------+
| 14:05:19.000 |
| 14:09:11.000 |
| 19:18:25.000 |
| 03:22:06.000 |
| 09:57:44.000 |
+--------------+
5 rows in set (0.00 sec)


Unix epoch



You can also write the timestamp out as a Unix epoch:

curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-04/config \
    -d '{
          "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url" : "jdbc:mysql://mysql:3306/demo",
          "connection.user" : "mysqluser",
          "connection.password" : "mysqlpw",
          "topics" : "day8-transactions",
          "tasks.max" : "4",
          "auto.create" : "true",
          "auto.evolve" : "true",
          "transforms" : "convertTS,changeTopic",
          "transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.convertTS.field" : "txn_date",
          "transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
          "transforms.convertTS.target.type": "unix",
          "transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
          "transforms.changeTopic.regex" : "(.*)",
          "transforms.changeTopic.replacement": "$1_withUnixEpoch"
        }'


MySQL์˜ ๊ฒฐ๊ณผ ํ…Œ์ด๋ธ”:

mysql> describe `day8-transactions_withUnixEpoch`;
+------------------+--------+------+-----+---------+-------+
| Field            | Type   | Null | Key | Default | Extra |
+------------------+--------+------+-----+---------+-------+
| customer_remarks | text   | YES  |     | NULL    |       |
| item             | text   | YES  |     | NULL    |       |
| cost             | text   | YES  |     | NULL    |       |
| card_type        | text   | YES  |     | NULL    |       |
| txn_date         | bigint | YES  |     | NULL    |       |
+------------------+--------+------+-----+---------+-------+
5 rows in set (0.00 sec)



mysql> select txn_date from `day8-transactions_withUnixEpoch` LIMIT 5;
+---------------+
| txn_date      |
+---------------+
| 1577973919000 |
| 1577714951000 |
| 1577819905000 |
| 1577762526000 |
| 1577786264000 |
+---------------+
5 rows in set (0.00 sec)


Your source might hold the timestamp as a Unix epoch (a bigint, which TimestampConverter treats by default as milliseconds since the epoch, as the values above show). In that case you can use TimestampConverter to write it out as a Timestamp, Date, or Time, and also as a string. If you write a string, the format setting determines the format in which the string is written.
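For the string direction, here is a minimal Java sketch that renders epoch milliseconds with a SimpleDateFormat pattern in UTC. The pattern yyyy-MM-dd HH:mm:ss and the class name FormatEpoch are purely illustrative. They stand in for whatever you would put in transforms.convertTS.format with target.type set to string.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class FormatEpoch {
    // Formats epoch milliseconds as a string using the given SimpleDateFormat pattern, in UTC.
    static String format(long epochMillis, String pattern) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern, Locale.ENGLISH);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        // First value from the Unix epoch table above; prints 2020-01-02 14:05:19
        System.out.println(format(1577973919000L, "yyyy-MM-dd HH:mm:ss"));
    }
}
```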

Accessing timestamps in nested fields



๋ถˆํ–‰ํžˆ๋„ TimestampConverter๋Š” ๋ฃจํŠธ ์ˆ˜์ค€ ์š”์†Œ์—์„œ๋งŒ ์ž‘๋™ํ•ฉ๋‹ˆ๋‹ค. ๋‹ค๋ฅธ ํ•„๋“œ์— ์ค‘์ฒฉ๋œ ํƒ€์ž„์Šคํƒฌํ”„ ํ•„๋“œ์—๋Š” ์‚ฌ์šฉํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค. ๋จผ์ € Flatten ์„ ์‚ฌ์šฉํ•˜๊ฑฐ๋‚˜ ๊ณ ์œ ํ•œ ๋ณ€ํ™˜์„ ์ž‘์„ฑํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

๊ทธ๊ฒƒ์„ ๋ฐ–์œผ๋กœ ์‹œ๋„!



You can find the full code for trying this out, including a Docker Compose file so that you can run it on your local machine, 👾 here.

์ข‹์€ ์›นํŽ˜์ด์ง€ ์ฆ๊ฒจ์ฐพ๊ธฐ