Tachyon에서 Spark 응용 프로그램 실행

8603 단어 BigData
우리는 에서 위분포식 Tachyon 집단을 어떻게 구축하는지 소개했다.공식 문서에서 알 수 있듯이 스파크 1.4.x와 Tachyon 0.6.4 버전이 호환되고 최신 버전의 Tachyon 0.7.1과 Spark 1.5가 호환됩니다.x 호환, 현재 최신 버전의 스파크는 1.4.1이기 때문에 아래의 조작 절차는 모두 Tachyon 0.6.4 플랫폼을 바탕으로 하는 것이고 Tachyon 0.6.4의 구축 절차는 Tachyon 0.7.0과 유사하다.
잔말 말고 소개를 시작합시다.먼저 HDFS에 iteblog과 같은 파일을 업로드합니다.txt - 디렉토리를/data로 저장합니다.
[blog@node1 hadoop]$  bin/hadoop fs -put blog.txt /data
Spark-shell 시작
[blog@node1 spark]$  bin/spark-shell
이때 우리는 Tachyon을 통해 iteblog을 얻을 수 있다.txt 파일은 다음과 같습니다.
scala>  val s = sc.textFile("tachyon://localhost:19998/data/iteblog.txt")
15/08/31 14:15:24 INFO storage.MemoryStore: ensureFreeSpace(156896) called with curMem=216700, maxMem=280248975
15/08/31 14:15:24 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 153.2 KB, free 266.9 MB)
15/08/31 14:15:24 INFO storage.MemoryStore: ensureFreeSpace(14945) called with curMem=373596, maxMem=280248975
15/08/31 14:15:24 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 14.6 KB, free 266.9 MB)
15/08/31 14:15:24 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:55566 (size: 14.6 KB, free: 267.2 MB)
15/08/31 14:15:24 INFO spark.SparkContext: Created broadcast 3 from textFile at :21
s: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at textFile at :21
tachyon://localhost:19998바로 너의 Tachyon의 통신 주소다.우리는 ji/data/iteblog를 볼 수 있다.txt가 대응하는 것은 HDFS의 파일입니다. 이것은 어떻게 얻었습니까?사실은 Tachyon의conf/tachyon-env입니다.sh 파일에서 구성된, export TACHYONUNDERFS_ADDRESS=hdfs://iteblog.com:8020설정, 이것이 바로 우리 HDFS 집단의 통신 주소를 설정하는 것입니다. 그러면 우리는 tachyon을 통해 그 파일을 찾을 수 있습니다.이제 Action 작업을 수행하겠습니다.
scala> s.count()
15/08/31 14:15:45 INFO : getFileStatus(/data/iteblog.txt): HDFS Path: hdfs://localhost:8020/data/iteblog.txt TPath: tachyon://localhost:19998/data/iteblog.txt
15/08/31 14:15:45 INFO : Loading to /data/iteblog.txt hdfs://localhost:8020/data/iteblog.txt 
15/08/31 14:15:45 INFO : Loading: hdfs://localhost:8020/data/iteblog.txt
15/08/31 14:15:46 INFO : Create tachyon file /data/iteblog.txt/iteblog.txt with file id 15 and checkpoint location hdfs://localhost:8020/data/iteblog.txt
15/08/31 14:15:46 INFO : listStatus(tachyon://localhost:19998/data/iteblog.txt): HDFS Path: hdfs://localhost:8020/data/iteblog.txt
15/08/31 14:15:46 INFO : getFileStatus(tachyon://localhost:19998/data/iteblog.txt/iteblog.txt): 
HDFS Path: hdfs://localhost:8020/data/iteblog.txt/iteblog.txt TPath: tachyon://localhost:19998/data/iteblog.txt/iteblog.txt
15/08/31 14:15:46 INFO mapred.FileInputFormat: Total input paths to process : 1
15/08/31 14:15:46 INFO spark.SparkContext: Starting job: count at :24
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Got job 2 (count at :24) with 2 output partitions (allowLocal=false)
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Final stage: ResultStage 2(count at :24)
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Missing parents: List()
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[4] at textFile at :21), which has no missing parents
15/08/31 14:15:46 INFO storage.MemoryStore: ensureFreeSpace(2992) called with curMem=388541, maxMem=280248975
15/08/31 14:15:46 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KB, free 266.9 MB)
15/08/31 14:15:46 INFO storage.MemoryStore: ensureFreeSpace(1828) called with curMem=391533, maxMem=280248975
15/08/31 14:15:46 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1828.0 B, free 266.9 MB)
15/08/31 14:15:46 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:55566 (size: 1828.0 B, free: 267.2 MB)
15/08/31 14:15:46 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:874
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[4] at textFile at :21)
15/08/31 14:15:46 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
15/08/31 14:15:46 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, localhost, PROCESS_LOCAL, 1427 bytes)
15/08/31 14:15:46 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 4)
15/08/31 14:15:46 INFO rdd.HadoopRDD: Input split: tachyon://localhost:19998/data/iteblog.txt/iteblog.txt:0+5840
15/08/31 14:15:46 INFO : open(tachyon://localhost:19998/data/iteblog.txt/iteblog.txt, 65536)
15/08/31 14:15:46 INFO : /mnt/ramdisk/tachyonworker/users/2/16106127360 was created!
15/08/31 14:15:46 INFO : Try to find remote worker and read block 16106127360 from 0, with len 11680
15/08/31 14:15:46 INFO : Block locations:[NetAddress(mHost:localhost, mPort:-1, mSecondaryPort:-1)]
15/08/31 14:15:46 INFO : Block locations:[NetAddress(mHost:localhost, mPort:-1, mSecondaryPort:-1)]
15/08/31 14:15:46 INFO : Opening stream from underlayer fs: hdfs://localhost:8020/data/iteblog.txt
15/08/31 14:15:46 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 4). 1830 bytes result sent to driver
15/08/31 14:15:46 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 5, localhost, PROCESS_LOCAL, 1427 bytes)
15/08/31 14:15:46 INFO executor.Executor: Running task 1.0 in stage 2.0 (TID 5)
15/08/31 14:15:46 INFO rdd.HadoopRDD: Input split: tachyon://localhost:19998/data/iteblog.txt/iteblog.txt:5840+5840
15/08/31 14:15:46 INFO : open(tachyon://localhost:19998/data/iteblog.txt/iteblog.txt, 65536)
15/08/31 14:15:46 INFO executor.Executor: Finished task 1.0 in stage 2.0 (TID 5). 1830 bytes result sent to driver
15/08/31 14:15:46 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 543 ms on localhost (1/2)
15/08/31 14:15:46 INFO scheduler.DAGScheduler: ResultStage 2 (count at :24) finished in 0.555 s
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Job 2 finished: count at :24, took 0.651055 s
res2: Long = 212
이렇게 해서 우리는iteblog을 얻었다.txt 파일의 줄 수입니다.위에서 실행된 로그를 통해 알 수 있듯이 Tachyon 자체는iteblog입니다.txt 파일이 메모리에 로드되어 자체 파일 시스템에 저장됩니다.tachyon://localhost:19998/data/iteblog.txt/iteblog.txt, Tachyon의 WEB UI 인터페이스에서http://localhost:19999/browse?path=%2Fdata%2Fiteblog.txt%offset=0&limit=1) 이 파일이 표시됩니다.
또한 계산 결과를 Tachyon에 저장할 수도 있습니다.
scala> s.saveAsTextFile("tachyon://localhost:19998/blog")
우리는 Tachyon의 WEB UI 인터페이스에서 이 파일을 볼 수 있고 그 안에iteblog 폴더를 만들었는데 그 안의 데이터는 RDD의 데이터이다.또한 HDFS에는 다음과 같은 RDD 데이터가 저장된 폴더가 생성되어 있습니다.
[blog@node1 hadoop]$  bin/hadoop fs -ls /tachyon/data
Found 6 items
-rwxrwxrwx   3 blog supergroup      13367 2015-08-31 14:02 /tachyon/data/11
-rwxrwxrwx   3 blog supergroup          0 2015-08-31 14:02 /tachyon/data/12
-rwxrwxrwx   3 blog supergroup       5890 2015-08-31 14:21 /tachyon/data/21
-rwxrwxrwx   3 blog supergroup       5790 2015-08-31 14:21 /tachyon/data/23
-rwxrwxrwx   3 blog supergroup          0 2015-08-31 14:21 /tachyon/data/24
-rwxrwxrwx   3 blog supergroup      13491 2015-08-31 14:02 /tachyon/data/9
사실 이 경로는conf/tachyon-env를 통과한다.sh 안에 있는 - Dtachyon.data.folder=$TACHYON_UNDERFS_ADDDRESS/tachyon/data에서 구성합니다.
우리는 또한 RDD cache를 Tachyon에 설치하여 캐시 레벨을 Storage Level로 설정할 수 있습니다.OFF_HEAP만 있으면 됩니다.이것은 GC의 주파수를 줄일 수 있고excutors가 차지하는 자원을 줄일 수 있다. 가장 좋은 것은 서로 다른 Application 간에 RDD의 데이터를 공유할 수 있도록 하는 것이다.예를 들면 다음과 같습니다.
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> val data=sc.parallelize(List("www", "iteblog", "com"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at :22

scala> val tmp = data.map(item => item +"good")
tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at map at :24

scala> tmp.persist(StorageLevel.OFF_HEAP)
res9: tmp.type = MapPartitionsRDD[8] at map at :24

scala> tmp.count
그리고 Tachyon의 WEB UI 인터페이스에서 캐시된 RDD 메모리 디렉터리를 볼 수 있습니다.RDD는 메모리 파일 시스템에 저장됩니다.사용 과정에서 우리는spark를 통과할 수 있다.externalBlockStore.url 매개 변수 Tachyon file system의 URL을 설정합니다. 기본값은tachyon://localhost:19998;스파크를 통해서.externalBlockStore.baseDir는 Tachyon File System에 RDD를 저장하는 기본 경로를 설정합니다. 기본값은 System입니다.getProperty("java.io.tmpdir")의 값입니다.

좋은 웹페이지 즐겨찾기