트랜스포메이션
Narrow Transformation
- 각 파티션이 독립적으로 처리되어 다른 파티션과 데이터 셔플이 필요 없는 변환입니다.
- 예시: map, filter, flatMap.
- 특징: 빠르고 메모리 효율적입니다.
Wide Transformation
- 데이터가 다른 파티션으로 이동하거나 셔플링이 필요한 변환입니다.
- 예시: groupByKey, reduceByKey, join.
- 특징: 데이터 셔플링으로 인해 느리고 자원 소모가 큽니다.
Group by Query
spark.sql.adaptive.enable
해당 옵션은 Spark SQL에서 Adaptive Query Execution(AQE) 기능을 활성화할지 여부를 결정합니다.
AQE는 Spark 3.0부터 도입된 기능으로,
쿼리 실행 도중 발생할 수 있는 다양한 실행 계획을 동적으로 조정하여 성능을 최적화하는 역할을 합니다.
셔플 파티션을 자동적으로 튜닝하기 때문에 비활성화 시킵니다.
SET spark.sql.adaptive.enabled = FALSE
Stage 확인
먼저 wide transformation이 일어나는group by 쿼리를 실행합니다.
# 소요시간 13초
SELECT `destination`, count(*) AS count
FROM flights
GROUP BY `destination`
ORDER BY count DESC
쿼리 실행 후 Spark UI를 살펴보면
- 1 stage with 7 tasks
- 1 stage with 200 tasks
두개의 스테이지를 확인할 수 있습니다.
spark.sql.shuffle.partition
spark.sql.shuffle.partition 설정은 셔플(wide transformation) 파티션을 조정합니다.
기본적으로 200 값으로 할당되어있어 데이터가 크든 적든 파티션이 생성됩니다.
- stage 6
Spark UI를 살펴보면 파티션이 200개로 설정되어있어,
실제 레코드가 없음에도 Shuffle Read를 실행하는 것을 볼 수 있습니다.
명시적으로 파티션 셔플을 7로 조정합니다.
SET spark.sql.shuffle.partitions=7
쿼리를 재실행합니다.
실행시간이 줄어든 것을 볼 수 있습니다.
# 소요시간 4.5초
SELECT `destination`, count(*) AS count
FROM flights
GROUP BY `destination`
ORDER BY count DESC
Spark UI를 다시한번 살펴보면
Shuffle Read Size가 적절하게 분배되었음을 확인 할 수 있습니다.
Count Query
분산된 환경에서 count를 구현할 때는 2개의 스테이지가 구현됩니다.
- 각 task는 로컬환경에서 집계를 수행한 다음 (stage 10)
- 로컬에서 수행한 집계 결과를 모두 읽어서 결과를 냅니다. (stage11)
- stage 10
7개의 task에서
Shuffle Write Size : 59B
Records : 1
인 것을 확인할 수 있습니다.
- stage 11
Shuffle Read Size와 Records가
stage10에서 Shuffle Write Size 했던 파일을 집계합니다.
'DataPipeline > Spark' 카테고리의 다른 글
Spark - 병렬 JDBC Table 읽기 (0) | 2024.11.07 |
---|---|
Spark - Databricks에 AWS S3 연동 (0) | 2024.10.30 |
Spark - 테이블 캐싱(Caching) 예제 (0) | 2024.10.27 |
Spark - Databricks를 통한 디버그 환경설정 (0) | 2024.10.26 |
스파크완벽가이드 - 21장 구조적 스트리밍의 기초 (0) | 2024.08.18 |