DataPipeline/Spark

Spark - 적응형 쿼리 (Adaptive Query Execution)

wave35 2024. 11. 10. 12:18

적응형 쿼리 (Adaptive Query Execution)

AQE는 Spark 3.0부터 도입된 기능으로,

복잡한 쿼리 처리의 효율성을 높이고 실행 시간을 단축하는데 도움이 됩니다.. 

 

쿼리가 시작되면 AQE 프레임워크는 

다른 스테이지에 의존하지 않는 리프 스테이지(leaf stages)를 먼저 실행합니다. 

하나 이상의 스테이지가 완료되면, 프레임워크는 해당 스테이지를 완료된 것으로 표시하고 

실행 중에 얻은 통계로 논리적 쿼리 플랜을 업데이트합니다. 

 

이 통계에 따라 프레임워크는 선택된 논리적/물리적 최적화 규칙, 파티션 병합, 스큐 조인과 같은

적응형 최적화 규칙을 적용해 새로운 최적화 쿼리 플랜을 생성합니다.
즉, 실행-재최적화-재실행 과정을 반복하여 전체 쿼리가 완료될 때까지 최적화를 계속 진행합니다.

 

예측한 데이터에 누락이 있거나 사용자정의함수가 있을 경우

카탈리스트옵티마이저를 통해 생성된 계획이 최적화가 안될 수도 있어,

AQE가 런타임 통계를 기반으로 재 최적화를 진행합니다.

실행-재최적화-재실행 과정

AQE Features

  • 셔플 파티션을 동적 병합 (Dynamically coalesce shuffle partitions)
  • 동적 조인 전략 (Dynamically switch join strategies)
  • 스큐 조인 동적 최적화 (Dynamically optimize skew joins)

 

셔플 파티션을 동적 병합 (Dynamically coalesce shuffle partitions)

- 쿼리 실행 중 셔플 과정에서 작은 파티션을 자동으로 병합하여 최적의 파티션 수를 유지합니다.

- 작은 파티션이 다수 생성될 경우, Spark가 파티션 병합을 통해 오버헤드를 줄이고 성능을 개선합니다.

 

동적 조인 전략 (Dynamically switch join strategies)

- broadcast join을 자동 적용하여 로컬에서 조인을 수행하고 shuffle하는 데이터를 줄입니다.

- broadcast join을 적용할 때 런타임 데이터(실제 데이터를 읽은 후 직렬화된 상태) 크기로 조인을 재배치합니다.

 

실제 테이블 크기 (25MB) : 기본 broadcast join 크기 제한은 10MB에 적용하지 못함

런타임 시 테이블 크기 (8MB) : AQE는 런타임 사이즈로 broadcast join을 적용하기에 해당 사항이 됨

 

스큐 조인 동적 최적화 (Dynamically optimize skew joins)

- AQE는 런타임 통계를 사용해 파티션 크기에 따른 편차를 감지하고 더 작은 파티션으로 분할합니다.

 

PySpark에서 AQE 활성화

PySpark에서 AQE를 사용하려면 

Spark 설정에서 spark.sql.adaptive.enabled 옵션을 True로 설정합니다.

예제

from pyspark.sql import SparkSession

# SparkSession 생성 시 AQE 활성화
spark = SparkSession.builder \
    .appName("Example with AQE") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

 

주요 AQE 관련 설정 옵션

spark.sql.adaptive.enabled

- AQE를 활성화할지 여부

spark.sql.adaptive.coalescePartitions.enabled

- 작은 셔플 파티션을 병합할지 여부

spark.sql.adaptive.skewJoin.enabled

- 데이터가 비대칭적으로 분포된 경우 스큐 조인을 최적화할지 여부

 

 

참조 : https://www.databricks.com/blog/2020/05/29/adaptive-query-execution-speeding-up-spark-sql-at-runtime.html