포스트

FastAPI에서 동시성 처리 방법: 10가지 비동기 최적화 기술, 그리고 추천하는 것


FastAPI는 Python의 asyncio를 기반으로 설계되어, 고성능 웹 API 서버를 구현하는 데 매우 적합합니다. 하지만 그 진가를 제대로 발휘하려면 동시성 처리에 대한 기술 이해가 필요합니다. 이번 글에서는 FastAPI에서 동시성 처리에 반드시 알아야 할 10가지 기술과 간단한 코드 예제를 함께 소개합니다.


1. async / await 기반 비동기 프로그래밍 FastAPI는 비동기 요청 처리를 위해 Python의 async / await 문법을 적극 활용합니다. 이를 통해 I/O 대기 시간 동안 다른 요청을 처리할 수 있어, 성능 병목을 줄일 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
from fastapi import FastAPI
import asyncio

app = FastAPI()

async def delay():
    await asyncio.sleep(1)
    return "done"

@app.get("/async-demo")
async def demo():
    result = await delay()
    return {"result": result}

2. asyncio 코루틴 및 태스크 관리 단순한 비동기 처리 외에도 여러 작업을 병렬 실행하고 싶을 때는 asyncio.gather()나 asyncio.create_task()를 사용해 코루틴을 병렬적으로 실행할 수 있습니다.

상황추천 방식
단순히 병렬 실행하고 결과만 필요할 때asyncio.gather()
태스크를 추적하거나, 나중에 제어해야 할 때asyncio.create_task()
1
2
3
4
5
6
7
8
9
10
11
12
13
async def job(n):
    await asyncio.sleep(n)
    return f"job {n}"

@app.get("/gather")
async def gather_example():
    results = await asyncio.gather(
        job(1),
        job(2),
        job(3)
    )
    return {"results": results}

모든 작업이 병렬로 실행되고 모든 결과를 **한 번에 기다림. **[“job 1”, “job 2”, “job 3”] 같이 반환.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@app.get("/task")
async def task_example():
    t1 = asyncio.create_task(job(1))
    t2 = asyncio.create_task(job(2))
    t3 = asyncio.create_task(job(3))

    # 다른 작업 가능 (예: logging, DB 등)
    print("Doing something else...")

    # 각 태스크를 개별적으로 await
    r1 = await t1
    r2 = await t2
    r3 = await t3

    return {"results": [r1, r2, r3]}

태스크를 별도로 관리하거나 중간에 다른 로직 수행 가능. 에러 제어를 따로 할 수 있음.


3. 비동기 데이터베이스 드라이버 사용 (예: Databases)** FastAPI에서 동기 DB 드라이버를 사용할 경우 전체 이벤트 루프가 블로킹되어 성능이 급감할 수 있습니다. asyncpg, Databases, SQLAlchemy Async, motor 등 **비동기 DB 드라이버 사용은 필수입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from fastapi import FastAPI
from databases import Database

app = FastAPI()

# PostgreSQL 연결 URI
DATABASE_URL = "postgresql+asyncpg://postgres:your_password@localhost:5432/your_database"

# 비동기 Database 인스턴스 생성
database = Database(DATABASE_URL)

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

@app.get("/users")
async def get_users():
    return await database.fetch_all("SELECT * FROM users")


4. 비동기 태스크 큐 도입 (Celery, RQ 등) 장시간 걸리는 작업은 웹 요청 응답 흐름에서 분리해 백그라운드 큐로 넘기는 것이 좋습니다. Redis + Celery 조합은 가장 널리 쓰이며, FastAPI와도 잘 어울립니다.

1
2
3
4
5
6
7
8
9
10
# tasks.py
from celery import Celery

# 1. Celery 앱 생성 및 설정
celery_app = Celery("worker", broker="redis://localhost:6379/0")

@celery_app.task
def add(x, y):
    return x + y

1
2
3
4
5
6
7
8
# FastAPI endpoint
from tasks import add # 위에서 만든 Celery 태스크 import

