18 Spark Streaming 프로그램의 우아한 정지
def stop(
stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)
): Unit = synchronized {
stop(stopSparkContext, false)
}
여기에 두 개의 매개 변수를 정의했습니다. stopSparkContext는 설정 파일을 통해 정의할 수 있습니다. 이어서 두 개의 매개 변수를 수신하는 stop 방법을 보십시오. 코드는 다음과 같습니다.
/**
* Stop the execution of the streams, with option of ensuring all received data
* has been processed.
*
* @param stopSparkContext if true, stops the associated SparkContext. The underlying SparkContext
* will be stopped regardless of whether this StreamingContext has been
* started.
* @param stopGracefully if true, stops gracefully by waiting for the processing of all
* received data to be completed
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
var shutdownHookRefToRemove: AnyRef = null
if (AsynchronousListenerBus.withinListenerThread.value) {
throw new SparkException("Cannot stop StreamingContext within listener thread of" +
" AsynchronousListenerBus")
}
synchronized {
try {
state match {
case INITIALIZED =>
logWarning("StreamingContext has not been started yet")
case STOPPED =>
logWarning("StreamingContext has already been stopped")
case ACTIVE =>
scheduler.stop(stopGracefully)
// Removing the streamingSource to de-register the metrics on stop()
env.metricsSystem.removeSource(streamingSource)
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
if (shutdownHookRef != null) {
shutdownHookRefToRemove = shutdownHookRef
shutdownHookRef = null
}
logInfo("StreamingContext stopped successfully")
}
} finally {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
state = STOPPED
}
}
if (shutdownHookRefToRemove != null) {
ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
}
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
if (stopSparkContext) sc.stop()
}
주석에서 프로그램을 정지할 때 정확한 방법은 모든 수신된 데이터가 처리된 후에 정지하는 것입니다. 그러면 우리가 전송한 stopGracefully 매개 변수가true이고, 정지할 때 모든 작업이 완료되기를 기다립니다.
private def stopOnShutdown(): Unit = {
val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
// Do not stop SparkContext, let its own shutdown hook stop it
stop(stopSparkContext = false, stopGracefully = stopGracefully)
}
stop On Shutdown () 방법은 무슨 뜻입니까? 프로그램이 종료될 때 정상적으로 종료되거나 이상하게 종료되더라도, stop On Shutdown () 방법은 리셋되고, stop 방법을 호출합니다.stopGracefully는 설정 항목spark를 통과할 수 있습니다.streaming.stopGracefully OnShutdown 설정, 생산 환경은true로 설정해야 합니다.
shutdownHookRef = ShutdownHookManager.addShutdownHook(StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
Shutdown Hook Manager에 stopOn Shutdown 함수를 추가하려면ddShutdown Hook 코드는 다음과 같습니다
def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
shutdownHooks.add(priority, hook)
}
Spark Shutdown Hook Manager에 무엇이 있는지 보고 코드 설명을 보고 Spark Shutdown Hook Manager의 기능을 일일이 소개하지 않습니다.
private [util] class SparkShutdownHookManager {
// , ,
private val hooks = new PriorityQueue[SparkShutdownHook]()
@volatile private var shuttingDown = false
/**
* Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
* have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
* the best.
*/
// , jvm , jvm
def install(): Unit = {
val hookTask = new Runnable() {
override def run(): Unit = runAll()
} Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
case Success(shmClass) =>
val fsPriority = classOf[FileSystem]
.getField("SHUTDOWN_HOOK_PRIORITY")
.get(null) // static field, the value is not used
.asInstanceOf[Int]
val shm = shmClass.getMethod("get").invoke(null)
shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
.invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
case Failure(_) =>
Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
}
}
// jvm
def runAll(): Unit = {
shuttingDown = true
var nextHook: SparkShutdownHook = null
// , ,
while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
Try(Utils.logUncaughtExceptions(nextHook.run()))
}
}
def add(priority: Int, hook: () => Unit): AnyRef = {
hooks.synchronized {
if (shuttingDown) {
throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
}
val hookRef = new SparkShutdownHook(priority, hook)
hooks.add(hookRef)
hookRef
}
}
def remove(ref: AnyRef): Boolean = {
hooks.synchronized { hooks.remove(ref) }
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.