Java High Level REST Client API로 작성된 ElasticSearch 도구 클래스

34223 단어 ElasticSearch
최근에 elasticsearch(버전 6.4.0)를 배치하여 로그 정보를 저장해야 했습니다. 여기에서는 공식 홈페이지의 예시를 참고하여, 제공되는 Java High Level REST Client API를 사용해 자주 쓰는 도구 클래스를 직접 만들었습니다. 잔말 말고 다음은 코드입니다. 부족한 점이 있으면 많이 가르쳐 주세요.
/**
 * Elasticsearch utility class (wraps {@code RestHighLevelClient}).
 * @author Stephen
 * @version 1.0
 * @date 2018/09/17 11:12:08
 */
public class ElasticClient implements Closeable {
    
    private static final String INDEX_KEY = "index";
    private static final String TYPE_KEY = "type";   
    private static final String INDEX = "spider";
    private static final String TYPE = "doc";
    private static final String TIMESTAMP = "timestamp";
    
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticClient.class);
    
    private String[] hosts;
    
    protected RestHighLevelClient client;
    
    /**
     * Creates a client wrapper for the given Elasticsearch nodes.
     * No connection is set up here; call {@link #configure()} before using the instance.
     *
     * @param hosts node addresses, each expected in {@code host:port} form
     */
    public ElasticClient(String[] hosts) {
        // Defensive copy: later changes to the caller's array must not alter our configuration.
        this.hosts = (hosts == null) ? null : hosts.clone();
    }

    /**
     * Closes the underlying REST client if one has been created.
     *
     * @throws IOException if closing the client fails
     */
    @Override
    public void close() throws IOException {
        if (client == null) {
            return;
        }
        client.close();
    }
    
    /**
     * Builds the REST client from the host list passed to the constructor.
     * <p>
     * Every entry must look like {@code host:port}; the scheme is taken from {@code Const.SCHEMA}.
     * Calling this method again replaces {@link #client} without closing the previous instance.
     *
     * @throws IllegalArgumentException if an entry has no port part or the port is not a number
     */
    public void configure() {
        Validate.noNullElements(hosts, "Elastic        !");
        HttpHost[] httpHosts = Arrays.stream(hosts)
                .map(ElasticClient::toHttpHost)
                .toArray(HttpHost[]::new);
        client = new RestHighLevelClient(RestClient.builder(httpHosts));
    }

    /** Parses one {@code host:port} entry into an {@link HttpHost}, rejecting malformed input. */
    private static HttpHost toHttpHost(String host) {
        String[] hostParts = host.split(":");
        if (hostParts.length < 2) {
            // Previously surfaced as an ArrayIndexOutOfBoundsException with no hint about the cause.
            throw new IllegalArgumentException("Invalid Elasticsearch host '" + host + "', expected host:port");
        }
        final int port;
        try {
            port = Integer.parseInt(hostParts[1]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port in Elasticsearch host '" + host + "'", e);
        }
        return new HttpHost(hostParts[0], port, Const.SCHEMA);
    }
    
    /**
     * Creates an index using the default document type {@code TYPE} and the standard field mapping.
     * If the index already exists, an error is logged and nothing is created.
     * NOTE(review): the original comment seems to mention default shard/replica counts (5 and 1);
     * no settings are passed here, so those would be server defaults — confirm.
     *
     * @param indexName name of the index to create
     * @throws IOException if building the mapping or talking to the cluster fails
     */
    public void createIndex(String indexName) throws IOException {
        boolean alreadyPresent = checkIndexExists(indexName);
        if (alreadyPresent) {
            LOGGER.error("\"index={}\"      !", indexName);
            return;
        }
        CreateIndexRequest createRequest = new CreateIndexRequest(indexName);
        createRequest.mapping(TYPE, generateBuilder());
        CreateIndexResponse createResponse = client.indices().create(createRequest, RequestOptions.DEFAULT);
        // Either acknowledgement flag is treated as success.
        if (!createResponse.isAcknowledged() && !createResponse.isShardsAcknowledged()) {
            return;
        }
        LOGGER.info("      !     {}", indexName);
    }
    
    /**
     * Creates an index with an explicit document type and the standard field mapping.
     * If the index already exists, an error is logged and nothing is created.
     *
     * @param index name of the index to create
     * @param type  document type the mapping is registered under
     * @throws IOException if building the mapping or talking to the cluster fails
     */
    public void createIndex(String index, String type) throws IOException {
        if (checkIndexExists(index)) {
            LOGGER.error("\"index={}\"     !", index);
            return;
        }
        CreateIndexRequest createRequest = new CreateIndexRequest(index);
        createRequest.mapping(type, generateBuilder());
        CreateIndexResponse createResponse = client.indices().create(createRequest, RequestOptions.DEFAULT);
        boolean created = createResponse.isAcknowledged() || createResponse.isShardsAcknowledged();
        if (created) {
            LOGGER.info("      !     {}", index);
        }
    }
    
    /**
     * Creates an index with explicit shard and replica counts, using the default document type
     * {@code TYPE} and the standard field mapping.
     * If the index already exists, an error is logged and nothing is created.
     *
     * @param indexName name of the index to create
     * @param shards    value for {@code index.number_of_shards}
     * @param replicas  value for {@code index.number_of_replicas}
     * @throws IOException if building the mapping or talking to the cluster fails
     */
    public void createIndex(String indexName, int shards, int replicas) throws IOException {
        if (checkIndexExists(indexName)) {
            LOGGER.error("\"index={}\"     !", indexName);
            return;
        }
        CreateIndexRequest createRequest = new CreateIndexRequest(indexName).settings(
                Settings.builder()
                        .put("index.number_of_shards", shards)
                        .put("index.number_of_replicas", replicas));
        createRequest.mapping(TYPE, generateBuilder());
        CreateIndexResponse createResponse = client.indices().create(createRequest, RequestOptions.DEFAULT);
        boolean created = createResponse.isAcknowledged() || createResponse.isShardsAcknowledged();
        if (created) {
            LOGGER.info("      !     {}", indexName);
        }
    }
    
    /**
     * Deletes an index.
     * Failures reported by Elasticsearch are logged (with the exception) and not rethrown.
     *
     * @param indexName name of the index to delete
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    public void deleteIndex(String indexName) throws IOException {
        try {
            DeleteIndexResponse response = client.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
            if (response.isAcknowledged()) {
                LOGGER.info("{}       !", indexName);
            }
        } catch (ElasticsearchException ex) {
            if (ex.status() == RestStatus.NOT_FOUND) {
                LOGGER.error("{}       ", indexName);
            }
            // Pass the exception to the logger so the root cause is not lost.
            LOGGER.error("    !", ex);
        }
    }
    
    /**
     * Checks whether an index exists.
     * An {@link IOException} during the check is logged and reported as "does not exist";
     * it is never thrown to the caller.
     *
     * @param indexName name of the index to look up
     * @return {@code true} if the cluster reports the index as existing; {@code false} if it does
     *         not exist or the check itself failed
     */
    public boolean checkIndexExists(String indexName) {
        GetIndexRequest request = new GetIndexRequest().indices(indexName);
        try {
            return client.indices().exists(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            // Keep the best-effort contract, but record why the check failed.
            LOGGER.error("        ,    !", e);
            return false;
        }
    }
    
    /**
     * Opens an index.
     * If the index does not exist, an error is logged and no request is sent.
     *
     * @param indexName name of the index to open
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    public void openIndex(String indexName) throws IOException{
        boolean present = checkIndexExists(indexName);
        if (!present) {
            LOGGER.error("     !");
            return;
        }
        OpenIndexResponse openResponse = client.indices().open(new OpenIndexRequest(indexName), RequestOptions.DEFAULT);
        // Either acknowledgement flag is treated as success.
        if (!openResponse.isAcknowledged() && !openResponse.isShardsAcknowledged()) {
            return;
        }
        LOGGER.info("{}       !", indexName);
    }
    
    /**
     * Closes an index.
     * If the index does not exist, an error is logged and no request is sent.
     *
     * @param indexName name of the index to close
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    public void closeIndex(String indexName) throws IOException {
        boolean present = checkIndexExists(indexName);
        if (!present) {
            LOGGER.error("     !");
            return;
        }
        CloseIndexResponse closeResponse = client.indices().close(new CloseIndexRequest(indexName), RequestOptions.DEFAULT);
        if (!closeResponse.isAcknowledged()) {
            return;
        }
        LOGGER.info("{}      !", indexName);
    }
    
    /**
     * Applies the standard field mapping (see {@code generateBuilder()}) to an existing index/type.
     * An {@link IOException} is logged together with its cause and not rethrown.
     *
     * @param index target index
     * @param type  document type the mapping is registered under
     */
    public void setFieldsMapping(String index, String type) {
        PutMappingRequest request = new PutMappingRequest(index).type(type);
        try {
            request.source(generateBuilder());
            PutMappingResponse response = client.indices().putMapping(request, RequestOptions.DEFAULT);
            if (response.isAcknowledged()) {
                LOGGER.info("    \"index={}, type={}\"         !", index, type);
            }
        } catch (IOException e) {
            // A trailing Throwable argument is logged by SLF4J as the exception.
            LOGGER.error("\"index={}, type={}\"           ,     !", index, type, e);
        }
    }
    /**
     * Builds the mapping JSON shared by all index-creation and put-mapping calls:
     * a {@code message} text field analysed with {@code ik_smart} and a {@code TIMESTAMP}
     * date field in {@code epoch_millis} format.
     *
     * @return the populated mapping builder
     * @throws IOException if the JSON builder fails
     */
    private XContentBuilder generateBuilder() throws IOException {
        return XContentFactory.jsonBuilder()
                .startObject()
                    .startObject("properties")
                        // "message": full-text field using the ik_smart analyzer
                        // (presumably requires the IK analysis plugin on the cluster — confirm).
                        .startObject("message")
                            .field("type", "text")
                            .field("analyzer", "ik_smart")
                        .endObject()
                        // Timestamp: date stored as epoch milliseconds.
                        .startObject(TIMESTAMP)
                            .field("type", "date")
                            .field("format", "epoch_millis")
                        .endObject()
                    .endObject()
                .endObject();
    }
   
    /**
     * Indexes a JSON document under the given id, creating the index first when it is missing.
     * Invalid JSON is rejected with an error log. Failures reported by Elasticsearch are
     * logged (with the exception) and not rethrown.
     *
     * @param indexName  target index
     * @param typeName   document type
     * @param id         document id
     * @param jsonString document source as a JSON string
     * @throws IOException if index creation or the index request fails at the transport level
     */
    public void addDocByJson(String indexName, String typeName, String id, String jsonString) throws IOException{
        if (!JsonValidator.validate(jsonString)) {
            LOGGER.error("   json   ,    !");
            return;
        }
        if (!checkIndexExists(indexName)) {
            createIndex(indexName, typeName);
        }
        // The default opType (INDEX) is kept, so an existing id is overwritten.
        // NOTE(review): the original comment mentions DocWriteRequest.OpType.CREATE as the
        // alternative when overwriting must be refused.
        IndexRequest request = new IndexRequest(indexName, typeName, id).source(jsonString, XContentType.JSON);
        try {
            IndexResponse response = client.index(request, RequestOptions.DEFAULT);

            String index = response.getIndex();
            String type = response.getType();
            String documentId = response.getId();
            if (response.getResult() == DocWriteResponse.Result.CREATED) {
                LOGGER.info("      ! index: {}, type: {}, id: {}", index , type, documentId);
            } else if (response.getResult() == DocWriteResponse.Result.UPDATED) {
                LOGGER.info("      ! index: {}, type: {}, id: {}", index , type, documentId);
            }
            // Report shards that did not acknowledge the write.
            ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
            if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
               LOGGER.error("           !");
            }
            if (shardInfo.getFailed() > 0) {
                for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                    LOGGER.error("      :{}", failure.reason());
                }
            }
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                LOGGER.error("    !");
            }
            // Pass the exception to the logger so the root cause is not lost.
            LOGGER.error("      !", e);
        }
    }
    
    /**
     * Fetches a single document by id.
     * The request runs with {@code realtime(false)} and {@code refresh(true)}.
     * Failures reported by Elasticsearch are logged (with the exception) and yield an empty map.
     *
     * @param index index to read from
     * @param type  document type
     * @param id    document id
     * @return the document source as a map, or an empty map when the document is missing or the
     *         lookup failed
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    public Map<String, Object> getDocument(String index, String type, String id) throws IOException{
        GetRequest request = new GetRequest(index, type, id);
        request.realtime(false);
        request.refresh(true);

        GetResponse response;
        try {
            response = client.get(request, RequestOptions.DEFAULT);
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.NOT_FOUND) {
                LOGGER.error("     ,     !" );
            }
            if (e.status() == RestStatus.CONFLICT) {
                LOGGER.error("    !" );
            }
            // Pass the exception to the logger so the root cause is not lost.
            LOGGER.error("    !", e);
            return new HashMap<>();
        }

        if (Objects.isNull(response)) {
            return new HashMap<>();
        }
        if (response.isExists()) {
            return response.getSourceAsMap();
        }
        // A missing document does not raise an exception: per the original comment the 404 still
        // yields a valid GetResponse with no source, and isExists() returns false.
        LOGGER.error("     ,     !" );
        return new HashMap<>();
    }
    
    /**
     * Deletes a single document by id.
     * Failures reported by Elasticsearch are logged (with the exception) and not rethrown.
     *
     * @param index index containing the document
     * @param type  document type
     * @param id    document id
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    public void deleteDocument(String index, String type, String id) throws IOException {
        DeleteRequest request = new DeleteRequest(index, type, id);
        DeleteResponse response;
        try {
            response = client.delete(request, RequestOptions.DEFAULT);
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.CONFLICT) {
                LOGGER.error("    !" );
            }
            // Pass the exception to the logger so the root cause is not lost.
            LOGGER.error("    !", e);
            return;
        }
        if (Objects.isNull(response)) {
            return;
        }
        if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
            LOGGER.error("      ,     !");
        } else {
            // Previously this info line was also emitted for NOT_FOUND, right after the error.
            LOGGER.info("     !");
        }
        ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
        if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
            LOGGER.error("         ");
        }
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                LOGGER.error("    :{}", failure.reason());
            }
        }
    }
    
    /**
     * Updates a document by running an inline script against it
     * (example script: {@code ctx._source.posttime="2018-09-18"}).
     * Failures reported by Elasticsearch are logged (with the exception) and not rethrown.
     *
     * @param index  index containing the document
     * @param type   document type
     * @param id     document id
     * @param script script source passed to {@code new Script(String)}
     * @throws IOException if the request cannot be sent or the response cannot be read
     */
    public void updateDocByScript(String index, String type, String id, String script) throws IOException{
        Script inline = new Script(script);
        UpdateRequest request = new UpdateRequest(index, type, id).script(inline);
        try {
            UpdateResponse response  = client.update(request, RequestOptions.DEFAULT);
            if (response.getResult() == DocWriteResponse.Result.UPDATED) {
                LOGGER.info("      !");
            } else if (response.getResult() == DocWriteResponse.Result.DELETED) {
                LOGGER.error("\"index={},type={},id={}\"       ,    !", response.getIndex(), response.getType(), response.getId());
            } else if(response.getResult() == DocWriteResponse.Result.NOOP) {
                LOGGER.error("       !");
            }

            // Report shards that did not acknowledge the update.
            ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
            if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                LOGGER.error("         ");
            }
            if (shardInfo.getFailed() > 0) {
                for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                    LOGGER.error("     :{}", failure.reason());
                }
            }
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.NOT_FOUND) {
                LOGGER.error("       ,     !" );
            } else if (e.status() == RestStatus.CONFLICT) {
                LOGGER.error("      !" );
            }
            // Pass the exception to the logger so the root cause is not lost.
            LOGGER.error("    !", e);
        }
    }
    
    /**
     * Updates a document from a JSON string; with {@code docAsUpsert(true)} the JSON is inserted
     * as a new document when the id does not exist yet. The index is created first when missing.
     * Invalid JSON is rejected with an error log. Failures reported by Elasticsearch are logged
     * (with the exception) and not rethrown.
     *
     * @param index      target index
     * @param type       document type
     * @param id         document id
     * @param jsonString partial or full document as a JSON string
     * @throws IOException if index creation or the update request fails at the transport level
     */
    public void updateDocByJson(String index, String type, String id, String jsonString) throws IOException {
        if (!JsonValidator.validate(jsonString)) {
            LOGGER.error("   json   ,    !");
            return;
        }
        if (!checkIndexExists(index)) {
            createIndex(index, type);
        }
        UpdateRequest request = new UpdateRequest(index, type, id);
        request.doc(jsonString, XContentType.JSON);
        // Upsert: use the supplied document as the initial content if nothing exists under this id.
        request.docAsUpsert(true);
        try {
            UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
            String indexName = response.getIndex();
            String typeName = response.getType();
            String documentId = response.getId();
            if (response.getResult() == DocWriteResponse.Result.CREATED) {
                LOGGER.info("      !index: {}, type: {}, id: {}", indexName, typeName, documentId);
            } else if (response.getResult() == DocWriteResponse.Result.UPDATED) {
                LOGGER.info("      !");
            } else if (response.getResult() == DocWriteResponse.Result.DELETED) {
                LOGGER.error("\"index={},type={},id={}\"       ,    !", indexName, typeName, documentId);
            } else if (response.getResult() == DocWriteResponse.Result.NOOP) {
                LOGGER.error("       !");
            }

            // Report shards that did not acknowledge the update.
            ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
            if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                LOGGER.error("         ");
            }
            if (shardInfo.getFailed() > 0) {
                for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                    LOGGER.error("     :{}", failure.reason());
                }
            }
        } catch (ElasticsearchException e) {
            if (e.status() == RestStatus.NOT_FOUND) {
                LOGGER.error("       ,     !" );
            } else if (e.status() == RestStatus.CONFLICT) {
                LOGGER.error("      !" );
            }
            // Pass the exception to the logger so the root cause is not lost.
            LOGGER.error("    !", e);
        }
    }
    
    /**
     *       
     * @param params
     * @throws IOException
     */
    public void bulkAdd(List> params) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (Map dataMap : params) {
            String index = dataMap.getOrDefault(INDEX_KEY, INDEX);
            String type = dataMap.getOrDefault(TYPE_KEY, TYPE);
            String id = dataMap.get("id");
            String jsonString = dataMap.get("json");
            if (StringUtils.isNotBlank(id) && JsonValidator.validate(jsonString)) {
                IndexRequest request = new IndexRequest(index, type, id).source(jsonString, XContentType.JSON);
                bulkRequest.add(request);
            }            
        }
        //     (2  )
        bulkRequest.timeout(TimeValue.timeValueMinutes(2L));
        //     
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        
        if (bulkRequest.numberOfActions() == 0) {           
            LOGGER.error("    ,        !");
            return;
        } 
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        //       
        if (!bulkResponse.hasFailures()) {
            LOGGER.info("        !");
        } else {
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();                   
                    LOGGER.error("\"index={}, type={}, id={}\"       !", failure.getIndex(), failure.getType(), failure.getId());
                    LOGGER.error("      : {}", failure.getMessage());
                } else {
                    LOGGER.info("\"index={}, type={}, id={}\"       !", bulkItemResponse.getIndex(), bulkItemResponse.getType(), bulkItemResponse.getId()); 
                }
            }
        }
    }
    
    /**
     *       
     * @param params
     * @throws IOException
     */
    public void bulkUpdate(List> params) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (Map dataMap : params) {
            String index = dataMap.getOrDefault(INDEX_KEY, INDEX);
            String type = dataMap.getOrDefault(TYPE_KEY, TYPE);
            String id = dataMap.get("id");
            String jsonString = dataMap.get("json");
            if (StringUtils.isNotBlank(id) && JsonValidator.validate(jsonString)) {
                UpdateRequest request = new UpdateRequest(index, type, id).doc(jsonString, XContentType.JSON);
                request.docAsUpsert(true);
                bulkRequest.add(request);
            }           
        }
        if (bulkRequest.numberOfActions() == 0) {           
            LOGGER.error("    ,        !");
            return;
        }
        bulkRequest.timeout(TimeValue.timeValueMinutes(2L));
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);        
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (!bulkResponse.hasFailures()) {
            LOGGER.info("        !");
        } else {
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();                   
                    LOGGER.error("\"index={}, type={}, id={}\"       !", failure.getIndex(), failure.getType(), failure.getId());
                    LOGGER.error("      : {}", failure.getMessage());
                } else {
                    LOGGER.info("\"index={}, type={}, id={}\"       !", bulkItemResponse.getIndex(), bulkItemResponse.getType(), bulkItemResponse.getId());  
                }
            }
        }
    }
    
    /**
     *       
     * @param params
     * @throws IOException
     */
    public void bulkDelete(List> params) throws IOException {
        BulkRequest bulkRequest = new BulkRequest();
        for (Map dataMap : params) {
            String index = dataMap.getOrDefault(INDEX_KEY, INDEX);
            String type = dataMap.getOrDefault(TYPE_KEY, TYPE);
            String id = dataMap.get("id");
            if (StringUtils.isNotBlank(id)){
                DeleteRequest request = new DeleteRequest(index, type, id);
                bulkRequest.add(request);
            }
        }
        if (bulkRequest.numberOfActions() == 0) {           
            LOGGER.error("    ,     !");
            return;
        }
        bulkRequest.timeout(TimeValue.timeValueMinutes(2L));
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (!bulkResponse.hasFailures()) {
            LOGGER.info("        !");
        } else {
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {
                    BulkItemResponse.Failure failure = bulkItemResponse.getFailure();                   
                    LOGGER.error("\"index={}, type={}, id={}\"       !", failure.getIndex(), failure.getType(), failure.getId());
                    LOGGER.error("      : {}", failure.getMessage());
                } else {
                    LOGGER.info("\"index={}, type={}, id={}\"       !", bulkItemResponse.getIndex(), bulkItemResponse.getType(), bulkItemResponse.getId()); 
                }
            }
        }
    }
    
    /**
     *       
     * @param params
     * @return
     * @throws IOException
     */
    public List> multiGet(List> params) throws IOException {
        List> resultList = new ArrayList<>();

        MultiGetRequest request = new MultiGetRequest();
        for (Map dataMap : params) {
            String index = dataMap.getOrDefault(INDEX_KEY, INDEX);
            String type = dataMap.getOrDefault(TYPE_KEY, TYPE);
            String id = dataMap.get("id");
            if (StringUtils.isNotBlank(id)) {
                request.add(new MultiGetRequest.Item(index, type, id));
            }
        }
        request.realtime(false);
        request.refresh(true);
        MultiGetResponse response = client.mget(request, RequestOptions.DEFAULT);
        List> list = parseMGetResponse(response);
        if (!list.isEmpty()) {
            resultList.addAll(list);
        }                      
        return resultList;
    }
    private List> parseMGetResponse(MultiGetResponse response) {       
        List> list = new ArrayList<>();
        MultiGetItemResponse[] responses = response.getResponses();
        for (MultiGetItemResponse item : responses) {
            GetResponse getResponse = item.getResponse();
            if (Objects.nonNull(getResponse)) {
                if (!getResponse.isExists()) {
                    LOGGER.error("\"index={}, type={}, id={}\"       ,     !", getResponse.getIndex(), getResponse.getType(), getResponse.getId());
                } else {
                    list.add(getResponse.getSourceAsMap()); 
                }
            } else {
                MultiGetResponse.Failure failure = item.getFailure();
                ElasticsearchException e = (ElasticsearchException) failure.getFailure();
                if (e.status() == RestStatus.NOT_FOUND) {
                    LOGGER.error("\"index={}, type={}, id={}\"      !", failure.getIndex(), failure.getType(), failure.getId());
                } else if (e.status() == RestStatus.CONFLICT) {
                    LOGGER.error("\"index={}, type={}, id={}\"       !", failure.getIndex(), failure.getType(), failure.getId());
                }
            }                                                                                                        
        }
        return list;
    }
    
    /**
     *           (  level messageKey      )
     * @param level     ,    
     * @param messageKey        ,    
     * @param startTime       ,    
     * @param endTime       ,    
     * @param size      ,    ,      10 。      10000,    10000     {@link #queryAllByConditions}
     * @return
     * @throws IOException
     */
    public List> queryByConditions(String level, String messageKey, Long startTime, Long endTime, Integer size) throws IOException {
        List> resultList = new ArrayList<>();        
        if (StringUtils.isBlank(level) && StringUtils.isBlank(messageKey)) {
            LOGGER.error("  level(    ) messageKey(       )      !");
            return resultList;
        }
        
        QueryBuilder query = generateQuery(level, messageKey, startTime, endTime);
        FieldSortBuilder order = SortBuilders.fieldSort(TIMESTAMP).order(SortOrder.DESC);        
        SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
        searchBuilder.timeout(TimeValue.timeValueMinutes(2L));
        searchBuilder.query(query);
        searchBuilder.sort(order);        
        if (Objects.nonNull(size)) {
            searchBuilder.size(size);
        }
        
        SearchRequest request = new SearchRequest(INDEX).types(TYPE);
        request.source(searchBuilder);       
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        int failedShards = response.getFailedShards();
        if (failedShards > 0) {
            LOGGER.error("          !");
            for (ShardSearchFailure failure : response.getShardFailures()) {
                String reason = failure.reason();
                LOGGER.error("        :{}", reason);
            }
        }
        List> list = parseSearchResponse(response);
        if (!list.isEmpty()) {
            resultList.addAll(list);
        }
        return resultList;
    }
    /**
     * Builds the bool query shared by all search methods. Each supplied criterion becomes a
     * {@code must} clause: a term query on {@code level}, a match query on {@code message},
     * and a range query on {@code TIMESTAMP}.
     *
     * @param level      log level, lower-cased before matching; skipped when blank
     * @param messageKey keyword for the {@code message} field; skipped when blank
     * @param startTime  inclusive lower time bound in epoch milliseconds, or {@code null}
     * @param endTime    inclusive upper time bound in epoch milliseconds, or {@code null}
     * @return the combined query (an empty bool query when no criterion is supplied)
     */
    private QueryBuilder generateQuery(String level, String messageKey, Long startTime, Long endTime) {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        if (StringUtils.isNotBlank(level)) {
            // Locale.ROOT keeps the lower-casing independent of the JVM default locale
            // (e.g. Turkish dotless i would otherwise break "INFO" -> "info").
            boolQuery.must(QueryBuilders.termQuery("level", level.toLowerCase(java.util.Locale.ROOT)));
        }
        if (StringUtils.isNotBlank(messageKey)) {
            boolQuery.must(QueryBuilders.matchQuery("message", messageKey));
        }
        // Only add a range clause when at least one bound is present.
        if (Objects.nonNull(startTime) || Objects.nonNull(endTime)) {
            RangeQueryBuilder timeQuery = QueryBuilders.rangeQuery(TIMESTAMP);
            timeQuery.format("epoch_millis");
            if (Objects.nonNull(startTime)) {
                timeQuery.gte(startTime);
            }
            if (Objects.nonNull(endTime)) {
                timeQuery.lte(endTime);
            }
            boolQuery.must(timeQuery);
        }
        return boolQuery;
    }
    private List> parseSearchResponse(SearchResponse response){
        List> resultList = new ArrayList<>();
        SearchHit[] hits = response.getHits().getHits();
        for (SearchHit hit : hits) {
            resultList.add(hit.getSourceAsMap());
        }
        return resultList;
    }
    
    /**
     *     ,         (  level messageKey      )
     * @param level     ,    
     * @param messageKey        ,    
     * @param startTime       ,    
     * @param endTime       ,    
     * @return
     */
    public List> queryAllByConditions(String level, String messageKey, Long startTime, Long endTime) throws IOException {
        List> resultList = new ArrayList<>();        
        if (StringUtils.isBlank(level) && StringUtils.isBlank(messageKey)) {
            LOGGER.error("  level(    ) messageKey(       )      !");
            return resultList;
        }
        
        QueryBuilder query = generateQuery(level, messageKey, startTime, endTime);
        FieldSortBuilder order = SortBuilders.fieldSort(TIMESTAMP).order(SortOrder.DESC);
        SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
        searchBuilder.query(query).sort(order);
        searchBuilder.size(500);
        
        //     scroll    
        SearchRequest request = new SearchRequest(INDEX).types(TYPE);
        final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
        request.source(searchBuilder).scroll(scroll);
        SearchResponse response = client.search(request, RequestOptions.DEFAULT); 
        String scrollId = response.getScrollId();
        SearchHit[] searchHits = response.getHits().getHits();
        //     scroll        List 
        for (SearchHit searchHit : searchHits) {
            resultList.add(searchHit.getSourceAsMap());
        }
        //     scrollId          
        while (searchHits != null && searchHits.length > 0) { 
            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId); 
            scrollRequest.scroll(scroll);
            response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
            scrollId = response.getScrollId();
            searchHits = response.getHits().getHits();
            //          
            for (SearchHit searchHit : searchHits) {
                resultList.add(searchHit.getSourceAsMap());
            }
        }
        //    scroll    
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); 
        clearScrollRequest.addScrollId(scrollId);
        client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        return resultList;
    }
    
    /**
     *          (  level messageKey      )
     * @param level     ,    
     * @param messageKey        ,    
     * @param startTime       ,    
     * @param endTime       ,    
     * @param pageNum     ,    (    1)
     * @param pageSize     ,    (    10)
     * @return
     * @throws IOException
     */
    public Page> queryPageByConditions(String level, String messageKey, Long startTime, Long endTime, Integer pageNum, Integer pageSize) throws IOException {
        if (StringUtils.isBlank(level) && StringUtils.isBlank(messageKey)) {
            LOGGER.error("  level(    )、messageKey(       )      !");
            return null;
        }
        
        if (Objects.isNull(pageNum)) {
            pageNum = 1;
        }
        if (Objects.isNull(pageSize)) {
            pageSize = 10;
        }       
        QueryBuilder query = generateQuery(level, messageKey, startTime, endTime);
        FieldSortBuilder order = SortBuilders.fieldSort(TIMESTAMP).order(SortOrder.DESC);        
        SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
        searchBuilder.timeout(TimeValue.timeValueMinutes(2L));        
        searchBuilder.query(query);
        searchBuilder.sort(order);
        searchBuilder.from(pageNum - 1).size(pageSize);
        
        SearchRequest request = new SearchRequest(INDEX).types(TYPE);
        request.source(searchBuilder);       
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        SearchHits hits = response.getHits();
        int totalRecord = (int) hits.getTotalHits();
        List> results = new ArrayList<>();
        for (SearchHit hit : hits.getHits()) {
            results.add(hit.getSourceAsMap());
        }
        
        Page> page = new Page<>();
        page.setPageNum(pageNum);
        page.setPageSize(pageSize);
        page.setTotalRecord(totalRecord);
        page.setResults(results);
        return page;
    }
}

좋은 웹페이지 즐겨찾기