Spark + Phoenix: reading table data via JDBC

Without further ado, let's go straight to the code.
Add the Maven dependencies

<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>${phoenix.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>${phoenix.version}</version>
</dependency>
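The ${phoenix.version} property must also be declared in the POM. A minimal sketch, assuming a Phoenix 4.x release built against HBase 1.4; substitute the build that matches your cluster's HBase version:

<properties>
    <!-- assumption: pick the Phoenix build matching your HBase version -->
    <phoenix.version>4.14.0-HBase-1.4</phoenix.version>
</properties>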

Reading Phoenix data in Spark
package com.cctsoft.spark.offline;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Created with IntelliJ IDEA.
 * User: Kevin Liu
 * CreateDate: 2018/6/8 10:32
 * Description: read data from a Phoenix table
 */
public class FaceCrashImsiJob {

    // Assumed to be configured elsewhere in the original job; declared here
    // as placeholders so the class compiles.
    private static final org.slf4j.Logger logger =
            org.slf4j.LoggerFactory.getLogger(FaceCrashImsiJob.class);
    private static String zookeeper = "localhost";
    private static String imsiDataStartTime = "2018-06-08 00:00:00";
    private static String imsiDataEndTime = "2018-06-08 23:59:59";

    public static void main(String[] args) throws AnalysisException {
        // $example on:init_session$
        SparkSession spark = SparkSession.builder()
                .appName("Java Spark SQL basic example")
                .config("spark.some.config.option", "some-value")
                .master("local")
                .getOrCreate();
        // $example off:init_session$
        runBasicDataFrameExample(spark);
        spark.stop();
    }

    private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException {
        // Fetch the 4G (LTE) data: wrap the filtered query in a derived table
        // so Phoenix only returns rows inside the capture-time window.
        String imsiTableName = "(select * from LTE_DATA where to_char(cap_time) >= '"
                + imsiDataStartTime + "' and to_char(cap_time) < '"
                + imsiDataEndTime + "') as LTE_DATA_FILTER";
        logger.info("imsiTableName:" + imsiTableName);

        Dataset<Row> df = spark.read().format("jdbc")
                .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
                .option("url", "jdbc:phoenix:" + zookeeper + ":2181")
                .option("dbtable", imsiTableName)
                .load();

        // registerTempTable is deprecated in Spark 2.x;
        // createOrReplaceTempView is the current equivalent.
        df.createOrReplaceTempView("lte_data_tmp");
        Dataset<Row> lteDataAll = spark.sql(
                "select lte_dev_code,cap_time,imsi from lte_data_tmp order by cap_time desc");
        lteDataAll.show();
    }
}
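Since phoenix-spark is already on the classpath, the same table can also be read through the Phoenix datasource instead of generic JDBC, which lets Phoenix parallelize the scan rather than funneling everything through one JDBC partition. A minimal Scala sketch; LTE_DATA is the table from the example above, while the zkUrl value zk1:2181 is a placeholder for your ZooKeeper quorum:

import org.apache.spark.sql.SparkSession

object PhoenixDatasourceRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("phoenix datasource read")
      .master("local")
      .getOrCreate()

    // "table" and "zkUrl" are the option keys the phoenix-spark
    // datasource understands.
    val df = spark.read
      .format("org.apache.phoenix.spark")
      .option("table", "LTE_DATA")
      .option("zkUrl", "zk1:2181")
      .load()

    df.select("LTE_DEV_CODE", "CAP_TIME", "IMSI").show()
    spark.stop()
  }
}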
Writing data to Phoenix with Spark
package com.cctsoft.spark.offline
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.Row

/**
  * Created with IntelliJ IDEA.
  * User: Kevin Liu
  * CreateDate: 2018/6/15 12:32
  * Description: write data to Phoenix
  */
object TestMain {
  def main(args: Array[String]): Unit = {
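    // Placeholder entry point: the real job passes in a JavaRDD[Row] built
    // upstream (e.g. from a DataFrame); calling with null here would throw
    // an NPE inside batchSaveFaceImsi.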
    batchSaveFaceImsi(null)
  }


  def batchSaveFaceImsi(imsiRdd: JavaRDD[Row]): Unit = {
    import org.apache.phoenix.spark._

    // Build (ID, DEVICE_MAC, IMSI) tuples, where ID is "DEVICE_MAC:IMSI".
    // Rows with a null MAC or a null/empty IMSI are dropped before mapping,
    // since toString on a null column would throw.
    val rdd = imsiRdd.rdd
      .filter(r => r.get(0) != null && r.get(1) != null && r.get(1).toString.nonEmpty)
      .map(r => (r.get(0).toString + ":" + r.get(1).toString,
                 r.get(0).toString,
                 r.get(1).toString))

    rdd.foreach(println) // debug output; drop for production runs

    // zkUrl expects just the ZooKeeper quorum; phoenix-spark prepends the
    // jdbc:phoenix: scheme itself.
    rdd.saveToPhoenix(
      "RESIDENT_TMP",
      Seq("ID", "DEVICE_MAC", "IMSI"),
      zkUrl = Some("abigdataclient1:2181")
    )
  }

}
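saveToPhoenix upserts into an existing table, so RESIDENT_TMP must be created in Phoenix beforehand with matching column names. A minimal DDL sketch, assuming plain VARCHAR columns (the original schema is not shown, so adjust the types as needed):

CREATE TABLE IF NOT EXISTS RESIDENT_TMP (
    ID VARCHAR PRIMARY KEY,  -- composed as DEVICE_MAC:IMSI above
    DEVICE_MAC VARCHAR,
    IMSI VARCHAR
)

Because ID is the primary key, re-running the job upserts the same rows rather than duplicating them.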
