기본적으로 REST API는 8083 포트에서 사용할 수 있으며 보안되지 않습니다. ( 인증 기능 추가 가능 )
REST API는 모든 요청 본문이 콘텐츠 유형 application/json 을 사용할 것으로 예상하며
모든 응답도 해당 콘텐츠 유형을 사용하여 보냅니다.
목차
- 커넥터 생성 및 삭제하기
- 커넥터 구성
- 커넥터의 수명 주기 제어
- 커넥터 오프셋 나열
- 문제 디버깅
커넥터 생성 및 삭제
클러스터의 버전을 확인
$ curl localhost:8083
{
"version": "3.5.0",
"commit": "c97b88d5db4de28d",
"kafka_cluster_id": "PSCn87RpRoqhfjAs9KYtuw"
}
사용할 수 있는 커넥터 플러그인 확인
기본적으로 여기에는 Kafka Connect 클러스터에 설치된 소스 및 싱크 커넥터 플러그인만 나열됩니다.
$ curl localhost:8083/connector-plugins
[{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "3.5.0"
}, {
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "3.5.0"
}, {
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "3.5.0"
}]
실행 중인 커넥터 확인
$ curl localhost:8083/connectors
[]
커넥터 생성
# 예: PUT 요청:
$ curl -X PUT -H "Content-Type: application/json" \
-d "@sink-config.json" \
localhost:8083/connectors/file-sink/config
{
"name":"file-sink",
"config":{
"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max":"1",
"topics":"topic-to-export",
"file":"/tmp/sink.out",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"name":"file-sink"
},
"tasks":[],"type":"sink"
}
sink-config.json 파일
{
"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "topic-to-export",
"file": "/tmp/sink.out",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
생성 확인
$ curl localhost:8083/connectors
["file-sink"]
쿼리 매개변수를 통해 자세히 확인
- GET /connectors?expand=info 을 호출하면 구성 및 모든 작업 등이 나열
- GET /connectors?expand=status 은 커넥터의 상태 및 관련 작업을 확인
$ curl "localhost:8083/connectors?expand=status&expand=info"
{
"file-sink": {
"status": {
"name": "file-sink",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.1.110:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.1.110:8083"
}
],
"type": "sink"
},
"info": {
"name": "file-sink",
"config": {
"connector.class":
"org.apache.kafka.connect.file.FileStreamSinkConnector",
"file": "/tmp/sink.out",
"tasks.max": "1",
"topics": "topic-to-export",
"name": "file-sink",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
},
"tasks": [
{
"connector": "file-sink",
"task": 0
}
],
"type": "sink"
}
}
}
특정 커넥터 상태 확인
GET /connectors/<CONNECTOR>/status
특정 커넥터 태스크 확인
GET /connectors/<CONNECTOR>/tasks
커넥터와 연동되는 토픽 확인
Kafka Connect 파이프라인의 관리자는 특정 커넥터가 어떤 토픽과 상호 작용했는지 파악하는 것이 유용할 수 있습니다.
이를 돕기 위해 Kafka Connect는 커넥터가 상호 작용한 모든 토픽을 검색할 수 있는 메커니즘을 제공합니다.
예를 들어, 방금 만든 file-sink 커넥터의 경우, 이 커넥터가 상호작용한 단일 토픽을 볼 수 있습니다.
$ curl localhost:8083/connectors/file-sink/topics
{
"file-sink": {
"topics": [
"topic-to-export"
]
}
}
커넥터와 토픽 연동 해제
Kafka Connect가 이전 상호 작용을 '잊어버리도록' 할 수 있습니다:
$ curl localhost:8083/connectors/file-sink/topics
{
"file-sink": {
"topics": [
"topic-to-export"
]
}
}
주제 목록을 다시 확인하면 이제 비어 있는 것을 볼 수 있습니다:
$ curl localhost:8083/connectors/file-sink/topics
{
"file-sink": {
"topics": []
}
}
커넥터 삭제
커넥터가 삭제되면 그 아래의 모든 작업도 제거됩니다.
그러나 커넥터의 오프셋은 재설정되지 않으므로 같은 이름의 새 커넥터가 만들어지면 해당 오프셋을 사용하여 읽기를 시도하게 됩니다.
이를 방지하려면 커넥터를 삭제한 후에는 커넥터의 오프셋을 재설정하는 것이 가장 좋습니다.
$ curl -X DELETE localhost:8083/connectors/file-sink
커넥터 및 작업 구성
플러그인 설정 확인
REST API는 특정 플러그인의 구성 설정을 나열하고 검증하는 데 도움이 되는 옵션을 제공합니다.
예를 들어 다음과 같이 FileStreamSinkConnector 에 대한 구성 설정 목록을 얻을 수 있습니다:
$ curl localhost:8083/connector-plugins/org.apache.kafka.connect.file.FileStreamS
inkConnector/config
[
{
"name": "file", 1
"type": "STRING", 2
"required": false, 3
"default_value": null, 4
"importance": "HIGH", 5
"documentation": "Destination filename. If not specified, the standard output
will be used", 6
"group": null, 7
"width": "NONE", 8
"display_name": "file", 9
"dependents": [], 10
"order": -1 11
}
]
1 - 이 구성 설정의 이름입니다.
2 - 구성 값의 예상 유형, BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD 중 하나입니다.
3 - 이 구성 값이 필요한지 여부를 나타냅니다.
4 - 기본값, required 이 true 인 경우 기본값은 null 입니다.
5 - 구성 설정의 중요도 수준입니다. HIGH , MEDIUM, LOW 중 하나입니다.
6 - 구성 설정에 대한 정보입니다.
7 - 이 구성 설정이 어느 group 에 속하는지를 나타냅니다. 플러그인은 자체 설정에 대해 자체 그룹을 도입할 수 있습니다.
8 - 구성 설정의 width. NONE , SHORT, MEDIUM, LONG 중 하나.
9 - 구성 설정의 표시 이름(이름과 일치할 수 있음)입니다.
10 - 이 설정에 의존하는 다른 구성 설정 목록입니다.
11 - 구성 값의 정수 순서 번호(설정하지 않은 경우 -1 )입니다.
플러그인 설정 유효성 검사
사용할 구성을 결정한 후에는 PUT 요청을 보내 유효성을 검사할 수 있습니다.
구성에서 커넥터 이름이 누락된 경우 FileStreamSinkConnector 의 예를 살펴보겠습니다.
$ curl -X PUT -H "Content-Type: application/json" \
-d '{"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector"
, "tasks.max": "1", "topics": "sink-topic"}' \
localhost:8083/connector-plugins/org.apache.kafka.connect.file.FileStreamSinkCo
nnector/config/validate
{
"name": "org.apache.kafka.connect.file.FileStreamSinkConnector", 1
"error_count": 1, 2
"groups": [ 3
"Common",
"Transforms",
"Predicates",
"Error Handling"
],
"configs": [
{
"definition": { 4
"name": "name",
"type": "STRING",
"required": true,
"default_value": null,
"importance": "HIGH",
"documentation": "Globally unique name to use...",
"group": "Common",
"width": "MEDIUM",
"display_name": "Connector name",
"dependents": [],
"order": 1
},
"value": {
"name": "name", 5
"value": null, 6
"recommended_values": [], 7
"errors": [ 8
"Missing required configuration \"name\" which has no default value."
],
"visible": true 9
}
},
...
]
}
1 - 플러그인을 제공하는 클래스의 이름입니다.
2 - 제공된 구성의 유효성을 검사하는 동안 발견된 오류의 수입니다.
3 - 구성 설정에 있는 그룹이 반환됩니다.
4 - 이 구성 설정의 정의입니다. 엔드포인트의 출력과 일치합니다. /connector-plugins/<CONNECTOR_PLUGIN>/config 엔드포인트의 출력과 일치합니다.
5 - 이 구성 설정의 이름입니다.
6 - 구성 설정에 제공된 값(제공되지 않은 경우 null )입니다.
7 - 제공된 다른 구성 값을 고려할 때 구성 설정에 유효한 값입니다.
8 - 오류가 없는 경우 빈 배열, 또는 이 값이 구성 설정에 허용되지 않는 이유에 대한 오류 메시지 배열입니다.
9 - 이 구성 값을 나열할지 여부를 나타냅니다.
커넥터의 수명 주기 제어
실패 커넥터 재실행
특정 커넥터에서 실패한 모든 작업을 다시 시작할 수 있습니다
$ curl -X POST "localhost:8083/connectors/file-source/restart?includeTasks=true&onlyFailed=true"
커넥터 일시중지(PAUSED) 와 중지(STOPPED)
실행 중인 커넥터를 일시 중지하거나 중지할 수도 있습니다.
이 기능은 커넥터의 데이터 흐름을 일시적으로 중지하고 나중에 중단한 지점부터 다시 시작하고 싶을 때 유용합니다.
외부 시스템에 과부하가 걸리는 경우, config 업데이트를 적용할 때 그 시간 동안 트래픽을 원하지 않는 경우 등에 사용할 수 있습니다.
PAUSED 와 STOPPED 의 차이점은
STOPPED 상태에서는 모든 작업이 종료되어 리소스를 사용하지 않는다는 것입니다.
즉, STOPPED 상태보다 PAUSED 상태에서 커넥터가 다시 시작되는 데 걸리는 시간이 더 짧습니다.
실행 중인 커넥터를 일시 중지
$ curl -X PUT localhost:8083/connectors/file-sink/pause
이제 상태 엔드포인트에 커넥터와 모든 작업이 PAUSED 상태로 표시됩니다.
$ curl localhost:8083/connectors/file-sink/status
{
"name": "file-sink",
"connector": {
"state": "PAUSED",
"worker_id": "192.168.1.110:8083"
},
"tasks": [
{
"id": 0,
"state": "PAUSED",
"worker_id": "192.168.1.110:8083"
}
],
"type": "sink"
}
커넥터를 다시 시작하려면 비슷한 명령을 사용하되 pause 대신 resume 를 사용합니다.
$ curl -X PUT localhost:8083/connectors/file-sink/resume
커넥터 및 작업의 상태가 RUNNING 상태로 돌아갑니다.
커넥터를 완전히 중지
$ curl -X PUT localhost:8083/connectors/file-sink/stop
이제 상태 엔드포인트에 커넥터가 STOPPED 상태로 표시됩니다.
$ curl localhost:8083/connectors/file-sink/status
{
"name": "file-sink",
"connector": {
"state": "STOPPED",
"worker_id": "192.168.1.110:8083"
},
"tasks": [],
"type": "sink"
}
일단 중지되면 PUT /connectors/<CONNECTOR>/resume 엔드포인트를 사용하여 RUNNING 상태로 돌아갑니다.
커넥터 오프셋 나열
특정 커넥터에 대한 오프셋을 나열 ( 소스 커넥터 )
이 엔드포인트의 응답 형식은 소스 및 싱크 커넥터에 따라 다릅니다.
커밋된 오프셋이 없는 커넥터의 엔드포인트를 호출하면 빈 목록이 반환됩니다.
$ curl localhost:8083/connectors/file-sink/offsets
{
"offsets": [
{
"partition": {
"kafka_partition": 0, 1
"kafka_topic": "topic-to-export" 2
},
"offset": {
"kafka_offset": 3 3
}
}
]
}
1 - 이 예제에서는 커넥터에 단일 파티션인 0 이 있습니다.
2 - 커넥터가 topic-to-export 토픽의 레코드를 소비하고 있습니다.
3 - 커넥터가 오프셋까지 소모되었습니다 3.
특정 커넥터에 대한 오프셋을 나열 ( 싱크 커넥터 )
소스 커넥터의 출력은 약간 다르며 특정 커넥터에 따라 다릅니다.
예를 들어 FileStreamSource 커넥터를 실행하면 다음과 같은 내용이 표시될 수 있습니다.
$ curl localhost:8083/connectors/file-source/offsets
{
"offsets": [
{
"partition": {
"filename": "/tmp/source.txt"
},
"offset": {
"position": 41
}
}
]
}
offsets, partition, offset 키는 모든 소스 커넥터에 공통이지만,
partition 및 offset JSON 객체 내부의 내용은 개별 커넥터에 의해 결정됩니다.
FileStreamSource 커넥터는 파티션을 {"filename": "/path/to/file"} 으로,
오프셋을 {"position": <BYTES>} 으로 저장하며, 여기서 <BYTES> 은 파일에서 읽은 바이트 수입니다.
문제 디버깅
Kafka Connect의 문제를 조사하고 디버깅할 때는 런타임 또는 커넥터의 로그를 사용하는 것이 중요합니다.
Kafka Connect는 관리자가 런타임에 logger 수준을 보고 업데이트할 수 있도록
/admin 아래에 몇 개의 엔드포인트를 노출합니다.
현재 logger 레벨 확인
$ curl localhost:8083/admin/loggers
{
"org.apache.zookeeper": {
"level": "ERROR"
},
"org.reflections": {
"level": "ERROR"
},
"root": {
"level": "INFO"
}
}
logger 레벨 변경
가장 낮은 수준부터 가장 자세한 수준까지 유효한 로그 수준은 FATAL, ERROR, WARN, INFO, DEBUG, TRACE 입니다.
$ curl -X PUT -H "Content-Type: application/json" \
-d '{"level": "DEBUG"}' \
localhost:8083/admin/loggers/org.apache.kafka.connect.mirror
[
"org.apache.kafka.connect.mirror",
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"org.apache.kafka.connect.mirror.MirrorSourceConnector"
]
logger를 다시 나열하면 방금 추가한 새 로거를 볼 수 있습니다.
$ curl localhost:8083/admin/loggers
{
"org.apache.kafka.connect.mirror": {
"level": "DEBUG"
},
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector": {
"level": "DEBUG"
},
"org.apache.kafka.connect.mirror.MirrorSourceConnector": {
"level": "DEBUG"
},
"org.apache.zookeeper": {
"level": "ERROR"
},
"org.reflections": {
"level": "ERROR"
},
"root": {
"level": "INFO"
}
}
문제를 디버깅한 후에는 로그 수준을 INFO 로 다시 변경해야 합니다.
그렇지 않으면 로그가 이러한 메시지로 희석되어 다른 커넥터의 문제를 진단하기 어려울 수 있습니다.
가능하면 root 로그 레벨을 변경하지 말고 항상 보다 구체적인 로거를 구성하는 것이 좋습니다.
'DataPipeline > Kafka' 카테고리의 다른 글
| Kafka Connect - 4장 효과적인 데이터 파이프라인 설계 (0) | 2025.06.30 |
|---|---|
| kafka - connector 구성요소 (0) | 2025.03.13 |
| 실전 카프카 - 11장 카프카 커넥트 (0) | 2025.02.05 |
| 실전 카프카 - 10장 스키마 레지스트리 (0) | 2025.01.30 |
| 실전 카프카 - 6장 컨슈머 동작원리와 구현 (0) | 2024.12.31 |