Spark 시리즈 - mapPartitions에 대한 오류

2933 단어

전언


오늘 리뷰에서 동료의 코드를 살펴보니 그 코드에 매우 많은 mapPartitions가 있었다. 그 이유를 물었더니 성능이 map보다 더 좋다고 말했다.왜 성능이 좋냐고요?그래서 이 문장이 생겼다

온라인에서 맵 Partitions를 선호하는 이유

  • 실행 횟수가 줄어들고 속도가 빨라진다. 어떤 문장의 원어로 말하자면 partition , , 。 내가 말하고 싶은 것은 함수 호출은 한 번에partition의 모든 데이터를 처리한다. 함수를 호출하는 데 드는 아주 적은 시간 비용을 절약할 수 있지만 이 절약 시간은 정말 너무 적다. 특히spark와 같은 구조는 그 자체가 밀리초급 응답을 하는 것이 아니라 심지어 억지로 말하자면너는 교체기를 도입하여 교체기의 조작을 하는데 설마 시간을 소모하지 않겠니?만약에 위의 이런 견해가 믿음직스럽지 않다면 어떤 견해들은 정말 나를 어이없게 한다. 예를 들어 map, partition 1 ;ok, function 1 。 , MapPartitions , task function, function partition 。 , 이런 견해는 위의 방식대로 이해하면 사실은 같은 일이지만 일부 신인들은 map 1 , MapPartitions , 실제로 네가 MapPartitions를 사용하여 교체할 때도 하나의 데이터 처리였고 이 횟수는 전혀 변하지 않았다는 것을 이해하기 쉽다.

  • mapPartitions 문제


    사실 제 개인적인 경험을 보면 맵 Partitions의 정확한 사용은 큰 문제를 일으키지 않습니다. 물론 저도 일반적인 장면의 맵 Partitions가 맵보다 어떤 장점이 있는지 몰랐기 때문에 맵 Partitions를 일부러 사용할 필요가 없습니다. 오히려 맵 Partitions가 문제를 가져올 수 있습니다.
  • 사용하기에 결코 편리하지 않다. 이 코드를 쓴 사람은 모두 알고 있을 것이다.물론 이 문제가 해결되지 않는 것은 아니다. 우리는 아래의 코드를 쓸 수 있다. 확실히 맵과 간결성도 많이 다르지 않다. 응, 생산 환경에서 사용해 볼 수 있다는 것을 알려주지 않을 거야.
    //       ,      mapPartitions     
    def mapFunc[T, U](iterator: Iterator[T], f: T => U) = {
      iterator.map(x => {
        f(x)
      })
    }
    //      
    rdd.mapPartitions(x => {
        mapFunc(x, line => {
            s"${line}    "
        })
      })
    
    
  • OOM을 만들기 쉽다. 이것도 많은 블로그에서 제기한 문제이다. 그들은 대체적으로 다음과 같은 코드를 써서 테스트를 할 것이다.
    rdd.mapPartitions(x => {
        xxxx  
       while (x.hasNext){
         val next = x.next()
       }
        xxx  
      })
    
    만약에 당신의 코드가 위와 같다면 OOM도 이상할 것이 없다. 주의를 기울였는지 모르겠다. 맵Partitions는 하나의 교체기를 받고 다시 교체기를 되돌려준다. 만약에 당신이 이렇게 코드를 쓰면 교체기의 게으른 실행 특성을 전혀 사용하지 않는다.데이터를 메모리에 쌓아 놓으면 정말 파티션 데이터를 한 번에 처리하는 것이 되고 어느 정도에 스파크 피플라인의 계산 모드를 파괴했다.

  • mapPartitions의 용도


    존재는 이치다. 위에서 계속 나무라지만 존재할 이유가 있다.하나의 구역이 한 번만 호출되는 특성은 데이터베이스를 쓸 때 확실히 도움이 된다. 스파크는 분포식으로 실행되기 때문에 데이터베이스에 연결하는 작업은 반드시 산자 내부에 넣어야만 Executor에 의해 정확하게 실행될 수 있다. 그러면 맵 Partitions는 맵보다 훨씬 우세한 것으로 나타난다.예를 들어 아래의 이 위조 코드
    rdd.mapPartitions(x => {
            println("     ")
            val res = x.map(line=>{
              print("    :" + line)
              line
            })
            println("     ")
            res
          })
    

    이렇게 하면 나는 한 구역에서 데이터베이스를 한 번만 연결하고 맵 계산자라면 n번을 연결해야 할 수도 있다.
    또 하나는 맵 Partitions가 우리에게 더욱 강력한 데이터 제어력을 제공했는데 어떻게 이해합니까?우리는 한 번에 한 구역의 데이터를 얻을 수 있다. 그러면 우리는 한 구역의 데이터를 통일적으로 처리할 수 있다. 비록 메모리의 비용을 증가시킬 수 있지만 일부 장면에서 매우 유용하다. 예를 들어 일부 행렬의 곱셈 등이다.

    후기


    네가 어떤 산수를 사용하든지 간에 사실은 모두 가능하다. 그러나 대다수 때 나는 네가 맵 산수를 사용하는 것을 추천한다. 물론 맵 산수가 적합하지 않은 장면을 만나면 어쩔 수 없다...하지만 맵 파티션을 정말로 사용하려고 한다고 해도 교체기의 게으른 실행 특성을 충분히 발휘해 보세요.

    마지막으로 본문이 도움이 된다면 좋아요를 눌러주세요.

    좋은 웹페이지 즐겨찾기