Rx operators deep dive Part 3: Re-grouping our thoughts
After a brief foray into Azure AD, we're back onto Rx!
(If you missed parts 1 and 2, it might be worth giving them a quick read, as I'm going to gloss over some of the stuff common to both.)
OnNext(The GroupBy operator)
This week we're looking at the GroupBy operator. This one's a bit more involved, so saddle up!
So we start out with our usual extension method:
static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(
    this IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
    return new GroupByOperator<TSource, TKey>(source, keySelector);
}
However, some of the types here need a bit of unpicking.
So, if you have an IObservable<TSource>, when you call the GroupBy operator on it, you will be returned an IObservable<IGroupedObservable<TKey, TSource>>. The IGroupedObservable interface looks like this:
public interface IGroupedObservable<out TKey, out TSource> : IObservable<TSource>
{
    // Gets the common key.
    TKey Key { get; }
}
TKey is the type of value that you want to group the source objects on. So say you had an IObservable of people, and you wanted to group them on age. The TSource here would be Person and the TKey would be int.
The IGroupedObservable would just be an observable of people, but with the added property of being able to retrieve the key value for that group.
So, when you use the GroupBy operator, you receive an IObservable in which each item you are passed is itself an IObservable of people with a specific age.
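To make the people-by-age case concrete, here is a minimal sketch. It assumes the System.Reactive NuGet package rather than the hand-rolled operator built in this post, and the Person record and the names and ages in it are made up purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package

// Hypothetical type, introduced only for this example.
public sealed record Person(string Name, int Age);

public static class GroupByAgeExample
{
    public static List<string> Run()
    {
        var lines = new List<string>();

        // TSource is Person, TKey is int (the age).
        IObservable<Person> people = new[]
        {
            new Person("Ada", 36),
            new Person("Grace", 45),
            new Person("Alan", 36),
        }.ToObservable();

        IObservable<IGroupedObservable<int, Person>> byAge =
            people.GroupBy(person => person.Age);

        byAge.Subscribe(group =>
        {
            // Each item is a group: an observable of people plus a Key.
            lines.Add($"New age group: {group.Key}");
            group.Subscribe(person => lines.Add($"{person.Name} is {group.Key}"));
        });

        return lines;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Run collects the lines into a list rather than writing straight to the console, which just makes the result easier to inspect.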
For simplicity (and to avoid building a whole other class), let's just group our sourceNumbers (which is just the numbers from 1 to 25) by the number % 3. In order to get the groups we do:
IObservable<int> sourceNumbers = System.Reactive.Linq.Observable.Range(1, 25);
sourceNumbers.GroupBy(number => number % 3).Subscribe(group =>
{
    Console.WriteLine($"New group:{group.Key}");
});
Here, we use the GroupBy operator on the source numbers. We subscribe to the output with an anonymous observer.
Each time the observer is passed a new group it will write out to the console. However, remember, each group is also an observable. So in order to get each new number which is passed out of that group, we have to also subscribe to the group itself:
sourceNumbers.GroupBy(number => number % 3).Subscribe(group =>
{
    Console.WriteLine($"New group:{group.Key}");
    group.Subscribe(value => Console.WriteLine($"New value in group {group.Key}: {value}"));
});
So, each time a new group is raised, this will be passed to the first observer's OnNext method.
This will write out the new group to the console and subscribe a second observer to the group.
Every time a new value is raised by the group, this value will be passed to the second observer's OnNext method, which will write out that value to the console.
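The output of the program starts like this, and carries on in the same pattern up to 25:
New group:1
New value in group 1: 1
New group:2
New value in group 2: 2
New group:0
New value in group 0: 3
New value in group 1: 4
New value in group 2: 5
New value in group 0: 6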
Note: This only works because we subscribe to each new group as soon as it is created. Each group is a hot observable, so if we subscribed some time later, any values raised before that point would be lost.
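The following sketch illustrates that note. It assumes the System.Reactive package, and it uses a Subject<int> as the source (instead of Range) purely so that values can be pushed by hand before and after the late subscription.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;     // GroupBy, IGroupedObservable
using System.Reactive.Subjects; // Subject<T>

public static class LateSubscriptionExample
{
    public static List<int> Run()
    {
        var received = new List<int>();
        var groups = new List<IGroupedObservable<int, int>>();

        // A Subject lets us decide exactly when each source value is raised.
        var source = new Subject<int>();

        // Remember each group, but do not subscribe to it yet.
        source.GroupBy(number => number % 3).Subscribe(group => groups.Add(group));

        // Creates the group with key 1 and raises 1 while nobody is listening to it.
        source.OnNext(1);

        // Subscribe late: the value 1 has already gone past.
        groups[0].Subscribe(value => received.Add(value));

        // 4 % 3 == 1, so this value reaches the late subscriber.
        source.OnNext(4);

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Here the late subscriber should only ever see the 4; the 1 that caused the group to be created is not replayed.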
How it works
Right, so that's the easy bit 😉
Hopefully we're all fairly familiar with the pattern by now: we have a GroupByOperator which has an internal wrapping observer, which is subscribed to the underlying source and passes values on to whatever observers subscribe.
internal class GroupByOperator<TSource, TKey> : IObservable<IGroupedObservable<TKey, TSource>>
{
    private readonly IObservable<TSource> source;
    private readonly Func<TSource, TKey> keySelector;

    public GroupByOperator(IObservable<TSource> source, Func<TSource, TKey> keySelector)
    {
        this.source = source;
        this.keySelector = keySelector;
    }

    public IDisposable Subscribe(IObserver<IGroupedObservable<TKey, TSource>> observer)
    {
        // Wrap the subscriber in a Grouper and subscribe that to the source.
        var grouper = new Grouper(observer, this.keySelector);
        return this.source.Subscribe(grouper);
    }

    // The nested Grouper and GroupedObservable classes are shown below.
}
Here, the keySelector is the function which is used to group the source values.
The internal Grouper then looks like this:
private class Grouper : IObserver<TSource>
{
    private readonly IObserver<IGroupedObservable<TKey, TSource>> observer;
    private readonly Func<TSource, TKey> keySelector;
    private readonly Dictionary<TKey, GroupedObservable> groupedObservables;

    internal Grouper(IObserver<IGroupedObservable<TKey, TSource>> observer, Func<TSource, TKey> keySelector)
    {
        this.observer = observer;
        this.keySelector = keySelector;
        this.groupedObservables = new Dictionary<TKey, GroupedObservable>();
    }

    public void OnNext(TSource value)
    {
        var key = this.keySelector(value);
        if (!this.groupedObservables.TryGetValue(key, out var groupedObservable))
        {
            // First value for this key: create the group and announce it.
            groupedObservable = new GroupedObservable(key);
            this.groupedObservables.Add(key, groupedObservable);
            this.observer.OnNext(groupedObservable);
        }

        // Route the value to the subject of the group it belongs to.
        groupedObservable.Subject.OnNext(value);
    }

    // OnCompleted and OnError are shown further down.
}
This has an internal dictionary of grouped observables. When a new value is passed to the Grouper, it passes it through the key selector function to find the key.
The dictionary is then checked to see if the key already exists. If it doesn't then a new group is created and passed to the subscribed observer. The GroupedObservable class is as follows:
private class GroupedObservable : IGroupedObservable<TKey, TSource>
{
    internal GroupedObservable(TKey key)
    {
        this.Key = key;
        this.Subject = new Subject<TSource>();
    }

    public TKey Key { get; }

    public ISubject<TSource> Subject { get; }

    public IDisposable Subscribe(IObserver<TSource> observer)
    {
        return this.Subject.Subscribe(observer);
    }
}
This has an internal subject, and when new observers subscribe to the group they are subscribed to the subject. This handles all of the subscriptions for us (see my previous post for details).
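As a quick reminder of what the subject is doing for each group, here is a small sketch, assuming Subject<T> from the System.Reactive package. The observer labels and the value 42 are arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects; // assumes the System.Reactive NuGet package

public static class SubjectFanOutExample
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        // Two independent observers subscribe to the same subject.
        subject.Subscribe(value => log.Add($"first observer: {value}"));
        subject.Subscribe(value => log.Add($"second observer: {value}"));

        // A single OnNext call on the subject is passed on to both of them.
        subject.OnNext(42);

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

This is the behaviour the GroupedObservable leans on: it never tracks its own list of observers, it just hands them to the subject.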
So, once the group has been created or retrieved from the dictionary, the source value is then passed to the OnNext method of that group's subject, which then passes the value to any observers which are subscribed to that group.
In summary, when a source value is raised it is passed through the key selector.
If a group doesn't already exist for that key then a new one is created and this new group is passed to the observer which has subscribed to the GroupBy operator.
The new source value for that group is then passed to the OnNext method of the internal Subject of the group, which passes the value on to any observers which are subscribed to the group.
Finally, we have the OnCompleted and OnError methods for the Grouper:
private class Grouper : IObserver<TSource>
{
    // Fields, constructor and OnNext as shown above.

    public void OnCompleted()
    {
        this.CompleteGroupedObservables();
        this.observer.OnCompleted();
    }

    public void OnError(Exception error)
    {
        // The groups are completed rather than errored, so the error is only raised once.
        this.CompleteGroupedObservables();
        this.observer.OnError(error);
    }

    private void CompleteGroupedObservables()
    {
        foreach (var groupedObservable in this.groupedObservables.Values)
        {
            groupedObservable.Subject.OnCompleted();
        }
    }
}
When OnCompleted is called on the Grouper by the original source because it has finished producing values, no more values will be added to any of the groups, so each group must also call OnCompleted on any listening observers.
The same is true for OnError. Here the error is not propagated to each individual group, so the error is only raised once; the individual groups are just completed.
Once all of the groups have completed, OnCompleted/OnError is called on the observer subscribed to the overall GroupBy observable.
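To watch the completion order from the outside, a sketch like the one below can help. It assumes the System.Reactive package's GroupBy (not the Grouper above) and only covers the OnCompleted path; the Range(1, 4) input is arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package

public static class CompletionOrderExample
{
    public static List<string> Run()
    {
        var log = new List<string>();

        Observable.Range(1, 4)
            .GroupBy(number => number % 3)
            .Subscribe(
                // For each group, log only when that group completes.
                group => group.Subscribe(
                    value => { /* the values themselves are not the point here */ },
                    () => log.Add($"group {group.Key} completed")),
                // Log when the overall GroupBy observable completes.
                () => log.Add("GroupBy completed"));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

The three group completions should be logged before the final "GroupBy completed" entry, matching the order in which the Grouper calls CompleteGroupedObservables and then the outer observer.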
Now I realise this has been slightly involved, so if you want to take a look at the overall code, then you're in luck. I've finally sorted out the source code for these blog posts and uploaded it here.
Otherwise, until the next OnNext call!