Introduction to LangGraph Tutorial

LangGraph
LLMs
Agents
Python
Author

Lawrence Wu

Published

September 20, 2024

The LangChain team recently released the first course in their LangChain Academy, called Introduction to LangGraph (repo). As I work through it, I'm taking notes on what I've learned. Note that many of these snippets were generated with Claude 3.5 Sonnet (by passing a prompt plus the Jupyter notebook as plain text; surprisingly, it did a better job than o1-preview).

Module 2 - State and Memory

Lesson 2 - State Reducers

Reducers are used to specify how state updates are performed when multiple nodes try to update the same key:

from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: Annotated[list[int], add]

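Using the State above, here is a minimal sketch (node names are illustrative) of the add reducer accumulating values as each node returns a partial update:

from langgraph.graph import StateGraph, START, END

def node_a(state: State):
    # Return only the update; the add reducer appends it to the existing foo
    return {"foo": [1]}

def node_b(state: State):
    return {"foo": [2]}

builder = StateGraph(State)
builder.add_node("node_a", node_a)
builder.add_node("node_b", node_b)
builder.add_edge(START, "node_a")
builder.add_edge("node_a", "node_b")
builder.add_edge("node_b", END)
graph = builder.compile()

graph.invoke({"foo": [0]})
# {'foo': [0, 1, 2]}
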
Custom reducers can be defined to handle complex state update logic:

def reduce_list(left: list | None, right: list | None) -> list:
    if not left:
        left = []
    if not right:
        right = []
    return left + right

class CustomReducerState(TypedDict):
    foo: Annotated[list[int], reduce_list]
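
Unlike operator.add, this reducer tolerates None on either side:

reduce_list(None, [1, 2])   # [1, 2]
reduce_list([0], [1, 2])    # [0, 1, 2]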

MessagesState is a useful shortcut for working with message-based states. These two are equivalent:

from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import MessagesState
from langchain_core.messages import AnyMessage
from langgraph.graph.message import add_messages

# Define a custom TypedDict that includes a list of messages with add_messages reducer
class CustomMessagesState(TypedDict):
    messages: Annotated[list[AnyMessage], add_messages]
    added_key_1: str
    added_key_2: str
    # etc

# Use MessagesState, which includes the messages key with add_messages reducer
class ExtendedMessagesState(MessagesState):
    # Add any keys needed beyond messages, which is pre-built 
    added_key_1: str
    added_key_2: str
    # etc

The add_messages reducer allows appending messages to the state:

from langgraph.graph.message import add_messages
from langchain_core.messages import AIMessage, HumanMessage

# An existing message list and a new message to append
existing_messages = [AIMessage(content="Hello! How can I assist you?", name="Model", id="1")]
new_message = HumanMessage(content="I'm looking for information on marine biology.", name="User", id="2")

new_state = add_messages(existing_messages, new_message)

Messages can be overwritten by using the same ID:

new_message = HumanMessage(content="New content", name="User", id="existing_id")
updated_state = add_messages(existing_messages, new_message)

Messages can be removed using RemoveMessage:

from langchain_core.messages import RemoveMessage

delete_messages = [RemoveMessage(id=m.id) for m in messages_to_delete]
updated_state = add_messages(existing_messages, delete_messages)
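
Putting the three behaviors together in a small self-contained example (the ids are illustrative):

from langgraph.graph.message import add_messages
from langchain_core.messages import AIMessage, HumanMessage, RemoveMessage

messages = [
    AIMessage(content="Hi.", name="Model", id="1"),
    HumanMessage(content="Hello.", name="User", id="2"),
]

# Append: a message with a new id is added to the end of the list
messages = add_messages(messages, HumanMessage(content="Tell me a joke.", name="User", id="3"))

# Overwrite: reusing id "2" replaces that message in place
messages = add_messages(messages, HumanMessage(content="Hello there!", name="User", id="2"))

# Remove: a RemoveMessage deletes the message with the matching id
messages = add_messages(messages, [RemoveMessage(id="1")])
# messages now holds ids "2" (overwritten) and "3"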

Lesson 3 - Multiple Schemas

  • Notebook
  • A graph can have multiple state schemas. This is useful for controlling what information is exposed to the user.

Private State: You can pass private state between nodes that isn't relevant to the overall graph's input or output.

from typing_extensions import TypedDict
from IPython.display import Image, display
from langgraph.graph import StateGraph, START, END

class OverallState(TypedDict):
    foo: int

class PrivateState(TypedDict):
    baz: int

def node_1(state: OverallState) -> PrivateState:
    print("---Node 1---")
    return {"baz": state['foo'] + 1}

def node_2(state: PrivateState) -> OverallState:
    print("---Node 2---")
    return {"foo": state['baz'] + 1}

# Build graph
builder = StateGraph(OverallState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", node_2)

# Logic
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
builder.add_edge("node_2", END)

# Compile
graph = builder.compile()
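
Invoking the graph confirms the behavior: baz is passed between the nodes but never surfaces in the output.

graph.invoke({"foo": 1})
# {'foo': 3}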

Input/Output Schema: You can define explicit input and output schemas for a graph, which is useful for constraining what the graph accepts and returns. Filtering: the input and output schemas restrict which keys are permitted in the graph's input and output.

class InputState(TypedDict):
    question: str

class OutputState(TypedDict):
    answer: str

class OverallState(TypedDict):
    question: str
    answer: str
    notes: str

def thinking_node(state: InputState):
    return {"answer": "bye", "notes": "... his is name is Lance"}

def answer_node(state: OverallState) -> OutputState:
    return {"answer": "bye Lance"}

graph = StateGraph(OverallState, input=InputState, output=OutputState)
graph.add_node("answer_node", answer_node)
graph.add_node("thinking_node", thinking_node)
graph.add_edge(START, "thinking_node")
graph.add_edge("thinking_node", "answer_node")
graph.add_edge("answer_node", END)

graph = graph.compile()

# View
display(Image(graph.get_graph().draw_mermaid_png()))

graph.invoke({"question":"hi"})
# Output: {'answer': 'bye Lance'}

Lesson 4 - Trim and Filter Messages

  • Notebook
  • You can filter messages using the RemoveMessage class.
  • As a use case, you can preserve the full state (e.g. with 5 messages in the message history) but only call the LLM with the last n messages.
  • You can also trim messages to a set number of tokens using trim_messages.

Filtering messages using RemoveMessage:

from langchain_core.messages import RemoveMessage
from langgraph.graph import MessagesState, StateGraph, START, END
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

def filter_messages(state: MessagesState):
    # Delete all but the 2 most recent messages from state
    delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
    return {"messages": delete_messages}

def chat_model_node(state: MessagesState):
    return {"messages": [llm.invoke(state["messages"])]}

builder = StateGraph(MessagesState)
builder.add_node("filter", filter_messages)
builder.add_node("chat_model", chat_model_node)
builder.add_edge(START, "filter")
builder.add_edge("filter", "chat_model")
builder.add_edge("chat_model", END)
graph = builder.compile()

Trimming messages based on token count:

from langchain_core.messages import trim_messages
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o")

def chat_model_node(state: MessagesState):
    # Keep only the most recent messages that fit within 100 tokens
    messages = trim_messages(
        state["messages"],
        max_tokens=100,
        strategy="last",
        token_counter=ChatOpenAI(model="gpt-4o"),
        allow_partial=False,
    )
    return {"messages": [llm.invoke(messages)]}

Lesson 5 - Chatbot w/ Summarizing Messages and Memory

  • Interesting example of using the above ideas to create a chatbot that keeps a running summary of the conversation as a way of condensing its memory.
  • You can pass a config with a thread_id to the compiled graph, and it will continue the conversation from that thread's saved state.

from langgraph.graph import MessagesState
class State(MessagesState):
    summary: str

from langchain_core.messages import SystemMessage, HumanMessage, RemoveMessage
from langchain_openai import ChatOpenAI

# The chat model used by the nodes below
model = ChatOpenAI(model="gpt-4o")

# Define the logic to call the model
def call_model(state: State):
    
    # Get summary if it exists
    summary = state.get("summary", "")

    # If there is a summary, add it
    if summary:
        
        # Put the summary in a system message
        system_message = f"Summary of conversation earlier: {summary}"

        # Prepend the summary to the newer messages
        messages = [SystemMessage(content=system_message)] + state["messages"]
    
    else:
        messages = state["messages"]
    
    response = model.invoke(messages)
    return {"messages": response}

Note, here we’ll use RemoveMessage to filter our state after we’ve produced the summary.

def summarize_conversation(state: State):
    
    # First, we get any existing summary
    summary = state.get("summary", "")

    # Create our summarization prompt 
    if summary:
        
        # A summary already exists
        summary_message = (
            f"This is summary of the conversation to date: {summary}\n\n"
            "Extend the summary by taking into account the new messages above:"
        )
        
    else:
        summary_message = "Create a summary of the conversation above:"

    # Add prompt to our history
    messages = state["messages"] + [HumanMessage(content=summary_message)]
    response = model.invoke(messages)
    
    # Delete all but the 2 most recent messages
    delete_messages = [RemoveMessage(id=m.id) for m in state["messages"][:-2]]
    return {"summary": response.content, "messages": delete_messages}

Adding memory:

from IPython.display import Image, display
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, START, END

# Decide whether to summarize or end (the 6-message threshold is a choice)
def should_continue(state: State):
    """Return the next node to execute."""
    if len(state["messages"]) > 6:
        return "summarize_conversation"
    return END

# Define a new graph
workflow = StateGraph(State)
workflow.add_node("conversation", call_model)
workflow.add_node(summarize_conversation)

# Set the entrypoint as conversation
workflow.add_edge(START, "conversation")
workflow.add_conditional_edges("conversation", should_continue)
workflow.add_edge("summarize_conversation", END)

# Compile with a checkpointer so the thread's state is saved at each step
memory = MemorySaver()
graph = workflow.compile(checkpointer=memory)

A checkpointer saves the state at each step as a checkpoint. These saved checkpoints can be grouped into a thread of conversation. Below, we set a thread_id in the config; passing that config to the compiled graph continues the conversation in the same thread.

# Create a thread
config = {"configurable": {"thread_id": "1"}}

# Start conversation
input_message = HumanMessage(content="hi! I'm Lance")
output = graph.invoke({"messages": [input_message]}, config) 
for m in output['messages'][-1:]:
    m.pretty_print()

input_message = HumanMessage(content="what's my name?")
output = graph.invoke({"messages": [input_message]}, config) 
for m in output['messages'][-1:]:
    m.pretty_print()

input_message = HumanMessage(content="i like the 49ers!")
output = graph.invoke({"messages": [input_message]}, config) 
for m in output['messages'][-1:]:
    m.pretty_print()
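
You can also inspect the running summary stored in the thread's checkpointed state:

graph.get_state(config).values.get("summary", "")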

Lesson 6 - Chatbot w/ Summarizing Messages and External Memory

  • Notebook
  • You can easily configure external memory with a database like SQLite, e.g. via a SqliteSaver checkpointer (see the sketch below).
  • This lets you persist memory across notebook sessions.
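
A minimal sketch, assuming the langgraph-checkpoint-sqlite package is installed and reusing the workflow from Lesson 5 (the database path is illustrative):

import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# Connect to a local SQLite file; it is created if it doesn't exist.
# check_same_thread=False allows the connection to be used across threads.
conn = sqlite3.connect("state_db/example.db", check_same_thread=False)
memory = SqliteSaver(conn)

# Compile with the SQLite checkpointer; checkpoints now survive notebook restarts
graph = workflow.compile(checkpointer=memory)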