Langchain과 FastAPI를 사용하여 OpenAI의 GPT 모델을 동기 및 비동기로 호출하는 다양한 방법을 알아보자. 또한, 스트리밍 방식으로 응답을 받아오는 방법도 포함하여 각기 다른 호출 방식을 구현하는 방법을 알아보자.
1. 프로젝트 설정 및 환경 준비
우선, Langchain과 OpenAI API를 활용하려면 필요한 패키지를 설치하고, .env
파일을 로드하여 OpenAI API 키를 환경 변수로 설정해야 한다. 이를 위해 dotenv
패키지를 사용한다.
패키지 설치
pip install fastapi, uvicorn, langchain, langchain-openai, python-dotenv
.env
파일에는 다음과 같은 내용이 포함된다.
OPENAI_API_KEY=your_openai_api_key
이후 FastAPI 애플리케이션을 시작하고, Langchain의 ChatOpenAI
모델과 PromptTemplate을 사용하여 모델을 설정한다.
uvicorn 시작
uvicorn main:app --reload
기본 소스
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from fastapi import FastAPI
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
llm = ChatOpenAI(
temperature=0,
model_name="gpt-4o-mini"
)
template = "아래 질문에 대한 답변을 해주세요. \n{query}"
prompt = PromptTemplate.from_template(template=template)
chain = prompt | llm | StrOutputParser()
2. 동기적 호출: invoke
invoke
는 Langchain에서 동기적으로 OpenAI 모델을 호출하는 방식이다. 이 방식은 요청이 끝날 때까지 서버가 대기하며, 호출 후 결과를 반환한다.
@app.get("/sync/chat")
def sync_chat(query: str = Query(None, min_length=3, max_length=50)):
response = chain.invoke({"query": query})
return response
chain.invoke
메서드를 사용해 Langchain 모델을 호출하고, 해당 결과를 반환한다.
호출
GET http://localhost:8000/sync/chat?query=한국의 수도는?
응답
"한국의 수도는 서울입니다."
3. 비동기 호출: ainvoke
FastAPI의 비동기 지원을 활용하여 비동기적으로 OpenAI 모델을 호출할 수 있다. 이를 위해 Langchain의 ainvoke
메서드를 사용한다.
@app.get("/async/chat")
async def async_chat(query: str = Query(None, min_length=3, max_length=50)):
response = await chain.ainvoke({"query": query})
return response
이 방식은 비동기로 처리되어 응답이 준비될 때까지 서버의 다른 작업이 차단되지 않는다.
호출
GET http://localhost:8000/async/chat?query=한국의 수도는?
응답
"한국의 수도는 서울입니다."
4. 스트리밍 방식의 동기적 호출: stream
스트리밍 방식의 호출은 서버가 응답을 한 번에 반환하는 대신, 데이터를 여러 조각으로 나누어 클라이언트로 전송한다. 이를 통해 클라이언트는 실시간으로 응답을 받아볼 수 있다.
@app.get("/streaming_sync/chat")
def streaming_sync_chat(query: str = Query(None, min_length=3, max_length=50)):
def event_stream():
try:
for chunk in chain.stream({"query": query}):
if len(chunk) > 0:
yield f"data: {chunk}\n\n"
except Exception as e:
yield f"data: {str(e)}\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream")
chain.stream
메서드를 통해 데이터를 스트리밍하고, StreamingResponse
를 사용해 FastAPI에서 이를 text/event-stream
형식으로 클라이언트에 전달한다. 클라이언트는 스트림이 끝날 때까지 실시간으로 데이터를 수신한다.
호출
GET http://localhost:8000/streaming_sync/chat?query=한국의 수도는?
응답
data: 한국
data: 의
data: 수도
data: 는
data: 서울
data: 입니다
data: .
5. 스트리밍 방식의 비동기 호출: astream
마지막으로, 비동기 방식으로 스트리밍을 구현할 수 있다. 이는 대규모 트래픽을 처리할 때 매우 유용하다. Langchain의 astream
메서드를 통해 비동기 스트리밍을 구현한다.
@app.get("/streaming_async/chat")
async def streaming_async(query: str = Query(None, min_length=3, max_length=50)):
async def event_stream():
try:
async for chunk in chain.astream({"query": query}):
if len(chunk) > 0:
yield f"data: {chunk}\n\n"
except Exception as e:
yield f"data: {str(e)}\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream")
async for
문을 통해 비동기적으로 데이터를 스트리밍하고, StreamingResponse
로 클라이언트에게 실시간 데이터를 전송한다.
호출
GET http://localhost:8000/streaming_async/chat?query=한국의 수도는?
응답
data: 한국
data: 의
data: 수도
data: 는
data: 서울
data: 입니다
data: .
결론
이 글에서는 Langchain과 FastAPI를 사용하여 OpenAI 모델을 호출하는 네 가지 방식을 살펴보았다.
- 동기적 호출 (
invoke
) - 비동기적 호출 (
ainvoke
) - 동기적 스트리밍 (
stream
) - 비동기적 스트리밍 (
astream
)
위의 각 방식은 API 서버의 요구 사항에 맞춰 선택할 수 있으며, 특히 스트리밍 방식은 실시간 처리가 필요한 경우 유용하게 사용할 수 있다. FastAPI와 Langchain의 조합은 빠르고 유연한 AI 기반 서비스를 제공하기에 적합한 도구이다.