pyspark 몽골 DB 조작 방법 절차
데 이 터 는 여러 가지 형식 이 있 을 수 있 습 니 다.흔히 볼 수 있 는 것 은 HDFS 이지 만 Python 파충류 에서 데이터 베 이 스 를 많이 사용 하 는 것 은 MongoDB 이기 때문에 spark 로 MongoDB 의 데 이 터 를 가 져 오 는 방법 에 중점 을 두 겠 습 니 다.
물론,우선 자신의 컴퓨터 에 spark 환경 을 설치 해 야 합 니 다.쉽게 말 하면 여기spark 다운로드에 JAVA,Scala 환경 을 설정 해 야 합 니 다.
Jupyter notebook 을 사용 하 는 것 을 권장 합 니 다.환경 변수 에 이렇게 설정 하 는 것 이 편리 합 니 다.
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
만약 당신 의 환경 에 여러 개의 Python 버 전이 있다 면,당신 이 사용 하고 자 하 는 해석 기 를 만 들 수 있 습 니 다.저 는 python 36 입 니 다.수요 에 따라 수정 할 수 있 습 니 다.
PYSPARK_PYTHON=/usr/bin/python36
pyspark 가 mongo 데이터베이스 에 대한 기본 동작(๑•๑)
몇 가지 주의해 야 할 것 이 있다.
pip3 install pyspark==2.3.2
spark-connector
일반적인 MongoDB 쓰기 와 달리 형식 은 다음 과 같다.mongodb://127.0.0.1:database.collection
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: zhangslob
@file: spark_count.py
@time: 2019/01/03
@desc:
pyspark
`pip3 install pyspark==2.3.2`
pyspark MongoDB https://docs.mongodb.com/spark-connector/master/python-api/
"""
import os
from pyspark.sql import SparkSession
# set PYSPARK_PYTHON to python36
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python36'
# load mongodb data
# :"mongodb://127.0.0.1:database.collection"
input_uri = "mongodb://127.0.0.1:27017/spark.spark_test"
output_uri = "mongodb://127.0.0.1:27017/spark.spark_test"
# spark, , "spark://master:7077"
spark = SparkSession \
.builder \
.master("local") \
.appName("MyApp") \
.config("spark.mongodb.input.uri", input_uri) \
.config("spark.mongodb.output.uri", output_uri) \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.2.0') \
.getOrCreate()
def except_id(collection_1, collection_2, output_collection, pipeline):
"""
1 2
:param collection_1: 1
:param collection_2: 2
:param output_collection:
:param pipeline: MongoDB str
:return:
"""
# , input_uri。
# .option("collection", "mongodb://127.0.0.1:27017/spark.spark_test")
# .option("database", "people").option("collection", "contacts")
df_1 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_1) \
.option("pipeline", pipeline).load()
df_2 = spark.read.format('com.mongodb.spark.sql.DefaultSource').option("collection", collection_2) \
.option("pipeline", pipeline).load()
# df_1 df_2, df_2 ,df_1
df = df_1.subtract(df_2)
df.show()
# mode
# * `append`: Append contents of this :class:`DataFrame` to existing data.
# * `overwrite`: Overwrite existing data.
# * `error` or `errorifexists`: Throw an exception if data already exists.
# * `ignore`: Silently ignore this operation if data already exists.
df.write.format("com.mongodb.spark.sql.DefaultSource").option("collection", output_collection).mode("append").save()
spark.stop()
if __name__ == '__main__':
# mongodb query, MongoDB ,
pipeline = "[{'$project': {'uid': 1, '_id': 0}}]"
collection_1 = "spark_1"
collection_2 = "spark_2"
output_collection = 'diff_uid'
except_id(collection_1, collection_2, output_collection, pipeline)
print('success')
전체 코드 주소:spark_count_diff_uid.py이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
jupyter notebook pyspark 사용 (mac)spark download 설치후에 버전확인. pyspark 실행시 jupyter로 접속하도록 환경변수 설정 이 후 pyspark 입력했을때 jupyter notebook 켜지면 성공 pyspark 테스트 노트파일 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.