Pizza agent demo
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from typing import Annotated, TypedDict
from langchain_core.language_models.fake_chat_models import FakeMessagesListChatModel
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_core.tools import tool
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from memstate import InMemoryStorage, MemoryStore
from memstate.integrations.langgraph import MemStateCheckpointer
# --- 1. Business Logic (Tools) ---
# Note: These functions don't know anything about LLM. They simply modify data.
@tool
def create_order(pizza_type: str):
"""Start a new pizza order."""
return {"status": "created", "type": pizza_type, "toppings": []}
@tool
def add_topping(topping: str):
"""Add a topping to the current order."""
return f"Added {topping}"
@tool
def cancel_order():
"""Cancel the order."""
return "Order cancelled"
tools = [create_order, add_topping, cancel_order]
# --- 2. State Definition ---
class AgentState(TypedDict):
# Message history (standard for chat)
messages: Annotated[list[BaseMessage], add_messages]
# !!! IMPORTANT: Structured business state!!!
# LangGraph will store this state, and MemState will persist it in SQL.
current_order: dict | None
# --- 3. Mock Model ---
class FakeModelWithTools(FakeMessagesListChatModel):
def bind_tools(self, tools, **kwargs):
return self
# Dialogue Script:
# 1. User: "I want Pepperoni" -> AI calls create_order
response_1_tool = AIMessage(
content="", tool_calls=[{"name": "create_order", "args": {"pizza_type": "Pepperoni"}, "id": "call_1"}]
)
# 2. Tool works -> AI confirms
response_1_final = AIMessage(content="I've started a Pepperoni order for you.")
# 3. User: "Add mushrooms" -> AI calls add_topping
response_2_tool = AIMessage(
content="", tool_calls=[{"name": "add_topping", "args": {"topping": "mushrooms"}, "id": "call_2"}]
)
# 4. Tool works -> AI confirms
response_2_final = AIMessage(content="Added mushrooms to your Pepperoni pizza.")
model = FakeModelWithTools(responses=[response_1_tool, response_1_final, response_2_tool, response_2_final]).bind_tools(
tools
)
# --- 4. Nodes Logic ---
def agent_node(state: AgentState):
response = model.invoke(state["messages"])
return {"messages": [response]}
def tool_node_wrapper(state: AgentState):
"""
A wrapper around ToolNode.
Here we intercept the tool's execution result and update the 'current_order' in the state.
"""
last_msg = state["messages"][-1]
tool_calls = last_msg.tool_calls
# First, we make the tools in the standard way
tool_executor = ToolNode(tools)
tool_result = tool_executor.invoke(state)
# NOW THE MAGIC: Updating a structured state based on actions
new_order_state = state.get("current_order", {}) or {}
for call in tool_calls:
if call["name"] == "create_order":
# Initialize the order
new_order_state = {"id": 1, "type": call["args"]["pizza_type"], "toppings": [], "status": "active"}
elif call["name"] == "add_topping":
# Modifying the order
if new_order_state:
new_order_state["toppings"].append(call["args"]["topping"])
# Return updated messages AND updated business state
return {"messages": tool_result["messages"], "current_order": new_order_state}
def should_continue(state: AgentState):
if state["messages"][-1].tool_calls:
return "tools"
return END
# --- 5. Assembly and Persistence ---
# Connect MemState
storage = InMemoryStorage()
memory = MemoryStore(storage)
checkpointer = MemStateCheckpointer(memory=memory)
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node_wrapper) # Use our smart wrapper
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")
app = workflow.compile(checkpointer=checkpointer)
# --- 6. RUN DEMO ---
config = {"configurable": {"thread_id": "user_session_1"}}
print("🍕 Pizza Agent Started (backed by MemState)\n")
# --- STEP 1: Creating an order ---
print(">>> User: I want a Pepperoni pizza.")
# Launch the graph
for update in app.stream({"messages": [HumanMessage(content="I want a Pepperoni pizza.")]}, config=config):
pass # Just scroll through the graph
# Get the final state from MemState
state_v1 = app.get_state(config)
print(f"🤖 AI: {state_v1.values['messages'][-1]['content']}")
print(f"📦 DB STATE (Current Order): {state_v1.values.get('current_order')}")
# Expected: type=Pepperoni, toppings=[]
print("\n... (Simulating server restart / user coffee break) ...\n")
# --- STEP 2: Change Order (Context Saved!) ---
print(">>> User: Actually, add mushrooms.")
# LangGraph will automatically load the state from MemState by thread_id
for update in app.stream({"messages": [HumanMessage(content="Actually, add mushrooms.")]}, config=config):
pass
state_v2 = app.get_state(config)
print(f"🤖 AI: {state_v2.values['messages'][-1]['content']}")
print(f"📦 DB STATE (Current Order): {state_v2.values.get('current_order')}")
# Expected: type=Pepperoni, toppings=['mushrooms']
print("\n--- 🕵️♀️ MEMSTATE AUDIT ---")
# Now let's show off its "power": we can view the order change history via SQL.
# LangGraph stores checkpoints.
checkpoints = memory.query(typename="langgraph_checkpoint")
print(f"Total transaction steps saved: {len(checkpoints)}")
# Let's find the moment when the order was created and when it was updated
for cp in checkpoints:
# Payload in MemState stores the LangGraph checkpoint structure
order_snapshot = cp["payload"]["checkpoint"]["channel_values"].get("current_order")
if order_snapshot:
ts = cp["ts"]
print(f"🕒 [{ts}] Order State: {order_snapshot}")
print("\n✅ Power Demo: State was persisted, modified transactionally, and is fully auditable via SQL.")