Spark integration ElasticSearch
3156 단어 spark
def dataframe_write_to_es(dataframe):
dataframe.write.format("org.elasticsearch.spark.sql")\
.option("es.nodes", "http://elasticsearch_domain")\
.option("es.port", 443)\
.option("es.nodes.wan.only", "true")\
.option("es.nodes.discovery", "false")\
.option("es.net.ssl", "true")\
.option("es.mapping.routing", "id_xxx")\
.save(es_index, mode="append")
def rdd_write_to_es(rdd):
conf = {"es.nodes": "http://elasticsearch_domain", "es.port": "80", "es.nodes.wan.only": "true",
"es.nodes.discovery": "false",
"es.mapping.routing": "xxx",
"es.batch.size.bytes": "30mb", "es.batch.size.entries": "300000",
"es.resource": index/type}
rdd .saveAsNewAPIHadoopFile(path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf)
이상의 conf는 elasticsearch-hadoop-configuration을 참조할 수 있습니다
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.