Streaming for LangChain Agents + FastAPI


Chapters

0:00 Streaming for LLMs and Agents
1:38 Simple StdOut Streaming in LangChain
5:03 Streaming with LangChain Agents
7:56 Final Output Streaming
10:02 Custom Callback Handlers in LangChain
14:16 FastAPI with LangChain Agent Streaming
22:56 Confirming we have Agent Streaming
24:32 Custom Callback Handlers for Async
26:26 Final Things to Consider

Transcript

Today we're going to learn how to do streaming with LangChain. Now streaming is a very popular feature for large language models and chatbots, but it can be quite complicated to implement or at least confusing to get started with. Now for those of you that are maybe not aware of streaming, it's essentially when you are talking to an LLM or a chatbot and it is loading the output token by token or word by word.

Now the whole point of using streaming in the first place is that, particularly if you're generating a lot of text, you can begin showing the user that text and they can begin reading sooner. Now implementing streaming, it can be very simple if you are implementing it for a simple use case, but it begins to get more difficult first when you start using LangChain and then secondly if we begin using agents it gets a little more complicated.

Then if we take it a little further and maybe we want to stream the data from our agent through to our own API, then again things get a little more complicated. But in this video we're going to go through all of that, so by the end of this we'll actually have an agent in LangChain that is streaming via a FastAPI instance.

So let's just jump straight into it. Now the simplest form of streaming that we can do is basically just printing out to the terminal or as you'll see in a Jupyter notebook. To achieve that level of streaming it's very easy. Let's take a look at how we would do it.

So we use these two parameters when we're initializing our LLM. Here it's just a ChatOpenAI object. I'm initializing it as usual. These are all typical parameters that we'd use. But I'm also adding in this streaming and this callbacks parameter. I think it's pretty obvious what streaming does.

It just switches on streaming. It's also worth noting that this will only work with certain LLMs; not all LLMs support streaming. But for OpenAI, of course, it is supported. And then we also need to pass this callbacks parameter. Okay, and I think this is probably the more interesting part here.

So this here is what handles the streaming. You can see that it says StreamingStdOutCallbackHandler, so it's the streaming standard output callback handler. The standard output is basically like a print. And we can even see what it's doing by heading over to its definition here.

So you can come to here and we see this method, on_llm_new_token. When this is executed it's going to print the new token, okay. So that's all we're doing by adding this callback in here. Every newly generated token is going to be sent to the standard output.
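As a rough illustration of what a handler like this does, here is a minimal sketch that uses only the Python standard library. StdOutTokenPrinter and fake_llm are hypothetical stand-ins, not LangChain classes; only the method name on_llm_new_token comes from the video.

```python
import sys


class StdOutTokenPrinter:
    """Stand-in for a streaming stdout callback handler (not the LangChain class)."""

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        # Write each token as soon as it arrives, with no trailing newline.
        sys.stdout.write(token)
        sys.stdout.flush()


def fake_llm(tokens, handler) -> str:
    """Hypothetical helper: pretend to be an LLM that emits tokens one at a time."""
    for token in tokens:
        handler.on_llm_new_token(token)
    # The full text is only available once every token has been produced.
    return "".join(tokens)


if __name__ == "__main__":
    fake_llm(["Hello", " there", "!"], StdOutTokenPrinter())
```

The point of the sketch is the ordering: the handler sees each token before the full text is returned, which is what lets the reader start early.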

Now that's great. I have initialized that. Also, of course, you have to put your OpenAI API key here if you haven't already. Now we run this like we would in LangChain normally, so we create our human message and we just pass it to our LLM.

You will see straight away we have streaming, right. So it took 2.2 seconds to complete. So if we didn't enable streaming there it would have taken 2.2 seconds to show anything, right. It would have basically come out with this at the bottom here. But you saw that it kind of went through token by token printing them to our output, right.

So we can see it again like that. Now this is really good, especially when we have longer outputs. So maybe we say, you know, tell me a long story. And if we run it again it's going to take a little more time. You can see we're kind of going through, and obviously if we were printing out the whole thing nicely we'd be able to follow it and read as it's going.

All right. But you can see that if we were not doing streaming here, you know, it keeps going on. It's 19 seconds now, 20 seconds, 21 seconds. We would be waiting that whole time just to begin reading, right. So that was almost 26 seconds in total. If we didn't use streaming we'd have to wait that whole time to begin reading.

But with streaming we can begin reading earlier, which obviously is a nice little feature to have. Now that is using an LLM. Using an LLM and streaming to your terminal or the Jupyter notebook output is the easiest thing you can do. It begins to get a bit more complicated as soon as you start adding in more logic.

So let's take a look at how we might do it for an agent in LangChain. Okay. So we're just going to initialize an agent here. We have our memory. We are going to load in one tool. We're going to initialize our agent. It's going to be a conversational ReAct agent.

Okay. And we're using the same LLM. So because we're using the same LLM, our streaming standard output callback handler is already included within our agent. So we can initialize that. One thing that you should do here is make sure you set return_intermediate_steps to false, because setting it to true will trigger issues when you're using callbacks.

And you can actually pull back those intermediate steps by parsing what is being output to you anyway. I also set verbose equal to true so that we can actually get all of the outputs. So after initializing that, I'm going to create this prompt.

It's just a string this time rather than the messages. That's just because we're using an agent rather than the LLM directly. And yeah, let's try. Okay. And you see that it streamed, and it output the entire thing: we have the action and the action input.

So whereas before we just had this bit, with an agent we have more, because an agent is basically an added bit of logic around the LLM. In order to correctly parse what the LLM wants to do, it asks the LLM to return its output in this JSON format with an action and an action input.

Now, this is useful because, you know, sometimes maybe we want to use a tool using our agent, or we want to go through multiple, almost like thinking steps, right? So here's an example of that. We're going to use the calculator tool or the LLM math tool that we created.

So we're going to say, what is the square root of 71? We run this and you see it's using the calculator action, right? And from that, it basically uses this calculator tool and it puts in this value here. So the square root of 71. And from that, it gets this answer, which it can then feed back to us in the final answer.
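To make the action and action input format concrete, here is a small sketch of how such a reply could be parsed. The key names "action" and "action_input", the tool name "Calculator", and the helper parse_agent_output are assumptions for illustration; the exact output depends on the agent and prompt in use.

```python
import json

# A Markdown code fence, built this way so the example itself stays readable.
FENCE = "`" * 3


def parse_agent_output(raw: str):
    """Return (action, action_input) from an agent-style JSON reply."""
    text = raw.strip()
    if text.startswith(FENCE):
        # Drop a surrounding fence (and an optional "json" tag) if the model added one.
        text = text.strip("`").strip()
        if text.startswith("json"):
            text = text[len("json"):]
    data = json.loads(text)
    # Key names are assumed; adjust them to whatever the agent actually emits.
    return data["action"], data["action_input"]


if __name__ == "__main__":
    reply = '{"action": "Calculator", "action_input": "sqrt(71)"}'
    print(parse_agent_output(reply))
```

An action naming a tool means the agent wants that tool run with the given input; an action of "Final Answer" means the action input is the text meant for the user.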

Right? And when we ran that, it streamed the calculator call. It also streams this little bit here. I'm actually not sure why it does this, but anyway, and then it streams the final answer. OK, so we can parse that. We can extract the tools. We can extract the final answer and just do it in a cleaner way.

But how do we do that in a cleaner way, right? Because right now it's just outputting everything to us. Well, we have two options. One is a LangChain callback handler built specifically for outputting the final answer to us from an agent. That is literally what it's for.

And the other option is we can create a custom callback handler. I would say the custom callback handler is more flexible, of course. It just requires a little extra work on our side. But both are pretty straightforward. So let's first have a look at the simple out-of-the-box LangChain callback handler.

OK, so because we're initializing a new callback handler, we need to reinitialize our LLM. For now, I'm going to use the default tokens here. So I will initialize it like that. And we also reinitialize the agent. And then we can go ahead and try and see what we get.

OK. Right. So it didn't really stream anything. It just streamed this little bit here. And that's because by default, it's looking for something different. So rather than looking for final answer the way our agent writes it, it's looking for something like, I don't know, final answer with a lowercase a or something along those lines.

I don't quite remember. So what we can do instead is set these answer prefix tokens, answer_prefix_tokens, which says: once you see these tokens, begin streaming. Right. So we should hopefully get a better result by doing this. Let's try. OK. So it started streaming. It streamed. But then again, it's still kind of messy.

So honestly, the easier approach, in my opinion, is just to use a custom callback handler. So to do that, we come down to here. OK. And I just create this here. So this is the custom callback handler. Let me go through it step by step. So first, let's just remove that first.

OK, we're inheriting from the streaming standard output callback handler class that we saw before. We initialize it and we set this self.content variable equal to just an empty string. Basically, we're going to be feeding everything into that, and we're going to be using that almost like a check.

When we see that the final answer is within this self.content, we're going to switch to actually streaming rather than just doing nothing. And the way that we make that switch is using this final answer value. OK, so let's add the rest of that in there. So this is on_llm_new_token.

We saw this earlier in the LangChain definition for this class here. So we're creating our own version of this. Now, this is called with every new token output by the LLM. So we take that token, we add it to our entire content, and then we say, OK, if the final answer is in self.content, that means we're now in the final answer section.

So we set final answer equal to true. But then we also reinitialize our content, right? And maybe I can just show you why we do that. So I'll comment that out. We say, OK, if final answer is true, that means this will activate. And this is where we actually output our token.

So again, like I said, this is just equal to printing the token. So once we get that final answer, we still need to wait for the action input. And then once we see that, we begin printing. So let's run all of that and let's try. OK, and if you saw just then, it began printing action input as soon as it saw final answer.

In fact, it even gets answer here. Now, the reason it does that is because as soon as we see final answer, that gets activated. But action input is already there. So it's actually looking for this one and saying, OK, now we can start printing stuff. It doesn't wait for the second action input.

So that's why we reinitialize the content here. So let's try it with that and see if it works any better. OK, so you see that it began streaming on the actual answer, the action input, sorry. Now, there is one thing that I haven't been able to figure out.

And, you know, if anyone out there knows how to deal with this, please let me know. But this little bit here is also streamed, which doesn't make much sense to me. I was looking at this, trying to figure out where it is being streamed from. But honestly, I have no idea.

But anyway, I thought, OK, I'm not really too bothered about that. It's streaming this out here. I can add some sort of filter or whatever around the backticks. But rather than dealing with that, what is more important to me is getting all this working with an API, which, again, adds a little more complexity to the whole thing.
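The custom handler logic walked through above (accumulate content, flip a flag on final answer, reset the content, then wait for the action input) can be sketched without LangChain. FinalAnswerFilter, run, the MARKER string and the token list are all hypothetical and chosen for illustration; this is one plausible reading of the approach, not the code from the video. The reset_on_final switch is there only to show one way the early printing can happen, here when an earlier tool call has already put an action input into the accumulated content.

```python
import sys


class FinalAnswerFilter:
    """Stand-in for a custom streaming handler (not a LangChain class)."""

    MARKER = '"action_input": "'  # assumed spelling of the key in the agent's JSON

    def __init__(self, reset_on_final: bool = True):
        self.content = ""          # everything seen so far
        self.final_answer = False  # True once "Final Answer" has appeared
        self.streaming = False     # True once the answer text has started
        self.reset_on_final = reset_on_final
        self.emitted = []          # tokens that were passed through

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        if self.streaming:
            self.emitted.append(token)
            sys.stdout.write(token)
            sys.stdout.flush()
            return
        self.content += token
        if not self.final_answer:
            if "Final Answer" in self.content:
                self.final_answer = True
                if self.reset_on_final:
                    # Forget any action input seen before the final answer.
                    self.content = ""
        elif self.MARKER in self.content:
            # The next token is the first piece of the answer text.
            self.streaming = True


def run(tokens, reset_on_final: bool = True) -> str:
    """Feed tokens through a fresh filter and return what it let through."""
    handler = FinalAnswerFilter(reset_on_final)
    for token in tokens:
        handler.on_llm_new_token(token)
    return "".join(handler.emitted)


# Hypothetical token stream: one tool call, then the final answer.
TOKENS = [
    '{"action": "', "Calculator", '", "action_input": "', "sqrt(71)", '"}',
    '{"action": "', "Final", " Answer", '", ', '"action_input": "',
    "About", " 8.43", '"}',
]
```

With the reset, run(TOKENS) lets through only the answer text plus the closing quote and brace, which this sketch does not filter out. With reset_on_final=False, the action input key itself leaks into the output, because the marker is already present from the earlier tool call.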

The issue with getting this working with an API is primarily that in order to pass these tokens through an API, if you're using FastAPI, for example, you need to be running a loop that is looking for your tokens and passing them through what is called a streaming response object. The only issue is that we also, at the same time, need to be running our agent logic.

So we need to be running these two separate bits of code at the same time. Essentially, we're going to have to rely on a lot of async functions. So the first thing we need to do is set up our API. So I've started this very simple version. It doesn't include streaming.

It's just a FastAPI app. It includes everything from this notebook, right? So in here, we're actually just initializing our agent. And then we go on. We have the input format that we query with, or that we send to our API. We have a health check here. We're going to test that in a moment.

And then we also have this chat endpoint. Okay, so that's where we're going to be sending our queries. This is just some FastAPI stuff to get everything running. So before we run anything, we need to actually start the API. So we switch over to our terminal. I'm going to check where we are.

So we have this demo.py file here. That is the API I'm running. So I'm going to run uvicorn demo:app --reload. Okay, if you're going to run this with what will be the main Python file I'm going to show you, you would just replace demo with main there. Okay, so we can check that it's running by calling this.

Okay, we should get this status here that, okay, everything's good. Now, this here is actually how we would do streaming, but we haven't implemented streaming yet. So let's just send a request, like we usually would, right? So what I will first do actually will just be to get rather than post.

And I'm going to do requests.get and send it to that endpoint. So localhost:8000, and it is the chat endpoint. And now we have something that we actually want to send, which is a JSON. And in there, we have a text parameter, and we'll just put our query. So let's put hello there.

Okay, let's run that. See what we get. Okay, 200, that's good. And let's see what that contains. Okay, so we got the response here. That's great, but that obviously, that's not streaming, we want to implement streaming. So let's go through how we would do that. So the first thing we're going to want to do is, we're going to need to add the callback to our agent, but we've already initialized it.

So we're just going to replace the parameters in there, or the callback parameter in there. So we'll do async def run_call. And we'll have a query, but we'll also have a stream iterator in here as well. Now the stream iterator will initially be of this type.

Okay, so the AsyncIteratorCallbackHandler. So let's put that there. And the first thing we want to do is assign that stream iterator to our callbacks like this. Okay, and then from there, what we want to do is await our agent call. But because we're doing everything in async, we actually need to call acall like this.

And in that, we're going to pass our inputs, which is going to be our input, which maps to our query like this. Okay, so that will get us our response. And let me show you what you can do with that. So we'll get a response.

Let's say we return that response. And what we'll do here is we will actually just call this run_call here. So we're going to do response equals await run_call. Okay, not quite like that. So we need this in here. Okay, or we can even separate that. So we can create our stream iterator here.

So now we just pass it into here. Okay, cool. So we can, we'll save that, that's going to reload. And then we can just see if it's working. Let's try. So, okay, health first, all is good. Let's try this. Okay, right. So, so far, it just seemed to, you know, give us a response straight away.

But at the same time, we're not doing a streaming request here. So actually, we should try this. So let's try it, get stream. Okay, method not allowed. That's because I used get; it should be post, sorry. So let's change that. Okay, it's taking a long time. And that's because this isn't a stream.

It's kind of just stuck now. All right, so if we look over here, we can see it's entering the new AgentExecutor chain. It has generated us a story, though not without errors. And okay, we have something there, but we just got an internal server error. So clearly whatever we just did doesn't work.

Let's change that to hi there, so it's a little quicker. Now, if we try and make another get request, we'll also see that error again. So basically we broke it, and we're going to shut it down and restart. Okay, now let's try again. Okay, now it looks good.

So we're clearly missing a few things from our file here. So let's work through those. Now, in order to get streaming working, we can't just return a response like this. We actually need to return what is called a streaming response here, right? We have this value here I'll talk about in a moment.

So we're returning this event stream, right? So I need to go up to the top, I'm going to uncomment that. And now we have our streaming response, but our streaming response expects a generator, right? So, okay, maybe we can just put response in there. Let's see what happens. Okay, let's come to here and try again.

Okay, and we get this error. If we look at our terminal here, we see that we have this dict object is not an iterator. That is because our streaming response here expects a generator object or something it can iterate over. And we don't have that. We're just kind of returning everything at the moment.
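The message itself is plain Python behaviour: calling next() on a dict raises exactly this TypeError, whereas a generator can be stepped through. The sketch below shows the difference with the standard library only. The helpers as_chunks and describe_next_error are hypothetical, and the "output" key is an assumption about the shape of the response dict.

```python
def as_chunks(response: dict):
    """Hypothetical helper: wrap a finished agent response in a generator."""
    # Assumes the response dict stores its text under an "output" key.
    yield response["output"]


def describe_next_error(obj) -> str:
    """Return the TypeError message raised by calling next() on obj, if any."""
    try:
        next(obj)
    except TypeError as err:
        return str(err)
    return "no error"


if __name__ == "__main__":
    response = {"output": "Hello there!"}
    print(describe_next_error(response))  # a dict cannot be stepped with next()
    print(list(as_chunks(response)))      # a generator can
```

Wrapping the finished response like this would silence the error, but it would still deliver everything in one chunk at the end, so it is not real streaming.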

So we need to fix that. And the way that we fix that is actually by not giving it the response, but by actually giving it our iterator here, because this is an iterator, right? But we need more of like a standard generator. So we come to, let's come to here, right?

We have this create generator function. It's going to be taking, for now, let's take this. We use asyncio, so I need to import that. So we have asyncio.create_task. With this, we're sending our run call to execute in the background, concurrently alongside our other tasks that are also running, right?

Because we also need to be running this here. So by using this create task, we allow this to be run in the background whilst we're also running this. Now, this here, the reason that it needs to be running is because this is how we get tokens from here and send them into our stream response here.
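The background task plus consuming loop pattern can be sketched with asyncio alone. Here an asyncio.Queue stands in for the callback handler's token iterator, and run_call is a hypothetical producer that fakes the agent; create_gen and collect are illustrative names as well. In the API described in the video, an async generator like this is what would be handed to the streaming response.

```python
import asyncio


async def run_call(query: str, queue: asyncio.Queue) -> None:
    """Hypothetical stand-in for the agent call: pushes tokens onto a queue."""
    for token in ["You", " said", ": ", query]:
        await asyncio.sleep(0)  # yield control, as a real network call would
        await queue.put(token)
    await queue.put(None)  # sentinel: no more tokens


async def create_gen(query: str):
    """Yield tokens while the producer is still running in the background."""
    queue: asyncio.Queue = asyncio.Queue()
    # Start the producer as a task so it runs concurrently with this loop.
    task = asyncio.create_task(run_call(query, queue))
    while True:
        token = await queue.get()
        if token is None:
            break
        yield token
    await task  # surface any exception raised by the producer


async def collect(query: str) -> list:
    """Drain the generator into a list, for demonstration."""
    return [token async for token in create_gen(query)]


if __name__ == "__main__":
    print(asyncio.run(collect("hi there")))
```

Without create_task, awaiting run_call directly would block until every token had been produced, and the consuming loop would only start afterwards.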

All right. So to actually set that up, we need to get our generator. We can actually drop the response object here. We don't need it. And we replace this with our generator. Okay. And obviously now we don't actually need to have this run call here. So save that and let's try and run again.

Okay. Nice. So that kind of looks like it's streaming. But let's rerun that. And we'll look at the terminal as well and see at what point is this actually running? Because sometimes it can kind of trick you and you might think that you're running this or you're streaming this, but actually it's loaded everything.

And then it begins streaming, which is not ideal. So what we're looking for here is that this will begin streaming. In fact, we can ask for the long story now. So tell me a long story. This should begin streaming whilst it's still only on entering new AgentExecutor chain. All right.

If it comes up with this before we start streaming, it means that we're not actually streaming. So let's try. Okay. Looks good. We're streaming. It hasn't come up with the answer yet. So that means it's actually giving us these tokens as they are being generated by the model. But then we get to the end here and we still have this error.

So we need to stop our app on the other side and we will just run that again. Let's try again. All right. So tell me a long story, or let's go with hi there again, just to make it quick. Okay. So we can see here that it should do this all the time.

But it's returning like the full response, like the agent formatted response. And we don't necessarily want that. We kind of want it to just return this bit here. So to do that, we need to go into sort of custom callback handlers again. So we can modify that as we saw before.

We had a custom callback handler. We can do the same here. So I'm just going to copy the custom callback code in and I'll just go through it with you. So we need LLMResult and we also need Any here as well. So I'm taking the async iterator callback handler.

And that has these two methods that we need to override. So on_llm_new_token: basically here, because we're using agents, I want to do what we did before. We're checking for the final answer and also the action input. But slightly differently, in on_llm_end I want to check, okay, have we reached the final answer yet?

Because if we're using multiple tools, it will basically say I'm done as soon as it hits the first tool, which we don't want. So by adding that sort of if statement in there, we are stopping streaming, but only once the final answer has been given. And we have this.

So self.done.set(). All right. So this tells the callback handler when streaming is complete. And once that is complete, we come down to here. So we have this await task. This is going to finish, right? Now we need to replace this with our async callback handler.

So the name we gave it here. And we should also change the types here as well. Okay, cool. So we rerun that. Now we can try again. Okay. And now you can see we just get that, like not the full agent output there. So with that, we have our pretty rough API that can handle streaming.
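The on_llm_end check described above, where streaming is only marked complete once the final answer has been seen, can be sketched as follows. FinalAnswerAsyncHandler and demo are hypothetical stand-ins built on asyncio.Event; they only mirror the two method names and the done flag mentioned in the video, and the token lists are made up.

```python
import asyncio


class FinalAnswerAsyncHandler:
    """Stand-in for an async streaming handler (not a LangChain class)."""

    def __init__(self):
        self.done = asyncio.Event()  # set when streaming should stop
        self.content = ""
        self.final_answer = False

    async def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.content += token
        if "Final Answer" in self.content:
            self.final_answer = True

    async def on_llm_end(self, response=None, **kwargs) -> None:
        if self.final_answer:
            # Only now is the whole agent run finished.
            self.done.set()
        else:
            # A tool call just ended; clear the buffer and keep the stream open.
            self.content = ""


async def demo():
    """Simulate two LLM calls: a tool call, then the final answer."""
    handler = FinalAnswerAsyncHandler()
    for token in ['{"action": "', "Calculator", '"}']:
        await handler.on_llm_new_token(token)
    await handler.on_llm_end()
    done_after_tool = handler.done.is_set()
    for token in ['{"action": "', "Final", " Answer", '"}']:
        await handler.on_llm_new_token(token)
    await handler.on_llm_end()
    return done_after_tool, handler.done.is_set()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The first on_llm_end leaves the stream open because only a tool call has finished; the second one closes it.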

Obviously, there are a few other things we should really add in most cases. We should obviously test it a lot more and see if any weird things happen, particularly because the way that we have the callback handler set up right now, we're looking specifically for the final answer in the content.

What if the agent just decides that it's not going to generate the final answer, or the agent format that it should? We need to have some logic in there to handle those cases. But for the most part, this is the core of what you need in order to actually have streaming and have it behind an API.

Now, that's it for this video. So I hope this has been useful and interesting. But for now, I will leave it there. So thank you very much for watching and I will see you again in the next one. Bye.