Enterprise Search Engine Development: The Connector (27)

The ChangeQueue class implements the ChangeSource interface, which declares the method for fetching the next Change object.
/**
 * A source of {@link Change} objects.
 *
 * @since 2.8
 */
public interface ChangeSource {
  /**
   * @return the next change, or {@code null} if there is no change available
   */
  public Change getNextChange();
}

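When a ChangeQueue instance is constructed, it initializes the blocking queue private final BlockingQueue<Change> pendingChanges, which serves as the container for Change objects.
  /**
   * Initializes the blocking queue pendingChanges.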
   * @param size
   * @param sleepInterval
   * @param introduceDelayAfterEachScan
   * @param activityLogger
   */
  private ChangeQueue(int size, long sleepInterval, 
      boolean introduceDelayAfterEachScan, CrawlActivityLogger activityLogger) {
    pendingChanges = new ArrayBlockingQueue<Change>(size);
    this.sleepInterval = sleepInterval;
    this.activityLogger = activityLogger;
    this.introduceDelayAfterEveryScan = introduceDelayAfterEachScan;
  }

The introduceDelayAfterEveryScan parameter controls whether a delay is introduced after each full scan of the data finishes.
The inner class CallBack mentioned earlier adds the data it submits to the blocking queue pendingChanges.
ChangeQueue implements the ChangeSource interface method by taking a Change object from that blocking queue:
  /**
   * Gets the next available change from the ChangeQueue.  Will wait up to
   * 1/4 second for a change to appear if none is immediately available.
   *
   * @return the next available change, or {@code null} if no changes are
   *         available
   */
  public Change getNextChange() {
    try {
      return pendingChanges.poll(250L, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie) {
      return null;
    }
  }
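
To make the timed poll concrete, here is a minimal sketch of a bounded queue whose consumer waits briefly and then gives up. It is not the connector's code: the class name TimedPollQueueSketch, the use of String in place of Change, and the non-blocking offer on the producer side are all assumptions made for illustration. The sketch also restores the thread's interrupt flag, which the quoted method does not do.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for ChangeQueue; holds Strings instead of Change objects. */
class TimedPollQueueSketch {
  private final BlockingQueue<String> pending;

  TimedPollQueueSketch(int size) {
    // Bounded queue: at most `size` elements are buffered.
    pending = new ArrayBlockingQueue<String>(size);
  }

  /** Producer side (assumption): returns false instead of blocking when the queue is full. */
  boolean add(String change) {
    return pending.offer(change);
  }

  /** Consumer side: waits up to waitMillis for an element, then returns null. */
  String next(long waitMillis) {
    try {
      return pending.poll(waitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();  // keep the interrupt visible to callers
      return null;
    }
  }

  public static void main(String[] args) {
    TimedPollQueueSketch queue = new TimedPollQueueSketch(2);
    queue.add("doc-1 changed");
    System.out.println(queue.next(250L));  // the buffered element
    System.out.println(queue.next(250L));  // null after waiting about 250 ms
  }
}
```

Returning null on timeout lets the caller treat "nothing available right now" as an ordinary result, which is how loadUpFromChangeSource decides to stop reading.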

The ChangeQueue object is a buffer that holds Change objects. As analyzed above, Change objects are added to it by the thread method of DocumentSnapshotRepositoryMonitor once the monitor has been started.
So which object calls getNextChange() on the ChangeQueue to take the Change data out?
Tracing the callers leads to the loadUpFromChangeSource method of the CheckpointAndChangeQueue class, which calls getNextChange(). It wraps each Change it obtains in a CheckpointAndChange object and adds it to the member field List<CheckpointAndChange> checkpointAndChangeList.
First, let's get familiar with the relevant member fields and the constructor.
  private final AtomicInteger maximumQueueSize =
      new AtomicInteger(DEFAULT_MAXIMUM_QUEUE_SIZE);
  private final List<CheckpointAndChange> checkpointAndChangeList;
  private final ChangeSource changeSource;
  private final DocumentHandleFactory internalDocumentHandleFactory;
  private final DocumentHandleFactory clientDocumentHandleFactory;

  private volatile DiffingConnectorCheckpoint lastCheckpoint;
  private final File persistDir;  // place to persist enqueued values
  private MonitorRestartState monitorPoints = new MonitorRestartState();

  public CheckpointAndChangeQueue(ChangeSource changeSource, File persistDir,
      DocumentHandleFactory internalDocumentHandleFactory,
      DocumentHandleFactory clientDocumentHandleFactory) {
    this.changeSource = changeSource;
    this.checkpointAndChangeList
        = Collections.synchronizedList(
            new ArrayList<CheckpointAndChange>(maximumQueueSize.get()));
    this.persistDir = persistDir;
    this.internalDocumentHandleFactory = internalDocumentHandleFactory;
    this.clientDocumentHandleFactory = clientDocumentHandleFactory;
    ensurePersistDirExists();
  }

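The constructor initializes the ChangeSource object changeSource (that is, the ChangeQueue object) and the list container List<CheckpointAndChange> checkpointAndChangeList.
Now let's take another look at the loadUpFromChangeSource method.
  /**
   * Takes Change objects from the ChangeSource and adds them to checkpointAndChangeList.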
   */
  private void loadUpFromChangeSource() {
    int max = maximumQueueSize.get();
    if (checkpointAndChangeList.size() < max) {
      lastCheckpoint = lastCheckpoint.nextMajor();
    }   
    while (checkpointAndChangeList.size() < max) {
      Change newChange = changeSource.getNextChange();
      if (newChange == null) {
        break;
      }
      lastCheckpoint = lastCheckpoint.next();
      checkpointAndChangeList.add(new CheckpointAndChange(
          lastCheckpoint, newChange));      
    }
  }

The method's main job is to take Change objects from changeSource, wrap each one in a CheckpointAndChange object, and add it to the checkpointAndChangeList container.
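
The same fill-until-full-or-empty loop can be sketched in isolation. Everything here is hypothetical: LoadUpSketch, Stamped, and the plain long counter stand in for CheckpointAndChangeQueue, CheckpointAndChange, and DiffingConnectorCheckpoint, and the nextMajor() step of the original is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class LoadUpSketch {
  /** Hypothetical pairing of a sequence number with a change. */
  static final class Stamped {
    final long checkpoint;
    final String change;

    Stamped(long checkpoint, String change) {
      this.checkpoint = checkpoint;
      this.change = change;
    }
  }

  private final Queue<String> source;  // stands in for ChangeSource
  private final List<Stamped> buffer = new ArrayList<Stamped>();
  private final int max;
  private long lastCheckpoint = 0;     // simplified checkpoint: a counter

  LoadUpSketch(Queue<String> source, int max) {
    this.source = source;
    this.max = max;
  }

  /** Fills the buffer until it holds max entries or the source has nothing to offer. */
  void loadUp() {
    while (buffer.size() < max) {
      String change = source.poll();   // null means no change is available
      if (change == null) {
        break;
      }
      lastCheckpoint++;                // every buffered change gets its own checkpoint
      buffer.add(new Stamped(lastCheckpoint, change));
    }
  }

  List<Stamped> buffered() {
    return buffer;
  }
}
```

With a source of five changes and max set to 3, one call to loadUp() buffers three entries stamped 1, 2 and 3 and leaves two changes in the source for a later call.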
loadUpFromChangeSource is called from the resume method (resume itself is called from the constructor of the DiffingConnectorDocumentList class).
  /**
   * Returns an {@link Iterator} for currently available
   * {@link CheckpointAndChange} objects that occur after the passed in
   * checkpoint. The {@link String} form of a {@link DiffingConnectorCheckpoint}
   * passed in is produced by calling
   * {@link DiffingConnectorCheckpoint#toString()}. As a side effect, Objects
   * up to and including the object with the passed in checkpoint are removed
   * from this queue.
   *
   * @param checkpointString null means return all {@link CheckpointAndChange}
   *        objects and a non null value means to return
   *        {@link CheckpointAndChange} objects with checkpoints after the
   *        passed in value.
   * @throws IOException if error occurs while manipulating recovery state
   */
  synchronized List<CheckpointAndChange> resume(String checkpointString)
      throws IOException {
    // Remove the changes up to and including the given checkpoint.
    removeCompletedChanges(checkpointString);
    // Take Changes from the ChangeSource and add them to checkpointAndChangeList.
    loadUpFromChangeSource();
    // Update monitorPoints.
    monitorPoints.updateOnGuaranteed(checkpointAndChangeList);
    try {
      // Persist checkpointAndChangeList to a recovery file;
      // each call to resume writes a new one.
      writeRecoveryState();
    } finally {
      // TODO: Enhance with mechanism that remembers
      // information about recovery files to avoid re-reading.
      // Remove the excess recovery files.
      removeExcessRecoveryState();
    }
    }
    return getList();
  }
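
The side effect described in the Javadoc of resume, dropping everything up to and including the acknowledged checkpoint, can be sketched as follows. This is not the body of removeCompletedChanges, which is not shown in this article; RemoveCompletedSketch and the use of plain Long values as checkpoints are assumptions for illustration only.

```java
import java.util.Iterator;
import java.util.List;

class RemoveCompletedSketch {
  /**
   * Drops every entry whose checkpoint is at or before the acknowledged one.
   * A null acknowledgement means nothing has been confirmed, so all entries stay.
   */
  static void removeCompleted(List<Long> checkpoints, Long acknowledged) {
    if (acknowledged == null) {
      return;
    }
    Iterator<Long> it = checkpoints.iterator();
    while (it.hasNext()) {
      if (it.next() <= acknowledged) {
        it.remove();  // safe removal while iterating
      }
    }
  }
}
```

Removing acknowledged entries before refilling is what frees room in the bounded list, so the next loadUpFromChangeSource call can pull further changes from the source.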

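Once the List<CheckpointAndChange> checkpointAndChangeList container has been filled, its contents are persisted to a recovery file in JSON format.
  /**
   * Persists the queue to a file in JSON format.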
   * @throws IOException
   */
  private void writeRecoveryState() throws IOException {
    // TODO(pjo): Move this method into RecoveryFile.
    File recoveryFile = new RecoveryFile(persistDir);
    FileOutputStream outStream = new FileOutputStream(recoveryFile);
    Writer writer = new OutputStreamWriter(outStream, Charsets.UTF_8);
    try {
      try {
        writeJson(writer);
      } catch (JSONException e) {
        throw IOExceptionHelper.newIOException("Failed writing recovery file.", e);
      }
      writer.flush();
      outStream.getFD().sync();
    } finally {
      writer.close();
    }
  }
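
The flush-then-sync sequence is the part of this method worth isolating: flush() only empties the writer's buffer into the stream, while FileDescriptor.sync() asks the operating system to force the bytes to the storage device. Below is a minimal sketch of the same idea using try-with-resources; DurableWriteSketch and writeDurably are hypothetical names, and the payload is an arbitrary string rather than the connector's JSON.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

class DurableWriteSketch {
  /** Writes payload to target and forces it to disk before returning. */
  static void writeDurably(File target, String payload) throws IOException {
    try (FileOutputStream out = new FileOutputStream(target);
         Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
      writer.write(payload);
      writer.flush();      // push buffered characters into the file stream
      out.getFD().sync();  // force the written bytes to the storage device
    }                      // both resources are closed here, even on failure
  }

  public static void main(String[] args) throws IOException {
    File file = File.createTempFile("recovery.", ".json");
    file.deleteOnExit();
    writeDurably(file, "{\"queue\":[]}");
    System.out.println("wrote " + file.length() + " bytes");
  }
}
```

Syncing matters for a recovery file in particular, because it exists to be read after a crash, which is exactly when unsynced data may be missing.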

The recovery file's name contains the current system time, so the creation times of two files can be compared.
  /**
   * A File that has some of the recovery logic. 
   *  Original recovery files' names contained a single nanosecond timestamp,
   *  eg.  recovery.10220010065599398 .  These turned out to be flawed
   *  because nanosecond times can go "back in time" between JVM restarts.
   *  Updated recovery files' names contain a wall clock millis timestamp 
   *  followed by an underscore followed by a nanotimestamp, eg.
   *  recovery.702522216012_10220010065599398 .
   */
  static class RecoveryFile extends File {
    final static long NO_TIME_AVAIL = -1;
    long milliTimestamp = NO_TIME_AVAIL;
    long nanoTimestamp;

    long parseTime(String s) throws IOException {
      try {
        return Long.parseLong(s);
      } catch(NumberFormatException e) {
        throw new LoggingIoException("Invalid recovery filename: "
            + getAbsolutePath());
      }
    }
    
    /**
     * Parses the timestamps out of the recovery file's name.
     * @throws IOException
     */
    void parseOutTimes() throws IOException {
      try {
        String basename = getName();
        if (!basename.startsWith(RECOVERY_FILE_PREFIX)) {
          throw new LoggingIoException("Invalid recovery filename: "
              + getAbsolutePath());
        } else {
          String extension = basename.substring(RECOVERY_FILE_PREFIX.length());
          if (!extension.contains("_")) {  // Original name format.
            nanoTimestamp = parseTime(extension);
          } else {  // Updated name format.
            String timeParts[] = extension.split("_");
            if (2 != timeParts.length) {
              throw new LoggingIoException("Invalid recovery filename: "
                  + getAbsolutePath());
            }
            milliTimestamp = parseTime(timeParts[0]);
            nanoTimestamp = parseTime(timeParts[1]);
          }
        }
      } catch(IndexOutOfBoundsException e) {
        throw new LoggingIoException("Invalid recovery filename: "
            + getAbsolutePath());
      }
    }

    RecoveryFile(File persistanceDir) throws IOException {
      super(persistanceDir, RECOVERY_FILE_PREFIX + System.currentTimeMillis()
          + "_" + System.nanoTime());
      parseOutTimes();
    }
    
    /**
     * Wraps an existing recovery file identified by its absolute path.
     * @param absolutePath
     * @throws IOException
     */
    RecoveryFile(String absolutePath) throws IOException {
      super(absolutePath);
      parseOutTimes();
    }

    boolean isOlder(RecoveryFile other) {
      boolean weHaveMillis = milliTimestamp != NO_TIME_AVAIL;
      boolean otherHasMillis = other.milliTimestamp != NO_TIME_AVAIL;
      boolean bothHaveMillis = weHaveMillis && otherHasMillis;
      boolean neitherHasMillis = (!weHaveMillis) && (!otherHasMillis);
      if (bothHaveMillis) {
        if (this.milliTimestamp < other.milliTimestamp) {
          return true;
        } else if (this.milliTimestamp > other.milliTimestamp) {
          return false;
        } else {
          return this.nanoTimestamp < other.nanoTimestamp;
        }
      } else if (neitherHasMillis) {
        return this.nanoTimestamp < other.nanoTimestamp;
      } else if (weHaveMillis) {  // and other doesn't; we are newer.
        return false;
      } else {  // other has millis; other is newer.
        return true;
      }
    }
    
    /** A delete method that logs failures. */
    public void logOnFailDelete() {
      boolean deleted = super.delete();
      if (!deleted) {
        LOG.severe("Failed to delete: " + getAbsolutePath());
      }
    }
    // TODO(pjo): Move more recovery logic into this class.
  }
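
The ordering rule in isOlder is easier to follow when applied to bare file names. The sketch below reimplements it on strings: wall-clock millis decide first, nanos only break ties, and a name in the original format (no millis) counts as older than any name in the updated format. RecoveryNameSketch is a hypothetical class, and the prefix value "recovery." is an assumption inferred from the example names in the Javadoc above.

```java
class RecoveryNameSketch {
  static final String PREFIX = "recovery.";  // assumed value of RECOVERY_FILE_PREFIX
  static final long NO_MILLIS = -1;

  /** Returns {millis, nanos}; millis is NO_MILLIS for the original name format. */
  static long[] parse(String name) {
    if (!name.startsWith(PREFIX)) {
      throw new IllegalArgumentException("Invalid recovery filename: " + name);
    }
    String[] parts = name.substring(PREFIX.length()).split("_");
    if (parts.length == 1) {
      return new long[] {NO_MILLIS, Long.parseLong(parts[0])};
    }
    if (parts.length == 2) {
      return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
    }
    throw new IllegalArgumentException("Invalid recovery filename: " + name);
  }

  /** True if the file named a was created before the file named b. */
  static boolean isOlder(String a, String b) {
    long[] x = parse(a);
    long[] y = parse(b);
    boolean xHasMillis = x[0] != NO_MILLIS;
    boolean yHasMillis = y[0] != NO_MILLIS;
    if (xHasMillis != yHasMillis) {
      return !xHasMillis;    // the name without millis uses the original format
    }
    if (xHasMillis && x[0] != y[0]) {
      return x[0] < y[0];    // wall-clock time decides across JVM restarts
    }
    return x[1] < y[1];      // nanos are only a tie-breaker
  }

  public static void main(String[] args) {
    System.out.println(isOlder("recovery.100_5", "recovery.200_1"));  // true
    System.out.println(isOlder("recovery.100_5", "recovery.100_3"));  // false
  }
}
```

The first call shows why millis take precedence: the larger nano value in the first name would give the wrong answer if nanos were compared on their own.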

Next, let's look at what its start method does.
  /**
   * Initialize to start processing from after the passed in checkpoint
   * or from the beginning if the passed in checkpoint is null.  Part of
   * making DocumentSnapshotRepositoryMonitorManager go from "cold" to "warm".
   */
  public synchronized void start(String checkpointString) throws IOException {
    LOG.info("Starting CheckpointAndChangeQueue from " + checkpointString);
    // Make sure the persistence directory exists.
    ensurePersistDirExists();
    checkpointAndChangeList.clear();
    lastCheckpoint = constructLastCheckpoint(checkpointString);
    if (null == checkpointString) {
      // Remove all recovery files.
      removeAllRecoveryState();
    } else {
      RecoveryFile current = removeExcessRecoveryState();
      // Load monitorPoints and checkpointAndChangeList from the recovery file.
      loadUpFromRecoveryState(current);
      //this.monitorPoints.points.entrySet();
      
    }
  }

This loads the list of CheckpointAndChange objects from the previously saved recovery file into the checkpointAndChangeList container (along with the MonitorCheckpoint objects).
  /**
   * Loads the queue from a recovery file.
   * @param file
   * @throws IOException
   */
  private void loadUpFromRecoveryState(RecoveryFile file) throws IOException {
    // TODO(pjo): Move this method into RecoveryFile.
    new LoadingQueueReader().readJson(file);
  }

CheckpointAndChangeQueue defines inner classes that load the list of CheckpointAndChange objects from the JSON file into the checkpointAndChangeList container.
The abstract queue reader, AbstractQueueReader:
  /**
   * Reads JSON recovery files. Uses the Template Method pattern to
   * delegate what to do with the parsed objects to subclasses.
   *
   * Note: This class uses gson for streaming support.
   */
  private abstract class AbstractQueueReader {
    public void readJson(File file) throws IOException {
      readJson(new BufferedReader(new InputStreamReader(
                  new FileInputStream(file), Charsets.UTF_8)));
    }

    /**
     * Reads and parses the stream, calling the abstract methods to
     * take whatever action is required. The given stream will be
     * closed automatically.
     *
     * @param reader the stream to parse
     */
    @VisibleForTesting
    void readJson(Reader reader) throws IOException {
      JsonReader jsonReader = new JsonReader(reader);
      try {
        readJson(jsonReader);
      } finally {
        jsonReader.close();
      }
    }

    /**
     * Reads and parses the stream, calling the abstract methods to
     * take whatever action is required.
     */
    private void readJson(JsonReader reader) throws IOException {
      JsonParser parser = new JsonParser();
      reader.beginObject();
      while (reader.hasNext()) {
        String name = reader.nextName();
        if (name.equals(MONITOR_STATE_JSON_TAG)) {
          readMonitorPoints(parser.parse(reader));
        } else if (name.equals(QUEUE_JSON_TAG)) {
          reader.beginArray();
          while (reader.hasNext()) {
            readCheckpointAndChange(parser.parse(reader));
          }
          reader.endArray();
        } else {
          throw new IOException("Read invalid recovery file.");
        }
      }
      reader.endObject();

      reader.setLenient(true);
      String name = reader.nextString();
      if (!name.equals(SENTINAL)) {
        throw new IOException("Read invalid recovery file.");
      }
    }

    protected abstract void readMonitorPoints(JsonElement gson)
        throws IOException;

    protected abstract void readCheckpointAndChange(JsonElement gson)
        throws IOException;
  }

The abstract methods are implemented by the subclasses.
  /**
   * Verifies that a JSON recovery file is valid JSON with a
   * trailing sentinel.
   */
  private class ValidatingQueueReader extends AbstractQueueReader {
    protected void readMonitorPoints(JsonElement gson) throws IOException {
    }

    protected void readCheckpointAndChange(JsonElement gson)
        throws IOException {
    }
  }
   
  /** Loads the queue from a JSON recovery file. */
  /*
   * TODO(jlacey): Change everything downstream to gson. For now, we
   * reserialize the individual gson objects and deserialize them
   * using org.json.
   */
  @VisibleForTesting
  class LoadingQueueReader extends AbstractQueueReader {
    /**
     * Loads the MonitorRestartState checkpoints (HashMap<String, MonitorCheckpoint> points).
     */
    protected void readMonitorPoints(JsonElement gson) throws IOException {
      try {
        JSONObject json = gsonToJson(gson);
        monitorPoints = new MonitorRestartState(json);
        //monitorPoints.updateOnGuaranteed(checkpointAndChangeList)
      } catch (JSONException e) {
        throw IOExceptionHelper.newIOException(
            "Failed reading persisted JSON queue.", e);
      }
    }
    
    /**
     * Loads an entry into checkpointAndChangeList.
     */
    protected void readCheckpointAndChange(JsonElement gson)
        throws IOException {
      try {
        JSONObject json = gsonToJson(gson);
        checkpointAndChangeList.add(new CheckpointAndChange(json,
            internalDocumentHandleFactory, clientDocumentHandleFactory));
      } catch (JSONException e) {
        throw IOExceptionHelper.newIOException(
            "Failed reading persisted JSON queue.", e);
      }
    }

    // TODO(jlacey): This could be much more efficient, especially
    // with LOBs, if we directly transformed the objects with a little
    // recursive parser. This code is only used when recovering failed
    // batches, so I don't know if that's worth the effort.
    private JSONObject gsonToJson(JsonElement gson) throws JSONException {
      return new JSONObject(gson.toString());
    }
  }
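
The combination of the Template Method pattern and a trailing sentinel can be shown without gson. In the sketch below, the base class owns the parsing loop and the sentinel check, and the two subclasses differ only in what they do with each record. The format is deliberately simplified to one record per line with a final END line; SentinelReaderSketch, its nested classes, and that format are all hypothetical and are not the connector's recovery file layout.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

class SentinelReaderSketch {
  static final String SENTINEL = "END";  // hypothetical end-of-file marker

  /** Template method: the loop is fixed here, subclasses handle each record. */
  abstract static class AbstractReader {
    final void read(Reader in) throws IOException {
      BufferedReader reader = new BufferedReader(in);
      try {
        String previous = null;
        String line;
        while ((line = reader.readLine()) != null) {
          if (previous != null) {
            handleRecord(previous);  // every line except the last is a record
          }
          previous = line;
        }
        if (!SENTINEL.equals(previous)) {
          // A missing sentinel suggests the file was cut off while being written.
          throw new IOException("Read invalid recovery file.");
        }
      } finally {
        reader.close();
      }
    }

    protected abstract void handleRecord(String record) throws IOException;
  }

  /** Checks structure only; records are ignored. */
  static class ValidatingReader extends AbstractReader {
    @Override
    protected void handleRecord(String record) {
    }
  }

  /** Collects the records it is handed. */
  static class LoadingReader extends AbstractReader {
    final List<String> loaded = new ArrayList<String>();

    @Override
    protected void handleRecord(String record) {
      loaded.add(record);
    }
  }
}
```

Because records are handed to the subclass before the sentinel is checked, a loading reader may have stored some entries by the time a truncated file is detected. A no-op validating pass, as ValidatingQueueReader appears to provide, is one way to check a file before loading anything from it.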

---------------------------------------------------------------------------
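This series, Enterprise Search Engine Development: The Connector, is my own original work.
When reposting, please credit the source: the cnblogs blog "The Gentleness of the Hedgehog".
My email: chenying998179@163#com
Link to this article: http://www.cnblogs.com/chenying99/p/3789560.html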
