RDD Operation (3) 실습:MapReduce

RDD Operation(1) -Transformations 보러 가기
RDD Operation(2) - Actions 보러 가기

실습: MapReduce

  1. Word Counter 구현하기
  2. Titanic 데이터 분석하기

Word Counter 구현하기

MapReduce 개념을 처음 배울 때 다뤄보는 Word Counter을 구현해보겠습니다.

이번에는 스파크 RDD 함수를 이용해 구현해 봅시다.

  • (힌트1) Map 함수를 구현할 때, 입력 스트링의 각 문자 x에 대해 x -> (x, 1) 형태로 tuple로 매핑하면 수월합니다.
  • (힌트2) 일반적인 reduce 함수보다는 reduceNyKey 함수를 사용하는 것을 추천합니다.
text = sc.parrallelize('hello python')

# map 함수를 적용한 RDD 구하기
# [[YOUR CODE]]

#reduceByKey 함수를 적용한 Word Counter 출력
# [[YOUR CODE]]

이제 직접 해볼까요?😭

제가 작성한 코드 (정답 ❌)

text = sc.parrallelize('hello python')

# map 함수를 적용한 RDD 구하기
step_1 = text.filter(lambda x: x != " ")
step_2 = step_1.map(lambda x : (x,1))

#reduceByKey 함수를 적용한 Word Counter 출력
word_counter = step_2.reduceByKey(lambda )
word_counter.collect()

저는 reduceByKey 함수를 적용한 Word Counter 출력 단계에서 lambda 뒤에 어떻게 count를 해줘야 하는지 작성하지 못했습니다.

맞는 답을 살펴보겠습니다.

정답 👇

text = sc.parrallelize('hello python')

# map 함수를 적용한 RDD 구하기
step_1 = text.filter(lambda x: x != " ")
step_2 = step_1.map(lambda x : (x,1))

#reduceByKey 함수를 적용한 Word Counter 출력
word_counter = step_2.reduceByKey(lambda accum, n : accum + n )
word_counter.collect()

accum이 무엇인가 찾아보았는데, sum을 누적하는 함수라고 하네요. 이걸 이용해 변수명을 지정해준 것 같습니다. 실제로 lambda 뒤에 올 이름은 뭐든지 상관은 없겠죠...

결괏값은 어떻게 나올까요? 위의 코드는 무엇을 의미하나요?

우리가 만든 word_counter은

  1. 문자열 생성
  2. (1) 공백 제거 (2) 문자열 인수 1개당 튜플로 만듦
  3. 문자열 갯수의 누적 합, 개수 count

그렇다면 결괏값은

[('h', 2),
 ('e', 1),
 ('l', 2),
 ('o', 2),
 ('p', 1),
 ('y', 1),
 ('t', 1),
 ('n', 1)]

이렇게 나옵니다. h는 두 번이 나오니 ('h', 2)가 된 것을 볼 수 있습니다. 다른 문자도 마찬가지입니다.

Titanic 데이터 분석하기

이번에는 타이타닉에서의 생존자와 사망자의 평균연령을 구해 봅시다. 둘 사이에 얼마나 차이가 날까요?

지금까지 우리가 다루었던 RDD 함수만 이용하면 충분히 풀 수 있는 문제입니다.

  • (힌트) map 함수를 이용해 모든 데이터를 (생존 여부, 연령)의 형태로 바꾸면 생존자, 사망자 각각의 연령의 총합을 쉽게 구할 수 있습니다.
# 이전 스텝에서 CSV 파일을 로딩했던 내역입니다. 
csv_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/train.csv'
csv_data_0 = sc.textFile(csv_path)
csv_data_1 = csv_data_0.filter(lambda line: len(line)>1).map(lambda line: line.split(","))   
columns = csv_data_1.take(1)
csv_data_2 = csv_data_1.filter(lambda line: line[0].isdecimal())
csv_data_3 = csv_data_2.map(lambda line: [(columns[0][i], linedata) for i, linedata in enumerate(line)])

csv_data_3.take(3)

