RDD Operation (3) 실습:MapReduce
RDD Operation(1) -Transformations 보러 가기
RDD Operation(2) - Actions 보러 가기
실습: MapReduce
- Word Counter 구현하기
- Titanic 데이터 분석하기
Word Counter 구현하기
MapReduce 개념을 처음 배울 때 다뤄보는 Word Counter을 구현해보겠습니다.
이번에는 스파크 RDD 함수를 이용해 구현해 봅시다.
- (힌트1) Map 함수를 구현할 때, 입력 스트링의 각 문자 x에 대해 x -> (x, 1) 형태로 tuple로 매핑하면 수월합니다.
- (힌트2) 일반적인 reduce 함수보다는 reduceNyKey 함수를 사용하는 것을 추천합니다.
text = sc.parrallelize('hello python')
# map 함수를 적용한 RDD 구하기
# [[YOUR CODE]]
#reduceByKey 함수를 적용한 Word Counter 출력
# [[YOUR CODE]]
이제 직접 해볼까요?😭
제가 작성한 코드 (정답 ❌)
text = sc.parrallelize('hello python')
# map 함수를 적용한 RDD 구하기
step_1 = text.filter(lambda x: x != " ")
step_2 = step_1.map(lambda x : (x,1))
#reduceByKey 함수를 적용한 Word Counter 출력
word_counter = step_2.reduceByKey(lambda )
word_counter.collect()
저는 reduceByKey 함수를 적용한 Word Counter 출력 단계에서 lambda 뒤에 어떻게 count를 해줘야 하는지 작성하지 못했습니다.
맞는 답을 살펴보겠습니다.
정답 👇
text = sc.parrallelize('hello python')
# map 함수를 적용한 RDD 구하기
step_1 = text.filter(lambda x: x != " ")
step_2 = step_1.map(lambda x : (x,1))
#reduceByKey 함수를 적용한 Word Counter 출력
word_counter = step_2.reduceByKey(lambda accum, n : accum + n )
word_counter.collect()
accum이 무엇인가 찾아보았는데, sum을 누적하는 함수라고 하네요. 이걸 이용해 변수명을 지정해준 것 같습니다. 실제로 lambda 뒤에 올 이름은 뭐든지 상관은 없겠죠...
결괏값은 어떻게 나올까요? 위의 코드는 무엇을 의미하나요?
우리가 만든 word_counter은
- 문자열 생성
- (1) 공백 제거 (2) 문자열 인수 1개당 튜플로 만듦
- 문자열 갯수의 누적 합, 개수 count
그렇다면 결괏값은
[('h', 2),
('e', 1),
('l', 2),
('o', 2),
('p', 1),
('y', 1),
('t', 1),
('n', 1)]
이렇게 나옵니다. h는 두 번이 나오니 ('h', 2)가 된 것을 볼 수 있습니다. 다른 문자도 마찬가지입니다.
Titanic 데이터 분석하기
이번에는 타이타닉에서의 생존자와 사망자의 평균연령을 구해 봅시다. 둘 사이에 얼마나 차이가 날까요?
지금까지 우리가 다루었던 RDD 함수만 이용하면 충분히 풀 수 있는 문제입니다.
- (힌트) map 함수를 이용해 모든 데이터를 (생존 여부, 연령)의 형태로 바꾸면 생존자, 사망자 각각의 연령의 총합을 쉽게 구할 수 있습니다.
# 이전 스텝에서 CSV 파일을 로딩했던 내역입니다.
csv_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))
columns = csv_data_1.take(1)
csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal())
csv_data_3 = csv_data_2.map(lambda line: [(columns[0][i], linedata) for i, linedata in enumerate(line)])
csv_data_3.take(3)
[[('survived', '0'),
('sex', 'male'),
('age', '22.0'),
('n_siblings_spouses', '1'),
('parch', '0'),
('fare', '7.25'),
('class', 'Third'),
('deck', 'unknown'),
('embark_town', 'Southampton'),
('alone', 'n')],
[('survived', '1'),
('sex', 'female'),
('age', '38.0'),
('n_siblings_spouses', '1'),
('parch', '0'),
('fare', '71.2833'),
('class', 'First'),
('deck', 'C'),
('embark_town', 'Cherbourg'),
('alone', 'n')],
[('survived', '1'),
('sex', 'female'),
('age', '26.0'),
('n_siblings_spouses', '0'),
('parch', '0'),
('fare', '7.925'),
('class', 'Third'),
('deck', 'unknown'),
('embark_town', 'Southampton'),
('alone', 'y')]]
앞의 실습에서 불러왔던 결괏값이고요. 이 과정이 잘 이해가 안 되시면 위의 링크로 들어가서 다시 천천히 보고 옵시다. 저도 다시 보러 갑니다...😭
이걸 바탕으로 생존자, 사망자의 연령 평균을 구해봅시다.
# csv_data_3을 가공하여 생존자, 사망자의 연령 총합과 사람 수를 각각 구해 봅시다.
# 이후 각각의 데이터로부터 생존자와 사망자의 평균 연령을 구할 수 있습니다.
# [[YOUR CODE]]
과연 제가 할 수 있을까요?😨
정답 👇
# 생존자와 사망자의 연령 총합 구하기
# (생존여부, 연령)
csv_data_4 = csv_data_3.map(lambda line:(line[0][1], line[2][1]))
age_sum_data = csv_data_4.reduceByKey(lambda accum, age: float(accum) + float(age))
age_sum = age_sum_data.collect()
# 생존자와 사망자의 사람 수 구하기
csv_data_5 = csv_data_3.map(lambda line:(line[0][1], 1))
survived_data = csv_data_5.reduceByKey(lambda accum, count: int(accum) + int(count))
survived_count = survived_data.collect()
age_sum_dict = dict(age_sum)
survived_dict = dict(survived_count)
avg_age_survived = age_sum_dict['1']/survived_dict['1']
print('생존자 평균 연령:' ,avg_age_survived)
avg_age_died = age_sum_dict['0']/survived_dict['0']
print('사망자 평균 연령:' ,avg_age_died)
아 한 눈에 잘 보이지 않는 건 왜일까요😓 여기까지가 정말 마지막입니다. 포기할 순 없으니 같이 뜯어 봅시다...
앞의 csv_data_3은 (col1, data1), (col2, data2), ... 의 형태로 불러오는 과정이었고요. 이제 이걸 활용하여 평균을 구해야 합니다. 첫 번째 스텝으로 가볼까요?
csv_data_4 = csv_data_3.map(lambda line:(line[0][1], line[2][1]))
이건 생존여부와 연령을 불러오는 코드입니다. 생존 1, 비생존 0에 따라 연령의 합을 보여줍니다.
# 생존자와 사망자의 사람 수 구하기
csv_data_5 = csv_data_3.map(lambda line:(line[0][1], 1))
survived_data = csv_data_5.reduceByKey(lambda accum, count: int(accum) + int(count))
survived_count = survived_data.collect()
평균을 구해주려면 연령의 합계에 생존자와 사망자의 수만큼 나누어 줘야 할테니 누적 합을 또 구합니다.
age_sum_dict = dict(age_sum)
survived_dict = dict(survived_count)
avg_age_survived = age_sum_dict['1']/survived_dict['1']
print('생존자 평균 연령:' ,avg_age_survived)
avg_age_died = age_sum_dict['0']/survived_dict['0']
print('사망자 평균 연령:' ,avg_age_died)
마지막 코드를 통해 생존자/사망자 평균 연령을 계산하고 값을 출력합니다!
생존자 평균 연령: 29.110411522633743
사망자 평균 연령: 29.9609375
결과는 이렇게 나왔는데요.
제 생각엔 데이터셋이 크고 복잡한 만큼 어려운 것 같습니다. 그만큼 전처리도 중요하겠죠. 언제쯤 익숙해질까요. 그래도 여기까지 수고 많았습니다!
Author And Source
이 문제에 관하여(RDD Operation (3) 실습:MapReduce), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://velog.io/@cha-suyeon/RDD-Operation-3-실습MapReduce저자 귀속: 원작자 정보가 원작자 URL에 포함되어 있으며 저작권은 원작자 소유입니다.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)