DataPipeline/Spark

스파크완벽가이드 - 2장 스파크 간단히 살펴보기

wave35 2024. 7. 27. 13:09

 

2.1 스파크 기본 아키텍처

클러스터

- 클러스터는 여러 컴퓨터의 자원을 모아 하나의 컴퓨터 처럼 사용
- 스파크는 클러스터에서 작업을 조율할 수 있는 역할을 하는 프레임워크

 

드라이버 프로세스

- 클러스터 노드 중 하나에서 실행되며 main() 함수를 실행
- 필수적으로 존재

 

익스큐터 프로세스

- 드라이버가 할당한 작업을 수행 후, 드라이버 노드에 보고하는 두가지 역할 수행

스파크 아키텍처

 

 

모드 실행 예제

클러스터 모드 실행 예제

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MySparkApp") \  # 애플리케이션 이름 설정
    .master("spark://<master-hostname>:<master-port>") \  # 마스터 노드의 주소 설정
    .config("spark.executor.instances", "4") \  # 실행할 Executor 인스턴스 수 설정
    .config("spark.executor.cores", "4") \  # 각 Executor의 코어 수 설정
    .getOrCreate()


클라이언트 모드 실행 예제

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MySparkApp") \ 
    .master("local[1]") \  # 로컬 머신에서 실행
    .config("spark.executor.instances", 3) \  # 실행할 Executor 인스턴스 수 설정
    .config("spark.executor.cores", 1) \  # 각 Executor의 코어 수 설정
    .getOrCreate()


Hive Thirft 서버 클러스터 모드

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MySparkApp") \  # 애플리케이션 이름 설정
    .config("spark.sql.warehouse.dir", "hdfs://<namenode>:<port>/user/hive/warehouse") \  # Hive warehouse 경로 설정
    .config("spark.hadoop.hive.metastore.uris", "thrift://<metastore-host>:<port>") \  # Hive metastore 주소 설정
    .config("spark.executor.instances", 3) \  # 실행할 Executor 인스턴스 수 설정
    .config("spark.executor.cores", 1) \  # 각 Executor의 코어 수 설정
    .enableHiveSupport() \  # Hive 지원 활성화
    .getOrCreate()

 

 

2.3 스파크 API

스파크는 두 가지 API를 제공

 

저수준의 비구조적 API 

ㄴ RDD

ㄴ 저수준의 API는 데이터에 대한 세부적인 제어를 제공, 그러나 직접적으로 분산, 병렬 처리 및 메모리 관리가 필요

ㄴ map(), filter(), reduce()와 같은 연산자를 사용

 

고수준의 구조적 API 

ㄴ DataFrame 및 Dataset API

ㄴ 스키마를 가지고 있어 데이터의 구조와 타입을 명확하게 정의, 최적화된 실행 계획이 가능

ㄴ SQL과 유사한 문법을 사용하여 데이터를 처리 가능

ㄴ select(), filter(), groupBy(), agg()와 같은 함수를 사용하여 데이터를 조작 및 집계

 

 

2.5 SparkSession

SparkSession

- Apache Spark를 사용하여 작업을 수행하는 데 사용되는 주요 객체

- SparkSession 인스턴스는 사용자가 정의한 명령을 클러스터에서 실행

- 하나의 SparkSession은 하나의 스파크 애플리케이션에 대응

    - Ex) myRange = spark.range(1000).toDF("number")

 

 

SparkSession과 SparkContext의 차이점

SparkContext

ㄴ Spark 1.x 시절부터 사용되던 주요 진입점(entry point) 객체
ㄴ 클러스터와의 연결을 설정하고 RDD(Resilient Distributed Dataset)를 생성하고 조작하는 데 사용
ㄴ 주로 하위 수준의 API를 다루는 데 사용

 

SparkSession

