Spark에서 중첩 구조 작업

내용의 테이블


  • Intro
  • Add Column
  • Drop Column
  • Map column
  • Afterword

  • 소개



    spark-hats 이라는 라이브러리를 소개하고 싶습니다. Array Transformation*s*를 위한 전체 이름 Spark Helpers이지만 이름에 속지 마십시오. 구조체에서도 작동합니다. 이 라이브러리는 중첩 구조와 함께 작동해야 하는 새로운 스파크 응용 프로그램을 개발할 때 많은 시간과 에너지를 절약합니다. 그것이 당신에게도 도움이되기를 바랍니다.

    라이브러리의 핵심은 열 추가, 열 매핑, 열 삭제 방법입니다. 이 모든 것이 설계되어 다음과 같이 전환할 수 있습니다.

    val dfOut = df.select(col("id"), transform(col("my_array"), c => {
      struct(c.getField("a").as("a"),
      c.getField("b").as("b"),
      (c.getField("a") + 1).as("c"))
    }).as("my_array"))
    

    이것으로:

    val dfOut = df.nestedMapColumn("my_array.a","c", a => a + 1)
    

    가져오기와 예제에 사용할 구조부터 시작하겠습니다.

    셸에서 다음 명령을 사용하여 패키지와 함께 spark-shell을 사용합니다.

    $> spark-shell --packages za.co.absa:spark-hats_2.11:0.2.1
    

    그런 다음 스파크 쉘에서 :

    scala> import za.co.absa.spark.hats.Extensions._
    import za.co.absa.spark.hats.Extensions._
    
    scala> df.printSchema()
    root
     |-- id: long (nullable = true)
     |-- my_array: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a: long (nullable = true)
     |    |    |-- b: string (nullable = true)
    
    scala> df.show(false)
    +---+------------------------------+
    |id |my_array                      |
    +---+------------------------------+
    |1  |[[1, foo]]                    |
    |2  |[[1, bar], [2, baz], [3, foz]]|
    +---+------------------------------+
    

    이제 메소드로 넘어갑시다.

    열 추가



    열 추가는 두 가지 변형으로 제공됩니다. 간단하고 확장되었습니다. 단순을 사용하면 중첩 구조에 새 필드를 추가할 수 있습니다. Extend는 다른 요소를 참조할 수 있도록 허용하면서 동일한 작업을 수행합니다.

    간단한 것은 매우 간단합니다. DataFrame을 가져오고 withColumn 를 호출하는 대신 nestedWithColumn 를 호출합니다. 구조체에 리터럴을 추가해 보겠습니다.

    scala> df.nestedWithColumn("my_array.c", lit("hello")).printSchema
    root
     |-- id: long (nullable = true)
     |-- my_array: array (nullable = true)
     |    |-- element: struct (containsNull = false)
     |    |    |-- a: long (nullable = true)
     |    |    |-- b: string (nullable = true)
     |    |    |-- c: string (nullable = false)
    
    scala> df.nestedWithColumn("my_array.c", lit("hello")).show(false)
    +---+---------------------------------------------------+
    |id |my_array                                           |
    +---+---------------------------------------------------+
    |1  |[[1, foo, hello]]                                  |
    |2  |[[1, bar, hello], [2, baz, hello], [3, foz, hello]]|
    +---+---------------------------------------------------+
    

    그런 다음 확장 버전은 배열의 다른 요소를 사용할 수 있습니다. API도 다릅니다. 여기서 메서드nestedWithColumnExtended는 열을 두 번째 매개변수로 반환하는 함수를 예상합니다. 게다가 이 함수에는 함수 자체인 getField() 함수인 인수가 있습니다. 변환에서 getField() 함수를 사용하여 정규화된 이름으로 DataFrame의 다른 열을 참조할 수 있습니다.

    scala> val dfOut = df.nestedWithColumnExtended("my_array.c", getField =>
             concat(col("id").cast("string"), getField("my_array.b"))
           )
    
    scala> dfOut.printSchema
    root
     |-- id: long (nullable = true)
     |-- my_array: array (nullable = true)
     |    |-- element: struct (containsNull = false)
     |    |    |-- a: long (nullable = true)
     |    |    |-- b: string (nullable = true)
     |    |    |-- c: string (nullable = true)
    
    scala> dfOut.show(false)
    +---+------------------------------------------------+
    |id |my_array                                        |
    +---+------------------------------------------------+
    |1  |[[1, foo, 1foo]]                                |
    |2  |[[1, bar, 2bar], [2, baz, 2baz], [3, foz, 2foz]]|
    +---+------------------------------------------------+
    

    루트 수준 열의 경우 col 를 사용하면 충분하지만 getField 여전히 괜찮을 것입니다.

    열 삭제



    두 번째 방법으로 이미 명명 규칙을 잡았을 수 있습니다. 이 방법을 nestedDropColumn라고 하며 세 가지 방법 중 가장 간단합니다. 정규화된 이름을 제공하면 됩니다.

    scala> df.nestedDropColumn("my_array.b").printSchema
    root
     |-- id: long (nullable = true)
     |-- my_array: array (nullable = true)
     |    |-- element: struct (containsNull = false)
     |    |    |-- a: long (nullable = true)
    
    scala> df.nestedDropColumn("my_array.b").show(false)
    +---+---------------+
    |id |my_array       |
    +---+---------------+
    |1  |[[1]]          |
    |2  |[[1], [2], [3]]|
    +---+---------------+
    

    지도 열



    지도 열은 아마도 가장 많이 사용되는 열일 것입니다. 맵은 구조체의 각 요소에 함수를 적용하고 기본적으로 같은 수준에 출력을 넣거나 지정된 경우 다른 곳에 출력합니다.

    입력 열이 기본 열이면 간단한 기능으로 충분합니다. 구조체인 경우 getField를 다시 사용해야 합니다.

    scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).printSchema
    root
     |-- id: long (nullable = true)
     |-- my_array: array (nullable = true)
     |    |-- element: struct (containsNull = false)
     |    |    |-- a: long (nullable = true)
     |    |    |-- b: string (nullable = true)
     |    |    |-- c: long (nullable = true)
    
    scala> df.nestedMapColumn(inputColumnName = "my_array.a", outputColumnName = "c", expression = a => a + 1).show(false)
    +---+---------------------------------------+
    |id |my_array                               |
    +---+---------------------------------------+
    |1  |[[1, foo, 2]]                          |
    |2  |[[1, bar, 2], [2, baz, 3], [3, foz, 4]]|
    +---+---------------------------------------+
    

    뒷말



    이 방법들과 라이브러리가 저에게 도움이 된 만큼 여러분에게도 도움이 되었으면 합니다. 그들은 구조 작업을 훨씬 쉽게 만들고 코드를 더 간결하게 유지하므로 내 머리로는 오류가 발생하기 쉽습니다.

    자세한 내용은 https://github.com/AbsaOSS/spark-hats으로 이동하십시오.

    행운을 빕니다 행복한 코딩!

    좋은 웹페이지 즐겨찾기