@app.post("/add")
def run_add():
    task = add.delay(3, 5) # add(x, y) 실행 요청 → Redis 큐에 등록
    return {"task_id": task.id}

FastAPI → Celery 태스크 큐에 요청 등록 → Redis → Celery 워커가 꺼내 실행.

  • add.delay : Celery 큐에 태스크 등록. 비동기적으로 실행하도록 함. (자주 쓰는 Celery의 필수 명령어들은 다른 포스팅에서 다룰 예정입니다.)

5. FastAPI의 BackgroundTasks 기능 간단한 비동기 작업이라면 굳이 Celery까지 쓰지 않고 FastAPI 내장 기능으로도 가능합니다. BackgroundTasks는 응답 이후에 실행할 태스크를 예약합니다.

1
2
3
4
5
6
7
8
9
10
11
12
from fastapi import BackgroundTasks

def write_log(msg: str):
    with open("log.txt", "a") as f:
        f.write(msg + "\n")

@app.post("/log")
async def log_message(background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, "New log entry")
    return {"status": "logging started"}


6. 비동기 파일 I/O 및 외부 API 호출 동기 파일 처리나 requests 모듈을 그대로 사용하면 병목이 발생합니다. aiofiles, httpx.AsyncClient 등 비동기 I/O 모듈을 적극적으로 사용해야 합니다. ** **

문제 상황설명
동기 파일 읽기open() 등은 블로킹 → 한 요청이 파일 읽는 동안 다른 요청은 기다려야 함
동기 HTTP 요청requests.get()은 외부 API 응답 대기 동안 전체 이벤트 루프가 정지

항목aiofileshttpx.AsyncClient
🌐 용도파일 읽기/쓰기 (비동기)외부 API 호출 (비동기)
📦 설치pip install aiofilespip install httpx
🔁 내부 동작asyncio + 스레드풀로 파일 처리httpx.AsyncClient는 내부적으로 HTTP/1.1, HTTP/2 비동기 처리
🔄 대체 대상open(), f.read() 등 동기 파일 I/Orequests.get/post() 같은 동기 HTTP 요청

1
2
3
4
5
6
7
8
9
10
11
import aiofiles
from fastapi import FastAPI

app = FastAPI()

@app.get("/save")
async def save_to_file():
    async with aiofiles.open("log.txt", mode="a") as f:
        await f.write("Log line\n")
    return {"status": "saved"}

요청 응답과 동시에 다른 클라이언트도 파일 쓰기 병렬 수행 가능. ** **비동기 외부 API 요청 - httpx.AsyncClient>**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import aiofiles
import httpx

@app.get("/fetch")
async def fetch_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://httpbin.org/get")
        return response.json()

@app.get("/save")
async def save_file():
    async with aiofiles.open("example.txt", "w") as f:
        await f.write("Hello async file")
    return {"status": "saved"}

requests.get()과 달리 이벤트 루프가 멈추지 않고 다른 요청도 병렬 처리됨.

그리고 병렬로 외부 API 여러 개 요청할 경우 다음과 같이 asyncio.gather() + httpx 조합으로도 사용 가능합니다. (두 API 병렬 요청 → 처리 시간 단축)

1
2
3
4
5
6
7
8
9
10
11
12
async def get_api_1(client):
    return await client.get("https://api.site1.com")

async def get_api_2(client):
    return await client.get("https://api.site2.com")

@app.get("/parallel-fetch")
async def fetch_both():
    async with httpx.AsyncClient() as client:
        r1, r2 = await asyncio.gather(get_api_1(client), get_api_2(client))
    return {"site1": r1.json(), "site2": r2.json()}

