Enterprise Search Engine Development: The Connector (Part 20)
Published: 2019-06-22


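Within the connector, the object that links the data source to the component that pushes data is a QueryTraverser, a class that implements the Traverser interface: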

/**
 * Interface presented by a Traverser.  Used by the Scheduler.
 */
public interface Traverser {

  /**
   * Interval to wait after a transient error before retrying a traversal.
   */
  public static final int ERROR_WAIT_MILLIS = 15 * 60 * 1000;

  /**
   * Runs a batch of documents. The Traversal method may be hard (impossible?)
   * to interrupt while it is executing runBatch(). It is expected that a
   * thread loop running a traversal method would call runBatch(), then check
   * for InterruptedException, then decide whether it wants to stop of itself,
   * for scheduling reasons, or for a clean shutdown. It could then re-adjust
   * the batch hint if desired, then repeat.
   *
   * @param  batchSize A {@link BatchSize} instructs the traversal method to
   *         process approximately {@code batchSize.getHint()}, but no more
   *         than {@code batchSize.getMaximum()} number of documents in this
   *         batch.
   * @return A {@link BatchResult} containing the actual number of documents
   *         from this batch given to the feed and a possible policy to delay
   *         before requesting another batch.
   */
  public BatchResult runBatch(BatchSize batchSize);

  /**
   * Cancel the Batch in progress. Discard the batch. This might be called
   * when the workItem times out, connector deletion or reconfiguration, or
   * during shutdown.
   */
  public void cancelBatch();
}

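The key method is BatchResult runBatch(BatchSize batchSize) shown above; its BatchSize batchSize parameter specifies the batch size, as a hint plus a hard maximum on the number of documents.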
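The calling pattern that the Javadoc describes (call runBatch(), check for interruption, repeat) can be sketched roughly as follows. This is only an illustration under stated assumptions: BatchLoopSketch, drain and overList are hypothetical names, and the nested BatchSize, BatchResult and Traverser types are simplified stand-ins for the real connector manager classes, not their actual definitions.

```java
import java.util.List;

/** Hypothetical sketch; none of these types are the real connector manager classes. */
class BatchLoopSketch {

  /** Simplified stand-in for BatchSize: a hint and a hard maximum. */
  static final class BatchSize {
    final int hint;
    final int maximum;

    BatchSize(int hint, int maximum) {
      this.hint = hint;
      this.maximum = maximum;
    }
  }

  /** Simplified stand-in for BatchResult: documents fed, and an error flag. */
  static final class BatchResult {
    final int countProcessed;
    final boolean error;

    BatchResult(int countProcessed, boolean error) {
      this.countProcessed = countProcessed;
      this.error = error;
    }
  }

  /** Simplified stand-in for the Traverser interface (cancelBatch omitted). */
  interface Traverser {
    BatchResult runBatch(BatchSize batchSize);
  }

  /**
   * Calls runBatch() repeatedly until the thread is interrupted, a batch
   * reports an error, or a batch feeds no documents. Returns the total fed.
   * A real scheduler would wait and retry instead of simply stopping.
   */
  static int drain(Traverser traverser, BatchSize batchSize) {
    int total = 0;
    while (!Thread.currentThread().isInterrupted()) {
      BatchResult result = traverser.runBatch(batchSize);
      if (result.error || result.countProcessed == 0) {
        break;
      }
      total += result.countProcessed;
    }
    return total;
  }

  /** Toy traverser over an in-memory list that honours hint and maximum. */
  static Traverser overList(List<String> docs) {
    final int[] position = {0};
    return batchSize -> {
      int want = Math.min(batchSize.hint, batchSize.maximum);
      int n = Math.min(want, docs.size() - position[0]);
      position[0] += n;
      return new BatchResult(n, false);
    };
  }
}
```

In this sketch the hint decides how many documents each call takes, and the loop ends as soon as a batch comes back empty.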

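A QueryTraverser fetches data from the data source through its TraversalManager queryTraversalManager reference, and uses its PusherFactory pusherFactory reference to create the Pusher instance that sends the Document objects. The TraversalStateStore stateStore member reads and saves the traversal state, which lets feeding resume from a checkpoint.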

@Override
public BatchResult runBatch(BatchSize batchSize) {
  // Start time
  final long startTime = clock.getTimeMillis();
  // Timeout deadline
  final long timeoutTime = startTime
      + traversalContext.traversalTimeLimitSeconds() * 1000;

  // Already cancelled
  if (isCancelled()) {
    LOGGER.warning("Attempting to run a cancelled QueryTraverser");
    return new BatchResult(TraversalDelayPolicy.ERROR);
  }
  try {
    // Batch size hint
    queryTraversalManager.setBatchHint(batchSize.getHint());
  } catch (RepositoryException e) {
    LOGGER.log(Level.WARNING, "Unable to set batch hint", e);
  }

  String connectorState;
  try {
    if (stateStore != null) {
      // Fetch the checkpoint state
      connectorState = stateStore.getTraversalState();
    } else {
      throw new IllegalStateException("null TraversalStateStore");
    }
  } catch (IllegalStateException ise) {
    // We get here if the store for the connector is disabled.
    // That happens if the connector was deleted while we were asleep.
    // Our connector seems to have been deleted.  Don't process a batch.
    LOGGER.fine("Halting traversal for connector " + connectorName
                + ": " + ise.getMessage());
    return new BatchResult(TraversalDelayPolicy.ERROR);
  }

  DocumentList resultSet = null;
  if (connectorState == null) {
    try {
      LOGGER.fine("START TRAVERSAL: Starting traversal for connector "
                  + connectorName);
      resultSet = queryTraversalManager.startTraversal();
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "startTraversal threw exception: ", e);
      return new BatchResult(TraversalDelayPolicy.ERROR);
    }
  } else {
    try {
      LOGGER.fine("RESUME TRAVERSAL: Resuming traversal for connector "
          + connectorName + " from checkpoint " + connectorState);
      resultSet = queryTraversalManager.resumeTraversal(connectorState);
    } catch (Exception e) {
      LOGGER.log(Level.WARNING, "resumeTraversal threw exception: ", e);
      return new BatchResult(TraversalDelayPolicy.ERROR);
    }
  }

  // If the traversal returns null, that means that the repository has
  // no new content to traverse.
  if (resultSet == null) {
    LOGGER.fine("Result set from connector " + connectorName
                + " is NULL, no documents returned for traversal.");
    return new BatchResult(TraversalDelayPolicy.POLL, 0);
  }

  Pusher pusher = null;
  // Result to report
  BatchResult result = null;
  int counter = 0;
  try {
    // One Pusher instance is used for the whole batch.
    // Get a Pusher for feeding the returned Documents.
    pusher = pusherFactory.newPusher(connectorName);

    while (true) {
      if (Thread.currentThread().isInterrupted() || isCancelled()) {
        LOGGER.fine("Traversal for connector " + connectorName
                    + " has been interrupted; breaking out of batch run.");
        break;
      }
      if (clock.getTimeMillis() >= timeoutTime) {
        LOGGER.fine("Traversal batch for connector " + connectorName
            + " is completing due to time limit.");
        break;
      }

      String docid = null;
      try {
        LOGGER.finer("Pulling next document from connector " + connectorName);
        Document nextDocument = resultSet.nextDocument();
        // This batch of the resultSet has been fully sent.
        if (nextDocument == null) {
          LOGGER.finer("Traversal batch for connector " + connectorName
              + " at end after processing " + counter + " documents.");
          break;
        } else {
          //System.out.println("resultSet.getClass().getName():"+resultSet.getClass().getName());
          //System.out.println("nextDocument.getClass().getName():"+nextDocument.getClass().getName());
          // Since there are a couple of places below that could throw
          // exceptions but not exit the while loop, the counter should be
          // incremented here to insure it represents documents returned from
          // the list.  Note the call to nextDocument() could also throw a
          // RepositoryDocumentException signaling a skipped document in which
          // case the call will not be counted against the batch maximum.
          counter++;
          // Fetch DocId to use in messages.
          try {
            docid = Value.getSingleValueString(nextDocument,
                                               SpiConstants.PROPNAME_DOCID);
          } catch (IllegalArgumentException e1) {
            LOGGER.finer("Unable to get document id for document ("
                         + nextDocument + "): " + e1.getMessage());
          } catch (RepositoryException e1) {
            LOGGER.finer("Unable to get document id for document ("
                         + nextDocument + "): " + e1.getMessage());
          }
        }
        LOGGER.finer("Sending document (" + docid + ") from connector "
            + connectorName + " to Pusher");
        // Feed the document
        if (pusher.take(nextDocument) != PusherStatus.OK) {
          LOGGER.fine("Traversal batch for connector " + connectorName
              + " is completing at the request of the Pusher,"
              + " after processing " + counter + " documents.");
          break;
        }
      } catch (SkippedDocumentException e) {
        /* TODO (bmj): This is a temporary solution and should be replaced.
         * It uses Exceptions for non-exceptional cases.
         */
        // Skip this document.  Proceed on to the next one.
        logSkippedDocument(docid, e);
      } catch (RepositoryDocumentException e) {
        // Skip individual documents that fail.  Proceed on to the next one.
        logSkippedDocument(docid, e);
      } catch (RuntimeException e) {
        // Skip individual documents that fail.  Proceed on to the next one.
        logSkippedDocument(docid, e);
      }
    }
    // No more documents. Wrap up any accumulated feed data and send it off.
    if (!isCancelled()) {
      pusher.flush();
    }
  } catch (OutOfMemoryError e) {
    pusher.cancel();
    System.runFinalization();
    System.gc();
    result = new BatchResult(TraversalDelayPolicy.ERROR);
    try {
      LOGGER.severe("Out of JVM Heap Space.  Will retry later.");
      LOGGER.log(Level.FINEST, e.getMessage(), e);
    } catch (Throwable t) {
      // OutOfMemory state may prevent us from logging the error.
      // Don't make matters worse by rethrowing something meaningless.
    }
  } catch (RepositoryException e) {
    // Drop the entire batch on the floor.  Do not call checkpoint
    // (as there is a discrepancy between what the Connector thinks
    // it has fed, and what actually has been pushed).
    LOGGER.log(Level.SEVERE, "Repository Exception during traversal.", e);
    result = new BatchResult(TraversalDelayPolicy.ERROR);
  } catch (PushException e) {
    LOGGER.log(Level.SEVERE, "Push Exception during traversal.", e);
    // Drop the entire batch on the floor.  Do not call checkpoint
    // (as there is a discrepancy between what the Connector thinks
    // it has fed, and what actually has been pushed).
    result = new BatchResult(TraversalDelayPolicy.ERROR);
  } catch (FeedException e) {
    LOGGER.log(Level.SEVERE, "Feed Exception during traversal.", e);
    // Drop the entire batch on the floor.  Do not call checkpoint
    // (as there is a discrepancy between what the Connector thinks
    // it has fed, and what actually has been pushed).
    result = new BatchResult(TraversalDelayPolicy.ERROR);
  } catch (Throwable t) {
    LOGGER.log(Level.SEVERE, "Uncaught Exception during traversal.", t);
    // Drop the entire batch on the floor.  Do not call checkpoint
    // (as there is a discrepancy between what the Connector thinks
    // it has fed, and what actually has been pushed).
    result = new BatchResult(TraversalDelayPolicy.ERROR);
  } finally {
    // If we have cancelled the work, abandon the batch.
    if (isCancelled()) {
      result = new BatchResult(TraversalDelayPolicy.ERROR);
    }
    // Update the checkpoint state.
    // Checkpoint completed work as well as skip past troublesome documents
    // (e.g. documents that are too large and will always fail).
    if ((result == null) && (checkpointAndSave(resultSet) == null)) {
      // Unable to get a checkpoint, so wait a while, then retry batch.
      result = new BatchResult(TraversalDelayPolicy.ERROR);
    }
  }

  if (result == null) {
    result = new BatchResult(TraversalDelayPolicy.IMMEDIATE, counter,
                             startTime, clock.getTimeMillis());
  } else if (pusher != null) {
    // We are returning an error from this batch. Cancel any feed that
    // might be in progress.
    pusher.cancel();
  }
  return result;
}

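I have annotated the key parts of the code. The method iterates over the batch of documents and submits each Document to the Pusher; once the iteration finishes, it updates the checkpoint state, which the next batch uses when fetching data from the data source.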

/**
 * Saves the checkpoint state.
 *
 * @param pm the DocumentList to checkpoint
 * @return the checkpoint string, or null if none was obtained or stored
 */
private String checkpointAndSave(DocumentList pm) {
  String connectorState = null;
  LOGGER.fine("CHECKPOINT: Generating checkpoint for connector "
              + connectorName);
  try {
    connectorState = pm.checkpoint();
  } catch (RepositoryException re) {
    // If checkpoint() throws RepositoryException, it means there is no
    // new checkpoint.
    LOGGER.log(Level.FINE, "Failed to obtain checkpoint for connector "
               + connectorName, re);
    return null;
  } catch (Exception e) {
    LOGGER.log(Level.INFO, "Failed to obtain checkpoint for connector "
               + connectorName, e);
    return null;
  }
  try {
    if (connectorState != null) {
      if (stateStore != null) {
        stateStore.storeTraversalState(connectorState);
      } else {
        throw new IllegalStateException("null TraversalStateStore");
      }
      LOGGER.fine("CHECKPOINT: " + connectorState);
    }
    return connectorState;
  } catch (IllegalStateException ise) {
    // We get here if the store for the connector is disabled.
    // That happens if the connector was deleted while we were working.
    // Our connector seems to have been deleted.  Don't save a checkpoint.
    LOGGER.fine("Checkpoint discarded: " + connectorState);
  }
  return null;
}
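To make the start/resume/checkpoint cycle concrete, here is a minimal sketch, assuming the repository is just an in-memory list and the checkpoint is simply the index of the next unsent document encoded as a string. CheckpointSketch and its nested StateStore are hypothetical names; the real TraversalStateStore and DocumentList.checkpoint() are richer, and a real checkpoint string is whatever the connector chooses.

```java
import java.util.List;

/** Hypothetical sketch of resuming a traversal from a saved checkpoint. */
class CheckpointSketch {

  /** In-memory stand-in for a state store holding one checkpoint string. */
  static final class StateStore {
    private String state; // null means no traversal has run yet

    public String getTraversalState() {
      return state;
    }

    public void storeTraversalState(String newState) {
      state = newState;
    }
  }

  /**
   * Runs one batch: starts at index 0 when there is no checkpoint, otherwise
   * resumes at the saved index; feeds at most batchSize documents into fed;
   * then saves the new checkpoint. Returns the number of documents fed.
   */
  public static int runBatch(List<String> repository, StateStore store,
                             int batchSize, List<String> fed) {
    String checkpoint = store.getTraversalState();
    int start = (checkpoint == null) ? 0 : Integer.parseInt(checkpoint);
    int end = Math.min(start + batchSize, repository.size());
    for (int i = start; i < end; i++) {
      fed.add(repository.get(i)); // stands in for pusher.take(document)
    }
    // Save the checkpoint only after the batch has been fed.
    store.storeTraversalState(Integer.toString(end));
    return end - start;
  }
}
```

Calling runBatch repeatedly with the same StateStore walks through the list without re-feeding documents, which is the behaviour the checkpoint is meant to give.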

The cancel method works by setting a boolean flag; note that access to the flag must be synchronized.

/**
 * Cancels execution.
 */
@Override
public void cancelBatch() {
  synchronized(cancelLock) {
    cancelWork = true;
  }
  LOGGER.fine("Cancelling traversal for connector " + connectorName);
}
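The isCancelled() method that runBatch calls is not shown in this post. A minimal sketch of how the two sides of the flag could fit together, assuming the reader takes the same cancelLock as the writer, might look like this. CancelFlagSketch and its run method are hypothetical and exist only for illustration.

```java
/** Hypothetical sketch of a lock-guarded cancel flag. */
class CancelFlagSketch {
  private final Object cancelLock = new Object();
  private boolean cancelWork = false;

  /** Writer side: sets the flag while holding the lock. */
  public void cancelBatch() {
    synchronized (cancelLock) {
      cancelWork = true;
    }
  }

  /** Reader side (assumed): reads the flag under the same lock. */
  public boolean isCancelled() {
    synchronized (cancelLock) {
      return cancelWork;
    }
  }

  /**
   * Processes up to the given number of items, checking the flag before
   * each one, and runs afterEach after every processed item.
   * Returns how many items were processed before cancellation or the end.
   */
  public int run(int items, Runnable afterEach) {
    int processed = 0;
    for (int i = 0; i < items; i++) {
      if (isCancelled()) {
        break;
      }
      processed++;
      afterEach.run();
    }
    return processed;
  }
}
```

Taking the same lock on both the write and the read is what guarantees that a cancel issued from another thread becomes visible to the traversal loop; declaring the field volatile would be another way to get that visibility for a single flag.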

---------------------------------------------------------------------------

This series, Enterprise Search Engine Development: The Connector, is my original work.

Please credit the source when reposting: 博客园, 刺猬的温驯

My email: chenying998179@163#com (replace # with .)
