[ES 시리즈 9] - ES 에 데 이 터 를 대량으로 동기 화 합 니 다.

8181 단어 Elastic Search 연구
방안    1. es 홈 페이지 에서 제공 하 는 bulk 방법 으로 실현    2. 데 이 터 를 규칙 에 따라 json 파일 에 기록 하고 curl 명령 을 통 해 일괄 제출 작업 을 합 니 다.         주: 다음 과 같은 실험 es 는 군집 이 고 3 대 2c8g 입 니 다.mongodb 는 군집, 2c16g 세 대;자바 프로그램 을 실행 하 는 기 계 는 2c4g 입 니 다.
과정    1.1. es 홈 페이지 에서 제공 하 는 bulk 방법 으로 이 루어 집 니 다. 코드 는 다음 과 같 습 니 다.
package com.yunshi.timedtask.dao;

import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

@Component
public class TestElasticSearch4J {
	private static RestHighLevelClient client = new RestHighLevelClient(
	        RestClient.builder(
	                new HttpHost("es1.yunshicloud.com", 9200, "http")
	        ));

	public static void main(String[] args) throws IOException {
		TestElasticSearch4J testElasticSearch4J = new TestElasticSearch4J();
		
		List> mapList = new ArrayList<>();
		
		System.out.println("    ,  "+mapList.size()+" ");

		testElasticSearch4J.batchInsert(mapList,"material");
		
		client.close();

	}

	public  void batchInsert(List> mapList,String type) throws IOException {
		BulkRequest request = new BulkRequest();
		
		for (Map map : mapList) {
			IndexRequest indexRequest= new IndexRequest("dev-rms-resource", type).source(map);
			request.add(indexRequest);
		}

		BulkResponse bulkItemResponses = client.bulk(request);

		for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
			if (bulkItemResponse.isFailed()) {
				BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
				System.out.println("      "+failure.getStatus());
			}
		}
	}
     
}

    1.2. 이 방법 은 본인 이 테스트 효율 이 없다.         2.1. 조회 데 이 터 를 통 해 json 파일 에 기록 하고 구조 형식 은 다음 과 같다.        {"index":{"_index":"dev-rms-resource","_type":"material"}}         {"id":"123","name":"aaa","url":"https://123123"}     2.2. curl 명령 을 통 해 일괄 제출 작업        curl -XPOST  http://192.168.1.211:9200/_bulk --data-binary @material.json     2.3. 제출 후 되 돌아 오 는 모든 상 태 를 파일 에 기록 하고 다음 Liux 명령 을 통 해 실 패 했 는 지 확인 합 니 다.        grep -rn "400" *     2.4. 테스트 효율 은 다음 과 같다.        160 만 데이터, 총 크기 2.4G, 대량 매번 2 만 전송, 약 8 분 정도 소요 (mongodb 읽 기 시간 포함)        54 만 데이터, 총 크기 6.4G, 대량 매번 1 천 전송, 약 30 분 정도 걸 립 니 다.    2.4. 자바 코드 구현    
package com.yunshi.timedtask.scheduler;

import com.alibaba.fastjson.JSON;
import com.yunshi.timedtask.common.Constants;
import com.yunshi.timedtask.dao.BasicDao;
import com.yunshi.timedtask.dao.IEsBasicDao;
import com.yunshi.timedtask.dao.TestElasticSearch4J;
import com.yunshi.timedtask.dao.es.ESConfig;
import com.yunshi.timedtask.domain.*;
import com.yunshi.timedtask.util.DateUtil;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import java.io.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Created by makang on 2019 12 30 .
 */
public class InitDataTaskService implements Runnable {

    private final Logger log = LoggerFactory.getLogger(InitDataTaskService.class);

    //     bean
    private BasicDao basicDao;
    private IEsBasicDao iEsBasicDao;
    private ESConfig esConfig;
    private int pageNum;

    //      ,             
    public InitDataTaskService(BasicDao basicDao, IEsBasicDao iEsBasicDao, ESConfig esConfig, int pageNum){
        this.basicDao = basicDao;
        this.iEsBasicDao = iEsBasicDao;
        this.esConfig = esConfig;
        this.pageNum = pageNum;
    }

