21.1 구조적 스트리밍의 기초
- 구조적 스트리밍의 핵심 아이디어는 스트림 데이터를 데이터가 계속해서 추가되는 테이블처럼 다루는 것
- 필요한 경우 상태 저장소(state store)에 있는 일부 상태를 갱신해 결과를 변경
- API 핵심은 배치처리나 관련되 쿼리 구문을 변경하지 않아도 된다는 것
- DataFrame도 스트리밍 방식으로 동작할 수 있음
21.2 핵심 개념
트랜스포메이션과 액션
- 몇 가지 제약 사항이 증분 처리를 할 수 없는 일부 쿼리에 있음
- 연속적으로 처리해 결과를 출력하는 한 가지 액션만 있음
입력 소스
- 아파치 카프카 0.10버전 ( spark.2.2 )
- HDFS, S3 등 분산 파일 시스템의 파일
- 테스트용 소켓 소스
싱크( Sink )
싱크로 스트림의 결과를 저장할 목적지를 명시
- 아파치 카프카 0.10버전
- 출력 레코드에 임의 연산을 실행하는 foreach 싱크
- 테스트용 콘솔 싱크
- 디버깅용 메모리 싱크
출력모드
- 출력모드는 지정해야하는 작업 필요
- 지원하는 출력 모드는
- append
- update
- complete
트리거
- 출력 시점을 정의
- 기본적으로 마지막 입력 데이터를 처리한 직후에 신규 입력 데이터를 조회해 결과를 만들어냄
이벤트 시간처리
- 구조적 스트리밍은 이벤트 시간 기준의 처리도 지원
- 레코드 내부의 기록된 타임스탬프를 기준으로 함
21.3 구조적 스트리밍 활용
- 예제는 이기종 데이터 셋(Heterogeneity Human Activity Recognition)
- 구조적 스트리밍에서 스키마 추론 기능을 사용하기 싶은 경우 명시적으로 설정
- spark.sql.streaming.schemaInference = true
예제
## 예제 코드
static = spark.read.json("/data/activity-data/")
dataSchema = static.schema
# 스트리밍 생성
# maxFilesPerTrigger는 폴더 내의 전체 파일을 얼마나 빨리 읽을지 결정
streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
.json("/data/activity-data")
# 스트리밍 트랜스포메이션 지정
activityCounts = streaming.groupBy("gt").count()
spark.conf.set("spark.sql.shuffle.partitions", 5)
# 스트리밍 액션을 정의 ( 메모리 싱크 사용 )
# 싱크를 지정, 이 때 출력 방식도 함계 정의
activityQuery = activityCounts.writeStream.queryName("activity_counts")\
.format("memory").outputMode("complete")\
.start()
# 코드 실행
# 백그라운드에서 스트리밍 연산 시작
activityQuery.awaitTemination()
'DataPipeline > Spark' 카테고리의 다른 글
Spark - 테이블 캐싱(Caching) 예제 (0) | 2024.10.27 |
---|---|
Spark - Databricks를 통한 디버그 환경설정 (0) | 2024.10.26 |
스파크완벽가이드 - 20장 스트림 처리의 기초 (0) | 2024.08.18 |
스파크완벽가이드 - 18장 모니터링과 디버깅 (0) | 2024.08.18 |
스파크완벽가이드 - 17장 스파크 배포환경 (0) | 2024.08.18 |