Spark integration ElasticSearch

3156 단어 spark
Spark Streaming으로 AWS의 kinesis(Kafka와 유사)에서 streaming 데이터를 읽고 spark 계산 프레임워크를 통해 처리한 후 write into ElasticSearch, spark는 데이터가 elasticsearch까지 두 가지 방식이 있는데 다음은 integration의 과정이라고 썼다.
  • 필요한 패키지 org.elasticsearch:elasticsearch-spark-20_2.11 [버전 spark 2.0, 2.11]download
  • spark가 ElasticSearch에 쓰는 두 가지 방식
  • rdd 직접 쓰기 ES 또는 데이터 프레임 직접 쓰기 ES:
    def dataframe_write_to_es(dataframe):
        dataframe.write.format("org.elasticsearch.spark.sql")\
                               .option("es.nodes", "http://elasticsearch_domain")\
                              .option("es.port", 443)\
                              .option("es.nodes.wan.only", "true")\
                              .option("es.nodes.discovery", "false")\
                              .option("es.net.ssl", "true")\
                              .option("es.mapping.routing", "id_xxx")\
                             .save(es_index, mode="append")
    
    
    def rdd_write_to_es(rdd):
     conf = {"es.nodes": "http://elasticsearch_domain", "es.port": "80", "es.nodes.wan.only": "true",
            "es.nodes.discovery": "false", 
            "es.mapping.routing": "xxx",
            "es.batch.size.bytes": "30mb", "es.batch.size.entries": "300000",
            "es.resource": index/type}   
     rdd .saveAsNewAPIHadoopFile(path='-',
          outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
          keyClass="org.apache.hadoop.io.NullWritable",
          valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
          conf=conf)

  • 이상의 conf는 elasticsearch-hadoop-configuration을 참조할 수 있습니다

    좋은 웹페이지 즐겨찾기