[[('survived', '0'),
  ('sex', 'male'),
  ('age', '22.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '7.25'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '38.0'),
  ('n_siblings_spouses', '1'),
  ('parch', '0'),
  ('fare', '71.2833'),
  ('class', 'First'),
  ('deck', 'C'),
  ('embark_town', 'Cherbourg'),
  ('alone', 'n')],
 [('survived', '1'),
  ('sex', 'female'),
  ('age', '26.0'),
  ('n_siblings_spouses', '0'),
  ('parch', '0'),
  ('fare', '7.925'),
  ('class', 'Third'),
  ('deck', 'unknown'),
  ('embark_town', 'Southampton'),
  ('alone', 'y')]]

titanic 과정 다시 보기

앞의 실습에서 불러왔던 결괏값이고요. 이 과정이 잘 이해가 안 되시면 위의 링크로 들어가서 다시 천천히 보고 옵시다. 저도 다시 보러 갑니다...😭

이걸 바탕으로 생존자, 사망자의 연령 평균을 구해봅시다.

# csv_data_3을 가공하여 생존자, 사망자의 연령 총합과 사람 수를 각각 구해 봅시다. 
# 이후 각각의 데이터로부터 생존자와 사망자의 평균 연령을 구할 수 있습니다. 

# [[YOUR CODE]]

과연 제가 할 수 있을까요?😨

정답 👇

# 생존자와 사망자의 연령 총합 구하기

# (생존여부, 연령)
csv_data_4 = csv_data_3.map(lambda line:(line[0][1], line[2][1]))

age_sum_data = csv_data_4.reduceByKey(lambda accum, age: float(accum) + float(age))  
age_sum = age_sum_data.collect()

# 생존자와 사망자의 사람 수 구하기
csv_data_5 = csv_data_3.map(lambda line:(line[0][1], 1))
survived_data = csv_data_5.reduceByKey(lambda accum, count: int(accum) + int(count)) 
survived_count = survived_data.collect()

age_sum_dict = dict(age_sum)
survived_dict = dict(survived_count)
avg_age_survived = age_sum_dict['1']/survived_dict['1']
print('생존자 평균 연령:' ,avg_age_survived)
avg_age_died = age_sum_dict['0']/survived_dict['0']
print('사망자 평균 연령:' ,avg_age_died)

아 한 눈에 잘 보이지 않는 건 왜일까요😓 여기까지가 정말 마지막입니다. 포기할 순 없으니 같이 뜯어 봅시다...

앞의 csv_data_3은 (col1, data1), (col2, data2), ... 의 형태로 불러오는 과정이었고요. 이제 이걸 활용하여 평균을 구해야 합니다. 첫 번째 스텝으로 가볼까요?

csv_data_4 = csv_data_3.map(lambda line:(line[0][1], line[2][1])) 

이건 생존여부와 연령을 불러오는 코드입니다. 생존 1, 비생존 0에 따라 연령의 합을 보여줍니다.

# 생존자와 사망자의 사람 수 구하기
csv_data_5 = csv_data_3.map(lambda line:(line[0][1], 1))
survived_data = csv_data_5.reduceByKey(lambda accum, count: int(accum) + int(count)) 
survived_count = survived_data.collect()

평균을 구해주려면 연령의 합계에 생존자와 사망자의 수만큼 나누어 줘야 할테니 누적 합을 또 구합니다.

age_sum_dict = dict(age_sum)
survived_dict = dict(survived_count)
avg_age_survived = age_sum_dict['1']/survived_dict['1']
print('생존자 평균 연령:' ,avg_age_survived)
avg_age_died = age_sum_dict['0']/survived_dict['0']
print('사망자 평균 연령:' ,avg_age_died)

마지막 코드를 통해 생존자/사망자 평균 연령을 계산하고 값을 출력합니다!

생존자 평균 연령: 29.110411522633743
사망자 평균 연령: 29.9609375

결과는 이렇게 나왔는데요.

제 생각엔 데이터셋이 크고 복잡한 만큼 어려운 것 같습니다. 그만큼 전처리도 중요하겠죠. 언제쯤 익숙해질까요. 그래도 여기까지 수고 많았습니다!

좋은 웹페이지 즐겨찾기