langchain / / 2024. 9. 19. 07:28

[langchain] fastapi로 openai dummy 서버 만들기

FastAPI를 사용하여 OpenAI API와 동일한 형태로 응답을 하는 Dummy 서버를 구축하는 방법을 알아보자. 주로 /v1/chat/completions 엔드포인트를 구현하며, 스트리밍 응답을 포함한 두 가지 방식으로 클라이언트의 요청에 응답하는 방법을 설명한다.

Dummy 서버는 성능테스트를 진행하거나, 응답내용은 중요하지 않고 반복적인 api 호출을 테스트 하는 경우에 필요하다.

Dummy서버의 응답은 스트리밍을 사용하는 경우와 그렇지 않은 경우가 있는데 스트리밍을 사용하는 방식과 사용하지 않는 방식의 응답형식이 조금 다르다. 그래서 적절한 응답의 형태로 다르게 전송해야 한다.

작업하는 순서는 아래와 같다.

  1. dummy server의 Fastapi 생성
    • openai와 같은 응답 형식을 반환하는 모델 생성 (동기식/스트리밍 방식)
    • API 생성 (/v1/chat/completions)
    • 스트리밍 방식에 대한 응답 형식 지정
  2. API 서버 생성 (dummy server를 호출하는 용도)
    1. 동기식 호출 (/chat)
    2. 스트리밍 호출 (/streaming/chat)

1. dummy server FastAPI 설정 및 서버 구조

모델 클래스 정의

openai에서 입력/출력에 필요한 모델 클래스를 구현하자. 해당 내용은 openai api 문서를 보면 자세히 나와있다.
https://platform.openai.com/docs/guides/chat-completions/getting-started

기본 응답 형식

{
  "id": "chatcmpl-123",
  "object": "chat.completion",
  "created": 1677652288,
  "model": "gpt-4o-mini",
  "system_fingerprint": "fp_44709d6fcb",
  "choices": [{
    "index": 0,
    "message": {
      "role": "assistant",
      "content": "\n\nHello there, how may I assist you today?",
    },
    "logprobs": null,
    "finish_reason": "stop"
  }],
  "usage": {
    "prompt_tokens": 9,
    "completion_tokens": 12,
    "total_tokens": 21
  }
}

스트리밍 응답 형식

{
  "id": "chatcmpl-123",
  "object": "chat.completion.chunk",
  "created": 1694268190,
  "model": "gpt-4o-mini",
  "system_fingerprint": "fp_44709d6fcb",
  "choices": [
    {
      "index": 0,
      "delta": {
        "role": "assistant",
        "content": "응답내용"
      },
      "logprobs": null,
      "finish_reason": null
    }
  ]
}

스트리밍은 다음과 같이 문자 단위로 여러 건이 한번에 내려온다.

{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"role":"assistant","content":""},"logprobs":null,"finish_reason":null}]}

{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]}

....

