pyspark Mysql 데이터베이스 읽 기와 쓰기 실현

pyspark 는 Spark 가 Python 에 대한 api 인터페이스 로 Python 환경 에서 pyspark 모듈 을 호출 하여 spark 를 조작 하여 빅 데이터 프레임 워 크 에서 의 데이터 분석 과 발굴 을 완성 할 수 있 습 니 다.그 중에서 데이터 의 읽 기와 쓰 기 는 기본 작업 이 고 pyspark 의 서브 모듈 pyspark.sql 은 대부분의 유형의 데이터 읽 기와 쓰 기 를 완성 할 수 있 습 니 다.텍스트 는 pyspark 에서 Mysql 데이터 베 이 스 를 읽 고 쓰 는 것 을 소개 합 니 다.
소프트웨어 버 전
Python 에서 Spark 를 사용 하려 면 설정 Spark 를 설치 해 야 합 니 다.설정 과정 을 건 너 뛰 고 실행 환경 과 관련 프로그램 버 전 정 보 를 제공 합 니 다.
  • win10 64bit
  • java 13.0.1
  • spark 3.0
  • python 3.8
  • pyspark 3.0
  • pycharm 2019.3.4
  • 2 환경 설정
    pyspark Mysql 연결 은 자바 로 이 루어 지기 때문에 Mysql 에 연 결 된 jar 패 키 지 를 다운로드 해 야 합 니 다.
    다운로드 주소
    在这里插入图片描述
    다운로드Connector/J를 선택 한 다음 운영 체 제 를Platform Independent로 선택 하고 압축 패 키 지 를 로 컬 로 다운로드 합 니 다.
    在这里插入图片描述
    그리고 압축 파일 을 풀 고 그 중의 jar 패키지mysql-connector-java-8.0.19.jar를 spark 의 설치 디 렉 터 리 에 넣 습 니 다.예 를 들 어D:\spark\spark-3.0.0-preview2-bin-hadoop2.7\jars.
    在这里插入图片描述
    환경 설정 완료!
    3 Mysql 읽 기
    스 크 립 트 는 다음 과 같 습 니 다:
    
    from pyspark.sql import SQLContext, SparkSession
    
    if __name__ == '__main__':
      # spark    
      spark = SparkSession. \
        Builder(). \
        appName('sql'). \
        master('local'). \
        getOrCreate()
      # mysql   (    )
      prop = {'user': 'xxx', 
          'password': 'xxx', 
          'driver': 'com.mysql.cj.jdbc.Driver'}
      # database   (    )
      url = 'jdbc:mysql://host:port/database'
      #    
      data = spark.read.jdbc(url=url, table='tb_newCity', properties=prop)
      #   data    
      print(type(data))
      #     
      data.show()
      #   spark  
      spark.stop()
  • 주의 점:
  • prop매개 변 수 는 실제 상황 에 따라 수정 해 야 한다.글 에서 사용자 이름과 비밀 번 호 는 xxx 로 대체 되 고driver매개 변수 도 필요 하지 않 을 수 있다.
  • url매개 변 수 는 실제 상황 에 따라 수정 해 야 하고 형식 은jdbc:mysql:// : / 이다.
  • 호출 방법read.jdbc을 통 해 읽 고 되 돌아 오 는 데이터 형식 은 spark DataFrame 입 니 다.
  • 스 크 립 트 를 실행 합 니 다.출력 은 다음 과 같 습 니 다.
    在这里插入图片描述
    4 Mysql 에 쓰기
    스 크 립 트 는 다음 과 같 습 니 다:
    
    import pandas as pd
    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row
    
    if __name__ == '__main__':
      # spark    
      sc = SparkContext(master='local', appName='sql')
      spark = SQLContext(sc)
      # mysql   (    )
      prop = {'user': 'xxx',
          'password': 'xxx',
          'driver': 'com.mysql.cj.jdbc.Driver'}
      # database   (    )
      url = 'jdbc:mysql://host:port/database'
    
      #   spark DataFrame
      #   1:list spark DataFrame
      l = [(1, 12), (2, 22)]
      #        
      list_df = spark.createDataFrame(l, schema=['id', 'value']) 
      
      #   2:rdd spark DataFrame
      rdd = sc.parallelize(l) # rdd
      col_names = Row('id', 'value') #   
      tmp = rdd.map(lambda x: col_names(*x)) #     
      rdd_df = spark.createDataFrame(tmp) 
      
      #   3:pandas dataFrame  spark DataFrame
      df = pd.DataFrame({'id': [1, 2], 'value': [12, 22]})
      pd_df = spark.createDataFrame(df)
    
      #      
      pd_df.write.jdbc(url=url, table='new', mode='append', properties=prop)
      #   spark  
      sc.stop()
    주의 점:propurl매개 변 수 는 실제 상황 에 따라 수정 해 야 한다.
    데이터 베 이 스 를 기록 하 는 데 필요 한 대상 유형 은 spark DataFrame 로 세 가지 흔 한 데이터 형식 을 spark DataFrame 로 바 꾸 는 방법 을 제공 합 니 다.
    호출write.jdbc방법 으로 기록 합 니 다.그 중의model매개 변 수 는 데 이 터 를 기록 하 는 행 위 를 제어 합 니 다.
    model
    매개 변수 해석
    error
    기본 값,원래 표 가 존재 하면 오 류 를 보고 합 니 다.
    ignore
    원본 표 가 존재 합 니 다.잘못 보고 하지 않 고 데 이 터 를 기록 하지 않 습 니 다.
    append
    새 데 이 터 는 원래 표 줄 끝 에 추가 합 니 다.
    overwrite
    원본 테이블 덮어 쓰기
    5.자주 오 류 를 보고 합 니 다.
    Access denied for user …
    在这里插入图片描述
    원인:mysql 설정 매개 변수 오류
    해결 방법:user,password 맞 춤 법 을 검사 하고 계 정 비밀번호 가 정확 한 지 확인 하 며 다른 도구 로 my sql 이 정상적으로 연결 되 는 지 비교 검 사 를 합 니 다.
    No suitable driver

    원인:실행 환경 이 설정 되 지 않 았 습 니 다.
    해결 방법:jar 패 키 지 를 다운로드 하여 설정 하고 구체 적 인 과정 은 본 고의 2 환경 설정 을 참고 합 니 다.
    pyspark 가 Mysql 데이터베이스 에 대한 읽 기와 쓰기 실현 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.더 많은 관련 pyspark Mysql 읽 기와 쓰기 내용 은 우리 의 이전 글 을 검색 하거나 아래 의 관련 글 을 계속 조회 하 시기 바 랍 니 다.앞으로 많은 응원 바 랍 니 다!

    좋은 웹페이지 즐겨찾기