langgraph / / 2024. 12. 3. 07:30

[langgraph] 서브 그래프(subgraph)에서 상태를 조회하고 업데이트하는 방법

LangGraph 공식문서를 번역한 내용입니다. 필요한 경우 부연 설명을 추가하였고 이해하기 쉽게 예제를 일부 변경하였습니다. 문제가 되면 삭제하겠습니다.

https://langchain-ai.github.io/langgraph/how-tos/subgraphs-manage-state/

영속성을 추가하면 서브그래프의 상태를 언제든지 쉽게 조회하고 업데이트할 수 있다. 이것으로 인해 많은 사람이 개입하는 상호작용 패턴을 가능하게 한다.

  • 사용자에게 중단 시 상태를 표시하여 작업을 수락하도록 할 수 있다.
  • 서브그래프를 되돌려 문제를 재현하거나 피할 수 있다.
  • 사용자가 작업을 더 잘 제어할 수 있도록 상태를 수정할 수 있다.

이 가이드는 이를 수행하는 방법을 보여준다.

준비

우선, 필요한 패키지를 설치하자.

pip install langgraph

그래프 정의

먼저, 서브그래프를 설정하자. 이를 위해 특정 도시의 날씨를 가져오는 간단한 그래프를 만들자. 우리는 weather_node 이전에 중단점을 설정하여 이 그래프를 컴파일할 것이다.

from langgraph.graph import StateGraph, END, START, MessagesState
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI


@tool
def get_weather(city: str):
    """Get the weather for a specific city"""
    return f"{city}은 맑아요"


raw_model = ChatOpenAI()
model = raw_model.with_structured_output(get_weather)


class SubGraphState(MessagesState):
    city: str


def model_node(state: SubGraphState):
    result = model.invoke(state["messages"])
    return {"city": result["city"]}


def weather_node(state: SubGraphState):
    result = get_weather.invoke({"city": state["city"]})
    return {"messages": [{"role": "assistant", "content": result}]}


subgraph = StateGraph(SubGraphState)
subgraph.add_node(model_node)
subgraph.add_node(weather_node)
subgraph.add_edge(START, "model_node")
subgraph.add_edge("model_node", "weather_node")
subgraph.add_edge("weather_node", END)
subgraph = subgraph.compile(interrupt_before=["weather_node"])

부모 그래프 정의

이제 전체 그래프를 설정할 수 있다. 이 그래프는 날씨 정보를 얻어야 할 경우 서브그래프를 먼저 경유하고, 그렇지 않으면 일반 LLM으로 경유한다.

from typing import Literal
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver


memory = MemorySaver()


class RouterState(MessagesState):
    route: Literal["weather", "other"]


class Router(TypedDict):
    route: Literal["weather", "other"]


router_model = raw_model.with_structured_output(Router)


def router_node(state: RouterState):
    system_message = "다음 질문이 날씨에 관한 것인지 아닌지 분류해줘."
    messages = [{"role": "system", "content": system_message}] + state["messages"]
    route = router_model.invoke(messages)
    return {"route": route["route"]}


def normal_llm_node(state: RouterState):
    response = raw_model.invoke(state["messages"])
    return {"messages": [response]}


def route_after_prediction(
    state: RouterState,
) -> Literal["weather_graph", "normal_llm_node"]:
    if state["route"] == "weather":
        return "weather_graph"
    else:
        return "normal_llm_node"


graph = StateGraph(RouterState)
graph.add_node(router_node)
graph.add_node(normal_llm_node)
graph.add_node("weather_graph", subgraph)
graph.add_edge(START, "router_node")
graph.add_conditional_edges("router_node", route_after_prediction)
graph.add_edge("normal_llm_node", END)
graph.add_edge("weather_graph", END)
graph = graph.compile(checkpointer=memory)
from IPython.display import Image, display

display(
    Image(
        graph.get_graph(xray=1).draw_mermaid_png(
            output_file_path="how-to-view-and-update-state-in-subgraphs.png"
        )
    )
)

이제 정상적인 쿼리로 테스트하여 의도대로 작동하는지 확인해 보자.

