Rx.NET v6.1 New Feature: TakeUntil(CancellationToken)
Rx.NET
In this video, Ian Griffiths introduces the new TakeUntil(CancellationToken)
operator in Rx.NET 6.1, released in October 2025.
He discusses the purpose and functionality of this operator, which allows users to stop an infinite source when a cancellation token is signalled.
Ian acknowledges the contributions of community members Nils Aufschläger and Daniel Weber in shaping and developing this feature. Through a simple example using the Interval operator, Ian demonstrates how this new operator works and explains its benefits.
Learn how to manage infinite sources effectively with the new TakeUntil(CancellationToken)
operator in Rx.NET 6.1.
Full documentation is available at Introduction to Rx.NET.
- 00:00 Introduction to Rx.NET 6.1 and New Features
- 00:47 Community Contributions and Design Evolution
- 01:27 Understanding the TakeUntil(CancellationToken) Operator
- 03:08 Practical Example: Using TakeUntil with Interval
- 06:00 Summary and Documentation
Transcript
I am gonna tell you about a new operator introduced in Rx version 6.1, the TakeUntil(CancellationToken)
operator. So in October 2025, we've released version 6.1, Rx.NET. It's a new minor version release because we've got new features in there, but no breaking changes. The new features are DisposeWith
, which I'll talk about in a different video, the TakeUntil(CancellationToken)
operator, which is the topic for this video, and also one called ResetExceptionDispatchState
. Now, if you've been following the recent progress of Rx, you'll be aware that we're also looking to solve some longstanding packaging problems. That has not been done in the v6.1 release; that's coming in v7.0.
Okay, so let's take a look at this new operator TakeUntil(CancellationToken)
. So just to start with, I should say thank you to the community people who have helped this come into being. So this was originally suggested as a slightly different form of feature, then changed shape.
So the original proposal came from someone whose name I suspect I will mispronounce, but I'll try anyway: Nils Aufschläger. So thank you for the initial proposal, and he also did the work to implement this, but the design changed during the discussion, and that is partly thanks to Daniel Weber, who chipped into the conversation and suggested a completely different approach from the one that Nils originally wanted to take.
What exactly is this? What's the purpose behind this new operator? Because we've had TakeUntil
in various forms for a long time in Rx. Basically it says: take all the elements from this source until some criterion is met. And the new feature is that we are able to stop taking elements—to complete the source—when a cancellation token gets signaled.
And the purpose of this is to enable you to take an infinite source—so some source that would never complete on its own. So things like the interval timer are like this, but also the event sources that come from events in .NET. If you adapt a regular .NET event into an Rx source, it never completes by itself.
And that was actually the original motivation for this: was to enable those sorts of event sources to be completed. The original design was gonna be a specialized disposable implementation, but when we looked at that, it was kind of awkward. We didn't really have any other things that were both disposables and observables, and it wasn't clear how you would implement something that was both in a way that fit in well with the existing examples of those kinds of types in Rx.NET. So Daniel Weber's suggestion was that actually we could get the same effect—we could solve the same problem—by introducing this new overload of TakeUntil
that takes a CancellationToken
. And it's actually more flexible; it can work with anything that produces CancellationToken
s. And we already have the CancellationDisposable
in Rx.NET, which gives you a thing that you can dispose and which sets a CancellationToken
when you do so. That was the way we moved forward.
Okay, so let's see this in action. I've got a very simple Rx example here. This is using the Interval
operator that's been built into Rx forever, and this is just gonna raise events every second. So it's gonna produce the number 0, 1, 2, 3, and so on indefinitely. And the thing about interval is that it never stops. So when I run this, it's just gonna keep on producing numbers again and again and again, until either we unsubscribe or the process exits.
So we could just unsubscribe. So the Subscribe
method does return an IDisposable
, and we could hang onto that and then just call Dispose()
on that, and that will certainly shut things down. However, what it won't do is enable the complete events to flow through. So when you unsubscribe, you never see the OnCompleted
—or at least you might not see it. It's possible you'll see it during your call to Subscribe()
, but there's no guarantee. So if you definitely want to see that, you really want the source itself to shut down, and that's the idea of this new operator—or this new overload of the TakeUntil
operator.
So we're gonna need a CancellationToken
. So I'm gonna use a CancellationDisposable
, which is a type that has been built into Rx for a very long time, and that is able to provide us with a CancellationToken
. So what we can do is say: let's take items from this until that CancellationToken
is set. And what I can now do is, if I add a couple more things in here, if I dispose of that CancellationDisposable
after the first time someone hits enter, that should shut down this observable. Let me just print out another thing here saying "Disposed," and then we're gonna keep running until we get a second key press. So if I run this now, then as before it starts producing numbers, and it will do so—or the underlying interval source will do that—indefinitely. But if I hit enter, that disposes the CancellationDisposable
, and that causes the CancellationToken
to enter the canceled state, and then our TakeUntil
operator says, "Oh, okay, we're done then."
And we should now be able to see, if I add to my subscribe a second callback that says "Completed"—if I run this one more time. So that's producing numbers. I'm gonna hit enter to cause it to dispose that, and you can see right away the source completes properly. So this subscription has been shut down from the source end.
We haven't unsubscribed; we've caused the source itself to complete, and that is what TakeUntil
enables us to do.
So in summary, this new TakeUntil
operator overload enables us to cause otherwise infinite sources to complete when we want them to. So if you want to use this, you need to get hold of System.Reactive
v6.1, and you need to then get a CancellationToken
from somewhere. And you can use Rx's CancellationDisposable
as one way of doing that, and then you just pass the token into the new TakeUntil
overload.
If you want more details, we've documented this operator on the https://introtorx.com site.
And meanwhile, I'm Ian Griffiths. Thanks for listening.