1. 배경 및 목적
LAG 정보를 업데이트하는 인덱싱 요청API는 호출 시 평균 1.2초 이상의 시간이 소요되어, 서비스 응답 속도에 영향을 주고 있음.
이에 따라, 다량의 I/O 작업을 포함하는 해당 API의 비동기 처리 적용 가능성을 검토하고 성능 개선을 시도하였음.
2. 개선 전 성능 현황
- 주요 병목 구간:
- DB select 작업 : 최대 2.1s (약 75000건)
- Queue send 작업 : 최대 0.23s (약 500개 task)
- Redis set 작업: 최대 0.5s (약 500건)
- 평균 실행 시간: 1.2s
3. 최종 성능 개선 결과
- 전체 API 평균 실행 시간: 1.24s → 0.78s
- 개선 폭: 약 40**% 단축**
- 주요 개선 포인트: Redis pipeline + 비동기 처리 최적화
4. 성능 개선 방법
4.1 DB 조회 비동기 병렬 실행 (asyncio.gather) 👍
- 문제점: 문서 타입(faq/word/board)마다 insert/update/delete 된 item을 찾기 위해 3번의 SQL 조회 작업을 순차 실행, 총 9번 SQL 조회
- 개선 조치: asyncio.gather를 이용하여 각 문서 타입별 비동기 병렬 처리 적용
- 단, 때때로 오히려 시간이 더 걸리는 경우 발생
- 후보 원인:
- DB 커넥션 풀 부족
- DB 서버 리소스 부족 (CPU, Memory 등)
- → 모니터링 결과 리소스는 충분
- 주요 원인 : 내부 쿼리에서 Table(..., autoload_with=engine) 사용 → 동기 DB 엔진 사용으로 인한 오버헤드
- 원인 해결 : 해당 코드 제거, 별도 비동기 쿼리 사용
- 기존 코드 : 동기적 실행 CODE (0.7s)
- 개선 코드 : DB select 작업을 비동기 병렬로 실행 CODE (0.2s)
- 후보 원인:
- 단, 때때로 오히려 시간이 더 걸리는 경우 발생


