Cloud에서 Hadoop으로 데이터 전송 및 Spark에서 참조

지난번 만든 Cluster로 데이터를 전송하고 Spark에서 참조합니다.
로컬 (Mac)에서 webhdfs를 사용하여 hdfs로 파일을 전송합니다.

로컬(Mac)에서 파일 전송



webhdfs에는 REST 인터페이스가 있습니다.
이전에 소개한 것처럼 Bluemix에서 지불한 Hadoop의 Cluster
기본적으로 webhdfs URL이 지불되므로 사용합니다.

hdfs 구성



Bluemix의 Hadoop은 hdfs의 루트 디렉토리 아래에 아래 그림의 구성을 기본적으로 제공합니다.



또, user 디렉토리 한 에 프로비저닝시의 사용자 디렉토리가 준비되기 때문에, 이후 그 쪽의 디렉토리(/user/tanayu)에 파일을 두어 갑니다.



webhdfs를 사용한 디렉토리 찾아보기



여기서는 curl을 사용하여 webhdfs의 REST 인터페이스를 조작합니다.
기본 URL : h tps : // Bi-Hadoo p-p d 4148. 비. 세 r ぃ 세 s. 음 - 그래 th. b 에미 x. 네 t : 8443
는 공통이며 뒤에 root 디렉토리의 경로를 씁니다.
마지막으로 op에서 작업을 지정했습니다. 여기에서는 LISTSTATUS를 사용하여 디렉토리 정보를 검색합니다.

다음 예제에서는/user 디렉토리의 정보를 참조합니다.
사용자 이름과 비밀번호는 자신의 환경으로 바꿉니다.
curl -i -k -s --user <user name>:<password> --max-time 45 https://bi-hadoop-prod-4148.bi.services.us-south.bluemix.net:8443/gateway/default/webhdfs/v1/user?op=LISTSTATUS

명령을 실행하면 HTTP Status 및/user 디렉토리 정보가 Json 형식입니다.
출력됩니다.
꽤 보기 힘들지만 반환되는 정보는 기본적으로 hdfs dfs -ls 명령과 차이가 없습니다.



webhdfs를 사용하여 파일 전송



그런 다음 로컬 파일을 webhdfs를 사용하여 Bluemix의 Cluster에 배치합니다.

다음 예제에서는/user/tanayu 디렉토리 아래에 ad_sample_log.csv 파일을 작성합니다. op는 CREATE를 사용합니다.
curl -i -L -k -s --user <user name>:<password> --max-time 45 -X PUT -T ad_sample_log.csv https://bi-hadoop-prod-4148.bi.services.us-south.bluemix.net:8443/gateway/default/webhdfs/v1/user/tanayu/ad_sample_log.csv?op=CREATE

아래 그림은 런타임 콘솔입니다.



파일이 전송되었는지 확인하는 것은 LISTSTATUS에서 수행됩니다.
curl -i -k -s --user <user name>:<password> --max-time 45 https://bi-hadoop-prod-4148.bi.services.us-south.bluemix.net:8443/gateway/default/webhdfs/v1/user/tanayu?op=LISTSTATUS

다음은 명령의 반환값입니다.
성공적으로 작성되었음을 알 수 있습니다.



webhdfs를 사용하여 파일 삭제



마지막으로 webhdfs를 사용한 파일 삭제 처리입니다.
이전에 전송한 ad_sample_log.csv 파일을 삭제합니다.

삭제는 op=DELETE입니다.
curl -i -L -k -s --user <user name>:<password> --max-time 45 -X DELETE https://bi-hadoop-prod-4148.bi.services.us-south.bluemix.net:8443/gateway/default/webhdfs/v1/user/tanayu/ad_sample_log.csv?op=DELETE

아래 그림은 명령 실행시 콘솔입니다.



거의 webhdfs의 기능을 그대로 사용할 수 있을 것 같다.

Spark 프로그래밍



이번에는 hdfs에 배치한 Spark에서 참조하여 행 수를 표시하는 간단한 프로그램입니다.
package org.tanayu.qiita

import org.apache.log4j.{Level,Logger}
import org.apache.spark.{SparkConf, SparkContext}

object sample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("qiita_sample")
    val sc = new SparkContext(conf)
    run(sc)
    sc.stop
  }
  private[qiita]
  def run(sc: SparkContext){
    val txt = sc.textFile("hdfs://user/tanayu/ad_sample_log.csv")
    println("count :" + txt.count());
  }
}

위를 build 한 .jar를 scp로 전송합니다.
다음은 명령의 예입니다.
scp  target/scala-2.11/bioc_sample-assembly-0.0.1.jar <user name>@bi-hadoop-prod-4148.bi.services.us-south.bluemix.net:

전송이 끝나면 home_dir로 파일이 전송됩니다.

다음은 전송된 jar 실행 명령입니다.
기본이라면 INFO 로그가 가득 찼습니다.
log4j의 레벨을 WARN으로 설정하고 있습니다.
spark-submit --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:./log4j.properties --class org.tanayu.qiita.sample --master yarn-client  bioc_sample-assembly-0.0.1.jar

Cloud의 Hadoop 구성은 직접 접촉하지 않으므로,
/etc/hadoop/conf/log4j.properties를 home_dir에 복사하고 내용을 변경하십시오.
spark-submit시에 전달하고 있습니다. 덧붙여서 다시 쓰는 부분은 아래와 같습니다.
hadoop.root.logger=WARN,console

아래는 실행 결과입니다.

좋은 웹페이지 즐겨찾기