logstash 사용자 정의 input

13897 단어 ElasticSearch

 


logstash는 데이터 파이프 중간부품으로서 각종 유형의 데이터에 대한 수집과 변환을 지원하고 데이터를 각종 유형의 저장소에 전송한다. 예를 들어 kafka 데이터를 소비하고 Elasticsearch에 쓰기, 로그 파일을 대상에 동기화하여 S3를 저장하는 등, mysql 데이터는 Elasticsearch에 동기화한다.
logstash 내부에는 주로 세 개의 모듈이 포함되어 있습니다.
input:  
filter:  、 
output:  

image
서로 다른 유형의 데이터는 대응하는 input-plugin,output-plugin을 통해 데이터의 입력과 출력을 완성할 수 있다.만약에 kafka의 데이터를 소비하고 Elasticsearch에 기록해야 한다면logstash의kafka-input-plugin을 사용하여 데이터 입력을 완성하고logstash-output-elasticsearch를 사용하여 데이터 출력을 완성해야 한다.입력 데이터에 대한 필터링이나 변환이 필요하다면, 예를 들어 키워드에 따라 필요하지 않은 내용을 필터링하거나 시간 필드의 형식 변환이 필요하다면, 필터-plugin이 완성되어야 한다.
logstash의 input 플러그인은 현재 수십 가지가 있으며, 대부분의 일반적인 데이터 원본 입력을 지원합니다.그러나 만약에 회사 내부에서 개발한 데이터베이스나 다른 저장류의 서비스가 개원 제품과 인터페이스 프로토콜에서 호환되지 못한다면 예를 들어 텐센트가 자체 연구한 메시지 대기열 서비스 CMQ는 다른 개원 메시지 대기열 제품에 의존하지 않기 때문에logstash의logstash-input-kafka나logstash-input-rabbitmq를 사용하여 CMQ의 데이터를 동기화할 수 없다.텐센트 클라우드 대상 저장 서비스인 COS는 감권 방식에 있어 AWS의 S3와 차이가 존재하고logstash-input-s3 플러그인을 직접 사용하여 COS에서 데이터를 읽을 수 없다. 이런 상황에 대해logstash의 input 플러그인을 자체 개발해야 한다.
본고는logstash의cos input 플러그인을 개발하는 것을 예로 들어logstash의 input 플러그인을 개발하는 방법을 소개한다.
logstash 공식은 간단한 inputplugin example를 제공하여 참고할 수 있습니다.https://github.com/logstash-plugins/logstash-input-example/

환경 준비


