Logstash on Docker로 CSV 데이터 전처리 도구 만들기

업무로 Logstash를 이용해 CSV 형식의 로그 데이터를 가공하는 툴을 만들었습니다. 이것이 예상외로 대고전. . . 「Pandas라고 한순간에 끝날 것 같지만.. . . 또 같은 업무를 할 때를 위해 비망록을 남겨두려고 생각합니다.

데이터 형식



이번에는 다음과 같은 입력 데이터를 Logstash로 읽고 각종 filter 처리를 실행하여 출력 이미지로 나타낸 형식의 데이터를 출력합니다.

입력 데이터
a,b,c,d,e: あいうえお: 日本語: カタカナ

출력 이미지
{"test1":"a","test2":"b","test3":"c","test4":"d","tag1":"e", "tag2":"あいうえお","tag3":"日本語","tag4":"カタカナ"}

Logstash on Docker 준비



이번에는 Logstash의 필터 기능 등을 연구하면서 작업을 진행할 필요가 있었기 때문에, 환경 구축이나 재실행을 용이하게 실행할 수 있는 Docker의 컨테이너를 이용하기로 했습니다. Logstash의 컨테이너 이미지를 컨테이너로 Docker Compose로 시작하고 원하는 로그 편집 프로세스를 수행합니다. 환경 구축에는 이하의 2개의 파일을 준비해, 이하의 디렉토리 구성과 같이 격납했습니다.

디렉토리 구성
logstashtest/
|-- docker
|   |-- config
|   |   `-- logstash.conf
|   `-- docker-compose.yml
`-- logfile
    |-- input_logs.csv
    `-- output_logs.csv

3 directories, 4 files

입력 데이터와 출력 데이터를 저장하는 디렉토리 ( logfile )와 Logstash의 내부 처리를 정의하는 logstash.conf
docker-compose.yml
version: "2.4"
services:
    logstash:
        image: docker.elastic.co/logstash/logstash:7.13.2
        volumes:
            - ../logfile:/usr/share/logstash/logfiles
            - ./config/logstash.conf:/usr/share/logstash/pipeline/logstash.conf

데이터 읽기 → 가공 → 출력 순서로 처리를 기재합니다.

logstash.conf
input {
    file {
        path => ["/usr/share/logstash/logfiles/input_logs.csv"]
        start_position => "beginning"
        sincedb_path => "/usr/share/logstash/mysincedb"
        codec => plain {
            charset => "SJIS"
        }
    }
}
filter {
  # Add column names
  csv {
      columns => [
          "test1",
          "test2",
          "test3",
          "test4",
          "test5"
      ]
  }
  # Create some new tags
  grok {
      match => {
          "test5" => "%{DATA:tag1}: %{DATA:tag2}: %{DATA:tag3}: %{GREEDYDATA:tag4}"
      }
  }
  # Remove unnecessary field names
  mutate {
      remove_field => [
          "@version",
          "host",
          "path",
          "@timestamp",
          "message",
          "tags",
          "test5"
      ]
  }
}
output {
    if ([test1])  {
        file {
            path => ["/usr/share/logstash/logfiles/output_logs.csv"]
        }
    }
}

판독부(input)


volumes 파일의 시작 부분에서 읽습니다. start_position => "beginning" 에 임의의 파일 패스를 지정하는 것으로, 어디까지 파일을 읽어들였는지를 바이트 단위로 기록해 주는 sicedb에 곧바로 액세스 할 수 있습니다. sincedb_path 항에서는 문자 코드를 지정할 수 있습니다.

처리부(filter)



3 종류의 필터를 사용합니다. csv 필터로 읽은 데이터에 열 이름을 추가합니다. 이 컬럼명을 사용해, 이후의 행정을 실행합니다. 그런 다음 grok 필터에서 "test5"항목에서 정규 패턴과 일치하는 부분을 분할하는 방법을 정의합니다. 여기에서는 DATA(임의의 문자의 반복) 형식의 항목이 codec 단락지어지고 있는 부분을 분할해, 정의한 컬럼명 : 를 추가합니다. 마지막으로 tag〇 필터에서 출력 데이터에 불필요한 열 이름을 제거합니다.

출력부(output)



if문의 사용법의 비망록으로서 조건 분기를 기재하고 있습니다만, 이 분기는 없어도 동작합니다. 이 방법으로 test1 항이 있으면 경로에 지정된 파일에 처리 된 데이터가 기록됩니다.

실행 결과



실행 결과는 다음과 같습니다. 사전 형식이므로 줄지어는 랜덤으로 되어 있습니다만, 이미지대로의 결과를 얻을 수 있었습니다.
{"test4":"d","tag1":"e","tag2":"あいうえお","test3":"c","test1":"a","tag3":"日本語","tag4":"カタカナ","test2":"b"}

Reference


  • Logstash File input plugin
  • Logstash File output plugin
  • Logstash Grok filter plugin
  • Logstash CSV filter plugin
  • Logstash on Docker config
  • 처음 Logstash
  • Logstash File input: sincedb_path
  • Logstash의 실용적인 설명
  • Kibana(Elasticsearch)+Logstash로 오타쿠의 지출을 시각화해 본다
  • logstash-pattern @ GitHub
  • logstash check if field exists
  • logstash 조건부 출력을 elasticsearch (filebeat 호스트 이름 당 인덱스) - elasticsearch, logstash, logstash-grok, logstash-configuration
  • i-Filter 액세스 로그를 Logstash로 정규화하고 보안 로그 분석에 활용하는 방법
  • [정규 표현식].*?은 최단 일치가 아닙니다.
  • 좋은 웹페이지 즐겨찾기