Spark를 사용하여 Elasticsearch에 데이터를 신속하게 기록하는 방법

11657 단어 SparkElasticWaterdrop

Spark를 사용하여 Elasticsearch에 데이터를 신속하게 기록하는 방법


데이터가 Elasticsearch에 기록되면 가장 먼저 떠오르는 것은 Logstash이다.Logstash는 간단하고 확장 가능하며 신축 가능한 장점 때문에 많은 사용자들이 받아들인다.그러나 자가 짧고 한 치의 장점도 있다. Logstash도 적용할 수 없는 응용 장면이 있을 것이다. 예를 들어 다음과 같다.
  • 해량 데이터 ETL
  • 해량의 데이터 집합
  • 다원적 데이터 처리

  • 이러한 장면을 만족시키기 위해 많은 학우들이 스파크를 선택하여 스파크 산자를 빌려 데이터 처리를 하고 마지막으로 처리 결과를 Elasticsearch에 기록한다.
    우리 부서는 이전에 Spark를 이용하여 Nginx 로그를 분석하고 우리의 웹 서비스 방문 상황을 통계하며 Nginx 로그를 분당 한 번씩 집합하여 최종적으로 결과를 Elasticsearch에 기록한 다음에 Kibana 설정을 이용하여 실시간으로 대시보드를 감시했다.Elasticsearch와 Kibana는 모두 편리하고 실용적이지만 유사한 수요가 갈수록 많아지면서 스파크를 통해 데이터를 어떻게 신속하게 Elasticsearch에 쓰는가가 우리의 큰 문제가 되었다.
    오늘 여러분께 데이터의 빠른 쓰기를 실현할 수 있는 블랙 테크놀로지인 워터드롭(Waterdrop)을 추천합니다. 매우 사용하기 쉽고 고성능이며 대량의 데이터에 대응할 수 있는 실시간 데이터 처리 제품으로 스파크에 구축되어 간단하고 사용하기 쉬우며 유연하게 배치되어 개발할 필요가 없습니다.

    Kafka to Elasticsearch


    Logstash와 마찬가지로 Waterdrop은 다양한 유형의 데이터 입력을 지원합니다. 여기서 가장 흔히 볼 수 있는 Kakfa를 입력원으로 삼아 Waterdrop을 사용하여 데이터를 Elasticsearch에 신속하게 쓰는 방법을 설명합니다.

    Log Sample


    원본 로그 형식은 다음과 같습니다.
    127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"
    

    Elasticsearch Document


    1분마다 도메인 이름에 대한 액세스를 집계하고자 합니다. 수집된 데이터에는 다음 필드가 있습니다.
    domain String
    hostname String
    status int
    datetime String
    count int
    

    Waterdrop with Elasticsearch


    다음은 Waterdrop을 통해 Kafka의 데이터를 읽고 데이터를 해석하고 집합하는 방법을 상세히 소개하고, 마지막으로 처리 결과를 Elasticsearch에 기록합니다.

    Waterdrop


    Waterdrop 역시 매우 풍부한 플러그인을 가지고 있으며 Kafka, HDFS, Hive에서 데이터를 읽고 다양한 데이터 처리를 지원하며 결과를 Elasticsearch, Kudu 또는 Kafka에 기록합니다.

    Prerequisites


    우선 Waterdrop을 설치해야 합니다. 설치가 매우 간단하고 시스템 환경 변수를 설정할 필요가 없습니다.
  • 스파크 환경을 준비하세요
  • Waterdrop 설치
  • Waterdrop 설정

  • 다음은 Quick Start를 참조할 수 있는 간단한 단계입니다.
    cd /usr/local
    wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
    tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
    wget https://github.com/InterestingLab/waterdrop/releases/download/v1.1.1/waterdrop-1.1.1.zip
    unzip waterdrop-1.1.1.zip
    cd waterdrop-1.1.1
    
    vim config/waterdrop-env.sh
    #  Spark 
    SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}
    

    Waterdrop Pipeline


    Logstash와 마찬가지로, 우리는 Waterdrop Pipeline의 프로필만 작성하면 데이터의 가져오기를 완성할 수 있으며, Logstash를 아는 친구가 Waterdrop 프로필을 빨리 구할 수 있을 것이라고 믿습니다.
    구성 파일은 Spark, Input, filter, Output 등 네 가지 섹션으로 구성됩니다.

    Spark


    이 부분은 스파크의 관련 설정으로 스파크가 실행될 때 필요한 자원 크기를 주로 설정한다.
    spark {
      spark.app.name = "Waterdrop"
      spark.executor.instances = 2
      spark.executor.cores = 1
      spark.executor.memory = "1g"
      spark.streaming.batchDuration = 5
    }
    

    Input


    이 부분에서 데이터 원본을 정의합니다. 다음은 Kafka에서 데이터를 읽는 설정 사례입니다.
    kafkaStream {
        topics = "waterdrop-es"
        consumer.bootstrap.servers = "localhost:9092"
        consumer.group.id = "waterdrop_es_group"
        consumer.rebalance.max.retries = 100
    }
    

    Filter


    Filter 섹션에서는 로그를 분할하는 정규 해석, HTTPDATE를 Elasticsearch에서 지원하는 날짜 형식으로 변환하는 시간 변환, Number 형식의 필드 형식 변환, SQL을 통한 데이터 집합 등 일련의 변환을 구성합니다.
    filter {
        #  
        #  raw_message 
        grok {
            source_field = "raw_message"
            pattern = '%{NOTSPACE:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
       }
        #  "dd/MMM/yyyy:HH:mm:ss Z" 
        # Elasticsearch 
        date {
            source_field = "timestamp"
            target_field = "datetime"
            source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
            target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
        }
        ##  SQL 
        sql {
            table_name = "access_log"
            sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
        }
     }
    

    Output


    마지막으로 우리는 처리된 구조화된 데이터를 Elasticsearch에 쓸 것이다.
    output {
        elasticsearch {
            hosts = ["localhost:9200"]
            index = "waterdrop-${now}"
            es.batch.size.entries = 100000
            index_time_format = "yyyy.MM.dd"
        }
    }
    

    Running Waterdrop


    우리는 상술한 네 부분의 설정을 우리의 프로필로 조합할 것이다config/batch.conf .
    vim config/batch.conf
    
    spark {
      spark.app.name = "Waterdrop"
      spark.executor.instances = 2
      spark.executor.cores = 1
      spark.executor.memory = "1g"
      spark.streaming.batchDuration = 5
    }
    input {
        kafkaStream {
            topics = "waterdrop-es"
            consumer.bootstrap.servers = "localhost:9092"
            consumer.group.id = "waterdrop_es_group"
            consumer.rebalance.max.retries = 100
        }
    }
    filter {
        #  
        #  raw_message 
        grok {
            source_field = "raw_message"
            pattern = '%{IP:hostname}\\s%{NOTSPACE:domain}\\s%{IP:remote_addr}\\s%{NUMBER:request_time}s\\s\"%{DATA:upstream_ip}\"\\s\\[%{HTTPDATE:timestamp}\\]\\s\"%{NOTSPACE:method}\\s%{DATA:url}\\s%{NOTSPACE:http_ver}\"\\s%{NUMBER:status}\\s%{NUMBER:body_bytes_send}\\s%{DATA:referer}\\s%{NOTSPACE:cookie_info}\\s\"%{DATA:user_agent}'
       }
        #  "dd/MMM/yyyy:HH:mm:ss Z" 
        # Elasticsearch 
        date {
            source_field = "timestamp"
            target_field = "datetime"
            source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
            target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
        }
        ##  SQL 
        sql {
            table_name = "access_log"
            sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, hostname, status, datetime"
        }
     }
    output {
        elasticsearch {
            hosts = ["localhost:9200"]
            index = "waterdrop-${now}"
            es.batch.size.entries = 100000
            index_time_format = "yyyy.MM.dd"
        }
    }
    

    명령을 실행하고 프로필을 지정하고 Waterdrop을 실행하면 데이터를 Elasticsearch에 쓸 수 있습니다.여기서 우리는 현지 모델을 예로 들 수 있다.
    ./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'
    

    마지막으로 Elasticsearch에 기록된 데이터는 다음과 같습니다. Kibana를 추가하면 웹 서비스의 실시간 모니터링을 실현할 수 있습니다 _.
    "_source": {
        "domain": "elasticsearch.cn",
        "hostname": "localhost",
        "status": "200",
        "datetime": "2018-11-26T21:54:00.000+08:00",
        "count": 26
      }
    

    Conclusion


    이 글에서 우리는 워터드롭을 통해 카프카의 데이터를 Elasticsearch에 기록하는 방법을 소개했다.하나의 프로필만으로 Spark Application을 신속하게 실행할 수 있으며, 데이터의 처리와 쓰기를 완성할 수 있으며, 코드를 작성하지 않아도 매우 간단하다.
    데이터 처리 과정에서 Logstash가 지원할 수 없는 장면이나 Logstah 성능이 기대에 미치지 못할 경우 Waterdrop을 사용하여 문제를 해결할 수 있습니다.
    Waterdrop과 Elasticsearch, Kafka, Hadoop이 결합하여 사용하는 더 많은 기능과 사례를 알고 싶으면 프로젝트 메인 페이지에 직접 들어갈 수 있습니다https://github.com/InterestingLab/waterdrop
    –Power by InterestingLab

    좋은 웹페이지 즐겨찾기