config = {"configurable": {"thread_id": "1"}}
inputs = {"messages": [{"role": "user", "content": "안녕!"}]}
for update in graph.stream(inputs, config=config, stream_mode="updates"):
    print(update)
{'router_node': {'route': 'other'}}
{'normal_llm_node': {'messages': [AIMessage(content='안녕하세요! 무엇을 도와드릴까요?', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 21, 'prompt_tokens': 12, 'total_tokens': 33, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-3.5-turbo-0125', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-150cf0fb-f6a8-49e1-afc1-d3c55213b16b-0', usage_metadata={'input_tokens': 12, 'output_tokens': 21, 'total_tokens': 33, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}})]}}

날씨에 대해 묻지 않았기 때문에 LLM에서 정상적인 응답을 받았다.

중단점으로 부터 다시 시작

이제 중단점을 사용한 결과를 살펴보겠습니다. 날씨 서브그래프로 라우팅되어 중단점 노드가 있는 쿼리로 호출해보겠습니다.

config = {"configurable": {"thread_id": "2"}}
inputs = {"messages": [{"role": "user", "content": "서울 날씨 어때?"}]}
for update in graph.stream(inputs, config=config, stream_mode="updates"):
    print(update)
{'router_node': {'route': 'weather'}}

그래프 스트림에는 서브그래프 이벤트가 포함되지 않는다. 서브그래프 이벤트를 스트리밍하려면 subgraphs=True를 전달하여 서브그래프 이벤트를 다음과 같이 받을 수 있다.

config = {"configurable": {"thread_id": "3"}}
inputs = {"messages": [{"role": "user", "content": "서울 날씨 어때?"}]}
for update in graph.stream(inputs, config=config, stream_mode="values", subgraphs=True):
    print(update)
((), {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='1c9af327-2f7e-4f5b-b399-5729788f287f')]})
((), {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='1c9af327-2f7e-4f5b-b399-5729788f287f')], 'route': 'weather'})
(('weather_graph:9654f16f-8ea8-1c2c-c011-e5dda6128a12',), {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='1c9af327-2f7e-4f5b-b399-5729788f287f')]})
(('weather_graph:9654f16f-8ea8-1c2c-c011-e5dda6128a12',), {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='1c9af327-2f7e-4f5b-b399-5729788f287f')], 'city': 'Seoul'})

지금 상태를 확인하면, weather_graph에서 일시 중지된 것을 볼 수 있다.

state = graph.get_state(config)
print(state.next)
('weather_graph',)

현재 상태에서 대기 중인 작업을 보면 weather_graph라는 하나의 작업이 있으며, 이는 서브그래프 작업에 해당한다.

print(state.tasks)
(PregelTask(id='f51c8e97-5092-d9f9-4f2d-393f640d545c', name='weather_graph', path=('__pregel_pull', 'weather_graph'), error=None, interrupts=(), state={'configurable': {'thread_id': '3', 'checkpoint_ns': 'weather_graph:f51c8e97-5092-d9f9-4f2d-393f640d545c'}}, result=None),)

그러나 부모 그래프의 구성으로 상태를 가져왔기 때문에 서브그래프 상태에 접근할 수 없다. 위의 PregelTask 상태 값을 보면 부모 그래프의 구성만 포함되어 있음을 알 수 있다. 실제로 서브그래프 상태를 채우려면 get_statesubgraphs=True를 전달해야 한다.

