Rx operators deep dive Part 4: A window into scheduling in Rx
So, this week we are looking at the Buffer and Window Rx operators. (If you have no idea what I'm on about, I suggest you start at the beginning!)
There are a few different implementations of these operators, and we are going to focus on the time-based versions. In order to do this, we need to talk about Schedulers.
Schedulers
In Rx there are several different schedulers which can be used to invoke actions (or queue up work). Depending on the scheduler, that work will be run using the context defined by that scheduler.
When you invoke work via a scheduler, you can either tell it to "run this immediately" or to run it at a specific time.
This is done via the Schedule method, of which there are a few overloads. The Schedule(Action) method schedules work to be done immediately (or at the earliest time possible). The Schedule(DateTimeOffset, Action) method schedules work to be done at a specified time, and the Schedule(TimeSpan, Action) method schedules work to be done after a certain amount of time.
When work is run immediately, or when it reaches its due time, the scheduler will run that work using the context defined by the scheduler type.
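To make the three overloads concrete, here is a minimal sketch of a scheduler that runs against a virtual clock instead of real time. VirtualClockScheduler and its AdvanceTo method are hypothetical names made up for this illustration (they are not part of Rx, although Rx does ship its own virtual-time schedulers for testing). It shows that "immediately" just means "due now", that a TimeSpan due time is relative to the current time, that a DateTimeOffset due time is absolute, and that each call hands back an IDisposable which cancels the pending work.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical scheduler that runs work against a virtual clock (illustration only, not an Rx type).
public sealed class VirtualClockScheduler
{
    private sealed class WorkItem
    {
        public DateTimeOffset DueTime;
        public long Sequence;   // keeps first-in, first-out order for equal due times
        public Action Action;
        public bool Cancelled;
    }

    // Disposing the handle returned by Schedule cancels the pending work.
    private sealed class Cancellation : IDisposable
    {
        private readonly WorkItem item;
        public Cancellation(WorkItem item) { this.item = item; }
        public void Dispose() { item.Cancelled = true; }
    }

    private readonly List<WorkItem> queue = new List<WorkItem>();
    private long nextSequence;

    public VirtualClockScheduler(DateTimeOffset start) { Now = start; }

    public DateTimeOffset Now { get; private set; }

    // "Run this immediately": due at the current virtual time.
    public IDisposable Schedule(Action action) => Schedule(Now, action);

    // Relative due time: run once the given amount of time has passed.
    public IDisposable Schedule(TimeSpan dueTime, Action action) => Schedule(Now + dueTime, action);

    // Absolute due time: run at the given moment.
    public IDisposable Schedule(DateTimeOffset dueTime, Action action)
    {
        var item = new WorkItem { DueTime = dueTime, Sequence = nextSequence++, Action = action };
        queue.Add(item);
        return new Cancellation(item);
    }

    // Moves the clock forward, running every non-cancelled item that falls due, earliest first.
    public void AdvanceTo(DateTimeOffset time)
    {
        if (time < Now) throw new ArgumentOutOfRangeException(nameof(time));
        while (true)
        {
            var next = queue
                .Where(i => i.DueTime <= time)
                .OrderBy(i => i.DueTime)
                .ThenBy(i => i.Sequence)
                .FirstOrDefault();
            if (next == null) break;
            queue.Remove(next);
            if (next.Cancelled) continue;
            if (next.DueTime > Now) Now = next.DueTime; // the clock jumps to the item's due time
            next.Action();
        }
        Now = time;
    }
}
```

Because work is ordered purely by due time, an item scheduled "now" runs before one due in a second, and a disposed item is silently skipped when its turn comes.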
There are several concrete implementations of the IScheduler interface in Rx:
Scheduler.Immediate is the immediate scheduler. When you "schedule" work through this scheduler, it is in fact not scheduled, just executed immediately.
Scheduler.CurrentThread will queue up any work to be executed on the thread that makes the original call. If the scheduler is already running an action on that thread, new work will not happen immediately, even if you don't supply a due time; instead it is queued up to execute after the current action has completed. This is because the work must be executed on the current thread, which is still in use by the action that is running.
Scheduler.NewThread will create a new thread to use for the work. For example, if you have a long-running operation, such as a timer that will continuously produce ticks, it would make sense for that timer to run on a different thread from your main one, which may be otherwise occupied (with user input, for example). This scheduler is ideal for long-running work, especially work that needs a dedicated thread, i.e. work that will completely occupy whichever thread it is using.
Scheduler.TaskPool schedules actions using the default task factory from the Task Parallel Library (TPL). Creating threads is an expensive process, so re-use is often the best strategy. This is especially true for short-running operations, where the cost of creating a new thread can easily outweigh the work itself. The task pool scheduler is also optimised for multicore systems.
Finally, Scheduler.ThreadPool also runs work on the .NET thread pool. This works in a very similar way to the task pool scheduler, but it is less optimised. It should only be used in situations where Scheduler.TaskPool is unavailable, which is the case in Silverlight 4 and .NET 3.5.
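The difference between Scheduler.Immediate and Scheduler.CurrentThread only really shows up when an action schedules more work. The sketch below uses two hypothetical, heavily simplified stand-ins (ImmediateRunner and TrampolineRunner, not the Rx classes) to record the order in which nested work runs: the immediate version runs the inner action in the middle of the outer one, while the trampoline version, like the current thread scheduler described above, queues it until the outer action has returned.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for Scheduler.Immediate: runs the action right away, nested inside the caller.
public sealed class ImmediateRunner
{
    public void Schedule(Action action) => action();
}

// Simplified stand-in for Scheduler.CurrentThread: a trampoline on the calling thread.
public sealed class TrampolineRunner
{
    // One queue per thread; null when no trampoline is running on this thread.
    [ThreadStatic] private static Queue<Action> queue;

    public void Schedule(Action action)
    {
        if (queue != null)
        {
            // Already running work on this thread: defer until the current action returns.
            queue.Enqueue(action);
            return;
        }

        queue = new Queue<Action>();
        try
        {
            queue.Enqueue(action);
            while (queue.Count > 0)
            {
                queue.Dequeue()(); // run each queued action in turn on this thread
            }
        }
        finally
        {
            queue = null;
        }
    }
}
```

Scheduling an outer action that itself schedules an inner one logs "outer start | inner | outer end" with the immediate runner, but "outer start | outer end | inner" with the trampoline.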
When you are subscribing to an observable, you can use the SubscribeOn and ObserveOn extension methods on IObservable<T>. These methods have an overload which takes an IScheduler. When the SubscribeOn method is used, then whenever anything subscribes, the subscription (and its later disposal) will be run on the supplied scheduler. Likewise, when ObserveOn is used, the OnNext invocations (as well as OnError and OnCompleted) will be run using the supplied scheduler.
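ObserveOn can be pictured as an observer wrapper that, instead of calling the real observer directly, hands each notification to a scheduler. This is a minimal sketch assuming the scheduler is reduced to a plain Action<Action> delegate; ObserveOnObserver, ManualContext (a queue that only runs work when RunPending is called, a bit like a UI message loop) and RecordingObserver are hypothetical names. Rx's real ObserveOn also has to serialize notifications and deal with disposal, which this leaves out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-threaded context: queued work only runs when RunPending is called.
public sealed class ManualContext
{
    private readonly Queue<Action> pending = new Queue<Action>();

    public void Post(Action action) => pending.Enqueue(action);

    public void RunPending()
    {
        while (pending.Count > 0) pending.Dequeue()();
    }
}

// Sketch of the idea behind ObserveOn: every notification is re-routed through the scheduling delegate.
public sealed class ObserveOnObserver<T> : IObserver<T>
{
    private readonly IObserver<T> inner;
    private readonly Action<Action> schedule;

    public ObserveOnObserver(IObserver<T> inner, Action<Action> schedule)
    {
        this.inner = inner;
        this.schedule = schedule;
    }

    public void OnNext(T value) => schedule(() => inner.OnNext(value));
    public void OnError(Exception error) => schedule(() => inner.OnError(error));
    public void OnCompleted() => schedule(() => inner.OnCompleted());
}

// Hypothetical observer that records the notifications it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(T value) => Log.Add($"OnNext({value})");
    public void OnError(Exception error) => Log.Add("OnError");
    public void OnCompleted() => Log.Add("OnCompleted");
}
```

Wrapping a RecordingObserver with `new ObserveOnObserver<int>(recorder, context.Post)` means nothing reaches the recorder until `context.RunPending()` is called, at which point the notifications are delivered in their original order.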
To get the platform's default scheduler, you can use the Scheduler.Default property. In Rx this schedules work on the thread pool by default (due to backwards compatibility with Rx v1).
So, another operator we need to touch on at this point is the Timer operator. We will use this to demonstrate the timed grouping using the Buffer/Window operators.
The Timer operator
The Timer operator has a few overloads. There is one which just produces an event after a certain amount of time. There is also an overload which starts after a certain amount of time, and then periodically produces an event at a regular interval. It does this via scheduling a new "tick" to happen after a specified TimeSpan.
So if you did:
IObservable<long> timer = System.Reactive.Linq.Observable.Timer(TimeSpan.FromSeconds(0.5), TimeSpan.FromSeconds(1));
(I've included the namespace here because I do not want to bring in the built-in LINQ operators, which would make this whole three-month project somewhat redundant!) and subscribed to the output, then after 0.5 seconds you would be passed the value 0, and then every second after that you would be passed 1, 2, 3, etc.
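The periodic overload can be sketched without Rx: wait for the initial due time, emit a tick, then wait for the period before emitting the next one. TimerObservable and CollectingObserver below are hypothetical names, and this version is built on Task.Delay rather than a scheduler; unlike Rx's Observable.Timer it makes no attempt to correct for drift, so small delays accumulate over time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Observable.Timer(dueTime, period), built on Task.Delay.
public sealed class TimerObservable : IObservable<long>
{
    private readonly TimeSpan dueTime;
    private readonly TimeSpan period;

    public TimerObservable(TimeSpan dueTime, TimeSpan period)
    {
        this.dueTime = dueTime;
        this.period = period;
    }

    public IDisposable Subscribe(IObserver<long> observer)
    {
        var cts = new CancellationTokenSource();
        _ = RunAsync(observer, cts.Token); // fire and forget; the token stops the loop
        return new CancelOnDispose(cts);
    }

    private async Task RunAsync(IObserver<long> observer, CancellationToken token)
    {
        try
        {
            await Task.Delay(dueTime, token);    // first tick after the initial due time
            for (long tick = 0; ; tick++)
            {
                observer.OnNext(tick);            // 0, 1, 2, ...
                await Task.Delay(period, token);  // wait one period before the next tick
            }
        }
        catch (OperationCanceledException)
        {
            // The subscription was disposed: stop ticking quietly.
        }
    }

    private sealed class CancelOnDispose : IDisposable
    {
        private readonly CancellationTokenSource cts;
        public CancelOnDispose(CancellationTokenSource cts) { this.cts = cts; }
        public void Dispose() => cts.Cancel();
    }
}

// Hypothetical observer that collects values; the lock guards reads from another thread.
public sealed class CollectingObserver<T> : IObserver<T>
{
    private readonly List<T> values = new List<T>();
    public T[] Snapshot() { lock (values) return values.ToArray(); }
    public void OnNext(T value) { lock (values) values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Subscribing with a 0.5 second due time and a 1 second period would mirror the timer above. Disposing the returned subscription cancels the pending delay and stops further ticks (a tick that is already being delivered may still complete).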
Okay, so now we have all the information we need…
OnNext(The Buffer operator)
You use the buffer operator as follows:
IObservable<IList<long>> bufferedTimes = timer.Buffer(TimeSpan.FromSeconds(4));
This will split the items produced by the original source into four-second windows.
So, when you pass a source through a buffer operator, it will return an IObservable<IList<TSource>>, where TSource is the type of the underlying source (here TSource is long because the timer returns each "tick" as a long). Each of these lists will contain all of the values produced by the original source within each time window.
This means that you are passed a new IList (which will be fully populated) at the end of each timed period.
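To see which ticks end up in which list, it helps to work through the arithmetic for the timer above. Assuming ideal timing with no scheduling jitter, tick n arrives at 0.5 + n seconds, and the k-th 4-second buffer covers the interval [4k, 4k + 4), so it collects every tick whose arrival time falls in that range. The hypothetical BufferTiming.AssignToWindows helper below just models that grouping; it is not an operator and ignores real clocks entirely.

```csharp
using System;
using System.Collections.Generic;

public static class BufferTiming
{
    // Groups ticks by the fixed window their (ideal) arrival time falls into.
    public static List<List<long>> AssignToWindows(
        double firstTickSeconds, double periodSeconds, double windowSeconds, int tickCount)
    {
        var windows = new List<List<long>>();
        for (long tick = 0; tick < tickCount; tick++)
        {
            double arrival = firstTickSeconds + tick * periodSeconds;
            int windowIndex = (int)Math.Floor(arrival / windowSeconds);
            while (windows.Count <= windowIndex)
            {
                windows.Add(new List<long>()); // a window with no ticks stays empty
            }
            windows[windowIndex].Add(tick);
        }
        return windows;
    }
}
```

With a first tick at 0.5 seconds, a 1-second period and 4-second windows, the first list holds ticks 0 to 3 and the second holds ticks 4 to 7. In practice, a tick that lands very close to a window boundary could fall on either side of it.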
So, following our usual pattern…
public static IObservable<IList<TSource>> Buffer<TSource>(this IObservable<TSource> source, TimeSpan timeSpan)
{
return new BufferOperator<TSource>(source, timeSpan);
}
Taking in an IObservable<TSource>, we return an IObservable<IList<TSource>>.
The BufferOperator then looks like this:
internal class BufferOperator<TSource> : IObservable<IList<TSource>>
{
private IObservable<TSource> source;
private TimeSpan timeSpan;
public BufferOperator(IObservable<TSource> source, TimeSpan timeSpan)
{
this.source = source;
this.timeSpan = timeSpan;
}
public IDisposable Subscribe(IObserver<IList<TSource>> observer)
{
var bufferer = new Bufferer(observer, this.timeSpan);
return this.source.Subscribe(bufferer);
}
...
With the Bufferer as follows:
private class Bufferer : IObserver<TSource>
{
private readonly IObserver<IList<TSource>> observer;
private TimeSpan timeSpan;
private IList<TSource> currentList;
private IDisposable endOfWindowSchedulerWorkItem;
public Bufferer(IObserver<IList<TSource>> observer, TimeSpan timeSpan)
{
this.observer = observer;
this.timeSpan = timeSpan;
StartNewWindow();
}
...
When a new Bufferer is created, the first window starts straight away. The Bufferer keeps an internal list, which is added to whenever a new value is produced by the underlying source:
public void OnNext(TSource value)
{
this.currentList.Add(value);
}
The StartNewWindow method looks like this:
private void StartNewWindow()
{
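// Scheduler.Default and the Schedule(TimeSpan, Action) extension method live in System.Reactive.Concurrency
var scheduler = Scheduler.Default;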
this.currentList = new List<TSource>();
this.endOfWindowSchedulerWorkItem = scheduler.Schedule(this.timeSpan, OnWindowCompleteDueTime);
}
This creates a new, empty list for the current window, which any new items will be added to (any previous list has already been handed to the observer by this point). It also gets the default scheduler and schedules the OnWindowCompleteDueTime method to run once your specified time period has elapsed.
This will mean that after an amount of time equal to the specified time span, the following method will be called:
private void OnWindowCompleteDueTime()
{
this.observer.OnNext(currentList);
this.StartNewWindow();
}
When a window finishes, the completed list for that window is passed to the listening observer, and a new window is started.
Notice that the endOfWindowSchedulerWorkItem is saved. This is so that the following clean up can be done when the underlying source is completed (or errored):
public void OnCompleted()
{
CloseLastWindow();
this.observer.OnCompleted();
}
public void OnError(Exception error)
{
CloseLastWindow();
this.observer.OnError(error);
}
private void CloseLastWindow()
{
this.endOfWindowSchedulerWorkItem.Dispose();
this.observer.OnNext(currentList);
}
This disposes the work item left in the scheduler's queue, so that it does not attempt to do more work once the source has already finished, and then passes the list for the last window on to the observer, before notifying it that it has finished.
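One thing the Bufferer glosses over is that OnNext runs on whatever thread the source uses, while OnWindowCompleteDueTime runs on a scheduler thread, so both can touch currentList at the same time. Below is a minimal sketch of the same logic guarded by a lock. LockedBufferer, its CloseWindow method (standing in for what the scheduled work item would call) and ListRecorder are hypothetical names, and the window boundary is triggered by calling CloseWindow directly so the logic can be exercised without a real clock. Calling the observer while holding the lock also means notifications are never delivered concurrently, at the cost of blocking OnNext while a list is being handed over.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical lock-protected version of the Bufferer's core logic.
public sealed class LockedBufferer<T> : IObserver<T>
{
    private readonly object gate = new object();
    private readonly IObserver<IList<T>> observer;
    private List<T> currentList = new List<T>();
    private bool stopped;

    public LockedBufferer(IObserver<IList<T>> observer) { this.observer = observer; }

    // Runs on the source's thread.
    public void OnNext(T value)
    {
        lock (gate)
        {
            if (!stopped) currentList.Add(value);
        }
    }

    // What the scheduled end-of-window work item would call, on a scheduler thread.
    public void CloseWindow()
    {
        lock (gate)
        {
            if (stopped) return;              // a late work item after completion does nothing
            observer.OnNext(currentList);
            currentList = new List<T>();
        }
    }

    public void OnCompleted()
    {
        lock (gate)
        {
            if (stopped) return;
            stopped = true;
            observer.OnNext(currentList);     // flush the last, partial window
            observer.OnCompleted();
        }
    }

    public void OnError(Exception error)
    {
        lock (gate)
        {
            if (stopped) return;
            stopped = true;
            observer.OnNext(currentList);
            observer.OnError(error);
        }
    }
}

// Hypothetical observer that records each list as text, e.g. "[1,2]".
public sealed class ListRecorder<T> : IObserver<IList<T>>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(IList<T> list) => Log.Add("[" + string.Join(",", list) + "]");
    public void OnError(Exception error) => Log.Add("error");
    public void OnCompleted() => Log.Add("done");
}
```

Pushing 1 and 2, closing the window, pushing 3 and then completing produces "[1,2]", "[3]" and "done"; a CloseWindow call that arrives after completion is ignored.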
So, when we subscribe (using an anonymous observer):
DateTime start = DateTime.Now;
int bufferNumber = 1;
bufferedTimes.Subscribe(ticks =>
{
int thisBuffer = bufferNumber++;
Console.WriteLine($"TIME PASSED: {(int)(DateTime.Now - start).TotalSeconds} seconds");
Console.WriteLine($"New window: {thisBuffer}");
foreach (var tick in ticks)
{
Console.WriteLine($"tick {tick}");
}
});
We will be passed an IList<long> (ticks), which is passed to our lambda, and each tick is written out to the console. In the output, each window appears all at once, at the end of each 4-second time period.
OnNext(The Window operator)
The window operator is very similar, but instead of lists, we are working with IObservable<IObservable<TSource>>.
public static IObservable<IObservable<TSource>> Window<TSource>(this IObservable<TSource> source, TimeSpan timeSpan)
{
return new WindowOperator<TSource>(source, timeSpan);
}
Then, in the same way as Buffer, we have:
internal class WindowOperator<TSource> : IObservable<IObservable<TSource>>
{
private IObservable<TSource> source;
private TimeSpan timeSpan;
public WindowOperator(IObservable<TSource> source, TimeSpan timeSpan)
{
this.source = source;
this.timeSpan = timeSpan;
}
public IDisposable Subscribe(IObserver<IObservable<TSource>> observer)
{
var windower = new Windower(observer, this.timeSpan);
return this.source.Subscribe(windower);
}
private class Windower : IObserver<TSource>
{
private readonly IObserver<IObservable<TSource>> observer;
private TimeSpan timeSpan;
private Subject<TSource> currentWindow;
private IDisposable endOfWindowSchedulerWorkItem;
public Windower(IObserver<IObservable<TSource>> observer, TimeSpan timeSpan)
{
this.observer = observer;
this.timeSpan = timeSpan;
StartNewWindow();
}
...
But the StartNewWindow method is as follows:
private void StartNewWindow()
{
var scheduler = Scheduler.Default;
this.currentWindow = new Subject<TSource>();
this.observer.OnNext(currentWindow);
this.endOfWindowSchedulerWorkItem = scheduler.Schedule(this.timeSpan, OnWindowCompleteDueTime);
}
Instead of a new list, this starts a new Subject<TSource>, which is itself an observable. You could replace it with your own implementation of IObservable<TSource>, but here we've just used the predefined Subject class, defined in the System.Reactive.Subjects namespace of the System.Reactive NuGet package. This also has the advantage that it will manage any observers subscribed to each window for us (see my blog on the basics of Rx for details).
This is then passed straight to the OnNext of the listening observer. This is the main difference between buffer and window. With buffer, you are passed fully complete lists at the end of each time period, which you can then just read the values from. With window you are passed a new observable at the start of each window, which you then subscribe to in order to be passed the values.
Finally, a work item is scheduled to close the window after the given time period. The OnWindowCompleteDueTime method then looks like this:
private void OnWindowCompleteDueTime()
{
this.currentWindow.OnCompleted();
this.StartNewWindow();
}
This calls OnCompleted on the subject, which will notify any observers subscribed to the window that the window has completed. It then starts a new window.
Finally, we do our clean up:
public void OnCompleted()
{
CloseLastWindow();
this.observer.OnCompleted();
}
public void OnError(Exception error)
{
CloseLastWindow();
this.observer.OnError(error);
}
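private void CloseLastWindow()
{
// Cancel the pending end-of-window work item first (as the Bufferer does), then complete the final window
this.endOfWindowSchedulerWorkItem.Dispose();
this.currentWindow.OnCompleted();
}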
When the underlying source completes or errors, we complete the current window (the last window is completed rather than errored because, as discussed last time, we only want the error to appear once). The observer subscribed to the overall operator is then notified that the source has finished.
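The Windower leans on Subject<TSource> to fan each window's values out to whoever has subscribed to it. A stripped-down, hypothetical MiniSubject<T> shows the idea: it is both an observer (so the operator can push values and completion into it) and an observable (so consumers can subscribe). WindowRecorder is likewise a hypothetical helper. The real Subject<T> in System.Reactive also handles thread safety and observers that subscribe after completion, which this sketch leaves out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal subject: forwards every notification to the observers subscribed at that moment.
public sealed class MiniSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Iterate over a copy so an observer can unsubscribe while being notified.
    public void OnNext(T value)
    {
        foreach (var o in observers.ToArray()) o.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var o in observers.ToArray()) o.OnError(error);
        observers.Clear();
    }

    public void OnCompleted()
    {
        foreach (var o in observers.ToArray()) o.OnCompleted();
        observers.Clear();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose() => observers.Remove(observer);
    }
}

// Hypothetical observer that records what one window delivers.
public sealed class WindowRecorder<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(T value) => Log.Add($"{value}");
    public void OnError(Exception error) => Log.Add("error");
    public void OnCompleted() => Log.Add("completed");
}
```

In this sketch, as with a plain Subject, an observer only sees values pushed after it subscribes: one subscribed before value 0 logs "0,1,completed", while one subscribed between 0 and 1 logs "1,completed". That is one reason to subscribe to each window as soon as it is handed over, as the example that follows does.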
Then, if we do the same as before:
IObservable<IObservable<long>> windowedTimes = timer.Window(TimeSpan.FromSeconds(4));
DateTime start = DateTime.Now;
int windowNumber = 1;
windowedTimes.Subscribe(w =>
{
int thisWindow = windowNumber++;
Console.WriteLine($"TIME PASSED: {(int)(DateTime.Now - start).TotalSeconds} seconds");
Console.WriteLine($"New window: {thisWindow}");
w.Subscribe(tick =>
{
Console.WriteLine($"{(int)(DateTime.Now - start).TotalSeconds} seconds: tick {tick}");
}
);
});
Here we are subscribing an outer observer to the IObservable<IObservable<TSource>> that is returned by the window operator. This observer then subscribes an inner observer to each individual window in order to read the values out of that window.
In the output, the first window is passed to our outer window observer straight away, and each value is passed to our inner observer as it is raised by the underlying timer source. This can be useful if you have large windows and would like to get started on processing as soon as the events are available.
However, if you need the window to complete before analysis (e.g. if you want to count the number of events), there is no advantage to using window over buffer, and it adds unnecessary complexity.
And there we have it, the timed versions of the Window and Buffer operators!
Here's me, not entirely prepared for the windows flying in my general direction... It wouldn't take a very small TimeSpan to defeat my catching skills... Especially if half my brain is focused on avoiding the unfortunate typos that were almost inevitable in this post!
Now, I shouldn't really tell you this, as, being the source of these blog posts, I really have no responsibility to give you any more information... But I think we may be swiftly approaching the end of this deep dive!
But for now, until the next OnNext call!