ㄴ Spark 2.x 버전부터 도입된 새로운 진입점(entry point) 객체
ㄴ 데이터프레임(DataFrame)과 데이터셋(Dataset) API를 사용하여 구조적인 데이터를 다루는 데 사용
ㄴ SparkSession은 SparkContext와 SQLContext, HiveContext를 하나의 객체로 통합 됨
ㄴ Spark SQL, 스트리밍 처리 등 다양한 기능을 사용할 수 있도록 합니다.

 

 

2.6 DataFrame

DataFrame

- 가장 대표적인 구조적 API

- 테이블의 데이터를 row, column으로 표현 ( 스키마 )

- 분산 데이터를 표현하기에 효율적인 구조

 

Partition

- 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션이라는 청크 단위로 데이터를 분할

- 1 Core = 1 Task = 1 Partition 

- 파티션은 클러스터의 여러 노드에 분산 저장되며, 병렬처리 작업을 수행할 때 각 파티션은 별도 태스크로 실행

- 클러스터의 물리적 머신에 존재하는 row의 집합, 분산된 데이터의 조각

    - Ex) df = df.repartition(2)  # 2개의 파티션으로 재분배

 

 

2.7 트랜스포메이션

트랜스포메이션

- 스파크의 핵심 데이터구조는 불변성 (immutable)
- 불변성이기이에 트랜스포메이션을 통해(액션 이전에) 데이터를 변환하거나 조작하는 작업을 의미 

 

트랜스포메이션 유형

- 좁은 의존성
ㄴ 하나의 파티션이 하나의 출력 파티션에만 영향

 

- 넓은 의존성

ㄴ 하나의 파티션이 여러 출력 파티션에 영향

ㄴ 파티션을 교환하는 셔플이 발생

 

좁은 의존성

 

넚은 의존성

 

트랜스포메이션은 특성

- 지연 연산

ㄴ 스파크는 트랜스포메이션을 즉시 실행하지 않고, 논리적 실행 계획을 만들어 DAG(Directed Acyclic Graph)를 생성

ㄴ DAG에는 연산들 사이의 의존성과 실행 순서가 정의됩

ㄴ DAG가 생성 후, 스파크는 DAG를 물리적 실행 계획으로 컴파일

ㄴ 이는 DAG의 각 단계를 실제로 실행할 수 있는 실행 단위로 변환하는 과정

ㄴ 물리적 실행 계획은 실제로 클러스터에서 실행될 태스크들을 나타 냄

    ( 데이터의 파티션 처리, 클러스터의 자원 할당, 셔플 및 병렬 처리 등을 고려하여 구성 )

ㄴ 물리적 실행 계획은 최적화된 실행 경로를 선택하여 클러스터에서의 작업을 효율적으로 수행할 수 있도록 지원

 

- 병렬 처리

ㄴ 트랜스포메이션은 병렬로 처리 됨 ( 여러 파티션의 데이터에 동시에 적용 됨 )

 

 

2.8 액션

- 실행계획을 실제 연산으로 수행하는 명령
- 스파크 잡이 시작됨
- myRange.count

 

 

2.9 스파크 UI

- 스파크 UI는 드라이버 노드의 4040포트로 접속 가능

 

 

2.10 종합예제

- DataFrame은 스키마 정보를 알아내는 스키마 추론 기능

예제

filteData2015 = spark\
    .read\
    .option("inferSchema", "true")\
    .option("header", "true")\
    .csv("/data/flight-data/csv/2015-summary.csv")
    
- head 명령어와 같은 결과
flightData2015.take(3)

- 실행계획 확인
flightData2015.sort("count").explain()

- DataFrame을 테이블이나 뷰로 생성
flightData2015.createOrReplaceTempView("flight_Data_2015")

 

 

스파크 셔플(Shuffle)

- 셔플은 데이터 그룹화, 집계 단계를 위해 클러스터의 노드 전체에 데이터를 재분배하는데 사용

- 데이터를 더 작은 청크로 분할하고, 네트워크를 통해 섞은 다음, 새로운 청크 집합으로 다시 분할하는 작업

- 셔플 최적화 관련 포스트
https://tech.kakao.com/2021/10/08/spark-shuffle-partition/