spark 2.3.3 kafka 의 데 이 터 를 소비 하고 hive 의 표 와 연결 하여 elasticsearch 6.5.3 (spark structed streaming) 을 기록 합 니 다.
해결 과정: ① hive 낡은 데이터 이전: 오래된 hive 클 러 스 터 에서 실행
export table a. b partition (day = 20150323) 을 'hdfs: / / 구 군집 master: port / tmp / b' 로 보 내기;
새로운 hive 클 러 스 터 에서 Create database a 를 실행 합 니 다.
'hdfs: / / 새 군집 master: port / tmp / b' 에서 외부 테이블 a. b 를 가 져 옵 니 다.
(우리 의 master: port 는 내부 기계 도 메 인 이름 으로 봉 인 됨) 완성 표 의 복사 ② hive - site. xml 새 군집 의 hive 명령 행 에서 set 를 실행 합 니 다. hive - site. xml 파일 이 있 는 디 렉 터 리 에서 새 군집 hive 가 사용 하 는 hive - site. xml 파일 을 spark 의 conf 디 렉 터 리 로 복사 할 수 있 습 니 다. (– files hive - site. xml 이 유효 하지 않 습 니 다. 원인 은 알 수 없습니다)③ 올 바른 카 프 카 의존 가방 을 찾 습 니 다. 제 것 은:
org.apache.spark
spark-streaming-kafka-0-10_2.11
2.3.0
org.apache.spark
spark-sql-kafka-0-10_2.11
2.3.0
③ 올 바른 Elasticsearch 의존 도 를 찾 습 니 다.
org.elasticsearch
elasticsearch-spark-20_2.11
6.5.3
④ 제 이 슨 의존 (제 이 슨 을 대상 으로 서열 화 하려 면 scala 의 class 를 case class 로 해 야 한다)
com.alibaba
fastjson
1.2.54
⑤ 일부 BUG 의 의존 도 를 해결한다.
com.thoughtworks.paranamer
paranamer
2.8
⑥ 핵심 코드: object RealTime extends Serializable {
val checkpointLocation = “/user/hdfs/checkpointLocation” val spark=SparkSession .builder() .config(ConfigurationOptions.ES_NODES,ip) .config(ConfigurationOptions.ES_PORT, port) .config(ConfigurationOptions.ES_INDEX_AUTO_CREATE,“true”) .config(ConfigurationOptions.ES_NODES_WAN_ONLY,“true”)/ / 저희 ES 의 Node 와 Port 가 nginx 를 만 들 었 기 때문에 실제 es 포트 로 전송 하 는 과정 에서 도 메 인 이름 에 대한 해석 이 있 습 니 다.. enableHiveSupport () / / 반드시. apName ("appname"). getOrCreate () spark. conf. set ("spark. sql. streaming. checkpoint Location", checkpoint Location) val linesdF = spark. readStream. format ("kafka"). option ("kafka. boottstrap. server", kafkaIpPort). option ("subscribe", sourcetopic). load () import spark. implicits. val data = linesdF. selectExpr ("cast (value as string)"). map (row = > {/ / 데 이 터 를 string 으로 해석 한 후 json 형식 JSON. parseObject (row. getString (0), classOF [myCaseClass])} data. createOrReplaceTempView ("b") spark. sql (“select * from b left join a on b.id = a.id”) .writeStream .format(“es”) .outputMode(“append”) .start(“esindex/estype”) exec.awaitTermination()
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spark 팁: 컴퓨팅 집약적인 작업을 위해 병합 후 셔플 파티션 비활성화작은 입력에서 UDAF(사용자 정의 집계 함수) 내에서 컴퓨팅 집약적인 작업을 수행할 때 spark.sql.adaptive.coalescePartitions.enabled를 false로 설정합니다. Apache Sp...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.