Azure HDInsight + Microsoft R Server에서 연산 처리 분산

개요



Microsoft Azure HDInsight는 Microsoft가 제공하는 Hadoop의 PaaS 서비스로 인프라 주변의 구축 노하우를 몰라도 훌륭한 Hadoop 클러스터를 구축할 수 있는 훌륭한 서비스입니다.
이번에는 이 Microsoft Azure HDInsight를 구축하고 구축한 클러스터에서 R 언어를 병렬로 실행해 보겠습니다.

사전에 준비하는 것


  • Azure 구독

  • ※무상시용판을 사용하지 않는 분은 반드시 읽어 주세요※



    이 클러스터를 구축하면 매시간 270엔 정도 돈이 듭니다. 작성 후 반드시 삭제하십시오.
    삭제하지 않으면 1일 6000엔 정도 걸리므로, 대단한 청구액이 됩니다.

    구축 절차


  • 새로 만들기 > Intelligence + analytics 범주에서 HDInsight를 선택합니다.
  • 클러스터 이름을 입력하고 클러스터 구성에서 클러스터 유형을 R Server로 설정한 다음 선택을 클릭합니다.
  • 자격 증명을 설정합니다. "클러스터 로그인 비밀번호"는 대소문자와 기호를 포함한 12자 이상의 비밀번호가 필요합니다.
  • 그런 다음 데이터 소스를 연결합니다. 기존 Azure 스토리지 계정과 Data Lake Store를 사용할 수 있습니다. 없으면 신규 작성도 가능합니다. 데이터 소스의 데이터는이 기사에서는 사용하지 않으므로 모든 설정이 OK입니다.
  • 클러스터의 크기를 지정합니다. 저자는 돈이 없기 때문에 가장 싼 구성으로 하고 있습니다(웃음) ※그래도 1시간 242.76엔이므로, 이용이 끝나면 반드시 삭제하도록 주의합시다. 그런 다음 "만들기"를 눌러 배포되기를 기다립니다. 대략 10분 이내에 배포가 완료되어야 합니다.
  • R Server 대시보드를 클릭합니다.
  • R Studio 서버를 선택합니다.
  • 로그인이 두 번 요청됩니다. 첫 번째는 클러스터 로그인 정보이고 두 번째는 SSH 로그인 정보입니다.
  • 이것으로 R 언어로 실행 준비가 완료됩니다.

  • 단일 노드에서 실행



    다음 코드를 실행합니다.
     # サンプルデータのHDFS (WASB) 場所を指定します
     bigDataDirRoot <- "/example/data"
    
     # 一時的に保管するローカルフォルダーを作成します
     source <- "/tmp/AirOnTimeCSV2012"
     dir.create(source)
    
     # データを一時フォルダへダウンロードします
     remoteDir <- "http://packages.revolutionanalytics.com/datasets/AirOnTimeCSV2012"
     download.file(file.path(remoteDir, "airOT201201.csv"), file.path(source, "airOT201201.csv"))
     download.file(file.path(remoteDir, "airOT201202.csv"), file.path(source, "airOT201202.csv"))
     download.file(file.path(remoteDir, "airOT201203.csv"), file.path(source, "airOT201203.csv"))
     download.file(file.path(remoteDir, "airOT201204.csv"), file.path(source, "airOT201204.csv"))
     download.file(file.path(remoteDir, "airOT201205.csv"), file.path(source, "airOT201205.csv"))
     download.file(file.path(remoteDir, "airOT201206.csv"), file.path(source, "airOT201206.csv"))
     download.file(file.path(remoteDir, "airOT201207.csv"), file.path(source, "airOT201207.csv"))
     download.file(file.path(remoteDir, "airOT201208.csv"), file.path(source, "airOT201208.csv"))
     download.file(file.path(remoteDir, "airOT201209.csv"), file.path(source, "airOT201209.csv"))
     download.file(file.path(remoteDir, "airOT201210.csv"), file.path(source, "airOT201210.csv"))
     download.file(file.path(remoteDir, "airOT201211.csv"), file.path(source, "airOT201211.csv"))
     download.file(file.path(remoteDir, "airOT201212.csv"), file.path(source, "airOT201212.csv"))
    
     # bigDataDirRootのディレクトリを設定し、ロードするデータを指定します
     inputDir <- file.path(bigDataDirRoot,"AirOnTimeCSV2012") 
    
     # ディレクトリを作成します
     rxHadoopMakeDir(inputDir)
    
     # データをソースからインプットへコピーします
     rxHadoopCopyFromLocal(source, bigDataDirRoot)
    
     # HDFS (WASB) ファイルシステムを定義します
     hdfsFS <- RxHdfsFileSystem()
    
     # 航空会社のリストを作成します
     airlineColInfo <- list(
         DAY_OF_WEEK = list(type = "factor"),
         ORIGIN = list(type = "factor"),
         DEST = list(type = "factor"),
         DEP_TIME = list(type = "integer"),
         ARR_DEL15 = list(type = "logical"))
    
     # カラム名を抽出します
     varNames <- names(airlineColInfo)
    
     # hdfs上のデータソースを定義します
     airOnTimeData <- RxTextData(inputDir, colInfo = airlineColInfo, varsToKeep = varNames, fileSystem = hdfsFS)
    
     # ローカルシステム上のテキストデータを定義します
     airOnTimeDataLocal <- RxTextData(source, colInfo = airlineColInfo, varsToKeep = varNames)
    
     # 利用する関数です
     formula = "ARR_DEL15 ~ ORIGIN + DAY_OF_WEEK + DEP_TIME + DEST"
    
    # 演算コンテキストをローカルにします
     rxSetComputeContext("local")
    
     # ロジスティクス回帰を実行します(およそ40分ほどかかります)
     system.time(
         modelLocal <- rxLogit(formula, data = airOnTimeDataLocal)
     )
    
     # 結果を表示します
     summary(modelLocal)
    

    이 방법을 사용하면 병렬 처리가 아니므로 40분 정도 시간이 걸립니다. 길네요...

    여러 노드에서 실행



    다중 노드에서 실행하려면 Spark를 연산 컨텍스트로 설정하여 실행할 수 있습니다.
    
     # Spark演算コンテキストを定義します 
     mySparkCluster <- RxSpark()
    
     # 演算コンテキストを定義します
     rxSetComputeContext(mySparkCluster)
    
     # ロジスティクス回帰を実行します
     system.time(  
         modelSpark <- rxLogit(formula, data = airOnTimeData)
     )
    
     # 結果を表示します
     summary(modelSpark)
    

    클러스터 삭제



    사용 후에는 반드시 클러스터를 삭제하는 것이 좋습니다.


    참고 자료

    좋은 웹페이지 즐겨찾기