[ES 시리즈 9] - ES 에 데 이 터 를 대량으로 동기 화 합 니 다.
8181 단어 Elastic Search 연구
과정 1.1. es 홈 페이지 에서 제공 하 는 bulk 방법 으로 이 루어 집 니 다. 코드 는 다음 과 같 습 니 다.
package com.yunshi.timedtask.dao;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Component
public class TestElasticSearch4J {
private static RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("es1.yunshicloud.com", 9200, "http")
));
public static void main(String[] args) throws IOException {
TestElasticSearch4J testElasticSearch4J = new TestElasticSearch4J();
List
1.2. 이 방법 은 본인 이 테스트 효율 이 없다. 2.1. 조회 데 이 터 를 통 해 json 파일 에 기록 하고 구조 형식 은 다음 과 같다. {"index":{"_index":"dev-rms-resource","_type":"material"}} {"id":"123","name":"aaa","url":"https://123123"} 2.2. curl 명령 을 통 해 일괄 제출 작업 curl -XPOST http://192.168.1.211:9200/_bulk --data-binary @material.json 2.3. 제출 후 되 돌아 오 는 모든 상 태 를 파일 에 기록 하고 다음 Liux 명령 을 통 해 실 패 했 는 지 확인 합 니 다. grep -rn "400" * 2.4. 테스트 효율 은 다음 과 같다. 160 만 데이터, 총 크기 2.4G, 대량 매번 2 만 전송, 약 8 분 정도 소요 (mongodb 읽 기 시간 포함) 54 만 데이터, 총 크기 6.4G, 대량 매번 1 천 전송, 약 30 분 정도 걸 립 니 다. 2.4. 자바 코드 구현
package com.yunshi.timedtask.scheduler;
import com.alibaba.fastjson.JSON;
import com.yunshi.timedtask.common.Constants;
import com.yunshi.timedtask.dao.BasicDao;
import com.yunshi.timedtask.dao.IEsBasicDao;
import com.yunshi.timedtask.dao.TestElasticSearch4J;
import com.yunshi.timedtask.dao.es.ESConfig;
import com.yunshi.timedtask.domain.*;
import com.yunshi.timedtask.util.DateUtil;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import java.io.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by makang on 2019 12 30 .
*/
public class InitDataTaskService implements Runnable {
private final Logger log = LoggerFactory.getLogger(InitDataTaskService.class);
// bean
private BasicDao basicDao;
private IEsBasicDao iEsBasicDao;
private ESConfig esConfig;
private int pageNum;
// ,
public InitDataTaskService(BasicDao basicDao, IEsBasicDao iEsBasicDao, ESConfig esConfig, int pageNum){
this.basicDao = basicDao;
this.iEsBasicDao = iEsBasicDao;
this.esConfig = esConfig;
this.pageNum = pageNum;
}
// Runnable run
@Override
public void run(){
//
long stertTime = System.currentTimeMillis();
Map querymaterialAll = new HashMap(1);
Map sortFilter = new HashMap(1);
int pageNum = this.pageNum;
// 、
long materialAllSize = basicDao.count("material",querymaterialAll);
log.info(" , :"+materialAllSize);
//1.1.
long allMaterialSize = 0;
for (int pageSize=1;((pageSize-1)*pageNum*20)> materialList = basicDao.find("material",sortFilter,querymaterialAll,pageSize,pageNum*20);
try {
String filePath = "./material"+pageSize+".json";
for (Map materialMap:materialList) {
Map materialesMap = new HashMap<>();
materialesMap.putAll(materialMap);
// json
FileWriter fw = new FileWriter(filePath, true);
BufferedWriter bw = new BufferedWriter(fw);
bw.append("{\"index\":{\"_index\":\""+esConfig.getIndexname()+"\",\"_type\":\"material\"}}
" );
bw.append(JSON.toJSONString(materialesMap)+"
");
bw.close();
fw.close();
materialSize ++;
allMaterialSize++;
}
//
execCommand("curl -XPOST "+esConfig.getAnalyzerServerIp()+"/_bulk --data-binary @material"+pageSize+".json","./logs/material"+pageSize+".log");
} catch (Exception e) {
log.info(" :"+pageSize);
e.printStackTrace();
}
log.info(" :mongodb :"+materialSize);
materialList.clear();
}
long endTime = System.currentTimeMillis();
log.info("=== :"+DateUtil.getDistanceTime(stertTime,endTime)
+"=== :"+allMaterialSize);
}
/**
* curl
* @param cmd
* @param logPath
*/
public void execCommand(String cmd,String logPath) {
try {
log.info(" shell , es ");
long startTime = System.currentTimeMillis();
Runtime rt = Runtime.getRuntime();
Process proc = rt.exec(cmd,null,null);
InputStream stderr = proc.getInputStream();
InputStreamReader isr = new InputStreamReader(stderr, "GBK");
BufferedReader br = new BufferedReader(isr);
String line = "";
int lineInt = 0;
FileWriter fw = new FileWriter(logPath, true);
BufferedWriter bw = new BufferedWriter(fw);
while ((line = br.readLine()) != null) {
lineInt++;
bw.append(line+"
");
}
bw.close();
fw.close();
long endTime = System.currentTimeMillis();
log.info(" shell , es , :"+(endTime-startTime)+"== :"+lineInt);
stderr.close();
isr.close();
br.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
3. 만 나 는 구덩이 & 최적화 공간 1. json 파일 의 크기 는 es 군집 설정 의 bulk. queue 보다 크 면 안 됩 니 다.size 매개 변수 크기 (프로필 기본 설정 은 10M) 1. json 파일 의 크기 를 제어 할 수 있 습 니 다. 제한 크기 를 초과 하면 다음 파일 에 데 이 터 를 저장 하여 일괄 제출 파일 의 크기 를 합 리 적 인 범위 내 에서 확보 합 니 다. 2. 코드 에서 일괄 제출 에 실패 한 내용 을 확인 하여 기록 하고 마지막 으로 다시 시도 합 니 다. 3. 대량 작업 에 실패 하지 않 으 면 생 성 된 json 파일 을 삭제 해 야 합 니 다. 총화 하 는 과정 에서 도 여러 가지 방법 을 시도 하고 있다. 검 은 고양이 와 흰 고양 이 는 먼저 쥐 를 잡 아 현재 의 문 제 를 해결한다.그리고 후속 효율 과 최적화 에 관 한 문 제 를 고려한다. 편집장 의 총 결 이 독자 들 에 게 도움 이 되 기 를 바란다.