RDD Creation | parallelize(), .textFile()

목차

  1. parallelize()
  2. .textFile()

parallelize()

parallelize() 함수는 내부에서 만들어진 데이터 집합을 병렬화하는 방법입니다.

방금 전 만들었던, SparkContext()의 parallelize() 함수를 이용해 내부의 데이터 집합을 RDD로 만들 수 있습니다

SparkContext() 만드는 법

RDD 생성

>>> rdd = sc.parallelize([1,2,3])
>>> rdd

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:262

위의 결괏값은 무엇을 의미하나요?

>>> type(rdd)
 
pyspark.rdd.RDD

데이터 타입을 확인해보니 RDD가 생성이 된 것을 확인 할 수 있습니다.

이 그림을 통해 보았듯이, RDD는 생성과 transformations 연산을 바로 수행하지 않습니다. 이 단계에서는 연산을 하고 있지 않고 계보(lineage)만 만들어 놓고 Actions 동작을 할 때, RDD가 비로소 만들어지며, 이를 느긋한 계산법으로 소개했씁니다.

하지만 위의 방법은 parallelize()를 사용했기 때문에 RDD가 만들어진 것은 아닙니다.

이걸 Actions 해보겠습니다.

함수는 take() 함수를 사용합니다. RDD 원소를 반환합니다. 인자로는 반환하고자 하는 RDD 원소의 개수를 입력합니다.

Actions 수행

>>> rdd.take(3)

[1, 2, 3]

take() 함수

위의 이미지를 보시면 take 함수의 이해가 빠를 것입니다.

.textFile()

.textFile() 함수는 외부의 파일을 로드하여 RDD를 만듭니다.

RDD 생성

import os

file_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/test.txt'
with open(file_path, 'w') as f:
	for i in range(10):
    		f.write(str(i)+'\n')

print('ok')

OK

파일이 만들어졌으니 방금 만든 파일을 불러와서 RDD를 생성해봅시다.

rdd2 = sc.textfile(file_path)
print(rdd)
print(type(rdd2))

/aiffel/aiffel/bigdata_ecosystem/test.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0
<class 'pyspark.rdd.RDD'>

Actions 수행

rdd2.take(3)

['0', '1', '2']

위와 다른 점이 있죠.

[1, 2, 3]['0', '1', '2']

한 가지 특이사항이 있다면, 숫자를 입력했으므로 [1,2,3] 의 리스트가 될 줄 알았는데 문자열 list가 만들어졌습니다.

이것은 spark가 .textFile()을 통해 얻어온 데이터 타입을 무조건 string으로 처리하기 때문에 그렇습니다.

그렇다면 이 데이터를 숫자로 변환하려면 어떻게 해야할까요?

그걸 도와주는 것이 Transformation 같은 RDD Operation입니다!

좋은 웹페이지 즐겨찾기