DataPipeline/Spark
스파크완벽가이드 - 9장 데이터 소스
wave35
2024. 8. 7. 15:39
스파크 핵심 외부 데이터소스
- CSV
- JSON
- PARQUET
- ORC
- JDBC/ODBC
- TEXT FILE
- 파일 옵션
ㄴ https://spark.apache.org/docs/latest/sql-data-sources-csv.html
CSV Files - Spark 3.5.1 Documentation
spark.apache.org
9.1 데이터소스 API 구조
읽기 모드
데이터 읽기 핵심 구조
- DataFrameReader.format(...).option("key","value").schema(...).load()
- DataFrameReader는 SparkSession의 read 속성으로 접근
읽기 모드 지정 값
- 포맷
- 스키마
- 읽기 모드
ㄴ permissive : 오류레코드 null 설정
ㄴ dropMalformed : 오류레코드 제거
ㄴ failFast : 오류레코드시 만나면 즉시 종료
- 옵션
예제
spark.read.format("csv")
.option("mode","FAILFAST")
.option("inferSchema","true")
.option("path","path/to/file(s)")
.schema(someSchema)
.load()
쓰기 모드
데이터 쓰기 핵심 구조
- DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
- DataFrameWriter는 DataFrame의 write 속성으로 접근
쓰기 모드 지정 값
- 포맷
- 옵션
- 저장 모드
ㄴ append : 해당 경로에 이미 존재하는 파일 목록에 결과 파일을 추가
ㄴ overwrite : 이미 존재하는 모든 데이터를 덮어씀
ㄴ errorIfExists : 해당 경로에 파일이 존재하면 오류 발생
ㄴ ignore : 해당 경로에 데이터 파일이 존재하면 아무런 처리하지 않음
예제
dataframe.write.format("csv")
.option("mode","OVERWRITE")
.option("dataFormat","yyyy-MM-dd")
.option("path","path/to/file(s)")
.save()
9.2 CSV 파일
CSV 옵션
# CSV 읽기
csvFile = spark.read.format("csv")
.option("header","true")
.option("mode","FAILFAST")
.option("inferSchema","true")
.load("some/path/to/file.csv")
# CSV 쓰기
csvFile.write.format("csv")
.option("mode","OVERWRITE")
.option("sep","\t")
.save("path/to/file(s)")
9.3 Json 파일
- 구조화되어 있고, 최소한의 기본 데이터 타입이 존재
Json 옵션
9.4 Parquet 파일
- 개별 컬럼을 기준으로 읽어 저장공간을 절약하고, 컬럼 기반의 압축 기능 제공
- 스파크 기본 파일 포맷
- 복합 데이터 타입을 지원 ( csv에서는 배열 사용못함 )
- 스파크와 호환이 잘 되기에 옵션이 2개만 존재
Parquet 옵션
9.5 ORC 파일
- 컬럼 기반의 파일 포맷
- 대규모 스트리밍 읽기에 최적화, 로우를 신속하게 찾아낼 수 있는 기능
- parquet는 spark에 최적화, orc는 hive에 최적화
9.6 SQL Database
- 접속 / 인증 및 네트워크 관련 옵션이 필요
- 스파크 classpath에 Database JDBC Driver를 추가하고 적절한 jar 파일을 제공
ㄴ 예제) ./bin/spark-shell --driver-class-path postgresql-9.4.1.jar --jars postgresql-9.4.1.jar
- 데이터 읽기
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://database_server")\
.option("dbtable", "schema.tablename")\
.option("user", "username").option("password", "my-secret-password").load()
- 데이터 쓰기
newPath = "jdbc:sqlite://tmp/my-sqlite.db"
csvFile.write.jdbc(newPath, tablename, mode="overwrite", properties=props)
- 쿼리 푸시다운
ㄴ DF을 만들기 전에 데이터베이스 자체에서 데이터를 필터링하여 가져 옴
pushdownQuery = """(SELECT DISTINCT(DEST_COUNTRY_NAME) FROM flight_info) AS flight_info"""
dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery)\
.option("driver", driver)\
.load()
- 데이터베이스 병렬로 읽기
ㄴ numPartitions 옵션을 이용해 읽기/쓰기 동시 작업 수를 제한하는 파티션 수를 설정
dbDataFrame = spark.read.format("jdbc")\
.option("url", url)\
.option("dbtable", tablename)\
.option("driver", driver)\
.option("numPartitions", 10).load()
9.8 고급 I/O 개념
- 분할 가능한 파일 타입과 압축 방식
ㄴ 스파크에서 전체 파일이 아닌 쿼리에 필요한 부분만 읽을 수 있어 성능 향상 (parquet file)
ㄴ 모든 압축 방식이 분할 압축을 지원하지 않음 ( 추천 압축 방식 : GZIP )
- 병렬로 데이터 읽기
ㄴ 여러 익스큐터가 같은 파일을 동시에 읽을 수 없지만 여러 파일을 동시에 읽을 수 있음
ㄴ 다수의 파일을 읽을 때, 개별 파일을 DF의 파티션이 됨
- 병렬로 데이터 쓰기
ㄴ 기본적으로 데이터 파티션 당 하나의 파일이 작성
ㄴ 폴더 안에 5개의 파일을 생성
csvFile.repartiton(5).write.format("csv").save("/path/mm/")
- 파티셔닝
ㄴ 어떤 데이터를 어디에 저장할 것인지 제어
ㄴ 필터를 자주 사용하는 테이블의 경우 손쉬운 최적화 방식
ㄴ csvFile.limit(10).write.mode("overwrite").partitionBy("DEST_COUNTRY").save("/tmp/partitioned.parquet")
- 버켓팅
ㄴ 각 파일에 저장된 데이터를 제어하는 파일 조직화 기법
ㄴ 하나의 물리적 파티션에 모여 있어 데이터를 읽을 때 셔플을 피할 수 있음
ㄴ numBuckets = 100
csvFile.write.format("parquet").mode("overwrite").bucketBy(numBuckets, "customerId")..saveAsTable("bucket_files")
- 파일 크기 관리
ㄴ 파일이 작으면 메타데이터 관리에 부하가 생기며, 파일이 크면 작은 데이터를 부를 때 큰 데이트블록을 읽어야 함
ㄴ 파일 크기를 제한하는 maxRecordPerFile 통해 파일당 레코드 수 지정
ㄴ df.write.option("maxRecordPerFile", 5000)