After a brief foray into Azure AD, we're back onto Rx!
OnNext(The GroupBy operator)
This week we're looking at the GroupBy operator. This one's a bit more involved, so saddle up!
So we start out with our usual extension method:
However, some of the types here need a bit of unpicking.
So, if you have an IObservable<TSource>, when you call the GroupBy operator on it, you will be returned an IObservable<IGroupedObservable<TKey, TSource>>. The IGroupedObservable interface looks like this:
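As a rough sketch of that shape (declared locally here so that it compiles on its own; System.Reactive ships an interface of the same shape, and the post's own declaration may differ in detail):

```csharp
using System;

// Sketch of the grouped-observable shape: an ordinary observable of elements
// that additionally exposes the key shared by every element in the group.
public interface IGroupedObservable<out TKey, out TElement> : IObservable<TElement>
{
    // The key value that every element in this group maps to.
    TKey Key { get; }
}
```

In other words, anything you can do with an IObservable<TElement> you can do with a group; the only extra is the Key property.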
TKey is the type of the value that you want to group the source objects on. Say you had an IObservable of people and you wanted to group them by age. TSource here would be Person and TKey would be the type of the age (an int, say). The IGroupedObservable would just be an observable of people, but with the added ability to retrieve the key value for that group.
So, when you used the GroupBy operator, you would receive an IObservable in which each item you are passed is an IObservable of people with a specific age.
For simplicity (and to avoid building a whole other class), let's just group our SourceNumbers (which is just the numbers from 1 to 25) by the number % 3. In order to get the groups we do:
Here, we use the GroupBy operator on the source numbers. We subscribe to the output with an anonymous observer.
Each time the observer is passed a new group it will write out to the console. However, remember, each group is also an observable. So in order to get each new number which is passed out of that group, we have to also subscribe to the group itself:
So, each time a new group is raised, it will be passed to the first observer's OnNext method. This will write out the new group to the console and subscribe a second observer to the group.
Every time a new value is raised by the group, this value will be passed to the second observer's OnNext method, which will write out that value to the console.
The output of the program will be:
Note: This only works because we subscribe to each new group as soon as it is created. Each group is a hot observable, so if we subscribed some time later, the values raised before that point would be lost.
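For anyone who wants to try the nested subscription without the hand-rolled operator, here is a minimal sketch using the GroupBy that ships with System.Reactive. It assumes the System.Reactive package is referenced; GroupByDemo, Run and the log format are names I've made up for illustration, and lambdas stand in for the anonymous observers.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class GroupByDemo
{
    // Groups 1..25 by n % 3 and records what each observer sees, in order.
    public static List<string> Run()
    {
        var log = new List<string>();
        IObservable<int> sourceNumbers = Observable.Range(1, 25);

        sourceNumbers
            .GroupBy(n => n % 3)
            .Subscribe(group =>
            {
                // First observer: a new group has appeared.
                log.Add($"New group: {group.Key}");

                // Second observer: subscribe straight away so no values are missed.
                group.Subscribe(n => log.Add($"Group {group.Key}: {n}"));
            });

        return log;
    }
}
```

Calling GroupByDemo.Run() and writing each entry to the console gives the interleaving of "new group" and "new value" lines described above. Moving the inner Subscribe somewhere later is a quick way to see the hot behaviour for yourself.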
How it works
Right, so that's the easy bit 😉
Hopefully we're all fairly familiar with the pattern by now – we have a GroupByOperator which has an internal wrapping Observer which is subscribed to the underlying source and passes values onto whatever observers subscribe.
KeySelector is the function which is used to group the source values.
Grouper then looks like this:
This has an internal dictionary of grouped observables. When a new value is passed to the Grouper, it passes it through the key selector function to find the key.
The dictionary is then checked to see if the key already exists. If it doesn't, then a new group is created and passed to the subscribed observer. The GroupedObservable class is as follows:
This has an internal subject, and when new observers subscribe to the group they are subscribed to the subject. This handles all of the subscriptions for us (see my previous post for details).
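To make that concrete, here is a minimal sketch of a group built on top of a subject. This is not the code from the post: MiniSubject is a hypothetical stand-in for the subject from the previous post, and RecordingObserver exists only so the sketch can be exercised.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal subject: relays each notification to whoever is
// subscribed at that moment (hot), with no replay for late subscribers.
public sealed class MiniSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Subscription(() => observers.Remove(observer));
    }

    // Iterate over a snapshot so observers may unsubscribe during a callback.
    public void OnNext(T value) { foreach (var o in observers.ToArray()) o.OnNext(value); }
    public void OnError(Exception error) { foreach (var o in observers.ToArray()) o.OnError(error); }
    public void OnCompleted() { foreach (var o in observers.ToArray()) o.OnCompleted(); }

    private sealed class Subscription : IDisposable
    {
        private readonly Action onDispose;
        public Subscription(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() => onDispose();
    }
}

// A group is an observable with a key; all subscription handling is
// delegated to the internal subject.
public sealed class GroupedObservable<TKey, TElement> : IObservable<TElement>
{
    private readonly MiniSubject<TElement> subject = new MiniSubject<TElement>();

    public GroupedObservable(TKey key) { Key = key; }

    public TKey Key { get; }

    public IDisposable Subscribe(IObserver<TElement> observer) => subject.Subscribe(observer);

    // Called by the operator to push values and completion into the group.
    public void OnNext(TElement value) => subject.OnNext(value);
    public void OnCompleted() => subject.OnCompleted();
}

// Tiny observer used only to exercise the sketch.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(T value) => Log.Add($"OnNext({value})");
    public void OnError(Exception error) => Log.Add("OnError");
    public void OnCompleted() => Log.Add("OnCompleted");
}
```

Because the subject only relays to current subscribers, a value pushed into the group before anyone subscribes is simply dropped, which is the hot behaviour mentioned in the note earlier.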
So, once the group has been created or retrieved from the dictionary, the source value is then passed to the OnNext method of that group's subject, which then passes the value to any observers which are subscribed to that group.
In summary, when a source value is raised it is passed through the key selector.
If a group doesn't already exist for that key then a new one is created, and this new group is passed to the observer which has subscribed to the GroupBy operator.
The new source value for that group is then passed to the OnNext method of the internal Subject of the group, which passes the value on to any observers which are subscribed to the group.
Finally, we have the OnCompleted and OnError methods for the Grouper. When OnCompleted is called on the Grouper by the original source, because it has finished producing values, no more values will be added to any of the groups, so each group must also call OnCompleted on any listening observers.
The same is true for OnError. Here the error is not propagated to each individual group, so that it is only raised once; the individual groups are just completed.
Once all of the groups have completed, OnError is called on the observer subscribed to the overall GroupBy operator.
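Pulling the last few sections together, the whole flow can be sketched as below. Again, this is an illustration under assumptions rather than the code from the post: Group is a simplified stand-in for the GroupedObservable (the internal subject is collapsed into a plain list of observers to keep things short), and ActionObserver is a hypothetical delegate-backed observer used in place of anonymous observers.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for the grouped observable: a key plus its observers.
public sealed class Group<TKey, T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public Group(TKey key) { Key = key; }

    public TKey Key { get; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Subscription(() => observers.Remove(observer));
    }

    public void OnNext(T value) { foreach (var o in observers.ToArray()) o.OnNext(value); }
    public void OnCompleted() { foreach (var o in observers.ToArray()) o.OnCompleted(); }

    private sealed class Subscription : IDisposable
    {
        private readonly Action onDispose;
        public Subscription(Action onDispose) { this.onDispose = onDispose; }
        public void Dispose() => onDispose();
    }
}

// Hypothetical delegate-backed observer, standing in for anonymous observers.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public ActionObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) => onError(error);
    public void OnCompleted() => onCompleted();
}

// Sits between the source and the observer of groups. Null keys are not handled.
public sealed class Grouper<TKey, TSource> : IObserver<TSource>
{
    private readonly Dictionary<TKey, Group<TKey, TSource>> groups =
        new Dictionary<TKey, Group<TKey, TSource>>();
    private readonly Func<TSource, TKey> keySelector;
    private readonly IObserver<Group<TKey, TSource>> observer;

    public Grouper(Func<TSource, TKey> keySelector, IObserver<Group<TKey, TSource>> observer)
    {
        this.keySelector = keySelector;
        this.observer = observer;
    }

    public void OnNext(TSource value)
    {
        var key = keySelector(value);
        if (!groups.TryGetValue(key, out var group))
        {
            // First value for this key: create the group and announce it.
            group = new Group<TKey, TSource>(key);
            groups.Add(key, group);
            observer.OnNext(group);
        }
        group.OnNext(value);
    }

    public void OnCompleted()
    {
        // No more source values, so no group can receive further values.
        foreach (var group in groups.Values) group.OnCompleted();
        observer.OnCompleted();
    }

    public void OnError(Exception error)
    {
        // Groups are only completed; the error itself is raised once, on the outer observer.
        foreach (var group in groups.Values) group.OnCompleted();
        observer.OnError(error);
    }
}
```

To try it, construct a Grouper with a key selector such as n => n % 3 and an ActionObserver that subscribes to each group as it arrives, then call OnNext, OnCompleted or OnError on the Grouper by hand as if you were the source.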
Now I realise this has been slightly involved, so if you want to take a look at the overall code, then you're in luck. I've finally sorted out the source code for these blog posts and uploaded it here.
Otherwise, until the next