Spark Streaming에서 Twitter 구문 분석 (Twitter4J 사용)

여기을 참조하여 Apache Spark의 Spark Streaming을 사용하여 실시간 트위터 구문 분석 프로세스를 시도한 단계를 요약합니다. 실행 환경은 다음과 같습니다.

· CentOS 7.5
· 아파치 스파크 2.3.1
· Scala 2.12.6
・쿠로모지 0.7.7
· Spark Streaming Twitter 2.10 rev 1.1.0
・Twitter4J 3.0.3

Twitter 분석 실행 결과



'iPhone6'이 포함된 트윗 중에 나타나는 단어와 그 빈도를 계산

Apache Spark, Scala 및 sbt 설치는 여기의 절차에 따라 수행되었습니다.

kuromoji 설치



SPARK_HOME에 kuromoji (일본어 형태소 해석 엔진) 다운로드 및 배포
# cd /usr/local/lib/spark
# wget https://github.com/downloads/atilika/kuromoji/kuromoji-0.7.7.zip
# yum -y install unzip
# unzip kuromoji-0.7.7.zip
# rm kuromoji-0.7.7.zip

Twitter4J 용 Spark Streaming 관련 라이브러리 다운로드



여기을 참조하여 필요한 모듈을 다운로드하고 배포
logging.jar 다운로드를 위의 단계에 추가
# cd /usr/local/lib/spark
# mkdir twitter4j
# cd twitter4j
# wget https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-twitter_2.10/1.1.0/spark-streaming-twitter_2.10-1.1.0.jar 
# wget https://raw.githubusercontent.com/swordsmanliu/SparkStreamingHbase/master/lib/spark-core_2.11-1.5.2.logging.jar   

# curl -O http://twitter4j.org/archive/twitter4j-3.0.3.zip
# unzip -j ./twitter4j-3.0.3.zip "lib/*.jar" (jarファイルのみ展開)
# rm twitter4j-3.0.3.zip


Spark-shell 시작



--jars에서 Twitter 용 라이브러리 및 kuromoji를 지정하고로드
spark-shell --master local[2] --jars /usr/local/lib/spark/twitter4j/spark-streaming-twitter_2.10-1.1.0.jar,/usr/local/lib/spark/twitter4j/twitter4j-core-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-media-support-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-async-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-examples-3.0.3.jar,/usr/local/lib/spark/twitter4j/twitter4j-stream-3.0.3.jar,/usr/local/lib/spark/twitter4j/spark-core_2.11-1.5.2.logging.jar,/usr/local/lib/spark/kuromoji-0.7.7/lib/kuromoji-0.7.7.jar


집계 스크립트(Twitter4J를 이용) 실행



이번에는 Twitter의 Java용 라이브러리인 Twitter4J를 이용하여 액세스한다.
Twitter API Key(Access Token 등 자격 증명)의 지정은, 여기 를 참고로 Key를 취득한다

(2018/7/24부터) 상기 Key의 취득에 즈음하여, 개발자 포털에서, 개발자용 어카운트의 등록이 사전에 필요하게 되어, API의 이용 목적등을 영어 300문자 이상으로 기술 위에서의 어카운트 등록 가 필수입니다. ※참고
import org.apache.spark.streaming._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkConf
import java.util.regex._
import org.atilika.kuromoji._
import org.atilika.kuromoji.Tokenizer._

System.setProperty("twitter4j.oauth.consumerKey", "xxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.consumerSecret", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.accessToken", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
System.setProperty("twitter4j.oauth.accessTokenSecret", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

val ssc = new StreamingContext(sc, Seconds(60))
val stream = TwitterUtils.createStream(ssc, None, Array("iPhone6"))

val tweetStream = stream.flatMap(status => {
   val tokenizer : Tokenizer = Tokenizer.builder().build()
   val features : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
   var tweetText : String = status.getText()

   val japanese_pattern : Pattern = Pattern.compile("[\\u3040-\\u309F]+")
   if(japanese_pattern.matcher(tweetText).find()) {
     tweetText = tweetText.replaceAll("http(s*)://(.*)/", "").replaceAll("\\uff57", "")
     val tokens : java.util.List[Token] = tokenizer.tokenize(tweetText) 
     val pattern : Pattern = Pattern.compile("^[a-zA-Z]+$|^[0-9]+$")
     for(index <- 0 to tokens.size()-1) {
       val token = tokens.get(index)
       val matcher : Matcher = pattern.matcher(token.getSurfaceForm())
       if(token.getSurfaceForm().length() >= 3 && !matcher.find()) {
         features += (token.getSurfaceForm() + "-" + token.getAllFeatures())
       }
     }
   }
   (features)
})
val topCounts60 = tweetStream.map((_, 1)
                  ).reduceByKeyAndWindow(_+_, Seconds(60*60)
                  ).map{case (topic, count) => (count, topic)
                  }.transform(_.sortByKey(false))
topCounts60.foreachRDD(rdd => {
   val topList = rdd.take(20)
   println("\n Popular topics in last 60*60 seconds (%s words):".format(rdd.count()))
   topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
ssc.start()
ssc.awaitTermination()


※error: object atilika is not a member of package org 가 표시되는 경우는
spark-shell 시작 메시지 검토 또는 kuromoji 설치 재실행
(그 외의 모듈도 마찬가지)

실행 결과



아래가 집계 결과 이미지입니다.

좋은 웹페이지 즐겨찾기