Enterprise Search Engine Development: The Connector (20)

Inside the connector, the object that links the data source to the feed destination is an instance of the QueryTraverser class, which implements the Traverser interface.
/**
 * Interface presented by a Traverser.  Used by the Scheduler.
 */
public interface Traverser {

  /**
   * Interval to wait after a transient error before retrying a traversal.
   */
  public static final int ERROR_WAIT_MILLIS = 15 * 60 * 1000;

  /**
   * Runs a batch of documents. The Traversal method may be hard (impossible?)
   * to interrupt while it is executing runBatch(). It is expected that a
   * thread loop running a traversal method would call runBatch(), then check
   * for InterruptedException, then decide whether it wants to stop of itself,
   * for scheduling reasons, or for a clean shutdown. It could then re-adjust
   * the batch hint if desired, then repeat.
   *
   * @param  batchSize A {@link BatchSize} instructs the traversal method to
   *         process approximately {@code batchSize.getHint()}, but no more
   *         than {@code batchSize.getMaximum()} number of documents in this
   *         batch.
   * @return A {@link BatchResult} containing the actual number of documents
   *         from this batch given to the feed and a possible policy to delay
   *         before requesting another batch.
   */
  public BatchResult runBatch(BatchSize batchSize);

  /**
   * Cancel the Batch in progress.  Discard the batch.  This might be called
   * when the workItem times out, connector deletion or reconfiguration, or
   * during shutdown.
   */
  public void cancelBatch();
}

The key method is BatchResult runBatch(BatchSize batchSize) shown above; its BatchSize batchSize parameter specifies the batch size.
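
The Javadoc above describes a thread loop that calls runBatch(), checks for interruption, and then repeats. A minimal sketch of that loop follows. The BatchSize, BatchResult, and Traverser types here are simplified, hypothetical stand-ins and not the real connector manager classes, and the moreExpected field is an assumption that replaces the real delay policy.

```java
/** Sketch of a scheduler-style loop around runBatch(); all types are stand-ins. */
final class BatchLoopSketch {

  /** Stand-in for BatchSize: a target count and a hard upper bound. */
  static final class BatchSize {
    final int hint;
    final int maximum;
    BatchSize(int hint, int maximum) {
      this.hint = hint;
      this.maximum = maximum;
    }
  }

  /** Stand-in for BatchResult: documents fed, and whether to ask again. */
  static final class BatchResult {
    final int count;
    final boolean moreExpected;  // assumption: replaces the real delay policy
    BatchResult(int count, boolean moreExpected) {
      this.count = count;
      this.moreExpected = moreExpected;
    }
  }

  /** Stand-in for the Traverser interface, with the same two methods. */
  interface Traverser {
    BatchResult runBatch(BatchSize batchSize);
    void cancelBatch();
  }

  /** Calls runBatch() until interrupted or no more documents are expected. */
  static int runLoop(Traverser traverser, BatchSize batchSize) {
    int total = 0;
    while (!Thread.currentThread().isInterrupted()) {
      BatchResult result = traverser.runBatch(batchSize);
      total += result.count;
      if (!result.moreExpected) {
        break;
      }
    }
    return total;
  }

  /** Toy traverser that hands out a fixed number of documents. */
  static Traverser fixedSource(final int totalDocs) {
    return new Traverser() {
      private int remaining = totalDocs;

      @Override
      public BatchResult runBatch(BatchSize batchSize) {
        // Process about hint documents, never more than maximum.
        int n = Math.min(Math.min(batchSize.hint, batchSize.maximum), remaining);
        remaining -= n;
        return new BatchResult(n, remaining > 0);
      }

      @Override
      public void cancelBatch() {
        remaining = 0;
      }
    };
  }

  public static void main(String[] args) {
    int fed = runLoop(fixedSource(25), new BatchSize(10, 20));
    System.out.println("documents fed: " + fed);
  }
}
```

The loop only looks at the interrupt flag between batches, which matches the Javadoc's remark that runBatch() itself may be hard to interrupt.
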
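A QueryTraverser object holds three collaborators. It references a TraversalManager instance (queryTraversalManager) to fetch data from the data source. It references a PusherFactory instance, which creates the DocPusher that sends the document objects. Its member variable TraversalStateStore stateStore reads and stores the traversal state, which is used to resume sending from a checkpoint.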
 @Override
  public BatchResult runBatch(BatchSize batchSize) {
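    // Time at which this batch started.
    final long startTime = clock.getTimeMillis();
    // Deadline after which this batch must stop.
    final long timeoutTime = startTime
      + traversalContext.traversalTimeLimitSeconds() * 1000;
    // Refuse to run if this traverser has already been cancelled.
    if (isCancelled()) {
      LOGGER.warning("Attempting to run a cancelled QueryTraverser");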
      return new BatchResult(TraversalDelayPolicy.ERROR);
    }
    try {
      // Pass the batch size hint on to the traversal manager.
      queryTraversalManager.setBatchHint(batchSize.getHint());
    } catch (RepositoryException e) {
      LOGGER.log(Level.WARNING, "Unable to set batch hint", e);
    }

    String connectorState;
    try {
      if (stateStore != null) {
        // Read the saved traversal state (the checkpoint).
        connectorState = stateStore.getTraversalState();
      } else {
        throw new IllegalStateException("null TraversalStateStore");
      }
    } catch (IllegalStateException ise) {
      // We get here if the store for the connector is disabled.
      // That happens if the connector was deleted while we were asleep.
      // Our connector seems to have been deleted.  Don't process a batch.
      LOGGER.fine("Halting traversal for connector " + connectorName
                  + ": " + ise.getMessage());
      return new BatchResult(TraversalDelayPolicy.ERROR);
    }

    DocumentList resultSet = null;
    if (connectorState == null) {
      try {
        LOGGER.fine("START TRAVERSAL: Starting traversal for connector "
                    + connectorName);
        resultSet = queryTraversalManager.startTraversal();
      } catch (Exception e) {
        LOGGER.log(Level.WARNING, "startTraversal threw exception: ", e);
        return new BatchResult(TraversalDelayPolicy.ERROR);
      }
    } else {
      try {
        LOGGER.fine("RESUME TRAVERSAL: Resuming traversal for connector "
            + connectorName + " from checkpoint " + connectorState);
        resultSet = queryTraversalManager.resumeTraversal(connectorState);
      } catch (Exception e) {
        LOGGER.log(Level.WARNING, "resumeTraversal threw exception: ", e);
        return new BatchResult(TraversalDelayPolicy.ERROR);
      }
    }

    // If the traversal returns null, that means that the repository has
    // no new content to traverse.
    if (resultSet == null) {
      LOGGER.fine("Result set from connector " + connectorName
                  + " is NULL, no documents returned for traversal.");
      return new BatchResult(TraversalDelayPolicy.POLL, 0);
    }

    Pusher pusher = null;
    // Result of this batch; it stays null unless an error occurs.
    BatchResult result = null;
    int counter = 0;
    try {
      // Get a Pusher for feeding the returned Documents.
      pusher = pusherFactory.newPusher(connectorName);

      while (true) {
        if (Thread.currentThread().isInterrupted() || isCancelled()) {
          LOGGER.fine("Traversal for connector " + connectorName
                      + " has been interrupted; breaking out of batch run.");
          break;
        }
        if (clock.getTimeMillis() >= timeoutTime) {
          LOGGER.fine("Traversal batch for connector " + connectorName
              + " is completing due to time limit.");
          break;
        }

        String docid = null;
        try {
          LOGGER.finer("Pulling next document from connector " + connectorName);         
          
          Document nextDocument = resultSet.nextDocument();          
          // A null document marks the end of resultSet.
          if (nextDocument == null) {
            LOGGER.finer("Traversal batch for connector " + connectorName
                + " at end after processing " + counter + " documents.");

            break;
          } else {
            //System.out.println("resultSet.getClass().getName():"+resultSet.getClass().getName());
            //System.out.println("nextDocument.getClass().getName():"+nextDocument.getClass().getName());
            // Since there are a couple of places below that could throw
            // exceptions but not exit the while loop, the counter should be
            // incremented here to insure it represents documents returned from
            // the list.  Note the call to nextDocument() could also throw a
            // RepositoryDocumentException signaling a skipped document in which
            // case the call will not be counted against the batch maximum.
            counter++;
            // Fetch DocId to use in messages.
            try {
              docid = Value.getSingleValueString(nextDocument,
                                                 SpiConstants.PROPNAME_DOCID);
            } catch (IllegalArgumentException e1) {
                LOGGER.finer("Unable to get document id for document ("
                             + nextDocument + "): " + e1.getMessage());
            } catch (RepositoryException e1) {
                LOGGER.finer("Unable to get document id for document ("
                             + nextDocument + "): " + e1.getMessage());
            }
          }
          LOGGER.finer("Sending document (" + docid + ") from connector "
              + connectorName + " to Pusher");
          // Send the document to the Pusher.
          if (pusher.take(nextDocument) != PusherStatus.OK) {
            LOGGER.fine("Traversal batch for connector " + connectorName
                + " is completing at the request of the Pusher,"
                + " after processing " + counter + " documents.");
            break;
          }
        } catch (SkippedDocumentException e) {
          /* TODO (bmj): This is a temporary solution and should be replaced.
           * It uses Exceptions for non-exceptional cases.
           */
          // Skip this document.  Proceed on to the next one.
          logSkippedDocument(docid, e);
        } catch (RepositoryDocumentException e) {
          // Skip individual documents that fail.  Proceed on to the next one.
          logSkippedDocument(docid, e);
        } catch (RuntimeException e) {
          // Skip individual documents that fail.  Proceed on to the next one.
          logSkippedDocument(docid, e);
        }
      }
      // No more documents. Wrap up any accumulated feed data and send it off.
      if (!isCancelled()) {
        pusher.flush();
      }
    } catch (OutOfMemoryError e) {
      pusher.cancel();
      System.runFinalization();
      System.gc();
      result = new BatchResult(TraversalDelayPolicy.ERROR);
      try {
        LOGGER.severe("Out of JVM Heap Space.  Will retry later.");
        LOGGER.log(Level.FINEST, e.getMessage(), e);
      } catch (Throwable t) {
        // OutOfMemory state may prevent us from logging the error.
        // Don't make matters worse by rethrowing something meaningless.
      }
    } catch (RepositoryException e) {
      // Drop the entire batch on the floor.  Do not call checkpoint
      // (as there is a discrepancy between what the Connector thinks
      // it has fed, and what actually has been pushed).
      LOGGER.log(Level.SEVERE, "Repository Exception during traversal.", e);
      result = new BatchResult(TraversalDelayPolicy.ERROR);
    } catch (PushException e) {
      LOGGER.log(Level.SEVERE, "Push Exception during traversal.", e);
      // Drop the entire batch on the floor.  Do not call checkpoint
      // (as there is a discrepancy between what the Connector thinks
      // it has fed, and what actually has been pushed).
      result = new BatchResult(TraversalDelayPolicy.ERROR);
    } catch (FeedException e) {
      LOGGER.log(Level.SEVERE, "Feed Exception during traversal.", e);
      // Drop the entire batch on the floor.  Do not call checkpoint
      // (as there is a discrepancy between what the Connector thinks
      // it has fed, and what actually has been pushed).
      result = new BatchResult(TraversalDelayPolicy.ERROR);
    } catch (Throwable t) {
      LOGGER.log(Level.SEVERE, "Uncaught Exception during traversal.", t);
      // Drop the entire batch on the floor.  Do not call checkpoint
      // (as there is a discrepancy between what the Connector thinks
      // it has fed, and what actually has been pushed).
      result = new BatchResult(TraversalDelayPolicy.ERROR);
    } finally {
      // If we have cancelled the work, abandon the batch.
      if (isCancelled()) {
        result = new BatchResult(TraversalDelayPolicy.ERROR);
      }
      
      // Checkpoint completed work as well as skip past troublesome documents
      // (e.g. documents that are too large and will always fail).
      if ((result == null) && (checkpointAndSave(resultSet) == null)) {
        // Unable to get a checkpoint, so wait a while, then retry batch.
        result = new BatchResult(TraversalDelayPolicy.ERROR);
      }
    }
    if (result == null) {
      result = new BatchResult(TraversalDelayPolicy.IMMEDIATE, counter,
                               startTime, clock.getTimeMillis());
    } else if (pusher != null) {
      // We are returning an error from this batch. Cancel any feed that
      // might be in progress.
      pusher.cancel();
    }
    return result;
  }

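I have annotated the key code above. It iterates over the returned document list and submits each document object to the DocPusher. After the documents have been sent, it updates the checkpoint state, which is used the next time data is fetched from the data source.
  /**
   * Obtains a checkpoint from the document list and saves it in the state
   * store.
   *
   * @param pm the DocumentList to checkpoint
   * @return the checkpoint string, or null if none could be obtained or saved
   */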
  private String checkpointAndSave(DocumentList pm) {
    String connectorState = null;
    LOGGER.fine("CHECKPOINT: Generating checkpoint for connector "
                + connectorName);
    try {
      connectorState = pm.checkpoint();
    } catch (RepositoryException re) {
      // If checkpoint() throws RepositoryException, it means there is no
      // new checkpoint.
      LOGGER.log(Level.FINE, "Failed to obtain checkpoint for connector "
                 + connectorName, re);
      return null;
    } catch (Exception e) {
      LOGGER.log(Level.INFO, "Failed to obtain checkpoint for connector "
                 + connectorName, e);
      return null;
    }
    try {
      if (connectorState != null) {
        if (stateStore != null) {
          stateStore.storeTraversalState(connectorState);
        } else {
          throw new IllegalStateException("null TraversalStateStore");
        }
        LOGGER.fine("CHECKPOINT: " + connectorState);
      }
      return connectorState;
    } catch (IllegalStateException ise) {
      // We get here if the store for the connector is disabled.
      // That happens if the connector was deleted while we were working.
      // Our connector seems to have been deleted.  Don't save a checkpoint.
      LOGGER.fine("Checkpoint discarded: " + connectorState);
    }
    return null;
  }
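
The start, resume, and checkpoint cycle can be sketched in isolation as follows. This is only an illustration under stated assumptions: InMemoryStateStore is a hypothetical stand-in for TraversalStateStore, the repository is a plain list, and the checkpoint is simply the index of the next document to feed. A real connector defines its own checkpoint format.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of checkpointed traversal; all types here are hypothetical. */
final class CheckpointSketch {

  /** In-memory stand-in for TraversalStateStore. */
  static final class InMemoryStateStore {
    private String state;  // null means no checkpoint has been saved yet

    String getTraversalState() {
      return state;
    }

    void storeTraversalState(String newState) {
      state = newState;
    }
  }

  /** Feeds up to batchSize documents and returns the ones that were fed. */
  static List<String> runBatch(List<String> repository,
                               InMemoryStateStore store, int batchSize) {
    String checkpoint = store.getTraversalState();
    // No checkpoint: start from the beginning, like startTraversal().
    // Otherwise: continue from the saved position, like resumeTraversal().
    int start = (checkpoint == null) ? 0 : Integer.parseInt(checkpoint);
    int end = Math.min(start + batchSize, repository.size());

    // "Feed" the documents; here that just means copying them out.
    List<String> fed = new ArrayList<>(repository.subList(start, end));

    // Save the checkpoint only after the feed, so that a failure before
    // this point replays the batch instead of skipping documents.
    store.storeTraversalState(Integer.toString(end));
    return fed;
  }

  public static void main(String[] args) {
    List<String> repository = List.of("a", "b", "c", "d", "e");
    InMemoryStateStore store = new InMemoryStateStore();
    for (int i = 0; i < 3; i++) {
      System.out.println(runBatch(repository, store, 2)
          + " checkpoint=" + store.getTraversalState());
    }
  }
}
```

Saving the checkpoint last mirrors the listing above, where checkpointAndSave is called only when the batch finished without an error.
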

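The cancel method sets a boolean flag. Because the flag is read by a different thread, the write must be synchronized.
  /**
   * Cancels the batch in progress by setting the cancelWork flag.
   */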
  @Override
  public void cancelBatch() {
    synchronized(cancelLock) {
      cancelWork = true;
    }
    LOGGER.fine("Cancelling traversal for connector " + connectorName);
  }
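
The article does not show isCancelled(), so the following is a minimal sketch that assumes it reads the flag under the same cancelLock. The class name CancelFlagSketch and the worker loop are hypothetical and only illustrate why both sides need the lock.

```java
/** Sketch of a cancel flag guarded by a lock; not the real QueryTraverser. */
final class CancelFlagSketch {
  private final Object cancelLock = new Object();
  private boolean cancelWork = false;  // guarded by cancelLock

  /** Called by the controlling thread to request cancellation. */
  void cancelBatch() {
    synchronized (cancelLock) {
      cancelWork = true;
    }
  }

  /** Called by the worker thread; assumed to take the same lock. */
  boolean isCancelled() {
    synchronized (cancelLock) {
      return cancelWork;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    final CancelFlagSketch flag = new CancelFlagSketch();
    Thread worker = new Thread(() -> {
      while (!flag.isCancelled()) {
        Thread.yield();  // stands in for processing one document
      }
    });
    worker.start();
    flag.cancelBatch();  // request cancellation from the main thread
    worker.join(5000);   // wait for the worker to notice the flag
    System.out.println("worker still running: " + worker.isAlive());
  }
}
```

Without synchronization (or a volatile field), the Java memory model does not guarantee that the worker thread ever sees the updated value.
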

---------------------------------------------------------------------------
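This series, Enterprise Search Engine Development: The Connector, is the author's original work.
If you repost it, please credit the source: the blog "The Hedgehog's Gentleness" on cnblogs.
Author's email: chenying998179@163#com
Link to this article: http://www.cnblogs.com/chenying99/p/3775534.html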
