DataPipeline/Spark

스파크완벽가이드 - 13장 RDD 고급개념

wave35 2024. 8. 17. 10:59

 

핵심주제
- 집계와 키-값 형태의 RDD
- 사용자 정의 파티셔닝
- RDD 조인

 

 

13.1 키-값 형태의 기초

words.map(lambda word: (word.lower(), 1)).take(5)
>>> [('spark', 1), ('the', 1), ('definitive', 1), ('guide', 1), (':', 1)]


- [1] key-value 구조로 만들기 ( 튜플 )
keyword = words.keyBy(lambda word: word.lower()[0])
>>> [('s', 'Spark'), ('t', 'The'), ('d', 'Definitive'), ('g', 'Guide'), (':', ':')]


- [2] key, value 값 추출하기
keyword.keys().collect()
keyword.values().collect()


- [3] lookup
- 특정 키에 관한 결과를 찾음 ( 하나의 키만 찾는 메소드는 없음 )
keyword.lookup("s")

 

 

13.2 집계

chars = words.flatMap(lambda word: word.lower())
KVcharacters = chars.map(lambda letter: (letter, 1))
KVcharacters.take(5)
>>> [('s', 1), ('p', 1), ('a', 1), ('r', 1), ('k', 1)]


- [1] countByKey 메서드는 각 키의 아이템 수를 구함
KVcharacters.countByKey()
>>> defaultdict(<class 'int'>, {'s': 4, 'p': 3, 'a': 4, 'r'....})


- [2] groupByKey 
from functools import reduce
def addFunc(left, right):
  return left + right
  
KVcharacters.groupByKey().map(lambda row: (row[0], reduce(addFunc, row[1]))).collect()
>>> [('p', 3), ('t', 3), ('d', 4), ('g', 3), ('b', 1), ('o', 1) ...]

 

 

13.4 조인

- 구조적 API와 동일한 조인 방식이지만, RDD를 사용하면 사용자가 많은 부분에 관여해야 함
- inner join


import random

distinctChars = words.flatMap(lambda word: word.lower()).distinct()
keyedChars = distinctChars.map(lambda c: (c, random.random()))
>>> [('p', 0.07471830168125726), ('t', 0.5281442744239391), ('d', 0.6968018446068369)...]

KVcharacters.join(keyedChars).count()
KVcharacters.join(keyedChars, outputPartitions).count()
>>> 51

 

 

13.5 파티션 제어하기

- RDD를 사용하면 데이터가 클러스터 전체에 물리적으로 정확히 분산되는 방식을 정의할 수 있음

- [1] coalesce
- 파티션을 재분배 할 때 데이터 셔플을 방지하기 위해 동일한 워커에 존재하는 파티션을 합치는 메서드
words.coalesce(1).getNumPartitions() # 1


- [2] repartition
- 파티션 수를 늘리거나 줄일 수 있고, 노드 간의 셔플이 발생 함
words.repartitions(10) # 10개의 파티션 생성


- [3] 사용자 정의 파티션
- 페이지랭크(PageRank)는 사용자 정의 파티셔닝을 이용해 클러스터 전체의 데이터를 균등하게 분배
- 과정은, 구조적API로 RDD를 얻고 사용자 정의 파티셔너를 적용하고 다시 구조적API로 변환
- 사용자정의 파티셔너를 사용하려면 Partitioner 클래스 통해 구현  

- 예제 1) 기본 제공 HashPartitioner, RangePartitioner
df = spark.read.option("header", "true").option("inferSchema", "true")\
  .csv("/data/retail-data/all/")
rdd = df.coalesce(10).rdd


- 예제 2) 17850, 12583 고객의 데이터가 커서 따로 나누어 처리 
def partitionFunc(key):
  import random
  if key == 17850 or key == 12583:
    return 0
  else:
    return random.randint(1,2)
    
keyedRDD = rdd.keyBy(lambda row: row[6])
keyedRDD\
  .partitionBy(3, partitionFunc)\
  .map(lambda x: x[0])\
  .glom()\
  .map(lambda x: len(set(x)))\
  .take(5)