elasticsearch는 어떤 집단의 인덱스를 다른 집단으로 가져옵니다.

6175 단어 elasticsearch
elasticsearch를 사용하는 과정에서 많은 친구들이 이런 요구를 했을 것이다. 바로 어떤 집단의 인덱스를 다른 집단에 가져오는 것이다. 이런 상황은 두 가지 방법이 있다. 하나는 바로 파일을 직접 복사하는 것이다. 간단하고 난폭하지만 많은 폐단이 있다. 예를 들어 파일이 매우 크면 복사하는 것도 번거롭고 복사하는 과정에서도 혼란스럽고 낮은 편이다.마지막으로 복사한 후에 그룹을 다시 시작해야 하는 것은 유연하지 않다.오늘 내가 주로 말한 것은 두 번째 방식이다. 스크롤을 통해 처리하는 것이다.
처리할 때 필요한 매개 변수는 원본 인덱스, 원본 집단 이름, 원본 IP, 목적 인덱스, 목적 집단 이름, 목적 IP, 인덱스 유형이다.원본 settings, 목적 settings를 구축하여 처리합니다.
구체적으로 아래의 코드를 참고하십시오. (이 코드는 2.x에서 2.x 버전에 적용되며, 다른 버전이 있다면, es와 관련된 클래스 이름만 수정하면 적용됩니다. 변경이 있으면)
또한 테스트를 통해 효율도 괜찮고 한 시간에 5천만 원 정도의 데이터를 내보낼 수 있으며 데이터의 양이 크지 않으면 더욱 빠를 것이다
됐어. 쓸데없는 소리 하지 말고 바로 코드를 올려.
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;

/**
 * java -cp "/root/es/lib/*" org.elasticsearch.ScrollCreateIndex 
 * -argInfo "a;c1;ip;b;c2;ip;ddd"
 *
 */
public class ScrollCreateIndex {

    public static void main(String[] args) throws Exception {
    	String sourceIdx = null;
        String sourceClustName = null;
        String sourceIp = null;
        String destiIdx = null;
        String destiClustName = null;
        String destiIp = null;
        String idxType = null;
        
        String argInfos = null;
        for (int i = 0; i < args.length; i++) {
            if (args[i].equals("-argInfo")) {
            	argInfos = args[(i + 1)];
            }
        }
        String[] arg_info_arr = argInfos.split(";");
        sourceIdx = arg_info_arr[0];
        sourceClustName = arg_info_arr[1];
        sourceIp = arg_info_arr[2];
        
        destiIdx = arg_info_arr[3];
        destiClustName = arg_info_arr[4];
        destiIp = arg_info_arr[5];
        idxType = arg_info_arr[6];
        executeScrollCreate(sourceIdx,sourceClustName,sourceIp,destiIdx,destiClustName,destiIp,idxType);
    }
    
    public static void executeScrollCreate(String indexName,String clustName,String sourceIp,
    		String destiIndexName,String destiClustName,String destiIp,String idxType) throws Exception{
    	//build source settings
    	Settings settings = Settings.settingsBuilder()
    			.put("cluster.name", clustName).put("client.transport.sniff", true)
    			.put("client.transport.ping_timeout", "30s")
    			.put("client.transport.nodes_sampler_interval", "30s").build();
    	TransportClient client = TransportClient.builder().settings(settings).build();
    	client.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(sourceIp, 9300)));
    	
    	//build destination settings
    	Settings destiSettings = Settings.settingsBuilder()
    			.put("cluster.name", destiClustName).put("client.transport.sniff", true)
    			.put("client.transport.ping_timeout", "30s")
    			.put("client.transport.nodes_sampler_interval", "30s").build();
    	TransportClient destiClient = TransportClient.builder().settings(destiSettings).build();
    	destiClient.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress(destiIp, 9300)));
    	
    	SearchResponse scrollResp = client.prepareSearch(indexName)
    			.setScroll(new TimeValue(90000)).setSize(1000).execute().actionGet();
    	
    	//build destination bulk
    	BulkRequestBuilder bulk = destiClient.prepareBulk();
    	
    	ExecutorService executor = Executors.newFixedThreadPool(5);
    	while(true){
    		bulk = destiClient.prepareBulk();
    		final BulkRequestBuilder bulk_new = bulk;
    		for(SearchHit hit : scrollResp.getHits().getHits()){
    			IndexRequest req = destiClient.prepareIndex().setIndex(destiIndexName).setType(idxType).setSource(hit.getSourceAsString()).request();
    			bulk_new.add(req);
    		}
    		executor.execute(new Runnable() {
				@Override
				public void run() {
					bulk_new.execute();
				}
			});
    		
    		Thread.sleep(10);
    		
    		scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
    				.setScroll(new TimeValue(90000)).execute().actionGet();
    		if(scrollResp.getHits().getHits().length == 0){
    			break;
    		}
    	}
    	
    }

}

위의 코드는 주석에 있는 자바-cp 명령을 통해 직접 실행할 수 있지만 스크립트를 통해 백엔드에서 실행하는 것이 좋습니다. 끝나면 자바 프로세스를 자동으로 키울 수 있습니다. 본인이 사용하는 과정에서 스크립트를 통해 호출됩니다. 여기서도 스크립트를 붙여서 유용하게 사용할 수 있기를 바랍니다.
#!/bin/bash

source_idx=$1
source_clustername=$2
source_ip=$3
desti_idx=$4
desti_clustername=$5
desti_ip=$6
idx_type=$7

curl -sXPUT "http://$desti_ip:9200/create_index_action/$desti_idx/?master_timeout=2m"
while true ; do
    stat=$(curl -sXGET http://$desti_ip:9200/_cat/health | awk '{print $4}')
    if [ "$stat" = "green" ] ; then
	break
    fi
done

java -cp "/root/es/root/lib/*" org.elasticsearch.ScrollCreateIndex -argInfo "$1;$2;$3;$4;$5;$6;$7" &

pid="$!"
source_docnum=$(curl -sXGET http://$source_ip:9200/_cat/indices/$source_idx | awk '{print $6}')
temp_docnum=0
same_num=0
while true ; do
    desti_docnum=$(curl -sXGET http://$desti_ip:9200/_cat/indices/$desti_idx | awk '{print $6}')
    [[ ! "$desti_docnum" ]] && desti_docnum=0
    if [ "$desti_docnum" = "$temp_docnum" ] ; then
	same_num=$(expr $same_num + 1)
    else
	same_num=0
    fi
    if [ "$same_num" -gt 3 ] ; then
	break
    fi
    temp_docnum=$desti_docnum
    dif_value=$(expr $source_docnum - $desti_docnum)
    if [[ -n "$dif_value" && "$dif_value" -lt 10000 ]] ; then
	sleep 10
        kill -9 $pid
	break
    fi
    sleep 100
done
echo "finish"

이상은 온전한 과정입니다. 필요한 친구들에게 도움이 되었으면 좋겠습니다. 물론 문제가 있으면 함께 토론할 수 있습니다.

좋은 웹페이지 즐겨찾기