DataPipeline/Elasticsearch

Logstash - Json, Mutate, Roby, Date

wave35 2024. 11. 26. 14:14

[ JSON 필터 ]

JSON 형식 데이터를 파싱하여 Logstash 이벤트의 필드로 변환합니다.

JSON 데이터를 Logstash 내부에서 개별 필드로 분리하여 가공 및 분석이 용이하도록 합니다.

 

입력 데이터

{"user": "john_doe", "action": "login", "timestamp": "2024-11-24T10:00:00Z"}

Logstash 설정

filter {
  json {
    source => "message"
  }
}

변환 후 출력

{
  "user": "john_doe",
  "action": "login",
  "timestamp": "2024-11-24T10:00:00Z"
}

JSON 문자열이 각각 user, action, timestamp라는 개별 필드로 변환됩니다.

 

 

[ Mutate 필터 ]

이벤트 데이터를 수정하거나 변환하는데 사용됩니다.

필드를 추가, 삭제, 데이터 타입 변환, 대소문자 변환 등의 작업이 가능합니다.

 

입력 데이터

{"user": "john_doe", "age": "25", "location": "New York"}

Logstash 설정

filter {
  mutate {
    convert => { "age" => "integer" }
    rename => { "location" => "city" }
    add_field => { "status" => "active" }
    remove_field => [ "user" ]
  }
}

변환 후 출력

{
  "age": 25,
  "city": "New York",
  "status": "active"
}

 

 

[ Json + Mutate 필터 ]

다수의 Json Rows 가 저장된 데이터

{"user": "james", "action": "login", "timestamp": "2024-11-24T10:00:00Z"}
{"user": "doe", "action": "logout", "timestamp": "2024-11-24T10:01:00Z"}

Logstash 설정

input {
  file {
    path => ["/opt/homebrew/dgkLib/logstash-8.16.0/data/testdata.log"]
    sincedb_path => "/dev/null"
    start_position => "beginning"
   }
}

filter {
  # JSON 데이터 파싱
  json {
    source => "message"
  }
  mutate {
    # 필드 추가
    add_field => {"status" => "%{action}"}
    # 불필요한 필드 제거
    remove_field => [ "@timestamp", "message", "host", "event", "@version", "action"]
  }
}

output {
  stdout { codec => rubydebug }

출력 값

{
    "timestamp" => "2024-11-24T10:00:00Z",
         "user" => "james",
       "status" => "login"
}
{
    "timestamp" => "2024-11-24T10:01:00Z",
         "user" => "doe",
       "status" => "logout"
}

row가 쉼표(,)로 구분되면 split 필터를 추가로 설정을 해주어야합니다.

 

 

[ Ruby 필터 ]

Ruby 코드를 통해 데이터 변환을 사용자 정의 방식으로 처리합니다.

복잡한 데이터 처리 로직이 필요한 경우 유용합니다.

 

event 객체는 Logstash 이벤트 데이터를 나타냅니다.

event.set() 과 event.get()을 이용하여 데이터를 컨트롤 합니다.

 

예제 1) 필드 추가

Logstash 설정

filter {
  ruby {
    code => "
      event.set('new_field', 'Hello World')
    "
  }
}

입력 데이터

{
  "user": "doe",
  "status": "logout"
}

출력 결과

{
  "user": "doe",
  "status": "logout",
  "new_field": "Hello World"
}

new_field가 추가되고 값이 "Hello World"로 설정되었습니다.

 

예제 2) 필드 값 계산

입력 데이터

{"field1": 10, "field2": 20}

Logstash 설정

filter {
  ruby {
    code => "event.set('sum', event.get('field1') + event.get('field2'))"
  }
}

출력

{
  "field1": 10,
  "field2": 20,
  "sum": 30
}

field1과 field2 값을 더한 결과(sum)를 새로운 필드로 추가.

 

예제 3) 중첩된 필드 생성

Logstash 설정

filter {
  ruby {
    code => "
      event.set('[log][details]', 'File Processed')
    "
  }
}

입력 데이터

{
  "log": {
    "file": {
      "path": "/path/to/file"
    }
  }
}

출력 결과

{
  "log": {
    "file": {
      "path": "/path/to/file"
    },
    "details": "File Processed"
  }
}

중첩된 필드 [log][details]가 추가되고 값이 "File Processed"로 설정되었습니다.

 

예제 4) event.set와 event.get의 조합

event.set와 event.get는 함께 사용되어 이벤트의 데이터를 읽고, 수정, 추가할 수 있습니다.

filter {
  ruby {
    code => "
      original_value = event.get('user')
      event.set('greeting', 'Hello ' + original_value)
    "
  }
}

입력 데이터

{
  "user": "james"
}

출력 결과

{
  "user": "james",
  "greeting": "Hello james"
}

기존 user 값을 읽어와 greeting 필드로 새로운 값을 생성.

 

 

[ Date 필터 ]

날짜 데이터를 파싱하여 Logstash 이벤트의 타임스탬프(@timestamp) 필드에 저장하거나

사용자 지정 필드로 변환합니다.

 

입력 데이터

{"event_date": "2024-11-24T10:00:00Z"}

Logstash 설정

filter {
  date {
    match => [ "event_date", "ISO8601" ]
    target => "@timestamp"
  }
}

변환 후 출력

{
  "event_date": "2024-11-24T10:00:00Z",
  "@timestamp": "2024-11-24T10:00:00.000Z"
}

event_date 필드 값이 파싱되어 Logstash의 기본 타임스탬프 필드인 @timestamp로 설정.