🎄 Twelve Days of SMT 🎄 - Day 8: TimestampConverter
The TimestampConverter Single Message Transform lets you work with timestamp fields in Kafka messages. You can convert a string into a native Timestamp type (or Date or Time), as well as into a Unix epoch, and the same in reverse too.
This is really useful for making sure that data ingested into Kafka is correctly stored as a Timestamp (if it is one). It also lets you write a Timestamp out to a sink connector in a string format of your choosing.
The TimestampConverter takes three arguments: the name of the field holding the timestamp (field), the data type you want to convert it to (target.type), and the format of the timestamp (format). The format is used to parse the value when reading a string and target.type is unix, timestamp, date, or time, and to write it when producing a string and target.type is string. The timestamp format is based on SimpleDateFormat.
"transforms" : "convertTS",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Timestamp"
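To see what the format pattern above actually matches, here is a minimal stand-alone Java sketch that parses one of the demo's txn_date strings with java.text.SimpleDateFormat and the same pattern. It only approximates the parsing step and is not the TimestampConverter source. The class name TsParseSketch is hypothetical, and Locale.ENGLISH plus a UTC default zone are assumptions made so that the result does not depend on the JVM's locale or time zone.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper: mimics the "string -> epoch millis" parse step
// using the same pattern as transforms.convertTS.format.
class TsParseSketch {
    static final String PATTERN = "EEE MMM dd HH:mm:ss zzz yyyy";

    // Returns milliseconds since the Unix epoch, or throws ParseException
    // if the string does not match the pattern.
    static long parseToEpochMillis(String value) throws ParseException {
        // Locale.ENGLISH so that "Thu" and "Dec" are recognised on any JVM
        SimpleDateFormat fmt = new SimpleDateFormat(PATTERN, Locale.ENGLISH);
        // Default zone for parsing; a zone in the text ("GMT" here) takes precedence
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.parse(value).getTime();
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(parseToEpochMillis("Thu Dec 10 17:06:59 GMT 2020")); // 1607620019000
    }
}
```

A string in any other layout, such as an ISO-8601 value, does not match this pattern and fails to parse, so format has to describe the source strings exactly.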
💾 Demo code
Change a string-based timestamp into a Timestamp
In the payload, the txn_date field looks like a timestamp:
$ docker exec kafkacat kafkacat -b broker:29092 -r http://schema-registry:8081 -s key=s -s value=avro -t day8-transactions -C -c5 -o-5 -u -q -J | \
jq '.payload.Gen0.txn_date.string'
"Thu Dec 10 17:06:59 GMT 2020"
"Sat Dec 05 16:39:40 GMT 2020"
"Sat Dec 05 21:43:46 GMT 2020"
"Sun Dec 13 20:30:21 GMT 2020"
"Wed Dec 09 06:18:31 GMT 2020"
But if we look at the actual schema for the value (it's serialised as Avro, but the same would apply for Protobuf or JSON Schema), we can see that although it might look like a timestamp and quack like a timestamp, it is in fact not a timestamp per se, but a string:
$ curl -s "http://localhost:8081/subjects/day8-transactions-value/versions/latest" | jq '.schema|fromjson[]'
"null"
{
"type": "record",
"name": "Gen0",
"namespace": "io.mdrogalis",
"fields": [
[…]
{
"name": "txn_date",
"type": [
"null",
"string"
]
[…]
This means that when the data is used by a consumer, such as a Kafka Connect sink, it's still handled as a string, with the result that the target object type will usually inherit the same. Here's an example of this with the JDBC sink connector (🎥 Kafka Connect in Action : JDBC Sink (💾 demo code) and 🎥 ksqlDB & Kafka Connect JDBC Sink in action (💾 demo code)):
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-00/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password": "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true"
}'
mysql> describe `day8-transactions`;
+------------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| customer_remarks | text | YES | | NULL | |
| item | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | text | YES | | NULL | |
+------------------+------+------+-----+---------+-------+
5 rows in set (0.00 sec)
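To make the cost of a text column concrete: anything that sorts or compares these values as strings does so character by character, not by time. This small Java sketch (class and method names are hypothetical) sorts the five sample values from the kafkacat output above both ways. Plain string comparison is used as a rough stand-in for how a text column orders values; the exact collation rules of a given database are not modelled.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;

// Hypothetical demo: ordering the sample txn_date values as text vs. as timestamps.
class SortSketch {
    static final String PATTERN = "EEE MMM dd HH:mm:ss zzz yyyy";

    // The five values shown in the kafkacat output
    static final List<String> SAMPLE = Arrays.asList(
            "Thu Dec 10 17:06:59 GMT 2020",
            "Sat Dec 05 16:39:40 GMT 2020",
            "Sat Dec 05 21:43:46 GMT 2020",
            "Sun Dec 13 20:30:21 GMT 2020",
            "Wed Dec 09 06:18:31 GMT 2020");

    // Parse to epoch millis; the "GMT" in each string fixes the zone
    static long parse(String s) {
        try {
            return new SimpleDateFormat(PATTERN, Locale.ENGLISH).parse(s).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp: " + s, e);
        }
    }

    // Plain string comparison, character by character
    static List<String> sortAsText(List<String> values) {
        List<String> copy = new ArrayList<>(values);
        Collections.sort(copy);
        return copy;
    }

    // Chronological ordering by the parsed instant
    static List<String> sortAsTimestamps(List<String> values) {
        List<String> copy = new ArrayList<>(values);
        copy.sort(Comparator.comparingLong(SortSketch::parse));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println("as text:       " + sortAsText(SAMPLE));
        System.out.println("as timestamps: " + sortAsTimestamps(SAMPLE));
    }
}
```

Sorted as text, Sun Dec 13 lands before Thu Dec 10 and Wed Dec 09 because "Su" sorts before "Th" and "We"; sorted as timestamps, it comes last.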
The txn_date field is a text field, which is no use to anyone who wants to use it as the timestamp that it really is.
This is where the TimestampConverter comes in. In this example, it casts the string to a timestamp as it passes through Kafka Connect, using the supplied date and time format string. You can also use it to convert from an epoch timestamp value, and to target a string, epoch, date, or time (as well as an actual timestamp).
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-01/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "convertTS,changeTopic",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Timestamp",
"transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTopic.regex" : "(.*)",
"transforms.changeTopic.replacement": "$1_withTS"
}'
Here's the resulting table in MySQL:
mysql> describe `day8-transactions_withTS`;
+------------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+-------------+------+-----+---------+-------+
| customer_remarks | text | YES | | NULL | |
| item | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | datetime(3) | YES | | NULL | |
+------------------+-------------+------+-----+---------+-------+
5 rows in set (0.00 sec)
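The table is named day8-transactions_withTS because of the changeTopic (RegexRouter) transform in the connector config. As a rough Java sketch of how a regex and replacement pair such as "(.*)" and "$1_withTS" rewrites a topic name, assuming (as I understand RegexRouter to behave) that the whole topic name must match and the replacement is applied once; RouteSketch is a hypothetical name and this is not the RegexRouter source:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper illustrating how a regex + replacement pair
// such as "(.*)" / "$1_withTS" rewrites a topic name.
class RouteSketch {
    static String route(String topic, String regex, String replacement) {
        Matcher m = Pattern.compile(regex).matcher(topic);
        // Only rename when the whole topic name matches the regex
        if (!m.matches()) {
            return topic;
        }
        // replaceFirst (not String.replaceAll) avoids a second, empty match of
        // "(.*)" at the end of the input, which would append the suffix twice
        return m.replaceFirst(replacement);
    }

    public static void main(String[] args) {
        System.out.println(route("day8-transactions", "(.*)", "$1_withTS")); // day8-transactions_withTS
    }
}
```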
As mentioned above, you can also change the target.type to extract just the date or time component of the timestamp.
Date only
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-02/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "convertTS,changeTopic",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Date",
"transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTopic.regex" : "(.*)",
"transforms.changeTopic.replacement": "$1_withDate"
}'
Resulting table in MySQL:
mysql> describe `day8-transactions_withDate`;
+------------------+------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+------+------+-----+---------+-------+
| customer_remarks | text | YES | | NULL | |
| item | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | date | YES | | NULL | |
+------------------+------+------+-----+---------+-------+
5 rows in set (0.01 sec)
mysql> select txn_date from `day8-transactions_withDate` LIMIT 5;
+------------+
| txn_date |
+------------+
| 2020-01-04 |
| 2020-01-04 |
| 2019-12-29 |
| 2020-01-01 |
| 2019-12-29 |
+------------+
5 rows in set (0.00 sec)
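Conceptually, a Date target keeps only the calendar day and drops the time of day. The Java sketch below illustrates that with plain epoch arithmetic, assuming the day boundary is taken in UTC; the class name DateOnlySketch is hypothetical and this is not the SMT's own code. The sample value 1607620019000 is the epoch-millisecond equivalent of Thu Dec 10 17:06:59 GMT 2020 from the kafkacat output.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Hypothetical helper: what a "Date" target keeps from a timestamp.
// Assumes the date is taken in UTC, with the time of day discarded.
class DateOnlySketch {
    static final long MILLIS_PER_DAY = 86_400_000L;

    // Epoch millis rounded down to midnight UTC of the same day
    // (floorMod keeps this correct for pre-1970, negative values too)
    static long truncateToUtcMidnight(long epochMillis) {
        return epochMillis - Math.floorMod(epochMillis, MILLIS_PER_DAY);
    }

    // The same value expressed as a calendar date
    static LocalDate toUtcDate(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).atOffset(ZoneOffset.UTC).toLocalDate();
    }

    public static void main(String[] args) {
        long ts = 1607620019000L; // Thu Dec 10 17:06:59 GMT 2020
        System.out.println(truncateToUtcMidnight(ts)); // 1607558400000
        System.out.println(toUtcDate(ts));             // 2020-12-10
    }
}
```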
Time only
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-03/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "convertTS,changeTopic",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "Time",
"transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTopic.regex" : "(.*)",
"transforms.changeTopic.replacement": "$1_withTime"
}'
Resulting table in MySQL:
mysql> describe `day8-transactions_withTime`;
+------------------+---------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+---------+------+-----+---------+-------+
| customer_remarks | text | YES | | NULL | |
| item | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | time(3) | YES | | NULL | |
+------------------+---------+------+-----+---------+-------+
5 rows in set (0.00 sec)
mysql> select txn_date from `day8-transactions_withTime` LIMIT 5;
+--------------+
| txn_date |
+--------------+
| 14:05:19.000 |
| 14:09:11.000 |
| 19:18:25.000 |
| 03:22:06.000 |
| 09:57:44.000 |
+--------------+
5 rows in set (0.00 sec)
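The Time target is the mirror image: it keeps the time of day and drops the date. This Java sketch shows the idea as milliseconds since midnight, again assuming UTC; TimeOnlySketch is a hypothetical name and the code only illustrates the concept rather than reproducing the SMT.

```java
import java.time.LocalTime;

// Hypothetical helper: what a "Time" target keeps from a timestamp.
// Assumes the time of day is taken in UTC and the date is discarded.
class TimeOnlySketch {
    static final long MILLIS_PER_DAY = 86_400_000L;

    // Milliseconds elapsed since midnight UTC
    static long millisSinceUtcMidnight(long epochMillis) {
        return Math.floorMod(epochMillis, MILLIS_PER_DAY);
    }

    // The same value as a wall-clock time
    static LocalTime toUtcTime(long epochMillis) {
        return LocalTime.ofNanoOfDay(millisSinceUtcMidnight(epochMillis) * 1_000_000L);
    }

    public static void main(String[] args) {
        long ts = 1607620019000L; // Thu Dec 10 17:06:59 GMT 2020
        System.out.println(millisSinceUtcMidnight(ts)); // 61619000
        System.out.println(toUtcTime(ts));              // 17:06:59
    }
}
```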
Unix epoch
You can also generate a Unix epoch:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-jdbc-mysql-day8-04/config \
-d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:mysql://mysql:3306/demo",
"connection.user" : "mysqluser",
"connection.password" : "mysqlpw",
"topics" : "day8-transactions",
"tasks.max" : "4",
"auto.create" : "true",
"auto.evolve" : "true",
"transforms" : "convertTS,changeTopic",
"transforms.convertTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertTS.field" : "txn_date",
"transforms.convertTS.format" : "EEE MMM dd HH:mm:ss zzz yyyy",
"transforms.convertTS.target.type": "unix",
"transforms.changeTopic.type" : "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTopic.regex" : "(.*)",
"transforms.changeTopic.replacement": "$1_withUnixEpoch"
}'
Resulting table in MySQL:
mysql> describe `day8-transactions_withUnixEpoch`;
+------------------+--------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+------------------+--------+------+-----+---------+-------+
| customer_remarks | text | YES | | NULL | |
| item | text | YES | | NULL | |
| cost | text | YES | | NULL | |
| card_type | text | YES | | NULL | |
| txn_date | bigint | YES | | NULL | |
+------------------+--------+------+-----+---------+-------+
5 rows in set (0.00 sec)
mysql> select txn_date from `day8-transactions_withUnixEpoch` LIMIT 5;
+---------------+
| txn_date |
+---------------+
| 1577973919000 |
| 1577714951000 |
| 1577819905000 |
| 1577762526000 |
| 1577786264000 |
+---------------+
5 rows in set (0.00 sec)
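The bigint values are milliseconds since the epoch. As a quick sanity check, and to illustrate the reverse direction (epoch to string, where a format pattern controls the output), this Java sketch renders one of them with SimpleDateFormat. The class name EpochFormatSketch, the UTC zone, Locale.ENGLISH, and the yyyy-MM-dd HH:mm:ss pattern are all illustrative choices, not taken from the demo.

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

// Hypothetical helper: turns epoch millis (as stored in the bigint column)
// back into text using a SimpleDateFormat pattern.
class EpochFormatSketch {
    static String format(long epochMillis, String pattern) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern, Locale.ENGLISH);
        fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // render in UTC
        return fmt.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long first = 1577973919000L; // first row of the bigint output above
        System.out.println(Instant.ofEpochMilli(first));          // 2020-01-02T14:05:19Z
        System.out.println(format(first, "yyyy-MM-dd HH:mm:ss")); // 2020-01-02 14:05:19
    }
}
```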
If the source holds the timestamp as a Unix epoch (bigint), you can use TimestampConverter to write it as a Timestamp, Date, Time, or string. When the target is a string, the format configuration defines the format in which the string is written.
Accessing timestamps in nested fields
Sadly, TimestampConverter only works on root-level elements; it can't be used on timestamp fields that are nested within other fields. You'd need to either apply Flatten first or write your own transformation.
Try it out!
You can find the full code for trying this out, including a Docker Compose file so you can spin it up on your local machine, 💾 here.
Reference
For more on this article (🎄 Twelve Days of SMT 🎄 - Day 8: TimestampConverter), see the original at https://dev.to/rmoff/twelve-days-of-smt-day-8-timestampconverter-1bh4. The text may be freely shared or copied, but please keep this document's URL as the reference URL.