elasticsearch 가져오기 내보내기 CSV

솔직히 말하면, 이것은 첫번째python 프로그램이다. 비록 보기에는 매우 엉망으로 보이지만, 너는 안심해라. 나는 무독을 시험해 보았고, 게다가 실행 결과도 옳았다.
CSV 내보내기
import csv
import sys
import logging
import datetime 
from elasticsearch import Elasticsearch

reload(sys)  
sys.setdefaultencoding('gbk')
logging.basicConfig()
es = Elasticsearch()

def exportCSV(indexName):
    count = 0
    finish=False
    csvfile = file(indexName+'.csv','wb')
    writer = csv.writer(csvfile)
    starttime = datetime.datetime.now()
    searchRes = es.search(index=indexName,size=100,body={"query": {"match_all": {}}},search_type="scan",scroll="60s")
    while True:
        scrollRes=es.scroll(scroll_id=searchRes["_scroll_id"],scroll="60s",ignore=[400, 404])
        res_list = scrollRes["hits"]["hits"]
        data=[]
        
        if not len(res_list) or finish:
            break
        if count==0:
            writer.writerow(tuple(res_list[0]["_source"].keys()))
        for item in res_list:
            #print tuple(item["_source"].values())
            data.append(tuple(item["_source"].values()))
            count+=1
            if count>=100000:
                finish=True
                break

        writer.writerows(data)
    csvfile.close()
    endtime = datetime.datetime.now()
    print "export size = "+str(count)
    print "export cost = "+str(endtime - starttime)
    

if __name__=="__main__":
   exportCSV("test")

CSV 가져오기
# -*- coding:utf-8 -*- 
import csv
import sys
import os
import logging
import datetime 
from elasticsearch import Elasticsearch
from elasticsearch import helpers

reload(sys)  
sys.setdefaultencoding('gbk')
logging.basicConfig()
es = Elasticsearch()

def importCSV(indexName,typeName,fileName):
    if not os.path.exists(fileName):
        print "file not found"
        return
    actions=[]
    if not es.indices.exists(index=indexName,allow_no_indices=True):
        #print "not found index"
        es.indices.create(index=indexName,body={},ignore=400)
    for item in csv.DictReader(open(fileName, 'rb')):  
        actions.append({"_index":indexName,"_type":typeName,"_source":encoding(item)})
    res = helpers.bulk(es,actions,chunk_size=100)
    es.indices.flush(index=[indexName])
    return len(actions)

def encoding(item):
    for i in item:
        item[i]=str(item[i]).encode('utf-8')
    return item    

if __name__=="__main__":
    starttime = datetime.datetime.now()
    result=importCSV("test","base","test.csv")
    print "import size = "+str(result)
    endtime = datetime.datetime.now()
    print "import cost = "+str(endtime - starttime)

좋은 웹페이지 즐겨찾기