elasticsearch는 어떤 집단의 인덱스를 다른 집단으로 가져옵니다.
6175 단어 elasticsearch
처리할 때 필요한 매개 변수는 원본 인덱스, 원본 집단 이름, 원본 IP, 목적 인덱스, 목적 집단 이름, 목적 IP, 인덱스 유형이다.원본 settings, 목적 settings를 구축하여 처리합니다.
구체적으로 아래의 코드를 참고하십시오. (이 코드는 2.x에서 2.x 버전에 적용되며, 다른 버전이 있다면, es와 관련된 클래스 이름만 수정하면 적용됩니다. 변경이 있으면)
또한 테스트를 통해 효율도 괜찮고 한 시간에 5천만 원 정도의 데이터를 내보낼 수 있으며 데이터의 양이 크지 않으면 더욱 빠를 것이다
됐어. 쓸데없는 소리 하지 말고 바로 코드를 올려.
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
/**
* java -cp "/root/es/lib/*" org.elasticsearch.ScrollCreateIndex
* -argInfo "a;c1;ip;b;c2;ip;ddd"
*
*/
public class ScrollCreateIndex {
public static void main(String[] args) throws Exception {
String sourceIdx = null;
String sourceClustName = null;
String sourceIp = null;
String destiIdx = null;
String destiClustName = null;
String destiIp = null;
String idxType = null;
String argInfos = null;
for (int i = 0; i < args.length; i++) {
if (args[i].equals("-argInfo")) {
argInfos = args[(i + 1)];
}
}
String[] arg_info_arr = argInfos.split(";");
sourceIdx = arg_info_arr[0];
sourceClustName = arg_info_arr[1];
sourceIp = arg_info_arr[2];
destiIdx = arg_info_arr[3];
destiClustName = arg_info_arr[4];
destiIp = arg_info_arr[5];
idxType = arg_info_arr[6];
executeScrollCreate(sourceIdx,sourceClustName,sourceIp,destiIdx,destiClustName,destiIp,idxType);
}
public static void executeScrollCreate(String indexName,String clustName,String sourceIp,
String destiIndexName,String destiClustName,String destiIp,String idxType) throws Exception{
//build source settings
Settings settings = Settings.settingsBuilder()
.put("cluster.name", clustName).put("client.transport.sniff", true)
.put("client.transport.ping_timeout", "30s")
.put("client.transport.nodes_sampler_interval", "30s").build();
TransportClient client = TransportClient.builder().settings(settings).build();
client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(sourceIp, 9300)));
//build destination settings
Settings destiSettings = Settings.settingsBuilder()
.put("cluster.name", destiClustName).put("client.transport.sniff", true)
.put("client.transport.ping_timeout", "30s")
.put("client.transport.nodes_sampler_interval", "30s").build();
TransportClient destiClient = TransportClient.builder().settings(destiSettings).build();
destiClient.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(destiIp, 9300)));
SearchResponse scrollResp = client.prepareSearch(indexName)
.setScroll(new TimeValue(90000)).setSize(1000).execute().actionGet();
//build destination bulk
BulkRequestBuilder bulk = destiClient.prepareBulk();
ExecutorService executor = Executors.newFixedThreadPool(5);
while(true){
bulk = destiClient.prepareBulk();
final BulkRequestBuilder bulk_new = bulk;
for(SearchHit hit : scrollResp.getHits().getHits()){
IndexRequest req = destiClient.prepareIndex().setIndex(destiIndexName).setType(idxType).setSource(hit.getSourceAsString()).request();
bulk_new.add(req);
}
executor.execute(new Runnable() {
@Override
public void run() {
bulk_new.execute();
}
});
Thread.sleep(10);
scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
.setScroll(new TimeValue(90000)).execute().actionGet();
if(scrollResp.getHits().getHits().length == 0){
break;
}
}
}
}
위의 코드는 주석에 있는 자바-cp 명령을 통해 직접 실행할 수 있지만 스크립트를 통해 백엔드에서 실행하는 것이 좋습니다. 끝나면 자바 프로세스를 자동으로 키울 수 있습니다. 본인이 사용하는 과정에서 스크립트를 통해 호출됩니다. 여기서도 스크립트를 붙여서 유용하게 사용할 수 있기를 바랍니다.
#!/bin/bash
source_idx=$1
source_clustername=$2
source_ip=$3
desti_idx=$4
desti_clustername=$5
desti_ip=$6
idx_type=$7
curl -sXPUT "http://$desti_ip:9200/create_index_action/$desti_idx/?master_timeout=2m"
while true ; do
stat=$(curl -sXGET http://$desti_ip:9200/_cat/health | awk '{print $4}')
if [ "$stat" = "green" ] ; then
break
fi
done
java -cp "/root/es/root/lib/*" org.elasticsearch.ScrollCreateIndex -argInfo "$1;$2;$3;$4;$5;$6;$7" &
pid="$!"
source_docnum=$(curl -sXGET http://$source_ip:9200/_cat/indices/$source_idx | awk '{print $6}')
temp_docnum=0
same_num=0
while true ; do
desti_docnum=$(curl -sXGET http://$desti_ip:9200/_cat/indices/$desti_idx | awk '{print $6}')
[[ ! "$desti_docnum" ]] && desti_docnum=0
if [ "$desti_docnum" = "$temp_docnum" ] ; then
same_num=$(expr $same_num + 1)
else
same_num=0
fi
if [ "$same_num" -gt 3 ] ; then
break
fi
temp_docnum=$desti_docnum
dif_value=$(expr $source_docnum - $desti_docnum)
if [[ -n "$dif_value" && "$dif_value" -lt 10000 ]] ; then
sleep 10
kill -9 $pid
break
fi
sleep 100
done
echo "finish"
이상은 온전한 과정입니다. 필요한 친구들에게 도움이 되었으면 좋겠습니다. 물론 문제가 있으면 함께 토론할 수 있습니다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
kafka connect e elasticsearch를 관찰할 수 있습니다.No menu lateral do dashboard tem a opção de connectors onde ele mostra todos os clusters do kafka connect conectados atu...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.