Spark 2.2 소스 분석 - SecurityManager

8102 단어 빅 데이터Spark
『 8194 』 Security Manager 는 주로 계 정, 권한 과 신분 인증 을 설정 하고 관리 합 니 다.스파크 의 배치 모드 가 YARN 이면 시 크 릿 키 (키) 를 생 성하 고 Hadoop UGI 를 저장 해 야 한다.다른 모드 에 서 는 환경 변 수 를 설정 해 야 합 니 다SPARK_AUTH_SECRET (우선 순위 가 더 높 음) 또는 spark. authenticate. secret 속성 은 secret key (키) 를 지정 합 니 다.마지막 으로 Security Manager 에 기본 암호 인증 인 스 턴 스 Authenticator 가 설정 되 어 있 습 니 다. 이 인 스 턴 스 는 익명 내부 클래스 로 이 루어 집 니 다. HTTP client 를 사용 할 때마다 HTTP 서버 에서 사용자 와 비밀 번 호 를 가 져 옵 니 다.이 는 Spark 의 노드 간 통신 은 사용자 이름, 비밀 번 호 를 동적 으로 협상 해 야 하기 때문에 이런 방식 은 이러한 수 요 를 유연 하 게 지원 하기 때문이다.
    Spark 는 비밀 키 공 유 를 통 해 인증 하 는 것 을 지원 합 니 다.인증 기능 을 사용 하면 파라미터 spark. authenticate 를 통 해 설정 할 수 있 습 니 다.이 매개 변 수 는 spark 통신 프로 토 콜 이 공유 비밀 키 를 사용 하여 인증 할 지 여 부 를 제어 합 니 다.이런 인증 방식 은 휴대 전화 제 를 바탕 으로 통신 쌍방 이 똑 같은 비밀 키 를 공유 할 때 만 통신 할 수 있다.공유 키 가 일치 하지 않 으 면 쌍방 이 통신 할 수 없습니다.다음 과정 을 통 해 공유 키 를 만 들 수 있 습 니 다.모든 프로그램 은 유일한 공유 키 를 사용 합 니 다.    2. 다른 배치 방식 에서 모든 노드 에 매개 변수 spark. authenticate. secret 를 설정 해 야 합 니 다.이 비밀 키 는 모든 Master, worker 및 응용 프로그램 에서 사용 합 니 다.
/*
	    
	  :org.apache.spark
	  :SparkEnv
*/
val securityManager = new SecurityManager(conf, ioEncryptionKey) 
    ioEncryptionKey.foreach { _ =>
      //            。
      if (!securityManager.isEncryptionEnabled()) {
        logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
          "wire.")
      }
    }
/*
	    
	   :new SecurityManager()
	  :org.apache.spark
	  :SecurityManager
*/
  //           spark.authenticate,   false
  private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)
  
 //   HTTP        
  //             HTTP  /     。   HTTP     HTTP      。      ,       。
  //       ,    spark.authenticate true,       ,    yarn       
  if (authOn) {
    Authenticator.setDefault(
    //         ,  PasswordAuthentication  ,        
      new Authenticator() {
        override def getPasswordAuthentication(): PasswordAuthentication = {
          var passAuth: PasswordAuthentication = null
          val userInfo = getRequestingURL().getUserInfo()
          if (userInfo != null) {
            val  parts = userInfo.split(":", 2)
            passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
          }
          return passAuth
        }
      }
    )
  }

/*
	   :          
	  :org.apache.spark
	  :SecurityManager
*/
  def isEncryptionEnabled(): Boolean = {
    sparkConf.get(NETWORK_ENCRYPTION_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
  }

Security Manage 는 주로 다음 과 같은 몇 가지 일 을 했 습 니 다. 코드 는 다음 과 같 습 니 다.
/*
	1、      
*/
private val authOn = sparkConf.get(NETWORK_AUTH_ENABLED)
private var aclsOn =
    sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false))
private var adminAcls: Set[String] =
  stringToSet(sparkConf.get("spark.admin.acls", "")) 
