There's been a little bit of a gap since my last Rx blog, I've been pretty busy with keeping up with Advent of Code in any spare time (and I'm sure there will be a blog along those lines at some point in the near future).
But, for now, it's time for a deep dive into a few more of the operators in the world of Rx!
First up, the
OnNext(Understanding of how the Select operator works)
If you've read my last blog, you'll soon see that building your own
Select operator is fairly similar to building a
Where operator. You set up an extension method on
This time however, instead of a predicate you pass in a
TSource will be the type which the underlying source produces, and
TOutput will be the type of the values you want to select.
As before, when an observer subscribes, an internal observer (
Selector) is then constructed using the subscribing observer and the selection function. This
Selector is then subscribed to the observable.
When the underlying source passes a value to the
OnNext method of the
Selector, the value is then passed through the selection function.
The return value of this function is then passed to the original observer.
Selector is an observer of
TSource, as it will accept values of the type
TSource from the underlying source.
The subscribing observer takes values of type
TOutput, which is the type which is returned by the selection function.
In this way, the observer is passed the selected value rather than the one originally offered up by the observable.
You can then pass in a transform which will operate on each of the produced values and subscribe to the observable produced by the select operator:
The output of the above code will be as follows:
OUTPUT: 1 OUTPUT: 2 OUTPUT: 3 OUTPUT: 4 OUTPUT: 5 OUTPUT: 6 OUTPUT: 7 OUTPUT: 8 OUTPUT: 9 OUTPUT: 10
Each number in the range 1 to 10 is passed through the selection function and transformed into a string.
These strings are then offered up to our listening anonymous observer and printed out to the console.
In the last blog, I talked about using a multicast observable as to not produce values multiple times for multiple subscribers.
You can also do this here, but for simplicity I've just used the one subscriber.
Now onto something a little more interesting…
OnNext(The aggregate operator)
The aggregate operator is the underlying mechanism for all of the LINQ operators that take many inputs, and produce one (e.g.
Max etc. etc.).
In the world of Rx these operators still produce one value, but that value is in the form of an
IObservable which offers up that value. This is to keep everything in our new stream-based world in sync.
Let's try implementing our own
Sum, and then generalise that for any aggregation. Keep in mind, aggregate operators won't return until all of the elements have been produced, so they have to wait until the source has called
Sum, the overall setup is pretty similar to the
Select operators. A "filtering" observer is places in between the source and the subscribing observer:
Summer looks something like this:
OnNext call, the value is added to the running total. Here
dynamic is used as we need to be able to add values of an unknown type, if the type didn't implement the + operator an exception would be thrown at runtime.
OnComplete is called by the source, we then know that there are not going to be any more items passed along and the
Summer can pass the sum on to the listening observer.
It then calls
OnComplete on the subscribed observer, as it has produced the sum and won't be producing anything further.
The output for in this case would be 55.
The aggregate operator is very similar, but instead of a running total it stores a list of all the values which have been passed to it. When
OnComplete is called, this list is run through the provided aggregator function which produces a single value.
In this example, if we use an aggregator function which just concatenates strings:
And then we can use this aggregator on the formatted output from the select operator.
Here we can see that operators can be chained together, as is possible with LINQ to objects. When this program runs, you subscribe to the aggregate operator, which encapsulates your observer in an
Aggregator which will only pass it values once
OnComplete is called.
Aggregator is then encapsulated in a
Selector which will only pass the formatted values on to the
So, when the numbers from range are passed to the
Select operator, that offers up the formatted strings to the
Aggregator, which then offers up the single aggregated string to the observer. The output will be as follows:
OUTPUT: 1,OUTPUT: 2,OUTPUT: 3,OUTPUT: 4,OUTPUT: 5,OUTPUT: 6,OUTPUT: 7,OUTPUT: 8,OUTPUT: 9,OUTPUT: 10,
And there we have it, a quick look under the hoods of the
Aggregate operators, watch out for the next
OnNext() where I'll tackle another of the many operators still to explore!