langchain / / 2024. 9. 10. 19:40

[langchain] Langchain과 FastAPI를 사용하여 OpenAI 모델 호출하기: `invoke`, `ainvoke`, `stream`, `astream`

LangchainFastAPI를 사용하여 OpenAI의 GPT 모델을 동기 및 비동기로 호출하는 다양한 방법을 알아보자. 또한, 스트리밍 방식으로 응답을 받아오는 방법도 포함하여 각기 다른 호출 방식을 구현하는 방법을 알아보자.

1. 프로젝트 설정 및 환경 준비

우선, Langchain과 OpenAI API를 활용하려면 필요한 패키지를 설치하고, .env 파일을 로드하여 OpenAI API 키를 환경 변수로 설정해야 한다. 이를 위해 dotenv 패키지를 사용한다.

패키지 설치

pip install fastapi, uvicorn, langchain, langchain-openai, python-dotenv

.env 파일에는 다음과 같은 내용이 포함된다.

OPENAI_API_KEY=your_openai_api_key

이후 FastAPI 애플리케이션을 시작하고, Langchain의 ChatOpenAI 모델과 PromptTemplate을 사용하여 모델을 설정한다.

uvicorn 시작

uvicorn main:app --reload

기본 소스

from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI
from fastapi import FastAPI
from dotenv import load_dotenv

load_dotenv()
app = FastAPI()

llm = ChatOpenAI(
    temperature=0, 
    model_name="gpt-4o-mini"
)

template = "아래 질문에 대한 답변을 해주세요. \n{query}"
prompt = PromptTemplate.from_template(template=template)
chain = prompt | llm | StrOutputParser()

2. 동기적 호출: invoke

invoke는 Langchain에서 동기적으로 OpenAI 모델을 호출하는 방식이다. 이 방식은 요청이 끝날 때까지 서버가 대기하며, 호출 후 결과를 반환한다.

@app.get("/sync/chat")
def sync_chat(query: str = Query(None, min_length=3, max_length=50)):
    response = chain.invoke({"query": query})
    return response

chain.invoke 메서드를 사용해 Langchain 모델을 호출하고, 해당 결과를 반환한다.

호출

GET http://localhost:8000/sync/chat?query=한국의 수도는?

응답

"한국의 수도는 서울입니다."

3. 비동기 호출: ainvoke

FastAPI의 비동기 지원을 활용하여 비동기적으로 OpenAI 모델을 호출할 수 있다. 이를 위해 Langchain의 ainvoke 메서드를 사용한다.

@app.get("/async/chat")
async def async_chat(query: str = Query(None, min_length=3, max_length=50)):
    response = await chain.ainvoke({"query": query})
    return response

이 방식은 비동기로 처리되어 응답이 준비될 때까지 서버의 다른 작업이 차단되지 않는다.

호출

GET http://localhost:8000/async/chat?query=한국의 수도는?

응답

"한국의 수도는 서울입니다."

4. 스트리밍 방식의 동기적 호출: stream

스트리밍 방식의 호출은 서버가 응답을 한 번에 반환하는 대신, 데이터를 여러 조각으로 나누어 클라이언트로 전송한다. 이를 통해 클라이언트는 실시간으로 응답을 받아볼 수 있다.

@app.get("/streaming_sync/chat")
def streaming_sync_chat(query: str = Query(None, min_length=3, max_length=50)):
    def event_stream():
        try:
            for chunk in chain.stream({"query": query}):
                if len(chunk) > 0:
                    yield f"data: {chunk}\n\n"
        except Exception as e:
            yield f"data: {str(e)}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")

chain.stream 메서드를 통해 데이터를 스트리밍하고, StreamingResponse를 사용해 FastAPI에서 이를 text/event-stream 형식으로 클라이언트에 전달한다. 클라이언트는 스트림이 끝날 때까지 실시간으로 데이터를 수신한다.

호출

GET http://localhost:8000/streaming_sync/chat?query=한국의 수도는?

응답

data: 한국

data: 의

data:  수도

data: 는

data:  서울

data: 입니다

data: .

5. 스트리밍 방식의 비동기 호출: astream

마지막으로, 비동기 방식으로 스트리밍을 구현할 수 있다. 이는 대규모 트래픽을 처리할 때 매우 유용하다. Langchain의 astream 메서드를 통해 비동기 스트리밍을 구현한다.

@app.get("/streaming_async/chat")
async def streaming_async(query: str = Query(None, min_length=3, max_length=50)):
    async def event_stream():
        try:
            async for chunk in chain.astream({"query": query}):
                if len(chunk) > 0:
                    yield f"data: {chunk}\n\n"
        except Exception as e:
            yield f"data: {str(e)}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")

async for 문을 통해 비동기적으로 데이터를 스트리밍하고, StreamingResponse로 클라이언트에게 실시간 데이터를 전송한다.

호출

GET http://localhost:8000/streaming_async/chat?query=한국의 수도는?

응답

data: 한국

data: 의

data:  수도

data: 는

data:  서울

data: 입니다

data: .

결론

이 글에서는 LangchainFastAPI를 사용하여 OpenAI 모델을 호출하는 네 가지 방식을 살펴보았다.

  1. 동기적 호출 (invoke)
  2. 비동기적 호출 (ainvoke)
  3. 동기적 스트리밍 (stream)
  4. 비동기적 스트리밍 (astream)

위의 각 방식은 API 서버의 요구 사항에 맞춰 선택할 수 있으며, 특히 스트리밍 방식은 실시간 처리가 필요한 경우 유용하게 사용할 수 있다. FastAPI와 Langchain의 조합은 빠르고 유연한 AI 기반 서비스를 제공하기에 적합한 도구이다.

반응형
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유