In this chapter we're going to cover streaming and async in LangChain. Both async code and streaming are incredibly important components of, I think, almost any conversational chat interface, or at least any good one. On the async side: a lot of LLM calls sit behind APIs, so if your application is not async, it spends a load of time waiting for those calls and doing nothing in the meantime, because you've written synchronous code. There are many problems with that, but mainly it doesn't scale. Async code generally performs much better, especially for AI, where a lot of the time we're waiting on API calls.
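To make the waiting problem concrete, here is a minimal sketch using only Python's standard library. `fake_llm_call` is a hypothetical stand-in for an LLM API call (it just sleeps, there is no network and no LangChain involved), and the 0.2 second delay is an arbitrary assumption. It compares awaiting three calls one after another with running them concurrently.

```python
import asyncio
import time


async def fake_llm_call(name: str, delay: float) -> str:
    """Hypothetical stand-in for an LLM API call: it only waits, no network."""
    await asyncio.sleep(delay)  # yields control while "waiting on the API"
    return f"{name} done"


async def run_sequential() -> list:
    # One call at a time: total time is roughly the sum of the delays.
    return [await fake_llm_call(f"call-{i}", 0.2) for i in range(3)]


async def run_concurrent() -> list:
    # gather() runs the calls concurrently: total time is roughly one delay.
    return await asyncio.gather(
        *(fake_llm_call(f"call-{i}", 0.2) for i in range(3))
    )


def timed(coro_fn):
    """Run an async function to completion and return (result, seconds)."""
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start


seq_result, seq_time = timed(run_sequential)
con_result, con_time = timed(run_concurrent)
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

Both versions return the same results; the concurrent one simply stops the application from sitting idle while each call is in flight.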
So async is incredibly important for that. Streaming is a slightly different thing. Let's say I ask ChatGPT, "tell me a story". I'm using GPT-4 here, which is a bit slower, so we can actually see the stream: token by token, this text is being produced and sent to us.
Now, this is not just a visual thing. LLMs literally generate tokens (roughly, words or pieces of words) one by one: they look at all of the previous tokens in order to generate the next one, then the next one, and so on. That's how they work.
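As a toy illustration of that one-by-one generation (this is not a real LLM), here is a small sketch. `TOY_MODEL` and `toy_next_token` are hypothetical stand-ins: a lookup table that picks the next token from everything generated so far. The generator yields each token the moment it exists, which is the essence of streaming.

```python
from typing import Iterator, List

# Hypothetical lookup table standing in for a model:
# maps the text generated so far to the next token.
TOY_MODEL = {
    "": "Once",
    "Once": " upon",
    "Once upon": " a",
    "Once upon a": " time",
}


def toy_next_token(previous: List[str]) -> str:
    """Choose the next token from ALL previous tokens ('' means stop)."""
    return TOY_MODEL.get("".join(previous), "")


def generate(max_tokens: int = 10) -> Iterator[str]:
    tokens: List[str] = []
    for _ in range(max_tokens):
        token = toy_next_token(tokens)
        if not token:
            break
        tokens.append(token)
        yield token  # hand each token over as soon as it is produced


for token in generate():
    print(token, end="|", flush=True)
print()
```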
So when we are implementing streaming, we're getting that feed of tokens directly from the LLM through to our back end or our front end. That is what we see in that token-by-token interface.

That's one thing. Another thing I can do (let me switch across to GPT-4o) is say: we just got this story, so I'm going to ask, "Are there any standard storytelling techniques to follow used above? Please use search." Look: very briefly there, we saw that it was searching the web. We told the LLM to use the search tool, but what actually happened is that the LLM output some tokens saying it was going to use a search tool, and it would also have output the tokens saying what the search query should be, although we didn't see that. When the ChatGPT interface received those tokens, it didn't just send them to us like it does with the standard tokens here. Instead, it used them to show us that little "searching the web" text box.

So streaming is not just the streaming of these direct tokens. It's also the streaming of the intermediate steps that the LLM may be thinking through, which is particularly important when it comes to agents and agentic interfaces. Streaming doesn't just look nice; it's also a feature.

Then finally, let's say we go back to GPT-4 and I say, "Use all of this information to generate a long story for me." We're getting the first token now, so we know something is happening and we can start reading. Now imagine if we were not streaming anything and we were just waiting. We're still waiting now, and we wouldn't see anything: it's just blank, or maybe there's a little loading spinner. So we'd still be waiting, and even now, we're still waiting.
This is an extreme example, but can you imagine waiting for so long and not seeing anything as a user? Just now we would have got our answer if we were not streaming. That would be painful as a user, especially in a chat interface, where you don't want to wait that long. It's okay with, for example, deep research: that takes a long time to process, but you know it's going to take a long time, and it's a different use case because you're getting a report. This is a chat interface. Yes, most messages are not going to take that long to generate, and you're probably not going to be using GPT-4 (maybe some people still do), but in some scenarios it's painful to need to wait that long.

It's the same for agents. When you're using agents, it's nice to get an update: we're using this tool, this is how it's being used. Perplexity, for example, has a very nice example of this. So, what's this? "OpenAI co-founder joins Murati's startup." Let's see. This is really nice: we're using Pro Search, it's searching for news, it's showing us the results. We're getting all this information as we're waiting, which is really cool, and it helps us understand what is actually happening. It's not needed in all use cases, but it's super nice to have those intermediate steps, because then we're not just waiting. I think this bit probably also streamed, but it was super fast so I didn't see it. So streaming is pretty important.

Let's dive into our example. We'll open that in Colab and off we go. Starting with the prerequisites, same as always: LangChain and, optionally, LangSmith. We'll enter our LangChain API key if we'd like to use LangSmith, and we'll also enter our OpenAI API key, which comes from platform.openai.com. Then, as usual, we can just invoke our LLM. So we have that, and it's working. Now let's see how we would stream with `astream`. So whenever
we see a method in LangChain with an `a` prefixed onto what would otherwise be another method's name, that's the async version of it. (`stream` is actually a method as well, and we could use that, but it's not async.) So we can stream asynchronously super easily using just `llm.astream`.

Now, this is just a sample, and to be completely honest you probably will not be able to use it like this in an actual application. We're going to see how we would stream asynchronously in an application further down in this notebook.

Starting with this, you can see that we're getting these tokens, and we're appending each one to a `tokens` list as it comes back from our LLM. We don't strictly need to do that, but we'll see what it's for in a moment. Then I'm printing the token's content. In this case that would be "N", then "LP", then "stands", "for", and so on. You can see that for the most part it tends to be word level, but it can also be subword level: "sentiment" is one word, of course, but it gets split into "sent" and "iment". So words get broken up in various ways. Then I'm adding this pipe character onto the end, so we can see where our individual tokens are.

Then we also have `flush`. You can actually turn this off and it's still going to stream; you're still going to see everything, but it's going to arrive more bit by bit. When we use `flush`, it forces the console to update what is being shown to us immediately, so the output is much smoother than when `flush` is not set to true. When you're printing, that's good to do just so you can see the stream, but you don't necessarily need to.

Now, we added all those tokens to the `tokens` list so we can have a look at each
individual object that was returned to us, and this is interesting. You see that we have the `AIMessageChunk`, which is an object, and then you have the content. The first one is actually empty, the second one has that "N" from "NLP", and that's all we really need to know. They're very simple objects, but they're actually quite useful. Just look at this: we can add our `AIMessageChunk` objects together. Let's see what that does. It doesn't create a list. We still just have one `AIMessageChunk`, but it has combined the content of those chunks, which is kind of cool. For example, we could remove these, and then we just see "NLP". That's a nice little feature, and I quite like it, but you do need to be a little bit careful, because obviously you can do it the wrong way and you're going to get some weird token salad. So you need to make sure you're merging those in the correct order, unless you're doing something weird.

Okay, cool. So that was streaming from an LLM. Let's have a look at streaming with agents. It gets a bit more complicated, to be completely honest, but that complexity is what lets us implement this in, for example, an API, so it's kind of a necessary thing in any case.

Very quickly, we're going to construct our agent executor like we did in the agent execution chapter. For the agent executor we're going to need tools, a chat prompt template, an LLM, an agent, and the agent executor itself. I'm not going to go through these in detail. We just define our tools (add, multiply, exponentiate, subtract, and the final answer tool) and merge those into a single list of tools. Then we have our prompt template. Again, same as before: we have the system message, the chat history, the
query, and then the agent scratchpad for those intermediate steps. Then we define our agent using LCEL. LCEL works quite well with both streaming and async, by the way; it supports both out of the box, which is nice.

So we define our agent, and then, coming down here, we create the agent executor. This is the same as before, so I don't think there's anything new in here. We initialize our agent and the other pieces, and then we loop: we invoke our agent and see if there's a tool call. We check whether it's the final answer (we could shift this check to before or after; it doesn't matter that much), and if not we continue, execute the tools, and so on. Then we can invoke that: "What is 10 plus 10?" There we go. So we have our agent executor, and it is working.

Now, if we're putting this into an API, then with every new query we run through our agent executor we're probably going to need to provide a fresh callback handler. The callback handler is what takes the tokens being generated by an LLM or agent and gives them to some other piece of code, for example the streaming response for an API. In our case, the callback handler is going to put those tokens in a queue, and then, for example, the streaming object is going to pick them up from the queue and put them wherever they need to be.

To allow us to do that with every new query, rather than needing to initialize everything when we initialize our agent, we can add a configurable field to our LLM. So we set the configurable fields here. One other thing is that we set `streaming` equal to true. That's a very minor thing, but just so you see it, we do do that. Adding configurable fields to our LLM means we can basically pass in an object for them on every new invocation. So we set our configurable field:
it's going to be called `callbacks`, and we just add a description. There's nothing more to it. This will now allow us to provide that field when we're invoking our agent.

Now we need to define our callback handler. As I mentioned, what's basically going to happen is that this callback handler passes tokens into our `asyncio.Queue` object, and then we pick them up from the queue elsewhere. We can call it `QueueCallbackHandler`. It inherits from `AsyncCallbackHandler`, because we want all of this to be done asynchronously: we're thinking here about how to implement all this within APIs and actual real-world code, and there we do want to be doing everything in async. Let me execute that, and I'll explain a little of what we're looking at.

We have the initialization, and there's nothing special here. What we really want to do is take our queue object and assign it to an attribute. There's also this `final_answer_seen`, which we set to false. What we're using that for is this: our LLM will be streaming tokens to us while it's doing its tool calling, and we might not want to display those immediately, or we might want to display them in a different way. By setting `final_answer_seen` to false while our LLM is outputting those tool tokens, we can handle them differently. Then, as soon as we see that it's done with the tool calls and is on to the final answer (which is actually another tool call), we can set this to true and start processing our tokens in a different way.

Then we have this `__aiter__` method. This is required for any object we want to loop over with `async for`. Here it's written as a generator, so it iterates through, saying:
if our queue is empty (this is the queue that we set up here), wait a moment. We use the sleep method here, and this is an async sleep, which is super important: we're awaiting an asynchronous sleep, so while we're waiting for that 0.1 seconds, our code can be doing other things. If we used the standard `time.sleep`, that is not asynchronous, so it would actually block the thread for that 0.1 seconds, and we don't want that to happen.

Generally, our queue should not be empty that frequently, given how quickly tokens are going to be added to it. The only way it would potentially be empty is if our LLM stalls, maybe there's a connection interruption for a brief second or something, and no tokens are added. In that case we don't keep checking the queue continuously; we just wait a moment and then check again.

So if it was empty, we wait and then continue on to the next iteration. Otherwise (and it probably won't be empty) we pull whatever is inside our queue out. Then, if that token is a done token, we return, which stops the generator: we're finished. If it's something else, we yield that token, which means we hand that token back and then continue through the loop again. That is our generator logic.

Then we have some other methods here, which are LangChain specific: `on_llm_new_token` and `on_llm_end`. Starting with `on_llm_new_token`: when an LLM returns a token to us, LangChain is going to execute this method. What it does is go into the keyword arguments and get the chunk object, which is coming from our LLM. If there is something in that chunk, it's going to
check for a final answer tool call first. So we get our tool calls and check the name within our chunk. For most of the tokens that come back, the name will probably be empty. You remember when we were looking at the chunks earlier: for us, the content is actually always going to be empty, and instead we get the additional keyword args. Inside there we have our tool calls, as we saw in the previous videos. That's what we're extracting, and that's why we go into the additional keyword args and get the tool call information, or `None` if it isn't there. I don't think it ever would be `None`, to be honest; I think that would mean something is wrong.

Here we're using the walrus operator. What it does is, while we evaluate the `if` condition, also assign whatever the expression returns to `tool_calls`. Then the `if` checks whether `tool_calls` is something or `None`. Because we're using `get`, if there are no tool calls the lookup returns `None`, that gets assigned to `tool_calls`, the `if` condition is false, and this logic does not run; it just continues.

If it is true, so there is something returned, we check whether that something is using the function name (the tool name) final answer. If it is, we set `final_answer_seen` to true. Otherwise, we just add our chunk into the queue. We use `put_nowait` here because it adds the item to the queue immediately without needing to be awaited. In synchronous code, with a standard queue, you would just use `put`, but I don't think I've ever implemented this
synchronously, so here it's just `put_nowait` for async. And then we return.

Then we have `on_llm_end`. When LangChain sees that the LLM has indicated it is finished with the response, LangChain calls this. You have to be aware that this will happen multiple times during agent execution, because within our agent executor we're hitting the LLM multiple times. We have that first step where it decides, "I'm going to use the add tool," or the multiply tool. That response gets back to us, we execute the tool, and then we pass the output from that tool, along with the original user query and the chat history, back to our LLM. That's another call to our LLM, which will come back, finish, and give us something else. So there are multiple LLM calls happening throughout our agent execution logic, and `on_llm_end` gets called at the end of every single one of them.

Now, if we get to the end of an LLM call and it was just a tool invocation (say it called the add tool), we don't want to put the done token into our queue, because when the done token is added to our queue we stop iterating. Instead, if it was just a tool call, we put in a step end token, and we'll actually get this token back. This is useful on, for example, the front end: you could show "I've used the add tool, these are the parameters," and then it's the end of the step. As soon as the front end sees step end, it knows we're done with that step, here was the response, and it can just show you that. We're going to use that, as we'll see soon.

But let's say we're on the final answer tool and we get the signal that the LLM has finished. Then we need to stop iterating; otherwise our
stream generator is just going to keep going forever. Nothing's going to stop it, or maybe it will time out, though I don't think it will. So at that point we need to send the done token to say we're finished. That comes back to our async iterator, which returns and stops the generator.

That's the core logic we have in the handler. I know there's a lot going on there, but we need all of it, so it's important to be aware of it.

Now let's see how we might actually call our agent with all this streaming. We initialize our queue and use that to initialize a streamer, using the custom callback handler that we just built. Then I define a function. It's an asynchronous function (it has to be if we're using async), and what it does is call our agent with a config, passing in the callback, which is the streamer.

Note that I'm not calling the agent executor here; I'm just calling the agent. If we come back up, we're calling this, so it's not going to include all the tool execution logic. Importantly, we're calling the agent with a config that uses `callbacks`. The configurable field we set on our LLM is fed through: it propagates to our agent object, the `RunnableSerializable`, as well. That's what we're executing here: we see `agent.with_config`, and we pass in those callbacks, which is actually just one.

That sets up our agent, and then we call it with `astream`, like we did before, and we're just going to return everything. Let's run that. We see all the token, or chunk, objects being returned, and this is useful for understanding what we're actually doing up here. When we're doing this chunk message additional
keyword arguments lookup, we can see that in here. This would be the chunk's message object: we get the additional keyword args, we go into the tool calls, and we get the information here. We have the ID for that tool call, which we saw in the previous chapters, and then we have our function. The function includes the name, so we know what tool we're calling from this first chunk, but we don't know the arguments yet. Those arguments are going to stream to us, and we can see them begin to come through in the next chunk, which is just the first token of the arguments for the add function. We can see these come together over multiple steps until we have all of our arguments. That's pretty cool.

One thing I'd like to show you here as well: if we create a `tokens` list and do `tokens.append(token)`, we have all of our tokens in there. You can see that they're all `AIMessageChunk` objects, so we can add those together. Based on these, we're going to get all the arguments. They run from token one until, I think, the second to last. So I'm going to start `tk` at `tokens[1]`, and then for each token from the next one onwards I'm going to do `tk + token`, and let's see what `tk` looks like at the end. Sorry, that should be `tk += token`. Run that, and you can see that it has merged those arguments. It didn't get all of them (I missed some at the end there), but it is merging them. So just as it was adding together the content from various chunks before, it does the same for the other parameters within your chunk object, which I think is pretty cool. You can see here that the name wasn't included. That's because we started
on token one rather than on token zero, where the name was. If we start from token zero and add everything from token one onwards, we get a complete `AIMessageChunk`, which includes the name and all of those arguments. You'll also see that everything else is populated, which is pretty cool.

So we have that. Based on this, we're going to want to modify our custom agent executor, because we're streaming everything, so we want to add streaming inside our agent executor, which we're doing here. This is an `async def stream`, and inside it we're doing `async for token in` the `astream` call. On the very first iteration, if `output` is `None`, the first chunk becomes our output; otherwise we add each new chunk onto the output.

If the token content is not empty, we print the content. I just added these prints so we can see everything. I wouldn't expect this one to run, because we've said it has to use tool calling: within our agent, if we come up to here, we set the tool choice to any, so it's forced to use tool calling and should never really return anything in the content field. But just in case, it's there, and we'll see whether that holds.

Then we get our tool calls information out of the chunk, and if there's something in there, we print it. Then we extract our tool name, and if there is a tool name, I show it. Then we get the args, and if the args are not empty, we see what we get in there. From all of this, we merge everything into our AI message, because we're merging everything as we're
going through, merging everything into `output`, as I showed you before. Then we await our stream, which kicks it off. After that we do the standard agent executor stuff again: we pull out the tool name, tool args, and tool call ID, use all of that to execute our tool, and then create a new tool message and pass that back in. Also, I moved the break for the final answer into the final step. So that is our custom agent executor with streaming.

Let's see what it does. We set verbose to true so we see all those print statements. It's a little bit messy, but you can see we have tool calls with some stuff inside, and it had add here. What we're printing is the full `AIMessageChunk` with its tool calls, and then what we're actually pulling out from it, so these two are coming from the same thing. Same here: we're looking at the full message, and then at the argument we get out of it. So we can see everything that is being pulled out, chunk by chunk or token by token.

We could just get everything like that. However, I'm printing everything so that we can see it streaming. What if I don't print? By default, verbose is false here, so what happens if we invoke now? We got nothing, and the reason is that we're not printing. If you're building an API, for example, and you're pulling your tokens through, you can't print them to a front end or to the output of your API. Printing goes to your terminal or console window; it doesn't go anywhere else. Instead, we want to actually get those tokens out. But how do we do that? We printed them, but
another place that those tokens are is in our queue, because we set them up to go to the queue. So we can pull them out of our queue while our agent executor is running, and then do whatever we want with them. Our code is async, so it can be handling multiple things concurrently: while it's running the agent executor, it can also be pulling tokens out of our queue and sending them to an API, for example, or whatever downstream logic you have.

Let's see what that looks like. We start by initializing our queue and initializing our streamer with that queue. Then we create a task. This is basically saying, "I want to run this, but not right now, I'm not ready yet." The reason I'm not ready yet is that I also want to define my `async for` loop here, which is going to be printing those tokens. The task is scheduled, ready to run, and it starts making progress as soon as our code awaits something, which happens inside the loop. So the agent executor runs while our loop is pulling tokens from the streamer, and at the end we await the task to make sure it has finished.

Here again I'm printing, but I don't need to. Let's say this is within an API or something: I could be sending each token to some other service, or yielding it to some sort of streamer object within our API. We can do whatever we want with those tokens. I'm just printing them because I want to actually see them. The important thing is that we're not printing them within our agent executor; we're printing them outside it. We've got them out, and we can put them wherever we want, which is perfect when you're building an
actual real-world use case, where we're using an API or something else. So let's run that and see what we get. Look at that: we get all of the information we could need, and a little bit more, because now we're using the agent executor. Now we can also see that we have this step end token. So I know, just from looking at this, that this is my first tool use. What tool is it? Let's have a look: it's the add tool, and then we have these arguments, which I can then pass downstream. Then we have the next tool use, down here, and we can parse that in the way that we like. That's pretty cool.

So we're getting those tokens out. Can we do something with them before I print them and show them? Yes, let's see. We're now modifying our loop. Same stuff as before: we're still initializing our queue, our streamer, and our task, and we're still doing this `async for token in streamer`. But then we do stuff with our tokens. If we're on step end, I'm not going to print the step end token; I'm going to print a new line. Otherwise, if we're getting a tool call, then if it has the tool name I print "calling" and the tool name, and if it has the arguments I print the tool argument, ending that with nothing so that we don't go onto a new line. So we're actually going to be streaming everything.

Let's see what this looks like. It goes very fast, so it's kind of hard to see; I'm going to slow it down so you can see. As soon as we get the tool name, we stream that we're calling the add tool, and then we stream the actual arguments for that tool, token by token. For the next one we do the same: we say we're calling this tool name, and then we stream its arguments token by token. Again, we're processing everything
downstream, from outside of the agent executor, and this is an essential thing to be able to do when we're actually implementing streaming, async, and everything else in an actual application. I know that's a lot, but it's important. So that is it for our chapter on streaming and async. I hope it's all been useful. Thanks.
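As a recap of the queue-based pattern from this chapter, here is a minimal sketch that runs with only the standard library. It is not the notebook's code and does not use LangChain: the `QueueCallbackHandler` below is a plain class rather than an `AsyncCallbackHandler` subclass, `fake_agent` and `tool_chunk` are hypothetical stand-ins for the agent executor and the LLM's chunks, the sentinel strings `<<DONE>>` and `<<STEP_END>>` are assumed names, and the chunk dictionaries only loosely imitate the tool-call shape described above. In this sketch every chunk is enqueued, including the final answer's, which is my simplification.

```python
import asyncio

DONE = "<<DONE>>"          # assumed sentinel: final answer finished, stop iterating
STEP_END = "<<STEP_END>>"  # assumed sentinel: one tool-call step finished


class QueueCallbackHandler:
    """Plain-Python stand-in for the chapter's handler (no LangChain base class)."""

    def __init__(self, queue: asyncio.Queue):
        self.queue = queue
        self.final_answer_seen = False

    async def __aiter__(self):
        while True:
            if self.queue.empty():
                await asyncio.sleep(0.01)  # async sleep: the event loop stays free
                continue
            token = await self.queue.get()
            if token == DONE:
                return  # stop the generator
            yield token

    async def on_llm_new_token(self, chunk: dict) -> None:
        # Walrus operator: assign and test at once; .get() gives None if absent.
        if tool_calls := chunk.get("additional_kwargs", {}).get("tool_calls"):
            if tool_calls[0]["function"]["name"] == "final_answer":
                self.final_answer_seen = True
        self.queue.put_nowait(chunk)  # enqueue immediately, nothing to await

    async def on_llm_end(self) -> None:
        # Runs after every LLM call; only the final answer ends the stream.
        self.queue.put_nowait(DONE if self.final_answer_seen else STEP_END)


def tool_chunk(name=None, args=""):
    """Hypothetical chunk shape: the name arrives first, arguments stream after."""
    call = {"function": {"name": name, "arguments": args}}
    return {"additional_kwargs": {"tool_calls": [call]}}


async def fake_agent(handler: QueueCallbackHandler) -> None:
    """Hypothetical producer: two 'LLM calls', an add call then the final answer."""
    llm_calls = (
        [tool_chunk("add"), tool_chunk(args='{"x": 10,'), tool_chunk(args=' "y": 10}')],
        [tool_chunk("final_answer"), tool_chunk(args='{"answer": "20"}')],
    )
    for chunks in llm_calls:
        for chunk in chunks:
            await asyncio.sleep(0.01)  # pretend we are waiting on the LLM
            await handler.on_llm_new_token(chunk)
        await handler.on_llm_end()


async def main() -> str:
    handler = QueueCallbackHandler(asyncio.Queue())
    task = asyncio.create_task(fake_agent(handler))  # schedule the producer
    pieces = []
    async for token in handler:  # consume while the producer is still running
        if token == STEP_END:
            pieces.append("\n")
            continue
        fn = token["additional_kwargs"]["tool_calls"][0]["function"]
        pieces.append(f"Calling {fn['name']}: " if fn["name"] else fn["arguments"])
    await task  # make sure the producer has finished
    return "".join(pieces)


streamed = asyncio.run(main())
print(streamed)
```

In a real API you would replace the `pieces.append(...)` calls with whatever sends each piece to the client, for example yielding it from a streaming response.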