HDFS에서 두 가지 randomread 비교

11799 단어 hadoophdfsrandom read
code version: hadoop-0.19.1
 
먼저 프리드.pread는 읽을 크기를 데이터 노드에 명확하게 전달합니다 (new Block Reader에서)
 
 /**
     * Read bytes starting from the specified position.
     * 
     * @param position start read from this position
     * @param buffer read buffer
     * @param offset offset into buffer
     * @param length number of bytes to read
     * 
     * @return actual number of bytes read
     */
    @Override
    public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
      // sanity checks
      checkOpen();
      if (closed) {
        throw new IOException("Stream closed");
      }
      long filelen = getFileLength();
      if ((position < 0) || (position >= filelen)) {
        return -1;
      }
      int realLen = length;
      if ((position + length) > filelen) {
        realLen = (int)(filelen - position);
      }
      
      // determine the block and byte range within the block
      // corresponding to position and realLen
      List<LocatedBlock> blockRange = getBlockRange(position, realLen);
      int remaining = realLen;
      for (LocatedBlock blk : blockRange) {
        long targetStart = position - blk.getStartOffset();
        long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
        fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, buffer, offset);
        remaining -= bytesToRead;
        position += bytesToRead;
        offset += bytesToRead;
      }
      assert remaining == 0 : "Wrong number of bytes read.";
      if (stats != null) {
        stats.incrementBytesRead(realLen);
      }
      return realLen;
    }
     

 
private void fetchBlockByteRange(LocatedBlock block, long start,
                                     long end, byte[] buf, int offset) throws IOException {
      //
      // Connect to best DataNode for desired Block, with potential offset
      //
      Socket dn = null;
      int numAttempts = block.getLocations().length;
      IOException ioe = null;
      
      while (dn == null && numAttempts-- > 0 ) {
	long prepareRealReadStart = System.currentTimeMillis();
        DNAddrPair retval = chooseDataNode(block);
        DatanodeInfo chosenNode = retval.info;
        InetSocketAddress targetAddr = retval.addr;
        BlockReader reader = null;
            
        try {
          dn = socketFactory.createSocket();
          NetUtils.connect(dn, targetAddr, socketTimeout);
          dn.setSoTimeout(socketTimeout);
              
          int len = (int) (end - start + 1);
              
          
          reader = BlockReader.newBlockReader(dn, src, 
                                              block.getBlock().getBlockId(),
                                              block.getBlock().getGenerationStamp(),
                                              start, len, buffersize, 
                                              verifyChecksum, clientName);
          int nread = reader.readAll(buf, offset, len);
          if (nread != len) {
            throw new IOException("truncated return from reader.read(): " +
                                  "excpected " + len + ", got " + nread);
          }
          return;
        } catch (ChecksumException e) {
          ioe = e;
          LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
                   src + " at " + block.getBlock() + ":" + 
                   e.getPos() + " from " + chosenNode.getName());
          reportChecksumFailure(src, block.getBlock(), chosenNode);
        } catch (IOException e) {
          ioe = e;
          LOG.warn("Failed to connect to " + targetAddr + 
                   " for file " + src + 
                   " for block " + block.getBlock().getBlockId() + ":"  +
                   StringUtils.stringifyException(e));
        } finally {
          IOUtils.closeStream(reader);
          IOUtils.closeSocket(dn);
          dn = null;
        }
        // Put chosen node into dead list, continue
        addToDeadNodes(chosenNode);
      }
      throw (ioe == null) ? new IOException("Could not read data") : ioe;
    }

 
pread 절차:
읽을 데이터의 오프셋과readLen에 따라 읽을 BlockRange, 즉 어떤 Block이 읽을 범위 내에 있는지 계산해 낸다.구체적인 get Block Range () 함수에서 읽을 Blocks가 유지보수하는located Blocks라는 Block cache에 있는지 판단해야 합니다. 그렇지 않으면namenode에 문의한 다음cache에 넣으십시오.
그리고 획득한 Block Range에서 모든 Block이 데이터를 읽고 데이터 노드를 선택하여 연결을 만들고 모든 Block Reader를 다시 생성합니다.
 
그리고 seek+read를 보면 read는 현재 위치에서 Block 끝까지의 길이를 데이터 노드에 전송합니다. (또한 new Block Reader에 있을 때입니다.) 이렇게 하면 데이터 노드는read ahead를 할 수 있습니다. 그리고 TCPWINDOW의 버퍼 작용(hadoop code 안은 128K)으로 연속 읽기 성능을 가속화할 수 있다.
 
    /**
     * Seek to a new arbitrary location
     */
    @Override
    public synchronized void seek(long targetPos) throws IOException {
      if (targetPos > getFileLength()) {
        throw new IOException("Cannot seek after EOF");
      }
      boolean done = false;
      if (pos <= targetPos && targetPos <= blockEnd) {
        //
        // If this seek is to a positive position in the current
        // block, and this piece of data might already be lying in
        // the TCP buffer, then just eat up the intervening data.
        //
        int diff = (int)(targetPos - pos);
        if (diff <= TCP_WINDOW_SIZE) {
          try {
            pos += blockReader.skip(diff);
            if (pos == targetPos) {
              done = true;
            }
          } catch (IOException e) {//make following read to retry
            LOG.debug("Exception while seek to " + targetPos + " from "
                      + currentBlock +" of " + src + " from " + currentNode + 
                      ": " + StringUtils.stringifyException(e));
          }
        }
      }
      if (!done) {
        pos = targetPos;
        blockEnd = -1;
      }
    }

