[ Time-Series Collection ]
1. 개요
MongoDB 5.0부터 도입된 시계열 전용 컬렉션 타입입니다.
“같은 시계열(센서·채널 등)에 속하는 여러 측정값을
‘버킷(bucket)’이라는 큰 문서에 압축·열 지향 포맷으로 묶어서 보관해
쓰기·저장·조회 효율을 극대화한 컬렉션” 입니다.
필수 필드
- timeField : 시간 값(ISODate)
- metaField : 동일 시계열을 구분하는 태그(ID·채널·센서 등, 선택)
추가 옵션
- granularity : 데이터 입력 주기 힌트("seconds", "minutes", "hours")
- TTL·샤딩도 그대로 사용 가능
2. 간단한 구조 살펴보기
예시 시나리오 : YouTube 라이브 채널에서 1초마다 “동시시청자(viewers)” 수치를 수집
1) 컬렉션 생성
db.createCollection("channelViewers", {
timeseries: {
timeField : "ts", // 1초 주기의 타임스탬프
metaField : "channelId", // 채널 ID별로 시계열 구분
granularity: "seconds"
},
expireAfterSeconds: 60*60*24*30 // 30일 보존(선택)
})
2) 데이터 삽입
db.channelViewers.insertMany([
{ channelId: "LIVE_abc", ts: ISODate("2024-07-30T10:15:04Z"), viewers: 731 },
{ channelId: "LIVE_abc", ts: ISODate("2024-07-30T10:15:05Z"), viewers: 742 },
{ channelId: "LIVE_xyz", ts: ISODate("2024-07-30T10:15:05Z"), viewers: 189 }
])
- 동일 channelId(metaField)에 해당하는 레코드들은 내부적으로 같은 버킷에 차곡차곡 기록됩니다.
- _id 인덱스를 만들지 않으므로 쓰기 오버헤드가 작습니다.
3) 기본 조회 (특정 채널 1분치 찾기)
db.channelViewers.find({
channelId: "LIVE_abc",
ts: { $gte: ISODate("2024-07-30T10:15:00Z"),
$lt : ISODate("2024-07-30T10:16:00Z") }
})
- MongoDB는 버킷 메타데이터를 먼저 찾아 필요한 버킷만 스캔합니다.
3. 집계( Aggregation Pipeline ) 예제
[목표] 유튜브 채널별 1분 평균 시청자수를 구해보자.
db.channelViewers.aggregate([
// ① 5분 범위로 한정(옵션)
{ $match: {
ts: { $gte: ISODate("2024-07-30T10:15:00Z"),
$lt : ISODate("2024-07-30T10:20:00Z") }
}},
// ② 1분 단위로 버킷 키 만들기
{ $group: {
_id: {
channelId: "$channelId",
minute: { $dateTrunc: { date: "$ts", unit: "minute" } }
},
avgViewers: { $avg: "$viewers" }
}},
{ $sort: { "_id.channelId": 1, "_id.minute": 1 } }
])
특징
- 버킷(열 지향) 저장 덕분에 $match 와 $group 단계에서 버킷 단위 블록 프로세싱이 적용되어 CPU·디스크 I/O가 매우 적음.
- channelId 로 샤딩하면 채널별 스케일-아웃도 간단.
4. TTL·아카이빙: 오래된 데이터 자동 삭제
앞서 설정한 expireAfterSeconds 옵션 = TTL 인덱스 역할
→ 30일이 지난 버킷은 백그라운드에서 자동 삭제되어 스토리지를 깔끔하게 유지.
5. Materialized View(요약 컬렉션)로 선집계
자주 쓰는 1분 요약을 미리 저장하면 실시간 대시보드가 훨씬 빨라집니다.
db.channelViewers.aggregate([
{ $match: { ts: { $gte: /*당일 1분 전*/ } } },
{ $group: {
_id: {
channelId: "$channelId",
minute: { $dateTrunc: { date: "$ts", unit: "minute" } }
},
avgViewers: { $avg: "$viewers" }
}},
{ $merge: {
into: "channelViewers_1min",
on: ["_id.channelId", "_id.minute"],
whenMatched: "replace",
whenNotMatched: "insert"
}}
])
- 이 파이프라인을 1분마다 크론/Trigger로 실행 → channelViewers_1min 컬렉션에 upsert
- 대시보드는 요약 컬렉션만 조회하면 되므로 응답이 몇 ms 수준
6. 요약
1. createCollection() 시 timeseries 옵션만 지정하면 시계열 특화 저장 구조가 자동 적용
2. 동일 시계열(채널·센서 등) 데이터가 한 버킷-문서에 모여 쓰기·저장·조회가 모두 빨라짐
3. Aggregation Pipeline, 윈도 함수, $merge 를 이용해
- 초 → 분 → 시 단위 집계
- 누적합·랭킹·롤링 평균
- Materialized View 생성
등을 별도 데이터베이스 없이 몽고 내부에서 처리
[ Aggregation Pipeline ]
MongoDB 서버 내부에서
“데이터를 흐름(Stream)처럼 흘려보내며, 여러 단계(stage)로 변환·필터·집계”
하도록 설계된 데이터 가공 엔진입니다.
SQL의 SELECT … GROUP BY … HAVING … 을 MongoDB 한곳에서 구현한다고 보면 됩니다.
1. 핵심 개념
- 파이프라인은 배열 형태로, 각 요소가 하나의 stage - [ { $stage1: {...} }, { $stage2: {...} }, … ]
- 앞 단계의 출력이 바로 다음 단계의 입력이 되어 스트림처럼 흐름을 이룹니다.
- 모든 stage는 서버 메모리에서 실행되므로 애플리케이션이 데이터를 꺼내서 따로 가공할 필요가 없습니다.
2. 대표 Stage와 용도
Stage | 설명 (SQL에 비유)
- $match | WHERE 절 : 조건 필터, 인덱스 사용
- $project | SELECT 절 : 필드 선택·변환
- $group | GROUP BY : 키별 집계(sum, avg …)
- $sort | ORDER BY
- $unwind | 배열을 행으로 펼치기
- $bucket/$bucketAuto | 구간 히스토그램
- $lookup | JOIN (다른 컬렉션 결합)
- $setWindowFields | 윈도 함수(누적합, 이동평균 등)
- $merge | 결과를 컬렉션에 upsert ⇒ Materialized View 생성
3. 예제 시나리오
가상의 컬렉션 userActions
{
channelId : "LIVE_abc",
ts : ISODate("2024-07-30T14:23:05Z"),
userId : "U_123",
action : "donate", // like | subscribe | donate
amount : NumberDecimal("5")
}
목표:
1) 채널별·1시간 단위 기부 합계 / 건수
2) “기부액 누적합” 그래프용 데이터
3) 위 결과를 요약 컬렉션에 upsert
db.userActions.aggregate([
/* ① 관심 기간·액션만 선별 ------------ */
{ $match: {
action: "donate", // 필터
ts: { $gte: ISODate("2024-07-30T00:00:00Z") } // 인덱스 사용
}},
/* ② 키 생성: 채널 + 1시간 ------------ */
{ $group: { // GROUP BY
_id: {
channelId: "$channelId",
hour: { $dateTrunc: { date: "$ts", unit: "hour" } }
},
totalDonation : { $sum: "$amount" }, // SUM(amount)
donationCnt : { $sum: 1 } // COUNT(*)
}},
/* ③ 정렬 ---------------------------- */
{ $sort: { "_id.channelId": 1, "_id.hour": 1 } },
/* ④ 결과를 요약 컬렉션에 upsert ------ */
{ $merge: {
into: "donationHourlySummary", // 대상 테이블
on: ["_id.channelId", "_id.hour"], // PK
whenMatched: "replace",
whenNotMatched: "insert"
}}
])