스파크의 브로드캐스트.
2857 단어 sparksql
스파크의 브로드캐스트.
1. 개요
실제 장면에서 1개의function이sparkoperation(예:map,reduce)에 전달될 때 이 function은 원격 집단 node에서 실행됩니다.이 변수들은 모든 기계에 복사됩니다. 원격 기계에서는 이 변수를 업데이트하지 않고 드라이버 프로그램으로 전송됩니다.tasks를 뛰어넘어 읽기와 쓰기 변수를 공유하는 지원은 일반적으로 비효율적이다.그러나 spark는 두 가지 일반적인 공유 변수 모델을 제공했다. 그것이 바로 방송 변수와 누적기이다.
Broadcast(방송)는 프로필, 맵 데이터 세트, 트리 데이터 구조 등을 공유하여 TASK 작업에 관련 변수를 더욱 빠르고 빠르게 사용할 수 있도록 합니다.또한 사용할 수 있습니다. 물론 redis를 사용하여 공유 데이터를 저장하고 모든task가 redis를 연결하여 공유 데이터를 얻을 수 있습니다.
2. 간단한 예
먼저 집합 변수를 생성하여 이 변수를 sparkContext의broadcast 함수를 통해 방송하고 마지막으로 rdd의 모든 파티션이 교체될 때 이 방송 변수를 사용합니다.
val values = List[Int](1,2,3)
val broadcastValues = sparkContext.broadcast(values)
rdd.mapPartitions(iter => {
broadcastValues.getValue.foreach(println)
})
3. 구체적인 예: IP 귀속지 조회
ip.txt 파일 내용: 1.0.1.0 | 1.0.3.255 | 16777472 | 16778239 | 아시아 | 중국 | 복건 | 복주 | 전신 | 350100 | 중국 | CN | 119.306239 | 26.075302 1.0.8.0 | 1.0.15.255 | 16779264 | 16781311 | 아시아 | 중국 | 중국 | 광동 | 광저우 | 전신 | 440100 | 중국 | CN | 113.280637 | 23.125178 1.0.32.0 | 1.0.63.255 | 16785408 | 16793599 | 아시아 | 중국 | 광동 | 광저우 | 중국 | 중국 | 중국 | CN | 113.28063 7 | 23.125178 1.0.0 | 1.1.0.255 | 16842752 | 16843007 | 아시아 | 중국국가 | 푸젠 성 | 푸저우 | 전신 | 350100 | 중국 | CN | 119.306239 | 26.075302 1.2.0 | 1.7.255 | 16843264 | 16844799 | 아시아 | 중국 | 푸젠 성 | 푸저우 | 전신 | 350100 | 중국 | CN | 119.306239 | 26.075302
object IPFind {
//Ip
def ip2Long(ip: String): Long = {
val fragments = ip.split("[.]")
var ipNum = 0L
for (i = lines(middle)._1.toLong) && (ip <= lines(middle)._2.toLong))
return middle
if (ip < lines(middle)._1.toLong)
high = middle - 1
else {
low = middle + 1
}
}
-1
}
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.1");
val conf = new SparkConf().setAppName("IpJdbc2").setMaster("local[2]")
val sc = new SparkContext(conf)
val rdd1 = sc.textFile("D:\\textdata\\ip.txt").map(line =>{
val fields = line.split("\\|")
val start_num = fields(2)
val end_num = fields(3)
val province = fields(6)
(start_num,end_num,province)
})
// , task
val rpRulesBroakcast = rdd1.collect()
val ipRulesBroadcast = sc.broadcast(rpRulesBroakcast)
// Ip
val rdd3 = sc.textFile("D:\\textdata\\ip.txt").map(line =>{
val fields = line.split("\\|")
fields(1)
})
// task ,
val result = rdd3.map(ip =>{
val ipNum = ip2Long(ip.toString)
val index = binarySearch(ipRulesBroadcast.value,ipNum)
val info = ipRulesBroadcast.value(index)
//(ip Num, ip Num, )
info
}).map(t => (t._3, 1)).reduceByKey(_+_)
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
[AWS Glue] 쿼리를 바탕으로 Cloudfront 로그를 Parquet & JST의 ETL (+ 구역 분할) 단계로 나누기알림으로 다음과 같은 용례에 대한 대응 절차를 기재한다. (aws 컨트롤러의 사용 방법 등 세부 부분은 생략) Athena로 Cloudfront 로그 분석 ・Cloudfront 로그 내의 시간은 UTC이기 때문에 J...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.