state = graph.get_state(config, subgraphs=True)
print(state.tasks[0])
PregelTask(id='f51c8e97-5092-d9f9-4f2d-393f640d545c', name='weather_graph', path=('__pregel_pull', 'weather_graph'), error=None, interrupts=(), state=StateSnapshot(values={'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='b2a66468-edba-47dd-97ce-b94bc041ee56')], 'city': 'Seoul'}, next=('weather_node',), config={'configurable': {'thread_id': '3', 'checkpoint_ns': 'weather_graph:f51c8e97-5092-d9f9-4f2d-393f640d545c', 'checkpoint_id': '1efb034a-ba9b-638a-8001-775eab99ad0f', 'checkpoint_map': {'': '1efb034a-b4ea-6666-8001-0df2ef757fe9', 'weather_graph:f51c8e97-5092-d9f9-4f2d-393f640d545c': '1efb034a-ba9b-638a-8001-775eab99ad0f'}}}, metadata={'source': 'loop', 'writes': {'model_node': {'city': 'Seoul'}}, 'thread_id': '3', 'langgraph_step': 2, 'langgraph_node': 'weather_graph', 'langgraph_triggers': ['branch:router_node:route_after_prediction:weather_graph'], 'langgraph_path': ['__pregel_pull', 'weather_graph'], 'langgraph_checkpoint_ns': 'weather_graph:f51c8e97-5092-d9f9-4f2d-393f640d545c', 'checkpoint_ns': 'weather_graph:f51c8e97-5092-d9f9-4f2d-393f640d545c', 'step': 1, 'parents': {'': '1efb034a-b4ea-6666-8001-0df2ef757fe9'}}, created_at='2024-12-01T22:36:13.658766+00:00', parent_config={'configurable': {'thread_id': '3', 'checkpoint_ns': 'weather_graph:f51c8e97-5092-d9f9-4f2d-393f640d545c', 'checkpoint_id': '1efb034a-b4ee-6d4c-8000-a5f9e832b141', 'checkpoint_map': {'': '1efb034a-b4ea-6666-8001-0df2ef757fe9', 'weather_graph:f51c8e97-5092-d9f9-4f2d-393f640d545c': '1efb034a-b4ee-6d4c-8000-a5f9e832b141'}}}, tasks=(PregelTask(id='56bf1637-c1b5-4338-49ef-3fca05221768', name='weather_node', path=('__pregel_pull', 'weather_node'), error=None, interrupts=(), state=None, result=None),)), result=None)

이제 서브그래프 상태에 접근할 수 있다. PregelTask의 상태 값을 보면, weather_node와 같은 다음 노드와 현재 상태 값(예: 도시)과 같은 필요한 모든 정보를 볼 수 있다.

실행을 재개하려면, 일반적으로 외부 그래프를 호출하면 된다.

for update in graph.stream(None, config=config, stream_mode="values", subgraphs=True):
    print(update)
((), {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='b2a66468-edba-47dd-97ce-b94bc041ee56')], 'route': 'weather'})
(('weather_graph:f51c8e97-5092-d9f9-4f2d-393f640d545c',), {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='b2a66468-edba-47dd-97ce-b94bc041ee56')], 'city': 'Seoul'})
(('weather_graph:f51c8e97-5092-d9f9-4f2d-393f640d545c',), {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='b2a66468-edba-47dd-97ce-b94bc041ee56'), AIMessage(content='Seoul은 맑아요', additional_kwargs={}, response_metadata={}, id='edfcc313-65a5-471a-b044-faa542cef2ae')], 'city': 'Seoul'})
((), {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='b2a66468-edba-47dd-97ce-b94bc041ee56'), AIMessage(content='Seoul은 맑아요', additional_kwargs={}, response_metadata={}, id='edfcc313-65a5-471a-b044-faa542cef2ae')], 'route': 'weather'})

특정 서브그래프 노드에서 다시 시작

위 예제에서는 외부 그래프에서 재생을 진행했다. 이는 자동으로 서브그래프를 이전에 있던 상태(이번 경우에는 weather_node 앞에서 일시 정지된 상태)에서 다시 재생했기 때문이다. 하지만 서브그래프 내부에서 재생을 시작하는 것도 가능하다. 이를 위해서는 재생을 시작하고자 하는 정확한 서브그래프 상태에서 설정을 가져와야 한다.

서브그래프의 상태 기록을 탐색하여 model_node 앞의 상태를 선택하는 방법을 사용할 수 있다. 이는 .next 매개변수를 기준으로 필터링하여 할 수 있다.

서브그래프의 상태 기록을 가져오려면 먼저 설정을 전달해야 한다.

parent_graph_state_before_subgraph = next(
    h for h in graph.get_state_history(config) if h.next == ("weather_graph",)
)
subgraph_state_before_model_node = next(
    h
    for h in graph.get_state_history(parent_graph_state_before_subgraph.tasks[0].state)
    if h.next == ("model_node",)
)

# 이 패턴은 얼마나 깊이 들어가도 확장될 수 있다.
# subsubgraph_stat_history = next(h for h in graph.get_state_history(subgraph_state_before_model_node.tasks[0].state) if h.next == ('my_subsubgraph_node',))

우리는 서브그래프 상태 기록에서 subgraph_state_before_model_node.next 매개변수를 비교함으로써 올바른 상태를 얻었음을 확인할 수 있다.

print(subgraph_state_before_model_node.next)
('model_node',)

완벽합니다! 우리는 올바른 상태 스냅샷을 얻었고, 이제 서브그래프 내의 model_node에서 실행을 재개할 수 있다.

for value in graph.stream(
    None,
    config=subgraph_state_before_model_node.config,
    stream_mode="values",
    subgraphs=True,
):
    print(value)
