7rep - Insert Dataframe To Elasitcsearch

TSV (CSV)를 Elasicsearch에 넣는 코드 샘플


from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pandas as pd
import datetime
import time
import json
import random
from pandas.io.json import json_normalize

# Elasticsearch
es = Elasticsearch("{ES_IP}")
INDEX = "{ES_Index_Name}"

fname="{FileName}"
reader = pd.read_csv(fname, chunksize=1000, sep='\t',low_memory = False)
df_all = reader.get_chunk() # chunkをdataframe

# json
df_lines = df_all.to_json(force_ascii=False, orient='records', lines=True)

# Bulk inser
actions = []
for i in iter(df_lines.split("\n")):
    v_json = json.loads(i)
    actions.append({
        "_index": INDEX,
        "_type": "{ES_Type}",
        "_source": v_json
    })

helpers.bulk(es, actions)
  • ES Mapping Conf
    > "format": "yyyy-MM-dd HH ss||yyyy-MM-dd||yyyyMMdd||epoch_millis"
    파이프 두 개를 연결하면 다른 날짜 형식을 지정할 수 있으며 모양이 다른 날짜에서도 Kibana에서 데리러
  • PUT hoge
    {
      "mappings": {
        "books": { 
          "properties": {
            "hoge1":     { "type": "integer"  },
            "hoge2":    { "type": "text"  }, 
            "hoge3":     { "type": "text"  },
            "hoge4":     { "type": "text"  },
            "hoge5":     { "type": "integer"  },
            "hoge6":     { "type": "text"  },         
            "hoge7":     { "type": "integer"  },
            "hoge8":     { "type": "text"  },
            "hoge9":     { "type": "text"  },
            "create_date":  {
              "type":   "date", 
              "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||yyyyMMdd||epoch_millis"
            }
          }
        }
      }
    }
    

    좋은 웹페이지 즐겨찾기