사이드프로젝트

SideProject - logstash를 통해 데이터 수집 후 API 제공

wave35 2024. 12. 16. 23:31

[ 실행 환경 ]

AWS EC2를 통한 서버 구성

  • 유연한 확장성: 필요에 따라 인스턴스 크기를 조정하거나 새 인스턴스를 추가하여 확장할 수 있습니다.
  • 빠른 배포: 몇 분 만에 새로운 인스턴스를 생성하여 서버를 시작할 수 있습니다.
  • 비용 효율성: 사용한 만큼만 비용을 지불합니다.

Ansible로 배포

  • Ansible은 IT 인프라 자동화를 위한 간단하고 강력한 도구이며, 손쉽게 다수의 서버에 배포가 가능합니다.

systemd에 등록하여 어플리케이션 실행

  • 간편한 배포를 위해 서비스 파일을 통해 애플리케이션의 실행 명령어, 환경 변수 등을 설정하여 쉽게 관리합니다.

 

[ 요구 사항 ]

- Generator는 1초마다 아래 스키마의 데이터 생성

Property Type Description
Item_id string A, B, C 중 하나
created_at datetime 데이터 생성 시간

- Collector를 통해 생성된 데이터 수집

- database에 데이터 저장

- API를 통해 데이터 조회가능

  • GET /api/stats : 아이템들 건수를 기준으로 비율과 랭크를 반환
  • GET /api/count : 특정 기간의 아이템 건수를 반환

 

[ 프로세스 ]

 

[ 프로젝트 구조 ]

logstash_pipeline
├── data_pipeline/
│   ├── inventory/
│   │   └── hosts.yml                 # Ansible 인벤토리 파일
│   ├── roles/
│   │   ├── collector/
│   │   │   ├── tasks/
│   │   │   │   └── main.yml          # Collector 역할의 작업 정의
│   │   │   ├── templates/
│   │   │   │   ├── collector.conf    # Logstash 구성 템플릿
│   │   │   │   └── collector.service # Collector 서비스 템플릿
│   │   ├── database/
│   │   │   ├── tasks/
│   │   │   │   └── main.yml          # Database 역할의 작업 정의
│   │   │   ├── templates/
│   │   │   │   └── create.sql        # 데이터베이스 초기화 SQL 템플릿
│   │   ├── generator/
│   │   │   ├── package/
│   │   │   │   └── data_creator.py   # 데이터 생성 Python 스크립트
│   │   │   ├── tasks/
│   │   │   │   └── main.yml          # Generator 역할의 작업 정의
│   │   │   ├── templates/
│   │   │   │   └── generator.service # Generator 서비스 템플릿
│   │   ├── serving/
│   │   │   ├── package/
│   │   │   │   └── fastapi_app.py    # FastAPI 애플리케이션 코드
│   │   │   ├── tasks/
│   │   │   │   └── main.yml          # Serving 역할의 작업 정의
│   │   │   ├── templates/
│   │   │   │   └── fastapi.service   # FastAPI 서비스 템플릿
├── setting_aws/
│   ├── .env                          # 환경 변수 파일
│   ├── setup_server.sh               # AWS 서버 설정 스크립트
│   ├── user_script.sh                # 사용자 스크립트
├── README.md                         # 프로젝트 설명 파일

 

[ 파이프라인 구현 방법 ]

1) AWS 환경설정

- AWS 계정 설정을 위해 .env 파일 수정합니다.

- .env파일에는 access_key, 생성할 인스턴스 유형, 보안그룹 등의 정보가 저장되어있습니다.

 

2) 스크립트를 통해 스팟 인스턴스 생성

- 스크립트를 실행하여 동일한 환경의 EC2 server를 손쉽게 추가합니다.

sh setting_aws/setup_server.sh ${EC2_NAME}

 

3) SSH 접속

- EC2 생성 완료 후, 코드 배포를 위해 ssh로 접속합니다.

ssh -i /Users/AccessKeyPath/keyname.pem ec2-user@54.180.112.221

 

4) 프로비저닝 툴인 Ansible을 통해 애플리케이션 설치

