Spark 1.4 Local-Mode Programming Exercises (1)
Wordcount / UserMining / TweetMining / HashtagMining / InvertedIndex / Test
--------------------------------------------------------------------------------
Test code
package tutorial;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Test {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("test").setMaster("spark://master:7077");
        @SuppressWarnings("resource")
        JavaSparkContext sc = new JavaSparkContext(conf);
        // sc.addJar("/home/sun/jars/myjar.jar");
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> distData = sc.parallelize(data);
        System.out.println(distData.count());
    }
}
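This smoke test targets a standalone cluster (spark://master:7077). To run it purely locally, as in most of the exercises below, the master URL can be swapped for local[*]; a minimal variant of the configuration, assuming no cluster is available:

// Local-mode variant of the configuration above (assumption: no cluster running).
SparkConf conf = new SparkConf().setAppName("test").setMaster("local[*]");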
The two Utils helpers. 1. The JSON parsing code
package utils;

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

public class Parse {
    // Deserialize one JSON line into a Tweet; returns null if the line cannot be parsed.
    public static Tweet parseJsonToTweet(String jsonLine) {
        ObjectMapper objectMapper = new ObjectMapper();
        Tweet tweet = null;
        try {
            tweet = objectMapper.readValue(jsonLine, Tweet.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return tweet;
    }
}
2. The entity class
package utils;

import java.io.Serializable;

public class Tweet implements Serializable {
    private long id;
    private String user;
    private String userName;
    private String text;
    private String place;
    private String country;
    private String lang;

    public long getId() {
        return id;
    }
    public String getUser() {
        return user;
    }
    public String getUserName() {
        return userName;
    }
    public String getText() {
        return text;
    }
    public String getPlace() {
        return place;
    }
    public String getCountry() {
        return country;
    }
    public String getLang() {
        return lang;
    }
    public void setId(long id) {
        this.id = id;
    }
    public void setUser(String user) {
        this.user = user;
    }
    public void setUserName(String userName) {
        this.userName = userName;
    }
    public void setText(String text) {
        this.text = text;
    }
    public void setPlace(String place) {
        this.place = place;
    }
    public void setCountry(String country) {
        this.country = country;
    }
    public void setLang(String lang) {
        this.lang = lang;
    }

    @Override
    public String toString() {
        return getId() + ", " + getUser() + ", " + getText() + ", " + getPlace() + ", " + getCountry();
    }
}
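As a quick sanity check, the two utility classes can be exercised on the sample tweet shown in the exercises below (a hypothetical snippet, not part of the original tutorial):

// Hypothetical snippet: parse one sample JSON line into a Tweet.
String jsonLine = "{\"id\":\"572692378957430785\", \"user\":\"Srkian_nishu :)\","
        + " \"text\":\"@always_nidhi @YouTube no i dnt understand bt i loved of this mve is rocking\","
        + " \"place\":\"Orissa\", \"country\":\"India\"}";
Tweet tweet = Parse.parseJsonToTweet(jsonLine);
// Prints: 572692378957430785, Srkian_nishu :), @always_nidhi @YouTube ..., Orissa, India
System.out.println(tweet);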
Click here to download the reduced-tweets.json data and the test code.
WordCount code block
package tutorial;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.Arrays;
/*
 * Step 1, the mapper:
 * - For each word, emit a couple (word, 1) as a JavaPairRDD<String, Integer>, with the word as the key.
 *
 * Step 2, the reducer:
 * - Reduce by key, summing the counts for each word.
 */
public class Wordcount {
    private static String pathToFile = "data/wordcount.txt";

    public JavaRDD<String> loadData() {
        SparkConf conf = new SparkConf()
                .setAppName("Wordcount")
                .set("spark.driver.allowMultipleContexts", "true")
                .setMaster("local[*]"); // local mode; * means use as many worker threads as there are cores
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> words = sc.textFile(pathToFile).flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        return words;
    }
    /**
     * Now count how many times each word appears!
     */
    public JavaPairRDD<String, Integer> wordcount() {
        JavaRDD<String> words = loadData();
        // TODO write code here
        JavaPairRDD<String, Integer> couples = null;
        // TODO write code here
        JavaPairRDD<String, Integer> result = null;
        return result;
    }

    /**
     * Now keep the words which appear strictly more than 4 times!
     */
    public JavaPairRDD<String, Integer> filterOnWordcount() {
        JavaPairRDD<String, Integer> wordcounts = wordcount();
        // TODO write code here
        JavaPairRDD<String, Integer> filtered = null;
        return filtered;
    }
}
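One possible way to fill in the two TODOs above (a sketch against the Spark 1.4 Java API; drop the methods into the Wordcount class and add imports for org.apache.spark.api.java.function.PairFunction, org.apache.spark.api.java.function.Function, org.apache.spark.api.java.function.Function2 and scala.Tuple2):

public JavaPairRDD<String, Integer> wordcount() {
    JavaRDD<String> words = loadData();
    // Map each word to a (word, 1) couple.
    JavaPairRDD<String, Integer> couples = words.mapToPair(new PairFunction<String, String, Integer>() {
        public Tuple2<String, Integer> call(String word) throws Exception {
            return new Tuple2<String, Integer>(word, 1);
        }
    });
    // Sum the counts for each word.
    JavaPairRDD<String, Integer> result = couples.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) throws Exception {
            return a + b;
        }
    });
    return result;
}

public JavaPairRDD<String, Integer> filterOnWordcount() {
    JavaPairRDD<String, Integer> wordcounts = wordcount();
    // Keep only the words that appear strictly more than 4 times.
    JavaPairRDD<String, Integer> filtered = wordcounts.filter(new Function<Tuple2<String, Integer>, Boolean>() {
        public Boolean call(Tuple2<String, Integer> couple) throws Exception {
            return couple._2() > 4;
        }
    });
    return filtered;
}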
UserMining code block
package tutorial;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import utils.Parse;
import utils.Tweet;
/**
 * The Java Spark API documentation:
 * http://spark.apache.org/docs/latest/api/java/index.html
 *
 * The file contains 8198 tweets. A tweet looks like:
 *
 * {"id":"572692378957430785", "user":"Srkian_nishu :)", "text":
 * "@always_nidhi @YouTube no i dnt understand bt i loved of this mve is rocking"
 * , "place":"Orissa", "country":"India"}
 *
 * Goal: for each user, find all of his tweets, i.e. build couples of (user, tweets)
 * (for example, the user Srkian_nishu has the tweets [572692378957430785, ...]).
 */
public class UserMining {
    private static String pathToFile = "data/reduced-tweets.json";

    public JavaRDD<Tweet> loadData() {
        // Create the Spark configuration and Spark context
        SparkConf conf = new SparkConf()
                .setAppName("User mining")
                .set("spark.driver.allowMultipleContexts", "true")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Load the data and parse each line into a Tweet
        // (see the Tweet class in the utils package).
        JavaRDD<Tweet> tweets = sc.textFile(pathToFile).map(new Function<String, Tweet>() {
            public Tweet call(String line) throws Exception {
                return Parse.parseJsonToTweet(line);
            }
        });
        return tweets;
    }
    /**
     * For each user, return all his tweets
     */
    public JavaPairRDD<String, Iterable<Tweet>> tweetsByUser() {
        JavaRDD<Tweet> tweets = loadData();
        // TODO write code here
        // Hint: the Spark API provides a groupBy method
        JavaPairRDD<String, Iterable<Tweet>> tweetsByUser = null;
        return tweetsByUser;
    }

    /**
     * Compute the number of tweets by user
     */
    public JavaPairRDD<String, Integer> tweetByUserNumber() {
        JavaRDD<Tweet> tweets = loadData();
        // TODO write code here
        // Hint: think about what you did in the wordcount example
        JavaPairRDD<String, Integer> count = null;
        return count;
    }
}
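A possible solution for the two TODOs above (a sketch; it needs the same extra imports as the wordcount solution, i.e. PairFunction, Function2 and scala.Tuple2, and uses the groupBy method from the hint):

public JavaPairRDD<String, Iterable<Tweet>> tweetsByUser() {
    JavaRDD<Tweet> tweets = loadData();
    // Group the tweets by their user field.
    JavaPairRDD<String, Iterable<Tweet>> tweetsByUser = tweets.groupBy(new Function<Tweet, String>() {
        public String call(Tweet tweet) throws Exception {
            return tweet.getUser();
        }
    });
    return tweetsByUser;
}

public JavaPairRDD<String, Integer> tweetByUserNumber() {
    JavaRDD<Tweet> tweets = loadData();
    // As in the wordcount: emit (user, 1) per tweet, then sum per user.
    JavaPairRDD<String, Integer> count = tweets.mapToPair(new PairFunction<Tweet, String, Integer>() {
        public Tuple2<String, Integer> call(Tweet tweet) throws Exception {
            return new Tuple2<String, Integer>(tweet.getUser(), 1);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer a, Integer b) throws Exception {
            return a + b;
        }
    });
    return count;
}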
TweetMining code block
package tutorial;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import utils.Parse;
import utils.Tweet;
import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * The Java Spark API documentation:
 * http://spark.apache.org/docs/latest/api/java/index.html
 *
 * The file contains 8198 tweets. A tweet looks like:
 *
 * {"id":"572692378957430785", "user":"Srkian_nishu :)", "text":
 * "@always_nidhi @YouTube no i dnt understand bt i loved of this mve is rocking"
 * , "place":"Orissa", "country":"India"}
 *
 * Goals: 1. Find all the persons mentioned ("@") on tweets.
 *        2. Count how many times each person is mentioned, and find the 10 most mentioned persons.
 *
 * Use the TweetMiningTest to implement the code.
 */
public class TweetMining implements Serializable {
    private static String pathToFile = "data/reduced-tweets.json";

    /**
     * Load the data from the json file and return an RDD of Tweet
     */
    public JavaRDD<Tweet> loadData() {
        // Create the Spark configuration and Spark context.
        // Note: this exercise runs against a standalone cluster and reads from HDFS.
        SparkConf conf = new SparkConf().setAppName("Tweet mining").setMaster("spark://master:7077");
        conf.set("spark.driver.allowMultipleContexts", "true");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.addJar("/home/sun/jars/tutorial-all.jar");
        // Load the data and create an RDD of Tweet
        JavaRDD<Tweet> tweets = sc.textFile("hdfs://master:9000/sparkdata/reduced-tweets.json")
                .map(new Function<String, Tweet>() {
                    public Tweet call(String line) throws Exception {
                        return Parse.parseJsonToTweet(line);
                    }
                });
        return tweets;
    }
    /**
     * Find all the persons mentioned on tweets (case sensitive)
     */
    public JavaRDD<String> mentionOnTweet() {
        JavaRDD<Tweet> tweets = loadData();
        // We want to return an RDD with the mentions.
        // Hint: separate the words in the text field, then keep the mentions.
        JavaRDD<String> mentions = tweets.flatMap(new FlatMapFunction<Tweet, String>() {
            public Iterable<String> call(Tweet t) throws Exception {
                String text = t.getText();
                Set<String> set = new HashSet<String>();
                String[] words = text.split(" ");
                for (String word : words) {
                    if (word.startsWith("@")) {
                        set.add(word);
                    }
                }
                return set;
            }
        });
        return mentions;
    }

    /**
     * Count how many times each person is mentioned
     */
    public JavaPairRDD<String, Integer> countMentions() {
        JavaRDD<String> mentions = mentionOnTweet();
        // As in the wordcount: map to (mention, 1) couples, then sum by key.
        JavaPairRDD<String, Integer> mentionCount = mentions.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String t) throws Exception {
                return new Tuple2<String, Integer>(t, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        mentionCount.saveAsTextFile("hdfs://master:9000/sparkdata/tweets-m4");
        return mentionCount;
    }
    /**
     * Find the 10 most mentioned persons, in descending order
     */
    public List<Tuple2<Integer, String>> top10mentions() {
        JavaPairRDD<String, Integer> counts = countMentions();
        // Hint: take a look at the sorting and take methods
        // TODO write code here
        List<Tuple2<Integer, String>> mostMentioned = null;
        return mostMentioned;
    }

    public static void main(String[] args) {
        TweetMining tweetMining = new TweetMining();
        JavaPairRDD<String, Integer> res = tweetMining.countMentions();
        System.out.println(res.take(1));
    }
}
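The remaining TODO, top10mentions, can be filled in like this (a sketch: swap each couple to (count, name), sort by key in descending order, and take the first 10):

public List<Tuple2<Integer, String>> top10mentions() {
    JavaPairRDD<String, Integer> counts = countMentions();
    // Swap (name, count) to (count, name) so we can sort by count.
    List<Tuple2<Integer, String>> mostMentioned = counts
            .mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                public Tuple2<Integer, String> call(Tuple2<String, Integer> couple) throws Exception {
                    return new Tuple2<Integer, String>(couple._2(), couple._1());
                }
            })
            .sortByKey(false) // false = descending order
            .take(10);
    return mostMentioned;
}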
HashtagMining code block
package tutorial;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
import utils.Parse;
import utils.Tweet;
import java.util.List;
/**
 * The Java Spark API documentation:
 * http://spark.apache.org/docs/latest/api/java/index.html
 *
 * The file contains 8198 tweets. A tweet looks like:
 *
 * {"id":"572692378957430785", "user":"Srkian_nishu :)", "text":
 * "@always_nidhi @YouTube no i dnt understand bt i loved of this mve is rocking"
 * , "place":"Orissa", "country":"India"}
 *
 * Goals: 1. Find all the hashtags ("#") mentioned on tweets.
 *        2. As with the mentions ("@"), count how many times each hashtag ("#")
 *           is mentioned and find the 10 most popular ones.
 */
public class HashtagMining {
    private static String pathToFile = "data/reduced-tweets.json";

    /**
     * Load the data from the json file and return an RDD of Tweet
     */
    public JavaRDD<Tweet> loadData() {
        // Create the Spark configuration and Spark context
        SparkConf conf = new SparkConf()
                .setAppName("Hashtag mining")
                .set("spark.driver.allowMultipleContexts", "true")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Tweet> tweets = sc.textFile(pathToFile).map(new Function<String, Tweet>() {
            public Tweet call(String line) throws Exception {
                return Parse.parseJsonToTweet(line);
            }
        });
        return tweets;
    }
    /**
     * Find all the hashtags mentioned on tweets
     */
    public JavaRDD<String> hashtagMentionedOnTweet() {
        JavaRDD<Tweet> tweets = loadData();
        // We want to return an RDD with the hashtags.
        // Hint: separate the words in the text field, then keep the hashtags.
        // TODO write code here
        JavaRDD<String> mentions = null;
        return mentions;
    }

    /**
     * Count how many times each hashtag is mentioned
     */
    public JavaPairRDD<String, Integer> countMentions() {
        JavaRDD<String> mentions = hashtagMentionedOnTweet();
        // Hint: think about what you did in the wordcount example
        // TODO write code here
        JavaPairRDD<String, Integer> counts = null;
        return counts;
    }

    /**
     * Find the 10 most popular hashtags, in descending order
     */
    public List<Tuple2<Integer, String>> top10HashTags() {
        JavaPairRDD<String, Integer> counts = countMentions();
        // Hint: take a look at the sorting and take methods
        // TODO write code here
        List<Tuple2<Integer, String>> top10 = null;
        return top10;
    }
}
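Possible implementations of the three TODOs above (a sketch; the logic mirrors TweetMining with "#" instead of "@", and needs imports for FlatMapFunction, PairFunction, Function2, java.util.HashSet and java.util.Set):

public JavaRDD<String> hashtagMentionedOnTweet() {
    JavaRDD<Tweet> tweets = loadData();
    // Split each tweet text into words and keep the ones starting with '#'.
    JavaRDD<String> mentions = tweets.flatMap(new FlatMapFunction<Tweet, String>() {
        public Iterable<String> call(Tweet tweet) throws Exception {
            Set<String> hashtags = new HashSet<String>();
            for (String word : tweet.getText().split(" ")) {
                if (word.startsWith("#") && word.length() > 1) {
                    hashtags.add(word);
                }
            }
            return hashtags;
        }
    });
    return mentions;
}

public JavaPairRDD<String, Integer> countMentions() {
    JavaRDD<String> mentions = hashtagMentionedOnTweet();
    // As in the wordcount: (hashtag, 1) couples reduced by key.
    JavaPairRDD<String, Integer> counts = mentions
            .mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String hashtag) throws Exception {
                    return new Tuple2<String, Integer>(hashtag, 1);
                }
            })
            .reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer a, Integer b) throws Exception {
                    return a + b;
                }
            });
    return counts;
}

public List<Tuple2<Integer, String>> top10HashTags() {
    JavaPairRDD<String, Integer> counts = countMentions();
    // Swap to (count, hashtag), sort descending and take the top 10.
    List<Tuple2<Integer, String>> top10 = counts
            .mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                public Tuple2<Integer, String> call(Tuple2<String, Integer> couple) throws Exception {
                    return new Tuple2<Integer, String>(couple._2(), couple._1());
                }
            })
            .sortByKey(false)
            .take(10);
    return top10;
}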
InvertedIndex code block
package tutorial;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import utils.Parse;
import utils.Tweet;
import java.util.Map;
/**
 * Goal: build an inverted index of hashtags.
 *
 * For example, if the hashtag #spark appears in tweet1, tweet3 and tweet39,
 * return the couple (#spark, List(tweet1, tweet3, tweet39)).
 */
public class InvertedIndex {
    private static String pathToFile = "data/reduced-tweets.json";

    /**
     * Load the data from the json file and return an RDD of Tweet
     */
    public JavaRDD<Tweet> loadData() {
        // Create the Spark configuration and Spark context
        SparkConf conf = new SparkConf()
                .setAppName("Inverted index")
                .set("spark.driver.allowMultipleContexts", "true")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<Tweet> tweets = sc.textFile(pathToFile).map(new Function<String, Tweet>() {
            public Tweet call(String line) throws Exception {
                return Parse.parseJsonToTweet(line);
            }
        });
        return tweets;
    }
    public Map<String, Iterable<Tweet>> invertedIndex() {
        JavaRDD<Tweet> tweets = loadData();
        // For each tweet, extract all the hashtags and create couples (hashtag, tweet).
        // Hint: see the flatMapToPair method
        // TODO write code here
        JavaPairRDD<String, Tweet> pairs = null;
        // We want to group the tweets by hashtag
        // TODO write code here
        JavaPairRDD<String, Iterable<Tweet>> tweetsByHashtag = null;
        // Then return the inverted index (= a map structure)
        // TODO write code here
        Map<String, Iterable<Tweet>> map = null;
        return map;
    }
}
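One possible implementation of invertedIndex (a sketch; it needs imports for org.apache.spark.api.java.function.PairFlatMapFunction, scala.Tuple2, java.util.ArrayList and java.util.List):

public Map<String, Iterable<Tweet>> invertedIndex() {
    JavaRDD<Tweet> tweets = loadData();
    // For each tweet, emit one (hashtag, tweet) couple per hashtag in its text.
    JavaPairRDD<String, Tweet> pairs = tweets
            .flatMapToPair(new PairFlatMapFunction<Tweet, String, Tweet>() {
                public Iterable<Tuple2<String, Tweet>> call(Tweet tweet) throws Exception {
                    List<Tuple2<String, Tweet>> couples = new ArrayList<Tuple2<String, Tweet>>();
                    for (String word : tweet.getText().split(" ")) {
                        if (word.startsWith("#") && word.length() > 1) {
                            couples.add(new Tuple2<String, Tweet>(word, tweet));
                        }
                    }
                    return couples;
                }
            });
    // Group the tweets by hashtag.
    JavaPairRDD<String, Iterable<Tweet>> tweetsByHashtag = pairs.groupByKey();
    // Collect the result on the driver as a local map (the inverted index).
    Map<String, Iterable<Tweet>> map = tweetsByHashtag.collectAsMap();
    return map;
}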