elasticsearch 데이터 가져오기hive
11346 단어 hiveelasticsearch
2. 코드를 이용하여 오늘 두 번째 방법을 소개합니다. 본고는spark로elasticsearch to hive를 진행하는데 말하지 않고 코드pom로 바로 올라가겠습니다.xml 파일은 다음과 같습니다.
com.taobao.ym_dmp
1.0-SNAPSHOT
dmp_tags
4.0.0
jar
2.11.0
1.8
net.minidev
json-smart
2.3
junit
junit
4.4
test
org.apache.spark
spark-core_2.11
2.2.0
org.apache.spark
spark-sql_2.11
2.2.0
org.scala-lang
scala-library
${scala.version}
org.apache.httpcomponents
httpclient
4.5.2
org.mongodb.scala
mongo-scala-driver_2.11
2.1.0
org.mongodb.spark
mongo-spark-connector_2.11
2.2.0
cz.mallat.uasparser
uasparser
0.6.2
joda-time
joda-time
2.10.1
com.alibaba
fastjson
1.2.58
mysql
mysql-connector-java
8.0.13
com.amazonaws
aws-java-sdk-s3
1.11.588
com.aerospike
aerospike-client
4.2.0
org.elasticsearch
elasticsearch-spark-20_2.11
7.0.0
org.elasticsearch.client
elasticsearch-rest-high-level-client
6.2.4
src/main/scala
src/test/scala
org.codehaus.mojo
build-helper-maven-plugin
1.9.1
add-source
generate-sources
add-source
src/main/scala
src/main/java
org.apache.maven.plugins
maven-compiler-plugin
3.1
${jdk.version}
${jdk.version}
org.scala-tools
maven-scala-plugin
compile
testCompile
${scala.version}
-target:jvm-1.5
org.apache.maven.plugins
maven-shade-plugin
2.4.1
package
shade
*:*
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
spark 코드는 다음과 같습니다.
package com.taobao.dmp.impl
import java.net.URI
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.http.client.methods.HttpGet
import org.apache.http.entity.ContentType
import org.apache.http.nio.entity.NStringEntity
import org.apache.http.util.EntityUtils
import org.apache.http.{HttpEntity, HttpHost}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.elasticsearch.client.{Response, RestClient}
import org.elasticsearch.spark._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._
case class IfaClass(ifa:String,bundles:Array[String],countrys:Array[String])
object Es2Hive {
def run(): Unit ={
//get Audience Str
try{
// sparksession
val conf = new SparkConf()
conf.set("es.nodes","xxx.xxx.xxx.xxx")// elasticsearch ip
conf.set("es.port","9200")
conf.set("es.index.auto.create","true")
conf.set("spark.es.nodes.wan.only","false")
conf.set("spark.defalut.parallelism","750")
conf.set("es.batch.size.bytes", "50mb")
conf.set("es.batch.size.entries", "10000")
conf.set("es.scroll.size", "10000")
val ss = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
generateUserTagBySpark(ss)
// spark
ss.close()
} catch{
case e: Exception => {
e.printStackTrace()
throw new Exception("genrate audience")
}
}
}
def generateUserTagBySpark(ss: SparkSession) ={
// val TargetFilePath = s"s3://www.taobao.com/hive_dataware/dmp/t_dmp_target_audience_tbl/day=$Day/audience_id=$AudienceId/"
// FileSystem.get(new URI("s3://www.taobao.com"), ss.sparkContext.hadoopConfiguration).delete(new Path(TargetFilePath), true)
//IfaClass(ifa:String,bundles:Array[String],countrys:Array[String])
// import ss.implicits._
val queryDsl=
s"""
|{
| "query":{
| "match_all": {}
| }
|}
""".stripMargin
//IfaClass(line._1,line._2.get("bundles"),line._2.get("countrys"))
for(index IfaClass(line._1,assemArr(line._2.get("bundles").toString),assemArr(line._2.get("countrys").toString)))
import ss.implicits._
val ifaBundleCountryResult=rdd.toDF()
println(s"generate final t_dmp_idfa_bundle_country_array_tbl_$index start")
//dylan
val MediaFilePath = s"s3://www.taobao.com/hive_dataware/dmp/t_dmp_idfa_bundle_country_array_tbl_$index"
FileSystem.get(new URI("s3://www.taobao.com"), ss.sparkContext.hadoopConfiguration).delete(new Path(MediaFilePath), true)
// ifaBundleCountryResult.write.format("orc").save("s3://www.taobao.com/hive_dataware/dmp/t_dmp_idfa_bundle_country_array_tbl")
ifaBundleCountryResult.repartition(500).write.format("orc").save(MediaFilePath)
println(s"write to t_dmp_idfa_bundle_country_array_tbl_$index success")
ifaBundleCountryResult.unpersist(true)
}
}
def assemArr(assSr:String):Array[String]={
val arr: Array[String] =assSr.replace("Some(Buffer(","").replace(")","").split(",")
arr
}
def main(args: Array[String]): Unit = {
run()
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark + HWC로 Hive 테이블을 만들고 자동으로 Metadata를 Atlas에 반영합니다.HDP 3.1.x의 경우 Spark + HWC에서 Hive 테이블을 만들고 자동으로 Metadata를 Atlas에 반영하는 방법이 있습니다. 방법: 전제조건: Hive Warehouse Connector (HWC) ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.