- 결과:
- 각 문서 타입별 SQL 평균 실행 시간
- (약 25000건) : 0.7s→ 0.2s
- (약 50건) : 0.35s → 0.02s
- 각 문서 타입별 SQL 평균 실행 시간
4.2 Celery Queue send 스레드 분리 시도 👎
- 문제점 : send_queue는 1~수백 회 호출되며 모두 동기 처리되어 순차적으로 큐에 전송됨
- 개선 조치 : asyncio.to_thread를 사용해 큐 전송 작업을 쓰레드로 나누고 병렬로 분산 실행
⚠️ 개선 실패 원인 (비동기 전환 후 오히려 성능 악화 (0.2s → 0.6s)) - 주요 원인 : Celery에 작업을 보내는 클라이언트 함수 (send_task)는 모두 동기로 작동함. 비동기 함수 안에서 동기 작업을 실행하면 오히려 오버헤드가 더 커짐.
- Celery는 기본적으로 kombu라는 동기 AMQP 클라이언트를 사용
- AMQP 통신 자체가 blocking
- kombu 자체가 비동기를 지원하지 않음
- 공식 문서에서도 "비동기 전송을 위한 공식 지원 없음"
- 해결 방법 1 : send_queue 작업만 to_thread를 사용해 스레드를 분리시키고 asyncio.gather를 이용해 병렬로 처리
- 동기적 실행 CODE (0.2s)
- send_queue 큐에 task넣는 작업을 비동기 병렬로 실행 CODE (0.6s)
동기적 실행 code
더보기
for i in range(1, len(update_items) + 1):
(
doc_uid,
earliest_uid,
earliest_uuid,
latest_uid,
latest_uuid,
) = update_items[i - 1]
update_uids[doc_uid].append(
(
earliest_uid,
earliest_uuid,
latest_uid,
latest_uuid,
)
)
## update 작업들을 동기적으로 실행
if i % CHUNK_SIZE == 0 or i == len(update_items):
chunk_count += 1
input_task = document_service.regroup_into_chunk(
"update", update_uids
)
input_value = {
"configuration": configuration,
"chunk_number": chunk_count,
"task": input_task,
"usr_api_token": user.usr_api_token,
"cpn_code_uid": user.cpn_code_uid,
}
queue_service.send_to_queue(
task_name="chat_llm_function_document_task",
queue_name="chat_llm_function_document_tasks",
input_value=input_value,
log=f"update {task_id}:{chunk_count}",
user_info=USER_INFO,
)
update_uids.clear()
send_queue 큐에 task넣는 작업을 비동기 병렬로 실행 CODE
더보기
update_queue_tasks = []
for i in range(1, len(update_items) + 1):
doc_uid, earliest_uid, earliest_uuid, latest_uid, latest_uuid = (
update_items[i - 1]
)
update_uids[doc_uid].append(
[(earliest_uid, earliest_uuid, latest_uid, latest_uuid)]
)
if i % CHUNK_SIZE == 0 or i == len(update_items):
chunk_count += 1
input_task = document_service.regroup_into_chunk(
"update", update_uids
)
input_value = {
"configuration": configuration,
"chunk_number": chunk_count,
"task": input_task,
}
update_queue_tasks.append(
asyncio.to_thread( # thread 분리
queue_service.send_to_queue,
input_value,
log=f"update {task_id}:{chunk_count}",
user_info=USER_INFO,
)
)
update_uids.clear()
# Update 큐 작업들을 병렬로 실행
if update_queue_tasks:
await asyncio.gather(*update_queue_tasks) # 비동기로 실행
- 해결 방법 2 : 연관된 작업들이 동기적으로 실행되며 그 함수가 실행되어야 send_queue가 이루어짐. 따라서 연관된 작업까지 모두 한 함수로 모아 스레드를 분리시킴.
- 해결 방안의 실패 원인 : ThreadPoolExecutor 오버헤드
- asyncio.to_thread는 내부적으로 ThreadPoolExecutor를 사용하여 별도의 스레드에서 동기 함수를 실행한다.
- 이 과정에서 다음과 같은 오버헤드가 발생
- 쓰레드 풀 큐잉 대기
- 컨텍스트 스위칭
- GIL 경쟁
- 특히, send_task처럼 실행 시간이 짧은 동기 함수는 위 오버헤드에 비해 이점이 작아 순차 실행보다 오히려 느려질 수 있음
- 향후 개선 방향
- Kafka 등 비동기 지원 큐 시스템 도입
- 쓰레드풀 관리
- 작업 자체를 묶어 batch 처리
- 큐 전송 횟수를 줄이기
4.3 Redis Pipeline 적용 👍
- 문제점: Redis 명령어가 매번 RTT(Round Trip Time)를 발생시켜 병목 발생
- 개선 조치: Redis pipeline 적용으로 소켓 왕복 시간 최소화
- 동기적 실행 CODE (0.5s)
- redis set하는 작업을 pipeline을 사용해 한번에 실행 CODE (0.6s)
동기적 실행 CODE
더보기
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:status", "ing"
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:start_date",
timestamp,
216000,
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:update_date",
timestamp,
216000,
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:task_id",
str(task_id),
216000,
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:total_task_count", # task 총 개수
total_task_count,
216000,
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:total_task_batch_count", # chunck 총 개수
chunk_count,
216000,
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:completed_task_batch_count", # 완료된 chunk 개수
0,
216000,
)
if task_id and chunk_count:
for i in range(1, chunk_count + 1):
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:{task_id}#{i}:completed", # 각 chunk마다 완료 여부
0,
216000,
)
redis_service.set_value(
f"root:{user.cpn_code_uid}:document:indexing:state:{task_id}#{i}:update_date",
timestamp,
216000,
)
redis set하는 작업을 pipeline을 사용해 한번에 실행 CODE
더보기
redis_values = [
(f"root:{user.cpn_code_uid}:document:indexing:state:status", "ing", None),
(
f"root:{user.cpn_code_uid}:document:indexing:state:start_date",
timestamp,
216000,
),
(
f"root:{user.cpn_code_uid}:document:indexing:state:update_date",
timestamp,
216000,
),
(
f"root:{user.cpn_code_uid}:document:indexing:state:task_id",
str(task_id),
216000,
),
(
f"root:{user.cpn_code_uid}:document:indexing:state:total_task_count",
total_task_count,
216000,
),
(
f"root:{user.cpn_code_uid}:document:indexing:state:task_batch_count",
chunk_count,
216000,
),
]
# 청크별 값 추가
for i in range(1, chunk_count + 1):
redis_values.append(
(
f"root:{user.cpn_code_uid}:document:indexing:state:{task_id}#{i}:completed_task_count",
0,
216000,
)
)
redis_values.append(
(
f"root:{user.cpn_code_uid}:document:indexing:state:{task_id}#{i}:update_date",
timestamp,
216000,
)
)
await redis_service.set_values_pipeline(redis_values)
- 결과:
- 실행 시간 0.5s → 0.01s
- 전체 API 시간 약 0.5s 감소
5. 향후 개선 방향
- DB 연결 풀 튜닝 (현재 max 100개)
- SQL 최적화 및 ORM 여부 검토
- DB 컬럼 인덱싱 적용
6. CODE
API
insert_items, update_items, delete_items = await asyncio.gather(
document_service.select_faq_insert_items(
target_document_uids[target_enum_document_code_uid], enum_document
),
document_service.select_faq_update_items(
target_document_uids[target_enum_document_code_uid], enum_document
),
document_service.select_faq_delete_items(
target_document_uids[target_enum_document_code_uid], enum_document
),
)
Service
async def select_faq_insert_items(
self, document_uids: list[int], enum_document: EnumDocumentModel
):
return await self.document_crud.select_document_insert_items_orm(document_uids)
Crud
async def select_document_insert_items_orm(self, document_uids: list[int]):
async with AsyncSessionLocal() as session:
# 1. 먼저 uuid별로 is_indexed True가 없는 uuid만 추출
subq = (
select(DocumentItemFaq.docitmfaq_uuid)
.where(
DocumentItemFaq.doc_uid.in_(document_uids),
DocumentItemFaq.docitmfaq_deleted_at.is_(None),
)
.group_by(DocumentItemFaq.docitmfaq_uuid)
.having(
func.count(case((DocumentItemFaq.docitmfaq_is_indexed == True, 1)))
== 0
)
.subquery()
)
# 2. 해당 uuid에 대해 최신 row 추출
stmt = (
select(
DocumentItemFaq.doc_uid.label("document_uid"),
DocumentItemFaq.docitmfaq_uid.label("item_uid"),
DocumentItemFaq.docitmfaq_uuid.label("item_uuid"),
)
.distinct(DocumentItemFaq.docitmfaq_uuid)
.join(
subq,
DocumentItemFaq.docitmfaq_uuid == subq.c.docitmfaq_uuid,
)
.where(DocumentItemFaq.docitmfaq_deleted_at.is_(None))
.order_by(
DocumentItemFaq.docitmfaq_uuid,
desc(DocumentItemFaq.docitmfaq_updated_at),
)
)
result = await session.execute(stmt)
return result.all()
Postgresql setting
import contextlib
from typing import AsyncGenerator
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from chat_backend.config import settings
async_engine = create_async_engine(
f"postgresql+asyncpg://{settings.DB_BE_USERNAME}:{settings.DB_BE_PASSWORD}@{settings.DB_ENDPOINT}:{settings.DB_PORT}/{settings.DB_NAME}",
)
AsyncSessionLocal = sessionmaker(
bind=async_engine, class_=AsyncSession, expire_on_commit=False
)
Redis setting
import os
import redis.asyncio as redis
from chat_backend.config import settings
client = redis.Redis(
host=settings.RD_ENDPOINT, port=settings.RD_PORT, decode_responses=True
)
from typing import Optional
from redis.asyncio import Redis
import json
from chat_backend.chat_nosql.client import client
class RedisService:
def __init__(self, client: Redis):
self.client = client
async def set_value(
self, key: str, value: str, expire: Optional[int] = None
) -> bool:
try:
value = json.dumps(value)
if expire:
await self.client.set(key, value, ex=expire)
else:
await self.client.set(key, value)
return True
except Exception as e:
print(f"Error setting value in Redis: {e}")
return False
async def get_value(self, key: str) -> str | None:
try:
result = await self.client.get(key)
if result:
return json.loads(result)
return None
except Exception as e:
print(f"Error getting value from Redis: {e}")
return None
async def remove_value(self, key: str) -> int:
try:
return await self.client.delete(key)
except Exception as e:
print(f"Error deleting value from Redis: {e}")
return 0
async def set_values_pipeline(self, values: list[tuple[str, str, int | None]]):
try:
pipe = self.client.pipeline()
for key, value, expire in values:
value = json.dumps(value)
if expire:
pipe.set(key, value, ex=expire)
else:
pipe.set(key, value)
await pipe.execute()
return True
except Exception as e:
print(f"Error setting values in Redis pipeline: {e}")
return False
def get_redis_service() -> RedisService:
return RedisService(client)
pyproject.toml
asyncio = "^3.4.3"
asyncpg = "^0.30.0"