((), {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='cdcb4f23-d9d6-4dcf-a2fa-ef6a5ec6e701')], 'route': 'weather'})
(('weather_graph:79e5887e-3323-4abe-7262-59880fba9595',), {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='cdcb4f23-d9d6-4dcf-a2fa-ef6a5ec6e701')]})
(('weather_graph:79e5887e-3323-4abe-7262-59880fba9595',), {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='cdcb4f23-d9d6-4dcf-a2fa-ef6a5ec6e701')], 'city': 'Seoul'})

이 하위 섹션에서는 그래프 내에서 얼마나 깊게 중첩되어 있든지 상관없이 어떤 노드에서든 재생할 수 있는 방법을 보여주었다. 이는 에이전트의 결정론적 성질을 테스트하는 데 매우 유용한 도구이다.

상태 수정하기

서브그래프 상태 수정

서브그래프의 상태를 수정하려면 어떻게 해야 할까? 이는 일반 그래프의 상태를 업데이트하는 방법과 비슷하게 할 수 있으며, 서브그래프의 상태를 업데이트할 때는 서브그래프의 구성(config)을 update_state에 전달하는 것을 주의해야 한다.

config = {"configurable": {"thread_id": "4"}}
inputs = {"messages": [{"role": "user", "content": "서울 날씨 어때?"}]}
for update in graph.stream(inputs, config=config, stream_mode="updates"):
    print(update)
{'router_node': {'route': 'weather'}}
state = graph.get_state(config, subgraphs=True)
print(state.values["messages"])
[HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='ed11a965-79d4-4988-92e1-62b843e7bcfb')]

내부 그래프의 상태를 업데이트하려면 내부 그래프의 구성을 전달해야 한다. 이는 state.tasks[0].state.config를 호출하여 얻을 수 있다. 서브그래프 내부에서 중단했기 때문에, 작업의 상태는 바로 서브그래프의 상태이다.

graph.update_state(state.tasks[0].state.config, {"city": "부산"})
{'configurable': {'thread_id': '4',
  'checkpoint_ns': 'weather_graph:67f32ef7-aee0-8a20-0eb0-eeea0fd6de6e',
  'checkpoint_id': '1ef75e5a-0b00-6bc0-8002-5726e210fef4',
  'checkpoint_map': {'': '1ef75e59-1b13-6ffe-8001-0844ae748fd5',
   'weather_graph:67f32ef7-aee0-8a20-0eb0-eeea0fd6de6e': '1ef75e5a-0b00-6bc0-8002-5726e210fef4'}}}

이제 외부 그래프의 스트리밍을 재개할 수 있다(서브그래프도 재개하게 된다). 그리고 우리가 서울 대신 부산을 사용하도록 검색을 업데이트한 것을 확인할 수 있다.

for update in graph.stream(None, config=config, stream_mode="updates", subgraphs=True):
    print(update)
(('weather_graph:bd207c21-f79a-582d-61c1-d4336d6c0c1f',), {'weather_node': {'messages': [{'role': 'assistant', 'content': '부산은 맑아요'}]}})
((), {'weather_graph': {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='ed11a965-79d4-4988-92e1-62b843e7bcfb'), AIMessage(content='부산은 맑아요', additional_kwargs={}, response_metadata={}, id='7f8943f0-4042-49db-87ad-2a97b5c3d70e')]}})

AI가 예상대로 "부산은 맑아요"라고 응답했다.

서브그래프 노트처럼 행동하기

상태를 업데이트하는 또 다른 방법은 위에서 했던 것처럼 weather_node가 실행되기 전에 상태를 수정하는 대신, 우리가 직접 weather_node 역할을 하는 것이다. 이를 위해서는 subgraph 구성과 함께 as_node 인자를 전달하면 된다. 이를 통해 지정한 노드처럼 상태를 업데이트할 수 있다. 즉, weather_node 전에 인터럽트를 설정하고 상태 업데이트 기능을 weather_node로 사용하면, 그래프 자체는 weather_node를 직접 호출하지 않고 대신 우리가 weather_node의 출력을 결정하게 된다.

config = {"configurable": {"thread_id": "14"}}
inputs = {"messages": [{"role": "user", "content": "서울 날씨 어때?"}]}
for update in graph.stream(
    inputs, config=config, stream_mode="updates", subgraphs=True
):
    print(update)
print("interrupted!")

state = graph.get_state(config, subgraphs=True)

