SideProject - logstash를 통해 데이터 수집 후 API 제공
[ 실행 환경 ]
AWS EC2를 통한 서버 구성
- 유연한 확장성: 필요에 따라 인스턴스 크기를 조정하거나 새 인스턴스를 추가하여 확장할 수 있습니다.
- 빠른 배포: 몇 분 만에 새로운 인스턴스를 생성하여 서버를 시작할 수 있습니다.
- 비용 효율성: 사용한 만큼만 비용을 지불합니다.
Ansible로 배포
- Ansible은 IT 인프라 자동화를 위한 간단하고 강력한 도구이며, 손쉽게 다수의 서버에 배포가 가능합니다.
systemd에 등록하여 어플리케이션 실행
- 간편한 배포를 위해 서비스 파일을 통해 애플리케이션의 실행 명령어, 환경 변수 등을 설정하여 쉽게 관리합니다.
[ 요구 사항 ]
- Generator는 1초마다 아래 스키마의 데이터 생성
Property | Type | Description |
Item_id | string | A, B, C 중 하나 |
created_at | datetime | 데이터 생성 시간 |
- Collector를 통해 생성된 데이터 수집
- database에 데이터 저장
- API를 통해 데이터 조회가능
- GET /api/stats : 아이템들 건수를 기준으로 비율과 랭크를 반환
- GET /api/count : 특정 기간의 아이템 건수를 반환
[ 프로세스 ]
[ 프로젝트 구조 ]
logstash_pipeline
├── data_pipeline/
│ ├── inventory/
│ │ └── hosts.yml # Ansible 인벤토리 파일
│ ├── roles/
│ │ ├── collector/
│ │ │ ├── tasks/
│ │ │ │ └── main.yml # Collector 역할의 작업 정의
│ │ │ ├── templates/
│ │ │ │ ├── collector.conf # Logstash 구성 템플릿
│ │ │ │ └── collector.service # Collector 서비스 템플릿
│ │ ├── database/
│ │ │ ├── tasks/
│ │ │ │ └── main.yml # Database 역할의 작업 정의
│ │ │ ├── templates/
│ │ │ │ └── create.sql # 데이터베이스 초기화 SQL 템플릿
│ │ ├── generator/
│ │ │ ├── package/
│ │ │ │ └── data_creator.py # 데이터 생성 Python 스크립트
│ │ │ ├── tasks/
│ │ │ │ └── main.yml # Generator 역할의 작업 정의
│ │ │ ├── templates/
│ │ │ │ └── generator.service # Generator 서비스 템플릿
│ │ ├── serving/
│ │ │ ├── package/
│ │ │ │ └── fastapi_app.py # FastAPI 애플리케이션 코드
│ │ │ ├── tasks/
│ │ │ │ └── main.yml # Serving 역할의 작업 정의
│ │ │ ├── templates/
│ │ │ │ └── fastapi.service # FastAPI 서비스 템플릿
├── setting_aws/
│ ├── .env # 환경 변수 파일
│ ├── setup_server.sh # AWS 서버 설정 스크립트
│ ├── user_script.sh # 사용자 스크립트
├── README.md # 프로젝트 설명 파일
[ 파이프라인 구현 방법 ]
1) AWS 환경설정
- AWS 계정 설정을 위해 .env 파일 수정합니다.
- .env파일에는 access_key, 생성할 인스턴스 유형, 보안그룹 등의 정보가 저장되어있습니다.
2) 스크립트를 통해 스팟 인스턴스 생성
- 스크립트를 실행하여 동일한 환경의 EC2 server를 손쉽게 추가합니다.
sh setting_aws/setup_server.sh ${EC2_NAME}
3) SSH 접속
- EC2 생성 완료 후, 코드 배포를 위해 ssh로 접속합니다.
ssh -i /Users/AccessKeyPath/keyname.pem ec2-user@54.180.112.221
4) 프로비저닝 툴인 Ansible을 통해 애플리케이션 설치
- 순서대로 애플리케이션을 ansible playbook을 통해 배포합니다.
ansible-playbook -i data_pipeline/inventory/hosts.yml data_pipeline/roles/database/tasks/main.yml
ansible-playbook -i data_pipeline/inventory/hosts.yml data_pipeline/roles/serving/tasks/main.yml
ansible-playbook -i data_pipeline/inventory/hosts.yml data_pipeline/roles/collector/tasks/main.yml
ansible-playbook -i data_pipeline/inventory/hosts.yml data_pipeline/roles/generator/tasks/main.yml
배포 예시
5) Systemd를 통해 서비스 확인
systemctl status gernerator
systemctl status collector
systemctl status mariadb
systemctl status fastapi
6) DB 접속하여 수집되는 데이터 확인
mysql -u root -p {$password}
> use data
> select * from collector order by created_at desc limit 10;
7) Fast API Docs 확인
- Fast API는 API 스펙을 확인할 수 있는 페이지를 제공합니다.
http://54.180.112.221:8000/docs
8) API 호출
http://54.180.112.221:8000/api/stats
http://54.180.112.221:8000/api/count?item_id=A&from=2024-11-20T10:50:01.000&to=2024-11-20T23:20:59.000
[ Generator ]
초당 랜덤하게 데이터를 생성하기 위한 Python Code입니다.
def generate_data(data_dir):
while True:
item_id = random.choice(["A", "B", "C"])
created_at = datetime.now().isoformat()
date_str = datetime.now().strftime("%Y%m%d")
file_path = f"{data_dir}/{item_id}/{date_str}.dat"
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "a") as file:
file.write(f"{item_id},{created_at}\n")
print("file create")
time.sleep(0.1) # 초당 10건을 생성하기 위해 0.1초마다 실행
generate_data('/home/ec2-user/logs') # 파일 저장 경로
[ Collector ]
- Logstash는 스트리밍 데이터 파이프라인으로 설계되어 데이터를 실시간으로 수집 및 처리할 수 있습니다.
- 파일의 변경과 서버가 다운되더라도 마지막 읽은 부분부터 offset 추적 가능합니다.
Logstash Conf 파일
input {
file {
path => [
"/home/ec2-user/logs/A/${LOG_DATE}.dat",
"/home/ec2-user/logs/B/${LOG_DATE}.dat",
"/home/ec2-user/logs/C/${LOG_DATE}.dat"
]
# 파일의 처음부터 읽기 시작
start_position => "end"
# 마지막 offset 저장 파일
sincedb_path => "/home/ec2-user/packages/logstash-sincedb"
codec => plain # 파일 내용은 텍스트로 처리
}
}
filter {
# 로그 데이터 가공
grok {
match=>{"message"=>"%{WORD:item_id},%{TIMESTAMP_ISO8601:created_at}"}
}
...
}
output {
jdbc {
driver_class => "com.mysql.cj.jdbc.Driver"
connection_string => "jdbc:mysql://localhost:3306/data?user=...
...
}
[ Database ]
MariaDB를 통해 Collector에 전달 된 데이터를 저장합니다.
[ Serving ]
- 구현된 API의 파라미터, 리턴 값 등을 확인하고 테스트 할 수 있는 Fast API Docs를 확인합니다.
http://54.180.112.221:8000/docs
API 요청
아이템 건수를 기준으로 비율과 랭크를 반환
http://54.180.112.221:8000/api/stats
특정 기간의 아이템 건수를 반환
http://54.180.112.221:8000/api/count?item_id=A&from=2024-11-20T10:50:01.000&to=2024-11-20T23:20:59.000
[ 참조 ]
https://github.com/ehdrn3020/logstash_pipeline
GitHub - ehdrn3020/logstash_pipeline: logstash를 통해 데이터 수집 후 API 제공
logstash를 통해 데이터 수집 후 API 제공. Contribute to ehdrn3020/logstash_pipeline development by creating an account on GitHub.
github.com