python 에서 pyspark 읽 기 쓰기 Hive 데이터 조작 사용 하기

5363 단어 pythonpysparkHive
1.Hive 표 데이터 읽 기
pyspark 는 hive 데 이 터 를 읽 는 것 이 매우 간단 합 니 다.전문 적 인 인터페이스 가 있 기 때문에 hbase 처럼 많은 설정 이 필요 하지 않 습 니 다.pyspark 가 제공 하 는 조작 hive 인 터 페 이 스 는 프로그램 이 직접 SQL 문 구 를 사용 하여 hive 에서 필요 한 데 이 터 를 조회 할 수 있 습 니 다.코드 는 다음 과 같 습 니 다.

from pyspark.sql import HiveContext,SparkSession
 
_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"
spark_session = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()
 
hive_context= HiveContext(spark_session )
 
#      SQL  ,   hive       ,      where     
hive_database = "database1"
hive_table = "test"
hive_read = "select * from {}.{}".format(hive_database, hive_table)
 
#   SQL   hive         dataframe   
read_df = hive_context.sql(hive_read)
2.데 이 터 를 hive 표 에 기록 합 니 다.
pyspark 는 hive 표를 쓰 는 데 두 가지 방법 이 있 습 니 다.
(1)SQL 문 구 를 통 해 표 생 성

from pyspark.sql import SparkSession, HiveContext
 
_SPARK_HOST = "spark://spark-master:7077"
_APP_NAME = "test"
 
spark = SparkSession.builder.master(_SPARK_HOST).appName(_APP_NAME).getOrCreate()
 
data = [
 (1,"3","145"),
 (1,"4","146"),
 (1,"5","25"),
 (1,"6","26"),
 (2,"32","32"),
 (2,"8","134"),
 (2,"8","134"),
 (2,"9","137")
]
df = spark.createDataFrame(data, ['id', "test_id", 'camera_id'])
 
# method one,default         ,write_test     default       
df.registerTempTable('test_hive')
sqlContext.sql("create table default.write_test select * from test_hive")
(2)saveastable 방식

# method two
 
# "overwrite"       ,     ,        ,             
# mode("append")               
df.write.format("hive").mode("overwrite").saveAsTable('default.write_test')
 
tips:
spark 는 위의 몇 가지 방식 으로 hive 를 읽 고 쓸 때 작업 을 제출 할 때 해당 하 는 설정 을 추가 해 야 합 니 다.그렇지 않 으 면 오류 가 발생 할 수 있 습 니 다.
spark-submit --conf spark.sql.catalogImplementation=hive test.py
추가 지식:PySpark 는 SHC 프레임 워 크 를 기반 으로 HBase 데 이 터 를 읽 고 DataFrame 로 전환
첫째,우선 HBase 디 렉 터 리 lib 의 jar 패키지 와 SHC 의 jar 패 키 지 를 모든 노드 의 Spark 디 렉 터 리 lib 에 복사 해 야 합 니 다.
2.spark-defaults.conf 를 수정 하여 spark.driver.extraclassPath 와 spark.executor.extraclassPath 에 상기 jar 가방 이 있 는 경 로 를 추가 합 니 다.
3.클 러 스 터 재 개
코드

#/usr/bin/python
#-*- coding:utf-8 C*-
 
from pyspark import SparkContext
from pyspark.sql import SQLContext,HiveContext,SparkSession
from pyspark.sql.types import Row,StringType,StructField,StringType,IntegerType
from pyspark.sql.dataframe import DataFrame
 
sc = SparkContext(appName="pyspark_hbase")
sql_sc = SQLContext(sc)
 
dep = "org.apache.spark.sql.execution.datasources.hbase"
#  schema
catalog = """{
       "table":{"namespace":"default", "name":"teacher"},
       "rowkey":"key",
       "columns":{
            "id":{"cf":"rowkey", "col":"key", "type":"string"},
            "name":{"cf":"teacherInfo", "col":"name", "type":"string"},
            "age":{"cf":"teacherInfo", "col":"age", "type":"string"},
            "gender":{"cf":"teacherInfo", "col":"gender","type":"string"},
            "cat":{"cf":"teacherInfo", "col":"cat","type":"string"},
            "tag":{"cf":"teacherInfo", "col":"tag", "type":"string"},
            "level":{"cf":"teacherInfo", "col":"level","type":"string"} }
      }"""
 
df = sql_sc.read.options(catalog = catalog).format(dep).load()
 
print ('***************************************************************')
print ('***************************************************************')
print ('***************************************************************')
df.show()
print ('***************************************************************')
print ('***************************************************************')
print ('***************************************************************')
sc.stop()
해명
데이터 출처 는 본인 의 이전 글 을 참고 하 시기 바 랍 니 다.여기 서 는 군말 하지 않 겠 습 니 다.
schema 정의 참조 그림:

결국

이 편 은 python 에서 pyspark 를 사용 하여 Hive 데 이 터 를 읽 고 쓰 는 작업 이 바로 편집장 이 여러분 에 게 공유 하 는 모든 내용 입 니 다.참고 하 시기 바 랍 니 다.여러분 들 도 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

좋은 웹페이지 즐겨찾기