graph.update_state(
    state.tasks[0].state.config,
    {"messages": [{"role": "assistant", "content": "비와요"}]},
    as_node="weather_node",
)
for update in graph.stream(None, config=config, stream_mode="updates", subgraphs=True):
    print(update)

print(graph.get_state(config).values["messages"])
((), {'router_node': {'route': 'weather'}})
(('weather_graph:1ac23112-9b5d-caf1-24f2-18b7b7c2adf1',), {'model_node': {'city': 'Seoul'}})
((), {'__interrupt__': ()})
interrupted!
((), {'weather_graph': {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='8a7c8872-5cd6-4c8d-939f-667b1148a5da'), AIMessage(content='rainy', additional_kwargs={}, response_metadata={}, id='78c8c244-a6d8-47db-8088-b2f86374cabc')]}})
[HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='8a7c8872-5cd6-4c8d-939f-667b1148a5da'), AIMessage(content='비와요', additional_kwargs={}, response_metadata={}, id='78c8c244-a6d8-47db-8088-b2f86374cabc')]

AI가 우리가 직접 전달한 메시지로 응답했다.

전체 서브그래프처럼 행동하기

마지막으로, 우리는 전체 서브그래프를 대상으로 그래프를 업데이트할 수도 있다. 이는 위의 경우와 유사하지만, 날씨 노드만 대상으로 하는 대신 전체 서브그래프를 대상으로 하는 방식이다. 이는 일반 그래프 구성을 전달하고, as_node 인자를 사용하여 우리가 전체 서브그래프 노드로서 행동하고 있음을 지정함으로써 이루어진다.

config = {"configurable": {"thread_id": "8"}}
inputs = {"messages": [{"role": "user", "content": "서울 날씨 어때?"}]}
for update in graph.stream(
    inputs, config=config, stream_mode="updates", subgraphs=True
):
    print(update)
print("interrupted!")

graph.update_state(
    config,
    {"messages": [{"role": "assistant", "content": "비와요"}]},
    as_node="weather_graph",
)
for update in graph.stream(None, config=config, stream_mode="updates"):
    print(update)

print(graph.get_state(config).values["messages"])
((), {'router_node': {'route': 'weather'}})
(('weather_graph:a36c46f1-a2e7-cc73-f42d-547d5fec0a2f',), {'model_node': {'city': 'Seoul'}})
((), {'__interrupt__': ()})
interrupted!
[HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='2e7e064f-238b-454a-89d8-a4a9493d31d2'), AIMessage(content='비와요', additional_kwargs={}, response_metadata={}, id='ac0bb3e0-db56-41db-a99a-2f1e97b0d5f9')]

다시 말해, AI는 우리가 예상한 대로 "비와요"라고 응답했다.

이중 중첩된 서브그래프

이 동일한 기능은 중첩 수준에 관계없이 계속 작동한다. 다음은 이중 중첩 서브그래프에서 동일한 작업을 수행하는 예이다(어떤 수준의 중첩도 작동한다). 우리는 이미 정의된 그래프 위에 또 다른 라우터를 추가한다.

from typing import Literal

from dotenv import load_dotenv
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.constants import START, END
from langgraph.graph import StateGraph, MessagesState
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver

load_dotenv()


@tool
def get_weather(city: str):
    """Get the weather for a specific city"""
    return f"{city}은 맑아요"


raw_model = ChatOpenAI()
model = raw_model.with_structured_output(get_weather)


class SubGraphState(MessagesState):
    city: str


def model_node(state: SubGraphState):
    result = model.invoke(state["messages"])
    return {"city": result["city"]}


def weather_node(state: SubGraphState):
    result = get_weather.invoke({"city": state["city"]})
    return {"messages": [{"role": "assistant", "content": result}]}


subgraph = StateGraph(SubGraphState)
subgraph.add_node(model_node)
subgraph.add_node(weather_node)
subgraph.add_edge(START, "model_node")
subgraph.add_edge("model_node", "weather_node")
subgraph.add_edge("weather_node", END)
subgraph = subgraph.compile(interrupt_before=["weather_node"])

memory = MemorySaver()


class RouterState(MessagesState):
    route: Literal["weather", "other"]


class Router(TypedDict):
    route: Literal["weather", "other"]


router_model = raw_model.with_structured_output(Router)


