python 에서 pyspark 읽 기 쓰기 Hive 데이터 조작 사용 하기
pyspark 는 hive 데 이 터 를 읽 는 것 이 매우 간단 합 니 다.전문 적 인 인터페이스 가 있 기 때문에 hbase 처럼 많은 설정 이 필요 하지 않 습 니 다.pyspark 가 제공 하 는 조작 hive 인 터 페 이 스 는 프로그램 이 직접 SQL 문 구 를 사용 하여 hive 에서 필요 한 데 이 터 를 조회 할 수 있 습 니 다.코드 는 다음 과 같 습 니 다.
from pyspark.sql import HiveContext,SparkSession
_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"
spark_session = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()
hive_context= HiveContext(spark_session )
# SQL , hive , where
hive_database = "database1"
hive_table = "test"
hive_read = "select * from {}.{}".format(hive_database, hive_table)
# SQL hive dataframe
read_df = hive_context.sql(hive_read)
2.데 이 터 를 hive 표 에 기록 합 니 다.pyspark 는 hive 표를 쓰 는 데 두 가지 방법 이 있 습 니 다.
(1)SQL 문 구 를 통 해 표 생 성
from pyspark.sql import SparkSession, HiveContext
_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"
spark = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()
data = [
(1,"3","145"),
(1,"4","146"),
(1,"5","25"),
(1,"6","26"),
(2,"32","32"),
(2,"8","134"),
(2,"8","134"),
(2,"9","137")
]
df = spark.createDataFrame(data, ['id', "test_id", 'camera_id'])
# method one,default ,write_test default
df.registerTempTable('test_hive')
sqlContext.sql("create table default.write_test select * from test_hive")
(2)saveastable 방식
# method two
# "overwrite" , , ,
# mode("append")
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')
tips:spark 는 위의 몇 가지 방식 으로 hive 를 읽 고 쓸 때 작업 을 제출 할 때 해당 하 는 설정 을 추가 해 야 합 니 다.그렇지 않 으 면 오류 가 발생 할 수 있 습 니 다.
spark-submit --conf spark.sql.catalogImplementation=hive test.py
추가 지식:PySpark 는 SHC 프레임 워 크 를 기반 으로 HBase 데 이 터 를 읽 고 DataFrame 로 전환
첫째,우선 HBase 디 렉 터 리 lib 의 jar 패키지 와 SHC 의 jar 패 키 지 를 모든 노드 의 Spark 디 렉 터 리 lib 에 복사 해 야 합 니 다.
2.spark-defaults.conf 를 수정 하여 spark.driver.extraclassPath 와 spark.executor.extraclassPath 에 상기 jar 가방 이 있 는 경 로 를 추가 합 니 다.
3.클 러 스 터 재 개
코드
#/usr/bin/python
#-*- coding:utf-8 C*-
from pyspark import SparkContext
from pyspark.sql import SQLContext,HiveContext,SparkSession
from pyspark.sql.types import Row,StringType,StructField,StringType,IntegerType
from pyspark.sql.dataframe import DataFrame
sc = SparkContext(appName="pyspark_hbase")
sql_sc = SQLContext(sc)
dep = "org.apache.spark.sql.execution.datasources.hbase"
# schema
catalog = """{
"table":{"namespace":"default", "name":"teacher"},
"rowkey":"key",
"columns":{
"id":{"cf":"rowkey", "col":"key", "type":"string"},
"name":{"cf":"teacherInfo", "col":"name", "type":"string"},
"age":{"cf":"teacherInfo", "col":"age", "type":"string"},
"gender":{"cf":"teacherInfo", "col":"gender","type":"string"},
"cat":{"cf":"teacherInfo", "col":"cat","type":"string"},
"tag":{"cf":"teacherInfo", "col":"tag", "type":"string"},
"level":{"cf":"teacherInfo", "col":"level","type":"string"} }
}"""
df = sql_sc.read.options(catalog = catalog).format(dep).load()
print ('***************************************************************')
print ('***************************************************************')
print ('***************************************************************')
df.show()
print ('***************************************************************')
print ('***************************************************************')
print ('***************************************************************')
sc.stop()
해명데이터 출처 는 본인 의 이전 글 을 참고 하 시기 바 랍 니 다.여기 서 는 군말 하지 않 겠 습 니 다.
schema 정의 참조 그림:
결국
이 편 은 python 에서 pyspark 를 사용 하여 Hive 데 이 터 를 읽 고 쓰 는 작업 이 바로 편집장 이 여러분 에 게 공유 하 는 모든 내용 입 니 다.참고 하 시기 바 랍 니 다.여러분 들 도 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
로마 숫자를 정수로 또는 그 반대로 변환그 중 하나는 로마 숫자를 정수로 변환하는 함수를 만드는 것이었고 두 번째는 그 반대를 수행하는 함수를 만드는 것이었습니다. 문자만 포함합니다'I', 'V', 'X', 'L', 'C', 'D', 'M' ; 문자열이 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.