포스트

실시간 스트리밍 백엔드에서 Redis 다루기 - 기초(1)

안녕하세요, 오늘은 제가 만드려는 여러 서비스의 기초가 되는 실시간 온라인 스트리밍 백엔드의 시스템 엔지니어링 기술에 대해서 기초를 다루어 보겠습니다.


PRD입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# 📘 PRD: 실시간 무거운 작업 처리 스트리밍 백엔드

## 1. 개요

FastAPI와 Celery를 활용하여, 클라이언트가 보낸 요청을 실시간으로 처리하되, CPU/메모리를 많이 사용하는 작업은 백그라운드에서 비동기적으로 처리하는 백엔드 서비스.

## 2. 목적

- 실시간 스트리밍 환경에서 무거운 영상 처리, AI 분석 등을 서버 과부하 없이 안정적으로 처리
- 빠른 응답성과 안정성 확보

## 3. 주요 기능

- 클라이언트 요청 처리 (API)
- Redis를 통한 작업 큐 등록
- Celery Worker를 통한 병렬 처리
- 작업 상태 조회 및 결과 반환

## 4. 사용자 시나리오

1. 사용자가 `/process` API로 데이터를 보냄
2. 서버는 요청을 수신하고 즉시 "요청 접수됨" 메시지를 반환
3. 내부적으로 Redis 큐에 작업을 넣고 Celery Worker가 비동기로 처리
4. 사용자는 `/status/{task_id}`로 작업 상태를 조회하거나
5. 완료된 작업 결과를 `/result/{task_id}`로 확인할 수 있음

## 5. 엔드포인트 정리

### POST `/process`

- 설명: 작업 요청을 서버에 전달
- 요청 바디:

```json
{
  "data": "some input data"
}
```

- 응답:

```json
{
  "message": "Processing started",
  "task_id": "celery-task-id"
}
```

---

### GET `/status/{task_id}`

- 설명: 특정 작업의 상태를 조회
- 응답:

```json
{
  "task_id": "celery-task-id",
  "status": "PENDING | STARTED | SUCCESS | FAILURE"
}
```

---

### GET `/result/{task_id}`

- 설명: 작업이 완료되었을 경우 결과 확인
- 응답:

```json
{
  "task_id": "celery-task-id",
  "result": { "output": "processed output" }
}
```

## 6. 기술 스택

- Python 3.13+
- FastAPI (async web framework)
- Celery (분산 비동기 작업 큐)
- Redis (broker)
- Docker (로컬 개발/배포 환경)

## 7. 향후 확장 아이디어

- 작업 우선순위 설정 (high/medium/low queue)
- 작업 중간 상태 보고 기능
- WebSocket 기반 푸시 알림 추가
- 결과 저장소 연동 (DB, S3)

---


다루는 python 패키지는 다음과 같습니다.

1
2
3
4
5
6
7
fastapi
uvicorn
celery
redis
pydantic
python-dotenv


main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# main.py
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
from worker import heavy_task

app = FastAPI()


class TaskRequest(BaseModel):
    data: str


@app.post("/process")
async def process_data(request: TaskRequest):
    task = heavy_task.delay(request.data)
    return {"message": "Processing started", "task_id": task.id}


@app.get("/status/{task_id}")
async def get_status(task_id: str):
    from worker import app as celery_app

    result = celery_app.AsyncResult(task_id)
    return {"task_id": task_id, "status": result.status}


@app.get("/result/{task_id}")
async def get_result(task_id: str):
    from worker import app as celery_app

    result = celery_app.AsyncResult(task_id)
    if result.ready():
        return {"task_id": task_id, "result": result.result}
    return {"task_id": task_id, "status": result.status, "result": None}


worker.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# worker.py
import os
from celery import Celery
from dotenv import load_dotenv

load_dotenv()  # .env 파일 로드

REDIS_HOST = os.getenv("REDIS_HOST", "redis")
REDIS_PORT = os.getenv("REDIS_PORT", "6379")
REDIS_DB = os.getenv("REDIS_DB", "0")
REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
REDIS_RESULT_EXPIRES = os.getenv("REDIS_RESULT_EXPIRES", "3600")

app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)

app.conf.result_expires = REDIS_RESULT_EXPIRES


@app.task
def heavy_task(data):
    import time

    time.sleep(5)  # Simulate heavy processing
    return {"output": f"Processed: {data}"}


.env

1
2
3
4
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_DB=0
REDIS_RESULT_EXPIRES=3600

docker-compose.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# docker-compose.yml
version: "3"
services:
  redis:
    image: redis:alpine
    ports:
      - "6379:6379"
  fastapi:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - .:/app
    command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
    depends_on:
      - redis
  worker:
    build: .
    command: celery -A worker worker --loglevel=info
    volumes:
      - .:/app
    depends_on:
      - redis
    env_file:
      - .env


Dockerfile

1
2
3
4
5
6
7
8
# Dockerfile
FROM python:3.13-slim
WORKDIR /app
COPY . .
RUN pip install --upgrade pip
RUN pip install fastapi uvicorn celery redis pydantic python-dotenv
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]


이제 실행 명령어입니다. README.md

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 실시간 스트리밍 백엔드

## 실행

```powershell
docker info
docker-compose up --build
```

## 접속

```url
http://localhost:8000/docs
```

```powershell
docker exec -it streaming_backend-redis-1 redis-cli
```

```redis-cli
KEYS *
MONITOR
```


fastapi서버에 들어가시면 다음과 같은 endpoint api들이 있습니다.


process api를 보내봅니다.

태스크 아이디가 b935a842-9894-487a-a6d0-86d141f8ec9f 입니다.


이제 redis에 접속해서 해당 태스크가 캐시로 잘 남아있는지 확인해봅니다.

잘 남아있습니다:)

참고로 redis expire 기간을 3600초로 해두었기 때문에 1시간 뒤에 자동으로 삭제되어 무제한 캐시가 쌓이는 일은 없습니다.


이번 짧은 실습을 시작으로 앞으로도 다양한 기초 실습을 해보겠습니다.

감사합니다.

sticker

+추가) Redis큐, celery비동기 워커, flower대쉬보드, fastapi 서버를 사용한 간단한 사이드 프로젝트를 소개하는 다음 포스팅도 도움이 되실 수 있습니다. 관심 있으시다면 구경해주세요~ https://blog.naver.com/devramyun/223838211922

[20250418] Redis 큐, Celery 비동기 워커, Flower 대쉬보드를 사용한 Fastapi 백엔드 사이드프로젝트 및 시연 https://youtu.be/6UFrDmWOGuw 이번 영상도 ppt 발표 부분은 자동으로 만들었습니다. 뒤의 시연 부분… https://youtu.be/6UFrDmWOGuw 이번 영상도 ppt 발표 부분은 자동으로 만들었습니다. 뒤의 시연 부분…

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.