def router_node(state: RouterState):
    system_message = "다음 질문이 날씨에 관한 것인지 아닌지 분류해줘."
    messages = [{"role": "system", "content": system_message}] + state["messages"]
    route = router_model.invoke(messages)
    return {"route": route["route"]}


def normal_llm_node(state: RouterState):
    response = raw_model.invoke(state["messages"])
    return {"messages": [response]}


def route_after_prediction(
    state: RouterState,
) -> Literal["weather_graph", "normal_llm_node"]:
    if state["route"] == "weather":
        return "weather_graph"
    else:
        return "normal_llm_node"


graph = StateGraph(RouterState)
graph.add_node(router_node)
graph.add_node(normal_llm_node)
graph.add_node("weather_graph", subgraph)
graph.add_edge(START, "router_node")
graph.add_conditional_edges("router_node", route_after_prediction)
graph.add_edge("normal_llm_node", END)
graph.add_edge("weather_graph", END)
graph = graph.compile()
from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()


class GrandfatherState(MessagesState):
    to_continue: bool


def router_node(state: GrandfatherState):
    return {"to_continue": True}


def route_after_prediction(state: GrandfatherState):
    if state["to_continue"]:
        return "graph"
    else:
        return END


grandparent_graph = StateGraph(GrandfatherState)
grandparent_graph.add_node(router_node)
grandparent_graph.add_node("graph", graph)
grandparent_graph.add_edge(START, "router_node")
grandparent_graph.add_conditional_edges(
    "router_node", route_after_prediction, ["graph", END]
)
grandparent_graph.add_edge("graph", END)
grandparent_graph = grandparent_graph.compile(checkpointer=MemorySaver())
from IPython.display import Image, display

display(
    Image(
        grandparent_graph.get_graph(xray=2).draw_mermaid_png(
            output_file_path="how-to-view-and-update-state-in-subgraphs-double-nested-subgraph.png"
        )
    )
)

인터럽트까지 실행하면 이제 세 개의 그래프 상태에 대한 스냅샷을 볼 수 있다.

config = {"configurable": {"thread_id": "2"}}
inputs = {"messages": [{"role": "user", "content": "서울 날씨 어때?"}]}
for update in grandparent_graph.stream(
    inputs, config=config, stream_mode="updates", subgraphs=True
):
    print(update)
((), {'router_node': {'to_continue': True}})
(('graph:ce4f9447-79cc-8097-ac0d-ad0755ca0736',), {'router_node': {'route': 'weather'}})
(('graph:ce4f9447-79cc-8097-ac0d-ad0755ca0736', 'weather_graph:8e7d41c2-2928-8a7a-1469-fe2d9b6150cc'), {'model_node': {'city': 'Seoul'}})
((), {'__interrupt__': ()})
state = grandparent_graph.get_state(config, subgraphs=True)
print("Grandparent State:")
print(state.values)
print("---------------")
print("Parent Graph State:")
print(state.tasks[0].state.values)
print("---------------")
print("Subgraph State:")
print(state.tasks[0].state.tasks[0].state.values)
Grandparent State:
{'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='fff417b0-448e-46d3-9ee8-0363e10204d1')], 'to_continue': True}
---------------
Parent Graph State:
{'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='fff417b0-448e-46d3-9ee8-0363e10204d1')], 'route': 'weather'}
---------------
Subgraph State:
{'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='fff417b0-448e-46d3-9ee8-0363e10204d1')], 'city': 'Seoul'}

이제 세 번째 레벨의 노드로 행동하면서 계속 진행할 수 있다.

grandparent_graph_state = state
parent_graph_state = grandparent_graph_state.tasks[0].state
subgraph_state = parent_graph_state.tasks[0].state
grandparent_graph.update_state(
    subgraph_state.config,
    {"messages": [{"role": "assistant", "content": "rainy"}]},
    as_node="weather_node",
)
for update in grandparent_graph.stream(
    None, config=config, stream_mode="updates", subgraphs=True
):
    print(update)

print(grandparent_graph.get_state(config).values["messages"])
(('graph:ce4f9447-79cc-8097-ac0d-ad0755ca0736',), {'weather_graph': {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='fff417b0-448e-46d3-9ee8-0363e10204d1'), AIMessage(content='비와요', additional_kwargs={}, response_metadata={}, id='01c76f1a-87ae-4d9e-ae6f-2e04fcddfad2')]}})
((), {'graph': {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='fff417b0-448e-46d3-9ee8-0363e10204d1'), AIMessage(content='비와요', additional_kwargs={}, response_metadata={}, id='01c76f1a-87ae-4d9e-ae6f-2e04fcddfad2')]}})
[HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='fff417b0-448e-46d3-9ee8-0363e10204d1'), AIMessage(content='비와요', additional_kwargs={}, response_metadata={}, id='01c76f1a-87ae-4d9e-ae6f-2e04fcddfad2')]

