debzium
Kafka Connect
๊ฐ์ข ๋ฐ์ดํฐ ์์ค(DB, searchEngine, cache, metric ...)์ kafka๋ฅผ ์ฐ๊ฒฐํด์ฃผ๋ ์ผ์ข ์ ๋งค๊ฐ์ฒด. Connect์๋ Connector๋ค์ ๋ฑ๋กํ์ฌ ์ฌ๋ฌ ๋ฐ์ดํฐ์์ค์ ์ฐ๊ฒฐํ ์ ์์ผ๋ฉฐ ์คํ์์ค์ connector๋ค์ด ๋ง์ด ์กด์ฌํ๋ค.
Producer์ญํ ์ ์ํํ์ฌ ๋ฉ์์ง๋ฅผ ๋ฐํํ๋ connector๋ฅผ source connector
๋ผ๊ณ ํ๋ฉฐ, consumer ์ญํ ์ ์ํํ๋ connector๋ฅผ link connector
๋ผ๊ณ ํ๋ค.
debezium
source connector์ค ์คํ์์ค ํ๋ก์ ํธ๋ก ๊ฐ์ข db์ ๋ณ๊ฒฝ์ฌํญ์ ์บก์ฒํ์ฌ ์ฒ๋ฆฌํ ์ ์๋๋ก ํด์ฃผ๋ ๋ถ์ฐ ์๋น๋ก ๋ฐ์ดํฐ ๋ณ๊ฒฝ์ changed event stream์ ๊ธฐ๋กํ์ฌ ์ด ๋ณ๊ฒฝ ์ด๋ฒคํธ๋ฅผ ์์๋๋ก ์ฝ๊ฒ ๋๋ค.
MySQL๊ฐ์ ๊ฒฝ์ฐ binlog์ ์ ๊ทผํ๊ณ postgres์ ๊ฒฝ์ฐ logical replication stream์ ์ ๊ทผํ๋ค.
Kafka์ connector๋ก ๋ฑ๋ก์ ํ ์๋ ์์ง๋ง Embedded Engine
์ ์ฌ์ฉํ๋ฉด ์ปค๋ฅํฐ๋ฅผ ์ฌ์ฉํ์ง ์๊ณ ์๋ฐ ์ ํ๋ฆฌ์ผ์ด์
๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ก์จ ์ฌ์ฉ์ด ๋์ด ๋ณ๊ฒฝ์ด๋ฒคํธ๋ฅผ ๋ฐ๋ก consuming ํ๊ฑฐ๋ ๋ค๋ฅธ ๋ฉ์์ง ๋ธ๋ก์ปค์๊ฒ ์ ๋ฌ๋ ๊ฐ๋ฅํ๋ค.
์คํ
1. debezium ์ค์น
debezium ๊ณต์ ํํ์ด์ง์์ debezium์ ๋ค์ด๋ฐ์ ์์ถํด์ ํด์ค๋ค.
2. connect์ plugin์ผ๋ก ๋ฑ๋ก
kafka์ค์น ๊ฒฝ๋ก\config\connect-distributed.properties
ํ์ผ์ ์ด์ด plugin.path์ debezium ์ค์น ๊ฒฝ๋ก๋ฅผ ์ถ๊ฐํด์ค๋ค.
plugins.path=C:\kafka-connect-plugins\
์ด๋ connect-distributed ๋ถ์ฐํ๊ฒฝ์ ์ ํฉํ ๋ชจ๋์ด๋ฉฐ connect-stadalone๋ชจ๋๋ 1๊ฐ์ ํ๋ก์ธ์ค๋ก ๋์ํ๋ connect์ด๋ค. connect ํ๋ก์ธ์ค๊ฐ ๊ตฌ์ฑ์ ์ ์ฅํ ์์น, ์์ ํ ๋น ๋ฐ์ , ์คํ์ ๋ฐ ์์ ์กฐ๊ฐ ์ ์ฅ ์์น๋ฅผ ๊ฒฐ์ ํ๋ ๋ฐฉ๋ฒ์ด ๋ฌ๋ผ์ง๊ฒ ๋๊ธฐ ๋๋ฌธ์ ์คํํ ๋ชจ๋์ ๋ง๋ ์ค์ ํ์ผ์ ์์ ํ๋ฉด๋๋ค.
3. kafka connect ์คํ
bin\windows\connect-distributed.bat -daemon config\connect-distributed.properties
default ํฌํธ๋ฒํธ๋ 8083๋ฒ.
4. connector ํ์ธ
# kafka connect ์คํํ์ธ
curl -s "http://ip:8083"
# ์ฌ์ฉํ ์ ์๋ ํ๋ฌ๊ทธ์ธ ํ์ธ
curl -X GET -s "http://ip:8083/connector-plugins"
# kafka connector ํ์ธ
curl -X GET -s "http://ip:8083/connectors"
์ ๋ช ๋ น์ด๋ curl์ ์ฌ์ฉํด๋ ๋์ง๋ง postman์ผ๋ก ํ์ธํ๋ค๋ฉด ์๋ต์ ๋ ์ด์๊ฒ ๋ณผ ์ ์๋ค.
ํ๋ฌ๊ทธ์ธ์ ์ฐ๋ฆฌ๊ฐ ์ค์นํ debezium์ด ์์ด์ผ ํ๋ฉฐ, connector์๋ ์์ง ์๋๊ฒ์ด ์ ์์ด๋ค.
5. DB ์ ๊ทผ๊ถํ ์ค์
MySQL๊ธฐ์ค์ผ๋ก binlog์ debezium์ด ์ ๊ทผ์ด ๊ฐ๋ฅํด์ผํ๊ธฐ ๋๋ฌธ์ ์ด๋ฅผ ์ํ ์ค์ ์ ํด์ฃผ์ด์ผ ํ๋ค.
1. log_bin ํ์ฑํ
show variables like 'log_bin';
์ ๋ณ์๊ฐ on
์ํ์ฌ์ผ ํ๋ค.
๋ง์ผ off๋ผ๋ฉด mysql ์คํ์ --log-bin=ON
๊ณผ ๊ฐ์ ์ต์
์ผ๋ก ์ฃผ๊ฑฐ๋ my.cnf
์ mysqld ์น์
์ ์ต์
์ ์ค์ ํด์ฃผ๋ฉด ๋๋ค.
2. binlog level์ด low-level
show variables like 'binlog_format'; # ROW ์ฌ์ผ ํ๋ค.
3. binlog์ ๊ทผ ๊ถํ ๊ณ์ ์์ฑ
grant select, reload, super, replication slave, replication client on *.* to ๊ณ์ ์์ด๋@'%' identified by '๋น๋ฐ๋ฒํธ';
flush privileges;
4. connector ์์ฑ
echo '
{
"name" : "mysql-kafka-connect",
"config":{
"connector.class" : "io.debezium.connecotr.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port" : "3306",
"database.user" : "db ๊ณ์ ์์ด๋",
"database.password" : "๋น๋ฐ๋ฒํธ",
"database.server.name" : "์๋ฒ ์ด๋ฆ",
"database.history.kafka.bootstrap.servers" : "localhost:9092",
"database.history.kafka.topic" : "topic ์ด๋ฆ",
"include.schema.changes" : "true" | "false",
"database.whitelist" : "์ ๊ทผํ table ๋ช
",
"database.serverTimezone" : "Asia/Seoul"
}
}
'|curl -X POST -d @- http://localhost:8083/connecotrs --header "content-Type:application/json"
์ ํ์์ postman์ ์ด์ฉํด body๋ก ์ ๋ฌํ๋ฉด ์ข๋ ํธํ๊ฒ ์์ฒญ์ ๋ณด๋ผ ์ ์์ผ๋ฉฐ ๋๋ง์ ์ต์ ์ด ์กด์ฌํ๋๋ฐ ์ด๋ ๊ณต์ํํ์ด์ง์ ์น์ ํ๊ฒ ์ค๋ช ์ด ๋ชจ๋ ๋์์๋ค.
# kafka connector ํ์ธ
curl -X GET -s "http://ip:8083/connectors"
์ ์์ ์ผ๋ก connector๊ฐ ๋ฑ๋ก๋์๋ค๋ฉด ์ ๋ช ๋ น์ผ๋ก ์์ฑํ connector๋ฅผ ํ์ธ ํด๋ณผ ์ ์๋ค.
๋ฒ์ธ.
# kafka connector ์ค์ ํ์ธ
curl -X GET -s "http://ip:8083/connectors/test-connector/config"
# kafka connector ์ํํ์ธ
curl -X GET -s "http://ip:8083/connectors/test-connector/status"
# kafka connector ์ญ์
curl -X DELETE -s "http://ip:8083/connectors/test-connector"
Last updated