{"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gpt-4o-mini", "system_fingerprint": "fp_44709d6fcb", "choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]}

이를 기반으로 모델 클래스를 만들자.

[openai_model.py]

import time
from typing import List, Union, Optional

from pydantic import BaseModel


class ChatCompletionRequestMessage(BaseModel):
    content: str
    role: str
    name: str = None


class CreateChatCompletionRequest(BaseModel):
    messages: List[ChatCompletionRequestMessage]
    model: Union[str, List[str]] = "gpt-4o"
    temperature: Optional[float] = 1
    stream: Optional[bool] = False


class MessageChoice(BaseModel):
    message: dict
    finish_reason: str = "stop"
    index: int = 0


class DeltaChoice(BaseModel):
    delta: dict
    finish_reason: Optional[str] = None
    index: int = 0


class CompletionUsage(BaseModel):
    completion_tokens: int
    prompt_tokens: int
    total_tokens: int


class CreateChatCompletionResponse(BaseModel):
    id: str
    object: str
    created: int
    model: str
    choices: List[MessageChoice]
    usage: Optional[CompletionUsage]

    @staticmethod
    def sample(content: str):
        return CreateChatCompletionResponse(
            id="chatcmpl-abc123",
            object="chat.completion",
            created=int(time.time()),
            model="gpt-4o",
            choices=[
                MessageChoice(
                    message={"role": "assistant", "content": content},
                    finish_reason="stop",
                    index=0,
                )
            ],
            usage=CompletionUsage(
                prompt_tokens=10, completion_tokens=10, total_tokens=20
            ),
        )


class CreateChatCompletionStreamingResponse(BaseModel):
    id: str
    object: str
    created: int
    model: str
    choices: List[DeltaChoice]

    @staticmethod
    def sample(content: str, finish_reason: Optional[str] = None):
        return CreateChatCompletionStreamingResponse(
            id="chatcmpl-abc123",
            object="chat.completion",
            created=int(time.time()),
            model="gpt-4o",
            choices=[
                DeltaChoice(
                    delta={"content": content},
                    finish_reason=finish_reason,
                    index=0,
                )
            ],
        )

입력을 받을 때는 CreateChatCompletionRequest를 사용하고 응답값을 출력할 때는 CreateChatCompletionResponse를 사용한다. 그리고 응답내용은 dummy로 응답하도록 sample 함수를 정의하고 해당 내용을 샘플로 리턴한다.

스트리밍 응답은 choices 내에 delta로 메시지 내용을 전송하므로 별도의 클래스로 구분하여 리턴한다. (CreateChatCompletionStreamingResponse)

fastapi 서버 구현 (dummy.py)

import json
import os
import sys

from starlette.responses import StreamingResponse

from fastapi import FastAPI

# 필요한 경우 아래 주석 해제
# sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from openai_dummy.openai_model import (
    CreateChatCompletionRequest,
    CreateChatCompletionResponse,
)

app = FastAPI()

def event_stream(completion: str):
    try:
        for i, token in enumerate(completion):
            if i == len(completion) - 1:
                finish_reason = "stop"
            else:
                finish_reason = None
            sample = CreateChatCompletionStreamingResponse.sample(
                content=token, finish_reason=finish_reason
            )
            yield f"data: {sample.model_dump_json()}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        yield f"error occurred\n\n"
        yield f"data: {str(e)}\n\n"

def send_response(request_body: CreateChatCompletionRequest):
    prompt = request_body.messages[0].content
    completion = f"{prompt}의 응답 내용"
    print("request_body.stream", request_body.stream)
    if request_body.stream:  # streaming을 사용할 경우
        return StreamingResponse(
            event_stream(completion),
            media_type="application/x-ndjson",
        )

    else:  # streaming이 아닌 경우
        return CreateChatCompletionResponse.sample(completion)

@app.post("/v1/chat/completions")
def create_chat_completion(request_body: CreateChatCompletionRequest):
    return send_response(request_body)

위 코드에서는 FastAPI 앱을 설정하고, 필요한 모델들을 임포트한다.

2. dummy server 응답 처리 설명

dummy server의 응답은 스트리밍을 사용하는 경우와 아닌 경우로 나눌 수 있다. 이는 클라이언트에서 요청 시 stream=True/False 요청에 따라 달라진다.

2.1 스트리밍 응답 처리

OpenAI API의 중요한 특징 중 하나는 스트리밍 응답이다. 즉, 서버가 한번에 응답을 전송하는 대신, 데이터를 지속적으로 보내는 방식이다. 이를 구현하기 위해 StreamingResponse를 사용한다.

event_stream 함수: 이 함수는 클라이언트에게 스트리밍 방식으로 데이터를 전송한다.

