Grapx 에서 pregel 상세 해석 및 구체 적 인 응용 분석 (최 단 경 로 를 예 로 들 면)
4532 단어 빅 데이터
Pregel 은 강력 한 그림 기반 의 교체 알고리즘 이자 Spark 의 교체 응용 aggregateMessage 의 전형 적 인 사례 로 그림 에서 가장 짧 은 경로, 관건 적 인 경로, n 도 관계 등 편리 한 교체 계산 을 할 수 있다.그러나 예전 에 그림 계산 에 접촉 이 많 지 않 았 던 어린이 신발 에 있어 이 api 는 비교적 무 거 운 그룹의 인터페이스 라 이해 하기 어렵다.Spark 의 Pregel 정 의 는 다음 과 같 습 니 다.
def pregel[A: ClassTag](
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
}
각 매개 변수의 의 미 는 다음 과 같다. initial Msg: 메 시 지 를 초기 화 합 니 다. 이 초기 메 시 지 는 그림 의 각 노드 의 속성 을 초기 화 하 는 데 사 용 됩 니 다. pregel 에서 호출 할 때 먼저 그림 에서 mapVertices 를 사용 하여 initial Msg 의 값 에 따라 각 노드 의 값 을 업데이트 합 니 다. 어떻게 업데이트 하 는 지 는 vprog 매개 변수 에 의 해 정 해 집 니 다.vprog 함 수 는 initialMsg 메 시 지 를 매개 변수 로 받 아 해당 노드 의 값 을 업데이트 합 니 다.
maxIterations: 최대 교체 횟수
activeDirection: 변 의 활약 방향 을 나타 내 는데 활약 방향 이 무엇 입 니까? 먼저 활약 메시지 와 활약 정점 의 개념 을 설명해 야 합 니 다. 활약 노드 는 특정한 교체 에서 pregel 은 sendmsg 와 mergemsg 를 매개 변수 로 graph 의 aggregateMessage 방법 을 호출 한 후에 소식 을 받 는 노드 를 말 합 니 다. 활약 메 시 지 는 바로 이번 교체 에서 성공 적 으로 받 은 모든 소식 입 니 다.이렇게 되면 어떤 쪽 의 src 노드 는 활성 노드 이 고 어떤 dst 노드 는 활성 노드 이 며 어떤 쪽 의 양쪽 노드 는 모두 활성 노드 이다.activeDirection 인자 가 "EdgeDirection. Out" 으로 지정 되면 다음 라운드 에서 메 시 지 를 받 는 아웃 사 이 드 (src - > dst) 만 sendmsg 함 수 를 실행 합 니 다. 즉, sendmsg 리 셋 함 수 는 "dst - > src" 의 edgeTriplet 컨 텍스트 파 라미 터 를 걸 러 냅 니 다.
vprog: 노드 변환 함수, 초기 시간, 그리고 매 라운드 교체 후 pregel 은 지난 라운드 에 사 용 된 msg 와 이곳 의 vprod 함수 에 따라 그림 에서 join Vertices 방법 을 호출 하여 모든 메 시 지 를 받 은 노드 를 변화 시 킵 니 다. 이 함 수 는 초기 시간 을 제외 하고 모두 이 자 를 받 은 노드 에서 만 실 행 됩 니 다. 이 점 은 소스 코드 에서 볼 수 있 습 니 다. 소스 코드 에 사 용 된 것 은 join Vertices 입 니 다.(message) (vprog) 이 때문에 메 시 지 를 받 지 못 한 노드 는 join 이후 에 거 른 것 입 니 다.
sendmsg: 메시지 전송 함수 입 니 다. 이 함수 의 실행 매개 변 수 는 대표 변 의 컨 텍스트 입 니 다. pregel 은 aggregateMessages 를 호출 할 때 EdgeContext 를 EdgeTriplet 대상 (ctx. toEdgeTriplet) 으로 변환 합 니 다. 사용 자 는 Iterator [(VertexId, A)] 를 통 해 사용 해 야 합 니 다.어떤 메 시 지 를 보 내 는 지, 그 노드 에 보 내 는 내용 이 무엇 인지 지정 합 니 다. 한 가장자리 에 여러 개의 메 시 지 를 보 낼 수 있 기 때 문 입 니 다. 예 를 들 어 sendToDst, 예 를 들 어 sendToSrc 등 이 있 기 때문에 여 기 는 Iterator 입 니 다. 모든 요 소 는 하나의 tuple 입 니 다. 그 중의 vertexId 는 이 메 시 지 를 받 으 려 는 노드 의 id 입 니 다. 이것 은 이 가장자리 에 있 는 srcId 나 dstId 일 수 있 고 A 는 보 낼 내용 입 니 다. 따라서src 에서 dst 에 메 시 지 를 보 내야 한다 면, Iterator (dstId, A) 가 있 습 니 다. 아무 메시지 도 보 내지 않 으 면 빈 Iterator: Iterator. empty 로 돌아 갈 수 있 습 니 다.
mergeMsg: 이웃 노드 가 여러 메 시 지 를 받 았 을 때의 합병 논 리 는 vprog 함수 와 구별 되 는 것 을 주의 하 십시오. mergeMsg 는 메시지 내용 만 합 칠 수 있 지만 합병 후 노드 에 업데이트 되 지 않 습 니 다. vprog 함 수 는 받 은 메시지 (mergeMsg 가 발생 한 결과) 에 따라 노드 속성 을 업데이트 할 수 있 습 니 다.
코드 예제: 최 단 경로 구현
package BooksCode
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
import org.graphstream.graph.implementations.{AbstractEdge, SingleGraph, SingleNode}
object ShortestPath_Pregel {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val conf = new SparkConf().setAppName("ShortestPath_Pregel").setMaster("local")
val sc = new SparkContext(conf)
val graph:Graph[Long,Double] = GraphGenerators.logNormalGraph(sc,numVertices = 10).mapEdges(e =>e.attr.toDouble)
val sourceId = 5L
val initialGraph = graph.mapVertices((id,_)=>if(id==sourceId) 0.0 else Double.PositiveInfinity)
// println(initialGraph.vertices.collect.mkString("
"))
println(initialGraph.edges.distinct().collect.mkString("
"))
println("################################################")
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
// verte program
(id,dist,newDisst) =>{
println((id,dist,newDisst))
math.min(dist,newDisst)} ,
//Send Message
triplet => {if (triplet.srcAttr + triplet.attr < triplet.dstAttr){
Iterator((triplet.dstId,triplet.srcAttr + triplet.attr))
}
else {
Iterator.empty
}
},
(a,b) => math.min(a,b) //Merge message
)
//println(sssp.vertices.collect.mkString("
"))
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.