[ 스키마의 개념 ]
카프카에서는 토픽으로 전송되는 메세지에 대해 미리 형식을 정의한 후 전송함으로써
DB 스키마와 같은 효과를 얻을 수 있습니다.
스키마를 통해 데이터를 consume하는 여러 부서가 있다면
스키마를 통해 데이터에 대한 정확한 정의 및 데이터 변경여부를 알려 줄 수 있습니다.
또한 데이터 형식을 벗어난 데이터 필드에 대해 미리 파악 할 수 있습니다.
[ 스키마 레지스트리 ]
카프카에서 스키마를 활용하는 방법은 스키마 레지스트리라는를 애플리케이션을 이용하는 것 입니다.
아키텍처
스키마 레지스트리는 카프카와 별도로 구성된 독립적인 애플리케이션으로,
프로듀서 / 컨슈머와 직접 통신합니다.
[ 에이프로(Avro) 포맷 ]
스키마 레지스트리가 지원하는 대표적인 데이터 포맷은 에이브로(Avro)이며
데이터 교환을 도화주는 오픈소스 직렬화 시스템입니다.
빠른 바이너리 포맷을 지원하며 JSON형태의 스키마를 정의할 수 있습니다.
[ 스키마 레지스트리 API ]
HTTP 기반으로 통신이 이루어지며 주요 기능들에 대해 API를 제공합니다.
옵션 | 설명 |
GET /schemas | 스키마 레지스트리에 등록된 전체 스키마 리스트 조회 |
GET /schemas/ids/id | 스키마 ID로 조회 |
GET /schemas/ids/id/versions | 스키마 ID 버전 |
GET /subjects | 스키마 레지스트리에 등록된 subject 리스트 ( subject는 토픽이름-key, 토픽이름-value 형태 ) |
GET /subjects/{subject name}/versions | 특정 subject 버전 리스트 조회 |
GET /config | 전역으로 설정된 호환성 레벨 조회 |
GET /config/{subject name} | subject에 설정된 호환성 조회 |
DELETE /subjects/{subject name} | 특정 subject 전체 삭제 |
DELETE /subjects/{subject name}/version/{version name} | 특정 subject에서 특정 version 삭제 |
[ 스키마 레지스트리 호환성 ]
호환성(Compatibility) 설정은 새로운 스키마를 등록하거나 기존 스키마를 수정할 때,
데이터의 호환성을 유지하도록 설정하는 규칙입니다.
BACKWARD 호환성
- 상위 버전의 스키마를 컨슈머에게 적용한 뒤 프로듀에서게 나중에 적용합니다.
- 컨슈머는 최신 스키마를 적용 할 필요없이, 이전 스키마로 데이터를 처리할 수 있습니다.
- 프로듀서는 새로운 스키마로 데이터를 작성하더라도, 컨슈머가 이전 스키마를 사용하여 데이터를 읽을 수 있어야 합니다.
- 추가된 필드(address)가 있어도 소비자에게 영향을 미치지 않습니다.
사용 사례
- 시스템이 데이터를 읽는 컨슈머 중심으로 설계된 경우일 때
- 컨슈머 스키마를 변경하기 어렵거나, 업데이트 주기가 길 때
예시
이전 스키마: { "name": "string", "age": "int" }
새로운 스키마: { "name": "string", "age": "int", "address": "string" }
새로운 필드(address)는 추가되었지만, 기본값을 제공하여 이전 스키마와 호환.
컨슈머는 이전 스키마를 사용하며, name과 age만 인식합니다.
추가된 필드 address는 무시됩니다.
결과적으로 컨슈머는 이전 스키마를 아래와 같은 데이터를 처리
{ "name": "Alice", "age": 30 }
FORWARD 호환성
- 상위 버전의 스키마를 프로듀에서게 먼저 적용한 뒤 컨슈머에게 나중에 적용합니다.
- 프로듀서는 이전 스키마를 사용할 필요 없이, 새로운 스키마로 데이터를 생성합니다.
- 컨슈머가 최신 스키마로 데이터를 처리할 수 있어야 합니다.
- 새롭게 추가된 필드는 기본값을 통해 처리합니다.
사용 사례
- 시스템이 데이터를 생성하는 프로듀서 중심으로 설계된 경우
- 프로듀서와 컨슈머가 별도로 개발되고, 소비자가 업데이트 주기가 짧을 때.
예시
이전 스키마: { "name": "string", "age": "int", "address": "string" }
새로운 스키마: { "name": "string", "age": "int" }
새로운 스키마에서 address 필드가 제거되었지만, 기존 데이터와 호환 가능.
컨슈머는 새로운 스키마를 사용하며, 추가된 필드address를 기본값을 할당.
기본값 예: "address": "unknown"
결과적으로 컨슈머는 아래와 같은 데이터를 처리:
{ "name": "Alice", "age": 30, "address": "unknown" }
FULL 호환성
- BACKWARD 와 FORWARD 호환성 모두 지원합니다.
- 새로운 스키마는 이전 스키마로 작성된 데이터를 읽을 수 있어야 합니다.
- 이전 스키마는 새로운 스키마로 작성된 데이터를 읽을 수 있어야 합니다.
사용 사례
- 프로듀서와 컨슈머가가 서로 다른 스키마를 사용할 수 있지만, 양방향 호환성을 보장해야 하는 경우
- 시스템의 모든 구성 요소가 항상 호환되어야 할 때.
- 프로듀서와 컨슈머가 동작하고, 양쪽 모두의 안정성을 유지해야 할 때
비교 요약
[ 예제 실행 ]
샘플 스키마 데이터
student.avro : 에이브로를 활용한 학생 명단에 대한 스키마 정의 파일
{"namespace": "student.avro",
"type": "record",
"doc": "This is an example of Avro.",
"name": "Student",
"fields": [
{"name": "name", "type": "string", "doc": "Name of the student"},
{"name": "class", "type": "int", "doc": "Class of the student"}
]
}
- namespace - 이름을 식별하는 문자열
- type - avro는 record, enums, arrays, maps 등의 타입을 지원
- doc - 설명, 주석
- name - 레코드의 이름을 나타내는 문자열
- fields - Json배열 데이터
스키마 레지스트리 설치
주키퍼와 카프카가 설치되어있는 상태에서 스키마 레지스트리 파일을 다운받습니다.
sudo wget http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz -O /opt/confluent-community-6.1.0.tar.gz
sudo tar zxf /opt/confluent-community-6.1.0.tar.gz -C /usr/local/
sudo ln -s /usr/local/confluent-6.1.0 /usr/local/confluent
스키마 레지스트리 설정
schema-registry.properties
vi /usr/local/confluent/etc/schema-registry/schema-registry.properties
>>>
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://kafka_01.com:9092,kafka_02.com:9092,kafka_03.com:9092
kafkastore.topic=_schemas
schema.compatibility.level=full
- listeners - 스키마 레지스트리에서 사용할 TCP 포트를 8081로 지정
- kafkastore.bootstrap.servers - 연동할 카프카 주소 입력
- kafkastore.topic - 스키마 히스토리 및 데이터 저장 토픽의 이름 지정
- schema.compatibillity.level - 스키마 호환성 레벨을 full로 설정 (BACKWARD, FORWARD, FULL)
브로커의 _schemas 토픽이 스키마 레지스트리의 저장소로 활용되며
스키마 관리 목적으로 사용되는 메세지들은 순서가 중요하여 파티션 수는 항상 1 입니다.
systemd 등록
schema-registry.service
[Unit]
Description=schema registry
After=network.target
[Service]
Type=simple
ExecStart=/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties
Restart=always
[Install]
WantedBy=multi-user.target
등록 실행
sudo vi /etc/systemd/system/schema-registry.service
sudo systemctl daemon-reload
sudo systemctl start schema-registry
# 스키마 레지스트리 설정 정보 가져오기
curl -X GET http://kafka_01.com:8081/config
>>>
{"compatibillityLevel":"FULL"}
메세지 전송 후 스키마 적용 확인
- 가상 환경에 필요한 모듈 설치
python -m venv venv
source venv/bin/activate
pip install confluent-kafka[avro]
- producer_avro.py
#https://github.com/confluentinc/confluent-kafka-python
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema_str = """
{"namespace": "student.avro",
"type": "record",
"doc": "This is an example of Avro.",
"name": "Student",
"fields": [
{"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
{"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
]
}
"""
value_schema = avro.loads(value_schema_str)
value = {"name": "Peter", "class": 1} # 전송할 메시지
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
avroProducer = AvroProducer({
'bootstrap.servers': 'kafka_01.com,kafka_02.com,kafka_03.com',
'on_delivery': delivery_report,
'schema.registry.url': 'http://kafka_01.com:8081'
}, default_value_schema=value_schema)
avroProducer.produce(topic='kafka-avro2', value=value)
avroProducer.flush()
- consumer_avro.py
#https://github.com/confluentinc/confluent-kafka-python
from confluent_kafka import avro
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
value_schema_str = """
{"namespace": "student.avro",
"type": "record",
"doc": "This is an example of Avro.",
"name": "Student",
"fields": [
{"name": "name", "type": ["null", "string"], "default": null, "doc": "Name of the student"},
{"name": "class", "type": "int", "default": 1, "doc": "Class of the student"}
]
}
"""
value_schema = avro.loads(value_schema_str)
c = AvroConsumer({
'bootstrap.servers': 'kafka_01.com,kafka_02.com,kafka_03.com',
'group.id': 'python-groupid01',
'auto.offset.reset': 'earliest',
'schema.registry.url': 'http://kafka_01.com:8081'},reader_value_schema=value_schema)
c.subscribe(['kafka-avro2'])
while True:
try:
msg = c.poll(10)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
continue
if msg.error():
print("AvroConsumer error: {}".format(msg.error()))
continue
print(msg.value())
c.close()
- 파일 실행
# 메세지 전송 ( Schema Registry가 실행 중인 서버 )
python producer_avro.py
>>> Message delivered to kafka-avro2 [0]
# 메세지 확인
python consumer_avro.py
>>> {'name': 'Peter', 'class': 1}
- 스키마 적용 확인
curl http://kafka_01.com:8081/schemas | python -m json.tool
>>>
...
[
{
"subject": "kafka-avro2-value",
"version": 1,
"id": 1,
"schema": "{
\"type\":\"record\",
\"name\":\"Student\",
\"namespace\":\"student.avro\",
\"doc\":\"This is an example of Avro.\",
\"fields\":[
{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Name of the student\",\"default\":null},
{\"name\":\"class\",\"type\":\"int\",\"doc\":\"Class of the student\",\"default\":1}
]
}"
}
]
'DataPipeline > Kafka' 카테고리의 다른 글
kafka - connector 구성요소 (0) | 2025.03.13 |
---|---|
실전 카프카 - 11장 카프카 커넥트 (0) | 2025.02.05 |
실전 카프카 - 6장 컨슈머 동작원리와 구현 (0) | 2024.12.31 |
실전 카프카 - 5장 프로듀서 동작원리와 구현 (0) | 2024.12.25 |
실전 카프카 - 4장 내부 동작 원리와 구현 (0) | 2024.12.01 |