스파크완벽가이드 - 2장 스파크 간단히 살펴보기
2.1 스파크 기본 아키텍처
클러스터
- 클러스터는 여러 컴퓨터의 자원을 모아 하나의 컴퓨터 처럼 사용
- 스파크는 클러스터에서 작업을 조율할 수 있는 역할을 하는 프레임워크
드라이버 프로세스
- 클러스터 노드 중 하나에서 실행되며 main() 함수를 실행
- 필수적으로 존재
익스큐터 프로세스
- 드라이버가 할당한 작업을 수행 후, 드라이버 노드에 보고하는 두가지 역할 수행
모드 실행 예제
클러스터 모드 실행 예제
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MySparkApp") \ # 애플리케이션 이름 설정
.master("spark://<master-hostname>:<master-port>") \ # 마스터 노드의 주소 설정
.config("spark.executor.instances", "4") \ # 실행할 Executor 인스턴스 수 설정
.config("spark.executor.cores", "4") \ # 각 Executor의 코어 수 설정
.getOrCreate()
클라이언트 모드 실행 예제
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MySparkApp") \
.master("local[1]") \ # 로컬 머신에서 실행
.config("spark.executor.instances", 3) \ # 실행할 Executor 인스턴스 수 설정
.config("spark.executor.cores", 1) \ # 각 Executor의 코어 수 설정
.getOrCreate()
Hive Thirft 서버 클러스터 모드
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MySparkApp") \ # 애플리케이션 이름 설정
.config("spark.sql.warehouse.dir", "hdfs://<namenode>:<port>/user/hive/warehouse") \ # Hive warehouse 경로 설정
.config("spark.hadoop.hive.metastore.uris", "thrift://<metastore-host>:<port>") \ # Hive metastore 주소 설정
.config("spark.executor.instances", 3) \ # 실행할 Executor 인스턴스 수 설정
.config("spark.executor.cores", 1) \ # 각 Executor의 코어 수 설정
.enableHiveSupport() \ # Hive 지원 활성화
.getOrCreate()
2.3 스파크 API
스파크는 두 가지 API를 제공
저수준의 비구조적 API
ㄴ RDD
ㄴ 저수준의 API는 데이터에 대한 세부적인 제어를 제공, 그러나 직접적으로 분산, 병렬 처리 및 메모리 관리가 필요
ㄴ map(), filter(), reduce()와 같은 연산자를 사용
고수준의 구조적 API
ㄴ DataFrame 및 Dataset API
ㄴ 스키마를 가지고 있어 데이터의 구조와 타입을 명확하게 정의, 최적화된 실행 계획이 가능
ㄴ SQL과 유사한 문법을 사용하여 데이터를 처리 가능
ㄴ select(), filter(), groupBy(), agg()와 같은 함수를 사용하여 데이터를 조작 및 집계
2.5 SparkSession
SparkSession
- Apache Spark를 사용하여 작업을 수행하는 데 사용되는 주요 객체
- SparkSession 인스턴스는 사용자가 정의한 명령을 클러스터에서 실행
- 하나의 SparkSession은 하나의 스파크 애플리케이션에 대응
- Ex) myRange = spark.range(1000).toDF("number")
SparkSession과 SparkContext의 차이점
SparkContext
ㄴ Spark 1.x 시절부터 사용되던 주요 진입점(entry point) 객체
ㄴ 클러스터와의 연결을 설정하고 RDD(Resilient Distributed Dataset)를 생성하고 조작하는 데 사용
ㄴ 주로 하위 수준의 API를 다루는 데 사용
SparkSession
ㄴ Spark 2.x 버전부터 도입된 새로운 진입점(entry point) 객체
ㄴ 데이터프레임(DataFrame)과 데이터셋(Dataset) API를 사용하여 구조적인 데이터를 다루는 데 사용
ㄴ SparkSession은 SparkContext와 SQLContext, HiveContext를 하나의 객체로 통합 됨
ㄴ Spark SQL, 스트리밍 처리 등 다양한 기능을 사용할 수 있도록 합니다.
2.6 DataFrame
DataFrame
- 가장 대표적인 구조적 API
- 테이블의 데이터를 row, column으로 표현 ( 스키마 )
- 분산 데이터를 표현하기에 효율적인 구조
Partition
- 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션이라는 청크 단위로 데이터를 분할
- 1 Core = 1 Task = 1 Partition
- 파티션은 클러스터의 여러 노드에 분산 저장되며, 병렬처리 작업을 수행할 때 각 파티션은 별도 태스크로 실행
- 클러스터의 물리적 머신에 존재하는 row의 집합, 분산된 데이터의 조각
- Ex) df = df.repartition(2) # 2개의 파티션으로 재분배
2.7 트랜스포메이션
트랜스포메이션
- 스파크의 핵심 데이터구조는 불변성 (immutable)
- 불변성이기이에 트랜스포메이션을 통해(액션 이전에) 데이터를 변환하거나 조작하는 작업을 의미
트랜스포메이션 유형
- 좁은 의존성
ㄴ 하나의 파티션이 하나의 출력 파티션에만 영향
- 넓은 의존성
ㄴ 하나의 파티션이 여러 출력 파티션에 영향
ㄴ 파티션을 교환하는 셔플이 발생
트랜스포메이션은 특성
- 지연 연산
ㄴ 스파크는 트랜스포메이션을 즉시 실행하지 않고, 논리적 실행 계획을 만들어 DAG(Directed Acyclic Graph)를 생성
ㄴ DAG에는 연산들 사이의 의존성과 실행 순서가 정의됩
ㄴ DAG가 생성 후, 스파크는 DAG를 물리적 실행 계획으로 컴파일
ㄴ 이는 DAG의 각 단계를 실제로 실행할 수 있는 실행 단위로 변환하는 과정
ㄴ 물리적 실행 계획은 실제로 클러스터에서 실행될 태스크들을 나타 냄
( 데이터의 파티션 처리, 클러스터의 자원 할당, 셔플 및 병렬 처리 등을 고려하여 구성 )
ㄴ 물리적 실행 계획은 최적화된 실행 경로를 선택하여 클러스터에서의 작업을 효율적으로 수행할 수 있도록 지원
- 병렬 처리
ㄴ 트랜스포메이션은 병렬로 처리 됨 ( 여러 파티션의 데이터에 동시에 적용 됨 )
2.8 액션
- 실행계획을 실제 연산으로 수행하는 명령
- 스파크 잡이 시작됨
- myRange.count
2.9 스파크 UI
- 스파크 UI는 드라이버 노드의 4040포트로 접속 가능
2.10 종합예제
- DataFrame은 스키마 정보를 알아내는 스키마 추론 기능
예제
filteData2015 = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv("/data/flight-data/csv/2015-summary.csv")
- head 명령어와 같은 결과
flightData2015.take(3)
- 실행계획 확인
flightData2015.sort("count").explain()
- DataFrame을 테이블이나 뷰로 생성
flightData2015.createOrReplaceTempView("flight_Data_2015")
스파크 셔플(Shuffle)
- 셔플은 데이터 그룹화, 집계 단계를 위해 클러스터의 노드 전체에 데이터를 재분배하는데 사용
- 데이터를 더 작은 청크로 분할하고, 네트워크를 통해 섞은 다음, 새로운 청크 집합으로 다시 분할하는 작업
- 셔플 최적화 관련 포스트
https://tech.kakao.com/2021/10/08/spark-shuffle-partition/