Spark - 적응형 쿼리 (Adaptive Query Execution)
·
DataPipeline/Spark
적응형 쿼리 (Adaptive Query Execution)AQE는 Spark 3.0부터 도입된 기능으로,복잡한 쿼리 처리의 효율성을 높이고 실행 시간을 단축하는데 도움이 됩니다..  쿼리가 시작되면 AQE 프레임워크는 다른 스테이지에 의존하지 않는 리프 스테이지(leaf stages)를 먼저 실행합니다. 하나 이상의 스테이지가 완료되면, 프레임워크는 해당 스테이지를 완료된 것으로 표시하고 실행 중에 얻은 통계로 논리적 쿼리 플랜을 업데이트합니다.  이 통계에 따라 프레임워크는 선택된 논리적/물리적 최적화 규칙, 파티션 병합, 스큐 조인과 같은적응형 최적화 규칙을 적용해 새로운 최적화 쿼리 플랜을 생성합니다.즉, 실행-재최적화-재실행 과정을 반복하여 전체 쿼리가 완료될 때까지 최적화를 계속 진행합니다. ..
Spark - 병렬 JDBC Table 읽기
·
DataPipeline/Spark
Spark에서 PostgreSQL 테이블을 읽는 것을 예제로 합니다.직렬 읽기는 단일 커넥션을 통해 데이터가 순차적으로 로드되며,병렬 읽기는 여러 분할을 통해 데이터를 동시에 읽음으로써 성능을 크게 향상시킬 수 있습니다. 병렬 읽기가 무조건 좋은 것 처럼 보이지만클러스터의 리소스나 테이블의 크기에 따라 병렬 읽기가 필요하지 않은 경우도 있습니다. 1. 직렬 읽기 (Sequential Read)직렬 읽기는 단일 커넥션으로 PostgreSQL 테이블의 데이터를 Spark로 읽어옵니다. Spark는 이 커넥션을 통해 데이터를 가져오고, 모든 데이터를 하나의 파티션으로 로드합니다.예제 코드 (pysark)from pyspark.sql import SparkSession# SparkSession 생성spark =..
Spark - Databricks에 AWS S3 연동
·
DataPipeline/Spark
[ Catalog를 통해 연동 ]1. Access Key CSV 파일 업로드Catalog 메뉴에서 'Create Table'을 클릭한다. Files 영역에 AWS Access Key 정보가 들어있는 CSV파일을 업로드 한다. CSV 파일 예제는 아래와 같다.# CSV 파일 예제Access key ID,Secret access keyDOCDNSSOD1223DK,A9DFSDF9899/DFSDSF91/ 2. 업로드된 CSV 파일 확인%fs ls dbfs:/FileStore/tables/accessKeys.csvaws_key_df = spark.read.csv("dbfs:/FileStore/tables/accessKeys.csv" , header=True , inferSchema= True)aws_key_df...
Spark - 파티션 변경에 따른 셔플작업 변화
·
DataPipeline/Spark
트랜스포메이션Narrow Transformation각 파티션이 독립적으로 처리되어 다른 파티션과 데이터 셔플이 필요 없는 변환입니다.예시: map, filter, flatMap.특징: 빠르고 메모리 효율적입니다. Wide Transformation데이터가 다른 파티션으로 이동하거나 셔플링이 필요한 변환입니다.예시: groupByKey, reduceByKey, join.특징: 데이터 셔플링으로 인해 느리고 자원 소모가 큽니다.  Group by Queryspark.sql.adaptive.enable해당 옵션은 Spark SQL에서 Adaptive Query Execution(AQE) 기능을 활성화할지 여부를 결정합니다. AQE는 Spark 3.0부터 도입된 기능으로, 쿼리 실행 도중 발생할 수 있는 다양한..
Spark - 테이블 캐싱(Caching) 예제
·
DataPipeline/Spark
설정 환경Databrick에서 제공하는 파일을 통해 flights 테이블을 생성합니다.CREATE TABLE IF NOT EXISTS flightsUSING csvOPTIONS ( header "true", path "/databricks-datasets/flights", inferSchema "true")  캐싱(Caching) 전후 비교캐싱을 하지 않았을 때 실행 시간이 2.37초 실행 됨을 확인합니다. flights 테이블을 캐싱합니다.CACHE TABLE flights 테이블을 캐싱을 후, 동일한 쿼리의 실행시간을 비교합니다.아래와 같이 1.39초로 실행시간이 감소하였습니다.  캐싱(Caching) 테이블 확인Spark UI의 Storage에서 캐싱 테이블 확인할 수 있습니다.flights라..
Spark - Databricks를 통한 디버그 환경설정
·
DataPipeline/Spark
Databricks 소개Databricks는 데이터 분석과 머신러닝을 위한 통합 분석 플랫폼으로, Amazon, Microsoft, Google의 클라우드에서 작동합니다. 구조화 및 비구조화 데이터, 스트리밍 데이터 등 다양한 데이터 형식을 지원하며,여러 오픈 소스 도구와 통합되어 데이터 분석, 대시보드, ETL, 머신러닝, 스트리밍 작업에 적합합니다. Spark cluster 디버깅 환경을 설정하기가 쉽지 않은데 ( 여러개의 Docker, 혹은 여러개의 EC2 서버 구축 )Spark의 복잡한 설정을 간소화하고 성능을 최적화하여 데이터 분석과 SQL 쿼리 실행에 편리한 환경을 제공합니다. Databricks Community EditionCommunity Edition은 무료 버전으로, 소규모 Spark..
스파크완벽가이드 - 21장 구조적 스트리밍의 기초
·
DataPipeline/Spark
21.1 구조적 스트리밍의 기초구조적 스트리밍의 핵심 아이디어는 스트림 데이터를 데이터가 계속해서 추가되는 테이블처럼 다루는 것필요한 경우 상태 저장소(state store)에 있는 일부 상태를 갱신해 결과를 변경API 핵심은 배치처리나 관련되 쿼리 구문을 변경하지 않아도 된다는 것DataFrame도 스트리밍 방식으로 동작할 수 있음  21.2 핵심 개념트랜스포메이션과 액션- 몇 가지 제약 사항이 증분 처리를 할 수 없는 일부 쿼리에 있음- 연속적으로 처리해 결과를 출력하는 한 가지 액션만 있음  입력 소스- 아파치 카프카 0.10버전 ( spark.2.2 )- HDFS, S3 등 분산 파일 시스템의 파일- 테스트용 소켓 소스 싱크( Sink )싱크로 스트림의 결과를 저장할 목적지를 명시 - 아파치 카프..
스파크완벽가이드 - 20장 스트림 처리의 기초
·
DataPipeline/Spark
20.1 스트림 처리란연속형 애플리케이션(Continuous Processing)기존의 Spark Streaming은 마이크로 배치 방식으로 동작, 그러나 저지연 처리가 필요한 애플리케이션에는 한계나머지 컴포넌트와 쉽게 연동할 수 있어야 하여 '연속형 애플리케이션' 이란 개념을 추가연속적인 데이터 처리를 가능하게 하기 위해 Spark 2.3에서 추가 됨 주요 특징저지연 처리밀리초 단위의 지연 시간을 제공End-to-End Exactly-Once Semantics데이터 소스에서 싱크까지 데이터를 중복 없이 정확히 한 번만 처리통합된 APIStructured Streaming API를 사용하여 "연속 처리 모드"와 기존 "마이크로 배치 모드"를 모두 지원 예제trigger(processingTime="1 s..
wave35
'DataPipeline/Spark' 카테고리의 글 목록