Rx operators deep dive Part 5 – Thank you for joining me on this journey
And here we are, the culmination of a 3-month long journey deep into the realms of Rx operators. If any of you have been here since the beginning, I hope you have enjoyed going on this ride with me, and if any of you are just starting now, here's my weekly shameless plug for the start of this series.
Fair warning, this one's a bit of a stinker…
OnNext(The join operator)
So in Rx, how the join operator works is as follows:
If you have two observable sources, when you "join" them, any of the values they raise whose durations overlapped are joined via a join function.
The duration of the values in a source is defined via an observable. I.e. when you use a join operator you do:
IObservable<long> s1 = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3));
IObservable<long> s2 = Observable.Timer(TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(1));
DateTime start = DateTime.Now;
s1.Join(
s2,
t => Timer(TimeSpan.FromSeconds(1.5)),
t => Timer(TimeSpan.FromSeconds(1.5)),
(tl, tr) => (tl, tr)).Subscribe(x =>
{
Console.WriteLine($"{(int)(DateTime.Now - start).TotalSeconds} {x.tl} {x.tr}");
});
Here two observable timer sources are defined, one starting after 1 second of running and emitting a tick every 3 seconds, the other starting after 2 seconds and emitting a tick every second. When there two sources are joined, value will be produced when the durations of the ticks overlap.
The duration of a tick it defined via a duration function, which are the second and third argument of the join operator. These functions take a value of the type of each of the observables and emit a new observable. The duration of the tick will be until the observable first emits a value, or completes or errors.
Here the observables passed in are just timers, which will produce a value after 1.5 seconds, meaning that each window will be 1.5 seconds long. The output will look like this:
So after 1 second, the left observable produces tick 0.
After two seconds the right observable also produces tick 0. At this point the window for the left observable's first tick is still open (as the windows are 1.5 seconds long).
Therefore, the left and right 0 values are passed through the join result function, which produces the displayed output.
The left hand 0 window then closes, and the left hand tick 1 is not produced until 4 seconds, at which point the 1 and 2 windows for the right hand operator are still open, so two values are produced via the result function.
This might get clearer if we dig down a bit further…
So in our normal way:
public static IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
this IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
Func<TLeft, TRight, TResult> resultSelector)
{
return new JoinOperator<TLeft, TRight, TResult, TLeftDuration, TRightDuration>(
left,
right,
leftDurationSelector,
rightDurationSelector,
resultSelector
);
}
This clearly requires some more unpicking. So the first argument is the observable on which the extension method is acting (the "left hand" source).
This has a value type of TLeft
. The second is the other observable, which will be joined from the "right", and has a value type of TRight
.
It is worth pointing out that there is no special differentiation between the left and right hand sources, the join is completely symmetric. The difference only arises because in C# there is not a way to invoke a method on multiple objects.
The third and fourth arguments are the functions which produce the durations of values from the first and second source respectively.
If we take the first as an example, it takes an argument of type TLeft
, which is the value type of the left hand observable.
It produces an observable of type TDurationLeft
. The type of this observable is irrelevant, because all that matters is that the duration will end as soon as the produced observable produces a value or completes.
The final argument is the join function. This takes two inputs of type TRight
and TLeft
, and produces a result of type TResult
.
Values which have overlapping durations will be passed through this function, and the results will be the values raised by the join operator. Hence, the observable produced by the join extension method is an observable of type TResult
.
So the join operator then looks like this…
public class JoinOperator<TLeft, TRight, TResult, TLeftDuration, TRightDuration> : IObservable<TResult>
{
private IObservable<TLeft> left;
private IObservable<TRight> right;
private Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector;
private Func<TRight, IObservable<TRightDuration>> rightDurationSelector;
private Func<TLeft, TRight, TResult> resultSelector;
public JoinOperator(IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
Func<TLeft, TRight, TResult> resultSelector)
{
this.left = left;
this.right = right;
this.leftDurationSelector = leftDurationSelector;
this.rightDurationSelector = rightDurationSelector;
this.resultSelector = resultSelector;
}
public IDisposable Subscribe(IObserver<TResult> observer)
{
return new Joiner(observer, this);
}
Now, if you're paying close attention you'll notice a difference from our previous examples. Previously, the Subscribe
method would just return the disposable which is produced when the "Joiner
" or equivalent if subscribed to the underlying source.
However, this will not work in this case because there are multiple sources to which the Joiner
now needs to subscribe/unsubscribe, so we will have to handle the subscriptions ourselves.
To achieve this, the Joiner
itself implements IDisposable
, and it is just returned by the subscribe method.
The Joiner
then looks like this:
private class Joiner : IDisposable
{
private IObserver<TResult> observer;
private JoinOperator<TLeft, TRight, TResult, TLeftDuration, TRightDuration> joinOperator;
private IList<TLeft> currentlyOpenLeft;
private IList<TRight> currentlyOpenRight;
private IList<IDisposable> openSubs;
public Joiner(IObserver<TResult> observer, JoinOperator<TLeft, TRight, TResult, TLeftDuration, TRightDuration> joinOperator)
{
this.observer = observer;
this.joinOperator = joinOperator;
this.currentlyOpenRight = new List<TRight>();
this.currentlyOpenLeft = new List<TLeft>();
this.openSubs = new List<IDisposable>();
this.openSubs.Add(this.joinOperator.left.Subscribe(new CallbackObserver<TLeft>(OnLeftNext, OnError, OnCompleted)));
this.openSubs.Add(this.joinOperator.right.Subscribe(new CallbackObserver<TRight>(OnRightNext, OnError, OnCompleted)));
}
Okay, so in this constructor a few things are initialised:
First, the observer being subscribed and a reference to the join operator are saved. This is so that the functions for obtaining the durations and joined values don't all have to be passed in individually.
Two lists of types TLeft
and TRight
are then initialised.
These will hold the values corresponding to the currently open windows of the left and right observables respectively.
A list of the currently open subscriptions is initialised.
A new callback observer is then subscribed to each of the right and left hand subscriptions and the disposables returned by each subscription are added to the open subscriptions list.
The callback observer looks like this:
internal class CallbackObserver<TSource> : IObserver<TSource>
{
private Action<TSource> onNext;
private Action<Exception> onError;
private Action onCompleted;
public CallbackObserver(Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnCompleted()
{
this.onCompleted();
}
public void OnError(Exception error)
{
this.onError(error);
}
public void OnNext(TSource value)
{
this.onNext(value);
}
}
This means that we can now initialise a new observer which takes arguments corresponding to each of the methods it will call.
This is quite a useful concept in general, so it has been implemented as a standalone class rather than a nested class. In the Joiner
we have passed in the methods "OnLeft/RightNext
", "OnCompleted
" and "OnError
"
The OnLeftNext
method looks like this:
public void OnLeftNext(TLeft value)
{
this.currentlyOpenLeft.Add(value);
openSubs.Add(this.joinOperator.leftDurationSelector(value).Subscribe(new CallbackObserver<TLeftDuration>(_ => OnFinished(),
_ => OnFinished(), OnFinished)));
foreach (var rightValue in this.currentlyOpenRight)
{
this.observer.OnNext(this.joinOperator.resultSelector(value, rightValue));
}
void OnFinished()
{
this.currentlyOpenLeft.Remove(value);
}
}
When a new value is raised by the left hand source, and OnNext
is called on the callback observer, this value is added to the list of currently open windows.
A callback observer is then subscribed to the observable produced by the left duration function.
Remember the window for the value will be open until the duration function produces a value or completed. All of the methods for the callback operator therefore just remove the value from the currently open windows.
Then, the newly raised left value is combined (via the join result selector) with the values for each of the currently open right windows.
These results are then passed on to the listening observer.
The OnRightNext
function is equivalent but inverted:
public void OnRightNext(TRight value)
{
this.currentlyOpenRight.Add(value);
openSubs.Add(this.joinOperator.rightDurationSelector(value).Subscribe(new CallbackObserver<TRightDuration>(_ => { },
_ => OnFinished(), OnFinished)));
foreach (var leftValue in this.currentlyOpenLeft)
{
this.observer.OnNext(this.joinOperator.resultSelector(leftValue, value));
}
void OnFinished()
{
this.currentlyOpenRight.Remove(value);
}
}
Finally, the OnError
and OnCompleted
methods look like this:
public void OnCompleted()
{
Dispose();
this.observer.OnCompleted();
}
public void OnError(Exception error)
{
Dispose();
this.observer.OnError(error);
}
These have been passed into each of the callback observers subscribed to the underlying right and left hand sources.
This means that, for example, when the underlying left hand source calls OnCompleted
on the subscribed callback observer, Dispose
is called, then OnCompleted
is called on the observer subscribed to the join operator.
The Dispose
method looks like this:
public void Dispose()
{
foreach (var sub in openSubs)
{
sub.Dispose();
}
openSubs.Clear();
}
All of the open callback subscriptions are disposed, meaning that all listening observers are unsubscribed.
The list of open subscriptions is then cleared. In this way, before the underlying observer is completed, all active subscriptions are cleaned up.
So with all of our cleanup done, we have now reached the end of our journey.
I hope you have enjoyed this deep dive into the Rx operators. Writing this all out has definitely made me realise that these things do not just work by magic (though it's still pretty magical if you ask me!)
Finally, here's a link to the GitHub repository containing all of the code from the last 3 months… Peruse at your pleasure. And with that, I bid you farewell, I'm sure I'll be back with more Rx related content at some point in the future, but at that point you'll have to re-subscribe I'm afraid, because…
OnComplete()
!