
LangChain Streaming and API Integration


Chapters

0:00 LangChain Streaming
0:54 Streaming for AI
6:08 Basic LangChain Streaming
10:16 Streaming with Agents
28:13 Custom Agent and Streaming
31:13 Streaming to an API

Whisper Transcript

00:00:00.000 | In this chapter we're going to cover streaming and async in LangChain. Now both using async code
00:00:06.800 | and using streaming are incredibly important components of I think almost any conversational
00:00:15.700 | chat interface or at least any good conversational chat interface. For async if your application is
00:00:24.660 | not async and you're spending a load of time in your API or whatever else waiting for LLM calls
00:00:31.800 | because a lot of those are behind APIs you are waiting and your application is doing nothing
00:00:37.740 | because you've written synchronous code and there are many problems with that
00:00:43.220 | mainly that it doesn't scale so async code generally performs much better and especially for AI where
00:00:51.500 | a lot of the time we're kind of waiting for API calls. So async is incredibly important
00:00:56.000 | for that. For streaming now streaming is a slightly different thing so let's say I ask it to tell
00:01:04.140 | me a story okay I'm using GPT-4 here it's a bit slower so we can actually stream we can see
00:01:10.960 | that token by token this text is being produced and sent to us. Now this is not just a visual
00:01:16.820 | thing this is the LLM when it is generating tokens or words it is generating them one by one and that's
00:01:27.260 | because these LLMs literally generate tokens one by one so they're looking at all of the previous
00:01:33.060 | tokens in order to generate the next one and then generate next one generate next one that's how they
00:01:37.500 | work. So when we are implementing streaming we're getting that feed of tokens directly from the LLM
00:01:46.260 | through to our you know our back end or our front end that is what we see when when we see that token
00:01:51.700 | by token interface. All right so that's one thing but one other thing that I can do let me switch
00:01:58.960 | across to GPT-4o is I can say okay we just got this story I'm going to ask are there any
00:02:08.860 | standard storytelling techniques to follow use above please use search
00:02:19.820 | okay so look we get this very briefly there we saw that it was searching the web and the way that happens
00:02:32.200 | is we told the LLM it could use the search tool and then the LLM output some tokens to say
00:02:40.780 | that it is going to use a search tool and it also would have output the tokens saying
00:02:47.360 | what that search query would have been although we didn't see it there
00:02:50.320 | but what the ChatGPT interface is doing there is it received those tokens saying hey I'm going to use
00:02:58.820 | a search tool it doesn't just send us those tokens like it does with the standard tokens here instead
00:03:03.740 | it used those tokens to show us that searching the web little text box so streaming is not just
00:03:12.960 | the streaming of these direct tokens it's also the streaming of these intermediate steps that the LLM may
00:03:22.320 | be thinking through which is particularly important when it comes to agents and agentic interfaces so it's
00:03:30.420 | also a feature thing right streaming doesn't just look nice it's also a feature then finally of course
00:03:37.240 | when we're looking at this okay let's say we go back to GPT-4 and i say okay use all of this information
00:03:49.960 | to generate a long story for me right and okay we are getting the first token now so we know something is
00:04:03.420 | happening we need to start reading now imagine if we were not streaming anything here and we're just
00:04:09.160 | waiting right we're still waiting now we're still waiting and we wouldn't see anything we're just like
00:04:14.760 | oh it's just blank or maybe there's a little loading spinner so we'd still be waiting
00:04:18.940 | and even now we're still waiting right this is an extreme example but can you imagine just waiting
00:04:32.040 | for so long and not seeing anything as a user right now just now we would have got our answer if we were
00:04:38.980 | not streaming i mean that would be painful as a user you'd not want to wait especially in a chat
00:04:45.800 | interface you don't want to wait that long it's okay for example deep research takes a long
00:04:52.460 | time to process but you know it's going to take a long time to process and it's a different use case
00:04:57.340 | right you're getting a report this is a chat interface and yes most messages are not going to take that long
00:05:05.480 | to generate also you're probably not going to be using GPT-4 depending on i don't know maybe some people still do
00:05:12.340 | but in some scenarios it's painful to need to wait that long okay and it's also the same for agents
00:05:19.940 | it's nice when you're using agents to get an update on okay we're using this tool it's using this tool
00:05:25.180 | this is how it's using them Perplexity for example have a very nice example of this so okay what's this
00:05:33.000 | OpenAI co-founder joins Murati's startup let's see right so we see this is really nice we're using
00:05:38.840 | pro search it's searching for news sharing with the results like we're getting all this information
00:05:44.160 | as we're waiting which is really cool and it helps us understand what is actually happening
00:05:50.580 | right it's not needed in all use cases but it's super nice to have those intermediate steps
00:05:55.740 | right so then we're not waiting and i think this bit probably also streamed but it was just super fast
00:06:01.040 | so i didn't see it but that's pretty cool so streaming is pretty important let's dive into
00:06:09.380 | our example okay we'll open that in Colab and off we go so starting with the prerequisites same as
00:06:16.340 | always LangChain optionally LangSmith we'll also enter our LangChain API key if you'd like to use
00:06:23.200 | LangSmith we'll also enter our OpenAI API key so that is platform.openai.com and then as usual we can
00:06:31.480 | just invoke our LLM right so we have that it's working now let's see how we would stream with
00:06:39.000 | astream okay so stream is actually a method as well we could use that but
00:06:46.480 | it's not async right so whenever we see a method in LangChain that has "a" prefixed onto what would be
00:06:53.020 | another method that's the async version of that method so we can actually stream using async super easily
00:07:03.600 | using just llm.astream okay now this is just a sample and to be completely honest you probably will
00:07:14.180 | not be able to use this in an actual application but it's just an example and we're going to see
00:07:19.200 | how we would use this or how we would stream asynchronously in an application further down in this
00:07:25.840 | notebook so starting with this you can see here that we're getting these tokens right we're just
00:07:32.700 | appending it to tokens here we don't actually need to do that i don't think we're even using this but
00:07:36.700 | maybe we yeah we'll do it here it's fine so we're just appending the tokens as they come back
00:07:43.240 | from our LLM appending it to this we'll see what that is in a moment and then i'm just printing
00:07:49.500 | the token content right so the content of the token so in this case that would be "N" in this case it would
00:07:56.640 | be "LP" it would be "stands" and so on and so on so you can see for the most part it tends to be word level
00:08:04.480 | but it can also be subword level as you see "sentiment" is one word of course so you know they get
00:08:11.620 | broken up in various ways
00:08:13.740 | then adding this pipe character onto the end here so we can see okay where are our individual tokens
00:08:21.740 | then we also have flush so flush you can actually turn this off and it's still gonna stream you're still gonna see
00:08:27.740 | everything but it's going to be a bit more you can see it's kind of bit by bit when we use
00:08:34.000 | flush it forces the console to update what is being shown to us immediately all right so we get a much
00:08:42.340 | smoother stream when we're looking at this versus when flush is not set to true so
00:08:49.940 | yeah when you're printing that is good to do just so you can see you don't necessarily need to okay
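For reference, a minimal sketch of what this streaming cell might look like — the model name and prompt are illustrative assumptions, not necessarily the exact notebook code, and it assumes `langchain-openai` is installed with an OpenAI key set:

```python
# Minimal sketch of async token-by-token streaming with astream.
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)  # illustrative model

tokens = []

async def basic_stream():
    # astream is the async counterpart of the synchronous stream method
    async for token in llm.astream("What is NLP?"):
        tokens.append(token)  # keep each AIMessageChunk for inspection later
        # the pipe shows token boundaries; flush=True makes the console
        # render each token immediately, giving the smooth streaming effect
        print(token.content, end="|", flush=True)

# in a notebook cell: await basic_stream()
```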
00:08:55.220 | now we added all those tokens to the tokens list so we can have a look at each individual object that
00:09:01.820 | was returned to us right and this is interesting so you see that we have the AIMessageChunk right
00:09:07.660 | that's an object and then you have the content the first one's actually empty
00:09:10.920 | second one has that "N" for NLP and yeah i mean that's all we really need to know they're very simple
00:09:18.820 | objects but they're actually quite useful because just look at this right so we can add each one of
00:09:26.660 | our AIMessageChunks right let's see what that does it doesn't create a list it creates this right
00:09:32.560 | so we still just have one AIMessageChunk ah but it's combined the content within those AIMessageChunks
00:09:41.080 | which is kind of cool right so for example like we could remove these
00:09:46.260 | right and then we just see NLP so that's a kind of nice little feature there i actually quite like
00:09:54.620 | that but you do need to just be a little bit careful because obviously you can do that the wrong
00:10:00.660 | way and you're going to get i don't know some weird token salad so yeah you need to
00:10:08.280 | just make sure you are going to be merging those in the correct order unless you're doing something weird
00:10:14.420 | okay cool so that was streaming from an LLM
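A short sketch of that chunk-merging behaviour, assuming the `tokens` list collected in the earlier cell:

```python
# AIMessageChunk supports `+`: adding chunks merges their content
# (and other fields) rather than creating a list.
merged = tokens[0]
for chunk in tokens[1:]:
    merged += chunk  # merge in order, or you get "token salad"

print(type(merged))    # still a single AIMessageChunk
print(merged.content)  # the full generated text, reassembled
```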
00:10:21.320 | now let's have a look at streaming with agents it gets a bit more complicated to be completely honest but
00:10:30.240 | things need to get a bit more complicated so that we can implement this in
00:10:35.420 | for example an api right it's kind of a necessary thing in any case so just very
00:10:44.340 | quickly we're going to construct our agent executor like we did in the agent execution chapter and for
00:10:51.320 | that for the agent executor we're going to need tools chat prompt template LLM agent and the agent
00:10:57.360 | executor itself okay very quickly i'm not going to go through these in detail we just define our tools
00:11:03.040 | you know add multiply exponentiate subtract and final answer tool merge those into a single list
00:11:09.340 | of tools then we have our prompt template again same as before we just have system message we have chat
00:11:16.460 | history we have query and then we have the agent scratch pad for those intermediate steps then we
00:11:24.300 | define our agent using LCEL. LCEL works quite well with both streaming and async by the way it supports
00:11:31.740 | both out of the box which is nice so we define our agent then coming down here we're going to create
00:11:41.580 | agent executor this is the same as before all right so there's nothing new in here i don't think so just
00:11:47.900 | initialize our agent things there then yeah we're looping through nothing
00:11:57.500 | new there so we're just invoking our agent seeing if there's a tool call this
00:12:05.740 | is slightly we could shift this to before or after it doesn't actually matter that much
00:12:10.700 | so we're checking if it's the final answer if not we continue we execute tools and so on okay cool
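As a rough sketch of those pieces — the tool bodies, system prompt, and variable names here are illustrative rather than the exact course code, and `llm` is the chat model from earlier:

```python
# Sketch of the agent components: tools, prompt template, and the
# LCEL pipeline binding them to the LLM.
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool

@tool
def add(x: float, y: float) -> float:
    """Add x and y."""
    return x + y

@tool
def final_answer(answer: str) -> str:
    """Provide the final answer to the user."""
    return answer

tools = [add, final_answer]  # plus multiply, exponentiate, subtract

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

# LCEL supports streaming and async out of the box;
# tool_choice="any" forces the LLM to always call a tool
agent = prompt | llm.bind_tools(tools, tool_choice="any")
```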
00:12:18.140 | so then we can invoke that okay we go what is 10 plus 10
00:12:25.340 | there we go right so we have our agent executor it is working now when we are running our agent executor with
00:12:37.580 | every new query if we're putting this into an api we're probably going to need to provide it with a
00:12:44.940 | fresh callback handler okay so the callback handler is what's going to handle taking the
00:12:51.100 | tokens that are being generated by an LLM or agent and giving them to some other piece of code like for
00:12:57.740 | example the streaming response for an api and our callback handler is going to put those tokens in a queue in
00:13:07.740 | our case and then for example the streaming object is going to pick them up from the queue and put them
00:13:14.220 | wherever they need to be so to allow us to do that with every new query rather than us needing to initialize
00:13:23.020 | everything when we actually initialize our agent we can add a configurable field to our LLM okay
00:13:30.300 | so we set the configurable fields here oh also one thing is that we set streaming equal to true that's
00:13:36.620 | very minor thing but just so you see that there we do do that so we add some configurable fields to our
00:13:43.180 | LLM which means we can basically pass an object in for these on every new invocation so we set our
00:13:51.340 | configurable field it's going to be called callbacks and we just add a description right there's nothing
00:13:56.780 | more to it so this will now allow us to provide that field when we're invoking our agent
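A sketch of that setup, with `streaming=True` and the configurable `callbacks` field — the model name is an illustrative assumption:

```python
# Sketch: streaming=True makes the LLM emit tokens as they are generated,
# and the configurable `callbacks` field lets us inject a fresh callback
# handler on every invocation.
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o-mini", streaming=True).configurable_fields(
    callbacks=ConfigurableField(
        id="callbacks",
        name="callbacks",
        description="A list of callbacks to use for streaming",
    )
)
```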
00:14:07.820 | okay now we need to define our callback handler and as i mentioned what is basically going to be happening is this callback
00:14:14.380 | handler is going to be passing tokens into our asyncio.Queue object and then we're going to be picking
00:14:22.140 | them up from the queue elsewhere okay so we can call it a QueueCallbackHandler okay and that is
00:14:28.780 | inheriting from the AsyncCallbackHandler because we want all this to be done asynchronously because
00:14:33.980 | we're thinking here about okay how do we implement all this stuff within apis and actual real world code
00:14:39.820 | and we do want to be doing all this in async so let me execute that and i'll just explain a little
00:14:46.380 | bit of what we're looking at so we have the initialization right it's nothing specific
00:14:52.380 | here what we really want to be doing is setting our queue object assigning that to
00:14:58.540 | the class attributes and then there's also this final_answer_seen attribute which we're setting to false so
00:15:05.580 | what we're going to be using that for is our LLM will be streaming tokens to us whilst it's doing its
00:15:14.140 | tool calling and we might not want to display those immediately or we might want to display them in a
00:15:19.100 | different way so by setting this final_answer_seen to false whilst our LLM is outputting those tool
00:15:28.540 | tokens we can handle them in a different way and then as soon as we see that it's done with the tool
00:15:34.060 | calls and it's on to the final answer which is actually another tool call but once we see that it's
00:15:37.980 | on to the final answer tool call we can set this to true and then we can start processing our tokens in
00:15:44.460 | a you know different way essentially okay so we have that then we have this __aiter__ method this is required
00:15:52.860 | for any async generator object so what that is going to be doing is iterating through right it's
00:16:00.940 | a generator it's going to be iterating through and saying okay if our queue is empty right this is
00:16:06.940 | the queue that we set up here if it's empty wait a moment right we use the sleep method here and this
00:16:13.900 | is an async sleep method this is super important we're awaiting an asynchronous sleep
00:16:20.940 | all right so whilst we're waiting for that 0.1 seconds our code can be doing other
00:16:27.820 | things right that is important if we use the standard time.sleep that is not
00:16:35.100 | asynchronous and so it will actually block the thread for that 0.1 seconds so we don't want that to happen
00:16:41.900 | generally our queue should probably not be empty that frequently given how quickly tokens are going
00:16:47.900 | to be added to the queue so the only way that this would potentially be empty is maybe our LLM stops maybe
00:16:56.860 | there's like a connection interruption for a you know brief second or something and no tokens are added so in
00:17:03.020 | that case we don't actually do anything we don't keep checking the queue we just wait a moment okay
00:17:08.380 | and then we check again now if it was empty we wait and then we continue on to the next iteration
00:17:15.500 | otherwise it probably won't be empty we get whatever is inside our queue we pull it out
00:17:23.580 | then we say okay if that token is a done token we're going to return so we're going to stop this generator
00:17:33.020 | right we're finished otherwise if it's something else we're going to yield that token which means
00:17:39.180 | we're returning that token but then continuing through the loop again right so that is our generator
00:17:46.860 | logic then we have some other methods here these are LangChain specific
00:17:53.340 | okay we have on_llm_new_token and we have on_llm_end starting with on_llm_new_token this is basically
00:18:01.900 | when an LLM returns a token to us LangChain is going to run or execute this method okay this is the method
00:18:09.900 | that will be called what this is going to do is it's going to go into the keyword arguments and it's
00:18:15.420 | going to get the chunk object so this is coming from our LLM if there is something in that chunk
00:18:21.420 | it's going to check for a final answer tool call first okay so we get our tool calls and we say
00:18:29.260 | if the name within our chunk right probably this will be empty for most of the tokens
00:18:35.820 | right so you remember before when we're looking at the chunks here this is what we're looking at right
00:18:41.580 | the content for us is actually always going to be empty and instead we're actually going to get the
00:18:46.140 | additional_kwargs here and inside there we're going to have our tool calls as we
00:18:52.300 | saw in the previous videos right so that's what we're extracting
00:18:58.140 | that's why we're going into additional_kwargs right and getting the tool call information
00:19:05.100 | right or it will be None right so if it is None i don't think it ever would be None to be honest it
00:19:11.340 | would be strange if it's None i think that means something would be wrong okay so here we're using
00:19:16.460 | the walrus operator so the walrus operator what it's doing here is whilst we're checking the if logic here
00:19:24.860 | whilst we do that it's also assigning whatever is inside this it's assigning over to tool calls
00:19:31.340 | and then with the if we're checking whether tool_calls is something or None right because we're
00:19:38.380 | using get here so if this get operation fails and there are no tool calls this object here will be
00:19:45.660 | equal to None which gets assigned to tool_calls here and then this if None will evaluate false and this
00:19:53.660 | logic will not run okay and it will just continue if this is true so if there is something returned here
00:20:00.460 | we're going to check if that something returned is using the function name or tool name final answer if it
00:20:06.620 | is we're going to set our final_answer_seen equal to true otherwise we're just going to add our chunk into
00:20:13.100 | the queue okay we use put_nowait here because we're using async otherwise if you were not
00:20:18.860 | using async you would use
00:20:26.220 | put if it's just synchronous code but i don't think i've ever implemented this synchronously so it would
00:20:32.780 | actually just be put_nowait for async okay and then return so we have that then we have on_llm_end
00:20:41.900 | okay so this is when LangChain sees that the LLM has returned or indicated that it is finished with
00:20:50.620 | the response LangChain will call this so you have to be aware that this will happen multiple times
00:21:00.300 | during agent execution because if you think within our agent executor we're hitting the llm multiple
00:21:08.460 | times we have that first step where it's deciding oh i'm going to use the add tool or the multiply tool
00:21:14.700 | and then that response gets back to us we execute that tool and then we pass the output from that tool
00:21:21.340 | and the original user query and the chat history we pass that back to our LLM again
00:21:25.980 | all right so that's another call to our llm that's going to come back it's going to finish
00:21:29.820 | all right it's going to give us something else right so there's multiple llm calls happening
00:21:34.300 | throughout our agent execution logic so this on_llm_end will actually get called at the end of every
00:21:41.340 | single one of those LLM calls now if we get to the end of our LLM call and it was just a tool
00:21:50.060 | invocation so you know it called the add tool we don't want to put the done token into our queue
00:21:59.180 | because when the done token is added to our queue we're going to stop iterating okay
00:22:04.540 | instead if it was just a tool call we're going to say step end right and we'll actually get this token
00:22:12.140 | back so this is useful on for example the front end you could have okay i've used the add tool
00:22:20.220 | these are the parameters and it's the end of the step so you could have that tool call being
00:22:27.020 | shown on some front end and as soon as it sees step end it knows okay we're done with that here was
00:22:31.900 | the response right and it can just show you that and we're going to use that we'll see that soon but
00:22:38.300 | let's say we get to the final answer tool we're on the final answer tool and then we get this signal
00:22:43.340 | that the LLM has finished then we need to stop iterating otherwise our stream generator is
00:22:52.060 | just going to keep going forever right nothing's going to stop it or maybe it will time out i don't
00:22:56.540 | think it will though so at that point we need to send okay stop right we need to say we're done
00:23:04.300 | and then that will come back here to our async iterator and it will
00:23:11.660 | return and stop the generator okay so that's the core logic that we have inside the handler
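Assembled from the pieces just described, the whole handler might look roughly like this sketch — the `<<DONE>>` and `<<STEP_END>>` strings are illustrative stand-ins for the "done" and "step end" tokens:

```python
import asyncio
from langchain_core.callbacks import AsyncCallbackHandler

class QueueCallbackHandler(AsyncCallbackHandler):
    """Pushes streamed tokens onto an asyncio.Queue so a consumer
    elsewhere (e.g. an API streaming response) can pull them out."""

    def __init__(self, queue: asyncio.Queue):
        self.queue = queue
        self.final_answer_seen = False

    async def __aiter__(self):
        while True:
            if self.queue.empty():
                # async sleep: yields control instead of blocking the thread
                await asyncio.sleep(0.1)
                continue
            token_or_done = self.queue.get_nowait()
            if token_or_done == "<<DONE>>":
                return  # finished: stop the generator
            if token_or_done:
                yield token_or_done  # hand the token to the consumer

    async def on_llm_new_token(self, *args, **kwargs) -> None:
        chunk = kwargs.get("chunk")
        if chunk:
            # flip the flag once the final_answer tool call shows up
            if tool_calls := chunk.message.additional_kwargs.get("tool_calls"):
                if tool_calls[0]["function"]["name"] == "final_answer":
                    self.final_answer_seen = True
        self.queue.put_nowait(chunk)  # non-blocking put for async code

    async def on_llm_end(self, *args, **kwargs) -> None:
        # called after EVERY LLM call in the agent loop, so only send
        # the done token once the final answer has been seen
        if self.final_answer_seen:
            self.queue.put_nowait("<<DONE>>")
        else:
            self.queue.put_nowait("<<STEP_END>>")
```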
00:23:19.340 | i know there's a lot going on there but we need all of this so it's important to be aware of it okay so now let's see
00:23:27.500 | how we might actually call our agent with all this streaming in this way so we're going to initialize
00:23:37.100 | our queue i'm going to use that to initialize a streamer okay using the custom streamer that
00:23:42.860 | we just built custom callback handler whatever you want to call it okay then i'm going to define a
00:23:48.940 | function so this is an asynchronous function it has to be if we're using async and what it's going to do is
00:23:54.700 | it's going to call our agent with a config here and we're going to pass it that callback
00:24:02.860 | which is the streamer right note here i'm not calling the agent executor i'm just calling the agent
00:24:07.660 | right so if we come back up here we're calling this all right so that's not going to include all the
00:24:15.020 | tool execution logic and importantly we're calling the agent with the config that uses callbacks right
00:24:23.660 | so this configurable fields setup here from our LLM is actually being fed through it propagates through to
00:24:28.460 | our agent object as well to the RunnableSerializable all right so that's what we're executing here we see
00:24:35.500 | agent with config and we're passing in those callbacks which is just one actually okay so that sets up our
00:24:41.980 | agent and then we invoke it with a stream okay like we did before and we're just going to return everything
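A sketch of that call, assuming the input keys match the prompt template sketched earlier and reusing `agent` and `QueueCallbackHandler` from above:

```python
import asyncio

# Stream from the agent (not the executor), passing a fresh
# queue + handler in via the callbacks config.
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

async def stream(query: str):
    response = agent.with_config(callbacks=[streamer])
    outputs = []
    # each token is an AIMessageChunk carrying tool-call fragments
    async for token in response.astream({
        "input": query,
        "chat_history": [],
        "agent_scratchpad": [],
    }):
        outputs.append(token)
    return outputs

# in a notebook: tokens = await stream("What is 10 + 10?")
```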
00:24:48.860 | so let's run that okay and we see all the token or rather chunk objects are being returned and this is
00:24:57.100 | useful to understand what we're actually doing up here all right so when we're doing this chunk message
00:25:04.140 | additional_kwargs all right we can see that in here so this would be the chunk message
00:25:09.260 | object we get the additional_kwargs we go into tool_calls and we get the information here so we
00:25:15.820 | have the id for that tool call which we saw in the previous chapters then we have our function right so the
00:25:23.660 | function includes the name right so we know what tool we're calling from this first chunk but we don't
00:25:29.580 | know the arguments right those arguments are going to stream to us so we can see them begin to come
00:25:35.100 | through in the next chunk so next chunk is just it's just the first token for for the add function
00:25:42.380 | right and we can see these all come together over multiple steps and we actually get all of our arguments
00:25:50.060 | okay that's pretty cool so actually one thing i would like to show you here as well so if we just do token
00:25:58.380 | equals tokens sorry and we do
00:26:04.700 | tokens.append(token)
00:26:09.660 | okay we have all of our tokens in here now all right see that they're all AIMessageChunks
00:26:19.260 | so we can actually add those together all right so let's we'll go with these
00:26:24.780 | here and based on these we're going to get all the arguments okay so this is kind of interesting
00:26:30.060 | so it's one until i think like the second to last maybe
00:26:35.660 | right so we have these and actually we just want to add those together so i'm going to go with tokens
00:26:46.620 | one and i'm just going to go for token in we're going to go from the second onwards i'm going
00:27:00.380 | to do tk plus token right and let's see what tk looks like at the end here tk
00:27:11.020 | okay so now you see it's kind of merged all those arguments here sorry plus equals okay so run that
00:27:20.700 | and you can see here that it's merged those arguments it didn't get all of them so i kind of missed some
00:27:24.940 | at the end there but it's merging them right so you can see that that logic where it's you know before it
00:27:30.220 | was adding the content from various chunks it also does the same for the other parameters within your
00:27:37.580 | chunk object which i think is pretty cool you can see here the name wasn't included that's
00:27:43.020 | because we started on token one not on token zero where the name was so if we actually started from token
00:27:49.740 | zero and just pull them all in there all right so from zero onwards
00:27:55.660 | we're going to get a complete AIMessageChunk which includes the name here and all of those arguments
00:28:04.860 | and you'll see also here right it populates everything which is pretty cool
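That merging, sketched against the `tokens` list gathered above:

```python
# `+` also merges tool-call fragments, so summing from the very first
# chunk rebuilds the tool name plus its full JSON arguments.
tk = tokens[0]
for token in tokens[1:]:
    tk += token

# the merged chunk now carries the complete tool call
print(tk.additional_kwargs["tool_calls"])
```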
00:28:08.860 | okay so we have that now based on this we're going to want to modify our custom agent
00:28:16.700 | executor because we're streaming everything right so we want to add streaming inside our agent executor
00:28:24.220 | which we're doing here right so this is async def stream and we're doing async for token in
00:28:30.460 | the astream okay so this is like the very first instance if output is None we're just going to be
00:28:38.460 | adding our token so the chunk sorry to our output like the first token becomes our output
00:28:46.380 | otherwise we're just appending our tokens to the output okay if the token content is empty which it should be
00:28:55.980 | right because we're using tool calls all the time we're just going to print content okay i just added
00:29:00.940 | these so we see like print everything i just want to be able to see that i wouldn't expect
00:29:06.700 | this to run because we're saying it has to use tool calling okay so within our agent if we come up to here
00:29:14.940 | we said tool choice any so it's been forced to use tool calling so it should never really be returning
00:29:20.380 | anything inside the content field but just in case it's there right so we'll see if that is actually
00:29:26.140 | true then we're just getting out our tool calls information okay from our chunk and we're going
00:29:32.300 | to say okay if there's something in there we're going to print what is in there okay and then we're
00:29:36.060 | going to extract our tool name and if there is a tool name i'm going to show you the tool name
00:29:40.860 | then we're going to get the args and if the args are not empty we're going to see what we get in there okay
00:29:49.180 | and then from all of this we merge all of it into our AI message right because
00:29:54.780 | we're merging everything as we're going through merging everything into outputs as i showed you
00:29:58.700 | before okay cool and then we're just awaiting our stream that will like kick it off okay and then we
00:30:04.540 | do the standard agent executor stuff again here right so we're just pulling out tool name tool args
00:30:10.620 | tool call id and then we're using all that to execute our tool here and then we're creating a new tool
00:30:15.980 | message and passing that back in and then also here i move the break for the final answer into the final
00:30:23.260 | step so that is our custom agent executor with streaming
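In outline, the streaming part of such an executor might look like this sketch — tool execution, scratchpad bookkeeping, and the final-answer break are elided, and names are illustrative:

```python
# Sketch: the inner `stream` coroutine merges chunks into one
# tool-call message while optionally printing as it goes.
class CustomAgentExecutor:
    def __init__(self, max_iterations: int = 3):
        self.chat_history = []
        self.max_iterations = max_iterations

    async def invoke(self, query: str, streamer: QueueCallbackHandler,
                     verbose: bool = False):
        agent_scratchpad = []

        async def stream(q: str):
            response = agent.with_config(callbacks=[streamer])
            output = None
            async for token in response.astream({
                "input": q,
                "chat_history": self.chat_history,
                "agent_scratchpad": agent_scratchpad,
            }):
                # first chunk becomes the output; later chunks merge in
                output = token if output is None else output + token
                # content should stay empty since tool_choice="any"
                if verbose and token.content != "":
                    print(token.content, flush=True)
            return output

        count = 0
        while count < self.max_iterations:
            tool_call = await stream(query)  # awaiting kicks off the stream
            # ... execute the chosen tool, append a ToolMessage to the
            # scratchpad, and break once final_answer is called ...
            count += 1
```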
00:30:29.740 | let's see what it does okay so with verbose equals true we see all those print statements
00:30:36.460 | okay so you can kind of see it's a little bit messy but you can see we have tool calls that had some
00:30:44.060 | stuff inside it had add here and what we're printing out here is we're printing out the full ai message
00:30:50.060 | chunk with tool calls and then i'm just printing out okay what are we actually pulling out from from that
00:30:55.580 | so these are actually coming from the same thing okay and then same here all right so we're looking at the
00:31:00.780 | full message and then we're looking okay we're getting this argument out from it okay so we can see
00:31:06.620 | everything that is being pulled out you know chunk by chunk or token by token and that's it okay so we could
00:31:14.300 | just get everything like that however right so i'm printing everything so we can see that it's streaming what if i don't print
00:31:20.940 | okay so we're setting verbose or by default verbose is equal to false here so what happens if we invoke now
00:31:29.660 | let's see
00:31:35.340 | cool we got nothing so the reason we got nothing is we're not printing but if you're
00:31:50.540 | building an api for example you're pulling your tokens through you can't print them
00:31:57.420 | to a front end or print them as the output of your api printing goes to your terminal
00:32:07.740 | or your console window it doesn't go anywhere else instead what we want to do is we actually want to get
00:32:15.420 | those tokens out right but how do we do that all right so we printed them but another place
00:32:23.180 | that those tokens are is in our queue all right because we set them up to go to the queue
00:32:28.460 | so we can actually pull them out of our queue whilst our agent executor is running and then we can do whatever
00:32:39.100 | we want with them because our code is async so it can be doing multiple things at the same time
00:32:43.180 | so whilst our code is running the agent executor
00:32:46.540 | whilst that is happening our code can also be pulling tokens out from our queue and sending them
00:32:55.180 | to like an api for example right or whatever downstream logic you have so let's see what that
00:33:02.860 | looks like we start by just initializing our queue initializing our streamer with that queue then we
00:33:08.620 | create a task so this is basically saying okay i want to run this but don't run it right now i'm not ready yet
00:33:16.140 | the reason that i say i'm not ready yet is because i also want to define here my async loop which is
00:33:23.900 | going to be printing those tokens right but this is async right so we set this up this is like get ready
00:33:30.540 | to run this because it is async this is running right this is just running like it's there it's already
00:33:37.180 | running so we get this we continue and none of this is actually executed yet
00:33:42.780 | right only here when we await the task that we set up here only then does our agent executor
00:33:52.220 | run and our async object here begins getting tokens right and here again i'm printing but i don't need to
00:34:02.140 | print i could have like let's say this is within an api or something
00:34:07.580 | let's say i'm saying okay send token to xyz right that's sending our tokens somewhere or
00:34:20.540 | maybe we're yielding this to some sort of streamer object within our api right we can do whatever we
00:34:26.780 | want with those tokens okay i'm just printing them because i want to actually see them okay
00:34:32.620 | but just important here is that we're not printing them within our agent executor we're printing them
00:34:38.380 | outside the agent executor we've got them out and we can put them wherever we want which is perfect
00:34:44.300 | when you're building an actual sort of real world use case with an api or something else
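A sketch of that pattern, where printing stands in for whatever an API would do with each token; it reuses `QueueCallbackHandler` and the executor sketch from above:

```python
import asyncio

queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)
agent_executor = CustomAgentExecutor()

async def main():
    # schedule the agent run on the event loop; it starts once
    # control next yields (at our first await below)
    task = asyncio.create_task(
        agent_executor.invoke("What is 10 + 10?", streamer)
    )
    # meanwhile, consume tokens as the handler pushes them into the
    # queue; in an API you would yield these to a streaming response
    async for token in streamer:
        print(token, flush=True)
    await task

# in a notebook: await main()
```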
00:34:49.100 | okay so let's run that let's see what we get look at that we get all of the information we could need
00:34:55.900 | and a little bit more right because now we're using the agent executor and now we can also see
00:35:02.540 | oh we have this step end right so i know just from looking at this right this is my first
00:35:09.180 | tool use so what tool is it let's have a look it's the add tool and then we have these arguments so i can
00:35:16.380 | then pass them downstream then we have the next tool use which is down here so then we can
00:35:24.620 | parse them in the way that we like so that's pretty cool let's i mean let's see right so
00:35:33.500 | we're getting those things out can we do something with them before i print them
00:35:39.180 | and show them yes let's see okay so we're now modifying our loop here same stuff right we're
00:35:47.500 | still initializing our queue initializing our streamer initializing our task okay and we're still doing
00:35:53.180 | this async for token in streamer okay but then we're doing stuff with our tokens so i'm saying okay if we're on
00:36:01.500 | step end i'm not actually going to print step end i'm going to print a new line okay otherwise if
00:36:08.220 | we're getting a tool call here we're going to say if that tool call has the tool name i am going to print
00:36:16.300 | calling tool name okay if it's the arguments i'm going to print the tool argument and i'm going to end
00:36:23.820 | that with nothing so that we don't go onto a new line so we're actually going to be streaming everything
00:36:28.780 | okay so let's just see what this looks like
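The modified loop might look roughly like this, reusing `streamer`, `agent_executor`, and the illustrative `<<STEP_END>>` sentinel from the handler sketch above:

```python
# Sketch: format the stream instead of dumping raw chunks.
async def main():
    task = asyncio.create_task(
        agent_executor.invoke("What is 10 + 10?", streamer)
    )
    async for token in streamer:
        if token == "<<STEP_END>>":
            print("\n")  # close out the finished tool step
        elif tool_calls := token.message.additional_kwargs.get("tool_calls"):
            if tool_name := tool_calls[0]["function"]["name"]:
                print(f"Calling {tool_name}...", flush=True)
            if tool_args := tool_calls[0]["function"]["arguments"]:
                # stream the argument fragments without newlines
                print(tool_args, end="", flush=True)
    await task
```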
00:36:30.860 | oh my bad i just added that
00:36:39.820 | you see that so it goes very fast so it's kind of hard to see i'm going to slow
00:36:47.020 | it down so you can see that as soon as we get the tool name we stream that we're
00:36:53.660 | calling the add tool then we stream token by token the actual arguments for that tool
00:36:58.460 | then for the next one again we do the same we're calling this tool name then we're streaming token
00:37:04.460 | by token again we're processing everything downstream from outside of the agent executor and this is an
00:37:11.580 | essential thing to be able to do when we're actually implementing streaming and async and everything
00:37:18.780 | else in an actual application so i know that's a lot but it's important so that is it for our chapter
00:37:28.940 | on streaming and async i hope it's all been useful thanks