pyspark DataFrame 데이터 사전 처리

15332 단어 데이터 처리

문서 목록

  • 1,pyspark에서 실행 코드
  • 2. pyspark 및 DataFrame
  • 셋째,pyspark DataFrame의 생성 및 저장
  • 3.1, SparkSession 객체 만들기
  • 3.2, DataFrame 생성
  • 3.3, DataFrame 저장
  • 넷째,pyspark DataFrame의 흔한 조작
  • 다섯,pandas.DataFrame、spark.DataFrame 상호 변환
  • pyspark = spark + python

    pyspark에서 실행 코드


    pyspark는python 상호작용 실행 환경을 제공합니다.pyspark는 실시간, 상호작용의 방식으로 데이터를 분석할 수 있다.pyspark에서 Spark API를 쉽게 학습할 수 있습니다.python 인터랙티브 환경 개발 spark 응용 프로그램

    2. pyspark 및 DataFrame


    DataFrame과 RDD의 차이점: 1. DataFrame의 출시로 Spark는 대규모 데이터를 처리하는 능력을 갖추었다. 기존의 RDD 변환보다 더욱 간단하고 사용하기 쉬울 뿐만 아니라 더욱 높은 계산 성능을 얻었다. 2. Spark는 MySQL에서 DataFrame로의 전환을 쉽게 실현할 수 있고 SQL 조회 3, RDDD를 지원하는 것은 분포식 Java 대상의 집합이지만 대상 내부의 구조는 RDDD에 있어 알 수 없는 4, 4이다.DataFrame은 RDD 기반의 분산 데이터 세트로서 자세한 메커니즘 정보를 제공합니다.
    RDD 분포식 자바 집합은 가장 바깥쪽에서 자바 대상만 볼 수 있고 내부의 세부 정보는 RDD의 분포식 구조 데이터를 바탕으로 명확한 구조화된 정보를 제공할 수 없다.
    RDD:객체-객체 속성 DataFrame:선반, 내부 정보를 한눈에 파악

    3. pyspark DataFrame의 생성 및 저장


    Spark 2.0 이상 버전부터 Spark는 Spark Session 인터페이스를 사용하여 Spark 1을 대체합니다.6의 SQLcontext 및 HiveContext 커넥터.SparkSession 인터페이스를 이용하여 데이터에 대한 불러오기, 변환, 처리 등의 기능을 실현한다.SparkSession은 데이터에 대한 로드, 변환, 처리 등의 기능을 실현했다.SparkSession은 SQLcontext 및 HiveContext의 모든 기능을 구현했습니다.SparkSession은 서로 다른 데이터 원본에서 데이터를 불러오고 데이터 변환을 DataFrame으로 할 수 있으며 DataFrame 변환을 SQLcontext 자체의 테이블로 하고 SQL 문구를 사용하여 데이터를 조작할 수 있습니다.SparkSession은 HiveQL과 Hive에 의존하는 다른 기능도 지원합니다.
    SparkSession 지휘관:

    3.1 SparkSession 객체 만들기:


    pyspark에 들어간 후 mpyspark는 기본적으로 SparkContext 대상 (이름 sc) 과 SparkSession 대상 (이름 spark) 을 제공합니다.
     #         ,    
     from pyspark import SparkContext, SparkConf
     from pyspark.sql import SparkSession
     #    sparksession  
     spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
    
    

    3.2 DataFrame 생성


    spark.read.text(‘people.txt’) spark.read.json(‘people.json’) spark.read.parquet(‘people.parquet’)
    또는: 텍스트 파일을 읽고 DataFrame spark를 생성합니다.read.format(‘text’).load(‘people.txt’) park.read.format(‘json’).load(‘people.json’) park.read.format(‘parquet’).load(‘people.parquet’)
    df = spark.read.json(‘file:///usr/re/people.json’) df.show

    3.3 DataFrame의 저장


    스파크를 사용합니다.write 작업은 DataFrame df 저장--> df.write.txt(‘people.txt’) df ————> df.write.json(‘people.json’) df ————> df.write.parquet('people.parquet') 또는 df----> df.write.format(‘text’).save(‘people.txt’) df ————> df.write.format(‘json’).save(‘people.json’) df ————> df.write.format(‘parquet’).save(‘people.parquet’)

    4.pyspark DataFrame의 일반적인 작업

     df = spark.read.json('people.json')
     #       ,   ,    ,    
     df.printSchema()
     #     
     df.select(df['name'], df['age']+1)
     df.select(df['name'], df['age']+1).show()
     #     
     df.filter(df['age'] > 20).show()
     # groupBy()
     df.groupby('age').count().show()
     # dataframe   
     df.sort(df['age'].desc , df['name'].asc).show()
    

    5.pandas.DataFrame、spark.DataFrame 상호 변환

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    """
    Created on Thu Oct  4 17:00:10 2018
    
    @author: chichao
    """
    
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql import SQLContext
    from pyspark import SparkContext
    
    
    
    # 1. pandas DataFrame
    
    # 1)   
    df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], index=['row1', 'row2'], columns=['c1', 'c2', 'c3'])
    
    # 2)    
    print(df)
    
    # 2. spark DataFrame
    
    # 1)    
    sc = SparkContext() 
    
    spark = SparkSession \
            .builder \
            .appName("testDataFrame") \
            .getOrCreate() 
            
    sentenceData = spark.createDataFrame([
        (0.0, "I like Spark"),
        (1.0, "Pandas is useful"),
        (2.0, "They are coded by Python ")
    ], ["label", "sentence"]) 
    
    # 2)    
    sentenceData.show()
    
    sentenceData.select("label").show()
    
    # 3. spark.DataFrame     pandas.DataFrame
    
    sqlContest = SQLContext(sc)
    
    spark_df = sqlContest.createDataFrame(df)
    
    spark_df.select("c1").show()
    
    # 4. pandas.DataFrame     spark.DataFrame
    
    pandas_df = sentenceData.toPandas()
    
    print(pandas_df)
    

    pandas의 방식은 단기판, 즉 toPandas()의 방식은 단기판이기 때문에 분포식 버전으로 바꿀 수도 있다.
    import pandas as pd
    def _map_to_pandas(rdds):
      return [pd.DataFrame(list(rdds))]
       
    def topas(df, n_partitions=None):
      if n_partitions is not None: df = df.repartition(n_partitions)
      df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
      df_pand = pd.concat(df_pand)
      df_pand.columns = df.columns
      return df_pand
       
    pandas_df = topas(spark_df)
    

    좋은 웹페이지 즐겨찾기