By Carmel Eve, Software Engineer I
Rx operators deep dive Part 5 – Thank you for joining me on this journey

And here we are, the culmination of a 3-month-long journey deep into the realms of Rx operators. If any of you have been here since the beginning, I hope you have enjoyed going on this ride with me, and if any of you are just starting now, here's my weekly shameless plug for the start of this series.

Fair warning, this one's a bit of a stinker…

OnNext(The join operator)

So in Rx, how the join operator works is as follows:

When you "join" two observable sources, every pair of values (one from each source) whose durations overlap is combined via a join function.

The duration of each value in a source is defined via an observable. For example, a join looks like this:

IObservable<long> s1 = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3));
IObservable<long> s2 = Observable.Timer(TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(1));

DateTime start = DateTime.Now;

s1.Join(
    s2,
    t => Observable.Timer(TimeSpan.FromSeconds(1.5)),
    t => Observable.Timer(TimeSpan.FromSeconds(1.5)),
    (tl, tr) => (tl, tr)).Subscribe(x =>
    {
        Console.WriteLine($"{(int)(DateTime.Now - start).TotalSeconds} {x.tl} {x.tr}");
    });

Here two observable timer sources are defined: one starts after 1 second and emits a tick every 3 seconds, the other starts after 2 seconds and emits a tick every second. When these two sources are joined, a value is produced whenever the durations of two ticks (one from each source) overlap.

The duration of a tick is defined via a duration function. The duration functions are the second and third arguments of the join operator. Each takes a value of the type raised by the corresponding observable and returns a new observable. The tick's window stays open until that observable first emits a value, completes or errors.

Here the observables passed in are just timers, which will produce a value after 1.5 seconds, meaning that each window will be 1.5 seconds long. The output will look like this:

Output of joined observables.

So after 1 second, the left observable produces tick 0.

After two seconds the right observable also produces tick 0. At this point the window for the left observable's first tick is still open (as the windows are 1.5 seconds long).

Therefore, the left and right 0 values are passed through the join result function, which produces the displayed output.

The left hand 0 window then closes, and the left hand tick 1 is not produced until 4 seconds. At that point the 1 and 2 windows for the right hand source are open, so two values are produced via the result function.
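Since the output depends only on which windows overlap, the timeline described above can be checked with a little arithmetic. The following is a minimal sketch rather than anything from Rx itself: the helper Ticks and the names Window and pairs are made up for illustration, and it assumes ideal timers with no scheduling jitter. Two ticks are paired when their start times are less than 1.5 seconds apart, and the pair is emitted when the later of the two arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: the tick numbers and start times (in seconds) that a
// timer with the given due time and period would raise up to "until".
static IEnumerable<(long Tick, double Start)> Ticks(double due, double period, double until)
{
    long tick = 0;
    for (double t = due; t <= until; t += period)
    {
        yield return (tick++, t);
    }
}

// Every window in the example is 1.5 seconds long.
const double Window = 1.5;

var left = Ticks(1, 3, 8).ToList();   // left ticks at 1s, 4s, 7s
var right = Ticks(2, 1, 8).ToList();  // right ticks at 2s, 3s, ... 8s

// Two equal-length windows overlap when their start times are less than one
// window length apart. The pair is raised when the later tick arrives.
var pairs = (from l in left
             from r in right
             where Math.Abs(l.Start - r.Start) < Window
             let emittedAt = Math.Max(l.Start, r.Start)
             orderby emittedAt, l.Tick, r.Tick
             select (EmittedAt: emittedAt, Left: l.Tick, Right: r.Tick)).ToList();

foreach (var p in pairs)
{
    Console.WriteLine($"{p.EmittedAt} {p.Left} {p.Right}");
}
// Prints:
// 2 0 0
// 4 1 1
// 4 1 2
// 5 1 3
// 7 2 4
// 7 2 5
// 8 2 6
```

With real timers the seconds column can differ slightly, because the original example truncates the elapsed time to whole seconds, but the pairs of tick numbers should be the same.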

This might get clearer if we dig down a bit further…

So in our normal way:

public static IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
    this IObservable<TLeft> left,
    IObservable<TRight> right,
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
    Func<TLeft, TRight, TResult> resultSelector)
{
    return new JoinOperator<TLeft, TRight, TResult, TLeftDuration, TRightDuration>(
        left,
        right,
        leftDurationSelector,
        rightDurationSelector,
        resultSelector);
}

This clearly requires some more unpicking. So the first argument is the observable on which the extension method is acting (the "left hand" source).

This has a value type of TLeft. The second is the other observable, which will be joined from the "right", and has a value type of TRight.

It is worth pointing out that there is no special differentiation between the left and right hand sources; the join is completely symmetric. The difference only arises because in C# there is no way to invoke a method on multiple objects at once.

The third and fourth arguments are the functions which produce the durations of values from the first and second source respectively.

If we take the first as an example, it takes an argument of type TLeft, which is the value type of the left hand observable.

It produces an observable whose value type is TLeftDuration. This type is irrelevant, because all that matters is that the duration will end as soon as the produced observable produces a value or completes.

The final argument is the join function. This takes two inputs, of types TLeft and TRight, and produces a result of type TResult.

Values which have overlapping durations will be passed through this function, and the results will be the values raised by the join operator. Hence, the observable produced by the join extension method is an observable of type TResult.

So the join operator then looks like this…

public class JoinOperator<TLeft, TRight, TResult, TLeftDuration, TRightDuration> : IObservable<TResult>
{
    private IObservable<TLeft> left;
    private IObservable<TRight> right;
    private Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector;
    private Func<TRight, IObservable<TRightDuration>> rightDurationSelector;
    private Func<TLeft, TRight, TResult> resultSelector;

    public JoinOperator(IObservable<TLeft> left, 
        IObservable<TRight> right, 
        Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, 
        Func<TRight, IObservable<TRightDuration>> rightDurationSelector, 
        Func<TLeft, TRight, TResult> resultSelector)
    {
        this.left = left;
        this.right = right;
        this.leftDurationSelector = leftDurationSelector;
        this.rightDurationSelector = rightDurationSelector;
        this.resultSelector = resultSelector;
    }

    public IDisposable Subscribe(IObserver<TResult> observer)
    {
        return new Joiner(observer, this);
    }

Now, if you're paying close attention you'll notice a difference from our previous examples. Previously, the Subscribe method would just return the disposable produced when the "Joiner" or equivalent is subscribed to the underlying source.

However, this will not work in this case because there are multiple sources to which the Joiner now needs to subscribe/unsubscribe, so we will have to handle the subscriptions ourselves.

To achieve this, the Joiner itself implements IDisposable, and it is simply returned by the Subscribe method.

The Joiner then looks like this:

private class Joiner : IDisposable
{
    private IObserver<TResult> observer;
    private JoinOperator<TLeft, TRight, TResult, TLeftDuration, TRightDuration> joinOperator;
    private IList<TLeft> currentlyOpenLeft;
    private IList<TRight> currentlyOpenRight;
    private IList<IDisposable> openSubs;

    public Joiner(IObserver<TResult> observer, JoinOperator<TLeft, TRight, TResult, TLeftDuration, TRightDuration> joinOperator)
    {
        this.observer = observer;
        this.joinOperator = joinOperator;
        this.currentlyOpenRight = new List<TRight>();
        this.currentlyOpenLeft = new List<TLeft>();
        this.openSubs = new List<IDisposable>();
        this.openSubs.Add(this.joinOperator.left.Subscribe(new CallbackObserver<TLeft>(OnLeftNext, OnError, OnCompleted)));
        this.openSubs.Add(this.joinOperator.right.Subscribe(new CallbackObserver<TRight>(OnRightNext, OnError, OnCompleted)));
    }

Okay, so in this constructor a few things are initialised:

First, the observer being subscribed and a reference to the join operator are saved. This is so that the functions for obtaining the durations and joined values don't all have to be passed in individually.

Two lists of types TLeft and TRight are then initialised.

These will hold the values corresponding to the currently open windows of the left and right observables respectively.

A list of the currently open subscriptions is initialised.

A new callback observer is then subscribed to each of the left and right hand sources, and the disposables returned by each subscription are added to the open subscriptions list.

The callback observer looks like this:

internal class CallbackObserver<TSource> : IObserver<TSource>
{
    private Action<TSource> onNext;
    private Action<Exception> onError;
    private Action onCompleted;

    public CallbackObserver(Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnCompleted()
    {
        this.onCompleted();
    }

    public void OnError(Exception error)
    {
        this.onError(error);
    }

    public void OnNext(TSource value)
    {
        this.onNext(value);
    }
}

This means that we can now initialise a new observer which takes arguments corresponding to each of the methods it will call.

This is quite a useful concept in general, so it has been implemented as a standalone class rather than a nested class. In the Joiner we have passed in the methods "OnLeftNext"/"OnRightNext", "OnError" and "OnCompleted".
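To see the callback observer on its own, here is a small self-contained sketch. CallbackObserver is repeated from above so the example compiles by itself; CountingSource, EmptyDisposable and CallbackObserverDemo are hypothetical names invented for this illustration and are not part of the series' code or of Rx.

```csharp
using System;
using System.Collections.Generic;

// Feed a tiny hand-rolled source into a CallbackObserver and print what it saw.
foreach (var line in CallbackObserverDemo.Run())
{
    Console.WriteLine(line);
}
// Prints:
// next 0
// next 1
// next 2
// completed

public static class CallbackObserverDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // One lambda per IObserver method, instead of writing a new observer class.
        var observer = new CallbackObserver<int>(
            value => log.Add($"next {value}"),
            error => log.Add($"error {error.Message}"),
            () => log.Add("completed"));

        IObservable<int> source = new CountingSource(3);
        using (source.Subscribe(observer))
        {
        }

        return log;
    }
}

// Same shape as the CallbackObserver shown above.
internal class CallbackObserver<TSource> : IObserver<TSource>
{
    private readonly Action<TSource> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public CallbackObserver(Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnCompleted() => this.onCompleted();

    public void OnError(Exception error) => this.onError(error);

    public void OnNext(TSource value) => this.onNext(value);
}

// Hypothetical source: synchronously raises 0..count-1 and then completes.
internal sealed class CountingSource : IObservable<int>
{
    private readonly int count;

    public CountingSource(int count) => this.count = count;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < this.count; i++)
        {
            observer.OnNext(i);
        }

        observer.OnCompleted();
        return new EmptyDisposable();
    }
}

// Nothing to clean up, because the source has finished by the time Subscribe returns.
internal sealed class EmptyDisposable : IDisposable
{
    public void Dispose()
    {
    }
}
```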

The OnLeftNext method looks like this:

public void OnLeftNext(TLeft value)
{
    this.currentlyOpenLeft.Add(value);

    openSubs.Add(this.joinOperator.leftDurationSelector(value).Subscribe(new CallbackObserver<TLeftDuration>(_ => OnFinished(),
        _ => OnFinished(), OnFinished)));

    foreach (var rightValue in this.currentlyOpenRight)
    {
        this.observer.OnNext(this.joinOperator.resultSelector(value, rightValue));
    }

    void OnFinished()
    {
        this.currentlyOpenLeft.Remove(value);
    }
}

When a new value is raised by the left hand source, and OnNext is called on the callback observer, this value is added to the list of currently open windows.

A callback observer is then subscribed to the observable produced by the left duration function.

Remember, the window for the value stays open until the observable returned by the duration function produces a value or completes. All of the methods for the callback observer therefore just remove the value from the currently open windows.

Then, the newly raised left value is combined (via the join result selector) with the values for each of the currently open right windows.

These results are then passed on to the listening observer.

The OnRightNext function is equivalent but inverted:

public void OnRightNext(TRight value)
{
    this.currentlyOpenRight.Add(value);

    openSubs.Add(this.joinOperator.rightDurationSelector(value).Subscribe(new CallbackObserver<TRightDuration>(_ => OnFinished(),
        _ => OnFinished(), OnFinished)));

    foreach (var leftValue in this.currentlyOpenLeft)
    {
        this.observer.OnNext(this.joinOperator.resultSelector(leftValue, value));
    }

    void OnFinished()
    {
        this.currentlyOpenRight.Remove(value);
    }
}
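The bookkeeping in the two handlers above can be replayed by hand, without any observables at all. This is only a sketch: LeftArrives and RightArrives are hypothetical stand-ins for OnLeftNext and OnRightNext, the window closures that the duration observables would trigger are written as plain Remove calls, and the times in the comments assume the timers from the earlier example.

```csharp
using System;
using System.Collections.Generic;

// Values whose windows are currently open on each side, as in the Joiner.
var currentlyOpenLeft = new List<long>();
var currentlyOpenRight = new List<long>();
var output = new List<(long Left, long Right)>();

// A new left value opens a window and is paired with every open right value.
void LeftArrives(long value)
{
    currentlyOpenLeft.Add(value);
    foreach (var rightValue in currentlyOpenRight)
    {
        output.Add((value, rightValue));
    }
}

// The mirror image for a new right value.
void RightArrives(long value)
{
    currentlyOpenRight.Add(value);
    foreach (var leftValue in currentlyOpenLeft)
    {
        output.Add((leftValue, value));
    }
}

// Replay the first five seconds of the timer example in time order.
LeftArrives(0L);                 // 1.0s: no right window open yet
RightArrives(0L);                // 2.0s: left 0 is open, so (0, 0)
currentlyOpenLeft.Remove(0L);    // 2.5s: left 0 window closes
RightArrives(1L);                // 3.0s: no left window open
currentlyOpenRight.Remove(0L);   // 3.5s: right 0 window closes
LeftArrives(1L);                 // 4.0s: right 1 is open, so (1, 1)
RightArrives(2L);                // 4.0s: left 1 is open, so (1, 2)
currentlyOpenRight.Remove(1L);   // 4.5s: right 1 window closes
RightArrives(3L);                // 5.0s: left 1 is still open, so (1, 3)

foreach (var (l, r) in output)
{
    Console.WriteLine($"{l} {r}");
}
// Prints:
// 0 0
// 1 1
// 1 2
// 1 3
```

Swapping the two calls at 4.0s produces the same pairs, since whichever value arrives second finds the other one already in its list.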

Finally, the OnError and OnCompleted methods look like this:

public void OnCompleted()
{
    Dispose();
    this.observer.OnCompleted();
}

public void OnError(Exception error)
{
    Dispose();
    this.observer.OnError(error);
}

These have been passed into each of the callback observers subscribed to the underlying right and left hand sources.

This means that, for example, when the underlying left hand source calls OnCompleted on the subscribed callback observer, Dispose is called, then OnCompleted is called on the observer subscribed to the join operator.

The Dispose method looks like this:

public void Dispose()
{
    foreach (var sub in openSubs)
    {
        sub.Dispose();
    }

    openSubs.Clear();
}

All of the open callback subscriptions are disposed, meaning that all listening observers are unsubscribed.

The list of open subscriptions is then cleared. In this way, before the underlying observer is completed, all active subscriptions are cleaned up.
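One possible hardening of this cleanup, which is not part of the code above, is to make disposal idempotent and to iterate over a snapshot of the list, so that a subscription added during or after disposal is not lost and cannot invalidate the loop. The sketch below is an assumption about how that could look; SubscriptionBag, ActionDisposable and SubscriptionBagDemo are hypothetical names. It is similar in spirit to the CompositeDisposable type that ships with System.Reactive.

```csharp
using System;
using System.Collections.Generic;

foreach (var line in SubscriptionBagDemo.Run())
{
    Console.WriteLine(line);
}
// Prints:
// left source disposed
// right source disposed
// late subscription disposed

public static class SubscriptionBagDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var bag = new SubscriptionBag();

        bag.Add(new ActionDisposable(() => log.Add("left source disposed")));
        bag.Add(new ActionDisposable(() => log.Add("right source disposed")));

        bag.Dispose();
        bag.Dispose(); // second call does nothing

        // Added after disposal, so it is disposed straight away.
        bag.Add(new ActionDisposable(() => log.Add("late subscription disposed")));

        return log;
    }
}

// Hypothetical replacement for the openSubs list.
public sealed class SubscriptionBag : IDisposable
{
    private readonly object gate = new object();
    private List<IDisposable> subs = new List<IDisposable>();
    private bool disposed;

    public void Add(IDisposable sub)
    {
        lock (this.gate)
        {
            if (!this.disposed)
            {
                this.subs.Add(sub);
                return;
            }
        }

        // The bag is already disposed: clean up immediately instead of leaking.
        sub.Dispose();
    }

    public void Dispose()
    {
        List<IDisposable> snapshot;
        lock (this.gate)
        {
            if (this.disposed)
            {
                return;
            }

            this.disposed = true;
            snapshot = this.subs;
            this.subs = new List<IDisposable>();
        }

        // Dispose outside the lock, over a list nothing else can modify.
        foreach (var sub in snapshot)
        {
            sub.Dispose();
        }
    }
}

// Hypothetical helper that runs a callback when disposed.
public sealed class ActionDisposable : IDisposable
{
    private readonly Action onDispose;

    public ActionDisposable(Action onDispose) => this.onDispose = onDispose;

    public void Dispose() => this.onDispose();
}
```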

So with all of our cleanup done, we have now reached the end of our journey.

I hope you have enjoyed this deep dive into the Rx operators. Writing this all out has definitely made me realise that these things do not just work by magic (though it's still pretty magical if you ask me!)

Doodle of author surrounded by operators.

Finally, here's a link to the GitHub repository containing all of the code from the last 3 months… Peruse at your pleasure. And with that, I bid you farewell. I'm sure I'll be back with more Rx related content at some point in the future, but at that point you'll have to re-subscribe I'm afraid, because…

OnCompleted()!

Carmel Eve

Carmel is a software engineer, LinkedIn Learning instructor and STEM ambassador.

Over the past four years she has been focused on delivering cloud-first solutions to a variety of problems. These have ranged from highly-performant serverless architectures, to web applications, to reporting and insight pipelines and data analytics engines.

In her time at endjin, she has written many blog posts covering a huge range of topics, including deconstructing Rx operators and mental well-being and managing remote working.

Carmel's first LinkedIn Learning course on how to prepare for the Az-204 exam - developing solutions for Microsoft Azure - was released in April 2021. Over the last couple of years she has also spoken at NDC, APISpecs and SQLBits. These talks covered a range of topics, from reactive big-data processing to secure Azure architectures.

She is also passionate about diversity and inclusivity in tech. She is a STEM ambassador in her local community and is taking part in a local mentorship scheme. Through this work she hopes to be a part of positive change in the industry.

Carmel won "Apprentice Engineer of the Year" at the Computing Rising Star Awards 2019.

Carmel worked at endjin from 2016 to 2021.