본문 바로가기

카테고리 없음

API 성능 개선 - 비동기 연구

1. 배경 및 목적

LAG 정보를 업데이트하는 인덱싱 요청API는 호출 시 평균 1.2초 이상의 시간이 소요되어, 서비스 응답 속도에 영향을 주고 있음.

이에 따라, 다량의 I/O 작업을 포함하는 해당 API의 비동기 처리 적용 가능성을 검토하고 성능 개선을 시도하였음.

2. 개선 전 성능 현황

  • 주요 병목 구간:
    • DB select 작업 : 최대 2.1s (약 75000건)
    • Queue send 작업 : 최대 0.23s (약 500개 task)
    • Redis set 작업: 최대 0.5s (약 500건)
  • 평균 실행 시간: 1.2s

3. 최종 성능 개선 결과

  • 전체 API 평균 실행 시간: 1.24s → 0.78s
  • 개선 폭: 약 40**% 단축**
  • 주요 개선 포인트: Redis pipeline + 비동기 처리 최적화

4. 성능 개선 방법

4.1 DB 조회 비동기 병렬 실행 (asyncio.gather) 👍

  • 문제점: 문서 타입(faq/word/board)마다 insert/update/delete 된 item을 찾기 위해 3번의 SQL 조회 작업을 순차 실행, 총 9번 SQL 조회
  • 개선 조치: asyncio.gather를 이용하여 각 문서 타입별 비동기 병렬 처리 적용
    • 단, 때때로 오히려 시간이 더 걸리는 경우 발생
      • 후보 원인:
        • DB 커넥션 풀 부족
        • DB 서버 리소스 부족 (CPU, Memory 등)
        • → 모니터링 결과 리소스는 충분
      • 주요 원인 : 내부 쿼리에서 Table(..., autoload_with=engine) 사용 → 동기 DB 엔진 사용으로 인한 오버헤드
      • 원인 해결 : 해당 코드 제거, 별도 비동기 쿼리 사용
        • 기존 코드 : 동기적 실행 CODE (0.7s)
        • 개선 코드 : DB select 작업을 비동기 병렬로 실행 CODE (0.2s)

기존 코드 / 개선 코드

  • 결과:
    • 각 문서 타입별 SQL 평균 실행 시간
      • (약 25000건) : 0.7s→ 0.2s
      • (약 50건) : 0.35s → 0.02s

4.2 Celery Queue send 스레드 분리 시도 👎

  • 문제점 : send_queue는 1~수백 회 호출되며 모두 동기 처리되어 순차적으로 큐에 전송됨
  • 개선 조치 : asyncio.to_thread를 사용해 큐 전송 작업을 쓰레드로 나누고 병렬로 분산 실행
    ⚠️ 개선 실패 원인 (비동기 전환 후 오히려 성능 악화 (0.2s → 0.6s))
  • 주요 원인 : Celery에 작업을 보내는 클라이언트 함수 (send_task)는 모두 동기로 작동함. 비동기 함수 안에서 동기 작업을 실행하면 오히려 오버헤드가 더 커짐.
    • Celery는 기본적으로 kombu라는 동기 AMQP 클라이언트를 사용
    • AMQP 통신 자체가 blocking
    • kombu 자체가 비동기를 지원하지 않음
      • 공식 문서에서도 "비동기 전송을 위한 공식 지원 없음"
  • 해결 방법 1 : send_queue 작업만 to_thread를 사용해 스레드를 분리시키고 asyncio.gather를 이용해 병렬로 처리
    • 동기적 실행 CODE (0.2s)
    • send_queue 큐에 task넣는 작업을 비동기 병렬로 실행 CODE (0.6s)

동기적 실행 code

더보기
for i in range(1, len(update_items) + 1):
    (
        doc_uid,
        earliest_uid,
        earliest_uuid,
        latest_uid,
        latest_uuid,
    ) = update_items[i - 1]
    update_uids[doc_uid].append(
        (
            earliest_uid,
            earliest_uuid,
            latest_uid,
            latest_uuid,
        )
    )

    ## update 작업들을 동기적으로 실행
    if i % CHUNK_SIZE == 0 or i == len(update_items):
        chunk_count += 1
        input_task = document_service.regroup_into_chunk(
            "update", update_uids
        )
        input_value = {
            "configuration": configuration,
            "chunk_number": chunk_count,
            "task": input_task,
            "usr_api_token": user.usr_api_token,
            "cpn_code_uid": user.cpn_code_uid,
        }
        queue_service.send_to_queue(
            task_name="chat_llm_function_document_task",
            queue_name="chat_llm_function_document_tasks",
            input_value=input_value,
            log=f"update {task_id}:{chunk_count}",
            user_info=USER_INFO,
        )
        update_uids.clear()

