DataPipeline/Elasticsearch
Logstash - pipelines.yml을 통한 다중 파이프라인
wave35
2024. 11. 26. 23:25
[ pipelines.yml 역할 ]
pipelines.yml 파일은 Logstash에서 여러 파이프라인을 동시에 실행하기 위해 사용되는 설정 파일입니다.
단일 Logstash 프로세스에서 여러 파이프라인을 구성하고 실행할 수 있어
다양한 데이터 흐름을 병렬로 처리할 수 있습니다.
파이프라인 간 독립성:
- 각 파이프라인은 고유한 ID와 설정 파일을 가지며, Logstash는 이를 독립적으로 실행합니다.
- 특정 파이프라인의 오류가 다른 파이프라인에 영향을 미치지 않습니다.
효율적인 리소스 활용:
- 단일 Logstash 인스턴스로 다중 파이프라인을 실행해 리소스를 절약하면서도 병렬 처리가 가능합니다.
유지보수 용이성:
- 설정 파일을 독립적으로 관리할 수 있어 유지보수가 용이하며, 확장성이 뛰어납니다.
[ 예제 ]
Input은 3가지 방법을 통해 정의합니다.
1) http를 통한 API에서 데이터 추출
2) 특정 경로의 파일을 통해 데이터 추출
3) DB에 접속해서 데이터 추출
Outout은 아래와 같습니다.
1) API와 파일에서 가져온 데이터는 API를 통해 output을 정의하고
2) DB에서 가져온 데이터는 파일로 output합니다.
[ 설정 파일 ]
pipelines.yml 파일
# pipelines.yml
- pipeline.id: api_pipeline
path.config: "/etc/logstash/conf.d/api_pipeline.conf"
- pipeline.id: file_pipeline
path.config: "/etc/logstash/conf.d/file_pipeline.conf"
- pipeline.id: db_pipeline
path.config: "/etc/logstash/conf.d/db_pipeline.conf"
1) API에서 데이터를 가져오는 파이프라인
/etc/logstash/conf.d/api_pipeline.conf
input {
http_poller {
urls => {
api_endpoint => "https://api.example.com/data"
}
request_timeout => 60
schedule => { cron => "* * * * *" } # 매 분마다 실행
codec => "json"
}
}
filter {
mutate {
add_field => { "pipeline_source" => "api" }
}
}
output {
http {
url => "https://api.example.com/output"
http_method => "post"
}
}
Input과 Output의 역할이 다르기 때문에 동일한 API를 호출하더라도
http_poller와 http 플러그인을 각각 사용합니다.
2) 파일에서 데이터를 가져오는 파이프라인
/etc/logstash/conf.d/file_pipeline.conf
input {
file {
path => "/var/log/input_data.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
mutate {
add_field => { "pipeline_source" => "file" }
}
}
output {
http {
url => "https://api.example.com/output"
http_method => "post"
}
}
3) DB에서 데이터를 가져오는 파이프라인
/etc/logstash/conf.d/db_pipeline.conf
input {
jdbc {
jdbc_driver_library => "/path/to/mysql-connector-java.jar"
jdbc_connection_string => "jdbc:mysql://localhost:3306/my_database"
jdbc_user => "db_user"
jdbc_password => "db_password"
statement => "SELECT * FROM my_table WHERE status = 'new'"
schedule => "* * * * *" # 매 분마다 실행
}
}
filter {
mutate {
add_field => { "pipeline_source" => "db" }
}
}
output {
file {
path => "/var/log/output_data.log"
}
}
[ Pipeline 동작 ]
API 데이터 처리:
- api_pipeline이 https://api.example.com/data에서 데이터를 가져와 필터링 후 https://api.example.com/output으로 전송.
파일 데이터 처리:
- file_pipeline이 /var/log/input_data.log에서 데이터를 읽고 필터링 후 https://api.example.com/output으로 전송.
DB 데이터 처리:
- db_pipeline이 MySQL 데이터베이스에서 새로운 데이터를 읽고 필터링 후 /var/log/output_data.log에 저장.
[ 출력 예제 ]
1. API에서 가져온 데이터의 처리 결과
{
"id": 101,
"name": "John Doe",
"status": "active",
"pipeline_source": "api"
}
2. 파일에서 가져온 데이터의 처리 결과
{
"timestamp": "2024-11-24T10:00:00Z",
"log_message": "File log entry",
"pipeline_source": "file"
}
3. DB에서 가져온 데이터의 처리 결과
{
"id": 202,
"record": "Sample record from DB",
"pipeline_source": "db"
}
이 데이터는 /var/log/output_data.log에 저장됩니다.