spark - jdbc 데이터베이스 파 티 션 읽 기

3377 단어 spark
spark 는 jdbc 를 통 해 데이터 베 이 스 를 읽 습 니 다. 데이터 가 너무 크 면 파 티 션 을 해 야 합 니 다. 그렇지 않 으 면 실행 이 느 리 고 파 티 션 수 는 webui 에서 볼 수 있 습 니 다. 파 티 션 수 는 task 수 입 니 다.만약 에 구역 을 나 눈 후에 어떤 task 는 빨리 완성 되 고 어떤 task 는 완성 이 비교적 느 립 니 다. 이 럴 때 sql 로 데이터 베이스 에서 데이터 가 기울 어 졌 는 지 찾 아야 합 니 다. 데이터 가 기울 어 진 곳 에 몇 개의 구역 을 더 만들어 야 합 니 다. 그러면 빠 를 것 입 니 다.자, 더 이상 말 하지 않 겠 습 니 다. 코드 를 올 리 겠 습 니 다.
import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object MppTopN {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().appName("testMPP").getOrCreate()
    val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","root")
    prop.setProperty("driver","com.gbase.jdbc.Driver")
    prop.setProperty("fetchsize","500000")
    val url="jdbc:gbase://172.18.66.70/lishuaitest"
    val tableName="menjin_1"
//        ,          ,             ,           ,                 ,         。          
 val predicates = Array(
      "2015/8/1" -> "2015/8/10",
      "2015/8/11" -> "2015/8/20",
      "2015/8/21" -> "2015/8/30",
      "2015/9/1" -> "2015/9/5",
      "2015/9/6" -> "2015/9/10",
      "2015/9/11" -> "2015/9/15",
      "2015/9/16" -> "2015/9/20",
      "2015/9/21" -> "2015/9/25",
      "2015/9/26" -> "2015/9/30",
      "2015/10/1" -> "2015/10/5",
      "2015/10/6" -> "2015/10/10",
      "2015/10/11" -> "2015/10/13",
      "2015/10/14" -> "2015/10/15",
      "2015/10/16" -> "2015/10/20",
      "2015/10/21" -> "2015/10/23",
      "2015/10/24" -> "2015/10/25",
      "2015/10/26" -> "2015/10/28",
      "2015/10/29" -> "2015/10/31",
      "2015/11/1" -> "2015/11/5",
      "2015/11/6" -> "2015/11/10",
      "2015/11/11" -> "2015/11/15",
      "2015/11/16" -> "2015/11/20",
      "2015/11/21" -> "2015/11/25",
      "2015/11/26" -> "2015/11/30",
      "2015/12/1" -> "2015/12/5",
      "2015/12/6" -> "2015/12/10",
      "2015/12/11" -> "2015/12/15",
      "2015/12/16" -> "2015/12/20",
      "2015/12/21" -> "2015/12/23",
      "2015/12/24" -> "2015/12/25",
      "2015/12/26" -> "2015/12/28",
      "2015/12/29" -> "2015/12/31"

    ).map{
      case (start,end) =>
        s"cast(INOUTTIME as date) >= date '$start'" + s"and cast(INOUTTIME as date) <= date '$end'"
    }
//               ,          
    val dataFrame = session.sqlContext.read.jdbc(url,tableName,predicates,prop)
    dataFrame.registerTempTable("test")
    val frame = dataFrame.sqlContext.sql("select BUILDING  from test order by BUILDING  limit 50 ")
 frame.coalesce(1).write.mode(SaveMode.Append).jdbc(url,"top_menjin",prop)
    session.stop()
  }
}

좋은 웹페이지 즐겨찾기