langgraph / / 2024. 12. 1. 18:07

[langgraph] LLM 토큰을 스트리밍하는 방법 (LangChain을 사용하지 않고)

LangGraph 공식문서를 번역한 내용입니다. 필요한 경우 부연 설명을 추가하였고 이해하기 쉽게 예제를 일부 변경하였습니다. 문제가 되면 삭제하겠습니다.

https://langchain-ai.github.io/langgraph/how-tos/streaming-tokens-without-langchain/

이 예제에서는 에이전트를 구동하는 언어 모델에서 토큰을 스트리밍한다. LangChain의 채팅 모델을 사용하지 않고 OpenAI 클라이언트 라이브러리를 직접 사용할 것이다. 또한 예제로 ReAct 에이전트를 사용할 것이다.

준비

우선, 필요한 패키지를 설치하자.

pip install langgraph openai

모델, 도구, 그래프 정의

OpenAI API 호출하는 노드 정의

import asyncio

from dotenv import load_dotenv
from langchain_core.language_models.chat_models import ChatGenerationChunk
from langchain_core.runnables.config import (
    ensure_config,
    get_callback_manager_for_config,
)
from openai import AsyncOpenAI

load_dotenv()

openai_client = AsyncOpenAI()

tool = {
    "type": "function",
    "function": {
        "name": "get_items",
        "description": "Use this tool to look up which items are in the given place.",
        "parameters": {
            "type": "object",
            "properties": {"place": {"type": "string"}},
            "required": ["place"],
        },
    },
}


async def call_model(state, config=None):
    config = ensure_config(config | {"tags": ["agent_llm"]})
    callback_manager = get_callback_manager_for_config(config)
    messages = state["messages"]

    llm_run_manager = callback_manager.on_chat_model_start({}, [messages])[0]
    response = await openai_client.chat.completions.create(
        messages=messages, model="gpt-4o-mini", tools=[tool], stream=True
    )

    response_content = ""
    role = None

    tool_call_id = None
    tool_call_function_name = None
    tool_call_function_arguments = ""
    async for chunk in response:
        delta = chunk.choices[0].delta
        if delta.role is not None:
            role = delta.role

        if delta.content:
            response_content += delta.content
            # stream_mode="messages"를 사용하여 이를 다시 스트리밍할 수 있도록 응답을 ChatGenerationChunk로 래핑한다.
            chunk = ChatGenerationChunk(
                message=AIMessageChunk(
                    content=delta.content,
                )
            )
            llm_run_manager.on_llm_new_token(delta.content, chunk=chunk)

        if delta.tool_calls:
            # 단순화하기 위해 여기서 하나의 도구 호출만 처리한다.
            if delta.tool_calls[0].function.name is not None:
                tool_call_function_name = delta.tool_calls[0].function.name
                tool_call_id = delta.tool_calls[0].id

            # stream_mode="messages"를 사용하여 이를 다시 스트리밍할 수 있도록 응답을 ChatGenerationChunk로 래핑한다.
            tool_call_chunk = ChatGenerationChunk(
                message=AIMessageChunk(
                    content="",
                    additional_kwargs={"tool_calls": [delta.tool_calls[0].dict()]},
                )
            )
            llm_run_manager.on_llm_new_token("", chunk=tool_call_chunk)
            tool_call_function_arguments += delta.tool_calls[0].function.arguments

    if tool_call_function_name is not None:
        tool_calls = [
            {
                "id": tool_call_id,
                "function": {
                    "name": tool_call_function_name,
                    "arguments": tool_call_function_arguments,
                },
                "type": "function",
            }
        ]
    else:
        tool_calls = None

    response_message = {
        "role": role,
        "content": response_content,
        "tool_calls": tool_calls,
    }
    return {"messages": [response_message]}

도구와 도구 호출하는 노드 정의

import json


async def get_items(place: str) -> str:
    """Use this tool to look up which items are in the given place."""
    if "침실" in place:
        return "양말, 신발, 먼지 덩어리"
    if "선반" in place:
        return "책, 연필, 그림"
    else:
        return "고양이 스낵"


function_name_to_function = {"get_items": get_items}


async def call_tools(state):
    messages = state["messages"]

    tool_call = messages[-1]["tool_calls"][0]
    function_name = tool_call["function"]["name"]
    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await function_name_to_function[function_name](**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}

그래프 정의

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict

from langgraph.graph import StateGraph, END, START


class State(TypedDict):
    messages: Annotated[list, operator.add]


def should_continue(state) -> Literal["tools", END]:
    messages = state["messages"]
    last_message = messages[-1]
    if last_message["tool_calls"]:
        return "tools"
    return END


workflow = StateGraph(State)
workflow.add_edge(START, "model")
workflow.add_node("model", call_model) 
workflow.add_node("tools", call_tools)
workflow.add_conditional_edges("model", should_continue)
workflow.add_edge("tools", "model")
graph = workflow.compile()

Stream tokens

from langchain_core.messages import AIMessageChunk


async def stream_async():
    first = True
    async for msg, metadata in graph.astream(
        {"messages": [{"role": "user", "content": "침실에 뭐가 있어?"}]},
        stream_mode="messages",
    ):
        if msg.content:
            print(msg.content, end="|", flush=True)

        if isinstance(msg, AIMessageChunk):
            if first:
                gathered = msg
                first = False
            else:
                gathered = gathered + msg

            if msg.tool_call_chunks:
                print(gathered.tool_calls)


asyncio.run(stream_async())
[{'name': 'get_items', 'args': {}, 'id': 'call_lL6Us4trL7DYSoleD5hR9Wgm', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {}, 'id': 'call_lL6Us4trL7DYSoleD5hR9Wgm', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {}, 'id': 'call_lL6Us4trL7DYSoleD5hR9Wgm', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {'place': ''}, 'id': 'call_lL6Us4trL7DYSoleD5hR9Wgm', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {'place': '침'}, 'id': 'call_lL6Us4trL7DYSoleD5hR9Wgm', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {'place': '침실'}, 'id': 'call_lL6Us4trL7DYSoleD5hR9Wgm', 'type': 'tool_call'}]
[{'name': 'get_items', 'args': {'place': '침실'}, 'id': 'call_lL6Us4trL7DYSoleD5hR9Wgm', 'type': 'tool_call'}]
침|실|에|는| 양|말|,| 신|발|,| 그|리|고| 먼|지| 덩|어|리|가| 있|습니다|.|

LangGraph 참고 자료

반응형
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유