langgraph / / 2024. 12. 1. 18:10

[langgraph] 도구 내에 이벤트를 스트리밍하는 방법 (LangChain을 사용하지 않고)

LangGraph 공식문서를 번역한 내용입니다. 필요한 경우 부연 설명을 추가하였고 이해하기 쉽게 예제를 일부 변경하였습니다. 문제가 되면 삭제하겠습니다.

https://langchain-ai.github.io/langgraph/how-tos/streaming-events-from-within-tools-without-langchain/

이 가이드에서는 LangChain의 채팅 모델이나 도구 호출 기능에 의존하지 않고, 커스텀 ReAct 에이전트에서 사용하는 도구로부터 토큰을 스트리밍하는 방법을 보여준다.

채팅 모델 상호작용은 OpenAI 클라이언트 라이브러리를 직접 사용하고, 도구 실행은 처음부터 구현할 것이다.

이 예시는 LangGraph가 LangChain의 내장 구성 요소인 채팅 모델이나 도구 없이 독립적으로 어떻게 활용될 수 있는지를 보여준다.

준비

우선, 필요한 패키지를 설치하자.

pip install langgraph openai

그래프 정의

OpenAI API를 호출하는 노드 정의

import asyncio

from dotenv import load_dotenv
from openai import AsyncOpenAI
from langchain_core.language_models.chat_models import ChatGenerationChunk
from langchain_core.messages import AIMessageChunk
from langchain_core.runnables.config import (
    ensure_config,
    get_callback_manager_for_config,
)

load_dotenv()

openai_client = AsyncOpenAI()

tool = {
    "type": "function",
    "function": {
        "name": "get_items",
        "description": "Use this tool to look up which items are in the given place.",
        "parameters": {
            "type": "object",
            "properties": {"place": {"type": "string"}},
            "required": ["place"],
        },
    },
}


async def call_model(state, config=None):
    config = ensure_config(config | {"tags": ["agent_llm"]})
    callback_manager = get_callback_manager_for_config(config)
    messages = state["messages"]

    llm_run_manager = callback_manager.on_chat_model_start({}, [messages])[0]
    response = await openai_client.chat.completions.create(
        messages=messages, model="gpt-3.5-turbo", tools=[tool], stream=True
    )

    response_content = ""
    role = None

    tool_call_id = None
    tool_call_function_name = None
    tool_call_function_arguments = ""
    async for chunk in response:
        delta = chunk.choices[0].delta
        if delta.role is not None:
            role = delta.role

        if delta.content:
            response_content += delta.content
            llm_run_manager.on_llm_new_token(delta.content)

        if delta.tool_calls:
            if delta.tool_calls[0].function.name is not None:
                tool_call_function_name = delta.tool_calls[0].function.name
                tool_call_id = delta.tool_calls[0].id

            tool_call_chunk = ChatGenerationChunk(
                message=AIMessageChunk(
                    content="",
                    additional_kwargs={"tool_calls": [delta.tool_calls[0].dict()]},
                )
            )
            llm_run_manager.on_llm_new_token("", chunk=tool_call_chunk)
            tool_call_function_arguments += delta.tool_calls[0].function.arguments

    if tool_call_function_name is not None:
        tool_calls = [
            {
                "id": tool_call_id,
                "function": {
                    "name": tool_call_function_name,
                    "arguments": tool_call_function_arguments,
                },
                "type": "function",
            }
        ]
    else:
        tool_calls = None

    response_message = {
        "role": role,
        "content": response_content,
        "tool_calls": tool_calls,
    }
    return {"messages": [response_message]}

도구와 도구 호출 노드 정의

import json
from langchain_core.callbacks import adispatch_custom_event


async def get_items(place: str) -> str:
    """Use this tool to look up which items are in the given place."""

    def stream(place: str):
        if "bed" in place:  
            yield from ["socks", "shoes", "dust bunnies"]
        elif "shelf" in place:  
            yield from ["books", "penciles", "pictures"]
        else:  
            yield "cat snacks"


    tokens = []
    for token in stream(place):
        await adispatch_custom_event(
            "tool_call_token_stream",
            {
                "function_name": "get_items",
                "arguments": {"place": place},
                "tool_output_token": token,
            },
            config={"tags": ["tool_call"]},
        )
        tokens.append(token)

    return ", ".join(tokens)


function_name_to_function = {"get_items": get_items}


async def call_tools(state):
    messages = state["messages"]

    tool_call = messages[-1]["tool_calls"][0]
    function_name = tool_call["function"]["name"]
    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await function_name_to_function[function_name](**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}

그래프 정의

import operator
from typing import Annotated, Literal
from typing_extensions import TypedDict

from langgraph.graph import StateGraph, END, START


class State(TypedDict):
    messages: Annotated[list, operator.add]


def should_continue(state) -> Literal["tools", END]:
    messages = state["messages"]
    last_message = messages[-1]
    if last_message["tool_calls"]:
        return "tools"
    return END


workflow = StateGraph(State)
workflow.add_edge(START, "model")
workflow.add_node("model", call_model)  # i.e. our "agent"
workflow.add_node("tools", call_tools)
workflow.add_conditional_edges("model", should_continue)
workflow.add_edge("tools", "model")
graph = workflow.compile()

도구 내에서 token 스트림

여기에서는 astream_events API를 사용하여 개별 이벤트를 스트리밍하는 방법을 보여준다. 자세한 내용은 astream_events를 참조하라.

async def stream_content():
    async for event in graph.astream_events(
        {"messages": [{"role": "user", "content": "what's in the bedroom"}]},
        version="v2",
    ):
        tags = event.get("tags", [])
        if event["event"] == "on_custom_event" and "tool_call" in tags:
            print("Tool token", event["data"]["tool_output_token"])


asyncio.run(stream_content())
Tool token socks
Tool token shoes
Tool token dust bunnies
반응형
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유