debezium

Kafka Connect

๊ฐ์ข… ๋ฐ์ดํ„ฐ ์†Œ์Šค(DB, searchEngine, cache, metric ...)์™€ kafka๋ฅผ ์—ฐ๊ฒฐํ•ด์ฃผ๋Š” ์ผ์ข…์˜ ๋งค๊ฐœ์ฒด. Connect์—๋Š” Connector๋“ค์„ ๋“ฑ๋กํ•˜์—ฌ ์—ฌ๋Ÿฌ ๋ฐ์ดํ„ฐ์†Œ์Šค์™€ ์—ฐ๊ฒฐํ•  ์ˆ˜ ์žˆ์œผ๋ฉฐ ์˜คํ”ˆ์†Œ์Šค์˜ connector๋“ค์ด ๋งŽ์ด ์กด์žฌํ•œ๋‹ค.

A connector that acts as a producer and publishes messages is called a source connector, and a connector that acts as a consumer is called a sink connector.

debezium

An open-source source connector: a distributed service that captures changes from various databases so they can be processed. Data changes are recorded to a change event stream, and these change events are then read in order.

For MySQL it reads the binlog, and for postgres it reads the logical replication stream.
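To make "reading change events in order" concrete, here is a minimal Python sketch that replays a few events onto an in-memory table. It assumes the usual debezium event payload shape with `op` (`c` create, `u` update, `d` delete, `r` snapshot read), `before`, and `after` fields. The `id` primary-key column, the sample rows, and the `apply_change` helper are hypothetical and not part of debezium.

```python
def apply_change(table, event, key="id"):
    """Apply one change event to a dict that maps primary key -> row."""
    op = event["op"]
    if op in ("c", "r", "u"):
        # create, snapshot read, update: the new row state is in "after"
        row = event["after"]
        table[row[key]] = row
    elif op == "d":
        # delete: only "before" carries the row that was removed
        table.pop(event["before"][key], None)
    return table


# Illustrative events, listed in the order they would be read from the stream.
events = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "kim"}},
    {"op": "u", "before": {"id": 1, "name": "kim"}, "after": {"id": 1, "name": "lee"}},
    {"op": "c", "before": None, "after": {"id": 2, "name": "park"}},
    {"op": "d", "before": {"id": 2, "name": "park"}, "after": None},
]

table = {}
for event in events:
    apply_change(table, event)

print(table)  # {1: {'id': 1, 'name': 'lee'}}
```

Because the events are applied in stream order, the final dict matches the current state of the source table; applying them out of order could resurrect a deleted row.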

It can be registered as a Kafka connector, but with the Embedded Engine it is used as a library inside a Java application instead of as a connector, so change events can be consumed directly or forwarded to another message broker.

Running

1. Install debezium

Download debezium from the official debezium website and extract the archive.

2. Register as a connect plugin

Open kafka-install-path\config\connect-distributed.properties and add the debezium install path to plugin.path.

plugin.path=C:/kafka-connect-plugins

connect-distributed is the mode suited to distributed environments, while connect-standalone runs connect as a single process. The mode changes where the connect process stores its configuration, how work is assigned, and where offsets and task statuses are stored, so edit the properties file that matches the mode you are going to run.

3. Run kafka connect

bin\windows\connect-distributed.bat config\connect-distributed.properties

The default port is 8083.

4. Check connectors

# check that kafka connect is running
curl -s "http://ip:8083"

# list the available plugins
curl -X GET -s "http://ip:8083/connector-plugins"

# list the registered kafka connectors
curl -X GET -s "http://ip:8083/connectors"

The commands above work with curl, but postman shows the responses in a more readable form.

The plugin list should include the debezium we installed, and it is normal for the connector list to be empty at this point.
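The plugin check can also be scripted. The following is a minimal Python sketch, assuming the `/connector-plugins` response is a JSON list of objects that each carry a `class` field. The helpers `fetch_json` and `debezium_plugins` and the sample values (including the version strings) are made up for illustration.

```python
import json
import urllib.request


def fetch_json(url):
    """GET a kafka connect REST endpoint and parse the JSON body."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))


def debezium_plugins(plugins):
    """Return the class names of plugins that belong to debezium."""
    return [
        plugin["class"]
        for plugin in plugins
        if plugin.get("class", "").startswith("io.debezium.")
    ]


# Sample shaped like a /connector-plugins response (values are illustrative).
sample = [
    {"class": "io.debezium.connector.mysql.MySqlConnector",
     "type": "source", "version": "1.0.0"},
    {"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
     "type": "source", "version": "2.4.0"},
]

print(debezium_plugins(sample))
```

Against a running worker, the call would look like `debezium_plugins(fetch_json("http://ip:8083/connector-plugins"))`. An empty result suggests that plugin.path does not point at the extracted debezium directory.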

5. Configure DB access

For MySQL, debezium must be able to read the binlog, so the following settings are required.

1. Enable log_bin

show variables like 'log_bin';

This variable must be ON.

If it is OFF, either start mysql with the --log-bin option (its optional value is the base name of the log files, not a boolean) or set the option in the mysqld section of my.cnf.

2. binlog format is row-level

show variables like 'binlog_format';  # must be ROW

3. Create an account with binlog access

grant select, reload, super, replication slave, replication client on *.* to 'account_id'@'%' identified by 'password';

flush privileges;

4. Create the connector

echo '
{
    "name" : "mysql-kafka-connect",
    "config":{
        "connector.class" : "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "localhost",
        "database.port" : "3306",
        "database.user" : "db account id",
        "database.password" : "password",
        "database.server.name" : "server name",
        "database.history.kafka.bootstrap.servers" : "localhost:9092",
        "database.history.kafka.topic" : "topic name",
        "include.schema.changes" : "true",
        "database.whitelist" : "name of the database to capture",
        "database.serverTimezone" : "Asia/Seoul"
    }
}
' | curl -X POST -d @- http://localhost:8083/connectors --header "Content-Type: application/json"

Sending this JSON as the request body from postman is a little more convenient. Many more options exist, and they are all documented on the official website.

# list the registered kafka connectors
curl -X GET -s "http://ip:8083/connectors"

์ •์ƒ์ ์œผ๋กœ connector๊ฐ€ ๋“ฑ๋ก๋˜์—ˆ๋‹ค๋ฉด ์œ„ ๋ช…๋ น์œผ๋กœ ์ƒ์„ฑํ•œ connector๋ฅผ ํ™•์ธ ํ•ด๋ณผ ์ˆ˜ ์žˆ๋‹ค.

Extras.

# show a kafka connector's config
curl -X GET -s "http://ip:8083/connectors/test-connector/config"

# show a kafka connector's status
curl -X GET -s "http://ip:8083/connectors/test-connector/status"

# delete a kafka connector
curl -X DELETE -s "http://ip:8083/connectors/test-connector"
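The status response is easier to act on when reduced to a list of problems. This is a minimal Python sketch, assuming the `/connectors/<name>/status` response has a `connector` object and a `tasks` list, each with a `state` field such as `RUNNING` or `FAILED`. The helper `unhealthy_parts` and the sample response are hypothetical.

```python
def unhealthy_parts(status):
    """List the connector and tasks in a status response that are not RUNNING."""
    problems = []
    if status.get("connector", {}).get("state") != "RUNNING":
        problems.append("connector")
    for task in status.get("tasks", []):
        if task.get("state") != "RUNNING":
            problems.append("task-%d" % task["id"])
    return problems


# Sample shaped like a status response (values are illustrative).
sample = {
    "name": "test-connector",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "127.0.0.1:8083", "trace": "..."},
    ],
}

print(unhealthy_parts(sample))  # ['task-0']
```

Checking tasks separately matters because a connector can report `RUNNING` while one of its tasks has failed; a failed task normally includes a `trace` field with the stack trace.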
