실시간 스트리밍 백엔드에서 Redis 다루기 - 기초(1)
안녕하세요, 오늘은 제가 만드려는 여러 서비스의 기초가 되는 실시간 온라인 스트리밍 백엔드의 시스템 엔지니어링 기술에 대해서 기초를 다루어 보겠습니다.
PRD입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# 📘 PRD: 실시간 무거운 작업 처리 스트리밍 백엔드
## 1. 개요
FastAPI와 Celery를 활용하여, 클라이언트가 보낸 요청을 실시간으로 처리하되, CPU/메모리를 많이 사용하는 작업은 백그라운드에서 비동기적으로 처리하는 백엔드 서비스.
## 2. 목적
- 실시간 스트리밍 환경에서 무거운 영상 처리, AI 분석 등을 서버 과부하 없이 안정적으로 처리
- 빠른 응답성과 안정성 확보
## 3. 주요 기능
- 클라이언트 요청 처리 (API)
- Redis를 통한 작업 큐 등록
- Celery Worker를 통한 병렬 처리
- 작업 상태 조회 및 결과 반환
## 4. 사용자 시나리오
1. 사용자가 `/process` API로 데이터를 보냄
2. 서버는 요청을 수신하고 즉시 "요청 접수됨" 메시지를 반환
3. 내부적으로 Redis 큐에 작업을 넣고 Celery Worker가 비동기로 처리
4. 사용자는 `/status/{task_id}`로 작업 상태를 조회하거나
5. 완료된 작업 결과를 `/result/{task_id}`로 확인할 수 있음
## 5. 엔드포인트 정리
### POST `/process`
- 설명: 작업 요청을 서버에 전달
- 요청 바디:
```json
{
"data": "some input data"
}
```
- 응답:
```json
{
"message": "Processing started",
"task_id": "celery-task-id"
}
```
---
### GET `/status/{task_id}`
- 설명: 특정 작업의 상태를 조회
- 응답:
```json
{
"task_id": "celery-task-id",
"status": "PENDING | STARTED | SUCCESS | FAILURE"
}
```
---
### GET `/result/{task_id}`
- 설명: 작업이 완료되었을 경우 결과 확인
- 응답:
```json
{
"task_id": "celery-task-id",
"result": { "output": "processed output" }
}
```
## 6. 기술 스택
- Python 3.13+
- FastAPI (async web framework)
- Celery (분산 비동기 작업 큐)
- Redis (broker)
- Docker (로컬 개발/배포 환경)
## 7. 향후 확장 아이디어
- 작업 우선순위 설정 (high/medium/low queue)
- 작업 중간 상태 보고 기능
- WebSocket 기반 푸시 알림 추가
- 결과 저장소 연동 (DB, S3)
---
다루는 python 패키지는 다음과 같습니다.
1
2
3
4
5
6
7
fastapi
uvicorn
celery
redis
pydantic
python-dotenv
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# main.py
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from worker import heavy_task
app = FastAPI()
class TaskRequest(BaseModel):
data: str
@app.post("/process")
async def process_data(request: TaskRequest):
task = heavy_task.delay(request.data)
return {"message": "Processing started", "task_id": task.id}
@app.get("/status/{task_id}")
async def get_status(task_id: str):
from worker import app as celery_app
result = celery_app.AsyncResult(task_id)
return {"task_id": task_id, "status": result.status}
@app.get("/result/{task_id}")
async def get_result(task_id: str):
from worker import app as celery_app
result = celery_app.AsyncResult(task_id)
if result.ready():
return {"task_id": task_id, "result": result.result}
return {"task_id": task_id, "status": result.status, "result": None}
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# worker.py
import os
from celery import Celery
from dotenv import load_dotenv
load_dotenv() # .env 파일 로드
REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = os.getenv("REDIS_PORT", "6379")
REDIS_DB = os.getenv("REDIS_DB", "0")
REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
REDIS_RESULT_EXPIRES = os.getenv("REDIS_RESULT_EXPIRES", "3600")
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
app.conf.result_expires = REDIS_RESULT_EXPIRES
@app.task
def heavy_task(data):
import time
time.sleep(5) # Simulate heavy processing
return {"output": f"Processed: {data}"}
.env
1
2
3
4
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_DB=0
REDIS_RESULT_EXPIRES=3600
docker-compose.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# docker-compose.yml
version: "3"
services:
redis:
image: redis:alpine
ports:
- "6379:6379"
fastapi:
build: .
ports:
- "8000:8000"
volumes:
- .:/app
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
depends_on:
- redis
worker:
build: .
command: celery -A worker worker --loglevel=info
volumes:
- .:/app
depends_on:
- redis
env_file:
- .env
Dockerfile
1
2
3
4
5
6
7
8
# Dockerfile
FROM python:3.13-slim
WORKDIR /app
COPY . .
RUN pip install --upgrade pip
RUN pip install fastapi uvicorn celery redis pydantic python-dotenv
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
이제 실행 명령어입니다. README.md
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 실시간 스트리밍 백엔드
## 실행
```powershell
docker info
docker-compose up --build
```
## 접속
```url
http://localhost:8000/docs
```
```powershell
docker exec -it streaming_backend-redis-1 redis-cli
```
```redis-cli
KEYS *
MONITOR
```
fastapi서버에 들어가시면 다음과 같은 endpoint api들이 있습니다.
process api를 보내봅니다.
태스크 아이디가 b935a842-9894-487a-a6d0-86d141f8ec9f 입니다.
이제 redis에 접속해서 해당 태스크가 캐시로 잘 남아있는지 확인해봅니다.
잘 남아있습니다:)
참고로 redis expire 기간을 3600초로 해두었기 때문에 1시간 뒤에 자동으로 삭제되어 무제한 캐시가 쌓이는 일은 없습니다.
이번 짧은 실습을 시작으로 앞으로도 다양한 기초 실습을 해보겠습니다.
감사합니다.
+추가) Redis큐, celery비동기 워커, flower대쉬보드, fastapi 서버를 사용한 간단한 사이드 프로젝트를 소개하는 다음 포스팅도 도움이 되실 수 있습니다. 관심 있으시다면 구경해주세요~ https://blog.naver.com/devramyun/223838211922
[20250418] Redis 큐, Celery 비동기 워커, Flower 대쉬보드를 사용한 Fastapi 백엔드 사이드프로젝트 및 시연 https://youtu.be/6UFrDmWOGuw 이번 영상도 ppt 발표 부분은 자동으로 만들었습니다. 뒤의 시연 부분… https://youtu.be/6UFrDmWOGuw 이번 영상도 ppt 발표 부분은 자동으로 만들었습니다. 뒤의 시연 부분…




