앞서 단일 케이스에 대해 계산하는 API 에 대해 성능을 올렸지만, 최적 케이스를 찾는 API를 최적화하는 것이 최종적인 목표였고, 이 API 에서는 각각의 케이스를 여러 개의 스레드로 나누어 병렬 처리를 하고 있었다. 그런데 단일 케이스(단일 스레드로 동작)를 최적화하는 과정에서 세션 관리 방식을 변경하였는데, 이로 인해 다중 스레드 환경에서 문제가 발생하였다.
기존 최적 케이스 API 소요 시간 61683ms
asyncio.run 을 통한 비동기 처리에서 세션을 같이 사용하는 방식은 문제가 발생함
Instances of
Session
andAsyncSession
are mutable, stateful objects with no built-in synchronization of method calls, and represent a single, ongoing database transaction upon a single database connection at a time for a particularEngine
orAsyncEngine
to which the object is bound (note that these objects both support being bound to multiple engines at once, however in this case there will still be only one connection per engine in play within the scope of a transaction). A single database transaction is not an appropriate target for concurrent SQL commands; instead, an application that runs concurrent database operations should use concurrent transactions. For these objects then it follows that the appropriate pattern isSession
per thread, orAsyncSession
per task.
-> AsyncSession 을 테스크별로 사용해야한다고 한다. 비동기 세션도 결국 코루틴으로 다른 I/O wait 시간 동안 다른 스레드 처리를 할 수 있도록 CPU 점유율을 높이기 위한 것이지, 다중 스레드가 동일한 세션을 사용한다고 잘못 이해하고 있었다.
기존 방식에서는 매번 세션을 새로 생성해 쿼리를 실행시키므로 비동기 방식에서 문제가 없었지만, 세션을 공유해 identity map 을 사용하거나 read only 트랜잭션에서는 같은 세션을 사용하게 하는 등의 처리에서 문제가 생긴다.
기존에 request 단위 세션을 사용하기 위해 async_scoped_session 을 사용중이었다. sqlalchemy 공식문서를 참고하면, scoped_session 은 쓰레드 로컬을 사용하고, async_scoped_session 은 task 로컬을 사용한다고 한다.
쓰레드 로컬은 쓰레드만의 공간을 사용한다는 의미이다. task 로컬 역시 asyncio 의 비동기 처리 단위인 task 단위로 공간을 사용한다는 의미이다. fastapi 에서도 하나의 request 를 하나의 task 로 처리하기 때문에 기존 방식을 잘 사용할 수 있었던 것이다.
따라서 위 문제를 해결하기 위해서는 task 별 세션을 관리해주어야 한다.
async def process_asyncio(
semaphore: asyncio.Semaphore,
func: Callable[..., Any],
*params: Tuple[Any, ...]
) -> List[Any]:
async def wrapped_func(*args, **kwargs) -> Any:
async with semaphore:
session_id = str(uuid4())
context = set_session_context(session_id=session_id)
try:
return await func(*args, **kwargs)
finally:
await session.remove()
reset_session_context(context=context)
return await wrapped_func(*params)
함수를 wrapping 하여 앞뒤로 세션 컨텍스트를 관리한다.async_scoped_session 의 scope 로 해당 세션 컨텍스트를 사용하므로 컨텍스트만 관리해주면 scope 를 조절할 수 있다.
semaphore 를 받아 동시에 실행될 task 수를 조절한다. 설정하지 않을 경우 task 수만큼 세션이 생성되게 되어 최대 커넥션 수를 초과해 에러가 발생하게 된다.
기존 asyncio.gather 함수를 사용하는 부분을 아래와 같이 수정한다.
semaphore = asyncio.Semaphore(30)
tasks = [process_asyncio(semaphore, inner_case_process, selling_order) for selling_order in selling_orders]
return await asyncio.gather(*tasks)
최적화 후 api 소요 시간 35187ms
단일 케이스가 1/10 가량 개선된 것에 비해 많이 최적화 되지는 않는다. 이미 비동기/병렬 처리를 통해 I/O time 을 어느 정도 줄여주고 있었기 때문에 단순히 연산이 너무 많아 오래걸리는 듯 하다.