So, this week we are looking at the
Window Rx operators. (If you have no idea what I'm on about, I suggest you start at the beginning!)
There are a few different implementations of these operators, and we are going to focus on the time-based versions. In order to do this, we need to talk about
In Rx there are several different schedulers which can be used to invoke actions (or queue up work). Depending on the scheduler, that work will be run using the context defined by that scheduler.
When you invoke work via a scheduler, you can either tell it to "run this immediately" or to run it at a specific time.
This is done via the
Schedule method, of which there are a few overloads. The
Schedule(Action) method schedules work to be done immediately (or at the earliest time possible).
Schedule(DateTimeOffset, Action) method schedules work to be done at a specified time, and the
Schedule(TimeSpan, Action) method schedules work to be done after a certain amount of time.
When work is run immediately, or when it reaches its due time, the scheduler will run that work using the context defined by the scheduler type.
There are several concrete implementations of the
IScheduler interface in Rx:
Scheduler.Immediate is the immediate scheduler. When you "schedule" work through this scheduler, it is in fact not scheduled, just executed immediately.
Scheduler.CurrentThread will queue up any work to be executed on the thread that makes the original call.
Even if you don't supply a due time, the work will not happen immediately but will be queued up to execute after the current action has completed.
This is because this work must be executed on the current thread, which is currently in use by the caller action.
Scheduler.NewThread will create a thread to use for the task. For example, if you have a long running operation, e.g. a timer that will continuously produce ticks.
It would make sense for this timer to run on a different thread to your main one, which may be otherwise occupied (with user input, for example).
This scheduler is ideal for long-running work, especially that which needs a dedicated thread, i.e. it will completely occupy whichever thread it is using.
Scheduler.TaskPool schedules actions using the default Task Factory from the Task Programming Library (TPL). Creating threads is an expensive process, so re-use is often the best strategy.
This is especially true in the case of short running operations, where creating a new thread is unnecessary and extremely expensive.
The task pool scheduler is also optimised for multicore systems.
Scheduler.Threadpool also uses the threadpool. This works in a very similar way to the task pool scheduler, but it is less optimised.
This should only be used in situations where
Scheduler.TaskPool is unavailable, which is the case in Silverlight 4 and .NET 3.5.
When you are subscribing to an observable, you can use the
ObserverOn methods on an
These methods have an overload which takes an
IScheduler. When the
SubscribeOn method is used, when anything subscribes, the subscribe method will be run on the supplied scheduler.
OnNext invocations will be run using the corresponding scheduler.
In order to get the default scheduler that the platform is running on, you can use the concrete
This retrieves the scheduler on which your application is based. In Rx this will use the thread pool scheduler by default (due to backwards compatibility with Rx v1).
So, another operator we need to touch on at this point is the
Timer operator. We will use this to demonstrate the timed grouping using the
The Timer operator
Timer operator has a few overloads. There is one which just produces an event after a certain amount of time.
There is also an overload which starts after a certain amount of time, and then periodically produces an event at a regular interval. It does this via scheduling a new "tick" to happen after a specified
So if you did:
IObservable timer = System.Reactive.Linq.Observable.Timer
(The inclusion of the namespace here is due to the fact that I do not want to bring in the built in LINQ operators, would make this whole 3 month project somewhat redundant!) and subscribed to the output, then after 0.5 seconds you would be passed the value 0, and then every second after than you would be passed 1,2,3 etc.
Okay, so now we have all the information we need…
OnNext(The Buffer operator)
You use the buffer operator as follows:
IObservable<IList<long>> bufferedTimes = timer.Buffer(TimeSpan.FromSeconds(4));
This will split the items produced by the original source into four-minute windows.
So, when you pass a source through a buffer operator, it will return an
TSource is the type of the underlying source (here
long because the timer returns each "tick" as a
long). Each of these lists will contain all of the values produced by the original source within each time window.
This means that you are passed a new
IList (which will be fully populated) at the end of each timed period.
So, following our usual pattern…
Taking in an
IObservable of type
TSource, we are returned an
BufferOperator then looks like this:
Bufferer as follows:
When a new
Bufferer is created, the first window starts straight away.
This has an internal list which is added to whenever a new value is produced by the underlying source:
StartNewWindow method looks like this:
This creates a new list for the current window (removing the old values in the process), which any new items will be added to.
It also gets the default scheduler that the application is using, and schedules the
OnWindowCompleteDueTime method to run, setting it to run after your specified time period.
This will mean that after an amount of time equal to the specified time span, the following method will be called:
When a window finishes, the completed list for that window is passed to the listening observer, and a new window is started.
Notice that the
endOfWindowSchedulerWorkItem is saved. This is so that the following clean up can be done when the underlying source is completed (or errored):
This disposes the work item left in the schedulers queue, so that it does not attempt to do more work once the source has already finished, and then passes the list for the last window onto the observer, before notifying it that it has finished.
So, when we subscribe (using an anonymous observer):
We will be passed a
IList<long> (ticks) which is passed through out lambda, and each tick is written out to the console. The output will be as follows:
With each window appearing all at once, at the end of each 4 second time period.
OnNext(The Window operator)
The window operator is very similar, but instead of lists, we are working with
Then, in the same way to Buffer, we have:
StartNewWindow method is as follows:
Instead of a new list, this starts a new
Subject<TSource>. This is a new observable. This could be replaced by your own implementation of
IObservable<TSource>, but here we've just used the predefined
Subject class, defined as part of the
System.Reactive NuGet package. This also has the advantage that it will manage any observers subscribed to each window for us (see my blog on the basics of Rx for details).
This is then passed straight the the
OnNext of the listening observer. This is the main difference between buffer and window. With buffer, you are passed fully complete lists at the end of each time period, which you can then just read the values from. With window you are passed a new observable at the start of each window, which you then subscribe to in order to be passed the values.
Finally, a work item is scheduled to close the window after the given time period. The
OnWindowCompletedDueTime method then looks like this:
OnCompleted on the subject, which will notify any observers subscribed to the window that the window has completed. It then starts a new window.
Finally, we do our clean up:
When the underlying source completes, or errors, it will complete the current window (here the last window is completed not errored because, as discussed last time, we only want the error to appear once). The observer subscribed to the overall operator is then notified that the source has finished.
Then, If we do the same as before:
Here we are subscribing a first observer to the
IObservable<IObservable<TSource>> that is returned by the window operator. This observer then subscribes an inner observer to each individual window in order to read the values out of each window.
Then the output will be:
With the first window being passed to our overall window observer straight way, and each value being passed into our inner observer as they are raised by the underlying timer source. This can be useful if you have large windows and would like to get started on processing as soon as the events are available.
However, if you need to window to complete before analysis (e.g. if you want to count the number of events) there is no advantage of using window over buffer, and it adds unnecessary complexity.
And there we have it, the timed versions of the
Here's me, not entirely prepared for the windows flying in my general direction... It wouldn't take a very small
TimeSpan to defeat my catching skills... Especially if half my brain is focused on avoiding the unfortunate typos that were almost inevitable in this post!
Now, I shouldn't really tell you this, as the source of these blog posts I really have no responsibility to give you any more information... But I think we may be swiftly approaching the end of this deep dive!
But for now, until the next