LangGraph 공식문서를 번역한 내용입니다. 필요한 경우 부연 설명을 추가하였고 이해하기 쉽게 예제를 일부 변경하였습니다. 문제가 되면 삭제하겠습니다.
https://langchain-ai.github.io/langgraph/how-tos/streaming-content/
노드 내에서 스트리밍을 사용하는 가장 일반적인 용도는 LLM 토큰을 스트리밍하는 것이지만, 사용자 정의 데이터를 스트리밍하는 경우도 있다.
예를 들어, 긴 실행 시간을 가진 도구 호출이 있을 경우, 각 단계 사이에서 사용자 정의 이벤트를 발송하여 진행 상황을 모니터링할 수 있다. 이러한 사용자 정의 이벤트를 애플리케이션의 최종 사용자에게 표시하여 현재 작업이 어떻게 진행되고 있는지 보여줄 수도 있다.
두 가지 방법으로 이를 구현할 수 있다.
graph
의.stream
/.astream
메서드에서stream_mode="custom"
을 사용- adispatch_custom_events를 사용하여 사용자 정의 이벤트 발송
아래에서 두 가지 API를 사용하는 방법을 살펴보자.
준비
우선, 필요한 패키지를 설치하자.
pip install langgraph
.stream / .astream을 사용한 커스텀 데이터 스트림
그래프 정의
import asyncio
from langchain_core.messages import AIMessage
from langgraph.graph import START, StateGraph, MessagesState, END
from langgraph.types import StreamWriter
async def my_node(
state: MessagesState,
writer: StreamWriter, # <-- chunk가 스트리밍되도록 StreamWriter를 제공한다.
):
chunks = [
"87년",
"전",
",",
"우리",
"의",
"선조",
"들",
"께서",
"...",
]
for chunk in chunks:
# stream_mode=custom을 사용하여 스트리밍될 chunk를 작성한다.
writer(chunk)
return {"messages": [AIMessage(content=" ".join(chunks))]}
workflow = StateGraph(MessagesState)
workflow.add_node("model", my_node)
workflow.add_edge(START, "model")
workflow.add_edge("model", END)
app = workflow.compile()
content 스트리밍
inputs = [HumanMessage(content="무슨 생각 하고 있어?")]
async def stream_content():
async for chunk in app.astream({"messages": inputs}, stream_mode="custom"):
print(chunk, flush=True)
asyncio.run(stream_content())
87년
전
,
우리의
선조
들
께서
...
아마도 사용자 정의 데이터와 상태 업데이트 모두에 접근하려면 여러 스트리밍 모드를 사용해야 할 것이다.
from langchain_core.messages import HumanMessage
inputs = [HumanMessage(content="What are you thinking about?")]
async for chunk in app.astream({"messages": inputs}, stream_mode=["custom", "updates"]):
print(chunk, flush=True)
('custom', '87년')
('custom', '전')
('custom', ',')
('custom', '우리')
('custom', '의')
('custom', '선조')
('custom', '들')
('custom', '께서')
('custom', '...')
('updates', {'model': {'messages': [AIMessage(content='87년 전 , 우리 의 선조 들 께서 ...', additional_kwargs={}, response_metadata={})]}})
.astream_events를 사용한 커스텀 데이터 스트림
이미 그래프의 .astream_events
메서드를 워크플로우에서 사용하고 있다면, adispatch_custom_event
를 사용하여 사용자 정의 이벤트를 발생함으로써 사용자 정의 데이터를 스트리밍할 수 있다.
그래프 정의
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.callbacks.manager import adispatch_custom_event
async def my_node(state: MessagesState, config: RunnableConfig):
chunks = [
"87년",
"전",
",",
"우리",
"의",
"선조",
"들",
"께서",
"...",
]
for chunk in chunks:
await adispatch_custom_event(
"my_custom_event",
{"chunk": chunk},
config=config,
)
return {"messages": [AIMessage(content=" ".join(chunks))]}
workflow = StateGraph(MessagesState)
workflow.add_node("model", my_node)
workflow.add_edge(START, "model")
workflow.add_edge("model", END)
app = workflow.compile()
content 스트리밍
from langchain_core.messages import HumanMessage
inputs = [HumanMessage(content="무슨 생각 하고 있어?")]
async def stream_content():
async for event in app.astream_events({"messages": inputs}, version="v2"):
tags = event.get("tags", [])
if event["event"] == "on_custom_event" and event["name"] == "my_custom_event":
data = event["data"]
if data:
print(data["chunk"], end="|", flush=True)
asyncio.run(stream_content())
87년|전|,|우리|의|선조|들|께서|...|
LangGraph 참고 자료
- Controllability
- Persistence
- Memory
- Human-in-the-loop
- Streaming
- Tool calling
- Subgraphs
- State Management
- Other
- Prebuilt ReAct Agent
반응형