[ 프로젝트 목적 아키텍처 ]
이 프로젝트는 카프카(Kafka) 기반의 분산 메시징 시스템을 구축하고 운영하고 있는,
아자르(Azar) 비즈니스 메트릭을 생성하는 프로세스를 흉내내어본 것입니다.
Git 참조 : https://github.com/ehdrn3020/kafka_system_with_azar
Azar 참조 : https://hyperconnect.github.io/2022/10/14/grafana-with-ksqlDB.html
- AWS 환경에서 Kafka 클러스터 구성
- Zookeeper를 통한 분산 코디네이션
- Schema Registry를 활용한 데이터 스키마 관리
- Kafka Connect를 활용한 데이터 파이프라인 구성
- ElasticSearch(OpenSearch)를 통한 데이터 저장 및 검색
아키텍처는 3대의 서버로 구성된 Kafka 클러스터를 중심으로,
데이터 생성부터 저장까지의 전체 파이프라인을 Ansible을 통해 자동화하여 구축할 수 있도록 설계되었습니다.

[ 파일 구조 ]
kafka_system_with_azar/
├── .git/ # Git 저장소 정보
├── .idea/ # IDE 설정 파일
├── README.md # 프로젝트 설명 및 사용법
├── schema_registry/ # 스키마 레지스트리 관련 코드
│ ├── opensearch_sink_example.py # OpenSearch 싱크 예제
│ ├── producer_avro.py # Avro 형식 데이터 생성자
│ ├── consumer_avro.py # Avro 형식 데이터 소비자
│ ├── README.md # 스키마 레지스트리 설명
│ └── single_mode/ # 단일 모드 구성 파일
├── ansible/ # Ansible 자동화 스크립트
│ ├── inventory/ # 서버 인벤토리 정보
│ ├── group_vars/ # 그룹 변수 설정
│ ├── roles/ # Ansible 역할 정의
│ ├── connector.yml # Connector 설치 플레이북
│ ├── opensearch.yml # OpenSearch 설치 플레이북
│ ├── schema_registry.yml # 스키마 레지스트리 설치 플레이북
│ ├── kafka.yml # Kafka 설치 플레이북
│ └── zookeeper.yml # Zookeeper 설치 플레이북
├── setting_aws/ # AWS 환경 설정 스크립트
│ └── setup_server.sh # EC2 서버 설정 스크립트
[ 시스템 설정 ]
AWS Server Setting
### .env 파일 생성
- setting_aws/env_example 참조하여 생성
### keypair.pem 키 생성
- ec2 접속을 위해 keypair.pem 키를 setting_aws 폴더에 생성
- 파일 권한 수정 : sudo chmod 600 setting_aws/keypair.pem
### EC2 서버 실행
```commandline
sh setting_aws/setup_server.sh server_1
sh setting_aws/setup_server.sh server_2
sh setting_aws/setup_server.sh server_3
```
### scp keypair.pem
```commandline
scp -i setting_aws/keypair.pem setting_aws/keypair.pem ec2-user@server_1_ip:~
```
### SSH 접속
```commandline
ssh -i setting_aws/keypair.pem ec2-user@server_1_ip
```
### group_var host 관련 수정
```commandline
inventory/hosts 파일의 ansible_host 변수 수정
git push
cd /home/ec2-user/kafka_system_with_azar/
git pull ( server_1 에서 실행 )
```
Zookeeper Setting
### zookeeper 설치
```commandline
cd /home/ec2-user/kafka_system_with_azar/ansible/
ansible-playbook -i inventory/hosts zookeeper.yml
```
### zookeeper 실행 확인
```commandline
systemctl status zookeeper
cat /data/zookeeper/myid
```
Kafka Setting
### kafka 설치
```commandline
ansible-playbook -i inventory/hosts kafka.yml
```
### kafka 실행 확인
```commandline
# server_1에서 토픽생성 ( 자동토픽생성(Auto Topic Creation)으로 토픽 생성 생략가능 )
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server kafka_01.com:9092 --create --topic test-overview01 --partitions 1 --replication-factor 3
# server_2에서 consumer 실행
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka_01.com:9092 --topic test-overview01
# server_1 producer로 메세지 전송
/usr/local/kafka/bin/kafka-console-producer.sh --bootstrap-server kafka_02.com:9092 --topic test-overview01
# server_2에서 전송 된 메세지 확인
```
Schema Registery
# Schema Registry
### 설치
```commandline
cd ~
sudo wget http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz -O /opt/confluent-community-6.1.0.tar.gz
sudo tar zxf /opt/confluent-community-6.1.0.tar.gz -C /usr/local/
sudo ln -s /usr/local/confluent-6.1.0 /usr/local/confluent
```
### 설정
```commandline
vi /usr/local/confluent/etc/schema-registry/schema-registry.properties
프로젝트의 schema_registry/schema-registry.properties 참조하여 업데이트
```
### 실행
```commandline
sudo vi /etc/systemd/system/schema-registry.service
프로젝트의 schema_registry/schema-registry.properties 참조하여 업데이트
sudo systemctl daemon-reload
sudo systemctl start schema-registry
sudo systemctl status schema-registry
```
### 호환성 확인
```commandline
curl -X GET http://kafka_01.com:8081/config
>>> 출력 값
{"compatibilityLevel":"FULL"}
```
### 예시 - python 파일을 통해 메세지 전송
```commandline
# 가상환경에 필요한 모듈 설치
cd /home/ec2-user/kafka_system_with_azar/schema_registry
python -m venv venv
source venv/bin/activate
pip install confluent-kafka[avro]
# 모듈 설치시 호환성
- 해당 예제는 confluent-kafka==2.8.0 설치하여 librdkafka 1.8.2 이상의 버전이 필요합니다.
- confluent-kafka Python라이브러리는 librdkafka를 래핑(wrapping)한 라이브러리입니다.
- librdkafka는 Apache Kafka 브로커와 통신하는 역할을 하며, Kafka 브로커의 버전과 호환성이 있습니다.
- python3.9 이상에서는 librdkafka 1.x.x 이상이 설치되지만, python3.7은 librdkafka 0.11.x 버전이 설치됩니다.
- librdkafka 0.11.x 버전은 confluent-kafka 1.0.0 이하와 호환되므로, 아래 py파일의 코드가 실행되지 않을 수 있습니다.
# 메세지 전송 ( Schema Registry가 실행 중인 서버 )
python producer_avro.py
>>> Message delivered to kafka-avro2 [0]
# 메세지 확인
python consumer_avro.py
>>> {'name': 'Peter', 'class': 1}
```
### 스키마 적용 확인
```commandline
curl http://kafka_01.com:8081/schemas | python -m json.tool
>>>
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 375 100 375 0 0 16769 0 --:--:-- --:--:-- --:--:-- 17045
[
{
"subject": "kafka-avro2-value",
"version": 1,
"id": 1,
"schema": "{
\"type\":\"record\",
\"name\":\"Student\",
\"namespace\":\"student.avro\",
\"doc\":\"This is an example of Avro.\",
\"fields\":[
{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"doc\":\"Name of the student\",\"default\":null},
{\"name\":\"class\",\"type\":\"int\",\"doc\":\"Class of the student\",\"default\":1}
]
}"
}
]
```
위 내용을 ansible을 통해 자동화
```
# 설치, 설정파일 수정 및 실행
ansible-playbook -i inventory/hosts schema_registry.yml
# 데몬 확인
sudo systemctl status schema-registry
# 호환성 확인
curl -X GET http://kafka_01.com:8081/config
>>> 출력 값
{"compatibilityLevel":"FULL"}
```
ElasticSearch
### opensearch 설치
```commandline
# kafka01 호스트에 싱글 노드로 설치
ansible-playbook -i inventory/hosts opensearch.yml
```
### opensearch 확인
```commandline
sudo systemctl status opensearch
# 클러스터 내 각 노드의 정보
curl -X GET "http://kafka_01.com:9200/_cat/nodes?v"
# 클러스터의 전체 상태(Health) 를 조회
curl -X GET "http://kafka_01.com:9200/_cluster/health?pretty"
```
Connector Sink
### 커넥터 설치
```commandline
ansible-playbook -i inventory/hosts connector.yml
```
### 커넥터 확인
```commandline
# 실행 확인
sudo systemctl status kafka-connect
# 에러시 로그 확인
journalctl -u kafka-connect -f
# 클러스터에 현재 등록된 커넥터 목록을 확인
curl http://localhost:8083/connectors | python -m json.tool
# 커넥터 플러그인 확인
curl http://localhost:8083/connector-plugins | jq
[
{
"class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"type": "sink",
"version": "3.1.1"
},
...]
```
### 토픽 생성
```commandline
# 생성
/usr/local/kafka/bin/kafka-topics.sh --create \
--bootstrap-server kafka_01.com:9092,kafka_02.com:9092,kafka_03.com:9092 \
--replication-factor 3 \
--partitions 3 \
--topic opensearch-sink
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka_01.com:9092,kafka_02.com:9092,kafka_03.com:9092
# 확인
/usr/local/kafka/bin/kafka-topics.sh --list --bootstrap-server kafka_01.com:9092
```
### 컨넥터 등록
```commandline
# API로 opensearch sink connector 등록
curl -X POST http://kafka_01.com:8083/connectors -H "Content-Type: application/json" -d '{
"name": "opensearch-sink",
"config": {
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"tasks.max": "1",
"topics": "opensearch-sink",
"connection.url": "http://kafka_01.com:9200",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://kafka_01.com:8081",
"value.converter.schema.registry.url": "http://kafka_01.com:8081",
"schema.registry.url": "http://kafka_01.com:8081",
"value.converter.schemas.enable": "false",
"schema.ignore": "true",
"type.name": "kafka-connect"
}
}'
# 등록 확인
curl http://localhost:8083/connectors | python -m json.tool
...
[
"opensearch-sink"
]
# 상태 확인
curl -X GET http://kafka_01.com:8083/connectors/opensearch-sink/status | jq
# 커넥터 삭제
curl -X DELETE http://localhost:8083/connectors/opensearch-sink
```
[ kafka 데이터 전송 ]
```commandline
# 가상환경에 필요한 모듈 설치 ( kafka_01.com 호스트에서 실행 )
cd /home/ec2-user/kafka_system_with_azar/schema_registry
python -m venv venv
source venv/bin/activate
pip install confluent-kafka[avro]
# 메세지 전송 ( Schema Registry가 실행 중인 서버 )
python opensearch_sink_example.py
>>> Message delivered to kafka-avro2 [0]
```
### schema 등록확인
```commandline
# subjects list 확인
curl http://kafka_01.com:8081/subjects
>>> ["opensearch-sink-value"]
# subject 버전 확인
curl http://kafka_01.com:8081/subjects/opensearch-sink-value/versions
>>> [1]
```
### opensearch index store 확인
```commandline
curl -X GET "http://kafka_01.com:9200/_cat/indices?v"
curl -X GET "http://kafka_01.com:9200/opensearch-sink*/_search?pretty"
```
'사이드프로젝트' 카테고리의 다른 글
| SideProject - logstash를 통해 데이터 수집 후 API 제공 (0) | 2024.12.16 |
|---|