[ 기초 개념 ]
리플리케이션 Replication
토픽 생성 명령 중 replication-factor 2 이라는 옵션을 설정하면
원본을 포함한 복제본이 총 2개가 있다는 뜻입니다.
리플리케이션 팩터 수가 커지면 안정성은 높아지지만
브로커 리소스를 많이 사용하게 됩니다.
파티션 Partition
하나의 토픽이 한 번에 처리할 수 있는 처리량을 늘리기 위해
토픽을 여러 개로 나눠 병렬 처리가 가능하게 만든 것을 파티션이라고 합니다.
나뉜 파티션 수만큼 컨슈머를 연결할 수 있습니다.
파티션은 언제든 늘릴 수 있지만 줄일 수는 없으므로,
컨슈머의 LAG ( 프로듀서 메세지 수 - 컨슈머 메세지 가져간 수)가 0이 되도록 설정하는 것이 좋습니다.
참조 - 파티션과 컨슈머 할당에 대한 글
세그먼트 Segment
프로듀서 의해 브로커로 전송된 메세지는 토픽의 파티션에 저장되며
각 메세지들은 세그먼트라는 로그 파일의 형태로 로컬 디스크에 저장됩니다.
각 파티션 마다 N개의 세그먼트 로그 파일들이 존재합니다.
# peter-overview01 토픽 생성 이후
cd /data/kafka-logs/peter-overview01-0
ls
>>>
000000000000000000000.index
000000000000000000000.log
000000000000000000000.timendex
# 메세지 내용 확인
xxd 000000000000000000000.log
[ 핵심 개념 ]
페이지 캐시
페이지 캐시는 물리 메모리 중 일부 잔여 메모리를 활용하여,
디스크 I/O에 대한 접근이 줄어들어 성능을 높일 수 있습니다.
배치 전송 처리
수많은 통신을 묶어서 처리할 수 있으며,
단건 통신보다 네트워크 오버헤드 측면에서 효율적으로 처리할 수 있습니다.
압축 전송
높은 압축률이 필요하면 gzip,
빠른 응답속도가 필요하면 snappy를 권장합니다.
오프셋
파티션의 메세지가 저장되는 위치를 오프셋이라고 부르며
순차적으로 증가하는 숫자(64비트 정수)로 되어있습니다,.
[ 프로듀서 동작 ]
프로듀서 디자인
ProducerRecord라고 표시된 부분은 실제 데이터이며, Partition / Key는 선택사항입니다.
각 레코드들은 send() 메소드를 통해 시리얼라이저와 파티셔너를 거치게 됩니다.
시리얼라이저는 데이터를 직렬화하여 Kafka가 이해할 수 있는 바이트 배열 형식으로 변환합니다.
메시지 키(key)와 값(value) 모두에 대해 적용됩니다.
파티셔너는 파티션을 맵핑하며 기본적으로 라운드 로빈 방식으로 동작합니다.
ProducerRecord에서 파티션을 지정했으면 파티셔너는 실행되지 않습니다.
프로듀서 내부에서는 send() 메소드 이후에 레코드들을 잠시 모아두는데 배치전송을 하기 위함입니다.
전송이 실패하면 재시도가 이루어집니다.
재시도가 지정된 횟수이상이면 실패, 전송이 성공하면 메타데이터를 리턴합니다.
프로듀서 주요 옵션
옵션 | 설명 |
bootstrap.servers | 클라이언트가 카프카 클러스터에 처음 연결하기 위한 호스트와 포트 정보를 나타냅니다. |
acks | 프로듀서가 카프카 토픽의 리더 측에 메시지를 전송한 후 요청이 완료하기를 결정하는 옵션 - 0 : 일부 메시지 손실 대신 빠른 전송 - 1 : 리더가 메시지를 받았는지 확인, 모든 팔로워를 전부 확인하지 않음 - all (-1) : 팔로워가 메시지를 받았는지 여부 확인, 느리나 하나의 팔로워가 있다면 메시지는 손실되지 않음 |
buffer.memory | 프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기할 수 있는 전체 메모리 바이트 |
compression.type | 프로듀서가 메시지 전송 시 선택할 수 있는 압축 타입 none, gzip, snappy, lz4, zstd 등 원하는 타입 선택 |
enable.idempotence | 설정을 true로 할 경우 중복 없는 전송이 가능 max.in.flight.requests.per.connection = 5 이하, retries = 0 이상, acks = all로 지정해야 함 |
max.in.flight.requests.per.connection | 하나의 커넥션에서 프로듀서가 최대한 ACK 없이 전송할 수 있는 요청 수 메시지의 순서가 중요하다면 1로 설정할 것을 권장하지만 성능은 다소 떨어짐 |
retries | 일시적인 오류로 인해 전송에 실패한 데이터를 다시 보내는 횟수 |
batch.size | 프로듀서는 동일한 파티션으로 보내는 여러 데이터를 함께 배치로 보내려고 시도 |
linger.ms | 배치 형태의 메시지를 보내기 전에 추가적인 메시지를 위해 기다리는 시간을 조정 배치 크기에 도달하지 못한 상황에서 linger.ms 제한 시간에 도달했을 때 메시지를 전송 |
transactional.id | '정확히 한 번 전송'을 위해 사용하는 옵션 enable.idempotence=true로 설정되어 있어야 함 |
[ 컨슈머 동작 ]
기본 동작
파티션 수보다 컨슈머 수가 많게 구현하는 것은 처리량을 높이는 것이 아니라
많은 수의 컨슈머들이 대기 상태로 존재할 수 있기에 바람직하지 않습니다.
또한 컨슈머 장애 시, 컨슈머 그룹 내에서 리밸런싱 동작을 통해
장애가 발생한 컨슈머의 역할을 동일한 그룹에 있는 다른 컨슈머가 수행하므로
굳이 장애 대비를 위해 추가 할 필요가 없습니다.
주요 옵션
옵션 | 설명 |
bootstrap.servers | 브로커의 정보를 입력합니다. |
fetch.min.bytes | 한 번에 가져올 수 있는 최소 데이터 크기 지정 크기보다 작을 경우 누적될 때 까지 기다림 |
group.id | 컨슈머가 속한 그룹 식별자 |
heartbeat.interval.ms | 컨슈머 상태를 지속적으로 확인 session.timeout.ms 보다 낮은 값으로 설정 (일반적으로 1/3) |
max.partition.fetch.bytes | 파티션당 가져올 수 있는 최대 크기 |
session.timeout.ms | 컨슈머 종료를 판단 해당 시간 전까지 하트비트를 보내지 않았다면 컨슈머종료로 간주하고 컨슈머그룹에서 제외 후 리밸런싱을 시작 |
enable.auto.commit | 백그라운드로 주기적으로 오프셋을 커밋 |
auto.offset.reset | 초기 오프셋이 없거나 현재 오프셋이 더 이상 존재하지 않을 경우 reset 기능 - earliest : 가장 초기의 오프셋값으로 설정 - lastest : 가장 마지막의 오프셋값으로 설정 - none : 이전 오프셋값을 찾지 못하면 에러 표기 |
fetch.max.bytes | 한 번의 가져오기 요청으로 가져올 수 있는 최대 크기 |
group.instance.id | 컨슈머의 고유한 식별자 설정한다면 static 멤버로 간주되어 불필요한 리밸런싱을 하지 않음 |
isolation.level | 트랜잭션 컨슈머에서 사용하는 옵션 - read_uncommitted : 기본값으로 모든 메세지를 읽음 - read_committed : 트랜잭션이 완료된 메세지만 읽음 |
max.poll.records | 한 번의 poll() 요청으로 가져오는 최대 메세지 수 |
partition.assignment.strategy | 파티션 할당 전략, 기본값 range |
fetch.max.wait.ms | fetch.min.bytes에 의해 설정된 데이터보다 적은 경우, 응답 최대 시간 |
consumer의 경우 fetch.min.bytes와 fetch.max.bytes, fetch.max.wait.ms를 조정하여
브로커의 리소스를 효율적으로 사용가능하게 한다.
'DataPipeline > Kafka' 카테고리의 다른 글
실전 카프카 - 5장 프로듀서 동작원리와 구현 (0) | 2024.12.25 |
---|---|
실전 카프카 - 4장 내부 동작 원리와 구현 (0) | 2024.12.01 |
Kafka - CDC Connector Debezium 예제 (0) | 2024.11.29 |
Apache Kafka - Docker를 통한 환경 셋팅 (0) | 2023.04.28 |
Apache Kafka - Simple Python Code (0) | 2023.03.22 |