langgraph / / 2024. 12. 1. 18:07

[langgraph] 커스텀 데이터를 스트리밍하는 방법

LangGraph 공식문서를 번역한 내용입니다. 필요한 경우 부연 설명을 추가하였고 이해하기 쉽게 예제를 일부 변경하였습니다. 문제가 되면 삭제하겠습니다.

https://langchain-ai.github.io/langgraph/how-tos/streaming-content/

노드 내에서 스트리밍을 사용하는 가장 일반적인 용도는 LLM 토큰을 스트리밍하는 것이지만, 사용자 정의 데이터를 스트리밍하는 경우도 있다.

예를 들어, 긴 실행 시간을 가진 도구 호출이 있을 경우, 각 단계 사이에서 사용자 정의 이벤트를 발송하여 진행 상황을 모니터링할 수 있다. 이러한 사용자 정의 이벤트를 애플리케이션의 최종 사용자에게 표시하여 현재 작업이 어떻게 진행되고 있는지 보여줄 수도 있다.

두 가지 방법으로 이를 구현할 수 있다.

  • graph.stream / .astream 메서드에서 stream_mode="custom"을 사용
  • adispatch_custom_events를 사용하여 사용자 정의 이벤트 발송

아래에서 두 가지 API를 사용하는 방법을 살펴보자.

준비

우선, 필요한 패키지를 설치하자.

pip install langgraph

.stream / .astream을 사용한 커스텀 데이터 스트림

그래프 정의

import asyncio

from langchain_core.messages import AIMessage
from langgraph.graph import START, StateGraph, MessagesState, END
from langgraph.types import StreamWriter


async def my_node(
    state: MessagesState,
    writer: StreamWriter,  # <-- chunk가 스트리밍되도록 StreamWriter를 제공한다.
):
    chunks = [
       "87년",
        "전",
        ",",
        "우리",
        "의",
        "선조",
        "들",
        "께서",
        "...",
    ]
    for chunk in chunks:
        # stream_mode=custom을 사용하여 스트리밍될 chunk를 작성한다.
        writer(chunk)

    return {"messages": [AIMessage(content=" ".join(chunks))]}


workflow = StateGraph(MessagesState)

workflow.add_node("model", my_node)
workflow.add_edge(START, "model")
workflow.add_edge("model", END)

app = workflow.compile()

content 스트리밍

inputs = [HumanMessage(content="무슨 생각 하고 있어?")]


async def stream_content():
    async for chunk in app.astream({"messages": inputs}, stream_mode="custom"):
        print(chunk, flush=True)


asyncio.run(stream_content())
87년
전
,
우리의
선조
들
께서
...

아마도 사용자 정의 데이터와 상태 업데이트 모두에 접근하려면 여러 스트리밍 모드를 사용해야 할 것이다.

from langchain_core.messages import HumanMessage

inputs = [HumanMessage(content="What are you thinking about?")]
async for chunk in app.astream({"messages": inputs}, stream_mode=["custom", "updates"]):
    print(chunk, flush=True)
('custom', '87년')
('custom', '전')
('custom', ',')
('custom', '우리')
('custom', '의')
('custom', '선조')
('custom', '들')
('custom', '께서')
('custom', '...')
('updates', {'model': {'messages': [AIMessage(content='87년 전 , 우리 의 선조 들 께서 ...', additional_kwargs={}, response_metadata={})]}})

.astream_events를 사용한 커스텀 데이터 스트림

이미 그래프의 .astream_events 메서드를 워크플로우에서 사용하고 있다면, adispatch_custom_event를 사용하여 사용자 정의 이벤트를 발생함으로써 사용자 정의 데이터를 스트리밍할 수 있다.

그래프 정의

from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.callbacks.manager import adispatch_custom_event


async def my_node(state: MessagesState, config: RunnableConfig):
    chunks = [
        "87년",
        "전",
        ",",
        "우리",
        "의",
        "선조",
        "들",
        "께서",
        "...",
    ]
    for chunk in chunks:
        await adispatch_custom_event(
            "my_custom_event",
            {"chunk": chunk},
            config=config, 
        )

    return {"messages": [AIMessage(content=" ".join(chunks))]}


workflow = StateGraph(MessagesState)

workflow.add_node("model", my_node)
workflow.add_edge(START, "model")
workflow.add_edge("model", END)

app = workflow.compile()

content 스트리밍

from langchain_core.messages import HumanMessage

inputs = [HumanMessage(content="무슨 생각 하고 있어?")]


async def stream_content():
    async for event in app.astream_events({"messages": inputs}, version="v2"):
        tags = event.get("tags", [])
        if event["event"] == "on_custom_event" and event["name"] == "my_custom_event":
            data = event["data"]
            if data:
                print(data["chunk"], end="|", flush=True)


asyncio.run(stream_content())
87년|전|,|우리|의|선조|들|께서|...|

LangGraph 참고 자료

반응형
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유