빅 데이터 시스템 과 대규모 데이터 분석의 작업 3

  • 빅 데이터 시스템 과 대규모 데이터 분석의 작업 3
  • 문제 설명
  • SSSP 프로 그래 밍
  • 동기 화 그림 계산
  • 프로그램 소스 코드

  • 빅 데이터 시스템 과 대규모 데이터 분석의 작업 3
    문제 설명
    작업 3: 동기 화 그림 연산 프로 그래 밍
  • 총체 적 임무
  • SSSP 의 그림 연산 실현
  • 입력: 그림, v0
  • 출력: 정점 ID, 최 단 길이

  • SSSP 프로 그래 밍
  • SSSP
  • 정의:
  • 단일 소스 최 단 경로 단일 소스 최 단 경로
  • 정점 v0 을 정 하고, 정점 마다 가장 짧 은 경 로 를 구한다
  • 위조 코드:
    function Dijkstra(Graph, source):
        dist[source] = 0 //Distance from source to source
        prev[source] = undefined
        for each vertex v in Graph:
            if v != source:
                dist[v] = inf
                prev[v] = undefined
            add v to Q // Q = unvisited nodes
        while Q is not empty:
            u = vertex in Q with min dist[u] // source node in first case
            remove u from Q
            for each neighbor v of u: 
                alt = dist[u] + length(u, v)
                if alt < dist[v]:
                    dist[v] = alt
                    prev[v] = u
        return dist[], prev[]
  • Dijkstra 의 C + + 실현:
    struct edge { int to, length; };
    
    int dijkstra(const vector< vector > &graph, int source, int target) {
        vector<int> min_distance( graph.size(), INT_MAX );
        min_distance[ source ] = 0;
        set< pair<int,int> > active_vertices;
        active_vertices.insert( {0,source} );
    
        while (!active_vertices.empty()) {
            int where = active_vertices.begin()->second;
            if (where == target) return min_distance[where];
            active_vertices.erase( active_vertices.begin() );
            for (auto ed : graph[where]) 
                if (min_distance[ed.to] > min_distance[where] + ed.length) {
                    active_vertices.erase( { min_distance[ed.to], ed.to } );
                    min_distance[ed.to] = min_distance[where] + ed.length;
                    active_vertices.insert( { min_distance[ed.to], ed.to } );
                }
        }
        return INT_MAX;
    }
  • 동기 화 다이어그램 계산
  • 그림 계산
  • 동기 화 그림 계산:
  • 그림 계산 모델:
  • 그림 연산 종료: Active 와 Inactive
  • 모든 정점 이 Inactive 로 변 할 때 종료
  • 모든 정점 을 Active 로 초기 화

  • 시스템 구조:
  • master 할당, 각각 worker 가 하나의 Graph partition
  • 에 대해
  • 초과 시작: master 발표 시작 메시지
  • 초과 계산: 모든 worker 는 로 컬 계산 을 하고 본 partition 의 모든 정점 에 coptute
  • 를 호출 합 니 다.
  • 걸음 마 끝: 걸음 마 k 완성 은 가장 느 린 걸음 마 완성 시간
  • 에 달 려 있다.
  • 스텝 오 버 시작: k + 1 시작
  • 특징:
  • 여러 번 의 교체 가 필요 하 다

  • 비동기 도 연산:
  • 사고방식: 서로 다른 정점 에 서로 다른 업데이트 속 도 를 허용 한다
  • GraphLab:
  • 메모리 공유, 메모리 직접 접근


  • Graphlite:
  • GraphiLite 그림 계산 프레임 워 크 는 BSP 모델 에 속 합 니 다.
  • GraphiLite github 주소https://github.com/schencoding/GraphLite
  • 그림 은 분포 식 병행 계산 에 적합 하 다. 예 를 들 어 최 단 경로, PageRank 등 문제
  • 비교적 유명한 그림 계산 프레임 워 크 는 Prege, cmu 의 Graph Lab, apache 의 Giraph 등 이 있다.

  • 시스템 함수:
  • getValue: 읽 기
  • mutableValue: 수정
  • sendmessage ToAllNeighbors (): 이웃 마다 똑 같은 소식 을 보 냅 니 다
  • getOutEdgeIterator (): 다른 값 을 보 냅 니 다.
  • OutEdgeIterator 를 받 아 이웃 을 차례로 방문 하고 sendmessageTo () 에서 메 시 지 를 보 냅 니 다
  • voteToHalt():
  • 슈퍼 스텝 (): 현재 걸음 수 를 가 져 오고 0 부터 계산 합 니 다
  • 실현 함수:
  • Vertex: 정점, 변, 보 낸 메 시 지 는 모두 double
  • 입 니 다.
  • compute(msg);



  • 프로그램 소스 코드
    #include 
    #include 
    #include 
    
    #include 
    #include 
    using namespace std;
    
    //#include 
    //#include 
    
    #include "GraphLite.h"
    
    #define VERTEX_CLASS_NAME(name) SSSP##name
    
    #define EPS 1e-6
    
    #include
    //#define INF (numeric_limits::max())
    #define INF INT_MAX
    int v0_id; // source_id
    
    class VERTEX_CLASS_NAME(InputFormatter): public InputFormatter {
    public:
        int64_t getVertexNum() {
            unsigned long long n;
            sscanf(m_ptotal_vertex_line, "%lld", &n);
            m_total_vertex= n;
            return m_total_vertex;
        }
        int64_t getEdgeNum() {
            unsigned long long n;
            sscanf(m_ptotal_edge_line, "%lld", &n);
            m_total_edge= n;
            return m_total_edge;
        }
        int getVertexValueSize() {
            m_n_value_size = sizeof(double);
            return m_n_value_size;
        }
        int getEdgeValueSize() {
            m_e_value_size = sizeof(double);
            return m_e_value_size;
        }
        int getMessageValueSize() {
            m_m_value_size = sizeof(double);
            return m_m_value_size;
        }
        void loadGraph() {
            unsigned long long last_vertex;
            unsigned long long from;
            unsigned long long to;
            double weight = 0;
    
            double value = 1;
            int outdegree = 0;
    
            const char *line= getEdgeLine();
    
            // Note: modify this if an edge weight is to be read
            //       modify the 'weight' variable
    
            sscanf(line, "%lld %lld %lf", &from, &to, &weight);
            addEdge(from, to, &weight);
    
            last_vertex = from;
            ++outdegree;
            for (int64_t i = 1; i < m_total_edge; ++i) {
                line= getEdgeLine();
    
                // Note: modify this if an edge weight is to be read
                //       modify the 'weight' variable
    
                sscanf(line, "%lld %lld %lf", &from, &to, &weight);
                if (last_vertex != from) {
                    addVertex(last_vertex, &value, outdegree);
                    last_vertex = from;
                    outdegree = 1;
                } else {
                    ++outdegree;
                }
                addEdge(from, to, &weight);
            }
            addVertex(last_vertex, &value, outdegree);
        }
    };
    
    class VERTEX_CLASS_NAME(OutputFormatter): public OutputFormatter {
    public:
        void writeResult() {
            int64_t vid;
            double value;
            char s[1024];
    
            for (ResultIterator r_iter; ! r_iter.done(); r_iter.next() ) {
                r_iter.getIdValue(vid, &value);
                int n = sprintf(s, "%lld: %d
    "
    , (unsigned long long)vid, (int)value); writeNextResLine(s, n); } } }; // An aggregator that records a double value tom compute sum class VERTEX_CLASS_NAME(Aggregator): public Aggregator<double> { public: void init() { m_global = 0; m_local = 0; } void* getGlobal() { return &m_global; } void setGlobal(const void* p) { m_global = * (double *)p; } void* getLocal() { return &m_local; } void merge(const void* p) { m_global += * (double *)p; } void accumulate(const void* p) { m_local += * (double *)p; } }; class VERTEX_CLASS_NAME(): public Vertex <double, double, double> { public: void compute(MessageIterator* pmsgs) { int val; int source_id = (int)v0_id; //if(getSuperstep() == 0){ if ((double)getVertexId() == source_id){ val = 0; //printf("12312312312312111111111111"); } else { val = INF; } //printf(" 12 %lf", (double)getVertexId()); //if (getSuperstep() == 0) { //val= 10000; // sendMessageToAllNeighbors(val); //} else { if (getSuperstep() >= 50) { double global_val = * (double *)getAggrGlobal(0); if (global_val < EPS) { voteToHalt(); return; } } //printf("!!!!!!!msg value %f
    ", (double)pmsgs->getValue());
    for ( ; ! pmsgs->done(); pmsgs->next() ) { //sum += pmsgs->getValue(); if (pmsgs->getValue() < val){ val = pmsgs->getValue(); /*if (1 == getVertexId()){ printf("this %lf",(double)pmsgs->getValue());} */ } /* if(1==getVertexId()){ printf("testttttt %f",(double)val);}*/ } // if(1==getVertexId()){printf("%f
    ",(double)val);}
    //printf("the getValue()= %f
    ",(double)getValue());
    if((val < getValue())|| getSuperstep() == 0) { if(1==getVertexId()){printf("%f
    "
    ,(double)val);} printf("getValue() = %f
    "
    ,(double)getValue()); * mutableValue() = val; //double acc = fabs(getValue() - val); //accumulateAggr(0, &acc); OutEdgeIterator otherEdge = getOutEdgeIterator(); for (; ! otherEdge.done(); otherEdge.next()){ double vertex_id = otherEdge.target(); double edge_value = otherEdge.getValue(); //sendMessageTo(vertex_id, val + edge_value); //printf("the edge_value: %f
    ", (double)edge_value);
    /*if ( getVertexId() == 0){ printf("the vertex_id:%f
    ", (double)vertex_id); }*/
    //if (vertex_id == source_id) { // val = 0; // //* mutableValue() = val; // sendMessageTo(vertex_id, val + edge_value); // //printf("111: %lf", (double)val); //} else { sendMessageTo(vertex_id, val + edge_value); //} } } // * mutableValue() = val; //if(getVertexId() < 4){printf("
    %lf here is %lf
    ",(double)getVertexId(),(double)val);}
    //const int64_t n = getOutEdgeIterator().size(); //sendMessageToAllNeighbors(val / n); //printf("the end msg value %d", (int)pmsgs->getValue()); voteToHalt(); } }; class VERTEX_CLASS_NAME(Graph): public Graph { public: VERTEX_CLASS_NAME(Aggregator)* aggregator; public: // argv[0]: PageRankVertex.so // argv[1]: // argv[2]: // argv[3]: void init(int argc, char* argv[]) { setNumHosts(5); setHost(0, "localhost", 1411); setHost(1, "localhost", 1421); setHost(2, "localhost", 1431); setHost(3, "localhost", 1441); setHost(4, "localhost", 1451); if (argc < 3) { printf ("Usage: %s
    "
    , argv[0]); exit(1); } m_pin_path = argv[1]; m_pout_path = argv[2]; v0_id = atoi(argv[3]); aggregator = new VERTEX_CLASS_NAME(Aggregator)[1]; regNumAggr(1); regAggr(0, &aggregator[0]); } void term() { delete[] aggregator; } }; /* STOP: do not change the code below. */ extern "C" Graph* create_graph() { Graph* pgraph = new VERTEX_CLASS_NAME(Graph); pgraph->m_pin_formatter = new VERTEX_CLASS_NAME(InputFormatter); pgraph->m_pout_formatter = new VERTEX_CLASS_NAME(OutputFormatter); pgraph->m_pver_base = new VERTEX_CLASS_NAME(); return pgraph; } extern "C" void destroy_graph(Graph* pobject) { delete ( VERTEX_CLASS_NAME()* )(pobject->m_pver_base); delete ( VERTEX_CLASS_NAME(OutputFormatter)* )(pobject->m_pout_formatter); delete ( VERTEX_CLASS_NAME(InputFormatter)* )(pobject->m_pin_formatter); delete ( VERTEX_CLASS_NAME(Graph)* )pobject; }

    좋은 웹페이지 즐겨찾기