def event_stream(completion: str):
    try:
        for i, token in enumerate(completion):
            if i == len(completion) - 1:
                finish_reason = "stop"
            else:
                finish_reason = None
            sample = CreateChatCompletionStreamingResponse.sample(
                content=token, finish_reason=finish_reason
            )
            yield f"data: {sample.model_dump_json()}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        yield f"error occurred\n\n"
        yield f"data: {str(e)}\n\n"
  • event_stream 함수: 이 함수는 텍스트 completion을 받아 한 글자씩 스트리밍하며 클라이언트로 전송한다.
  • yield: Python의 yield는 데이터를 한 번에 반환하는 것이 아니라, 필요할 때마다 하나씩 반환하는 역할을 한다. 이 덕분에 스트리밍 응답이 가능한다.

2.2 요청 처리 및 응답 반환

/v1/chat/completions 경로로 들어온 POST 요청을 처리하는 로직이다. 스트리밍을 사용하는지 여부에 따라 응답 방식을 다르게 처리한다.

def send_response(request_body: CreateChatCompletionRequest):
    prompt = request_body.messages[0].content
    completion = f"{prompt}의 응답 내용"
    if request_body.stream:  # streaming을 사용할 경우
        return StreamingResponse(
            event_stream(completion),
            media_type="application/x-ndjson",
        )

    else:  # streaming이 아닌 경우
        return CreateChatCompletionResponse.sample(content=completion)
  • 스트리밍 모드: 클라이언트가 stream=True로 요청을 보낸 경우, StreamingResponse로 처리하여 스트리밍 응답을 반환한다.
  • 일반 응답 모드: 스트리밍을 사용하지 않는 경우, 단일 JSON 응답을 반환한다.

2.3 dummy 서버 실행

아래 명령어로 7777포트로 실행을 해보자.

uvicorn dummy:app --reload --port 7777

서버가 정상적으로 기동되는 것을 확인할 수 있다.

일반 호출

정상적으로 호출이 되는지 확인해보자. 아래 http를 호출해보자.

POST http://localhost:7777/v1/chat/completions
content-Type: application/json

{
  "model": "gpt-4o",
  "messages": [{"role": "user", "content": "Hi"}],
  "temperature": 0.7
}

응답

{
  "id": "chatcmpl-abc123",
  "object": "chat.completion",
  "created": 1726405605.270324,
  "model": "gpt-4o",
  "choices": [
    {
      "message": {
        "role": "assistant",
        "content": "Hi의 응답 내용"
      },
      "finish_reason": null,
      "index": 0
    }
  ],
  "usage": {
    "prompt_tokens": 10,
    "completion_tokens": 10,
    "total_tokens": 20
  }
}

위와 같이 응답이 오는 것을 확인할 수 있다.

스트리밍 호출

이번에는 스트리밍으로 호출해보자. 아래에 stream: true로 호출한다.

POST http://localhost:7777/v1/chat/completions
content-Type: application/json

{
  "model": "gpt-4o",
  "messages": [{"role": "user", "content": "Hi"}],
  "temperature": 0.7,
  "stream": true
}

응답

data: {"id": "chatcmpl-abc123", "object": "chat.completion", "created": 1726405647.3994, "model": "gpt-4o", "choices": [{"delta": {"content": "H"}, "finish_reason": null, "index": 0}]}

data: {"id": "chatcmpl-abc123", "object": "chat.completion", "created": 1726405647.3997362, "model": "gpt-4o", "choices": [{"delta": {"content": "i"}, "finish_reason": null, "index": 0}]}

data: {"id": "chatcmpl-abc123", "object": "chat.completion", "created": 1726405647.4001842, "model": "gpt-4o", "choices": [{"delta": {"content": "\uc758"}, "finish_reason": null, "index": 0}]}

data: {"id": "chatcmpl-abc123", "object": "chat.completion", "created": 1726405647.400488, "model": "gpt-4o", "choices": [{"delta": {"content": " "}, "finish_reason": null, "index": 0}]}

data: {"id": "chatcmpl-abc123", "object": "chat.completion", "created": 1726405647.4007092, "model": "gpt-4o", "choices": [{"delta": {"content": "\uc751"}, "finish_reason": null, "index": 0}]}

