Streaming for LangChain Agents + FastAPI
Chapters
0:00 Streaming for LLMs and Agents
1:38 Simple StdOut Streaming in LangChain
5:03 Streaming with LangChain Agents
7:56 Final Output Streaming
10:02 Custom Callback Handlers in LangChain
14:16 FastAPI with LangChain Agent Streaming
22:56 Confirming we have Agent Streaming
24:32 Custom Callback Handlers for Async
26:26 Final Things to Consider
00:00:00.000 |
Today we're going to learn how to do streaming with LangChain. 00:00:04.000 |
Now streaming is a very popular feature for large language models and chatbots, 00:00:09.120 |
but it can be quite complicated to implement or at least confusing to get started with. 00:00:16.000 |
Now for those of you that are maybe not aware of streaming, 00:00:18.880 |
it's essentially when you are talking to an LLM or a chatbot 00:00:22.320 |
and it is loading the output token by token or word by word. 00:00:28.480 |
Now the whole point of using streaming in the first place is that, 00:00:32.080 |
particularly if you're generating a lot of text, 00:00:35.280 |
you can begin showing the user that text and they can begin reading sooner. 00:00:41.520 |
Now implementing streaming, it can be very simple 00:00:46.400 |
if you are implementing it for a simple use case, 00:00:49.600 |
but it begins to get more difficult first when you start using LangChain 00:00:54.240 |
and then secondly if we begin using agents it gets a little more complicated. 00:00:59.520 |
Then if we take it a little further and maybe we want to 00:01:02.560 |
stream the data from our agent through to our own API, 00:01:07.360 |
then again things get a little more complicated. 00:01:11.360 |
But in this video we're going to go through all of that, 00:01:13.360 |
so by the end of this we'll actually have an agent in LangChain streaming its output token by token through a FastAPI endpoint. 00:01:24.400 |
Now the simplest form of streaming that we can do is 00:01:27.680 |
basically just printing out to the terminal or as you'll see in a Jupyter notebook. 00:01:33.120 |
Achieving that level of streaming is very easy. 00:01:38.240 |
So we use these two parameters when we're initializing our LLM. 00:01:48.320 |
I'm initializing it as usual. These are all typical parameters that we'd use. 00:01:51.840 |
But I'm also adding in this streaming and this callbacks parameter. 00:01:56.000 |
Streaming, I think it's pretty obvious what it does. 00:01:59.040 |
It just switches on streaming and it's also worth noting that 00:02:03.200 |
this will only work with certain LLMs, not all LLMs will support streaming. 00:02:11.760 |
And then we also need to pass this callbacks parameter. 00:02:14.880 |
Okay and I think this is probably the more interesting part here. 00:02:18.880 |
So this here is what handles the streaming, all right. 00:02:23.760 |
You can see that it says streaming, right, okay, so it handles the streaming part. 00:02:30.720 |
So the standard output is basically like a print. 00:02:33.360 |
And we can even see what it's doing by heading over to its definition here. 00:02:40.320 |
So you can come to here and we see this on_llm_new_token method. 00:02:45.680 |
When this is executed it's going to print the new token, okay. 00:02:51.840 |
So that's all we're doing by adding this callback in here. 00:02:56.400 |
With every new generated token, it is going to be written to the standard output. 00:03:06.000 |
Also of course you have to put your API key here from OpenAI. 00:03:10.720 |
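(For reference, a minimal sketch of the setup being described; the exact model name and temperature are my assumptions, not taken from the video.)

```python
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

llm = ChatOpenAI(
    openai_api_key="YOUR_OPENAI_API_KEY",  # placeholder, use your own key
    model_name="gpt-3.5-turbo",            # assumed model; any streaming-capable chat model works
    temperature=0.0,
    streaming=True,                         # switch token-by-token streaming on
    callbacks=[StreamingStdOutCallbackHandler()],  # print each new token to stdout
)
```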
Now if we run this, okay, so like we would in LangChain normally. 00:03:16.000 |
So we create our human message and we just pass it to our LLM. 00:03:20.000 |
You will see straight away we have streaming, right. 00:03:27.520 |
So if we didn't enable streaming there it would have taken 2.2 seconds to show anything, right. 00:03:34.240 |
It would have basically come out with this at the bottom here. 00:03:37.520 |
But you saw that it kind of went through token by token printing them to our output, right. 00:03:48.240 |
Now this is really good especially when we have more, you know, long outputs. 00:03:53.840 |
So maybe we say, you know, tell me a long story. 00:03:59.520 |
And if we run it again it's going to take a little more time. 00:04:04.480 |
You can see, you know, we're kind of going through and obviously if we were printing 00:04:08.240 |
out the whole thing nicely we'd be able to kind of follow it and read as it's going. 00:04:14.400 |
But you can see that if we were not doing streaming here, you know, it keeps going on. 00:04:23.920 |
We would be waiting that whole time just to begin reading, right. 00:04:30.880 |
If we didn't use streaming we'd have to wait that whole time to begin reading. 00:04:34.720 |
But with streaming we can begin reading earlier. 00:04:38.480 |
Which obviously is a nice little feature to have. 00:04:46.160 |
Using an LLM and streaming to, you know, your terminal or the Jupyter notebook output is pretty straightforward. 00:04:56.800 |
It begins to get a bit more complicated as soon as you start adding in more logic. 00:05:01.760 |
So let's take a look at how we might do it for an agent in LangChain. 00:05:07.280 |
So we're just going to initialize an agent here. 00:05:15.840 |
It's going to be a conversational React agent. 00:05:21.760 |
So because we're using the same LLM we already have our callback, 00:05:26.720 |
our streaming standard output callback handler that is already included within our agent. 00:05:34.960 |
One thing that you should do here is make sure you set return intermediate steps 00:05:41.360 |
to false, because returning them will trigger issues when you're trying to use the callbacks. 00:05:46.880 |
And you can actually pull back those intermediate steps anyway by parsing what is being output to you. 00:05:52.960 |
And I also set verbose equal to true so that we can actually get all of the outputs. 00:05:59.440 |
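Roughly, the initialization being described looks like this; the exact agent type string, tool list, and memory setup here are assumptions for illustration rather than something pulled verbatim from the video.

```python
from langchain.agents import initialize_agent, load_tools
from langchain.memory import ConversationBufferWindowMemory

tools = load_tools(["llm-math"], llm=llm)  # the calculator / LLM math tool used later

memory = ConversationBufferWindowMemory(
    memory_key="chat_history",  # the conversational agent expects this key
    k=5,
    return_messages=True,
)

agent = initialize_agent(
    agent="chat-conversational-react-description",  # a conversational ReAct-style agent
    llm=llm,                           # same streaming LLM, so the stdout callback carries over
    tools=tools,
    memory=memory,
    verbose=True,                      # show the full agent trace
    return_intermediate_steps=False,   # avoid issues with the callbacks
)
```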
So after initializing that, I'm going to create this prompt. 00:06:03.120 |
It's just a string this time rather than the messages. 00:06:06.000 |
That's just because we're using an agent rather than the LLM directly. 00:06:16.560 |
And you see that it actually, you know, it streamed, and it output the entire agent response. 00:06:23.680 |
So whereas before we kind of just had the answer, with an agent we have more, because an 00:06:32.720 |
agent is like an added little bit of logic around the LLM. 00:06:37.120 |
And in order for it to correctly parse what the LLM wants to do, it asks it to return 00:06:44.320 |
the output from the LLM in this JSON format with an action and an action input. 00:06:50.160 |
Now, this is useful because, you know, sometimes maybe we want to use a tool using our agent, 00:06:58.640 |
or we want to go through multiple, almost like thinking steps, right? 00:07:04.640 |
We're going to use the calculator tool or the LLM math tool that we created. 00:07:09.200 |
So we're going to say, what is the square root of 71? 00:07:11.920 |
We run this and you see it's using the calculator action, right? 00:07:16.480 |
And from that, it basically uses this calculator tool and it puts in this value here. 00:07:25.760 |
And from that, it gets this answer, which it can then feed back to us in the final answer. 00:07:31.520 |
And when we ran that, it's going to stream the, you know, the calculator. 00:07:40.720 |
I'm actually not sure why it does this, but anyway, and then it streams the final answer. 00:07:50.240 |
We can extract the final answer and just do it in a cleaner way. 00:07:54.160 |
But how do we do that in a cleaner way, right? 00:07:57.760 |
Because right now it's just outputting everything to us. 00:08:03.280 |
We can either use a LangChain callback handler built specifically for outputting only the final answer. 00:08:17.680 |
And the other option is we can create a custom callback handler. 00:08:22.560 |
And I would say the custom callback handler is, well, it's more flexible, of course. 00:08:27.840 |
It just means it requires a little bit more extra work on our side. 00:08:34.480 |
So let's first have a look at the simple out-of-the-box LangChain callback handler. 00:08:41.120 |
OK, so because we're initializing a new callback handler, we need to reinitialize our LLM. 00:08:46.400 |
For now, I'm going to use the default tokens here. 00:08:55.520 |
And then we can go ahead and try and see what we get. 00:09:06.880 |
So it didn't really, it didn't stream anything. 00:09:14.640 |
And that's because by default, this is looking for a slightly different answer prefix. 00:09:23.280 |
So rather than looking for the final answer the way our agent writes it, it's looking for something like, I don't know, 00:09:28.080 |
final answer with a lowercase a or something along those lines. 00:09:32.880 |
So what we can do instead is obviously just override that with this answer prefix tokens parameter, so that it matches what the agent actually outputs. 00:09:42.960 |
So we should hopefully get a better result by doing this. 00:10:01.200 |
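Something along these lines; the exact prefix tokens that line up with the agent's output are an assumption here, and attaching the new LLM back onto the already-initialized agent this way is just one option.

```python
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout_final_only import (
    FinalStreamingStdOutCallbackHandler,
)

llm = ChatOpenAI(
    openai_api_key="YOUR_OPENAI_API_KEY",
    temperature=0.0,
    streaming=True,
    callbacks=[
        # override the default prefix so it matches the agent's "Final Answer" output
        FinalStreamingStdOutCallbackHandler(answer_prefix_tokens=["Final", "Answer"])
    ],
)
agent.agent.llm_chain.llm = llm  # swap the new LLM into the existing agent
```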
So honestly, the easier approach, in my opinion, is just to use a custom callback handler. 00:10:25.520 |
OK, we're inheriting the class from the streaming standard output callback handler that we saw earlier. 00:10:33.040 |
We initialize it and we set this self content variable equal to just an empty string. 00:10:39.120 |
Basically, we're going to be feeding everything into that. 00:10:41.680 |
And we're going to be using that almost like a check. 00:10:45.040 |
When we see that the final answer is within this self content, we're going to switch to 00:10:49.760 |
actually streaming rather than just doing nothing. 00:10:53.360 |
And the way that we make that switch is using this final answer value. 00:11:05.840 |
We saw this earlier in the LangChain definition for this class here. 00:11:14.960 |
Now, this is called with every new token output by the LLM. 00:11:20.080 |
So we take that token, we add it to our entire content, and then we say, OK, if the final 00:11:28.560 |
answer is in self content, that means we're now in the final answer section. 00:11:35.680 |
But then we also reinitialize our content, right? 00:11:41.040 |
And maybe I can just show you why we do that. 00:11:45.840 |
We say, OK, if the final answer is true, that means this will activate. 00:11:50.640 |
And this is where we actually get where we actually output our token. 00:11:54.640 |
So again, like I said, this is just equal to printing the token. 00:11:57.600 |
So once we get that final answer, we still need to wait for action input. 00:12:03.600 |
And then once we see that, we begin printing. 00:12:14.080 |
OK, and if you saw just that, it began printing action input as soon as it saw final answer. 00:12:23.440 |
Now, the reason it does that is because as soon as we see final answer, that gets activated. 00:12:30.160 |
So it's actually looking for this one and saying, OK, now we can start printing stuff. 00:12:36.560 |
So that's why we add or that's why we reinitialize the content here. 00:12:43.040 |
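Putting that together, the handler being described looks roughly like this; it's a sketch, and the exact string checks are based on the agent's JSON output format discussed above.

```python
import sys

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler


class CallbackHandler(StreamingStdOutCallbackHandler):
    def __init__(self):
        self.content = ""          # everything the LLM has generated so far
        self.final_answer = False  # flips to True once we reach the final answer

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.content += token
        if self.final_answer:
            # wait for the action_input field before printing anything
            if '"action_input": "' in self.content:
                sys.stdout.write(token)
                sys.stdout.flush()
        elif "Final Answer" in self.content:
            self.final_answer = True
            self.content = ""  # reset so earlier action_input text doesn't trigger printing
```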
So try it with that and see if it works any better. 00:12:46.400 |
OK, so you see that it began the streaming on the actual answer output, the action input. 00:13:00.080 |
Now, there is one thing that I haven't been able to figure out. 00:13:02.720 |
And, you know, if anyone out there knows how to deal with this, please let me know. 00:13:08.640 |
But this little bit here is also streamed, which doesn't make much sense to me. 00:13:14.400 |
I was looking at this, trying to figure out where it is being streamed from. 00:13:22.000 |
But anyway, I thought, OK, I'm not really too bothered about that. 00:13:28.960 |
I can add some sort of filter or whatever around the backticks. 00:13:31.840 |
But rather than dealing with that, what is more important to me 00:13:35.920 |
is getting all this working with an API, which, again, 00:13:39.280 |
adds a little more complexity to the whole thing. 00:13:42.800 |
The issue with getting this working with an API is primarily that 00:13:48.080 |
in order to pass these tokens through an API, like if you're using FastAPI, for example, 00:13:54.080 |
you need to be running a loop that is looking for your tokens and passing them through to the response. 00:14:03.360 |
The only issue is that we also, at the same time, need to be running our agent logic. 00:14:08.080 |
So we need to be running these two separate bits of code at the same time. 00:14:11.680 |
Essentially, we're going to have to rely on a lot of async functions. 00:14:16.320 |
So the first thing we need to do is set up our API. 00:14:24.640 |
It includes everything in this notebook, right? 00:14:27.280 |
So in here, we're actually just initializing our agent. 00:14:33.600 |
We have this, like, the input format that we query with or that we send to our API. 00:14:46.800 |
Okay, so that's where we're going to be sending our queries to there. 00:14:50.000 |
This is just some FastAPI stuff to get everything running. 00:14:53.520 |
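That part of the file looks something like this; the endpoint path, the use of a GET request with a JSON body, and returning the raw agent output all mirror the walkthrough, but treat this as a sketch rather than the exact file.

```python
# run with e.g.: uvicorn main:app --reload   (assuming this file is saved as main.py)
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Query(BaseModel):
    text: str  # the input format we send to the API


@app.get("/health")
async def health():
    """Quick check that the API is up."""
    return {"status": "healthy"}


@app.get("/chat")
async def chat(query: Query):
    # plain, non-streaming agent call for now
    return agent(query.text)
```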
So before we run anything, we need to actually start the API. 00:15:13.840 |
Okay, if you're going to run this with what will be the main Python file I'm going to show you, 00:15:25.760 |
Okay, so we can check that it's running by calling this. 00:15:30.000 |
Okay, we should get this status here that, okay, everything's good. 00:15:33.600 |
Now, this here is actually how we would do streaming, but we haven't implemented streaming yet. 00:15:39.360 |
So let's just send a request, like we usually would, right? 00:15:43.760 |
So what I will first do actually will just be to get rather than post. 00:15:51.440 |
And I'm going to do request.get and send it to that endpoint. 00:15:58.160 |
So localhost 2000, and it is a chat endpoint. 00:16:05.200 |
And now we have something that we actually want to send, which is a JSON. 00:16:11.520 |
And in there, we have a text parameter, and we'll just put our query. 00:16:37.840 |
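So the request looks roughly like this (the port and path are assumed from the walkthrough above):

```python
import requests

res = requests.get(
    "http://localhost:2000/chat",          # wherever the API is running
    json={"text": "tell me a long story"}  # the text parameter with our query
)
print(res.json())  # the full agent response comes back in one go
```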
That's great, but that obviously, that's not streaming, we want to implement streaming. 00:16:46.560 |
So the first thing we're going to want to do is, 00:16:49.200 |
we're going to need to add the callback to our agent, but we've already initialized it. 00:16:53.600 |
So we're just going to replace the parameters in there, or the callback parameter in there. 00:17:06.320 |
And we'll have a query, but we'll also have a streaming iterator in here as well. 00:17:13.840 |
Now the streaming iterator will be of the type, initially, it will be of this type. 00:17:20.560 |
Okay, so the async iterator callback handler. 00:17:27.360 |
And the first thing we want to do is assign that stream iterator to our callbacks like this. 00:17:33.600 |
Okay, and then from there, what we want to do is await our agent call. 00:17:39.040 |
But because we're doing everything in async, we actually need to use acall, like this. 00:17:45.280 |
And in that, we're going to pass our inputs, which is going to be our input query, 00:17:55.600 |
Okay, so that will get us, you know, we're going to get our response. 00:18:00.800 |
And let me show you what you can do with that. 00:18:09.200 |
And what we'll do here is we will actually just call this run call here. 00:18:17.760 |
So we're going to do response, await, run call. 00:18:32.720 |
So we can create our streaming iterator here. 00:18:44.000 |
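A sketch of what that run call looks like; attaching the callback to the agent's LLM via agent.agent.llm_chain.llm is one way of doing it and is an assumption on my part.

```python
from langchain.callbacks import AsyncIteratorCallbackHandler


async def run_call(query: str, stream_it: AsyncIteratorCallbackHandler):
    # assign the streaming iterator callback to the agent's LLM
    agent.agent.llm_chain.llm.callbacks = [stream_it]
    # run the agent asynchronously so tokens can be consumed while it works
    await agent.acall(inputs={"input": query})
```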
So we can, we'll save that, that's going to reload. 00:19:01.200 |
So, so far, it just seemed to, you know, give us a response straight away. 00:19:05.760 |
But at the same time, we're not doing a streaming request here. 00:19:30.640 |
All right, so if we look over here, we can see it's entering the new executor chain. 00:19:33.920 |
It has generated us a story, though not without errors. 00:19:39.680 |
And okay, we have something there, but we just got an internal server error. 00:19:44.960 |
So clearly that whatever we just did doesn't work. 00:19:48.160 |
Let's change that to "hi there", so it's a little quicker. 00:19:51.360 |
Now, if we try and make another get request, we'll also see that error again. 00:19:59.840 |
So basically we broke it and we just need to, we're going to shut it down and restart. 00:20:14.160 |
So we're clearly missing a few things from our file here. 00:20:19.680 |
Now, in order to get streaming working, we can't just return a response like this. 00:20:24.720 |
We actually need to return what is called a streaming response here, right? 00:20:29.440 |
We have this value here I'll talk about in a moment. 00:20:35.760 |
So I need to go up to the top, I'm going to uncomment that. 00:20:38.880 |
And now we have our streaming response, but our streaming response expects a generator, right? 00:20:46.720 |
So, okay, maybe we can just put response in there. 00:20:59.840 |
If we look at our terminal here, we see that we have this dict object is not an iterator. 00:21:04.960 |
That is because our streaming response here expects a generator object, not the dict we're giving it. 00:21:13.200 |
We're just kind of returning everything at the moment. 00:21:17.200 |
And the way that we fix that is actually by not giving it the response, 00:21:23.040 |
but by actually giving it our iterator here, because this is an iterator, right? 00:21:28.160 |
But we need more of like a standard generator. 00:21:37.760 |
It's going to be taking, for now, let's take this. 00:21:52.080 |
And we are using this create task, so we're sending our run call to execute in the background, 00:22:00.240 |
concurrently, while our other tasks are also running, right? 00:22:04.080 |
Because we also need to be running this here. 00:22:06.640 |
So by using this create task, we allow this to be run in the background while the rest of our code keeps running. 00:22:14.800 |
Now, this here, the reason that it needs to be running is because this is how 00:22:20.720 |
we get tokens from here and send them into our stream response here. 00:22:30.880 |
So to actually set that up, we need to get our generator. 00:22:35.920 |
We can actually drop the response object here. 00:22:43.040 |
And obviously now we don't actually need to have this run call here. 00:23:02.480 |
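Put together, the generator and the updated endpoint could look like this; the media type and the GET endpoint are assumptions carried over from earlier, and this replaces the plain non-streaming version of the /chat route.

```python
import asyncio

from fastapi.responses import StreamingResponse


async def create_gen(query: str, stream_it: AsyncIteratorCallbackHandler):
    # kick the agent off in the background so it runs concurrently with this generator
    task = asyncio.create_task(run_call(query, stream_it))
    # yield tokens out of the callback handler as the agent produces them
    async for token in stream_it.aiter():
        yield token
    await task  # make sure the agent call has fully finished


@app.get("/chat")
async def chat(query: Query):
    stream_it = AsyncIteratorCallbackHandler()
    gen = create_gen(query.text, stream_it)
    return StreamingResponse(gen, media_type="text/event-stream")
```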
And we'll look at the terminal as well and see at what point is this actually running? 00:23:07.360 |
Because sometimes it can kind of trick you and you might think that you're running this 00:23:13.200 |
or you're streaming this, but actually it's loaded everything. 00:23:16.640 |
And then it begins streaming, which is not ideal. 00:23:19.520 |
So what we're looking for here is that this will begin streaming. 00:23:30.080 |
This should begin streaming whilst it's still only on entering new executor chain. 00:23:37.440 |
If it comes up with this before we start streaming, it means that we're not actually streaming. 00:23:51.920 |
So that means it's actually giving us these tokens as they are being generated by the model. 00:23:58.160 |
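For reference, the client side of that check could look something like this (again a sketch, assuming the same port and endpoint as earlier):

```python
import requests


def get_stream(query: str):
    # stream=True means requests hands us chunks as they arrive, rather than the full body
    with requests.get(
        "http://localhost:2000/chat",
        json={"text": query},
        stream=True,
    ) as response:
        for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
            print(chunk, end="", flush=True)


get_stream("hi there")
```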
But then we get to the end here and we still have this error. 00:24:01.600 |
So we need to stop our app on the other side and we will just run that again. 00:24:10.160 |
So tell me a long story, or let's go "hi there" again. 00:24:19.920 |
So we can see here that it should do this all the time. 00:24:23.840 |
But it's returning like the full response, like the agent formatted response. 00:24:29.120 |
We kind of want it to just return this bit here. 00:24:32.400 |
So to do that, we need to go into sort of custom callback handlers again. 00:24:45.440 |
So I'm just going to copy the custom callback code in and I'll just go through it with you. 00:24:51.520 |
So we need LLMResult, and we also need Any here as well. 00:24:56.000 |
So I'm taking the async iterator callback handler. 00:25:00.240 |
And that has these two methods that we need to override. 00:25:04.560 |
So in on_llm_new_token, basically, because we're using agents, 00:25:10.400 |
we're checking for the final answer and also the action input. 00:25:13.680 |
But slightly differently, in on_llm_end, I want to check, okay, have we actually reached the final answer yet? 00:25:22.000 |
Because if we're using multiple tools, it will basically say it's done as soon as it finishes the first LLM call. 00:25:29.680 |
So by adding that sort of if statement in there, we are stopping streaming, 00:25:36.400 |
but only once the final answer has been given. 00:25:42.480 |
So this tells the callback handler when the callback is or when streaming is complete. 00:25:48.160 |
And once that is complete, we come down to here. 00:25:57.040 |
Now we need to replace this with our async callback handler. 00:26:04.560 |
And we should also change the types here as well. 00:26:21.200 |
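The async handler being described looks roughly like this; the string checks mirror the synchronous handler from earlier, and the whole thing is a sketch rather than the exact code from the video.

```python
from typing import Any

from langchain.callbacks import AsyncIteratorCallbackHandler
from langchain.schema import LLMResult


class AsyncCallbackHandler(AsyncIteratorCallbackHandler):
    content: str = ""
    final_answer: bool = False

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        self.content += token
        if self.final_answer:
            # only queue tokens once we're inside the final answer's action_input
            if '"action_input": "' in self.content:
                self.queue.put_nowait(token)
        elif "Final Answer" in self.content:
            self.final_answer = True
            self.content = ""

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        # only signal "done" after the final answer, not after intermediate tool calls
        if self.final_answer:
            self.content = ""
            self.final_answer = False
            self.done.set()
        else:
            self.content = ""
```

Then in the endpoint, stream_it = AsyncCallbackHandler() replaces the plain AsyncIteratorCallbackHandler, with the type hints on run_call and create_gen updated to match.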
And now you can see we just get the answer itself, not the full agent output there. 00:26:26.000 |
So with that, we have our pretty rough API that can handle streaming. 00:26:32.880 |
Obviously, there's a few other things we should really add in most cases. 00:26:37.600 |
We should obviously test it a lot more and just see if there are any weird things that 00:26:41.760 |
happen, particularly with the way that we have the callback handler set up right now, where, 00:26:46.480 |
you know, we're looking specifically for the final answer in the content. 00:26:50.160 |
What if the agent just decides that it's not going to generate the final answer or, 00:26:56.240 |
you know, doesn't follow the agent format that it should? We need to have some logic in 00:27:01.200 |
there just to handle those cases where it might do that. 00:27:05.840 |
But for the most part, this is like the core of what you need in order to actually get a LangChain agent streaming through an API. 00:27:16.800 |
So I hope this has been useful and interesting. 00:27:21.680 |
So thank you very much for watching and I will see you again in the next one.