logstash는 jruby 개발을 사용합니다. 우선 jruby 환경을 설정해야 합니다.
  • 설치 rvm: rvm는 루비 관리자로 루비 환경을 설치하고 관리할 수 있으며 명령줄을 통해 서로 다른 루비 버전으로 전환할 수 있습니다
    gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
    
    \curl -sSL https://get.rvm.io | bash -s stable
    
    source /etc/profile.d/rvm.sh
    
  • jruby 설치
    rvm install jruby
    
    rvm use jruby
    
  • 패키지 관리 도구bundle와 테스트 도구rspec를 설치합니다
    gem install bundle
    gem install rspec
    

  • example부터.

  • clone logstash-input-example
    git clone https://github.com/logstash-plugins/logstash-input-example.git
    
  • 클론에서 나온 logstash-input-example 원본 코드copy를 logstash-input-cos 디렉터리로 보내고 삭제합니다.git 폴더, 목적은logstash-input-example의 원본 코드를 참고하여 개발하는 동시에 명칭을 변경해야 하는 부분을 수정하는 것입니다
     mv logstash-input-example.gemspec logstash-input-cos.gemspec
     mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb
     mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb
    
  • 구축된 원본 디렉터리 구조는 그림과 같다

  • image
    여기서 중요한 파일의 역할은 다음과 같습니다.
  • cos.rb:주 파일입니다. 이 파일에서logstash 프로필의 읽기와 원본 데이터로 얻은 코드를 작성하려면 LogStash::Inputs::Base 기본 클래스를 계승해야 합니다
  • cos_spec.rb: 단원 테스트 파일,rspec를 통해cos를 할 수 있습니다.rb의 코드를 테스트합니다
  • logstash-input-cos.gemspec: 마ven의pom과 유사합니다.xml 파일, 프로젝트의 버전, 이름, licene, 패키지 의존 등을 설정하고bundle 명령을 통해 의존 패키지를 다운로드할 수 있습니다

  • 종속 구성 및 다운로드


    텐센트 클라우드 COS 서비스는 루비 sdk가 없기 때문에 자바 sdk에만 의존하여 개발할 수 있기 때문에 우선cosjava sdk에 대한 의존을 추가한다.logstash-input-cos.gemspec의 Gem dependencies 구성 모음에 다음 내용이 추가됩니다.
    # Gem dependencies
      s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
      s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
      s.add_runtime_dependency 'logstash-codec-plain'
      s.add_runtime_dependency 'stud', '>= 0.0.22'
      s.add_runtime_dependency 'jar-dependencies'
      s.add_development_dependency 'logstash-devutils', '1.3.6'
    

    logstash-input-example보다.gemspec, 대com 추가.qcloud:cos_api 패키지와jar-dependencies 패키지의 의존,jar-dependencies는ruby 환경에서jar 패키지를 관리하고jar 패키지의 불러오는 상태를 추적할 수 있습니다.
    그리고logstash-input-cos.gemspec에서 추가 구성:
    s.platform = 'java'
    

    이렇게 하면 자바 의존 패키지를 성공적으로 다운로드할 수 있고 루비 코드에서 자바 코드를 직접 호출할 수 있다.
    마지막으로 다음 명령을 실행하여 종속성을 다운로드합니다.
    bundle install
    

    코드 작성


    logstash-input-cos의 코드 논리는 사실 비교적 간단하다. 주로 정시 작업을 수행하고cosjavasdk의listObjects 방법을 호출하여 지정한 버킷에 있는 데이터를 얻고 매번 정시 작업이 끝난 후marker를 로컬에 저장하고 다시 실행할 때marker 위치에서 데이터를 가져와 데이터의 증량 동기화를 실현한다.

    jar 패키지 인용


    cos java sdk의 코드를 호출하려면 이 jar 패키지를 참조하십시오.
    require 'cos_api-5.4.4.jar'
    java_import com.qcloud.cos.COSClient;
    java_import com.qcloud.cos.ClientConfig;
    java_import com.qcloud.cos.auth.BasicCOSCredentials;
    java_import com.qcloud.cos.auth.COSCredentials;
    java_import com.qcloud.cos.exception.CosClientException;
    java_import com.qcloud.cos.exception.CosServiceException;
    java_import com.qcloud.cos.model.COSObjectSummary;
    java_import com.qcloud.cos.model.ListObjectsRequest;
    java_import com.qcloud.cos.model.ObjectListing;
    java_import com.qcloud.cos.region.Region;
    

    구성 파일 읽기


    logstash 프로필에서 읽은 코드는 그림과 같습니다.
     
    image
    config_name은 cos입니다. 다른 설정 항목의 읽기 코드는 루비의 코드 규범에 따라 작성되고 형식 검사와 기본값을 추가하면 다음 설정 파일에서 설정 항목을 읽을 수 있습니다.
    input {
        cos {
            "endpoint" => "cos.ap-guangzhou.myqcloud.com"
            "access_key_id" => "*****"
            "access_key_secret" => "****"
            "bucket" => "******"
            "region" => "ap-guangzhou"
            "appId" => "**********"
            "interval" => 60
        }
    }
    
    output {
        stdout {
            codec=>rubydebug
        }
    }
    

    레지스터 구현 방법


    logstash input 플러그인은 다른 방법을 실현해야 합니다:register와run
    register 방법은 초기화 방법과 유사합니다. 이 방법에서 프로필에서 읽고 값을 부여하는 변수를 직접 사용하여cosclient의 초기화를 완성할 수 있습니다. 코드는 다음과 같습니다.
        # 1  (appid, secretId, secretKey)
        cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
        # 2  bucket , COS  https://www.qcloud.com/document/product/436/6224
        clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
        # 3  cos 
        @cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
        # bucket ,  appid
        bucketName = @bucket + "-"+ @appId
        @bucketName = bucketName
    
        @listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()
        #  bucket 
        @listObjectsRequest.setBucketName(bucketName)
        # prefix object key prefix 
        @listObjectsRequest.setPrefix(@prefix)
        #  ,  listobject 1000
        @listObjectsRequest.setMaxKeys(1000)
        @listObjectsRequest.setMarker(@markerConfig.getMarker)
    

    예시 코드에는 @cosclient와 @listObjectRequest가 전역 변수로 설정되어 있습니다. run 방법에서 이 두 변수를 사용할 수 있기 때문입니다.
    루비에서 자바 코드를 호출하는 방식을 주의하십시오. 변수 설명자가 없습니다.new Object()는 직접 사용할 수 없고 Object만 사용할 수 있습니다.new().

    실행 방법


    run 방법으로 데이터를 얻고 데이터 흐름을 이벤트 이벤트로 변환
    가장 간단한 run 방법:
    def run(queue)
        Stud.interval(@interval) do
          event = LogStash::Event.new("message" => @message, "host" => @host)
          decorate(event)
          queue << event
        end # loop
      end # def run
    

    코드 설명:
  • Stud ruby 모듈을 통해 정시 작업을 수행하고,interval을 사용자 정의하여 프로필에서 읽을 수 있습니다
  • 이벤트를 생성하고 예시 코드는 두 필드의 데이터를 포함하는 이벤트를 생성합니다
  • decorate () 방법을 호출하여 이 이벤트에 태그를 걸고 설정하면
  • queue<

  • logstash-input-cos의 run 방법은 다음과 같습니다.
    def run(queue)
        @current_thread = Thread.current
        Stud.interval(@interval) do
          process(queue)
        end
    end
     
    def process(queue)
        @logger.info('Marker from: ' + @markerConfig.getMarker)
        
        objectListing = @cosclient.listObjects(@listObjectsRequest)
        nextMarker = objectListing.getNextMarker()
        cosObjectSummaries = objectListing.getObjectSummaries()
        cosObjectSummaries.each do |obj|
           #  key
           key = obj.getKey()
        
           if stop?
             @logger.info("stop while attempting to read log file")
             break
           end
           #  key 
           getObject(key) { |log|
             #  
             @codec.decode(log) do |event|
               decorate(event)
               queue << event
             end
           }
    
           #  marker
           @markerConfig.setMarker(key)
           @logger.info('Marker end: ' + @markerConfig.getMarker)
        end
      end
    
    
      #  
     def getObject(key, &block)
        getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
        cosObject = @cosclient.getObject(getObjectRequest)
        cosObjectInput = cosObject.getObjectContent()
        buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
        while (line = buffered.readLine())
          block.call(line)
        end
      end
    

    테스트 코드


    spec/inputs/cos_spec.rb에 다음 테스트 코드가 추가됩니다.
    # encoding: utf-8
    require "logstash/devutils/rspec/spec_helper"
    require "logstash/inputs/cos"
    
    describe LogStash::Inputs::Cos do
    
      it_behaves_like "an interruptible input plugin" do
        let(:config) { {
            "endpoint" => 'cos.ap-guangzhou.myqcloud.com',
            "access_key_id" => '*',
            "access_key_secret" => '*',
            "bucket" => '*',
             "region" => 'ap-guangzhou',
             "appId" => '*',
            "interval" => 60 } }
      end
    end
    
    

    rspec는 ruby 테스트 라이브러리로 bundle 명령을 통해 rspec를 실행합니다.
    bundle exec rspec
    

    하면, 만약, 만약...rb의 코드에 구문이 없거나 실행 중 오류가 발생하면 테스트가 성공했다는 메시지가 나타납니다.
    Finished in 0.8022 seconds (files took 3.45 seconds to load)
    1 example, 0 failures
    

    input-plugin-cos 구축 및 테스트


    build


    gem을 사용하여 input-plugin-cos 플러그인 원본을 빌드합니다.
    gem build logstash-input-cos.gemspec
    

    구축이 완료되면logstash-input-cos-0.0.1-java라는 이름이 생성됩니다.gem 파일

    test


    logstash의 압축 해제 디렉터리에서 명령을 실행하여logstash-input-cosplugin을 설치합니다.
    ./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
    

    실행 결과:
    Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
    Installing logstash-input-cos
    Installation successful
    

    또한./bin/logstash-pluginlist 명령은logstash가 설치된 모든 input/output/filter/codec 플러그인을 보십시오.
    프로필 생성 cos.logstash.conf, 내용:
    input {
        cos {
            "endpoint" => "cos.ap-guangzhou.myqcloud.com"
            "access_key_id" => "*****"
            "access_key_secret" => "****"
            "bucket" => "******"
            "region" => "ap-guangzhou"
            "appId" => "**********"
            "interval" => 60
        }
    }
    
    output {
        stdout {
            codec=>rubydebug
        }
    }
    

    이 프로필은 텐센트 클라우드 홈페이지 계정의 secret_를 사용합니다id 및 secret_key는 권한 검증을 하고 지정한 버킷의 데이터를 추출하여 테스트를 위해 출력을 표준 출력으로 설정합니다.
    logstash 실행:
    ./bin/logstash -f cos.logstash.conf
    

    출력 결과:
    Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties
    [2018-07-30T19:26:17,039][WARN ][logstash.runner          ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.
    [2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}
    [2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}
    [2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos      ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.
    [2018-07-30T19:26:17,341][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
    [2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos      ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}
    [2018-07-30T19:26:17,528][INFO ][logstash.pipeline        ] Pipeline main started
    [2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos      ] Marker from:
    log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    [2018-07-30T19:26:17,574][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
    [2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos      ] Marker end: access.log
    {
           "message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
          "@version" => "1",
        "@timestamp" => 2018-07-30T11:26:17.710Z
    }
    {
           "message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://localhost:8080/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
          "@version" => "1",
        "@timestamp" => 2018-07-30T11:26:17.711Z
    }
    

    코스의 버킷에access라는 이름을 올렸습니다.log의nginx 로그, 상기 출력 결과에서 마지막으로 출력된 모든 json 구조체는 하나의 이벤트를 구성하는데 그 중에서 메시지 메시지는access입니다.로그의 모든 로그입니다.
    저자: bellengao 링크:https://www.jianshu.com/p/5dc06601af9e출처: 간서간서의 저작권은 작가의 소유이며, 어떠한 형식의 전재도 작가에게 연락하여 권한을 수여받고 출처를 밝히십시오.

    좋은 웹페이지 즐겨찾기