send_queue 큐에 task넣는 작업을 비동기 병렬로 실행 CODE 

더보기
update_queue_tasks = []
  for i in range(1, len(update_items) + 1):
      doc_uid, earliest_uid, earliest_uuid, latest_uid, latest_uuid = (
          update_items[i - 1]
      )
      update_uids[doc_uid].append(
          [(earliest_uid, earliest_uuid, latest_uid, latest_uuid)]
      )
      if i % CHUNK_SIZE == 0 or i == len(update_items):
          chunk_count += 1
          input_task = document_service.regroup_into_chunk(
              "update", update_uids
          )
          input_value = {
              "configuration": configuration,
              "chunk_number": chunk_count,
              "task": input_task,
          }
          update_queue_tasks.append(
              asyncio.to_thread(                 # thread 분리
                  queue_service.send_to_queue,
                  input_value,
                  log=f"update {task_id}:{chunk_count}",
                  user_info=USER_INFO,
              )
          )
          update_uids.clear()

  # Update 큐 작업들을 병렬로 실행
  if update_queue_tasks:
      await asyncio.gather(*update_queue_tasks)   # 비동기로 실행

 

  • 해결 방법 2 : 연관된 작업들이 동기적으로 실행되며 그 함수가 실행되어야 send_queue가 이루어짐. 따라서 연관된 작업까지 모두 한 함수로 모아 스레드를 분리시킴.
  • 해결 방안의 실패 원인 : ThreadPoolExecutor 오버헤드
    • asyncio.to_thread는 내부적으로 ThreadPoolExecutor를 사용하여 별도의 스레드에서 동기 함수를 실행한다.
    • 이 과정에서 다음과 같은 오버헤드가 발생
      • 쓰레드 풀 큐잉 대기
      • 컨텍스트 스위칭
      • GIL 경쟁
    • 특히, send_task처럼 실행 시간이 짧은 동기 함수는 위 오버헤드에 비해 이점이 작아 순차 실행보다 오히려 느려질 수 있음
  • 향후 개선 방향
    • Kafka 등 비동기 지원 큐 시스템 도입
    • 쓰레드풀 관리
    • 작업 자체를 묶어 batch 처리
    • 큐 전송 횟수를 줄이기

4.3 Redis Pipeline 적용 👍

  • 문제점: Redis 명령어가 매번 RTT(Round Trip Time)를 발생시켜 병목 발생
  • 개선 조치: Redis pipeline 적용으로 소켓 왕복 시간 최소화
  • 동기적 실행 CODE (0.5s)
  • redis set하는 작업을 pipeline을 사용해 한번에 실행 CODE (0.6s)

동기적 실행 CODE 

더보기
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:status", "ing"
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:start_date",
timestamp,
216000,
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:update_date",
timestamp,
216000,
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:task_id",
str(task_id),
216000,
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:total_task_count",  # task 총 개수
total_task_count,
216000,
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:total_task_batch_count",  # chunck 총 개수
chunk_count,
216000,
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:completed_task_batch_count",  # 완료된 chunk 개수
0,
216000,
)
if task_id and chunk_count:
	for i in range(1, chunk_count + 1):
		redis_service.set_value(
		    f"root:{user.cpn_code_uid}:document:indexing:state:{task_id}#{i}:completed",  # 각 chunk마다 완료 여부
		    0,
		    216000,
		)
		redis_service.set_value(
		    f"root:{user.cpn_code_uid}:document:indexing:state:{task_id}#{i}:update_date",
		    timestamp,
		    216000,
		)

redis set하는 작업을 pipeline을 사용해 한번에 실행 CODE

더보기
redis_values = [
    (f"root:{user.cpn_code_uid}:document:indexing:state:status", "ing", None),
    (
        f"root:{user.cpn_code_uid}:document:indexing:state:start_date",
        timestamp,
        216000,
    ),
    (
        f"root:{user.cpn_code_uid}:document:indexing:state:update_date",
        timestamp,
        216000,
    ),
    (
        f"root:{user.cpn_code_uid}:document:indexing:state:task_id",
        str(task_id),
        216000,
    ),
    (
        f"root:{user.cpn_code_uid}:document:indexing:state:total_task_count",
        total_task_count,
        216000,
    ),
    (
        f"root:{user.cpn_code_uid}:document:indexing:state:task_batch_count",
        chunk_count,
        216000,
    ),
]

# 청크별 값 추가
for i in range(1, chunk_count + 1):
    redis_values.append(
        (
            f"root:{user.cpn_code_uid}:document:indexing:state:{task_id}#{i}:completed_task_count",
            0,
            216000,
        )
    )
    redis_values.append(
        (
            f"root:{user.cpn_code_uid}:document:indexing:state:{task_id}#{i}:update_date",
            timestamp,
            216000,
        )
    )

