Kafka - CDC Connector Debezium 예제
[ 데비지움 ( Debezium ) 이란? ]
변경 데이터 캡처(Change Data Capture, CDC)로,
데이터베이스의 변경 사항을 실시간으로 추적하고 이를 스트림 형태로 전달하는 오픈 소스 분산 플랫폼입니다.
주로 CDC 이벤트를 Kafka로 topic으로 전달하는데 사용하며, Sink connector로 설정됩니다.
다른 방법으로 적용할 수도 있습니다.
Debezium connector는 데이터베이스의 변경 로그(binlog, WAL 등)를
모니터링하여 실시간으로 변경 사항을 감지합니다.
[ 예제 with Docker ]
고려사항 ( Consideration )
Debezium 튜토리얼에서는 Docker를 활용하여
ZooKeeper, Kafka, Debezium, MySQL 등의 서비스를 각각의 컨테이너로 실행합니다.
각 서비스 별로 터미널을 여러개 열어서 독립적으로 실행하며,
설정의 단순화를 위해 데이터를 컨테이너 내부에 저장하여
컨테이너가 종료되면 모든 데이터가 사라집니다.
순서
각 순서마다 새로운 터미널 창을 열어 실행합니다.
- Start Zookeeper
- Start Kafka
- Start a MySQL database
- Start a MySQL command line client
- Start Kafka Connect
Start Zookeeper
docker run -it --rm --name zookeeper \
-p 2181:2181 -p 2888:2888 -p 3888:3888 \
quay.io/debezium/zookeeper:3.0
-p 2181:2181 -p 2888:2888 -p 3888:3888 :
다른 컨테이너가 ZooKeeper와 통신할 수 있도록
컨테이너의 포트 중 3개를 도커 호스트의 동일한 포트에 매핑합니다.
Start Kafka
docker run -it --rm --name kafka \
-p 9092:9092 \
--link zookeeper:zookeeper \
quay.io/debezium/kafka:3.0
--link zookeeper:zookeeper:
컨테이너에 동일한 도커 호스트에서 실행 중인 ZooKeeper와 연결합니다.
Start a MySQL database
데비지움이 CDC할 수 있는 대상인 Mysql을 설치 합니다.
( mysql 8.2버전의 debezium/examplge-mysql image )
docker run -it --rm --name mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
quay.io/debezium/example-mysql:3.0
Debezium MySQL 커넥터에 필요한 최소 권한을 가진 사용자 및 비밀번호를 만듭니다.
Start a MySQL command line client
MySQL을 시작한 후 MySQL 명령줄 클라이언트를 시작하여 샘플 인벤토리 데이터베이스에 액세스합니다.
docker run -it --rm --name mysqlterm \
--link mysql mysql:8.2 \
sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
Docker가 링크된 컨테이너의 네트워크 정보를 자동으로 환경 변수로 설정합니다.
($MYSQL_PORT_3306_TCP_ADDR / $MYSQL_PORT_3306_TCP_PORT / $MYSQL_ENV_MYSQL_ROOT_PASSWORD)
데이터 확인
Start Kafka Connect
docker run -it --rm --name connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
--link kafka:kafka \
--link mysql:mysql \
quay.io/debezium/connect:3.0
--link kafka:kafka --link mysql:mysql :
kafka connect는 mysql container와 kafka container 둘다 연결합니다.
새로운 터미널을 열고,
Kafka Connect REST API 호출하여 Connect 상태를 체크합니다.
# response 값을 통해 running 중인지 확인
curl -H "Accept:application/json" localhost:8083
{"version":"3.9.0","commit":"a60e31147e6b01ee","kafka_cluster_id":"QbARhdY9ZSA"}%
# connect 실행은 했지만 등록되지 않을 상태
curl -H "Accept:application/json" localhost:8083/connectors/
[]%
Debezium MySQL connector를 등록합니다.
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" localhost:8083/connectors/ \
-d '{ "name": "inventory-connector", "config": { \
"connector.class": "io.debezium.connector.mysql.MySqlConnector", \
"tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", \
"database.user": "debezium", "database.password": "dbz", \
"database.server.id": "184054", "topic.prefix": "dbserver1", \
"database.include.list": "inventory", \
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092", \
"schema.history.internal.kafka.topic": "schemahistory.inventory" } \
}'
inventory-connector 가 등록되었는지 확인합니다.
curl -H "Accept:application/json" localhost:8083/connectors/
# 출력 값
["inventory-connector"]%
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector
# 출력 값
HTTP/1.1 200 OK
Date: Thu, 28 Nov 2024 22:20:57 GMT
Content-Type: application/json
Content-Length: 544
Server: Jetty(9.4.56.v20240826)
# 컨넥터 정보 확인
{
"name":"inventory-connector",
"config":{
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"database.user":"debezium",
"topic.prefix":"dbserver1",
"schema.history.internal.kafka.topic":"schemahistory.inventory",
"database.server.id":"184054",
"tasks.max":"1",
"database.hostname":"mysql",
"database.password":"dbz",
"name":"inventory-connector",
"schema.history.internal.kafka.bootstrap.servers":"kafka:9092",
"database.port":"3306",
"database.include.list":"inventory"
},
"tasks":[{"connector":"inventory-connector","task":0}],
"type":"source"
}%
이벤트 변경 확인
테이블 데이터를 변경함으로써,
MySQL connector가 변경되는 이벤트를 어떻게 캡처하는지 확인합니다.
새로운 터미널을 열어 watch 컨테이너를 추가합니다.
docker run -it --rm --name watcher \
--link zookeeper:zookeeper \
--link kafka:kafka quay.io/debezium/kafka:3.0 watch-topic \
-a -k dbserver1.inventory.customers
watch-topic이 고객데이터를 저장하는 customers table의 이벤트를 추적할 수 있게합니다.
해당 컨테이너가 실행 후, customers table의 row들을 event로 추적되는 것을 로그에서 확인 할 수 있습니다.
- 실행된 container들
Update Event
database에서 update event를 생성합니다.
mysql> UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
Query OK, 1 row affected (0.05 sec)
Rows matched: 1 Changed: 1 Warnings: 0
watcher 터미널에서 추가되는 event를 확인합니다.
schema와 payload 부분을 확인합니다.
"schema": {...},
"payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"name": "3.0.2.Final",
"name": "dbserver1",
"server_id": 223344,
"ts_sec": 1486501486,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 364,
"row": 0,
"snapshot": null,
"thread": 3,
"db": "inventory",
"table": "customers"
},
"op": "u",
"ts_ms": 1486501486308,
"ts_us": 1486501486308910,
"ts_ns": 1486501486308910814
}
}
Restarting the Kafka Connect service
데비지움은 오프라인 상태에서도 변경 이벤트를 캡처하며,
재시작이후에 중지했던 부분부터 event를 다시 추적합니다.
kafka connect 중지
docker stop connect
create event 생성
INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");
kafka connect 재시작
docker run -it --rm --name connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql quay.io/debezium/connect:3.0
다시 실행 후, 로그 확인
{
"schema": {
...
},
"payload": {
"before": null,
"after": {
"id": 1005,
"first_name": "Sarah",
"last_name": "Thompson",
"email": "kitt@acme.com"
},
"source": {
"version": "3.0.3.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1732833432000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"ts_us": 1732833432000000,
"ts_ns": 1732833432000000000,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 793,
"row": 0,
"thread": 8,
"query": null
},
"transaction": null,
"op": "c",
"ts_ms": 1732833528945,
"ts_us": 1732833528945100,
"ts_ns": 1732833528945100800
}
}
참조 :