data: {"id": "chatcmpl-abc123", "object": "chat.completion", "created": 1726405647.400925, "model": "gpt-4o", "choices": [{"delta": {"content": "\ub2f5"}, "finish_reason": null, "index": 0}]}

data: {"id": "chatcmpl-abc123", "object": "chat.completion", "created": 1726405647.401145, "model": "gpt-4o", "choices": [{"delta": {"content": " "}, "finish_reason": null, "index": 0}]}

data: {"id": "chatcmpl-abc123", "object": "chat.completion", "created": 1726405647.401341, "model": "gpt-4o", "choices": [{"delta": {"content": "\ub0b4"}, "finish_reason": null, "index": 0}]}

data: {"id": "chatcmpl-abc123", "object": "chat.completion", "created": 1726405647.4015481, "model": "gpt-4o", "choices": [{"delta": {"content": "\uc6a9"}, "finish_reason": "stop", "index": 0}]}

data: [DONE]

위와 같이 응답이 전송되는 것을 확인할 수 있다.

3. 호출 서버의 동기 및 스트리밍 처리 예제

이제 위에서 만든 dummy server를 호출할 서버도 필요하다. 이 서버는 특정 프롬프트에 기반해 OpenAI API에 요청을 보내고, 그 응답을 반환한다. 모델로는 ChatOpenAI를 사용하며, gpt-4o-mini라는 모델을 설정한다.

from starlette.responses import StreamingResponse
from fastapi import FastAPI, Query
from dotenv import load_dotenv
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.output_parsers import StrOutputParser
from langchain.llms import ChatOpenAI

# 환경 변수 로드
load_dotenv()

# FastAPI 앱 생성
app = FastAPI()

# 모델 설정
llm = ChatOpenAI(
    temperature=0, model_name="gpt-4o-mini", base_url="http://localhost:7777/v1"
)

# 프롬프트 템플릿 설정
template = "{query}"
prompt = PromptTemplate.from_template(template=template)

@app.get("/chat")
def sync_chat(query: str):
    chain = prompt | llm | StrOutputParser()
    return chain.invoke({"query": query})

@app.get("/streaming/chat")
def streaming_sync_chat(query: str):
    chain = prompt | llm | StrOutputParser()

    def event_stream():
        try:
            for chunk in chain.stream({"query": query}):
                yield f"data: {chunk}\n\n"
        except Exception as e:
            yield f"data: error : {str(e)}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")

기본적인 openai 호출 방식에서 다른점은 위에서 만든 dummy api의 url을 지정하는 곳이다. (base_url="http://localhost:7777/v1")

base_url을 지정하여 dummy server를 호출하도록 한다.

서버를 기동해보자. (8000번 기본 포트로 실행)

uvicorn main:app --reload --port 8000

호출방식은 동기식, 스트리밍 방식 두 가지를 구현해 볼 것이다.

동기식 호출

@app.get("/chat")
def sync_chat(query: str):
    chain = prompt | llm
    return chain.invoke({"query": query})

이 엔드포인트는 입력된 쿼리에 대한 응답을 한 번에 반환합니다.

요청

GET http://localhost:8000/chat?query=한국의 수도는?

응답

"한국의 수도는?의 응답 내용"

스트리밍 호출

@app.get("/streaming/chat")
def streaming_chat(query: str):
    chain = prompt | llm | StrOutputParser()

    def event_stream():
        try:
            for chunk in chain.stream({"query": query}):
                if len(chunk) > 0:
                    yield f"data: {chunk}\n\n"
        except Exception as e:
            yield f"data: error : {str(e)}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")

이 엔드포인트는 스트리밍 방식으로 응답을 전송한다. 클라이언트는 실시간으로 데이터를 받을 수 있다.

요청

GET http://localhost:8000/streaming/chat?query=한국의 수도는?

응답

data: 한

data: 국

data: 의

data:  

data: 수

data: 도

data: 는

data: ?

data: 의

data:  

data: 응

data: 답

data:  

data: 내

data: 용

참고

반응형
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유