In this guide, we will stream tokens from the language model powering an
agent, using a ReAct agent as an example.
Note
If you are using a version of @langchain/core < 0.2.3, when calling chat models or LLMs you need to call await model.stream() within your nodes to get token-by-token streaming events, and aggregate final outputs if needed to update the graph state. In later versions of @langchain/core, this occurs automatically, and you can call await model.invoke().
For more on how to upgrade @langchain/core, check out the instructions here.
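For reference, on those older versions a node would stream and aggregate manually. Here is a minimal sketch of that pattern; it assumes the `boundModel` and `StateAnnotation` defined later in this guide:

```typescript
import { AIMessageChunk } from "@langchain/core/messages";

// Only needed on @langchain/core < 0.2.3: stream inside the node and
// aggregate the chunks into a single message before writing it to state.
const callModelLegacy = async (state: typeof StateAnnotation.State) => {
  const stream = await boundModel.stream(state.messages);
  let aggregated: AIMessageChunk | undefined;
  for await (const chunk of stream) {
    aggregated = aggregated === undefined ? chunk : aggregated.concat(chunk);
  }
  return { messages: aggregated ? [aggregated] : [] };
};
```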
This how-to guide closely follows the others in this directory, showing how to
incorporate token streaming into a prototypical LangGraph agent.
Streaming Support
Token streaming is supported by many, but not all chat models. Check to see if your LLM integration supports token streaming here (doc). Note that some integrations may support general token streaming but lack support for streaming tool calls.
Note
In this how-to, we will create our agent from scratch to be transparent (but verbose). You can accomplish similar functionality using the createReactAgent({ llm, tools }) (API doc) constructor. This may be more appropriate if you are used to LangChain's AgentExecutor class.
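For comparison, the prebuilt constructor can produce a similar agent in a few lines. This is only a sketch; it assumes the `tools` array defined in the next step and an OpenAI chat model:

```typescript
import { createReactAgent } from "@langchain/langgraph/prebuilt";
import { ChatOpenAI } from "@langchain/openai";

// Prebuilt ReAct agent roughly equivalent to the graph we assemble by hand below.
const prebuiltAgent = createReactAgent({
  llm: new ChatOpenAI({ model: "gpt-4o-mini" }),
  tools,
});
```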
First define the tools you want to use. For this simple example, we'll create a placeholder search engine, but see the documentation here on how to create your own custom tools.
```typescript
import { tool } from "@langchain/core/tools";
import { z } from "zod";

const searchTool = tool((_) => {
  // This is a placeholder for the actual implementation
  return "Cold, with a low of 3℃";
}, {
  name: "search",
  description:
    "Use to surf the web, fetch current information, check the weather, and retrieve other information.",
  schema: z.object({
    query: z.string().describe("The query to use in your search."),
  }),
});

await searchTool.invoke({ query: "What's the weather like?" });

const tools = [searchTool];
```
We can now wrap these tools in a prebuilt
ToolNode.
This object will actually run the tools (functions) whenever they are invoked by
our LLM.
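The graph construction below also references a `StateAnnotation` state definition, the `toolNode`, and a tool-bound model (`boundModel`). If your setup does not already define them, a minimal version (assuming an OpenAI chat model) could look like this:

```typescript
import { Annotation, messagesStateReducer } from "@langchain/langgraph";
import { ToolNode } from "@langchain/langgraph/prebuilt";
import { BaseMessage } from "@langchain/core/messages";
import { ChatOpenAI } from "@langchain/openai";

// Graph state: a single `messages` channel that appends new messages.
const StateAnnotation = Annotation.Root({
  messages: Annotation<BaseMessage[]>({
    reducer: messagesStateReducer,
  }),
});

// Runs whichever tools the model requests.
const toolNode = new ToolNode(tools);

// A streaming-capable chat model with the tools bound to it.
const boundModel = new ChatOpenAI({
  model: "gpt-4o-mini",
  temperature: 0,
}).bindTools(tools);
```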
```typescript
import { StateGraph, END } from "@langchain/langgraph";
import { AIMessage } from "@langchain/core/messages";

const routeMessage = (state: typeof StateAnnotation.State) => {
  const { messages } = state;
  const lastMessage = messages[messages.length - 1] as AIMessage;
  // If no tools are called, we can finish (respond to the user)
  if (!lastMessage?.tool_calls?.length) {
    return END;
  }
  // Otherwise if there is, we continue and call the tools
  return "tools";
};

const callModel = async (state: typeof StateAnnotation.State) => {
  // For versions of @langchain/core < 0.2.3, you must call `.stream()`
  // and aggregate the message from chunks instead of calling `.invoke()`.
  const { messages } = state;
  const responseMessage = await boundModel.invoke(messages);
  return { messages: [responseMessage] };
};

const workflow = new StateGraph(StateAnnotation)
  .addNode("agent", callModel)
  .addNode("tools", toolNode)
  .addEdge("__start__", "agent")
  .addConditionalEdges("agent", routeMessage)
  .addEdge("tools", "agent");

const agent = workflow.compile();
```
This section requires @langchain/langgraph>=0.2.20. For help upgrading, see this guide.
For this method, you must use an LLM that supports streaming (e.g. new ChatOpenAI({ model: "gpt-4o-mini" })), or call .stream on the internal LLM call yourself.
```typescript
import { isAIMessageChunk } from "@langchain/core/messages";

const stream = await agent.stream(
  { messages: [{ role: "user", content: "What's the current weather in Nepal?" }] },
  { streamMode: "messages" },
);

for await (const [message, _metadata] of stream) {
  if (isAIMessageChunk(message) && message.tool_call_chunks?.length) {
    console.log(`${message.getType()} MESSAGE TOOL CALL CHUNK: ${message.tool_call_chunks[0].args}`);
  } else {
    console.log(`${message.getType()} MESSAGE CONTENT: ${message.content}`);
  }
}
```
If you wish to disable streaming for a given node or model call, you can add a "nostream" tag. Here's an example where we add an initial node with an LLM call that will not be streamed in the final output:
```typescript
import { RunnableLambda } from "@langchain/core/runnables";

const unstreamed = async (_: typeof StateAnnotation.State) => {
  const model = new ChatOpenAI({
    model: "gpt-4o-mini",
    temperature: 0,
  });
  const res = await model.invoke("How are you?");
  console.log("LOGGED UNSTREAMED MESSAGE", res.content);
  // Don't update the state, this is just to show a call that won't be streamed
  return {};
};

const agentWithNoStream = new StateGraph(StateAnnotation)
  .addNode(
    "unstreamed",
    // Add a "nostream" tag to the entire node
    RunnableLambda.from(unstreamed).withConfig({ tags: ["nostream"] })
  )
  .addNode("agent", callModel)
  .addNode("tools", toolNode)
  // Run the unstreamed node before the agent
  .addEdge("__start__", "unstreamed")
  .addEdge("unstreamed", "agent")
  .addConditionalEdges("agent", routeMessage)
  .addEdge("tools", "agent")
  .compile();

const stream = await agentWithNoStream.stream(
  { messages: [{ role: "user", content: "What's the current weather in Nepal?" }] },
  { streamMode: "messages" },
);

for await (const [message, _metadata] of stream) {
  if (isAIMessageChunk(message) && message.tool_call_chunks?.length) {
    console.log(`${message.getType()} MESSAGE TOOL CALL CHUNK: ${message.tool_call_chunks[0].args}`);
  } else {
    console.log(`${message.getType()} MESSAGE CONTENT: ${message.content}`);
  }
}
```
```
LOGGED UNSTREAMED MESSAGE I'm just a computer program, so I don't have feelings, but I'm here and ready to help you! How can I assist you today?
ai MESSAGE TOOL CALL CHUNK: 
ai MESSAGE TOOL CALL CHUNK: {"
ai MESSAGE TOOL CALL CHUNK: query
ai MESSAGE TOOL CALL CHUNK: ":"
ai MESSAGE TOOL CALL CHUNK: current
ai MESSAGE TOOL CALL CHUNK: weather
ai MESSAGE TOOL CALL CHUNK: in
ai MESSAGE TOOL CALL CHUNK: Nepal
ai MESSAGE TOOL CALL CHUNK: "}
ai MESSAGE CONTENT: 
tool MESSAGE CONTENT: Cold, with a low of 3℃
ai MESSAGE CONTENT: 
ai MESSAGE CONTENT: The
ai MESSAGE CONTENT: current
ai MESSAGE CONTENT: weather
ai MESSAGE CONTENT: in
ai MESSAGE CONTENT: Nepal
ai MESSAGE CONTENT: is
ai MESSAGE CONTENT: cold
ai MESSAGE CONTENT: ,
ai MESSAGE CONTENT: with
ai MESSAGE CONTENT: a
ai MESSAGE CONTENT: low
ai MESSAGE CONTENT: temperature
ai MESSAGE CONTENT: of
ai MESSAGE CONTENT: 
ai MESSAGE CONTENT: 3
ai MESSAGE CONTENT: ℃
ai MESSAGE CONTENT: .
ai MESSAGE CONTENT: 
```
If you removed the tag from the "unstreamed" node, the result of the model call inside it would also appear in the final stream.
You can also use the streamEvents method like this:
```typescript
const eventStream = agent.streamEvents(
  { messages: [{ role: "user", content: "What's the weather like today?" }] },
  { version: "v2" },
);

for await (const { event, data } of eventStream) {
  if (event === "on_chat_model_stream" && isAIMessageChunk(data.chunk)) {
    if (
      data.chunk.tool_call_chunks !== undefined &&
      data.chunk.tool_call_chunks.length > 0
    ) {
      console.log(data.chunk.tool_call_chunks);
    }
  }
}
```