await redis_service.set_values_pipeline(redis_values)
  • 결과:
    • 실행 시간 0.5s → 0.01s
    • 전체 API 시간 약 0.5s 감소

 

5. 향후 개선 방향

  • DB 연결 풀 튜닝 (현재 max 100개)
  • SQL 최적화 및 ORM 여부 검토
  • DB 컬럼 인덱싱 적용

 

6. CODE

API

insert_items, update_items, delete_items = await asyncio.gather(
    document_service.select_faq_insert_items(
        target_document_uids[target_enum_document_code_uid], enum_document
    ),
    document_service.select_faq_update_items(
        target_document_uids[target_enum_document_code_uid], enum_document
    ),
    document_service.select_faq_delete_items(
        target_document_uids[target_enum_document_code_uid], enum_document
    ),
)

 


Service

async def select_faq_insert_items(
    self, document_uids: list[int], enum_document: EnumDocumentModel
):
    return await self.document_crud.select_document_insert_items_orm(document_uids)

 

 

Crud

async def select_document_insert_items_orm(self, document_uids: list[int]):
  async with AsyncSessionLocal() as session:
      # 1. 먼저 uuid별로 is_indexed True가 없는 uuid만 추출
      subq = (
          select(DocumentItemFaq.docitmfaq_uuid)
          .where(
              DocumentItemFaq.doc_uid.in_(document_uids),
              DocumentItemFaq.docitmfaq_deleted_at.is_(None),
          )
          .group_by(DocumentItemFaq.docitmfaq_uuid)
          .having(
              func.count(case((DocumentItemFaq.docitmfaq_is_indexed == True, 1)))
              == 0
          )
          .subquery()
      )

      # 2. 해당 uuid에 대해 최신 row 추출
      stmt = (
          select(
              DocumentItemFaq.doc_uid.label("document_uid"),
              DocumentItemFaq.docitmfaq_uid.label("item_uid"),
              DocumentItemFaq.docitmfaq_uuid.label("item_uuid"),
          )
          .distinct(DocumentItemFaq.docitmfaq_uuid)
          .join(
              subq,
              DocumentItemFaq.docitmfaq_uuid == subq.c.docitmfaq_uuid,
          )
          .where(DocumentItemFaq.docitmfaq_deleted_at.is_(None))
          .order_by(
              DocumentItemFaq.docitmfaq_uuid,
              desc(DocumentItemFaq.docitmfaq_updated_at),
          )
      )

      result = await session.execute(stmt)
      return result.all()

 

 

Postgresql setting

import contextlib
from typing import AsyncGenerator

from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

from chat_backend.config import settings


async_engine = create_async_engine(
    f"postgresql+asyncpg://{settings.DB_BE_USERNAME}:{settings.DB_BE_PASSWORD}@{settings.DB_ENDPOINT}:{settings.DB_PORT}/{settings.DB_NAME}",
)
AsyncSessionLocal = sessionmaker(
    bind=async_engine, class_=AsyncSession, expire_on_commit=False
)

 

 

Redis setting

import os

import redis.asyncio as redis

from chat_backend.config import settings

client = redis.Redis(
    host=settings.RD_ENDPOINT, port=settings.RD_PORT, decode_responses=True
)
from typing import Optional

from redis.asyncio import Redis
import json

from chat_backend.chat_nosql.client import client

class RedisService:
    def __init__(self, client: Redis):
        self.client = client

    async def set_value(
        self, key: str, value: str, expire: Optional[int] = None
    ) -> bool:
        try:
            value = json.dumps(value)
            if expire:
                await self.client.set(key, value, ex=expire)
            else:
                await self.client.set(key, value)
            return True
        except Exception as e:
            print(f"Error setting value in Redis: {e}")
            return False

    async def get_value(self, key: str) -> str | None:
        try:
            result = await self.client.get(key)
            if result:
                return json.loads(result)
            return None
        except Exception as e:
            print(f"Error getting value from Redis: {e}")
            return None

    async def remove_value(self, key: str) -> int:
        try:
            return await self.client.delete(key)
        except Exception as e:
            print(f"Error deleting value from Redis: {e}")
            return 0

    async def set_values_pipeline(self, values: list[tuple[str, str, int | None]]):
        try:
            pipe = self.client.pipeline()
            for key, value, expire in values:
                value = json.dumps(value)
                if expire:
                    pipe.set(key, value, ex=expire)
                else:
                    pipe.set(key, value)
            await pipe.execute()
            return True
        except Exception as e:
            print(f"Error setting values in Redis pipeline: {e}")
            return False

def get_redis_service() -> RedisService:
    return RedisService(client)

 

 

pyproject.toml

asyncio = "^3.4.3"
asyncpg = "^0.30.0"