pyspark 몽골 DB 조작 방법 절차

데 이 터 를 가 져 오 는 방법
데 이 터 는 여러 가지 형식 이 있 을 수 있 습 니 다.흔히 볼 수 있 는 것 은 HDFS 이지 만 Python 파충류 에서 데이터 베 이 스 를 많이 사용 하 는 것 은 MongoDB 이기 때문에 spark 로 MongoDB 의 데 이 터 를 가 져 오 는 방법 에 중점 을 두 겠 습 니 다.
물론,우선 자신의 컴퓨터 에 spark 환경 을 설치 해 야 합 니 다.쉽게 말 하면 여기spark 다운로드에 JAVA,Scala 환경 을 설정 해 야 합 니 다.
Jupyter notebook 을 사용 하 는 것 을 권장 합 니 다.환경 변수 에 이렇게 설정 하 는 것 이 편리 합 니 다.
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
만약 당신 의 환경 에 여러 개의 Python 버 전이 있다 면,당신 이 사용 하고 자 하 는 해석 기 를 만 들 수 있 습 니 다.저 는 python 36 입 니 다.수요 에 따라 수정 할 수 있 습 니 다.
PYSPARK_PYTHON=/usr/bin/python36
pyspark 가 mongo 데이터베이스 에 대한 기본 동작(๑•๑)

몇 가지 주의해 야 할 것 이 있다.
  • 최신 pyspark 버 전 을 설치 하지 마 십시오.pip3 install pyspark==2.3.2
  •   spark-connector일반적인 MongoDB 쓰기 와 달리 형식 은 다음 과 같다.mongodb://127.0.0.1:database.collection
  • 계산 데이터 양 이 많 으 면 컴퓨터 가 좀 걸 릴 수 있 습 니 다.^ ^
  • 
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    """
    @author: zhangslob
    @file: spark_count.py 
    @time: 2019/01/03
    @desc:
             pyspark  
      `pip3 install pyspark==2.3.2`
        pyspark  MongoDB  https://docs.mongodb.com/spark-connector/master/python-api/
    """
    
    import os
    from pyspark.sql import SparkSession
    
    # set PYSPARK_PYTHON to python36
    os.environ['PYSPARK_PYTHON'] = '/usr/bin/python36'
    
    # load mongodb data
    #    :"mongodb://127.0.0.1:database.collection"
    input_uri = "mongodb://127.0.0.1:27017/spark.spark_test"
    output_uri = "mongodb://127.0.0.1:27017/spark.spark_test"
    
    #   spark,        ,  "spark://master:7077"
    spark = SparkSession \
      .builder \
      .master("local") \
      .appName("MyApp") \
      .config("spark.mongodb.input.uri", input_uri) \
      .config("spark.mongodb.output.uri", output_uri) \
      .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \
      .getOrCreate()
    
    
    def except_id(collection_1, collection_2, output_collection, pipeline):
      """
         1  2      
      :param collection_1:    1
      :param collection_2:    2
      :param output_collection:     
      :param pipeline: MongoDB     str
      :return:
      """
      #                ,          input_uri。         
      # .option("collection", "mongodb://127.0.0.1:27017/spark.spark_test")
      # .option("database", "people").option("collection", "contacts")
    
      df_1 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_1) \
        .option("pipeline", pipeline).load()
    
      df_2 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_2) \
        .option("pipeline", pipeline).load()
    
      # df_1      df_2,      df_2 ,df_1  
      df = df_1.subtract(df_2)
      df.show()
    
      # mode       
      # * `append`: Append contents of this :class:`DataFrame` to existing data.
      # * `overwrite`: Overwrite existing data.
      # * `error` or `errorifexists`: Throw an exception if data already exists.
      # * `ignore`: Silently ignore this operation if data already exists.
    
      df.write.format("com.mongodb.spark.sql.DefaultSource").option("collection", output_collection).mode("append").save()
      spark.stop()
    
    
    if __name__ == '__main__':
      # mongodb query, MongoDB    ,         
      pipeline = "[{'$project': {'uid': 1, '_id': 0}}]"
    
      collection_1 = "spark_1"
      collection_2 = "spark_2"
      output_collection = 'diff_uid'
      except_id(collection_1, collection_2, output_collection, pipeline)
      print('success')
    전체 코드 주소:spark_count_diff_uid.py
    이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

    좋은 웹페이지 즐겨찾기