스파크의 브로드캐스트.

2857 단어 sparksql

스파크의 브로드캐스트.


1. 개요


실제 장면에서 1개의function이sparkoperation(예:map,reduce)에 전달될 때 이 function은 원격 집단 node에서 실행됩니다.이 변수들은 모든 기계에 복사됩니다. 원격 기계에서는 이 변수를 업데이트하지 않고 드라이버 프로그램으로 전송됩니다.tasks를 뛰어넘어 읽기와 쓰기 변수를 공유하는 지원은 일반적으로 비효율적이다.그러나 spark는 두 가지 일반적인 공유 변수 모델을 제공했다. 그것이 바로 방송 변수와 누적기이다.
Broadcast(방송)는 프로필, 맵 데이터 세트, 트리 데이터 구조 등을 공유하여 TASK 작업에 관련 변수를 더욱 빠르고 빠르게 사용할 수 있도록 합니다.또한 사용할 수 있습니다. 물론 redis를 사용하여 공유 데이터를 저장하고 모든task가 redis를 연결하여 공유 데이터를 얻을 수 있습니다.

2. 간단한 예


먼저 집합 변수를 생성하여 이 변수를 sparkContext의broadcast 함수를 통해 방송하고 마지막으로 rdd의 모든 파티션이 교체될 때 이 방송 변수를 사용합니다.
val values = List[Int](1,2,3)
val broadcastValues = sparkContext.broadcast(values)
rdd.mapPartitions(iter => {
  broadcastValues.getValue.foreach(println)
})

3. 구체적인 예: IP 귀속지 조회


ip.txt 파일 내용: 1.0.1.0 | 1.0.3.255 | 16777472 | 16778239 | 아시아 | 중국 | 복건 | 복주 | 전신 | 350100 | 중국 | CN | 119.306239 | 26.075302 1.0.8.0 | 1.0.15.255 | 16779264 | 16781311 | 아시아 | 중국 | 중국 | 광동 | 광저우 | 전신 | 440100 | 중국 | CN | 113.280637 | 23.125178 1.0.32.0 | 1.0.63.255 | 16785408 | 16793599 | 아시아 | 중국 | 광동 | 광저우 | 중국 | 중국 | 중국 | CN | 113.28063 7 | 23.125178 1.0.0 | 1.1.0.255 | 16842752 | 16843007 | 아시아 | 중국국가 | 푸젠 성 | 푸저우 | 전신 | 350100 | 중국 | CN | 119.306239 | 26.075302 1.2.0 | 1.7.255 | 16843264 | 16844799 | 아시아 | 중국 | 푸젠 성 | 푸저우 | 전신 | 350100 | 중국 | CN | 119.306239 | 26.075302
object IPFind {

  //Ip 
  def ip2Long(ip: String): Long = {
    val fragments = ip.split("[.]")
    var ipNum = 0L
    for (i = lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong))
        return middle
      if (ip < lines(middle)._1.toLong)
        high = middle - 1
      else {
        low = middle + 1
      }
    }
    -1
  }

  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.1");
    val conf = new SparkConf().setAppName("IpJdbc2").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val rdd1 = sc.textFile("D:\\textdata\\ip.txt").map(line =>{
      val fields = line.split("\\|")
      val start_num = fields(2)
      val end_num = fields(3)
      val province = fields(6)
      (start_num,end_num,province)
    })

	// , task 
    val rpRulesBroakcast =  rdd1.collect()
    val ipRulesBroadcast = sc.broadcast(rpRulesBroakcast)

	// Ip 
    val rdd3 = sc.textFile("D:\\textdata\\ip.txt").map(line =>{
      val fields = line.split("\\|")
      fields(1)
    })

    // task , 
    val result = rdd3.map(ip =>{
      val ipNum = ip2Long(ip.toString)
      val index = binarySearch(ipRulesBroadcast.value,ipNum)
      val info = ipRulesBroadcast.value(index)
      //(ip Num, ip Num, )
      info
    }).map(t => (t._3, 1)).reduceByKey(_+_)

}

좋은 웹페이지 즐겨찾기