Understanding Rx: Making the interfaces, subscribing and other Subject<T>s finally click
This has been sitting in my "blogs" folder for some time, with little more in it than a first sentence about the fact that I haven't written this blog yet.
Of everything I've covered while learning C#, reactive extensions (Rx) are one of the things I have struggled most to get my head around. I think the only way I am ever going to get around to writing this is if I just dive straight in… So here goes nothing!
The underlying concept behind Rx is representing data as a stream of values that arrive over time. I think the easiest way to describe this is by comparison with collections. I assume anyone reading this is at least somewhat familiar with C#; otherwise you've chosen rather a bold place to begin your understanding… Though, if that is the case, I stand behind you!
So, in a "normal" collection, you have a set of values, and when you want the next one you go and ask for it. This is relatively easy to understand (and yes, it's patronising analogy time). Say someone's making a load of cupcakes: if you go and ask for one, they will either hand you one or tell you they don't have any.
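In C# terms, this "go and ask" model is IEnumerable<T>. The sketch below spells out by hand roughly what a foreach loop does for you; PullDemo and TakeAll are hypothetical names used only for illustration.

```csharp
using System.Collections.Generic;

// Pull-based consumption: the consumer asks the collection for each value in turn.
public static class PullDemo
{
    // Takes every cupcake from the tray by repeatedly asking "is there another one?".
    public static List<string> TakeAll(IEnumerable<string> tray)
    {
        var taken = new List<string>();
        using (IEnumerator<string> cupcakes = tray.GetEnumerator())
        {
            // MoveNext() is the "go and ask" step; it returns false once the tray is empty.
            while (cupcakes.MoveNext())
            {
                taken.Add(cupcakes.Current);
            }
        }
        return taken;
    }
}
```

The consumer is in control here: nothing happens until it calls MoveNext(), and it decides when to stop asking.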
In Rx, however, the collection pushes values to you as and when they are ready. That is, the cake-maker will throw each newly finished cake at you, and if you are paying attention you can probably deal with it accordingly (if you're better at catching than I am).
If you're not ready, that's on you: the baker doesn't really care either way (at least with a "hot" observable, as newly baked cakes are likely to be). I'll admit it's a little more complicated than that, but that's the basic idea.
There are two interfaces underlying all of Rx: IObservable<T> and IObserver<T>.
An observable is a source. It has one method to implement: Subscribe(IObserver<T> observer).
When you call this method on an IObservable<T>, you are telling that observable that the given observer cares about anything it has to say.
When the observable has new data ready, it calls the OnNext(T item) method of every subscribed observer to pass them the data.
Once it has finished producing data, it calls their OnCompleted() methods; or, if an error occurs while data is being produced, it calls OnError(Exception error) on each subscribed observer instead.
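To make that contract concrete, here is a minimal hand-written observable that uses only the IObservable<T> and IObserver<T> interfaces built into .NET (no Rx package needed). CountdownObservable and RecordingObserver are hypothetical names, and pushing every value synchronously inside Subscribe is just the simplest possible case, not how Rx's own operators are implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable that pushes start, start-1, ..., 1 to each subscriber, then completes.
public sealed class CountdownObservable : IObservable<int>
{
    private readonly int _start;

    public CountdownObservable(int start) => _start = start;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Push every value to the observer via OnNext...
        for (int i = _start; i > 0; i--)
        {
            observer.OnNext(i);
        }
        // ...then tell it that no more values are coming.
        observer.OnCompleted();
        // Everything already happened synchronously, so there is nothing left to unsubscribe from.
        return EmptyDisposable.Instance;
    }

    private sealed class EmptyDisposable : IDisposable
    {
        public static readonly EmptyDisposable Instance = new EmptyDisposable();
        public void Dispose() { }
    }
}

// Hypothetical observer that simply records what it is told.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => Error = error;
    public void OnCompleted() => Completed = true;
}
```

Subscribing a RecordingObserver<int> to new CountdownObservable(3) leaves it holding 3, 2 and 1, with Completed set to true.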
Observers
An observer is something that is linked up to a push-notification-based data provider (an observable). The provider passes it data, and then tells it when it has finished passing data. It has three methods to implement: OnNext(T item), OnCompleted() and OnError(Exception error).
When it is passed an item via the OnNext method, it generally does something with that item.
For example, if it is an IObserver<int>, and all it wants to do is write each integer it is passed out to the console, then the OnNext method would look like this:
public void OnNext(int number)
{
    Console.WriteLine(number);
}
The OnCompleted() method signifies to the observer that no more push-based notifications will arrive from the provider.
And finally, the OnError() method notifies the observer that an error occurred while providing the data. Like OnCompleted(), it ends the sequence: no further notifications follow it.
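Rx expects these calls to arrive in a fixed order, often written as OnNext* (OnError | OnCompleted)?: any number of OnNext calls, followed by at most one terminal call. The sketch below is a hypothetical wrapper (OrderEnforcingObserver and LoggingObserver are illustrative names, not Rx types) that applies that rule by dropping anything arriving after the sequence has ended.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper enforcing the order OnNext* (OnError | OnCompleted)?.
// Anything that arrives after the terminal call is silently dropped.
public sealed class OrderEnforcingObserver<T> : IObserver<T>
{
    private readonly IObserver<T> _inner;
    private bool _stopped;

    public OrderEnforcingObserver(IObserver<T> inner) => _inner = inner;

    public void OnNext(T value)
    {
        if (!_stopped) _inner.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (_stopped) return;
        _stopped = true;   // an error ends the sequence...
        _inner.OnError(error);
    }

    public void OnCompleted()
    {
        if (_stopped) return;
        _stopped = true;   // ...and so does completion
        _inner.OnCompleted();
    }
}

// Hypothetical observer that logs every notification it receives, in order.
public sealed class LoggingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();
    public void OnNext(T value) => Log.Add("next " + value);
    public void OnError(Exception error) => Log.Add("error " + error.Message);
    public void OnCompleted() => Log.Add("completed");
}
```

Calling OnNext(1), OnCompleted(), OnNext(2) and then OnError(...) through the wrapper leaves only "next 1" and "completed" in the log.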
A full observer can be created as follows:
private class MyObserver : IObserver<int>
{
    // Completed on OnCompleted (or faulted on OnError) so callers can await the end of the sequence.
    public TaskCompletionSource<object> done = new TaskCompletionSource<object>();
    public void OnCompleted()
    {
        done.SetResult(null);
    }
    public void OnError(Exception error)
    {
        Console.WriteLine("Oops: " + error);
        done.SetException(error);
    }
    public void OnNext(int number)
    {
        // Called once for each value the source pushes.
        Console.WriteLine(number);
    }
}
This observer has a reference to a TaskCompletionSource, which is used to signal that the OnCompleted() method has been called (or OnError(), since after an error the source will not send any more data either).
This is useful because you may want to wait for any observables to complete before, for example, letting the program exit.
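As a sketch of that waiting in practice, assume a hypothetical BackgroundNumbers source that produces its values on another thread. The caller awaits the observer's TaskCompletionSource rather than returning straight away. AwaitableObserver has the same shape as MyObserver above but also records what it sees; AwaitDemo is likewise an illustrative name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical source that produces 1, 2, 3 on a background thread, then completes.
public sealed class BackgroundNumbers : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        _ = Task.Run(async () =>
        {
            for (int i = 1; i <= 3; i++)
            {
                await Task.Delay(10);   // pretend each value takes a while to produce
                observer.OnNext(i);
            }
            observer.OnCompleted();
        });
        return NoOpDisposable.Instance;  // this sketch does not support unsubscribing
    }

    private sealed class NoOpDisposable : IDisposable
    {
        public static readonly NoOpDisposable Instance = new NoOpDisposable();
        public void Dispose() { }
    }
}

// Same shape as MyObserver, plus a record of the numbers it has seen.
public sealed class AwaitableObserver : IObserver<int>
{
    public TaskCompletionSource<object> Done { get; } = new TaskCompletionSource<object>();
    public List<int> Seen { get; } = new List<int>();

    public void OnCompleted() => Done.SetResult(null);
    public void OnError(Exception error)
    {
        Console.WriteLine("Oops: " + error);
        Done.SetException(error);
    }
    public void OnNext(int number)
    {
        Console.WriteLine(number);
        Seen.Add(number);
    }
}

public static class AwaitDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var observer = new AwaitableObserver();
        using (new BackgroundNumbers().Subscribe(observer))
        {
            // Without this await, the method could return before any number has arrived.
            await observer.Done.Task;
        }
        return observer.Seen;
    }
}
```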
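An observer can also be created inline, using the Subscribe extension methods that System.Reactive provides. These take the OnNext, OnError and OnCompleted handlers as delegates, which allows you to do the following: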
var done = new TaskCompletionSource<object>();
anObservable.Subscribe(
    number => { Console.WriteLine(number); },
    ex =>
    {
        Console.WriteLine("Oops: " + ex);
        done.SetException(ex);
    },
    () => { done.SetResult(null); });
await done.Task;
As you can see, the delegates are passed straight into the Subscribe method (generally as lambdas).
This essentially creates an anonymous observer, for when you don't want to define a whole class.
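Roughly speaking, such an anonymous observer is just a class that forwards each interface method to one of the delegates you supplied. The sketch below is a hypothetical, BCL-only version: DelegateObserver, SubscribeWith and ListObservable are illustrative names, not System.Reactive's actual types (SubscribeWith is deliberately named so it does not clash with Rx's own Subscribe overloads).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer built from three delegates.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

public static class DelegateObserverExtensions
{
    // Wraps the delegates in an observer and subscribes it to the source.
    public static IDisposable SubscribeWith<T>(this IObservable<T> source,
        Action<T> onNext, Action<Exception> onError, Action onCompleted)
        => source.Subscribe(new DelegateObserver<T>(onNext, onError, onCompleted));
}

// Hypothetical source that pushes a fixed list to each subscriber, then completes.
public sealed class ListObservable<T> : IObservable<T>
{
    private readonly IReadOnlyList<T> _items;

    public ListObservable(IReadOnlyList<T> items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new Nothing();
    }

    private sealed class Nothing : IDisposable { public void Dispose() { } }
}
```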
Observables
There are two types of observable: hot and cold. I'm going to start with a hot observable, as I find it easier to understand.
Now, before I fly into an example, there's something else that will make it much less complex: the Subject<T> class. This doesn't ship with the .NET Framework; it is available in the System.Reactive NuGet package (in the System.Reactive.Subjects namespace).
The Subject<T> class implements both the IObserver<T> interface and the IObservable<T> interface. It has an internal list of observers, and when its OnNext(), OnCompleted() or OnError() methods are called, it calls them in turn on each of the observers.
A simplified version might start like this:
private class MySubject<T> : IObservable<T>, IObserver<T>
{
    // Every observer currently subscribed to this subject.
    private List<IObserver<T>> observers = new List<IObserver<T>>();
    // Each notification the subject receives is forwarded to every observer in turn.
    public void OnCompleted()
    {
        foreach (var observer in this.observers)
        {
            observer.OnCompleted();
        }
    }
    public void OnError(Exception error)
    {
        foreach (var observer in this.observers)
        {
            observer.OnError(error);
        }
    }
    public void OnNext(T value)
    {
        foreach (var observer in this.observers)
        {
            observer.OnNext(value);
        }
    }
So far, this is nothing more than a list of observers being notified in turn, but there is something that complicates it a little…
public IDisposable Subscribe(IObserver<T> observer)
{
    observers.Add(observer);
    ...
}
When an observer subscribes to an observable, it needs a way to unsubscribe.
To allow for this, the Subscribe method returns an IDisposable, and when Dispose() is called on that IDisposable, the observer is unsubscribed from the source.
In the subject class, when an observer subscribes, it creates a disposable that holds a reference to that observer and to the subject's list of observers.
When Dispose() is called on that disposable, it removes its observer from the subject's internal list.
// Nested inside MySubject<T>, which is why it can refer to T.
private class MyDisposable : IDisposable
{
    private readonly List<IObserver<T>> _observers;
    private readonly IObserver<T> _observer;
    private bool _disposed;
    public MyDisposable(IObserver<T> observer, List<IObserver<T>> observers)
    {
        _observer = observer;
        _observers = observers;
    }
    public void Dispose()
    {
        // Remove the observer only on the first call; later calls do nothing.
        if (!_disposed)
        {
            _observers.Remove(_observer);
            _disposed = true;
        }
    }
}
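(The _disposed flag is used here because Dispose() must be safe to call multiple times. Without it, if the same observer had subscribed twice, a second Dispose() on one subscription would also remove the other.)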
The Subscribe method then looks like this:
public IDisposable Subscribe(IObserver<T> observer)
{
    observers.Add(observer);
    // The caller disposes this to unsubscribe.
    return new MyDisposable(observer, this.observers);
}
Overall, you subscribe to the subject and are returned a disposable.
Your observer has then been added to the subject's internal list of observers.
When you want to unsubscribe, you call dispose on the returned disposable and your observer is removed from the list.
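Putting those fragments together, a complete, single-threaded version might look like the sketch below. SimpleSubject and CollectingObserver are hypothetical names, and this is not the real Subject<T>, which also deals with thread safety. One deliberate change from the fragments: notifications loop over a snapshot of the list (ToArray()), because an observer that disposes its subscription from inside OnNext would otherwise modify the list mid-foreach and trigger an InvalidOperationException.

```csharp
using System;
using System.Collections.Generic;

// A minimal, single-threaded subject: an observer (it receives notifications)
// and an observable (it forwards them to everyone subscribed).
public sealed class SimpleSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public void OnNext(T value)
    {
        // Iterate over a copy so observers may unsubscribe while being notified.
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var observer in _observers.ToArray()) observer.OnError(error);
    }

    public void OnCompleted()
    {
        foreach (var observer in _observers.ToArray()) observer.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Subscription(observer, _observers);
    }

    // Removes its observer from the subject's list when disposed (at most once).
    private sealed class Subscription : IDisposable
    {
        private readonly IObserver<T> _observer;
        private readonly List<IObserver<T>> _observers;
        private bool _disposed;

        public Subscription(IObserver<T> observer, List<IObserver<T>> observers)
        {
            _observer = observer;
            _observers = observers;
        }

        public void Dispose()
        {
            if (_disposed) return;
            _observers.Remove(_observer);
            _disposed = true;
        }
    }
}

// Hypothetical observer for the usage example: collects the values it receives.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

If two observers subscribe, one value is sent, and the first subscription is then disposed (even twice), only the second observer receives subsequent values.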
So, how is it used? Essentially, instead of your observable keeping its own list of observers to notify, it holds a reference to a subject.
When a new observer subscribes to your observable, you just call the Subscribe method on that subject and pass it that observer.
The observer is then in the subject's observer list. Then, whenever you want to call OnNext/OnCompleted/OnError on any listening observers, you just call it on the subject and it notifies them for you.
Finally, the complexities of unsubscribing via a disposable are all handled for you by the subject.
Hot Observables
So, say we have an observable which produces a value each time a file in a folder changes.
The observable references a subject, which holds the list of observers that have subscribed to the observable. When a file changes, the subject runs through its list of observers and calls their OnNext() methods, passing them the path of the file that changed.
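private class MyHotObservable : IObservable<string>
{
    private string folder;
    private FileSystemWatcher fsw;
    // The subject tracks subscribers and forwards notifications to them.
    private Subject<string> subject = new Subject<string>();
    public MyHotObservable(string folder)
    {
        this.folder = folder;
        // Start watching immediately: events are produced whether or not anyone has subscribed.
        fsw = new FileSystemWatcher(folder);
        fsw.Changed += HandleFileChanged;
        fsw.EnableRaisingEvents = true;
    }
    public IDisposable Subscribe(IObserver<string> observer)
    {
        // Subscribing (and unsubscribing, via the returned IDisposable) is delegated to the subject.
        return this.subject.Subscribe(observer);
    }
    private void HandleFileChanged(object sender, FileSystemEventArgs e)
    {
        // Push the changed file's path to every current subscriber.
        this.subject.OnNext(e.FullPath);
    }
}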
The HandleFileChanged() method here has been set up as the handler for the FileSystemWatcher's Changed event.
When the event is raised, the handler method is called and the file path is passed to any listening observers.
In this case, OnCompleted() and OnError() are never called, but you can imagine that if the folder were, for example, deleted, there would be no more events and OnCompleted() could be called on the observers. Similarly, if there was an error while processing a raised event, OnError() could be called.
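The "hot" part is easiest to see with a source that produces values whether or not anyone is listening. In the hypothetical sketch below, HotSource.Emit stands in for the FileSystemWatcher event, and PathLogger is an illustrative observer; because nothing is buffered, an observer that subscribes late only sees values emitted after it joined.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hot source: values are produced when Emit is called, listeners or not.
public sealed class HotSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    // Stands in for an external event such as FileSystemWatcher.Changed.
    public void Emit(T value)
    {
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action _onDispose;

        public Unsubscriber(Action onDispose) => _onDispose = onDispose;

        public void Dispose()
        {
            _onDispose?.Invoke();
            _onDispose = null;   // makes repeated Dispose calls harmless
        }
    }
}

// Hypothetical observer that records the paths it is told about.
public sealed class PathLogger : IObserver<string>
{
    public List<string> Seen { get; } = new List<string>();
    public void OnNext(string path) => Seen.Add(path);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

An observer subscribed from the start sees every emitted path, while one that subscribes after the first Emit never sees that first value.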
Cold Observables
The other type of observable is a cold observable. In this case, the observable does not start producing data until something subscribes, and each subscriber typically triggers its own run of the producer. For cold observables, the subscribe method is the key piece.
The easiest way to create a cold observable is with the Observable.Create() method. Again, this doesn't ship with .NET; it is part of the System.Reactive NuGet package (in the System.Reactive.Linq namespace).
This method takes a single delegate as an argument, which acts as the subscribe method of the observable: it runs each time an observer subscribes.
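var coldObservable = Observable.Create<string>(async observer =>
{
    // Runs once for every subscriber, and only when they subscribe.
    using (var reader = new StreamReader(File.OpenRead(path)))
    {
        // Artificial one-second delay before reading starts.
        await Task.Delay(1000);
        while (!reader.EndOfStream)
        {
            string line = await reader.ReadLineAsync();
            if (line != null)
            {
                observer.OnNext(line);
            }
        }
    }
    // No explicit OnCompleted() needed: Create calls it when this Task completes.
});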
In this example, when an observer subscribes to the observable, the reader begins reading from the file and passing each line to the observer's OnNext() method.
Until someone subscribes, the delegate passed to Create() is not called.
With this asynchronous overload of Create() (the delegate returns a Task), the observer's OnCompleted() method is called automatically when that Task completes, and OnError() is called if the delegate throws an exception. The synchronous overload, whose delegate returns an IDisposable, does not do this for you.
So, while you could call OnCompleted() explicitly at the end of the method, it's not necessary here.
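To make those two behaviours concrete (nothing runs until someone subscribes, and completion follows the Task), here is a simplified, BCL-only model of a Task-returning Create. ColdObservable and WaitingObserver are hypothetical names; this is not Rx's implementation, and it ignores cancellation, which the real Observable.Create overloads support.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a Task-returning Create: the delegate runs once per
// subscription, and the observer is completed (or errored) when its Task finishes.
public sealed class ColdObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, Task> _subscribeAsync;

    public ColdObservable(Func<IObserver<T>, Task> subscribeAsync) => _subscribeAsync = subscribeAsync;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Nothing runs until this point: each subscriber starts its own producer.
        _ = RunAsync(observer);
        return new NoOp();
    }

    private async Task RunAsync(IObserver<T> observer)
    {
        try
        {
            await _subscribeAsync(observer);
        }
        catch (Exception ex)
        {
            observer.OnError(ex);   // a faulted Task becomes OnError
            return;
        }
        observer.OnCompleted();     // a finished Task becomes OnCompleted
    }

    private sealed class NoOp : IDisposable { public void Dispose() { } }
}

// Hypothetical observer that records values and exposes completion as a Task.
public sealed class WaitingObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public TaskCompletionSource<object> Done { get; } = new TaskCompletionSource<object>();

    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => Done.TrySetException(error);
    public void OnCompleted() => Done.TrySetResult(null);
}
```

Subscribing two observers runs the delegate twice, once per subscriber, and a delegate that throws produces OnError rather than OnCompleted.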
The file could be processed as follows:
var done = new TaskCompletionSource<object>();
coldObservable.Subscribe(
    line => { Console.WriteLine(line); },
    ex =>
    {
        Console.WriteLine($"Oops! {ex}");
        done.SetException(ex);
    },
    () =>
    {
        Console.WriteLine("Done!");
        done.SetResult(null);
    });
await done.Task;
In this way, no work is done until someone cares, no data is lost (the subscriber is already listening before the first line is read), and the Subscribe method does the brunt of the work. This is much more similar to the normal enumerable approach.
Where it really pays off is when you are reading an extremely large (or infinitely long) file: each line can be passed to the processor as soon as it is read, rather than waiting for the whole file to be read into a collection first.
In summary...
This has been a brief intro to reactive extensions.
It is an extremely powerful tool, which can be used for many different things. Check out Richard's blog on event stream manipulation for an example! Also, here's a link to the reactive extensions GitHub repository, which also includes a link to the rx.net slack channel.
Look out for my next blog on integrating Rx with the TPL dataflow library! And, as I appear to have got through this whole post without the need for a doodlegram... Here's one of me when I finally finished this blog!