HBase 0.96 put flow: source code analysis

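Without really meaning to, I ended up reading through the HBase 0.98 code once more. I wanted to review the put flow, but I could not find processBatchOfPuts() in HTable, which was strange. After staring at it for a while I realized how much had changed: the method was in fact already gone in 0.96, so I pulled up the 0.96 code and went through that instead.
There is an earlier post you can compare against to see the differences: http://blog.csdn.net/luyee2010/article/details/8435739 The formatting there is rather messy, so just bear with it!

HTable.java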
  public void put(final Put put)
      throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    doPut(put);
    if (autoFlush) {
      flushCommits();
    }
  }
  // Batch variant: buffers every Put, then flushes once if autoFlush is set.
  @Override
  public void put(final List<Put> puts)
      throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    for (Put put : puts) {
      doPut(put);
    }
    if (autoFlush) {
      flushCommits();
    }
  }

Here writeAsyncBuffer replaces the old writeBuffer. In practice only the name is different.
  private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
    if (ap.hasError()){
      backgroundFlushCommits(true);
    }

    validatePut(put);

    currentWriteBufferSize += put.heapSize();
    writeAsyncBuffer.add(put);

    while (currentWriteBufferSize > writeBufferSize) {
      backgroundFlushCommits(false);
    }
  }
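
The buffering logic in doPut() can be sketched without any HBase classes. The following is a toy model, not HBase code: WriteBufferSketch, flush() and flushCount are names made up for illustration, and flush() simply drains everything, whereas the real backgroundFlushCommits(false) may leave some operations in the buffer, which is presumably why doPut() uses a while loop rather than an if.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a client-side write buffer; all names here are hypothetical. */
final class WriteBufferSketch {
    private final long writeBufferSize;              // flush threshold in bytes
    private final List<byte[]> buffer = new ArrayList<>();
    private long currentWriteBufferSize = 0;
    int flushCount = 0;                              // how many times flush() ran

    WriteBufferSketch(long writeBufferSize) {
        this.writeBufferSize = writeBufferSize;
    }

    /** Same shape as doPut: account for the size, buffer, flush while over the limit. */
    void doPut(byte[] payload) {
        currentWriteBufferSize += payload.length;
        buffer.add(payload);
        while (currentWriteBufferSize > writeBufferSize) {
            flush();
        }
    }

    /** Stand-in for backgroundFlushCommits; this simplified version drains everything. */
    void flush() {
        buffer.clear();
        currentWriteBufferSize = 0;
        flushCount++;
    }

    int buffered() {
        return buffer.size();
    }
}
```

With a threshold of 10 bytes, two 4-byte puts stay buffered and the third one pushes the total to 12 bytes and triggers a flush.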

At first glance backgroundFlushCommits does not look very different from the old flushCommits(), but once you trace into it, quite a lot has changed. The old version had a single line here: connection.processBatchOfPuts(writeBuffer, tableName, pool). The first step now: if writeAsyncBuffer is not empty and no earlier operation has failed, submit writeAsyncBuffer.
  private void backgroundFlushCommits(boolean synchronous) throws
      InterruptedIOException, RetriesExhaustedWithDetailsException {

    try {
      // If there is an error on the operations in progress, we don't add new operations.
      if (writeAsyncBuffer.size() > 0 && !ap.hasError()) {
        ap.submit(writeAsyncBuffer, true);
      }

      if (synchronous || ap.hasError()) {
        if (ap.hasError() && LOG.isDebugEnabled()) {
          LOG.debug(tableName + ": One or more of the operations have failed -" +
              " waiting for all operation in progress to finish (successfully or not)");
        }
        ap.waitUntilDone();
      }

      if (ap.hasError()) {
        if (!clearBufferOnFail) {
          // if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
          // write buffer. This is a questionable feature kept here for backward compatibility
          writeAsyncBuffer.addAll(ap.getFailedOperations());
        }
        RetriesExhaustedWithDetailsException e = ap.getErrors();
        ap.clearErrors();
        throw e;
      }
    } finally {
      currentWriteBufferSize = 0;
      for (Row mut : writeAsyncBuffer) {
        if (mut instanceof Mutation) {
          currentWriteBufferSize += ((Mutation) mut).heapSize();
        }
      }
    }
  }

I stared at backgroundFlushCommits for a long time without getting anywhere, so the only option left was to follow ap.submit(writeAsyncBuffer, true). That is where I ran into this:
Map<HRegionLocation, MultiAction<Row>> actionsByServer = new HashMap<HRegionLocation, MultiAction<Row>>();

At this point there was some hope: this should look a lot like the old code, right?
  public void submit(List<? extends Row> rows, boolean atLeastOne) throws InterruptedIOException {
    if (rows.isEmpty()) {
      return;
    }

    // This looks like we are keying by region but HRegionLocation has a comparator that compares
    // on the server portion only (hostname + port) so this Map collects regions by server.
    Map<HRegionLocation, MultiAction<Row>> actionsByServer =
      new HashMap<HRegionLocation, MultiAction<Row>>();
    List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());

    do {
      // Wait until there is at least one slot for a new task.
      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);

      // Remember the previous decisions about regions or region servers we put in the
      // final multi.
      Map<String, Boolean> regionIncluded = new HashMap<String, Boolean>();
      Map<ServerName, Boolean> serverIncluded = new HashMap<ServerName, Boolean>();

      int posInList = -1;
      Iterator<? extends Row> it = rows.iterator();
      while (it.hasNext()) {
        Row r = it.next();
        HRegionLocation loc = findDestLocation(r, 1, posInList);

        if (loc != null && canTakeOperation(loc, regionIncluded, serverIncluded)) {
          // loc is null if there is an error such as meta not available.
          Action<Row> action = new Action<Row>(r, ++posInList);
          retainedActions.add(action);
          addAction(loc, action, actionsByServer);
          it.remove();
        }
      }

    } while (retainedActions.isEmpty() && atLeastOne && !hasError());

    HConnectionManager.ServerErrorTracker errorsByServer = createServerErrorTracker();
    sendMultiAction(retainedActions, actionsByServer, 1, errorsByServer);
  }
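
The loop in submit() removes a row from the caller's list only when it is accepted, so anything that cannot be taken right now stays behind in writeAsyncBuffer. Below is a minimal sketch of that "take what fits, leave the rest" pattern. The rules inside canTakeOperation are not shown in this post, so the per-server cap used here (maxPerServer) is purely a hypothetical stand-in, as are the class and method names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy version of the selection loop in submit(); not HBase code. */
final class SubmitSketch {
    /**
     * Moves rows from `rows` into the returned list while the (hypothetical)
     * per-server cap allows it. Rows that are not taken remain in `rows`.
     */
    static List<String> takeWhatFits(List<String> rows,
                                     Map<String, String> rowToServer,
                                     int maxPerServer) {
        Map<String, Integer> takenPerServer = new HashMap<>();
        List<String> retained = new ArrayList<>();
        Iterator<String> it = rows.iterator();
        while (it.hasNext()) {
            String row = it.next();
            String server = rowToServer.get(row);
            if (server == null) {
                continue; // location unknown: leave the row in the list
            }
            int taken = takenPerServer.getOrDefault(server, 0);
            if (taken < maxPerServer) {
                takenPerServer.put(server, taken + 1);
                retained.add(row);
                it.remove(); // same idea as it.remove() in submit()
            }
        }
        return retained;
    }
}
```

Because the input list is mutated through the iterator, it has to be a modifiable list such as an ArrayList.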

Locate the row, i.e. find its loc (an HRegionLocation):
HRegionLocation loc = findDestLocation(r, 1, posInList);

Group the actions by region server (and by region within each server): addAction(loc, action, actionsByServer);
  // Group the actions per region server
  private void addAction(HRegionLocation loc, Action<Row> action, Map<HRegionLocation,
      MultiAction<Row>> actionsByServer) {
    final byte[] regionName = loc.getRegionInfo().getRegionName();
    MultiAction<Row> multiAction = actionsByServer.get(loc);
    if (multiAction == null) {
      multiAction = new MultiAction<Row>();
      actionsByServer.put(loc, multiAction);
    }

    multiAction.add(regionName, action);
  }
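
The data structure that addAction() builds is easier to see with plain collections. According to the comment in submit(), the outer map is effectively keyed by server, and each MultiAction then groups actions by region name. The sketch below models that as server -> (region -> rows); Location and GroupByServerSketch are hypothetical stand-ins for HRegionLocation and the real classes, and rows are plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the two-level grouping done by addAction(); not HBase code. */
final class GroupByServerSketch {
    /** Hypothetical stand-in for HRegionLocation: a server plus a region name. */
    static final class Location {
        final String server;
        final String region;

        Location(String server, String region) {
            this.server = server;
            this.region = region;
        }
    }

    /** Adds one row under its server, then under its region on that server. */
    static void addAction(Location loc, String row,
                          Map<String, Map<String, List<String>>> actionsByServer) {
        actionsByServer
            .computeIfAbsent(loc.server, s -> new HashMap<>())  // one entry per server
            .computeIfAbsent(loc.region, r -> new ArrayList<>()) // one list per region
            .add(row);
    }
}
```

Each entry of the outer map then corresponds to one request to one server, which is the unit the next step iterates over.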

Next comes sendMultiAction().
  public void sendMultiAction(final List<Action<Row>> initialActions,
                              Map<HRegionLocation, MultiAction<Row>> actionsByServer,
                              final int numAttempt,
                              final HConnectionManager.ServerErrorTracker errorsByServer) {
    // Send the queries and add them to the inProgress list
    // This iteration is by server (the HRegionLocation comparator is by server portion only).
    for (Map.Entry<HRegionLocation, MultiAction<Row>> e : actionsByServer.entrySet()) {
      final HRegionLocation loc = e.getKey();
      final MultiAction<Row> multiAction = e.getValue();
      incTaskCounters(multiAction.getRegions(), loc.getServerName());
      Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() {
        @Override
        public void run() {
          MultiResponse res;
          try {
            MultiServerCallable<Row> callable = createCallable(loc, multiAction);
            try {
              res = createCaller(callable).callWithoutRetries(callable);
            } catch (IOException e) {
              LOG.warn("Call to " + loc.getServerName() + " failed numAttempt=" + numAttempt +
                ", resubmitting all since not sure where we are at", e);
              resubmitAll(initialActions, multiAction, loc, numAttempt + 1, e, errorsByServer);
              return;
            }

            receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer);
          } finally {
            decTaskCounters(multiAction.getRegions(), loc.getServerName());
          }
        }
      });

      try {
        this.pool.submit(runnable);
      } catch (RejectedExecutionException ree) {
        // This should never happen. But as the pool is provided by the end user, let's secure
        // this a little.
        decTaskCounters(multiAction.getRegions(), loc.getServerName());
        LOG.warn("The task was rejected by the pool. This is unexpected." +
            " Server is " + loc.getServerName(), ree);
        // We're likely to fail again, but this will increment the attempt counter, so it will
        // finish.
        resubmitAll(initialActions, multiAction, loc, numAttempt + 1, ree, errorsByServer);
      }
    }
  }
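
Stripped of the HBase types, sendMultiAction() follows a common pattern: bump a counter, wrap the per-server work in a Runnable that always decrements the counter in a finally block, and undo the increment if the pool refuses the task. The sketch below shows only that pattern. Everything in it is an illustrative assumption: the class and field names are invented, the "RPC" is just a map write, and a rejected task is merely counted, whereas the real code calls resubmitAll.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the per-server submission loop; not HBase code. */
final class SendPerServerSketch {
    final AtomicInteger tasksInProgress = new AtomicInteger();
    final AtomicInteger rejected = new AtomicInteger();
    final Map<String, Integer> sentPerServer = new ConcurrentHashMap<>();

    /** Submits one task per server, keeping the in-progress counter balanced. */
    void sendAll(Map<String, List<String>> actionsByServer, ExecutorService pool) {
        for (Map.Entry<String, List<String>> e : actionsByServer.entrySet()) {
            final String server = e.getKey();
            final List<String> batch = e.getValue();
            tasksInProgress.incrementAndGet();          // like incTaskCounters
            Runnable task = () -> {
                try {
                    sentPerServer.put(server, batch.size()); // stand-in for the RPC
                } finally {
                    tasksInProgress.decrementAndGet();  // like decTaskCounters
                }
            };
            try {
                pool.submit(task);
            } catch (RejectedExecutionException ree) {
                // The task never ran, so undo the increment here.
                tasksInProgress.decrementAndGet();
                rejected.incrementAndGet();
            }
        }
    }

    /** Shuts the pool down and waits briefly for the submitted tasks to finish. */
    static boolean shutdownAndWait(ExecutorService pool) {
        pool.shutdown();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Passing an executor that has already been shut down is an easy way to exercise the rejection branch, since the default ThreadPoolExecutor policy then throws RejectedExecutionException.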

Parts I do not fully understand yet: 1. resubmitAll 2. receiveMultiAction 3. createCaller
Things only clicked once I reached the callable's call() method and saw responseProto = getStub().multi(controller, requestProto); this is the call that ends up in HRegionServer.multi(). So look at the callable first and take the rest slowly. The runnable handed to this.pool.submit(runnable) is what creates and runs the callable:
  protected MultiServerCallable<Row> createCallable(final HRegionLocation location,
      final MultiAction<Row> multi) {
    return new MultiServerCallable<Row>(hConnection, tableName, location, multi);
  }

The call() method:
  public MultiResponse call() throws IOException {
    int countOfActions = this.multiAction.size();
    if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
    MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();
    List<CellScannable> cells = null;
    // The multi object is a list of Actions by region. Iterate by region.
    for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
      final byte [] regionName = e.getKey();
      final List<Action<R>> actions = e.getValue();
      RegionAction.Builder regionActionBuilder;
      if (this.cellBlock) {
        // Presize. Presume at least a KV per Action. There are likely more.
        if (cells == null) cells = new ArrayList<CellScannable>(countOfActions);
        // Send data in cellblocks. The call to buildNoDataMultiRequest will skip RowMutations.
        // They have already been handled above. Guess at count of cells
        regionActionBuilder = RequestConverter.buildNoDataRegionAction(regionName, actions, cells);
      } else {
        regionActionBuilder = RequestConverter.buildRegionAction(regionName, actions);
      }
      multiRequestBuilder.addRegionAction(regionActionBuilder.build());
    }
    // Controller optionally carries cell data over the proxy/service boundary and also
    // optionally ferries cell response data back out again.
    PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cells);
    controller.setPriority(getTableName());
    ClientProtos.MultiResponse responseProto;
    ClientProtos.MultiRequest requestProto = multiRequestBuilder.build();
    try {
      responseProto = getStub().multi(controller, requestProto);
    } catch (ServiceException e) {
      return createAllFailedResponse(requestProto, ProtobufUtil.getRemoteException(e));
    }
    return ResponseConverter.getResults(requestProto, responseProto, controller.cellScanner());
  }

On the server side, the main thing to look at is HRegionServer.multi().
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
  throws ServiceException {
    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
    CellScanner cellScanner = controller != null? controller.cellScanner(): null;
    if (controller != null) controller.setCellScanner(null);
    List<CellScannable> cellsToReturn = null;
     MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();

     for (RegionAction regionAction : request.getRegionActionList()) {
       this.requestCount.add(regionAction.getActionCount());
       RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
       HRegion region;
       try {
         region = getRegion(regionAction.getRegion());
       } catch (IOException e) {
         regionActionResultBuilder.setException(ResponseConverter.buildException(e));
         responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
         continue;  // For this region it's a failure.
       }

       if (regionAction.hasAtomic() && regionAction.getAtomic()) {
         // How does this call happen? It may need some work to play well w/ the surroundings.
         // Need to return an item per Action along w/ Action index. TODO.
         try {
           mutateRows(region, regionAction.getActionList(), cellScanner);
         } catch (IOException e) {
           // As it's atomic, we may expect it's a global failure.
           regionActionResultBuilder.setException(ResponseConverter.buildException(e));
         }
       } else {
         // doNonAtomicRegionMutation manages the exception internally
         cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
             regionActionResultBuilder, cellsToReturn);
       }
       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
     }
     // Load the controller with the Cells to return.
     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
       controller.setCellScanner(CellUtil.createCellScanner(cellsToReturn));
     }
     return responseBuilder.build();
   }

These are the key lines; everything else just builds protobuf (PB) messages.
  region = getRegion(regionAction.getRegion());
  mutateRows(region, regionAction.getActionList(), cellScanner);
  cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
      regionActionResultBuilder, cellsToReturn);

Of these, mutateRows() is the atomic path, and it accepts only PUT and DELETE:
  protected void mutateRows(final HRegion region, final List<ClientProtos.Action> actions,
      final CellScanner cellScanner)
  throws IOException {
    if (!region.getRegionInfo().isMetaTable()) {
      cacheFlusher.reclaimMemStoreMemory();
    }
    RowMutations rm = null;
    for (ClientProtos.Action action: actions) {
      if (action.hasGet()) {
        throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
          action.getGet());
      }
      MutationType type = action.getMutation().getMutateType();
      if (rm == null) {
        rm = new RowMutations(action.getMutation().getRow().toByteArray());
      }
      switch (type) {
      case PUT:
        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
        break;
      case DELETE:
        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
        break;
        default:
          throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
      }
    }
    region.mutateRow(rm);
  }
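
The validation in mutateRows() boils down to "collect every action into one batch, and reject the whole batch as soon as something other than a put or delete shows up". Here is a minimal sketch of that shape with invented types (Type, Op, AtomicBatchSketch); it uses IllegalArgumentException where the real code throws DoNotRetryIOException, and it stops at building the batch instead of applying it to a region.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the put/delete-only check in mutateRows(); not HBase code. */
final class AtomicBatchSketch {
    enum Type { PUT, DELETE, GET }

    /** Hypothetical stand-in for a protobuf Action. */
    static final class Op {
        final Type type;
        final String row;

        Op(Type type, String row) {
            this.type = type;
            this.row = row;
        }
    }

    /** Returns the batch only if every op is a PUT or DELETE; otherwise throws. */
    static List<Op> buildAtomicBatch(List<Op> ops) {
        List<Op> batch = new ArrayList<>();
        for (Op op : ops) {
            switch (op.type) {
                case PUT:
                case DELETE:
                    batch.add(op);
                    break;
                default:
                    // Nothing has been applied yet, so the batch is rejected as a whole.
                    throw new IllegalArgumentException(
                        "Atomic put and/or delete only, not " + op.type);
            }
        }
        return batch;
    }
}
```

Since the check runs before anything is applied, a single invalid action means none of the mutations in the batch take effect.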
