실시간 빅데이터 처리를 위한 Spark & Flink Online 5) Key-Value RDD Operations & Joins

Key-Value RDD Transformations & Actions

Transformations

  • groupByKey
  • reduceByKey
  • mapValues
  • keys
  • join (+ leftOuterJoin, rightOuterJoin)

Actions

  • countByKey

groupByKey

groupBy: 주어지는 함수를 기준으로 Group

  • groupBy
>>> rdd = sc.parallelize([1,1,3,5,8])
>>> result = rdd.groupBy(lambda x: x%2).collect()
>>> sorted([(x, sorted(y)) for (x,y) in result])
[(0, [2,8]), (1, [1,1,3,5])]

groupByKey: 주어지는 Key를 기준으로 Group

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a',2), ('b',1)]
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1,1]), ('b', [1])]

groupBy 예제

grouped = sc.parallelize([
]).groupBy(lambda x: x[0]).collect()

for k,v in grouped:
	print(k, list(v))

###
J ['java']
C ['C', 'C++', 'C#']
P ['Python']

groupByKey 예제

x = sc.parallelize([
	("MATH", 7), ("MATH", 2), ("ENGLISH", 7),
    ("SCIENCE", 7), ("ENGLISH", 4), ("ENGLISH", 9),
    ("MATH", 8), ("MATH", 3), ("ENGLISH", 4),
    ("SCIENCE", 6), ("SCIENCE", 9), ("SCIENCE", 5)], 3)
    
y = x.groupByKey()

print(y.getNumPartitions())
# 3

y = x.groupByKey(2)
print(y.getNumPartitions())
# 2

for t in y.collect():
	print(t[0], list(t[1]))

# MATH [7,2,8,3]
# ENGLISH [7,4,9,4]
# SCIENCE [7,6,9,5]

reduceByKey

reduce: 주어지는 함수를 기준으로 요소들을 합침 (action)

>>> sc.parallelize([1,2,3,4,5]).reduce(add)
15

reduceByKey: Key를 기준으로 그룹을 만들고 합침 (trans)

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]

ReduceByKey

  • 개념적으로는 groupByKey+reduction
  • 하지만 groupByKey보다 훨씬 빠르다 (원리는 다음 강의에서)
x = sc.parallelize([
	("MATH", 7), ("MATH", 2), ("ENGLISH", 7),
    ("SCIENCE", 7), ("ENGLISH", 4), ("ENGLISH", 9),
    ("MATH", 8), ("MATH", 3), ("ENGLISH", 4),
    ("SCIENCE", 6), ("SCIENCE", 9), ("SCIENCE", 5)], 3)
    
x.reduceByKey(lambda a,b: a+b).collect()
# [('MATH', 20), ('ENGLISH', 24), ('SCIENCE', 27)]

mapValues

  • 함수를 벨류에게만 적용한다
  • 파티션과 키는 그대로
x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
>>> def f(x): return len(x)
>>> x.mapValues(f).collect()
# [('a', 3), ('b', 1)]

파티션과 키를 왔다갔다하려면 네트워크 코스트가 크기 때문에 mapValues 사용 시 성능 향상

countByKey

  • 각 키가 가진 요소들을 센다
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.countByKey().items())
# [('a',2), ('b',1)]

Keys

  • Transformation

    파티션을 유지하거나 Key가 굉장히 많은 경우가 있기 때문에 transformation

  • 모든 Key를 가진 RDD를 생성
m = sc.prarallelize([(1,2), (3,4)]).keys()
>>> m.collect()

# [1,3]

Joins

  • Transformation
  • 여러개의 RDD를 합치는데 사용
  • 대표적으로 두 가지의 Join 방식이 존재
    • Inner Join (join)
      서로 연관된 데이터만 가져온다
    • Outer Join (left outer, right outer)
      한쪽에는 데이터가 있고 다른쪽에 없는 경우, 데이터가 있는 쪽의 데이터를 출력한다
      leftOuterJoin: 왼쪽에 있는 데이터를 모두 출력, 반대편에 데이터가 없는 경우 None
      rightOuterJoin: 오른쪽에 있는 데이터를 모두 출력, 반대편에 데이터가 없는 경우 None
rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6), ("zoo", 1)])

rdd1.join(rdd2).collect()
# [("bar", (2, 5)), ("bar", (2, 6)), ("foo", (1, 4))]

rdd1.leftOuterJoin(rdd2).collect()
# [("baz", (3, None)), ("bar", (2, 5)), ("bar", (2, 6)), ("foo", (1, 4))]

rdd1.rightOuterJoin(rdd2).collect()
# [("bar", (2, 5)), ("bar", (2, 6)), ("zoo", (None, 1)), ("foo", (1, 4))]

요약

  • Key-Value (Pairs) RDD Operations
    • groupByKey
    • reduceByKey
    • mapValues
    • keys
    • countByValue
  • Joins
    • Inner join
    • Outer join
  • 배운것 외에 여러 Operation이 존재

좋은 웹페이지 즐겨찾기