위의 경우들과 마찬가지로, AI는 예상대로 "rainy"라는 응답을 반환한다. 우리는 상태 기록을 탐색하여 각 단계에서 조부모 그래프의 상태가 어떻게 업데이트되었는지 확인할 수 있다.

for state in grandparent_graph.get_state_history(config):
    print(state)
    print("-----")
StateSnapshot(values={'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='fff417b0-448e-46d3-9ee8-0363e10204d1'), AIMessage(content='비와요', additional_kwargs={}, response_metadata={}, id='01c76f1a-87ae-4d9e-ae6f-2e04fcddfad2')], 'to_continue': True}, next=(), config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1efb036c-2a2b-6224-8002-6cec70444b27'}}, metadata={'source': 'loop', 'writes': {'graph': {'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='fff417b0-448e-46d3-9ee8-0363e10204d1'), AIMessage(content='비와요', additional_kwargs={}, response_metadata={}, id='01c76f1a-87ae-4d9e-ae6f-2e04fcddfad2')]}}, 'thread_id': '2', 'step': 2, 'parents': {}}, created_at='2024-12-01T22:51:11.193941+00:00', parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1efb036c-1df7-64ee-8001-5d493edbbab0'}}, tasks=())
-----
StateSnapshot(values={'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='fff417b0-448e-46d3-9ee8-0363e10204d1')], 'to_continue': True}, next=('graph',), config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1efb036c-1df7-64ee-8001-5d493edbbab0'}}, metadata={'source': 'loop', 'writes': {'router_node': {'to_continue': True}}, 'thread_id': '2', 'step': 1, 'parents': {}}, created_at='2024-12-01T22:51:09.914424+00:00', parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1efb036c-1df3-64ca-8000-41f128acea1b'}}, tasks=(PregelTask(id='ce4f9447-79cc-8097-ac0d-ad0755ca0736', name='graph', path=('__pregel_pull', 'graph'), error=None, interrupts=(), state={'configurable': {'thread_id': '2', 'checkpoint_ns': 'graph:ce4f9447-79cc-8097-ac0d-ad0755ca0736'}}, result={'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='fff417b0-448e-46d3-9ee8-0363e10204d1'), AIMessage(content='비와요', additional_kwargs={}, response_metadata={}, id='01c76f1a-87ae-4d9e-ae6f-2e04fcddfad2')]}),))
-----
StateSnapshot(values={'messages': [HumanMessage(content='서울 날씨 어때?', additional_kwargs={}, response_metadata={}, id='fff417b0-448e-46d3-9ee8-0363e10204d1')]}, next=('router_node',), config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1efb036c-1df3-64ca-8000-41f128acea1b'}}, metadata={'source': 'loop', 'writes': None, 'thread_id': '2', 'step': 0, 'parents': {}}, created_at='2024-12-01T22:51:09.912774+00:00', parent_config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1efb036c-1de9-6f4c-bfff-6f5056430c0f'}}, tasks=(PregelTask(id='5850c40f-01d8-7709-9560-c32db515a159', name='router_node', path=('__pregel_pull', 'router_node'), error=None, interrupts=(), state=None, result={'to_continue': True}),))
-----
StateSnapshot(values={'messages': []}, next=('__start__',), config={'configurable': {'thread_id': '2', 'checkpoint_ns': '', 'checkpoint_id': '1efb036c-1de9-6f4c-bfff-6f5056430c0f'}}, metadata={'source': 'input', 'writes': {'__start__': {'messages': [{'role': 'user', 'content': '서울 날씨 어때?'}]}}, 'thread_id': '2', 'step': -1, 'parents': {}}, created_at='2024-12-01T22:51:09.908958+00:00', parent_config=None, tasks=(PregelTask(id='9b978aca-6f00-0327-b758-43b915583c55', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'messages': [{'role': 'user', 'content': '서울 날씨 어때?'}]}),))
-----

LangGraph 참고 자료

반응형
  • 네이버 블로그 공유
  • 네이버 밴드 공유
  • 페이스북 공유
  • 카카오스토리 공유