목적추천 모듈예시
비동기 파일 저장/로드aiofiles이미지, 로그 파일, 텍스트 등
외부 API 호출 (GET/POST)httpx.AsyncClient타 서버 REST API, 인증 등
다중 요청 병렬화asyncio.gather() + httpx속도 향상

  • ThreadPoolExecutorGIL은 있지만, 비동기적으로 블로킹 작업을 분리 가능
  • asyncio에서 씀.
  • 간단한 블로킹 함수(ex: sleep, 파일 등)를 분리하고자 할 때 사용.
  • ProcessPoolExecutor아예 다른 멀티코어 프로세스에서 실행되어 GIL에 의한 영향을 받지 않음.
  • 계산, 영상처리, 압축, ML 추론 등에 씀.
  • 무거운 CPU 연산, 멀티코어를 활용하고자 할 때 사용.
  • Async def + await블로킹이 없거나 CPU 연산이 적은 비동기 API, 파일, DB, 네트워크를 수행하고자 할 때 사용.
예시GIL 영향추천 executor
time.sleep()블로킹ThreadPoolExecutor
이미지 압축CPU 집중ProcessPoolExecutor
SHA256 해시 계산CPU 집중ProcessPoolExecutor
데이터프레임 연산CPU 집중ProcessPoolExecutor
외부 API 호출없음 (I/O)asyncio, httpx.AsyncClient

**ThreadPoolExecutor>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI

app = FastAPI()
executor = ThreadPoolExecutor()

def heavy_work(n):
    time.sleep(n)  # CPU-blocking 작업 (예시)
    return f"{n}초 후 완료"

@app.get("/compute")
async def run_heavy():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(executor, heavy_work, 3)
    return {"result": result}

**ProcessPoolExecutor>

1
2
3
4
5
6
7
8
9
10
from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor()

@app.get("/cpu-intensive")
async def run_cpu():
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(executor, lambda: sum(range(10**7)))
    return {"result": result}


8. Uvicorn / Gunicorn 환경 최적화 FastAPI는 ASGI 서버인 Uvicorn에서 실행되며, 이를 Gunicorn과 함께 운영 환경에서 배포하는 것도 가능합니다. 적절한 –workers 설정은 서버의 병렬 처리 능력을 높여줍니다.

Gunicorn이란 Python 웹 앱을 실행할 수 있게 해주는 WSGI/ASGI 서버 매니저이고,

멀티 프로세스 처리(worker)를 통해 동시 요청 처리량을 크게 향상시켜줍니다.

Gunicorn은 멀티코어 프로세스를 관리하고, 각 프로세스는 Uvicorn으로 Fastapi를 실행하여

멀티코어를 활용한 더 많은 요청을 처리를 감당할 수 있습니다.

1
2
3
4
5
6
# 개발용: 단일 프로세스, 코드 자동 리로드
uvicorn main:app --reload

# 운영용: Gunicorn이 Uvicorn worker를 여러 개 띄워서 병렬 처리
gunicorn -k uvicorn.workers.UvicornWorker main:app --workers 4

옵션의미
-k uvicorn.workers.UvicornWorkerUvicorn을 Worker로 사용
–workers 4워커(프로세스)를 4개 띄움 (코어 수에 맞게 조절)

9. 동시성 이슈 예방: 락, 세마포어, 레이스 컨디션 동시 요청 처리 중에 발생하는 경쟁 조건(race condition)을 방지하려면

asyncio.Lock, ** ** **asyncio.Semaphore **

등의 동기화 도구를 사용해 주의 깊게 설계해야 합니다.

경쟁 조건이란, 두 개 이상의 요청/쓰레드가 동시에 같은 자원을 읽고/쓰며, 실행 순서에 따라 결과가 달라질 수 있는 상태입니다.

다음과 같은 예시 문제가 있다고 합시다.

1
2
3
4
5
6
7
8
9
10
counter = 0

@app.post("/increase")
async def increase():
    global counter
    current = counter      # 1. 읽기
    await asyncio.sleep(1) # 2. 다른 요청도 들어옴
    counter = current + 1  # 3. 쓰기 (경쟁 조건 발생)
    return {"counter": counter}

동시에 요청하게 되면 counter 값이 제대로 증가하지 않습니다.

