Applying an action to a JavaPairRDD

How do I correctly apply JavaPairRDD.foreachPartition?
I'm new to Apache Spark. I'm trying to run a custom nearest-neighbor algorithm on an RDD that a custom partitioner splits into two partitions. The JavaPairRDD contains the graph details and the random objects generated on the graph.
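With only two partition-index keys (0 and 1), a custom partitioner can map each key straight to a partition number. Below is a minimal, dependency-free sketch of that routing logic. It assumes the keys are integers or integer strings, as the parsing in the code below suggests. IndexKeyPartitioner is a hypothetical name, not the question's CustomPartitioner, whose implementation isn't shown. In a real job the class would extend org.apache.spark.Partitioner, which declares numPartitions() and getPartition(Object key). It is kept standalone here so the routing can be checked without Spark.

```java
import java.util.Objects;

/**
 * Hypothetical partitioner sketch: routes each record by its integer
 * partition-index key. In Spark this would extend org.apache.spark.Partitioner.
 */
class IndexKeyPartitioner {
    private final int partitions;

    IndexKeyPartitioner(int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        this.partitions = partitions;
    }

    public int numPartitions() {
        return partitions;
    }

    /** Parses the key the same way the question's code does and maps it into [0, partitions). */
    public int getPartition(Object key) {
        int index = Integer.parseInt(String.valueOf(key));
        return Math.floorMod(index, partitions); // never negative, even for negative keys
    }

    // Spark compares partitioners with equals() when deciding whether two RDDs
    // are already partitioned the same way, so equal configurations compare equal.
    @Override
    public boolean equals(Object other) {
        return other instanceof IndexKeyPartitioner
                && ((IndexKeyPartitioner) other).partitions == partitions;
    }

    @Override
    public int hashCode() {
        return Objects.hash(partitions);
    }
}
```

With this routing, every record keyed 0 lands in partition 0 and every record keyed 1 lands in partition 1. As a result, a single foreachPartition call only ever receives one key.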





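My approach is to build a subgraph for each partition and run the custom algorithm on each subgraph. It seems to work, but not quite properly, and I'm not sure this is the correct way to apply an action to each partition. I'm including my code and results as well. Any comments and suggestions are much appreciated.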
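Assume the custom partitioner sends every record with the same partition-index key to the same partition. Then each foreachPartition (or mapPartitions) call only sees one partition's records, and one local graph per call is enough. Below is a minimal plain-Java sketch of that shape, with simplified types:

* The key is the partition index.
* The value is a map of the form src -> (dst -> edge length).
* PartitionGraphBuilder, LocalGraph and processPartition are hypothetical names.
* LocalGraph stands in for the question's CoreGraph.
* The nearest-neighbor step is left as a comment.

processPartition has the same shape as the function passed to mapPartitions in Spark 2.x and later: an Iterator of records in, an Iterator of results out. That lets the results be collected on the driver instead of printed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: build ONE local graph from a single partition's records. */
class PartitionGraphBuilder {

    /** Tiny stand-in for CoreGraph: adjacency map src -> (dst -> edge length). */
    static final class LocalGraph {
        final Map<Integer, Map<Integer, Double>> adjacency = new HashMap<>();
        int edgeCount = 0;

        void addEdge(int src, int dst, double length) {
            Map<Integer, Double> out = adjacency.computeIfAbsent(src, k -> new HashMap<>());
            if (out.put(dst, length) == null) {
                edgeCount++; // count each distinct (src, dst) pair once
            }
        }
    }

    /** One partition's records in, that partition's results out (mapPartitions shape). */
    static Iterator<String> processPartition(
            Iterator<Map.Entry<Integer, Map<Integer, Map<Integer, Double>>>> rows) {
        LocalGraph graph = new LocalGraph(); // one graph per partition call
        List<Integer> keysSeen = new ArrayList<>();

        while (rows.hasNext()) {
            // Advance the iterator exactly once per loop iteration.
            Map.Entry<Integer, Map<Integer, Map<Integer, Double>>> row = rows.next();
            if (!keysSeen.contains(row.getKey())) {
                keysSeen.add(row.getKey());
            }
            for (Map.Entry<Integer, Map<Integer, Double>> src : row.getValue().entrySet()) {
                for (Map.Entry<Integer, Double> dst : src.getValue().entrySet()) {
                    graph.addEdge(src.getKey(), dst.getKey(), dst.getValue());
                }
            }
        }

        if (keysSeen.isEmpty()) {
            return Collections.emptyIterator(); // empty partition: nothing to report
        }
        // The nearest-neighbor search would run on `graph` here; this sketch only summarizes it.
        return Collections.singletonList("keys=" + keysSeen + " edges=" + graph.edgeCount).iterator();
    }
}
```

Returning results rather than printing them also matters on a cluster. There, System.out inside foreachPartition goes to each executor's stdout, not to the driver console.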





// <Partition_Index_Key, Map<Source_Vertex, Map<Destination_Vertex, Tuple2<Edge_Length, ArrayList of random objects>>>>
            JavaPairRDD<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>> adjVertForSubgraphsRDD = jscontext
                    .parallelizePairs(adjacentVerticesForSubgraphs)
                    .partitionBy(new CustomPartitioner(CustomPartitionSize));

            //applying foreachPartition action on JavaPairRDD
            adjVertForSubgraphsRDD.foreachPartition(
                    new VoidFunction<Iterator<Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>>>>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public void call(
                                Iterator<Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>>> tupleRow)
                                throws Exception {
                            int sourceVertex;
                            int destVertex;
                            double edgeLength;

                            int roadObjectId;
                            boolean roadObjectType;
                            double distanceFromStart;

                            CoreGraph subgraph0 = new CoreGraph();
                            CoreGraph subgraph1 = new CoreGraph();

                            while (tupleRow.hasNext()) {
                                // Read each record exactly once: every next() call advances the iterator,
                                // so calling it again for the key would skip a record (or run past the end).
                                Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>> row = tupleRow
                                        .next();
                                Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>> newMap = row._2();
                                int partitionKey = Integer.parseInt(String.valueOf(row._1()));

                                // Both branches do identical work, so pick the target subgraph once.
                                CoreGraph target;
                                if (partitionKey == 0) {
                                    target = subgraph0;
                                } else if (partitionKey == 1) {
                                    target = subgraph1;
                                } else {
                                    continue; // records with any other key are skipped
                                }

                                for (Object srcVertex : newMap.keySet()) {
                                    for (Object dstVertex : newMap.get(srcVertex).keySet()) {
                                        Tuple2<Double, ArrayList<RoadObject>> edge = newMap.get(srcVertex).get(dstVertex);
                                        sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
                                        destVertex = Integer.parseInt(String.valueOf(dstVertex));
                                        edgeLength = edge._1();

                                        target.addEdge(sourceVertex, destVertex, edgeLength);

                                        // Copy any objects on this edge into the local subgraph.
                                        if (edge._2() != null) {
                                            int currentEdgeId = target.getEdgeId(sourceVertex, destVertex);
                                            for (RoadObject original : edge._2()) {
                                                roadObjectId = original.getObjectId();
                                                roadObjectType = original.getType();
                                                distanceFromStart = original.getDistanceFromStartNode();

                                                RoadObject copy = new RoadObject();
                                                copy.setObjId(roadObjectId);
                                                copy.setType(roadObjectType);
                                                copy.setDistanceFromStartNode(distanceFromStart);
                                                target.addObjectOnEdge(currentEdgeId, copy);
                                            }
                                        }
                                    }
                                }
                            }
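                            // Straightforward (naive) nearest-neighbor search from each "true" object to the "false" objects.
                            // If CustomPartitioner sends each key to its own partition, only one of subgraph0/subgraph1
                            // is populated in any given call, and the other is passed to compute() empty.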
                            ANNNaive ann = new ANNNaive();
                            System.err.println("-------------------------------");
                            Map<Integer, Integer> nearestNeighorPairsSubg0 = ann.compute(subgraph0, true);
                            System.out.println("for subgraph0");
                            System.out.println(nearestNeighorPairsSubg0);
                            System.err.println("-------------------------------");

                            System.err.println("-------------------------------");
                            Map<Integer, Integer> nearestNeighorPairsSubg1 = ann.compute(subgraph1, true);
                            System.out.println("for subgraph1");
                            System.out.println(nearestNeighorPairsSubg1);
                            System.err.println("-------------------------------");

                        }
                    });
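Iterator.next() advances the iterator on every call. A loop that reads a record's value with one next() call and its key with another therefore pairs the value of one record with the key of the next one. It also only processes every other record, and it can throw NoSuchElementException when a partition has an odd number of records. The small plain-Java illustration below shows this. IteratorNextDemo, buggyKeys and fixedKeys are hypothetical names used only for the demonstration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical demo of calling Iterator.next() twice vs. once per loop iteration. */
class IteratorNextDemo {

    /** Calls next() twice per iteration: keeps only every second element and can overrun the end. */
    static List<Integer> buggyKeys(Iterator<Integer> it) {
        List<Integer> seen = new ArrayList<>();
        while (it.hasNext()) {
            it.next();           // first call: element read (e.g. for its value), then dropped
            seen.add(it.next()); // second call: silently moves on to the NEXT element
        }
        return seen;
    }

    /** Calls next() once per iteration and reuses the element for everything. */
    static List<Integer> fixedKeys(Iterator<Integer> it) {
        List<Integer> seen = new ArrayList<>();
        while (it.hasNext()) {
            Integer current = it.next(); // advance exactly once
            seen.add(current);
        }
        return seen;
    }
}
```

The fix is to store the result of a single next() call in a local variable and read both the key (_1()) and the value (_2()) from it.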


