「Spark에 의한 실천 데이터 해석」을 실천

15146 단어 스파크EMRScala

소개



Spark를 통한 실습 데이터 분석


Spark의 공부로 이쪽의 서적을 읽기 시작했으므로, 비망록적으로 실시 내용을 쓰고 싶습니다.
기본적으로 서적의 내용과 같은 일을 하기 때문에, 세세한 설명은 하고 있지 않습니다.

Spark란?



Apache Spark는 여러 머신으로 구성된 클러스터에 걸쳐 프로그램을 분산시키는 엔진을 엔진에 프로그램을 작성하기 위한 정교한 모델과 결합한 오픈 소스 프레임워크입니다.
(본문에서 발췌)

대규모 데이터 처리를 빠르게 수행하기 위한 분산 프레임워크로 Python, Scala, SQL에서 이용할 수 있습니다. 본 기사에서는 서적의 흐름에 따라 Scala에 의한 구현이나 MLlib라는 기계 학습 라이브러리를 사용해보고 싶습니다. Scala는커녕 Java조차 제대로 만지지 않았지만, 배우는 것보다 익숙해지면서 진행합니다.

환경 준비



이번에는 빠르고 AWS 공부의 의미도 담아 AWS EMR Notebook을 사용하고 싶습니다.
Amazon EMR > 노트북 > 노트북 만들기로 순식간에 노트북을 만들 수 있습니다.


2장: Scala와 Spark를 이용한 데이터 분석 소개



Scala를 사용하여 기본 집계를 수행합니다.
캘리포니아 대학 어바인의 Machine Learning Repository에 포함된 샘플 데이터 세트를 데이터로 사용합니다.

클러스터에서 클라이언트로 데이터 전송


// csvファイルからRDDを作成
val rawblocks = sc.textFile("s3://s3上ファイルのパス") 

// RDDの先頭の要素を取得
rawblocks.first

// 最初の10行を取得
val head = rawblocks.take(10)
head.foreach(println)

// ヘッダ行を除く関数
def isHeader(line: String): Boolean = {
    line.contains("id_1")
}
//  ヘッダ行を表示
head.filter(isHeader).foreach(println)

// ヘッダ行以外を取得
head.filter(x => ! isHeader(x)).length
// head.filter(!isHeader(_)).length

// RDDに対してフィルタリング
val noheader = rawblocks.filter(x => !isHeader(x))

noheader.first 

데이터 구조화



레코드의 요소 (환자 1ID, 환자 2ID, 매치 스코어, 매치 판정)를 튜플로 퍼스
val line = head(5)
val pieces = line.split(',')

// 各要素を型変換
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val matched = pieces(11).toBoolean

val rawscores = pieces.slice(2,11)
def toDouble(s:String) = {
    if ("?".equals(s)) Double.NaN else s.toDouble
}

// パースされた値をタプルに入れて返す関数
def parse(line:String) = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2,11).map(toDouble)
    val matched = pieces(11).toBoolean
    (id1, id2, scores, matched)
}
val tup = parse(line)
// タプルの操作
tup._1
tup.productElement(0)
tup.productArity
// ケースクラスを利用して、名前レコードを作成する
case class MatchData(id1: Int, id2:Int, scores: Array[Double], matched: Boolean)

def parse(line:String) = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2,11).map(toDouble)
    val matched = pieces(11).toBoolean
    MatchData(id1, id2, scores, matched)
}
val md = parse(line)
md.matched
md.id1
md.scores
// 配列headに適用
val mds = head.filter(x => !isHeader(x)).map(x => parse(x))
// RDDに適用
// 実際に適用されるのは、RDDに対して何らかの出力呼び出しがあった時
val parsed = noheader.map(line => parse(line))
// パースされた結果をキャッシュしておく
parsed.cache()

집계 및 히스토그램 작성


// ローカルデータに対して集計
val grouped = mds.groupBy(md => md.matched)
grouped.mapValues(x => x.size).foreach(println)

val matchCounts = parsed.map(md => md.matched).countByValue

val matchCountsSeq = matchCounts.toSeq //seq型に変更
matchCountsSeq.sortBy(_._1).foreach(println)
  • 결과
  •     matchCountsSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))
        (false,5728201)
        (true,20931)
    

    정리 · 향후 목표



    scala로 spark 프로그램을 실행할 수있었습니다.
    다음은 Mllib를 이용한 기계 학습 처리가 가능하다고 생각합니다.

    좋은 웹페이지 즐겨찾기