Python 은 다 중 프로 세 스 를 구현 하여 CSV 데 이 터 를 MySQL 로 가 져 옵 니 다.

얼마 전에 동료 들 에 게 CSV 데 이 터 를 MySQL 로 가 져 오 라 는 요 구 를 처리 해 주 었 습 니 다.큰 CSV 파일 2 개 는 각각 3GB,2 천 100 만 개,7GB,3 천 500 만 개가 기록 됐다.이 급 의 데 이 터 는 간단 한 단일 프로 세 스/단일 스 레 드 로 가 져 오 는 데 오래 걸 리 며,최종 적 으로 다 중 프로 세 스 방식 으로 이 루어 집 니 다.구체 적 인 과정 은 군더더기 없 이 몇 가지 요점 을 기록 하 세 요.
  • 일괄 삽입 이 아니 라 일괄 삽입
  • 삽입 속 도 를 높이 기 위해 색인 을 만 들 지 마 십시오
  • 생산자 와 소비자 모델,메 인 프로 세 스 가 파일 을 읽 고 여러 worker 프로 세 스 가 삽입
  • worker 의 수량 을 조절 하여 MySQL 에 너무 큰 압력 을 주지 않도록 주의 하 세 요
  • 더러 운 데이터 로 인 한 이상 처리 에 주의 하 세 요
  • 원본 데 이 터 는 GBK 인 코딩 이 므 로 UTF-8
  • 로 전환 하 는 데 도 주의해 야 한다.
  • 클릭 으로 명령 행 도구 봉인
  • 구체 적 인 코드 는 다음 과 같다.
    
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import codecs
    import csv
    import logging
    import multiprocessing
    import os
    import warnings
    
    import click
    import MySQLdb
    import sqlalchemy
    
    warnings.filterwarnings('ignore', category=MySQLdb.Warning)
    
    #          
    BATCH = 5000
    
    DB_URI = 'mysql://root@localhost:3306/example?charset=utf8'
    
    engine = sqlalchemy.create_engine(DB_URI)
    
    
    def get_table_cols(table):
      sql = 'SELECT * FROM `{table}` LIMIT 0'.format(table=table)
      res = engine.execute(sql)
      return res.keys()
    
    
    def insert_many(table, cols, rows, cursor):
      sql = 'INSERT INTO `{table}` ({cols}) VALUES ({marks})'.format(
          table=table,
          cols=', '.join(cols),
          marks=', '.join(['%s'] * len(cols)))
      cursor.execute(sql, *rows)
      logging.info('process %s inserted %s rows into table %s', os.getpid(), len(rows), table)
    
    
    def insert_worker(table, cols, queue):
      rows = []
      #            engine   
      cursor = sqlalchemy.create_engine(DB_URI)
      while True:
        row = queue.get()
        if row is None:
          if rows:
            insert_many(table, cols, rows, cursor)
          break
    
        rows.append(row)
        if len(rows) == BATCH:
          insert_many(table, cols, rows, cursor)
          rows = []
    
    
    def insert_parallel(table, reader, w=10):
      cols = get_table_cols(table)
    
      #     ,            ,worker         
      #            ,              ,      
      queue = multiprocessing.Queue(maxsize=w*BATCH*2)
      workers = []
      for i in range(w):
        p = multiprocessing.Process(target=insert_worker, args=(table, cols, queue))
        p.start()
        workers.append(p)
        logging.info('starting # %s worker process, pid: %s...', i + 1, p.pid)
    
      dirty_data_file = './{}_dirty_rows.csv'.format(table)
      xf = open(dirty_data_file, 'w')
      writer = csv.writer(xf, delimiter=reader.dialect.delimiter)
    
      for line in reader:
        #         :        
        if len(line) != len(cols):
          writer.writerow(line)
          continue
    
        #   None      'NULL'
        clean_line = [None if x == 'NULL' else x for x in line]
    
        #        
        queue.put(tuple(clean_line))
        if reader.line_num % 500000 == 0:
          logging.info('put %s tasks into queue.', reader.line_num)
    
      xf.close()
    
      #     worker          
      logging.info('send close signal to worker processes')
      for i in range(w):
        queue.put(None)
    
      for p in workers:
        p.join()
    
    
    def convert_file_to_utf8(f, rv_file=None):
      if not rv_file:
        name, ext = os.path.splitext(f)
        if isinstance(name, unicode):
          name = name.encode('utf8')
        rv_file = '{}_utf8{}'.format(name, ext)
      logging.info('start to process file %s', f)
      with open(f) as infd:
        with open(rv_file, 'w') as outfd:
          lines = []
          loop = 0
          chunck = 200000
          first_line = infd.readline().strip(codecs.BOM_UTF8).strip() + '
    ' lines.append(first_line) for line in infd: clean_line = line.decode('gb18030').encode('utf8') clean_line = clean_line.rstrip() + '
    ' lines.append(clean_line) if len(lines) == chunck: outfd.writelines(lines) lines = [] loop += 1 logging.info('processed %s lines.', loop * chunck) outfd.writelines(lines) logging.info('processed %s lines.', loop * chunck + len(lines)) @click.group() def cli(): logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s') @cli.command('gbk_to_utf8') @click.argument('f') def convert_gbk_to_utf8(f): convert_file_to_utf8(f) @cli.command('load') @click.option('-t', '--table', required=True, help=' ') @click.option('-i', '--filename', required=True, help=' ') @click.option('-w', '--workers', default=10, help='worker , 10') def load_fac_day_pro_nos_sal_table(table, filename, workers): with open(filename) as fd: fd.readline() # skip header reader = csv.reader(fd) insert_parallel(table, reader, w=workers) if __name__ == '__main__': cli()
    이상 은 본문 이 여러분 에 게 공유 하 는 모든 사람 이 없습니다.여러분 들 이 좋아 하 시 기 를 바 랍 니 다.

    좋은 웹페이지 즐겨찾기