Spark + Phoenix: reading table data via JDBC
Add the Maven dependencies
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-core</artifactId>
    <version>${phoenix.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.phoenix</groupId>
    <artifactId>phoenix-spark</artifactId>
    <version>${phoenix.version}</version>
</dependency>
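The ${phoenix.version} property must be defined in the pom's <properties> block; the exact value is an assumption here and should match your HBase release, for example:

<properties>
    <phoenix.version>4.14.0-HBase-1.4</phoenix.version>
</properties>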
Reading Phoenix data in Spark
package com.cctsoft.spark.offline;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created with IntelliJ IDEA.
 * User: Kevin Liu
 * CreateDate: 2018/6/8 10:32
 * Description: reads data from a Phoenix table
 */
public class FaceCrashImsiJob {

    private static final Logger logger = LoggerFactory.getLogger(FaceCrashImsiJob.class);

    // Supplied by job configuration in the original; example values for illustration.
    private static final String zookeeper = "abigdataclient1";
    private static final String imsiDataStartTime = "2018-06-08 00:00:00";
    private static final String imsiDataEndTime = "2018-06-09 00:00:00";

    public static void main(String[] args) throws AnalysisException {
        // $example on:init_session$
        SparkSession spark = SparkSession.builder()
                .appName("Java Spark SQL basic example")
                .config("spark.some.config.option", "some-value")
                .master("local")
                .getOrCreate();
        // $example off:init_session$
        runBasicDataFrameExample(spark);
        spark.stop();
    }

    private static void runBasicDataFrameExample(SparkSession spark) throws AnalysisException {
        // Fetch the 4G (LTE) records for the given time window; the filtering
        // subquery is aliased so it can be passed as the JDBC "dbtable" option.
        String imsiTableName = "(select * from LTE_DATA where to_char(cap_time) >= '"
                + imsiDataStartTime + "' and to_char(cap_time) < '" + imsiDataEndTime
                + "') as LTE_DATA_FILTER";
        logger.info("imsiTableName:" + imsiTableName);

        Dataset<Row> df = spark.read().format("jdbc")
                .option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
                .option("url", "jdbc:phoenix:" + zookeeper + ":2181")
                .option("dbtable", imsiTableName)
                .load();

        df.registerTempTable("lte_data_tmp");
        Dataset<Row> lteDataAll = spark.sql(
                "select lte_dev_code,cap_time,imsi from lte_data_tmp order by cap_time desc");
        lteDataAll.show();
    }
}
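As an alternative to plain JDBC, the phoenix-spark module also registers a native data source that reads table splits in parallel instead of through a single JDBC connection. A minimal Scala sketch, assuming the same LTE_DATA table and the abigdataclient1 ZooKeeper quorum used elsewhere in this post:

import org.apache.spark.sql.SparkSession

object PhoenixNativeRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("phoenix-native-read")
      .master("local")
      .getOrCreate()
    // "org.apache.phoenix.spark" is the data source shipped in the phoenix-spark artifact.
    val df = spark.read
      .format("org.apache.phoenix.spark")
      .option("table", "LTE_DATA")
      .option("zkUrl", "abigdataclient1:2181")
      .load()
    // createOrReplaceTempView replaces the registerTempTable call deprecated in Spark 2.x.
    df.createOrReplaceTempView("lte_data_tmp")
    spark.sql("select lte_dev_code, cap_time, imsi from lte_data_tmp order by cap_time desc").show()
    spark.stop()
  }
}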
Writing data to Phoenix via Spark
package com.cctsoft.spark.offline
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.Row
/**
* Created with IntelliJ IDEA.
* User: Kevin Liu
* CreateDate: 2018/6/15 12:32
 * Description: writes data to a Phoenix table
*/
object TestMain {
  def main(args: Array[String]): Unit = {
    // In the real job, the RDD produced by an upstream stage is passed in here.
    batchSaveFaceImsi(null)
  }

  def batchSaveFaceImsi(imsiRdd: JavaRDD[Row]): Unit = {
    import org.apache.phoenix.spark._
    // Build (ID, DEVICE_MAC, IMSI) tuples and drop rows with an empty IMSI.
    val rdd = imsiRdd.rdd.map(x => {
      (x.get(0).toString + ":" + x.get(1).toString, x.get(0).toString, x.get(1).toString)
    }).filter(f => f._3 != null && f._3.nonEmpty)
    rdd.foreach(println)
    // zkUrl takes the ZooKeeper quorum (host:port); phoenix-spark builds the JDBC URL itself.
    rdd.saveToPhoenix(
      "RESIDENT_TMP",
      Seq("ID", "DEVICE_MAC", "IMSI"),
      zkUrl = Some("abigdataclient1:2181")
    )
  }
}
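Note that saveToPhoenix only issues UPSERTs; the target table must already exist in Phoenix with matching columns. A schema like CREATE TABLE RESIDENT_TMP (ID VARCHAR PRIMARY KEY, DEVICE_MAC VARCHAR, IMSI VARCHAR) would fit the tuples built above, though the actual DDL is not shown in the original post.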