Giraph: The Process of Reading Vertices
21103 단어 graphdb
GraphTaskManager.execute
/**
* Perform the work assigned to this compute node for this job run.
* 1) Run checkpoint per frequency policy.
* 2) For every vertex on this mapper, run the compute() function
* 3) Wait until all messaging is done.
* 4) Check if all vertices are done. If not goto 2).
* 5) Dump output.
*
* <p>Only the call that starts input loading is shown in this excerpt.
*/
public void execute() throws IOException, InterruptedException {
// omit some lines
// serviceWorker is a BspServiceWorker; its setup() runs INPUT_SUPERSTEP and
// loads the vertices from the input splits.
serviceWorker.setup();
// omit some lines
}
The type of serviceWorker is BspServiceWorker.
serviceWorker = new BspServiceWorker(context, this);
BspServiceWorker.setup()
public FinishedSuperstepStats setup() {
// Unless doing a restart, prepare for computation:
// 1. Start superstep INPUT_SUPERSTEP (no computation)
// 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
// 3. Process input splits until there are no more.
// 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
// 5. Process any mutations deriving from add edge requests
// 6. Wait for superstep INPUT_SUPERSTEP to complete.
public FinishedSuperstepStats setup() {
// omit some lines
vertexEdgeCount = loadVertices();
// omit some lines
}
BspServiceWorker.loadVertices
/**
 * Loads all vertices assigned to this worker.
 *
 * <p>Builds a {@code VertexInputSplitsCallableFactory} around the wrapped
 * vertex input format and hands it to {@code loadInputSplits}, which runs
 * the loading threads and sums their counts.
 *
 * @return number of vertices and edges loaded
 * @throws KeeperException propagated from {@code loadInputSplits}
 * @throws InterruptedException propagated from {@code loadInputSplits}
 */
private VertexEdgeCount loadVertices() throws KeeperException,
    InterruptedException {
  return loadInputSplits(new VertexInputSplitsCallableFactory(
      getConfiguration().createWrappedVertexInputFormat(),
      getContext(),
      getConfiguration(),
      this,
      inputSplitsHandler));
}
BspServiceWorker.loadInputSplits
/**
 * Load the vertices/edges from input splits. Do this until all the
 * InputSplits have been processed.
 * All workers will try to do as many InputSplits as they can. The master
 * will monitor progress and stop this once all the InputSplits have been
 * loaded and check-pointed.
 *
 * Use one or more threads to do the loading: one callable per thread is
 * created from {@code inputSplitsCallableFactory}, and the per-thread
 * counts are summed.
 *
 * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
 * @return Statistics of the vertices and edges loaded
 * @throws InterruptedException
 * @throws KeeperException
 */
private VertexEdgeCount loadInputSplits(
    CallableFactory<VertexEdgeCount> inputSplitsCallableFactory)
    throws KeeperException, InterruptedException {
  VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
  int numThreads = getConfiguration().getNumInputSplitsThreads();
  // The element type must be VertexEdgeCount; a raw List would make the
  // enhanced-for below fail to compile.
  List<VertexEdgeCount> results =
      ProgressableUtils.getResultsWithNCallables(inputSplitsCallableFactory,
          numThreads, "load-%d", getContext());
  for (VertexEdgeCount result : results) {
    vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(result);
  }
  // Wait for the requests issued while loading (e.g. buffered vertex
  // requests) before reporting the totals.
  workerClient.waitAllRequests();
  return vertexEdgeCount;
}
ProgressableUtils.getResultsWithNCallables
/**
* Create {#link numThreads} callables from {#link callableFactory},
* execute them and gather results.
*/
public static List getResultsWithNCallables(
CallableFactory callableFactory, int numThreads,
String threadNameFormat, Progressable progressable) {
ExecutorService executorService = Executors.newFixedThreadPool(numThreads,
ThreadUtils.createThreadFactory(threadNameFormat));
HashMap> futures = new HashMap<>(numThreads);
for (int i = 0; i < numThreads; i++) {
Callable callable = callableFactory.newCallable(i);
Future future = executorService.submit(
new LogStacktraceCallable(callable));
futures.put(i, future);
}
InputSplitsCallable.call
InputSplitsCallable.call keeps reserving and loading input splits until none are left.
/**
 * Repeatedly reserves an input split and loads it until no splits are left.
 *
 * <p>The second argument of {@code reserveInputSplit} is {@code true} only
 * for this callable's first reservation.
 *
 * @return vertex/edge counts summed over every split this callable loaded
 *     (partial counts if the thread was interrupted)
 * @throws IllegalStateException wrapping an {@code IOException} or
 *     {@code ClassNotFoundException} thrown while loading a split
 */
@Override
public VertexEdgeCount call() {
  VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
  int inputSplitsProcessed = 0;
  try {
    while (true) {
      byte[] serializedInputSplit = splitsHandler.reserveInputSplit(
          getInputType(), inputSplitsProcessed == 0);
      if (serializedInputSplit == null) {
        // No splits left
        break;
      }
      vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
          loadInputSplit(serializedInputSplit));
      context.progress();
      ++inputSplitsProcessed;
    }
  } catch (InterruptedException e) {
    // Stop loading and return what was counted so far, but restore the
    // interrupt flag so the executor running this callable can observe it.
    Thread.currentThread().interrupt();
  } catch (IOException e) {
    // loadInputSplit declares these checked exceptions; call() cannot
    // rethrow them directly, so wrap them and keep the cause.
    throw new IllegalStateException("call: IOException", e);
  } catch (ClassNotFoundException e) {
    throw new IllegalStateException("call: ClassNotFoundException", e);
  }
  return vertexEdgeCount;
}
InputSplitsCallable.loadInputSplit loads a single split:
/**
 * Loads one input split given in serialized form.
 *
 * <p>The bytes are turned back into an {@link InputSplit} by
 * {@code getInputSplit}, and the actual reading is delegated to
 * {@code readInputSplit}.
 *
 * @param serializedInputSplit serialized split to load
 * @return vertex and edge counts returned by {@code readInputSplit}
 * @throws IOException propagated from {@code getInputSplit} or
 *     {@code readInputSplit}
 * @throws ClassNotFoundException propagated from {@code getInputSplit}
 * @throws InterruptedException propagated if reading is interrupted
 */
private VertexEdgeCount loadInputSplit(byte[] serializedInputSplit)
    throws IOException, ClassNotFoundException, InterruptedException {
  return readInputSplit(getInputSplit(serializedInputSplit));
}
VertexInputSplitsCallable.readInputSplit
/**
 * Reads every vertex of one input split and hands each one off for delivery
 * to the worker that owns its partition.
 *
 * <p>For each vertex read:
 * <ul>
 *   <li>a {@code null} value is replaced with
 *   {@code configuration.createVertexValue()};</li>
 *   <li>the configuration is attached;</li>
 *   <li>edges are rewritten through {@code translateEdge} when one is set;</li>
 *   <li>the vertex is passed to
 *   {@code workerClientRequestProcessor.sendVertexRequest} together with its
 *   partition owner.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code inputSplitVerticesLoaded},
 * {@code edgesSinceLastUpdate} and {@code inputSplitEdgesLoaded} are not
 * declared in this excerpt; confirm their scope before relying on the counts.
 *
 * @param inputSplit split to read
 * @return vertices and edges loaded from this split
 * @throws IOException propagated from the vertex reader
 * @throws InterruptedException propagated if reading is interrupted
 */
@Override
protected VertexEdgeCount readInputSplit(
    InputSplit inputSplit) throws IOException, InterruptedException {
  VertexReader vertexReader =
      vertexInputFormat.createVertexReader(inputSplit, context);
  // Close the reader even if initialization or reading throws, so the
  // underlying input is not leaked.
  try {
    vertexReader.initialize(inputSplit, context);
    while (vertexReader.nextVertex()) {
      Vertex readerVertex = vertexReader.getCurrentVertex();
      if (readerVertex.getValue() == null) {
        readerVertex.setValue(configuration.createVertexValue());
      }
      readerVertex.setConf(configuration);
      ++inputSplitVerticesLoaded;
      // Before saving to partition-store translate all edges (if present)
      if (translateEdge != null) {
        // only iff vertexInput reads edges also
        if (readerVertex.getEdges() != null &&
            readerVertex.getNumEdges() > 0) {
          OutEdges vertexOutEdges = configuration
              .createAndInitializeOutEdges(readerVertex.getNumEdges());
          for (Edge edge : readerVertex.getEdges()) {
            vertexOutEdges.add(configuration.createEdge(translateEdge, edge));
          }
          // set out edges to translated instance -> old instance is released
          readerVertex.setEdges(vertexOutEdges);
        }
      }
      PartitionOwner partitionOwner =
          bspServiceWorker.getVertexPartitionOwner(readerVertex.getId());
      workerClientRequestProcessor.sendVertexRequest(
          partitionOwner, readerVertex);
      edgesSinceLastUpdate += readerVertex.getNumEdges();
    }
  } finally {
    vertexReader.close();
  }
  // NOTE(review): only the remainder modulo VERTICES_UPDATE_PERIOD is
  // reported here; presumably full periods are reported in lines omitted
  // from this excerpt.
  WorkerProgress.get().addVerticesLoaded(
      inputSplitVerticesLoaded % VERTICES_UPDATE_PERIOD);
  WorkerProgress.get().incrementVertexInputSplitsLoaded();
  return new VertexEdgeCount(inputSplitVerticesLoaded,
      inputSplitEdgesLoaded + edgesSinceLastUpdate, 0);
}
NettyWorkerClientRequestProcessor.sendVertexRequest
/**
 * Buffers a vertex for the worker that owns its partition, and flushes that
 * worker's buffer once it has grown large enough.
 *
 * @param partitionOwner owner of the partition the vertex belongs to
 * @param vertex vertex to send
 * @return {@code true} if this call flushed the buffer by issuing a
 *     {@code SendWorkerVerticesRequest}, {@code false} if the vertex was
 *     only buffered
 */
@Override
public boolean sendVertexRequest(PartitionOwner partitionOwner,
    Vertex vertex) {
  int bufferedSize = sendPartitionCache.addVertex(partitionOwner, vertex);
  // Keep buffering until the target worker's cache reaches the threshold.
  if (bufferedSize < maxVerticesSizePerWorker) {
    return false;
  }
  // Threshold reached: take everything buffered for that worker and ship it
  // as one request.
  PairList flushedVertices =
      sendPartitionCache.removeWorkerData(partitionOwner.getWorkerInfo());
  WritableRequest flushRequest =
      new SendWorkerVerticesRequest(configuration, flushedVertices);
  doRequest(partitionOwner.getWorkerInfo(), flushRequest);
  return true;
}
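The vertex reader reads each vertex together with its edges, and the vertex is first stored in a local cache (sendPartitionCache). When the cached size for the target worker reaches maxVerticesSizePerWorker (default 512K), the cached vertices are sent with doRequest. Cache size observed in one trace: 629145. doRequest executes the request locally if the target is this worker, and otherwise sends it through workerClient: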
/**
 * Delivers a request to the given worker.
 *
 * <p>If the target is this worker (same task id), the request is executed
 * directly against {@code serverData}. Otherwise it is sent through
 * {@code workerClient}. Local and remote deliveries are counted separately.
 *
 * @param workerInfo target worker
 * @param writableRequest request to deliver
 */
public void doRequest(WorkerInfo workerInfo,
    WritableRequest writableRequest) {
  boolean targetIsSelf = serviceWorker.getWorkerInfo().getTaskId() ==
      workerInfo.getTaskId();
  if (!targetIsSelf) {
    workerClient.sendWritableRequest(
        workerInfo.getTaskId(), writableRequest);
    remoteRequests.inc();
    return;
  }
  // Short-circuit the network: run the request in this process.
  ((WorkerRequest) writableRequest).doRequest(serverData);
  localRequests.inc();
}
Target worker
When the target worker receives the request, it calls
SendWorkerVerticesRequest.doRequest
SendWorkerVerticesRequest.doRequest
/**
 * Adds every (partition id, serialized vertices) pair carried by this
 * request to the local partition store.
 *
 * <p>The iterator must be typed: with a raw {@code PairList.Iterator},
 * {@code getCurrentFirst()} and {@code getCurrentSecond()} return
 * {@code Object} and cannot be passed to
 * {@code addPartitionVertices(Integer, ExtendedDataOutput)}.
 *
 * @param serverData server-side data of the receiving worker
 */
public void doRequest(ServerData serverData) {
  PairList<Integer, ExtendedDataOutput>.Iterator
      iterator = workerPartitions.getIterator();
  while (iterator.hasNext()) {
    iterator.next();
    serverData.getPartitionStore()
        .addPartitionVertices(iterator.getCurrentFirst(),
            iterator.getCurrentSecond());
  }
}
If DiskBackedPartitionStore is used, its addPartitionVertices delegates to DiskBackedPartitionStore.addEntry.
DiskBackedPartitionStore.addPartitionVertices
/**
 * Adds serialized vertices to a partition while holding the partition's
 * read lock.
 *
 * @param partitionId id of the target partition
 * @param extendedDataOutput serialized vertices to add
 */
@Override
public void addPartitionVertices(Integer partitionId,
    ExtendedDataOutput extendedDataOutput) {
  ReadWriteLock rwLock = getPartitionLock(partitionId);
  rwLock.readLock().lock();
  try {
    addEntry(partitionId, extendedDataOutput);
  } finally {
    // Always release; a leaked read lock would block anything that takes
    // this partition's write lock (e.g. buffer offloading) indefinitely.
    rwLock.readLock().unlock();
  }
}
DiskBackedPartitionStore.addEntry
addEntry checks whether the partition is contained in hasPartitionDataOnDisk. Initially it is not, so addEntryToInMemoryPartitionData is called.
/**
 * Adds a data entry to a partition, either to its on-disk buffers or to its
 * in-memory data, depending on whether the partition has data on disk.
 * Runs under the partition's read lock.
 *
 * @param partitionId id of the target partition
 * @param entry entry to add
 */
protected void addEntry(int partitionId, T entry) {
  ReadWriteLock rwLock = getPartitionLock(partitionId);
  rwLock.readLock().lock();
  try {
    if (hasPartitionDataOnDisk.contains(partitionId)) {
      // omit some lines
    } else {
      addEntryToInMemoryPartitionData(partitionId, entry);
    }
  } finally {
    // Release even if adding fails, so the write lock can still be taken.
    rwLock.readLock().unlock();
  }
}
DiskBackedPartitionStore.addEntryToInMemoryPartitionData
/**
 * Adds serialized vertices to the in-memory partition store.
 *
 * <p>The first time data arrives for a partition, the partition is also
 * registered with the out-of-core engine's {@code MetaPartitionManager}.
 *
 * @param partitionId id of the target partition
 * @param vertices serialized vertices to add
 */
@Override
protected void addEntryToInMemoryPartitionData(int partitionId,
    ExtendedDataOutput vertices) {
  boolean firstDataForPartition = !partitionStore.hasPartition(partitionId);
  if (firstDataForPartition) {
    oocEngine.getMetaPartitionManager().addPartition(partitionId);
  }
  partitionStore.addPartitionVertices(partitionId, vertices);
}
MetaPartitionManager manages partition metadata; partitionStore stores the vertex data.
MetaPartitionManager.addPartition
/**
 * Registers a partition with the meta-partition manager if it is not known
 * yet.
 *
 * <p>A newly registered partition:
 * <ul>
 *   <li>gets the next registration index;</li>
 *   <li>is added to the dictionary of the IO thread that owns it;</li>
 *   <li>is counted in {@code numInMemoryPartitions}.</li>
 * </ul>
 * Registering an already-known partition does nothing.
 *
 * @param partitionId id of the partition to register
 */
public void addPartition(int partitionId) {
  MetaPartition candidate = new MetaPartition(partitionId);
  if (partitions.putIfAbsent(partitionId, candidate) != null) {
    // Another caller registered this partition first.
    return;
  }
  int index = indexCounter.getAndIncrement();
  checkState(partitionIndex.putIfAbsent(partitionId, index) == null);
  perThreadPartitionDictionary.get(getOwnerThreadId(partitionId))
      .addPartition(candidate);
  numInMemoryPartitions.getAndIncrement();
}
MetaPartitionManager.addPartition assigns the partition to a specific IO thread and increments numInMemoryPartitions, so that OutOfCoreCallable can offload the partition asynchronously. See "The process of OutOfCoreCallable" for details.
/**
 * Returns the IO thread responsible for a partition.
 *
 * <p>The owner is the partition's registration index (assigned in
 * {@code addPartition}) modulo the number of IO threads.
 *
 * @param partitionId id of the given partition
 * @return id of the thread responsible for the given partition
 */
public int getOwnerThreadId(int partitionId) {
  Integer registrationIndex = partitionIndex.get(partitionId);
  // The partition must already have an index, i.e. be registered.
  checkState(registrationIndex != null);
  return registrationIndex % numIOThreads;
}
perThreadPartitionDictionary keeps, for each IO thread, the partitions that thread is responsible for.
MetaPartition
/**
 * Creates metadata for a newly seen partition.
 *
 * <p>The partition starts as {@code UNPROCESSED}. Its vertex data, current
 * messages and incoming messages are all marked {@code IN_MEM}.
 *
 * @param partitionId id of the partition
 */
public MetaPartition(int partitionId) {
  this.partitionId = partitionId;
  processingState = ProcessingState.UNPROCESSED;
  partitionState = StorageState.IN_MEM;
  currentMessagesState = StorageState.IN_MEM;
  incomingMessagesState = StorageState.IN_MEM;
}
The partitionStore used inside DiskBackedPartitionStore is a SimplePartitionStore.
SimplePartitionStore.addPartitionVertices
/**
 * Deserializes vertices into the given partition, creating the partition if
 * needed, and stores the partition back.
 *
 * @param partitionId id of the target partition
 * @param extendedDataOutput serialized vertices
 */
@Override
public void addPartitionVertices(Integer partitionId,
    ExtendedDataOutput extendedDataOutput) {
  VertexIterator incoming = new VertexIterator(extendedDataOutput, conf);
  Partition target = getOrCreatePartition(partitionId);
  target.addPartitionVertices(incoming);
  putPartition(target);
}
getOrCreatePartition
/**
 * Returns the partition with the given id, creating and publishing a new one
 * if it does not exist yet.
 *
 * <p>If two callers race, the partition that won the {@code putIfAbsent} is
 * returned to both.
 *
 * @param partitionId id of the partition
 * @return the existing or newly created partition
 */
private Partition getOrCreatePartition(Integer partitionId) {
  Partition existing = partitions.get(partitionId);
  if (existing != null) {
    return existing;
  }
  Partition created = conf.createPartition(partitionId, context);
  Partition winner = partitions.putIfAbsent(partitionId, created);
  return winner == null ? created : winner;
}
The Partition implementation is SimplePartition, which extends BasicPartition.
BasicPartition.addPartitionVertices
/**
 * Adds (or combines) every vertex produced by the iterator into this
 * partition.
 *
 * @param vertexIterator iterator over deserialized vertices
 */
public void addPartitionVertices(VertexIterator vertexIterator) {
  while (vertexIterator.hasNext()) {
    vertexIterator.next();
    boolean stored = putOrCombine(vertexIterator.getVertex());
    // A vertex that was put now belongs to the partition, so the iterator
    // must release it. Otherwise the iterator may reuse the object.
    if (stored) {
      vertexIterator.releaseVertex();
    }
  }
}
public boolean putOrCombine(Vertex vertex) {
Vertex originalVertex = vertexMap.get(vertex.getId());
if (originalVertex == null) {
originalVertex =
vertexMap.putIfAbsent(vertex.getId(), vertex);
if (originalVertex == null) {
return true;
}
}
SimplePartition uses a vertexMap to store the vertices of the partition.
// Vertices of this partition, looked up by vertex id (see putOrCombine).
// NOTE(review): the generic type arguments of this declaration appear to have
// been stripped during text extraction; restore them before compiling.
private ConcurrentMap> vertexMap;
Offloading the partition
If the partition is in memory, the background IO callable thread can offload it. What happens if the partition has already been offloaded?
DiskBackedDataStore.addEntry
If hasPartitionDataOnDisk.contains(partitionId) returns true, the entry is appended to dataBuffers instead; those buffers are later checked by the IO thread (see "The process of OutOfCoreCallable"). As in the in-memory case, the partition lock is acquired first:
if (hasPartitionDataOnDisk.contains(partitionId)) {
List entryList = new ArrayList<>();
entryList.add(entry);
int entrySize = entrySerializedSize(entry);
MutablePair> newPair =
new MutablePair<>(entrySize, entryList);
Pair> oldPair =
dataBuffers.putIfAbsent(partitionId, newPair);
if (oldPair != null) {
synchronized (oldPair) {
newPair = (MutablePair>) oldPair;
newPair.setLeft(oldPair.getLeft() + entrySize);
newPair.getRight().add(entry);
}
}
}
OutOfCoreIOScheduler.getNextIOCommand
The actions returned by FixedPartitionOracle.getNextIOActions always contain STORE_MESSAGES_AND_BUFFERS:
IOAction.STORE_MESSAGES_AND_BUFFERS};
case STORE_MESSAGES_AND_BUFFERS:
partitionId = oocEngine.getMetaPartitionManager()
.getOffloadPartitionBufferId(threadId);
MetaPartitionManager.getOffloadPartitionBufferId
public Integer getOffloadPartitionBufferId(int threadId) {
if (oocEngine.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
Integer partitionId =
popFromSet(perThreadVertexEdgeBuffers.get(threadId));
if (partitionId == null) {
DiskBackedPartitionStore, ?, ?> partitionStore =
(DiskBackedPartitionStore, ?, ?>) (oocEngine.getServerData()
.getPartitionStore());
perThreadVertexEdgeBuffers.get(threadId)
.addAll(partitionStore.getCandidateBuffersToOffload(threadId));
DiskBackedEdgeStore, ?, ?> edgeStore =
(DiskBackedEdgeStore, ?, ?>) (oocEngine.getServerData())
.getEdgeStore();
perThreadVertexEdgeBuffers.get(threadId)
.addAll(edgeStore.getCandidateBuffersToOffload(threadId));
partitionId = popFromSet(perThreadVertexEdgeBuffers.get(threadId));
}
return partitionId;
}
return null;
}
DiskBackedDataStore.getCandidateBuffersToOffload
The default value of minBufferSizeToOffload is 8 MB, so buffers smaller than that are not stored on disk. To trace the process, you can set it to 1 KB. The result contains the IDs of the partitions whose buffers should be offloaded.
public Set getCandidateBuffersToOffload(int ioThreadId) {
Set result = new HashSet<>();
for (Map.Entry>> entry :
dataBuffers.entrySet()) {
int partitionId = entry.getKey();
long aggregateBufferSize = entry.getValue().getLeft();
if (aggregateBufferSize > minBufferSizeToOffload &&
oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId) ==
ioThreadId) {
result.add(partitionId);
}
}
return result;
}
StoreDataBufferIOCommand
When StoreDataBufferIOCommand is executed, it offloads the buffers of both the partition store and the edge store:
case PARTITION:
DiskBackedPartitionStore partitionStore =
(DiskBackedPartitionStore)
oocEngine.getServerData().getPartitionStore();
numBytesTransferred +=
partitionStore.offloadBuffers(partitionId);
DiskBackedEdgeStore edgeStore =
(DiskBackedEdgeStore) oocEngine.getServerData().getEdgeStore();
numBytesTransferred += edgeStore.offloadBuffers(partitionId);
DiskBackedPartitionStore.offloadBuffers
/**
 * Offloads the buffered vertex data of a partition, tagging the written
 * data with the PARTITION type index.
 *
 * @param partitionId partition whose buffers are offloaded
 * @return number of bytes written by {@code offloadBuffersProxy}
 * @throws IOException propagated from {@code offloadBuffersProxy}
 */
@Override
public long offloadBuffers(int partitionId)
    throws IOException {
  DataIndex typeIndex =
      new DataIndex().addIndex(DataIndex.TypeIndexEntry.PARTITION);
  return offloadBuffersProxy(partitionId, typeIndex);
}
DiskBackedDataStore.offloadBuffersProxy
offloadBuffersProxy first checks pair.getLeft() < minBufferSizeToOffload and returns 0 without writing anything if the buffered data is too small.
/**
 * Writes all buffered entries of a partition to disk, provided the buffer
 * has reached {@code minBufferSizeToOffload}.
 *
 * <p>The buffer is detached from {@code dataBuffers} under the partition's
 * write lock. Its entries are written through the data accessor of the
 * owning IO thread, and the on-disk buffer count of the partition is
 * increased.
 *
 * @param partitionId partition whose buffers are offloaded
 * @param index data index describing the store type; two entries (the
 *     partition and BUFFER markers) are appended temporarily and removed
 *     again before returning
 * @return number of bytes written, or 0 if nothing was offloaded
 * @throws IOException propagated from writing the entries
 */
protected long offloadBuffersProxy(int partitionId, DataIndex index)
    throws IOException {
  Pair<Integer, List<T>> pair = dataBuffers.get(partitionId);
  if (pair == null || pair.getLeft() < minBufferSizeToOffload) {
    return 0;
  }
  // Detach the buffer under the write lock so that addEntry (which holds the
  // read lock) cannot append to it concurrently; always release the lock.
  ReadWriteLock rwLock = getPartitionLock(partitionId);
  rwLock.writeLock().lock();
  try {
    pair = dataBuffers.remove(partitionId);
  } finally {
    rwLock.writeLock().unlock();
  }
  checkNotNull(pair);
  checkState(!pair.getRight().isEmpty());
  int ioThreadId =
      oocEngine.getMetaPartitionManager().getOwnerThreadId(partitionId);
  index.addIndex(NumericIndexEntry.createPartitionEntry(partitionId))
      .addIndex(DataIndex.TypeIndexEntry.BUFFER);
  OutOfCoreDataAccessor.DataOutputWrapper outputWrapper =
      oocEngine.getDataAccessor().prepareOutput(ioThreadId, index.copy(),
          true);
  for (T entry : pair.getRight()) {
    writeEntry(entry, outputWrapper.getDataOutput());
  }
  long numBytes = outputWrapper.finalizeOutput();
  index.removeLastIndex().removeLastIndex();
  int numBuffers = pair.getRight().size();
  // Add numBuffers to the partition's on-disk count with a compare-and-swap
  // loop. An unconditional replace(key, value) after putIfAbsent can lose a
  // concurrent update.
  Integer oldNumBuffersOnDisk =
      numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
  while (oldNumBuffersOnDisk != null &&
      !numDataBuffersOnDisk.replace(partitionId, oldNumBuffersOnDisk,
          oldNumBuffersOnDisk + numBuffers)) {
    oldNumBuffersOnDisk =
        numDataBuffersOnDisk.putIfAbsent(partitionId, numBuffers);
  }
  return numBytes;
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Giraph: The process of read VerticesGraphTaskManager.execute The type of serviceWorker is BspServiceWorker. BspServiceWorker.setup() BspServiceWorker.loadVe...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.