By Carmel Eve, Software Engineer I
Rx operators deep dive part 1: Where observables meet LINQ

In case you missed it… Here's a link to my last blog on understanding Rx (luckily this blog has an internal buffer so if you're just tuning in now, you've not missed your chance)!

OnNext(Understanding of the Rx operators)

Now one of the most exciting things about Rx is that it has its own implementation of LINQ. This means that you can perform various operations using the same syntax that you would with the standard pull-based collections.

There are a huge number of operators in Rx to explore, but first, let's take a look under the hood!

How the operators work…

So, I have one of those brains which makes it rather difficult to just accept something as fact. I really struggled to understand exactly what was happening when you used an operator on an observable sequence. Luckily, during my weekly one-on-one with our resident technical fellow (and experienced explainer of complicated things), I got the opportunity to ask just that!

In this blog I'm going to attempt to lay out that explanation, dissecting the Where operator as an example.

So… If you were going to write your own Where operator, it might look something like the following:

static IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    return new WhereOperator<TSource>(source, predicate);
}

The static Where method is an extension method on IObservable<T>, so that calls to AnObservable.Where(predicate) will work as expected. This method then returns an instance of a WhereOperator class, which might look something like this (details to follow):

private class WhereOperator<T> : IObservable<T>
{
    private readonly IObservable<T> source;
    private readonly Func<T, bool> predicate;

    public WhereOperator(IObservable<T> source, Func<T, bool> predicate)
    {
        this.source = source;
        this.predicate = predicate;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        ...
    }

    private class Filter : IObserver<T>
    {
        ...
    }
}

The WhereOperator contains an IObservable<T>, which is the source to be filtered, and a predicate, which is the condition to filter that source on. So if we wanted just the even numbers from 1 to 10:

IObservable<int> sourceNumbers = Observable.Range(1, 10);
IObservable<int> filteredNumbers = sourceNumbers.Where(n => n % 2 == 0);

Then, when you subscribe to filteredNumbers (which is an instance of the WhereOperator):

public class MyObserver : IObserver<int>
{
    public void OnNext(int value)
    {
        Console.WriteLine(value);
    }
    
    public void OnCompleted() { }

    public void OnError(Exception error) { }

}

var myObserver = new MyObserver();
filteredNumbers.Subscribe(myObserver);

(This could also be done using an anonymous observer).
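As a minimal sketch of that anonymous-observer form, assuming the System.Reactive package is referenced (it supplies Observable.Range and the Subscribe overload that accepts a lambda); the class and method names here are only illustrative:

```csharp
using System;
using System.Reactive.Linq;

public static class AnonymousObserverExample
{
    public static void Run()
    {
        IObservable<int> sourceNumbers = Observable.Range(1, 10);
        IObservable<int> filteredNumbers = sourceNumbers.Where(n => n % 2 == 0);

        // The lambda takes the place of MyObserver.OnNext; Rx wraps it in an
        // observer for us, so no MyObserver class is needed.
        IDisposable subscription = filteredNumbers.Subscribe(n => Console.WriteLine(n));

        // Disposing the subscription is how an observer unsubscribes.
        subscription.Dispose();
    }
}
```

Calling AnonymousObserverExample.Run() writes the same even numbers to the console as the MyObserver version.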

A new Filter, which implements IObserver<T>, is created within the WhereOperator's Subscribe method:

private class WhereOperator<T> : IObservable<T>
{
    public IDisposable Subscribe(IObserver<T> observer)
    {
        var filter = new Filter(observer, predicate);
        return source.Subscribe(filter);
    }

    ...
}

The internal Filter class is instantiated using the observer which wants to subscribe to the filtered source, and the filtering predicate. This filter is then subscribed to the underlying source (in this case, Observable.Range). Each value that the range produces is passed to the OnNext method of the filter.

private class Filter : IObserver<T>
{
    private IObserver<T> observer;
    private Func<T, bool> predicate;

    public Filter(IObserver<T> observer, Func<T, bool> predicate)
    {
        this.observer = observer;
        this.predicate = predicate;
    }
    
    public void OnNext(T value)
    {
        if (this.predicate(value))
        {
            this.observer.OnNext(value);
        }
    }

    public void OnCompleted()
    {
        this.observer.OnCompleted();
    }

    public void OnError(Exception error)
    {
        this.observer.OnError(error);
    }
}

When the OnNext method of the Filter is invoked, if the predicate is true it will call the OnNext of the underlying observer. If the predicate is not satisfied then the value will not be passed on.

In this case, the Filter created inside the Subscribe method has myObserver as its observer field, and the predicate passed into the Where operator as its predicate.

So, when each value from Range is passed into the filter, it is then passed through the predicate. If the value is even, the predicate returns true. If this is the case, the value is passed to myObserver's OnNext method and the number is written out to the console. However, if the number is odd then the predicate returns false and the value is not passed on to myObserver.

In this way only the even numbers are passed to myObserver. The original IObservable is filtered by the predicate.

One thing to remember, though, is that all of the numbers in the range are still processed; they are just discarded if they do not meet the criteria (in the same way as every number is still checked when you call Where on an IEnumerable).
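To see this for yourself, here is a self-contained sketch that puts the pieces together and counts how often the predicate runs. It uses only the base class library: RangeSource is a hypothetical stand-in for Observable.Range, and CollectingObserver and WhereDemo are illustrative names rather than anything from Rx itself.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for Observable.Range, so the sketch needs no Rx package.
public sealed class RangeSource : IObservable<int>
{
    private readonly int start;
    private readonly int count;

    public RangeSource(int start, int count)
    {
        this.start = start;
        this.count = count;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Push every value synchronously, then signal completion.
        for (int i = 0; i < this.count; i++)
        {
            observer.OnNext(this.start + i);
        }

        observer.OnCompleted();
        return new EmptyDisposable();
    }

    private sealed class EmptyDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Same shape as the WhereOperator described above.
public sealed class WhereOperator<T> : IObservable<T>
{
    private readonly IObservable<T> source;
    private readonly Func<T, bool> predicate;

    public WhereOperator(IObservable<T> source, Func<T, bool> predicate)
    {
        this.source = source;
        this.predicate = predicate;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return this.source.Subscribe(new Filter(observer, this.predicate));
    }

    private sealed class Filter : IObserver<T>
    {
        private readonly IObserver<T> observer;
        private readonly Func<T, bool> predicate;

        public Filter(IObserver<T> observer, Func<T, bool> predicate)
        {
            this.observer = observer;
            this.predicate = predicate;
        }

        public void OnNext(T value)
        {
            if (this.predicate(value)) { this.observer.OnNext(value); }
        }

        public void OnCompleted() => this.observer.OnCompleted();
        public void OnError(Exception error) => this.observer.OnError(error);
    }
}

// Hypothetical observer that records what it receives.
public sealed class CollectingObserver : IObserver<int>
{
    public List<int> Received { get; } = new List<int>();
    public void OnNext(int value) => this.Received.Add(value);
    public void OnCompleted() { }
    public void OnError(Exception error) { }
}

public static class WhereDemo
{
    public static (int PredicateCalls, List<int> Received) Run()
    {
        int predicateCalls = 0;
        var filtered = new WhereOperator<int>(
            new RangeSource(1, 10),
            n => { predicateCalls++; return n % 2 == 0; });

        var observer = new CollectingObserver();
        filtered.Subscribe(observer);
        return (predicateCalls, observer.Received);
    }
}
```

WhereDemo.Run() reports ten predicate calls but only five received values (2, 4, 6, 8 and 10): every number reaches the Filter, and the odd ones stop there.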

So, as all of the work is done each time an observer subscribes to Range, this will mean that the numbers 1 to 10 are produced each time someone subscribes.
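A quick way to check this is to count the values as they leave the source. The sketch below assumes the System.Reactive package and uses its Do operator purely as a counting side effect; the counter and the ColdSourceDemo name are illustrative.

```csharp
using System;
using System.Reactive.Linq;

public static class ColdSourceDemo
{
    public static int Run()
    {
        int produced = 0;

        IObservable<int> filteredNumbers = Observable.Range(1, 10)
            .Do(_ => produced++)          // counts every value the range emits
            .Where(n => n % 2 == 0);

        // Each Subscribe call runs the whole pipeline again from the start.
        filteredNumbers.Subscribe(n => Console.WriteLine($"first: {n}"));
        filteredNumbers.Subscribe(n => Console.WriteLine($"second: {n}"));

        return produced;
    }
}
```

With two subscribers, ColdSourceDemo.Run() returns 20: the numbers 1 to 10 were produced once per subscription.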

Now, for something that's just producing the numbers 1 to 10 that's not too much of an issue. But, if the processing was slightly more involved, you may want a way to send the filtered results to multiple observers while only doing the processing once.

Enter Publish()

If you call Publish on an observable source, you are returned an IConnectableObservable. If you created it yourself it might look a little something like this:
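The original listing is not reproduced here, so what follows is a minimal sketch rather than the real thing. It assumes the System.Reactive package for Subject<T> and IConnectableObservable<T>, borrows the MyConnectableObservable name from the usage shown further down, and ignores details a production version would need (for example, Connect being called more than once).

```csharp
using System;
using System.Reactive.Subjects;

public class MyConnectableObservable<T> : IConnectableObservable<T>
{
    private readonly IObservable<T> source;
    private readonly Subject<T> subject = new Subject<T>();

    public MyConnectableObservable(IObservable<T> source)
    {
        this.source = source;
    }

    // Observers subscribe to the subject, not to the source, so nothing
    // is produced at this point.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        return this.subject.Subscribe(observer);
    }

    // Connecting subscribes the subject to the source exactly once; the
    // subject then forwards each value to every observer in its list.
    public IDisposable Connect()
    {
        return this.source.Subscribe(this.subject);
    }
}
```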

When you subscribe an observer to a connectable observable, that observer is subscribed to the Subject. And as we saw in my last blog, it is added to the subject's internal list of observers.

(TL;DR When the subject's OnNext method is called, it passes the value in to the OnNext of all of the observers in its list).

When you call the Connect method, you are telling the connectable observable that everything that needs to subscribe has done so. At this point, it subscribes the subject to the underlying source and items will start being passed to all of the listening observers. In practice you could use it as follows:

var myConnectableObservable = new MyConnectableObservable<int>(filteredNumbers);
myConnectableObservable.Subscribe(n => Console.WriteLine(n));
myConnectableObservable.Subscribe(n => Console.WriteLine(n));

myConnectableObservable.Connect();

And equivalently, using Publish():

IConnectableObservable<int> multicastFilteredNumbers = filteredNumbers.Publish();

multicastFilteredNumbers.Subscribe(n => Console.WriteLine(n));
multicastFilteredNumbers.Subscribe(n => Console.WriteLine(n));

multicastFilteredNumbers.Connect();

In each of these cases we have subscribed (using anonymous observers) twice to the connectable observable.

The output from either one of these implementations will be as follows (and will only be produced once Connect is called):

[Image: console output showing pairs of even numbers.]

And in each case, the processing to produce the range has only been carried out once.
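If you want to confirm that for the Publish() version, you can count the values leaving the range. This sketch assumes the System.Reactive package; the Do counter and the PublishDemo name are illustrative additions.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishDemo
{
    public static int Run()
    {
        int produced = 0;

        IConnectableObservable<int> multicastFilteredNumbers = Observable.Range(1, 10)
            .Do(_ => produced++)          // counts every value the range emits
            .Where(n => n % 2 == 0)
            .Publish();

        multicastFilteredNumbers.Subscribe(n => Console.WriteLine($"first: {n}"));
        multicastFilteredNumbers.Subscribe(n => Console.WriteLine($"second: {n}"));

        // Nothing is produced until Connect subscribes to the underlying source.
        using (multicastFilteredNumbers.Connect())
        {
        }

        return produced;
    }
}
```

PublishDemo.Run() returns 10 even though there are two subscribers, so the range has only been run once.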

I had dreams of this blog being an overview of the operators in Rx, however I think we can all agree that I've pushed out enough information for now (not that it's up to you observers)…

Keep an eye out for my next blog where I'll be diving into another of the operators (unless I get distracted again). So, until the next OnNext call! (I'm pretty pleased with this analogy)

Oh and here's me... Surrounded by my expansive notes... Laughing at my own puns.

[Image: doodle of the author surrounded by Rx notes.]

Carmel Eve

Carmel is a software engineer and LinkedIn Learning instructor. She worked at endjin from 2016 to 2021, focused on delivering cloud-first solutions to a variety of problems. These included highly performant serverless architectures, web applications, reporting and insight pipelines, and data analytics engines. After a three-year career break spent travelling around the world, she rejoined endjin in 2024.

Carmel has written many blog posts covering a huge range of topics, including deconstructing Rx operators, agile estimation and planning and mental well-being and managing remote working.

Carmel has released two courses on LinkedIn Learning - one on the Az-204 exam (developing solutions for Microsoft Azure) and one on Azure Data Lake. She has also spoken at NDC, APISpecs, and SQLBits, covering a range of topics from reactive big-data processing to secure Azure architectures.

She is passionate about diversity and inclusivity in tech. She spent two years as a STEM ambassador in her local community and taking part in a local mentorship scheme. Through this work she hopes to be a part of positive change in the industry.

Carmel won "Apprentice Engineer of the Year" at the Computing Rising Star Awards 2019.