18 Spark Streaming 프로그램의 우아한 정지

6229 단어
  • 스파크 스트리밍 프로그램의 정지는 강제 정지, 이상 정지 또는 다른 방식으로 정지할 수 있다.우선 StreamingContext의 stop() 방법
  • 을 살펴보겠습니다.
    def stop(
          stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
         ): Unit = synchronized {
        stop(stopSparkContext, false)
    }
    

    여기에 두 개의 매개 변수를 정의했습니다. stopSparkContext는 설정 파일을 통해 정의할 수 있습니다. 이어서 두 개의 매개 변수를 수신하는 stop 방법을 보십시오. 코드는 다음과 같습니다.
    /**
       * Stop the execution of the streams, with option of ensuring all received data
       * has been processed.
       *
       * @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
       *                         will be stopped regardless of whether this StreamingContext has been
       *                         started.
       * @param stopGracefully if true, stops gracefully by waiting for the processing of all
       *                       received data to be completed
       */
    def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
        var shutdownHookRefToRemove: AnyRef = null
        if (AsynchronousListenerBus.withinListenerThread.value) {
          throw new SparkException("Cannot stop StreamingContext within listener thread of" +
            " AsynchronousListenerBus")
        }
        synchronized {
          try {
            state match {
              case INITIALIZED =>
                logWarning("StreamingContext has not been started yet")
              case STOPPED =>
                logWarning("StreamingContext has already been stopped")
              case ACTIVE =>
                scheduler.stop(stopGracefully)
                // Removing the streamingSource to de-register the metrics on stop()
                env.metricsSystem.removeSource(streamingSource)
                uiTab.foreach(_.detach())
                StreamingContext.setActiveContext(null)
                waiter.notifyStop()
                if (shutdownHookRef != null) {
                  shutdownHookRefToRemove = shutdownHookRef
                  shutdownHookRef = null
                }
                logInfo("StreamingContext stopped successfully")
            }
          } finally {
            // The state should always be Stopped after calling `stop()`, even if we haven't started yet
            state = STOPPED
          }
        }
        if (shutdownHookRefToRemove != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
        }
        // Even if we have already stopped, we still need to attempt to stop the SparkContext because
        // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
        if (stopSparkContext) sc.stop()
    }
    

    주석에서 프로그램을 정지할 때 정확한 방법은 모든 수신된 데이터가 처리된 후에 정지하는 것입니다. 그러면 우리가 전송한 stopGracefully 매개 변수가true이고, 정지할 때 모든 작업이 완료되기를 기다립니다.
  • Spark Streaming은 우아하게 멈추는 방법을 제공했다.Streaming Context에 stop On Shutdown () 방법이 있는데 코드는 다음과 같다
  • private def stopOnShutdown(): Unit = {
        val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
        logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
        // Do not stop SparkContext, let its own shutdown hook stop it
        stop(stopSparkContext = false, stopGracefully = stopGracefully)
    }
    

    stop On Shutdown () 방법은 무슨 뜻입니까? 프로그램이 종료될 때 정상적으로 종료되거나 이상하게 종료되더라도, stop On Shutdown () 방법은 리셋되고, stop 방법을 호출합니다.stopGracefully는 설정 항목spark를 통과할 수 있습니다.streaming.stopGracefully OnShutdown 설정, 생산 환경은true로 설정해야 합니다.
  • stop On Shutdown () 방법은 어떻게 호출됩니까?StreamingContext의 start 방법에 코드 한 줄
  • shutdownHookRef = ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
    

    Shutdown Hook Manager에 stopOn Shutdown 함수를 추가하려면ddShutdown Hook 코드는 다음과 같습니다
    def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
        shutdownHooks.add(priority, hook)
    }
    

    Spark Shutdown Hook Manager에 무엇이 있는지 보고 코드 설명을 보고 Spark Shutdown Hook Manager의 기능을 일일이 소개하지 않습니다.
    private [util] class SparkShutdownHookManager {
    
      //      ,     ,     
      private val hooks = new PriorityQueue[SparkShutdownHook]()
      @volatile private var shuttingDown = false
    
      /**
       * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
       * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
       * the best.
       */
      //          ,   jvm      ,  jvm        
      def install(): Unit = {
        val hookTask = new Runnable() {
          override def run(): Unit = runAll()
        }    Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
          case Success(shmClass) =>
            val fsPriority = classOf[FileSystem]
              .getField("SHUTDOWN_HOOK_PRIORITY")
              .get(null) // static field, the value is not used
              .asInstanceOf[Int]
            val shm = shmClass.getMethod("get").invoke(null)
            shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
              .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
          case Failure(_) =>
            Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
        }
      }
      // jvm          
      def runAll(): Unit = {
        shuttingDown = true
        var nextHook: SparkShutdownHook = null
        //             ,     ,     
        while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
          Try(Utils.logUncaughtExceptions(nextHook.run()))
        }
      }
      def add(priority: Int, hook: () => Unit): AnyRef = {
        hooks.synchronized {
          if (shuttingDown) {
            throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
          }
          val hookRef = new SparkShutdownHook(priority, hook)
          hooks.add(hookRef)
          hookRef
        }
      }
      def remove(ref: AnyRef): Boolean = {
        hooks.synchronized { hooks.remove(ref) }
      }
    }
    
  • 여기를 보면 알 수 있듯이 스톱 온 shutdown () 함수를 Spark Shutdown Hook Manager의 최적화 레벨 대기열 hooks에 넣으면 기본 우선 순위는 51이고 jvm가 종료할 때 라인을 시작하고run All () 방법을 호출한 다음 hooks 대기열에서 수거 (함수) 를 추출하고 실행하면 스톱 온 shutdown () 함수를 호출하고 스톱 () 함수를 호출합니다.우리의 응용 프로그램은 우아하게 작업을 멈추게 할 수 있다.
  • 좋은 웹페이지 즐겨찾기