Grapx 에서 pregel 상세 해석 및 구체 적 인 응용 분석 (최 단 경 로 를 예 로 들 면)

4532 단어 빅 데이터
Spark Pregel 매개 변수 설명
Pregel 은 강력 한 그림 기반 의 교체 알고리즘 이자 Spark 의 교체 응용 aggregateMessage 의 전형 적 인 사례 로 그림 에서 가장 짧 은 경로, 관건 적 인 경로, n 도 관계 등 편리 한 교체 계산 을 할 수 있다.그러나 예전 에 그림 계산 에 접촉 이 많 지 않 았 던 어린이 신발 에 있어 이 api 는 비교적 무 거 운 그룹의 인터페이스 라 이해 하기 어렵다.Spark 의 Pregel 정 의 는 다음 과 같 습 니 다.
def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)
  }

각 매개 변수의 의 미 는 다음 과 같다. initial Msg: 메 시 지 를 초기 화 합 니 다. 이 초기 메 시 지 는 그림 의 각 노드 의 속성 을 초기 화 하 는 데 사 용 됩 니 다. pregel 에서 호출 할 때 먼저 그림 에서 mapVertices 를 사용 하여 initial Msg 의 값 에 따라 각 노드 의 값 을 업데이트 합 니 다. 어떻게 업데이트 하 는 지 는 vprog 매개 변수 에 의 해 정 해 집 니 다.vprog 함 수 는 initialMsg 메 시 지 를 매개 변수 로 받 아 해당 노드 의 값 을 업데이트 합 니 다.
maxIterations: 최대 교체 횟수
activeDirection: 변 의 활약 방향 을 나타 내 는데 활약 방향 이 무엇 입 니까? 먼저 활약 메시지 와 활약 정점 의 개념 을 설명해 야 합 니 다. 활약 노드 는 특정한 교체 에서 pregel 은 sendmsg 와 mergemsg 를 매개 변수 로 graph 의 aggregateMessage 방법 을 호출 한 후에 소식 을 받 는 노드 를 말 합 니 다. 활약 메 시 지 는 바로 이번 교체 에서 성공 적 으로 받 은 모든 소식 입 니 다.이렇게 되면 어떤 쪽 의 src 노드 는 활성 노드 이 고 어떤 dst 노드 는 활성 노드 이 며 어떤 쪽 의 양쪽 노드 는 모두 활성 노드 이다.activeDirection 인자 가 "EdgeDirection. Out" 으로 지정 되면 다음 라운드 에서 메 시 지 를 받 는 아웃 사 이 드 (src - > dst) 만 sendmsg 함 수 를 실행 합 니 다. 즉, sendmsg 리 셋 함 수 는 "dst - > src" 의 edgeTriplet 컨 텍스트 파 라미 터 를 걸 러 냅 니 다.
vprog: 노드 변환 함수, 초기 시간, 그리고 매 라운드 교체 후 pregel 은 지난 라운드 에 사 용 된 msg 와 이곳 의 vprod 함수 에 따라 그림 에서 join Vertices 방법 을 호출 하여 모든 메 시 지 를 받 은 노드 를 변화 시 킵 니 다. 이 함 수 는 초기 시간 을 제외 하고 모두 이 자 를 받 은 노드 에서 만 실 행 됩 니 다. 이 점 은 소스 코드 에서 볼 수 있 습 니 다. 소스 코드 에 사 용 된 것 은 join Vertices 입 니 다.(message) (vprog) 이 때문에 메 시 지 를 받 지 못 한 노드 는 join 이후 에 거 른 것 입 니 다.
sendmsg: 메시지 전송 함수 입 니 다. 이 함수 의 실행 매개 변 수 는 대표 변 의 컨 텍스트 입 니 다. pregel 은 aggregateMessages 를 호출 할 때 EdgeContext 를 EdgeTriplet 대상 (ctx. toEdgeTriplet) 으로 변환 합 니 다. 사용 자 는 Iterator [(VertexId, A)] 를 통 해 사용 해 야 합 니 다.어떤 메 시 지 를 보 내 는 지, 그 노드 에 보 내 는 내용 이 무엇 인지 지정 합 니 다. 한 가장자리 에 여러 개의 메 시 지 를 보 낼 수 있 기 때 문 입 니 다. 예 를 들 어 sendToDst, 예 를 들 어 sendToSrc 등 이 있 기 때문에 여 기 는 Iterator 입 니 다. 모든 요 소 는 하나의 tuple 입 니 다. 그 중의 vertexId 는 이 메 시 지 를 받 으 려 는 노드 의 id 입 니 다. 이것 은 이 가장자리 에 있 는 srcId 나 dstId 일 수 있 고 A 는 보 낼 내용 입 니 다. 따라서src 에서 dst 에 메 시 지 를 보 내야 한다 면, Iterator (dstId, A) 가 있 습 니 다. 아무 메시지 도 보 내지 않 으 면 빈 Iterator: Iterator. empty 로 돌아 갈 수 있 습 니 다.
mergeMsg: 이웃 노드 가 여러 메 시 지 를 받 았 을 때의 합병 논 리 는 vprog 함수 와 구별 되 는 것 을 주의 하 십시오. mergeMsg 는 메시지 내용 만 합 칠 수 있 지만 합병 후 노드 에 업데이트 되 지 않 습 니 다. vprog 함 수 는 받 은 메시지 (mergeMsg 가 발생 한 결과) 에 따라 노드 속성 을 업데이트 할 수 있 습 니 다.
코드 예제: 최 단 경로 구현
package BooksCode

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
import org.graphstream.graph.implementations.{AbstractEdge, SingleGraph, SingleNode}


object ShortestPath_Pregel {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("ShortestPath_Pregel").setMaster("local")
    val sc = new SparkContext(conf)

    val graph:Graph[Long,Double] = GraphGenerators.logNormalGraph(sc,numVertices = 10).mapEdges(e =>e.attr.toDouble)
    val sourceId = 5L

    val initialGraph = graph.mapVertices((id,_)=>if(id==sourceId) 0.0 else Double.PositiveInfinity)
//    println(initialGraph.vertices.collect.mkString("
")) println(initialGraph.edges.distinct().collect.mkString("
")) println("################################################") val sssp = initialGraph.pregel(Double.PositiveInfinity)( // verte program (id,dist,newDisst) =>{ println((id,dist,newDisst)) math.min(dist,newDisst)} , //Send Message triplet => {if (triplet.srcAttr + triplet.attr < triplet.dstAttr){ Iterator((triplet.dstId,triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, (a,b) => math.min(a,b) //Merge message ) //println(sssp.vertices.collect.mkString("
")) } }

좋은 웹페이지 즐겨찾기