private var adminAclsGroups : Set[String] =
  stringToSet(sparkConf.get("spark.admin.acls.groups", ""))
 private var viewAcls: Set[String] = _

  private var viewAclsGroups: Set[String] = _

  // list of users who have permission to modify the application. This should
  // apply to both UI and CLI for things like killing the application.
  //                。      UI CLI            。
  private var modifyAcls: Set[String] = _

  private var modifyAclsGroups: Set[String] = _

  // always add the current user and SPARK_USER to the viewAcls
  //          spark_user viewacls
  private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
    Utils.getCurrentUserName()) // TODO:      defaultAclUsers:"Set$Set2" size =2  ,      0=“hzjs” 1="root"

  setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
  setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))

  setViewAclsGroups(sparkConf.get("spark.ui.view.acls.groups", ""));
  setModifyAclsGroups(sparkConf.get("spark.modify.acls.groups", "")); 

/*
	2、          
*/
logInfo("SecurityManager: authentication " + (if (authOn) "enabled" else "disabled") +
    "; ui acls " + (if (aclsOn) "enabled" else "disabled") +
    "; users  with view permissions: " + viewAcls.toString() +
    "; groups with view permissions: " + viewAclsGroups.toString() +
    "; users  with modify permissions: " + modifyAcls.toString() +
    "; groups with modify permissions: " + modifyAclsGroups.toString())

/*
	3、 Yarn     key
*/
private val secretKey = generateSecretKey()

/**
    * Generates or looks up the secret key.
    *
    *         。
    *
    * The way the key is stored depends on the Spark deployment mode. Yarn
    * uses the Hadoop UGI.
    *
    *          Spark    。Yarn  Hadoop UGI。
    *
    * For non-Yarn deployments, If the config variable is not set
    * we throw an exception.
    *
    *   non-Yarn    ,          ,         。
    */
  private def generateSecretKey(): String = {
    if (!isAuthenticationEnabled) {
      null
    } else if (SparkHadoopUtil.get.isYarnMode) {
      // In YARN mode, the secure cookie will be created by the driver and stashed in the
      // user's credentials, where executors can get it. The check for an array of size 0
      // is because of the test code in YarnSparkHadoopUtilSuite.
      val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(SECRET_LOOKUP_KEY)  //TODO: secretKey:null
      if (secretKey == null || secretKey.length == 0) {
        logDebug("generateSecretKey: yarn mode, secret key from credentials is null")
        val rnd = new SecureRandom()
        val length = sparkConf.getInt("spark.authenticate.secretBitLength", 256) / JByte.SIZE //TODO: length:32
        val secret = new Array[Byte](length)
        rnd.nextBytes(secret)

        val cookie = HashCodes.fromBytes(secret).toString() //TODO: cookie:"wery7237rwuhr732582irwberyu238grfuyewgr78....."
        SparkHadoopUtil.get.addSecretKeyToUserCredentials(SECRET_LOOKUP_KEY, cookie)
        cookie
      } else {
        new Text(secretKey).toString
      }
    } else {
      // user must have set spark.authenticate.secret config
      // For Master/Worker, auth secret is in conf; for Executors, it is in env variable
      Option(sparkConf.getenv(SecurityManager.ENV_AUTH_SECRET))
        .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match {
        case Some(value) => value
        case None =>
          throw new IllegalArgumentException(
            "Error: a secret key must be specified via the " +
              SecurityManager.SPARK_AUTH_SECRET_CONF + " config")
      }
    }
  }

/**
    * Check to see if authentication for the Spark communication protocols is enabled
    *        Spark         。
    * @return true if authentication is enabled, otherwise false
    */
  def isAuthenticationEnabled(): Boolean = authOn

/*
	4、  HTTP        
*/
//   HTTP        
  //             HTTP  /     。   HTTP     HTTP      。      ,       。
  //       ,    spark.authenticate true,       ,    yarn       
  if (authOn) {
    Authenticator.setDefault(
      new Authenticator() {
        override def getPasswordAuthentication(): PasswordAuthentication = {
          var passAuth: PasswordAuthentication = null
          val userInfo = getRequestingURL().getUserInfo()
          if (userInfo != null) {
            val  parts = userInfo.split(":", 2)
            passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
          }
          return passAuth
        }
      }
    )
  }

/*
	5、  SSL
*/
//     SSL  
private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)

//     SSL
val fileServerSSLOptions = getSSLOptions("fs")
//  sslSocketFactory,hostnameVerifier
val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
    val trustStoreManagers =
      for (trustStore 

좋은 웹페이지 즐겨찾기