이 seek은 사실 아무 일도 하지 않는다.이것은 주로pos라는 커서를 이동합니다: 현재 Block에서 정확한 위치로 이동합니다. 그렇지 않으면pos를 목표 위치로 설정하지만 BlockEnd는 -1로 설정합니다.이렇게 해서 사실 최종 seek 임무는 뒤에 있는read에서 이루어진 것이다.
read 코드 보기:
/**
     * Read the entire buffer.
     */
    @Override
    public synchronized int read(byte buf[], int off, int len) throws IOException {
      checkOpen();
      if (closed) {
        throw new IOException("Stream closed");
      }
      if (pos < getFileLength()) {
        int retries = 2;
        while (retries > 0) {
          try {
            if (pos > blockEnd) {
	      currentNode = blockSeekTo(pos);
	    }
            int realLen = Math.min(len, (int) (blockEnd - pos + 1));
	     int result = readBuffer(buf, off, realLen);
	    	                
            if (result >= 0) {
              pos += result;
            } else {
              // got a EOS from reader though we expect more data on it.
              throw new IOException("Unexpected EOS from the reader");
            }
            if (stats != null && result != -1) {
              stats.incrementBytesRead(result);
            }
            return result;
          } catch (ChecksumException ce) {
            throw ce;            
          } catch (IOException e) {
            if (retries == 1) {
              LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
            }
            blockEnd = -1;
            if (currentNode != null) { addToDeadNodes(currentNode); }
            if (--retries == 0) {
              throw e;
            }
          }
        }
      }
      return -1;
    }

만약 위 seek에서 seek의 위치가 같은 Block에 있어야 한다면 지금 바로 읽기만 하면 됩니다.없으면 아까 BlockEnd가 -1로 설정될 줄 알았는데, 이제 진정한 seek 조작을 해야 한다. 함수 BlockSeekTo()가 이 기능을 실현한다.
BlockSeekTo()가 어떤 일을 했는지 보기:
/**
     * Open a DataInputStream to a DataNode so that it can be read from.
     * We get block ID and the IDs of the destinations at startup, from the namenode.
     */
    private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
      if (target >= getFileLength()) {
        throw new IOException("Attempted to read past end of file");
      }

      if ( blockReader != null ) {
        blockReader.close(); 
        blockReader = null;
      }
      
      if (s != null) {
        s.close();
        s = null;
      }

      //
      // Compute desired block
      //
      LocatedBlock targetBlock = getBlockAt(target);
      assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
      long offsetIntoBlock = target - targetBlock.getStartOffset();

      //
      // Connect to best DataNode for desired Block, with potential offset
      //
      DatanodeInfo chosenNode = null;
      while (s == null) {
        DNAddrPair retval = chooseDataNode(targetBlock);
        chosenNode = retval.info;
        InetSocketAddress targetAddr = retval.addr;
        try {
          s = socketFactory.createSocket();
          NetUtils.connect(s, targetAddr, socketTimeout);
          s.setSoTimeout(socketTimeout);
          Block blk = targetBlock.getBlock();
          
          blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
              blk.getGenerationStamp(),
              offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
              buffersize, verifyChecksum, clientName);
          return chosenNode;
        } catch (IOException ex) {
          // Put chosen node into dead list, continue
          LOG.debug("Failed to connect to " + targetAddr + ":" 
                    + StringUtils.stringifyException(ex));
          addToDeadNodes(chosenNode);
          if (s != null) {
            try {
              s.close();
            } catch (IOException iex) {
            }                        
          }
          s = null;
        }
      }
      return chosenNode;
    }

이 함수가 위의fetch Block Byte Range와 같은 장작을 발견하면 이전의 Block Reader close를 제거한 다음에 목적의 데이터 노드에 새로운 연결을 만듭니다. 주로 다른 곳은 어디에 있습니까? get Block Range와 get BlockAt의 차이점에 있습니다. 위의 pread 모드는 두 번째 인자인read Len을 제공했습니다. 목표 Blocks를 찾을 때 이 범위 내의 것을 찾으면 됩니다.한편, seek+read 이런 모델은 seek을 가정할 때 뒤에 읽을 길이를 몰라서 부족한 prefetchSize를 사용했고 부족한 것은 10개의 Block size 크기였다.
 
두 가지read를 종합적으로 고려하면randomread의 실현으로서 첫 번째pread는 언제든지 연결을 다시 만들어야 한다. 두 번째는 읽을 데이터가 현재 Block에 있을 때 지난번 연결을 다시 사용할 수 있기 때문에 이론적으로 두 번째 효율이 높아야 한다.

좋은 웹페이지 즐겨찾기