    //   Runnable    run  
    @Override
    public  void run(){

        //    
        long stertTime = System.currentTimeMillis();

        Map querymaterialAll = new HashMap(1);
        Map sortFilter = new HashMap(1);
        int pageNum = this.pageNum;

        // 、        
        long materialAllSize = basicDao.count("material",querymaterialAll);
        log.info("      ,     :"+materialAllSize);
        //1.1.             
        long allMaterialSize = 0;
        for (int pageSize=1;((pageSize-1)*pageNum*20)> materialList = basicDao.find("material",sortFilter,querymaterialAll,pageSize,pageNum*20);

            try {

                String filePath = "./material"+pageSize+".json";

                for (Map materialMap:materialList) {
                    Map materialesMap = new HashMap<>();

                    materialesMap.putAll(materialMap);

                    //   json   
                    FileWriter fw = new FileWriter(filePath, true);
                    BufferedWriter bw = new BufferedWriter(fw);
                    bw.append("{\"index\":{\"_index\":\""+esConfig.getIndexname()+"\",\"_type\":\"material\"}} 
" ); bw.append(JSON.toJSONString(materialesMap)+"
"); bw.close(); fw.close(); materialSize ++; allMaterialSize++; } // execCommand("curl -XPOST "+esConfig.getAnalyzerServerIp()+"/_bulk --data-binary @material"+pageSize+".json","./logs/material"+pageSize+".log"); } catch (Exception e) { log.info(" :"+pageSize); e.printStackTrace(); } log.info(" :mongodb :"+materialSize); materialList.clear(); } long endTime = System.currentTimeMillis(); log.info("=== :"+DateUtil.getDistanceTime(stertTime,endTime) +"=== :"+allMaterialSize); } /** * curl * @param cmd * @param logPath */ public void execCommand(String cmd,String logPath) { try { log.info(" shell , es "); long startTime = System.currentTimeMillis(); Runtime rt = Runtime.getRuntime(); Process proc = rt.exec(cmd,null,null); InputStream stderr = proc.getInputStream(); InputStreamReader isr = new InputStreamReader(stderr, "GBK"); BufferedReader br = new BufferedReader(isr); String line = ""; int lineInt = 0; FileWriter fw = new FileWriter(logPath, true); BufferedWriter bw = new BufferedWriter(fw); while ((line = br.readLine()) != null) { lineInt++; bw.append(line+"
"); } bw.close(); fw.close(); long endTime = System.currentTimeMillis(); log.info(" shell , es , :"+(endTime-startTime)+"== :"+lineInt); stderr.close(); isr.close(); br.close(); } catch (Exception e) { e.printStackTrace(); } } }

3. 만 나 는 구덩이 & 최적화 공간    1. json 파일 의 크기 는 es 군집 설정 의 bulk. queue 보다 크 면 안 됩 니 다.size 매개 변수 크기 (프로필 기본 설정 은 10M)         1. json 파일 의 크기 를 제어 할 수 있 습 니 다. 제한 크기 를 초과 하면 다음 파일 에 데 이 터 를 저장 하여 일괄 제출 파일 의 크기 를 합 리 적 인 범위 내 에서 확보 합 니 다.    2. 코드 에서 일괄 제출 에 실패 한 내용 을 확인 하여 기록 하고 마지막 으로 다시 시도 합 니 다.    3. 대량 작업 에 실패 하지 않 으 면 생 성 된 json 파일 을 삭제 해 야 합 니 다.     총화    하 는 과정 에서 도 여러 가지 방법 을 시도 하고 있다. 검 은 고양이 와 흰 고양 이 는 먼저 쥐 를 잡 아 현재 의 문 제 를 해결한다.그리고 후속 효율 과 최적화 에 관 한 문 제 를 고려한다.    편집장 의 총 결 이 독자 들 에 게 도움 이 되 기 를 바란다.

좋은 웹페이지 즐겨찾기