- 순서대로 애플리케이션을 ansible playbook을 통해 배포합니다.

ansible-playbook -i data_pipeline/inventory/hosts.yml data_pipeline/roles/database/tasks/main.yml
ansible-playbook -i data_pipeline/inventory/hosts.yml data_pipeline/roles/serving/tasks/main.yml
ansible-playbook -i data_pipeline/inventory/hosts.yml data_pipeline/roles/collector/tasks/main.yml
ansible-playbook -i data_pipeline/inventory/hosts.yml data_pipeline/roles/generator/tasks/main.yml

배포 예시

 

5) Systemd를 통해 서비스 확인

systemctl status gernerator
systemctl status collector
systemctl status mariadb
systemctl status fastapi

 

6) DB 접속하여 수집되는 데이터 확인

mysql -u root -p {$password}
> use data
> select * from collector order by created_at desc limit 10;

 

7) Fast API Docs 확인

- Fast API는 API 스펙을 확인할 수 있는 페이지를 제공합니다.

http://54.180.112.221:8000/docs

 

8) API 호출

http://54.180.112.221:8000/api/stats
http://54.180.112.221:8000/api/count?item_id=A&from=2024-11-20T10:50:01.000&to=2024-11-20T23:20:59.000

 

[ Generator ]

초당 랜덤하게 데이터를 생성하기 위한 Python Code입니다.

def generate_data(data_dir):
    while True:
        item_id = random.choice(["A", "B", "C"])
        created_at = datetime.now().isoformat()
        date_str = datetime.now().strftime("%Y%m%d")
        file_path = f"{data_dir}/{item_id}/{date_str}.dat"

        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        with open(file_path, "a") as file:
            file.write(f"{item_id},{created_at}\n")
            print("file create")

        time.sleep(0.1)  # 초당 10건을 생성하기 위해 0.1초마다 실행

generate_data('/home/ec2-user/logs') # 파일 저장 경로

 

 

[ Collector ]

- Logstash는 스트리밍 데이터 파이프라인으로 설계되어 데이터를 실시간으로 수집 및 처리할 수 있습니다.

- 파일의 변경과 서버가 다운되더라도 마지막 읽은 부분부터 offset 추적 가능합니다.

 

Logstash Conf 파일

input {
  file {
    path => [
      "/home/ec2-user/logs/A/${LOG_DATE}.dat",
      "/home/ec2-user/logs/B/${LOG_DATE}.dat",
      "/home/ec2-user/logs/C/${LOG_DATE}.dat"
    ]
    # 파일의 처음부터 읽기 시작
    start_position => "end"     
    # 마지막 offset 저장 파일
    sincedb_path => "/home/ec2-user/packages/logstash-sincedb" 
    codec => plain                    # 파일 내용은 텍스트로 처리
  }
}

filter {
  # 로그 데이터 가공
  grok {
    match=>{"message"=>"%{WORD:item_id},%{TIMESTAMP_ISO8601:created_at}"}
  }
  ...
}

output {
  jdbc {
    driver_class => "com.mysql.cj.jdbc.Driver"
    connection_string => "jdbc:mysql://localhost:3306/data?user=...
    ...
}

 

[ Database ]

MariaDB를 통해 Collector에 전달 된 데이터를 저장합니다.

 

[ Serving ]

- 구현된 API의 파라미터, 리턴 값 등을 확인하고 테스트 할 수 있는 Fast API Docs를 확인합니다.

http://54.180.112.221:8000/docs

 

API 요청

아이템 건수를 기준으로 비율과 랭크를 반환

http://54.180.112.221:8000/api/stats

특정 기간의 아이템 건수를 반환

http://54.180.112.221:8000/api/count?item_id=A&from=2024-11-20T10:50:01.000&to=2024-11-20T23:20:59.000

 

[ 참조 ]

https://github.com/ehdrn3020/logstash_pipeline

 

GitHub - ehdrn3020/logstash_pipeline: logstash를 통해 데이터 수집 후 API 제공

logstash를 통해 데이터 수집 후 API 제공. Contribute to ehdrn3020/logstash_pipeline development by creating an account on GitHub.

github.com