Rx operators deep dive part 2: Slowly aggregating knowledge
There's been a bit of a gap since my last Rx blog. I've been pretty busy keeping up with Advent of Code in any spare time (and I'm sure there will be a blog along those lines at some point in the near future).
But, for now, it's time for a deep dive into a few more of the operators in the world of Rx!
First up, the Select operator!
OnNext(Understanding of how the Select operator works)
If you've read my last blog, you'll soon see that building your own Select operator is fairly similar to building a Where operator. You set up an extension method on IObservable:
static IObservable<TOutput> Select<TSource, TOutput>(this IObservable<TSource> source, Func<TSource, TOutput> selection)
{
    return new SelectOperator<TSource, TOutput>(source, selection);
}
This time, however, instead of a predicate you pass in a Func<TSource, TOutput>. TSource will be the type which the underlying source produces, and TOutput will be the type of the values you want to select.
As before, when an observer subscribes, an internal observer (Selector) is constructed using the subscribing observer and the selection function. This Selector is then subscribed to the source observable.
public class SelectOperator<TSource, TOutput> : IObservable<TOutput>
{
    private IObservable<TSource> source;
    private Func<TSource, TOutput> selection;

    public SelectOperator(IObservable<TSource> source, Func<TSource, TOutput> selection)
    {
        this.source = source;
        this.selection = selection;
    }

    public IDisposable Subscribe(IObserver<TOutput> observer)
    {
        var selector = new Selector(observer, selection);
        return this.source.Subscribe(selector);
    }
    ...
When the underlying source passes a value to the OnNext method of the Selector, the value is passed through the selection function. The return value of this function is then passed to the original observer.
The Selector is an observer of TSource, as it will accept values of the type TSource from the underlying source. The subscribing observer takes values of type TOutput, which is the type returned by the selection function.
    private class Selector : IObserver<TSource>
    {
        private IObserver<TOutput> observer;
        private Func<TSource, TOutput> selection;

        public Selector(IObserver<TOutput> observer, Func<TSource, TOutput> selection)
        {
            this.observer = observer;
            this.selection = selection;
        }

        public void OnNext(TSource value)
        {
            TOutput selectedValue = this.selection(value);
            this.observer.OnNext(selectedValue);
        }
        ...
In this way, the observer is passed the selected value rather than the one originally offered up by the observable.
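The listings above elide the rest of the Selector. As a minimal sketch, assuming the elided OnError and OnCompleted members simply forward to the subscribing observer (as the Summer later in this post does), the whole operator could be assembled as below. RangeSource and CollectingObserver are hypothetical helpers, not part of the original code, added so that the sketch runs without the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical cold source: pushes start..start+count-1, then completes.
public class RangeSource : IObservable<int>
{
    private readonly int start;
    private readonly int count;

    public RangeSource(int start, int count)
    {
        this.start = start;
        this.count = count;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        for (int i = 0; i < count; i++) observer.OnNext(start + i);
        observer.OnCompleted();
        return new Unsubscriber();
    }

    private class Unsubscriber : IDisposable { public void Dispose() { } }
}

public class SelectOperator<TSource, TOutput> : IObservable<TOutput>
{
    private readonly IObservable<TSource> source;
    private readonly Func<TSource, TOutput> selection;

    public SelectOperator(IObservable<TSource> source, Func<TSource, TOutput> selection)
    {
        this.source = source;
        this.selection = selection;
    }

    public IDisposable Subscribe(IObserver<TOutput> observer)
    {
        return this.source.Subscribe(new Selector(observer, this.selection));
    }

    private class Selector : IObserver<TSource>
    {
        private readonly IObserver<TOutput> observer;
        private readonly Func<TSource, TOutput> selection;

        public Selector(IObserver<TOutput> observer, Func<TSource, TOutput> selection)
        {
            this.observer = observer;
            this.selection = selection;
        }

        // Transform each value before handing it on.
        public void OnNext(TSource value) => this.observer.OnNext(this.selection(value));

        // Assumption: errors and completion pass straight through untouched.
        public void OnError(Exception error) => this.observer.OnError(error);
        public void OnCompleted() => this.observer.OnCompleted();
    }
}

// Hypothetical observer that records what it receives.
public class CollectingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }

    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => throw error;
    public void OnCompleted() => Completed = true;
}

public static class Program
{
    public static void Main()
    {
        var collector = new CollectingObserver<string>();
        new SelectOperator<int, string>(new RangeSource(1, 3), n => $"OUTPUT: {n}")
            .Subscribe(collector);

        // Prints "OUTPUT: 1|OUTPUT: 2|OUTPUT: 3"
        Console.WriteLine(string.Join("|", collector.Values));
    }
}
```

Forwarding OnCompleted matters here: without it, anything downstream that waits for completion would never hear that the source has finished.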
You can then pass in a transform which will operate on each of the produced values and subscribe to the observable produced by the select operator:
IObservable<int> sourceNumbers = System.Reactive.Linq.Observable.Range(1, 10);
IObservable<string> formattedOutputs = sourceNumbers.Select(n => $"OUTPUT: {n}");
formattedOutputs.Subscribe(n => Console.WriteLine(n));
The output of the above code will be as follows:
OUTPUT: 1
OUTPUT: 2
OUTPUT: 3
OUTPUT: 4
OUTPUT: 5
OUTPUT: 6
OUTPUT: 7
OUTPUT: 8
OUTPUT: 9
OUTPUT: 10
Each number in the range 1 to 10 is passed through the selection function and transformed into a string.
These strings are then offered up to our listening anonymous observer and printed out to the console.
In the last blog, I talked about using a multicast observable so as not to produce values multiple times for multiple subscribers.
You can also do this here, but for simplicity I've just used the one subscriber.
Now onto something a little more interesting…
OnNext(The aggregate operator)
The aggregate operator is the underlying mechanism for all of the LINQ operators that take many inputs and produce one (e.g. Count, Sum, Max, etc.).
In the world of Rx these operators still produce one value, but that value is in the form of an IObservable which offers up that value. This is to keep everything in our new stream-based world in sync.
Sum
Let's try implementing our own Sum, and then generalise that for any aggregation. Keep in mind that aggregate operators can't produce their value until all of the elements have been produced, so they have to wait until the source has called OnCompleted!
So with Sum, the overall setup is pretty similar to the Where/Select operators. A "filtering" observer is placed in between the source and the subscribing observer:
internal class SumOperator<TSource> : IObservable<TSource>
{
    private IObservable<TSource> source;

    public SumOperator(IObservable<TSource> source)
    {
        this.source = source;
    }

    public IDisposable Subscribe(IObserver<TSource> observer)
    {
        var summer = new Summer<TSource>(observer);
        return source.Subscribe(summer);
    }
    ...
The Summer looks something like this:
    private class Summer<TSource> : IObserver<TSource>
    {
        private IObserver<TSource> observer;
        private TSource sum;

        public Summer(IObserver<TSource> observer)
        {
            this.observer = observer;
        }

        public void OnCompleted()
        {
            this.observer.OnNext(sum);
            this.observer.OnCompleted();
        }

        public void OnError(Exception error)
        {
            this.observer.OnError(error);
        }

        public void OnNext(TSource value)
        {
            sum += (dynamic)value;
        }
    }
With each OnNext call, the value is added to the running total. Here dynamic is used because we need to be able to add values of an unknown type; if the type didn't implement the + operator, an exception would be thrown at runtime.
When OnCompleted is called by the source, we know that no more items are going to be passed along, and the Summer can pass the sum on to the listening observer. It then calls OnCompleted on the subscribed observer, as it has produced the sum and won't be producing anything further.
IObservable<int> sourceNumbers = System.Reactive.Linq.Observable.Range(1, 10);
IObservable<int> sum = sourceNumbers.Sum();
sum.Subscribe(n => Console.WriteLine(n));
The output in this case would be 55.
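The Sum() extension method called above isn't shown in the listings. As a rough sketch, assuming it mirrors the Select extension method, it might look like the following. To sidestep the runtime risk of dynamic, this variant is specialised to int, which is a swapped-in simplification rather than the generic version described above. IntSumOperator, SumExtensions, ArraySource and CollectingObserver are all hypothetical names used for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical cold source: replays a fixed set of values, then completes.
public class ArraySource<T> : IObservable<T>
{
    private readonly T[] items;
    public ArraySource(params T[] items) { this.items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in items) observer.OnNext(item);
        observer.OnCompleted();
        return new Unsubscriber();
    }

    private class Unsubscriber : IDisposable { public void Dispose() { } }
}

// int-only variant of the SumOperator, so that + is checked at compile time.
internal class IntSumOperator : IObservable<int>
{
    private readonly IObservable<int> source;
    public IntSumOperator(IObservable<int> source) { this.source = source; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        return source.Subscribe(new Summer(observer));
    }

    private class Summer : IObserver<int>
    {
        private readonly IObserver<int> observer;
        private int sum;
        public Summer(IObserver<int> observer) { this.observer = observer; }

        // Accumulate silently; nothing is emitted until the source completes.
        public void OnNext(int value) { sum += value; }
        public void OnError(Exception error) { observer.OnError(error); }

        public void OnCompleted()
        {
            observer.OnNext(sum);
            observer.OnCompleted();
        }
    }
}

public static class SumExtensions
{
    // Assumed shape of the extension method, mirroring Select.
    public static IObservable<int> Sum(this IObservable<int> source)
    {
        return new IntSumOperator(source);
    }
}

// Hypothetical observer that records what it receives.
public class CollectingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => throw error;
    public void OnCompleted() { }
}

public static class Program
{
    public static void Main()
    {
        var collector = new CollectingObserver<int>();
        new ArraySource<int>(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).Sum().Subscribe(collector);

        // Prints "55": a single value, emitted only once the source completed.
        Console.WriteLine(string.Join(",", collector.Values));
    }
}
```

The trade-off is that a typed version like this needs one operator per numeric type, whereas the dynamic version covers them all at the cost of failing at runtime rather than compile time.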
Aggregate
The aggregate operator is very similar, but instead of a running total it stores a list of all the values which have been passed to it. When OnCompleted is called, this list is run through the provided aggregator function, which produces a single value.
    private class Aggregator : IObserver<TSource>
    {
        private IObserver<TOutput> observer;
        private Func<IList<TSource>, TOutput> aggregation;
        IList<TSource> values = new List<TSource>();

        public Aggregator(IObserver<TOutput> observer, Func<IList<TSource>, TOutput> aggregation)
        {
            this.observer = observer;
            this.aggregation = aggregation;
        }

        public void OnCompleted()
        {
            observer.OnNext(aggregation(values));
            observer.OnCompleted();
        }

        public void OnError(Exception error)
        {
            this.observer.OnError(error);
        }

        public void OnNext(TSource value)
        {
            values.Add(value);
        }
    }
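The class that wraps the Aggregator, and the Aggregate extension method that creates it, aren't shown in this post. A minimal sketch, assuming they follow the same pattern as SelectOperator and its extension method, might look like this. The names AggregateOperator and AggregateExtensions are guesses, and ArraySource and CollectingObserver are hypothetical helpers so the sketch runs without System.Reactive.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical cold source: replays a fixed set of values, then completes.
public class ArraySource<T> : IObservable<T>
{
    private readonly T[] items;
    public ArraySource(params T[] items) { this.items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in items) observer.OnNext(item);
        observer.OnCompleted();
        return new Unsubscriber();
    }

    private class Unsubscriber : IDisposable { public void Dispose() { } }
}

// Assumed wrapper, mirroring SelectOperator.
public class AggregateOperator<TSource, TOutput> : IObservable<TOutput>
{
    private readonly IObservable<TSource> source;
    private readonly Func<IList<TSource>, TOutput> aggregation;

    public AggregateOperator(IObservable<TSource> source, Func<IList<TSource>, TOutput> aggregation)
    {
        this.source = source;
        this.aggregation = aggregation;
    }

    public IDisposable Subscribe(IObserver<TOutput> observer)
    {
        return source.Subscribe(new Aggregator(observer, aggregation));
    }

    // Same behaviour as the Aggregator listed above.
    private class Aggregator : IObserver<TSource>
    {
        private readonly IObserver<TOutput> observer;
        private readonly Func<IList<TSource>, TOutput> aggregation;
        private readonly IList<TSource> values = new List<TSource>();

        public Aggregator(IObserver<TOutput> observer, Func<IList<TSource>, TOutput> aggregation)
        {
            this.observer = observer;
            this.aggregation = aggregation;
        }

        // Buffer every value; nothing is emitted until completion.
        public void OnNext(TSource value) => values.Add(value);
        public void OnError(Exception error) => observer.OnError(error);

        public void OnCompleted()
        {
            observer.OnNext(aggregation(values));
            observer.OnCompleted();
        }
    }
}

public static class AggregateExtensions
{
    // Assumed shape of the extension method, mirroring Select.
    public static IObservable<TOutput> Aggregate<TSource, TOutput>(
        this IObservable<TSource> source, Func<IList<TSource>, TOutput> aggregation)
    {
        return new AggregateOperator<TSource, TOutput>(source, aggregation);
    }
}

// Hypothetical observer that records what it receives.
public class CollectingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => throw error;
    public void OnCompleted() { }
}

public static class Program
{
    public static void Main()
    {
        var collector = new CollectingObserver<string>();
        new ArraySource<string>("a", "b", "c")
            .Aggregate(values => string.Join(",", values))
            .Subscribe(collector);

        // Prints "a,b,c"
        Console.WriteLine(collector.Values[0]);
    }
}
```

Because every value is buffered until completion, this approach holds the whole sequence in memory, which is fine for a short range but worth bearing in mind for long-running sources.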
For example, we can use an aggregator function which just concatenates strings:
static string CombineStrings(IList<string> values)
{
    string listedStrings = "";
    foreach (var str in values)
    {
        listedStrings += str + ",";
    }
    return listedStrings;
}
And then we can use this aggregator on the formatted output from the select operator.
IObservable<string> combinedOutputs = sourceNumbers.Select(n => $"OUTPUT: {n}").Aggregate(CombineStrings);
combinedOutputs.Subscribe(n => Console.WriteLine(n));
Here we can see that operators can be chained together, as is possible with LINQ to Objects. When this program runs, you subscribe to the aggregate operator, which encapsulates your observer in an Aggregator which will only pass it a value once OnCompleted is called. This Aggregator is then encapsulated in a Selector, which passes the formatted values on to the Aggregator.
So, when the numbers from the range are passed to the Select operator, it offers up the formatted strings to the Aggregator, which then offers up the single aggregated string to the observer. The output will be as follows:
OUTPUT: 1,OUTPUT: 2,OUTPUT: 3,OUTPUT: 4,OUTPUT: 5,OUTPUT: 6,OUTPUT: 7,OUTPUT: 8,OUTPUT: 9,OUTPUT: 10,
And there we have it: a quick look under the hood of the Select and Aggregate operators. Watch out for the next OnNext(), where I'll tackle another of the many operators still to explore!