그래서 이 경우 asyncio.Lock 으로 하나의 요청만 락 안에서 실행되도록 하여 경쟁 조건을 방지할 수 있습니다.

async with lock이 끝나야 다음 요청이 실행되는 흐름입니다.

1
2
3
4
async with lock:
    # 이 블록 안은 오직 한 코루틴만 진입 가능
    write_to_file()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

lock = asyncio.Lock()
shared_counter = 0

@app.post("/safe-increase")
async def safe_increase():
    global shared_counter 
    async with lock:  # 락 걸기
        current = shared_counter 
        await asyncio.sleep(1)
        counter = current + 1
    return {"counter": shared_counter }

그리고 lock보다 조금 더 유연한 제어도구로써 asyncio.Semaphore가 있습니다.

1
2
3
async with semaphore:  # N개 동시에 가능
    data = await fetch_data()

1
2
3
4
5
6
7
8
semaphore = asyncio.Semaphore(3)

@app.get("/limited-access")
async def limited_area():
    async with semaphore:
        await asyncio.sleep(2)
        return {"status": "ok"}

1개의 요청만 안전하게 격리해서 처리하는 lock과 다르게 semaphore는 동시에 최대 n개의 요청을 허용할 수 있도록 합니다.

항목LockSemaphore
제한 수1개만 허용여러 개 설정 가능 (예: 3개)
쓰기 보호✅ (강력)❌ (제한적)
읽기 제한

Lock은 “누가 먼저 진입했는지”에 민감하고, Semaphore는 “몇 명이 진입 중인지”에 집중함. ** 따라서 공유 변수를 동시에 수정하는 쓰기 작업에는 Lock, 제한된 리소스에 동시에 접근해야 하는 읽기 작업에는 Semaphore가 적절합니다. **

문제해결 도구예시
공유 변수 동시에 수정asyncio.Lock카운터, 포인트, 파일 쓰기
제한된 리소스 접근asyncio.SemaphoreAPI rate limit, 다운로드 슬롯 등

주의할 점은,

Lock은 과도하게 사용할 시 병목(Bottleneck)의 원인이 되므로 적절히 꼭 필요한 곳에만 사용하는 것이 좋습니다.

또한 락이 풀리지 않는 데드락(deadlock)이 생길 경우에 대처할 수 있도록 try-finally나 async with와 함께 쓰는 것이 좋습니다. (데드락 = 서로 락을 주고받기만 하다가 아무도 못 움직이는 상태. 해결법: 락 순서 정리, 타임아웃 회피, 락 최소화 등… 자세한 내용은 다른 포스팅에서 다룰 예정.)

그리고 Semaphore는 쓰기 작업에 대한 RaceCondition을 완전히 해소해주지 못하므로, 참고해야 합니다.


10. Redis를 이용한 분산 락과 상태 관리 Redis를 활용하면 다중 서버 환경에서도 분산된 락, TTL 기반 상태관리, 큐 캐싱 등을 통해 확장 가능한 동시성 제어 구조를 만들 수 있습니다. 특히 aioredis를 사용하면 FastAPI와도 자연스럽게 연동됩니다.* (aioredis는 Redis와 비동기적으로 통신할 수 있는 Python 라이브러리)*

원래대로라면 python의 asyncio.Lock은 프로세스 내부에서만 작동하므로, 서버가 2대 이상이면 서로의 락을 공유할 수 없습니다. 따라서 Redis에 락을 걸면, 모든 서버들은 중앙에 있는 Redis를 조회하여 동일한 상태의 락을 공유받을 수 있습니다.

1
2
SET lock_key value NX EX 10

(SET 명령을 락처럼 씀.)

옵션설명
NX키가 없을 때만 설정 (다른 프로세스가 이미 락을 잡았으면 실패)
EX 1010초 동안만 유효 (TTL: 유효시간(Time-To-Live))
value고유 ID (락 해제 시 본인이 잡은 락인지 확인할 때 사용)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import aioredis
import uuid

redis = aioredis.from_url("redis://localhost")

