spark 2.3.3 kafka 의 데 이 터 를 소비 하고 hive 의 표 와 연결 하여 elasticsearch 6.5.3 (spark structed streaming) 을 기록 합 니 다.

배경: 로 컬 물리 기관실 에 빅 데이터 클 러 스 터 (cdh, spark 2.3.3 + hiv 3.0) 의 오래된 클 러 스 터 환경 을 새로 만 들 었 습 니 다. spark 2.1.0 + hiv 2.4.2 신 구 클 러 스 터 는 네트워크 로 접근 합 니 다. 클 러 스 터 구축 에 참여 하지 않 았 습 니 다. 프로필 디 렉 터 리, 각종 의존 하 는 jar 버 전, 포트 등 을 모 릅 니 다. elasticsearch 의 포트 는 nginx 를 통 해 전 송 됩 니 다.
해결 과정: ① hive 낡은 데이터 이전: 오래된 hive 클 러 스 터 에서 실행
export table a. b partition (day = 20150323) 을 'hdfs: / / 구 군집 master: port / tmp / b' 로 보 내기;
새로운 hive 클 러 스 터 에서 Create database a 를 실행 합 니 다.
'hdfs: / / 새 군집 master: port / tmp / b' 에서 외부 테이블 a. b 를 가 져 옵 니 다.
(우리 의 master: port 는 내부 기계 도 메 인 이름 으로 봉 인 됨) 완성 표 의 복사 ② hive - site. xml 새 군집 의 hive 명령 행 에서 set 를 실행 합 니 다. hive - site. xml 파일 이 있 는 디 렉 터 리 에서 새 군집 hive 가 사용 하 는 hive - site. xml 파일 을 spark 의 conf 디 렉 터 리 로 복사 할 수 있 습 니 다. (– files hive - site. xml 이 유효 하지 않 습 니 다. 원인 은 알 수 없습니다)③ 올 바른 카 프 카 의존 가방 을 찾 습 니 다. 제 것 은:
    
        org.apache.spark
        spark-streaming-kafka-0-10_2.11
        2.3.0
    
    
        org.apache.spark
        spark-sql-kafka-0-10_2.11
        2.3.0
    

③ 올 바른 Elasticsearch 의존 도 를 찾 습 니 다.
    
       org.elasticsearch
       elasticsearch-spark-20_2.11
       6.5.3
   

④ 제 이 슨 의존 (제 이 슨 을 대상 으로 서열 화 하려 면 scala 의 class 를 case class 로 해 야 한다)
    
        com.alibaba
        fastjson
        1.2.54
    

⑤ 일부 BUG 의 의존 도 를 해결한다.
  
      com.thoughtworks.paranamer
      paranamer
      2.8
  

⑥ 핵심 코드: object RealTime extends Serializable {
val checkpointLocation = “/user/hdfs/checkpointLocation” val spark=SparkSession .builder() .config(ConfigurationOptions.ES_NODES,ip) .config(ConfigurationOptions.ES_PORT, port) .config(ConfigurationOptions.ES_INDEX_AUTO_CREATE,“true”) .config(ConfigurationOptions.ES_NODES_WAN_ONLY,“true”)/ / 저희 ES 의 Node 와 Port 가 nginx 를 만 들 었 기 때문에 실제 es 포트 로 전송 하 는 과정 에서 도 메 인 이름 에 대한 해석 이 있 습 니 다.. enableHiveSupport () / / 반드시. apName ("appname"). getOrCreate () spark. conf. set ("spark. sql. streaming. checkpoint Location", checkpoint Location) val linesdF = spark. readStream. format ("kafka"). option ("kafka. boottstrap. server", kafkaIpPort). option ("subscribe", sourcetopic). load () import spark. implicits. val data = linesdF. selectExpr ("cast (value as string)"). map (row = > {/ / 데 이 터 를 string 으로 해석 한 후 json 형식 JSON. parseObject (row. getString (0), classOF [myCaseClass])} data. createOrReplaceTempView ("b") spark. sql (“select * from b left join a on b.id = a.id”) .writeStream .format(“es”) .outputMode(“append”) .start(“esindex/estype”) exec.awaitTermination()

좋은 웹페이지 즐겨찾기