lock_key = "lock:resource"
lock_value = str(uuid.uuid4())  # 고유 식별자

# 락 획득 시도
success = await redis.set(lock_key, lock_value, ex=10, nx=True)

if success:
    try:
        # 락을 얻은 상태 → 안전한 작업 수행
        ...
    finally:
        # 락 해제 (본인만 가능)
        if await redis.get(lock_key) == lock_value:
            await redis.delete(lock_key)


FastAPI는 단순히 빠르기만 한 프레임워크가 아니라, 비동기 처리 구조를 정확히 이해하고 설계했을 때 진가를 발휘합니다.

이번에 소개한 10가지 항목은 FastAPI에서 동시성을 잘 다루기 위한 핵심 기술들이며, 이들을 어떻게 조합하고 활용하느냐에 따라 실제 서비스의 확장성과 안정성이 달라집니다.


  • 기대 성능 향상 효과 (이득 크기)
  • 적용 난이도 (시간/학습/구현 복잡도)
1
2
3
4
5
6
7
1. asyncpg + SQLAlchemy Async 도입
2. API 구조를 async def + await로 정리
3. 외부 I/O(httpx, aiofiles 등) 비동기화
4. Gunicorn worker 수 조정 (2~4개)
5. BackgroundTasks로 후처리 분리
6. Celery + Redis 도입 (고부하 처리 시)

단계항목기대성능향상폭적용난이도설명
1비동기 DB 드라이버 (asyncpg) 적용🟢 매우 큼🟡 보통SQLAlchemy Async 또는 Databases로 교체 → 대부분의 I/O 병목 해소
2API 로직 async def / await 구조화🟢 큼🟢 쉬움순수 Python 로직에서 awaitable 구조로 개선
3httpx, aiofiles 등 비동기 I/O 도입🟢 큼🟢 쉬움파일 처리, 외부 API, Redis 등을 비동기로
4Uvicorn + Gunicorn 튜닝 (멀티 워커)🟡 중간🟢 매우 쉬움–workers, –loop, –http 옵션 조정
5FastAPI BackgroundTasks 활용🟡 중간🟢 쉬움단순 후처리, 로그 저장 등에 적합
6Celery + Redis 백그라운드 큐 도입🟢 매우 큼🔴 어려움이미지 처리, 이메일, 대기 시간 긴 작업 처리
7asyncio.gather, create_task 등 병렬 처리 최적화🟡 중간🟡 보통다수의 API 호출 또는 작업을 동시에 실행
8ThreadPoolExecutor / ProcessPoolExecutor 분리 처리🟡 제한적🟡 보통CPU-bound 작업이 있을 때만 사용
9분산 락 / Redis TTL 상태 관리🟡 중간🔴 어려움경합 자원 보호, 상태 공유 시 적용
10asyncio.Lock / Semaphore 등 동시성 제어🔵 상황 의존🔵 상황 의존상태 경합, 실시간 카운팅 등에서만 필요

Executor, 분산 락, Semaphore 등 좀더 까다로워보이는 기법은 그 효과가 제한적이거나 상황에 따라 달라서 적용을 고려할 때에는 후순위에 자리하고 있네요.

그리고 API로직의 비동기화, 그리고 비동기 I/O 도입, 멀티워커, Celery와 백그라운드 큐, Fastapi의 백그라운드태스크 등은 적용 난이도도 쉽고 기대 성능 향상 폭도 큰 편입니다.

감사합니다.

sticker

#동시성 #비동기 #asyncio #asyncawait #Python비동기 #코루틴 #비동기프로그래밍 #Concurrency #병렬처리 #ThreadPoolExecutor #FastAPI #FastAPI튜토리얼 #FastAPI동시성 #FastAPI비동기 #FastAPI백엔드 #PythonFastAPI #FastAPI서버 #FastAPI개발 #Uvicorn #Gunicorn #Redis #Celery #백그라운드작업 #ASGI서버 #서버최적화 #비동기DB #aiofiles #httpx #비동기API #Python서버개발

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.