Observe File System Changes with Reactive Extensions for .NET
TLDR; FileSystemWatcher has a classic .NET event-handler-based API. This can make it difficult to write compensating logic to deal with some of the idiosyncratic behaviours that it (and the underlying file system) can exhibit. Thankfully the Reactive Extensions for .NET (Rx.NET) provides powerful APIs which can convert these events into an observable event stream, which can easily be filtered and manipulated to provide a more modern, high-level and easy-to-work-with API.
Note: FileSystemWatcher has a number of known issues and (cross platform) limitations. You should be aware of these before using it in a production system. Our use case is within a CLI tool we use for doing Static Site Generation, watching for changes before triggering a cache invalidation, and we've yet to come across any of the issues mentioned above. YMMV.
When we redesigned endjin.com in 2020 we wanted to simplify our web stack. The site was a combination of an ASP.NET Core app for the main site, and a WordPress site for our blog. Our site has a lot of content (1000+ pages), the ASP.NET Core app was clunky for content editing, and WordPress needed constant maintenance to deal with the daily automated exploit based attacks.
I'm not a fan of the NPM ecosystem, and its endless package dependency (software supply chain security) nightmare, so we built a custom .NET Static Site Generator to suit our specific needs. It's packaged as a .NET Global Tool for ease of use (by a human or inside a GitHub Action), and allows users to focus on writing good content in Markdown, and the SSG gets out of the way.
Although the tool is very fast, multiplying the duration of a set of complex operations by 1000 results in a generation process that takes many tens of seconds. The more we used the tool to build the site, the more we wanted to improve the inner dev loop. Rather than regenerating the entire site every time a file changed, we wanted to be able to regenerate just the pages affected by the change.
We created a caching graph, which allows us to work out the dependency tree of pages affected by a content change, which we can use to build an optimised execution plan. We then needed a way to trigger the cache invalidation process when a file changed. We needed to build the equivalent of a "watch" command.
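To make the idea of "pages affected by a change" concrete, here is a minimal sketch of a reverse-dependency walk. It is not the SSG's actual caching graph: the `dependents` map, the file names and the `Affected` helper are all hypothetical, and a real execution plan would need ordering as well as membership.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical reverse-dependency map: item -> items that must be rebuilt when it changes.
var dependents = new Dictionary<string, string[]>
{
    ["content/blog/rxfs.md"] = new[] { "blog/rxfs.html" },
    ["blog/rxfs.html"] = new[] { "blog/index.html", "tags/rx.html" },
    ["blog/index.html"] = new[] { "index.html" },
};

// Breadth-first walk collecting everything reachable from the changed item.
HashSet<string> Affected(string changed)
{
    var seen = new HashSet<string>();
    var queue = new Queue<string>();
    queue.Enqueue(changed);

    while (queue.Count > 0)
    {
        string current = queue.Dequeue();
        if (!dependents.TryGetValue(current, out string[]? next))
        {
            continue; // nothing depends on this item
        }

        foreach (string page in next)
        {
            // HashSet.Add returns false for items already visited, which also guards against cycles.
            if (seen.Add(page))
            {
                queue.Enqueue(page);
            }
        }
    }

    return seen;
}

HashSet<string> affected = Affected("content/blog/rxfs.md");
Console.WriteLine($"{affected.Count} page(s) to regenerate");
```

With a map like this, a single changed Markdown file resolves to a small set of pages rather than the whole site.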
This is where the fun and games dealing with the idiosyncrasies of file system operations began.
If you save a single file, the FileSystemWatcher will raise a Changed event:
c:\endjin-com\content\blog\howard.vanrooijen\rxfs.md - Changed
If you rename a file, the FileSystemWatcher will raise a Renamed event for the file and then a Changed event at the directory level:
c:\endjin-com\content\blog\howard.vanrooijen\rxfs2.md - Renamed
c:\endjin-com\content\blog\howard.vanrooijen - Changed
If you perform a "Save All" operation in VS Code, you'll get a Changed event for every file that was saved:
c:\endjin-com\content\blog\ian.griffiths\csharp-11-parameter-null-check.md - Changed
c:\endjin-com\content\blog\ian.griffiths\csharp-11-pattern-matching-span-char.md - Changed
c:\endjin-com\content\blog\ian.griffiths\csharp-11-raw-string-literals.md - Changed
c:\endjin-com\content\blog\ian.griffiths\csharp-11-string-interpolation-newline.md - Changed
c:\endjin-com\content\blog\ian.griffiths\csharp-11-utf8-string-literals.md - Changed
But this stampede of events can occur over a seemingly random period of time (I've seen anywhere from 75 ms to 1250 ms between events), which could be due to a number of factors outside of your control: VS Code, the OS dealing with other tasks, the file system buffer, real-time virus scanning, the speed of the SSD or HDD, and possibly a hundred other reasons hidden from view. The FileSystemWatcher has a "classic" event-pattern based API, which does not let you model compensating behaviours, like waiting and buffering, very easily.
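To illustrate why this is awkward, here is a rough sketch of what hand-rolled "wait for things to go quiet" logic might look like with plain event handlers. None of this comes from our tool: `quietPeriod`, `pending`, `batchSizes`, `Flush` and `OnEvent` are made-up names, and the three synthetic events stand in for a real watcher so the snippet behaves the same on any machine.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

TimeSpan quietPeriod = TimeSpan.FromMilliseconds(200); // assumed value for the demo
object gate = new();
List<FileSystemEventArgs> pending = new();
List<int> batchSizes = new();

// One-shot timer that is restarted on every event; it only fires after a quiet spell.
using Timer timer = new(_ => Flush(), null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

// Same shape as FileSystemEventHandler, so with a real watcher you could write:
// watcher.Changed += OnEvent; watcher.Created += OnEvent; watcher.Deleted += OnEvent;
void OnEvent(object? sender, FileSystemEventArgs e)
{
    lock (gate)
    {
        pending.Add(e);
        timer.Change(quietPeriod, Timeout.InfiniteTimeSpan);
    }
}

void Flush()
{
    int count;
    lock (gate)
    {
        if (pending.Count == 0)
        {
            return;
        }

        count = pending.Count;
        batchSizes.Add(count);
        pending.Clear();
    }

    Console.WriteLine($"Batch of {count} event(s)");
}

// Simulate a "Save All" touching three files in quick succession.
foreach (string name in new[] { "a.md", "b.md", "c.md" })
{
    OnEvent(null, new FileSystemEventArgs(WatcherChangeTypes.Changed, Path.GetTempPath(), name));
}

// Give the timer time to fire before the program exits.
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
```

Even this small version needs a lock, a timer and shared mutable state, and it still says nothing about multiple directories, de-duplication or disposing the watcher.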
So I nerd sniped Ian Griffiths with this problem and he came up with an elegant solution: using the Reactive Extensions for .NET (Rx.NET) and LINQ to create a Quiescent<T> extension method that buffers events until a period of inactivity occurs, and then emits a single event containing a collection of all the events that occurred before the period of inactivity.
This scenario is sufficiently complex and interesting that Ian covers it in some depth in the Key Types: LINQ Operators and Composition chapter of his excellent FREE book Introduction to Reactive Extensions for .NET. I won't rehash the details here, but will instead skip to the solution.
public static IObservable<IList<T>> Quiescent<T>(this IObservable<T> src, TimeSpan minimumInactivityPeriod, IScheduler scheduler)
{
    IObservable<int> onoffs =
        from _ in src
        from delta in
            Observable.Return(1, scheduler)
                .Concat(Observable.Return(-1, scheduler)
                    .Delay(minimumInactivityPeriod, scheduler))
        select delta;

    IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
    IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);

    return src.Buffer(zeroCrossings);
}

public static IObservable<IList<T>> Quiescent<T>(this IObservable<T> src, TimeSpan minimumInactivityPeriod)
{
    return Quiescent(src, minimumInactivityPeriod, DefaultScheduler.Instance);
}
The method overload simply provides a sensible default for scheduling.
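If the counting trick inside Quiescent feels opaque, it can help to trace it by hand. The sketch below is plain C# with no Rx involved, so it only mimics the logic: each event adds 1 when it arrives and subtracts 1 once the inactivity period has passed, and a batch is released whenever the running total returns to zero. The arrival times and the 1000 ms period are made-up sample data.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed sample data: event arrival times in milliseconds.
int[] arrivals = { 0, 100, 250, 2000 };
int inactivityMs = 1000;

// Each event contributes +1 on arrival and -1 once the inactivity period has elapsed.
var deltas = arrivals
    .SelectMany(t => new[] { (Time: t, Delta: +1), (Time: t + inactivityMs, Delta: -1) })
    .OrderBy(d => d.Time)
    .ToList();

int outstanding = 0;                 // running total, like the Scan step
var buffer = new List<int>();        // events seen since the last zero crossing
var batches = new List<(int Time, int[] Events)>();

foreach (var (time, delta) in deltas)
{
    outstanding += delta;

    if (delta > 0)
    {
        buffer.Add(time);
    }

    // Zero means every event seen so far is at least one inactivity period old.
    if (outstanding == 0)
    {
        batches.Add((time, buffer.ToArray()));
        buffer.Clear();
    }
}

foreach (var (time, events) in batches)
{
    Console.WriteLine($"{time}ms: [{string.Join(", ", events)}]");
}

// Prints:
// 1250ms: [0, 100, 250]
// 3000ms: [2000]
```

The first three events land within a second of each other, so they are released together one second after the last of them; the straggler at 2000 ms forms its own batch.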
While Quiescent solves the sporadic stampede of events, we still need to create a higher-level API that allows us to specify which directories we want to watch and be notified about when any changes occur. In our CMS the Content directory contains the Markdown files that make up the site, the Taxonomy directory contains the YAML files which define the site's structure and which content is included on which pages, and the Theme directory contains the Razor, CSS and JavaScript files that define the site's theme and interactivity.
IObservable<IEnumerable<FileSystemEventArgs>> changes = RxFs.ObserveUniqueChanges(
[
    new(@"C:\website\endjin-com\Content"),
    new(@"C:\website\endjin-com\Taxonomy"),
    new(@"C:\website\endjin-com\Theme")
]);

changes.Subscribe(x =>
{
    foreach (FileSystemEventArgs fileSystemEventArgs in x)
    {
        Console.WriteLine($"{fileSystemEventArgs.FullPath} - {fileSystemEventArgs.ChangeType}");
    }
});

Console.ReadKey();
This API supports watching a number of directories where common editing scenarios may occur, such as making a change to a Markdown file, a CSS file, and a taxonomy file, and then performing a Save All command which updates multiple files in a single batch operation.
The RxFs.ObserveUniqueChanges method deals with all the underlying complexity and will just emit a single batch of distinct events containing information about the changes, which, in our case, can then be passed into the cache invalidation process.
ObserveUniqueChanges calls ObserveFileSystem for each directory passed in, and then merges all the returned observables into a single observable stream. It then calls Quiescent on the merged observable, which will buffer events until the specified period of inactivity (1 second in this example) elapses, and then emit a single event containing all the events that occurred before the period of inactivity.
Finally, we move from the world of Rx to the world of LINQ using LINQ's DistinctBy operator on the buffered events to ensure that only unique events are emitted. The composability of Rx and LINQ is incredibly powerful and allows us to build complex event processing pipelines with very little code.
public static IObservable<IEnumerable<FileSystemEventArgs>> ObserveUniqueChanges(IEnumerable<DirectoryPath> directoryPaths)
{
    IObservable<FileSystemEventArgs> observables = directoryPaths.Select(directoryPath => ObserveFileSystem(directoryPath, "*.*", true)).Merge();

    return observables.Quiescent(TimeSpan.FromSeconds(1)).Select(changes => changes.DistinctBy(x => (x.ChangeType, x.FullPath)));
}
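The tuple key passed to DistinctBy is doing the real work here. FileSystemEventArgs is an ordinary class that does not override Equals, so a plain Distinct() would compare references and treat every event as unique. A small sketch with hand-built events (the directory and file names are placeholders) shows the effect of the (ChangeType, FullPath) key:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

string dir = Path.GetTempPath(); // placeholder directory for the synthetic events

List<FileSystemEventArgs> batch = new()
{
    new(WatcherChangeTypes.Changed, dir, "post.md"),
    new(WatcherChangeTypes.Changed, dir, "post.md"),  // exact repeat: dropped
    new(WatcherChangeTypes.Created, dir, "post.md"),  // same path, different change type: kept
    new(WatcherChangeTypes.Changed, dir, "other.md"), // different path: kept
};

// Value tuples compare element by element, so two events with the same
// change type and full path produce equal keys.
List<FileSystemEventArgs> unique = batch
    .DistinctBy(e => (e.ChangeType, e.FullPath))
    .ToList();

foreach (FileSystemEventArgs e in unique)
{
    Console.WriteLine($"{e.Name} - {e.ChangeType}");
}

Console.WriteLine($"{batch.Count} raw event(s), {unique.Count} unique");
```

DistinctBy keeps the first event it sees for each key, so the surviving events stay in arrival order.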
ObserveFileSystem creates a lazy (using Observable.Defer) FileSystemWatcher for the specified directory and applies a filter if supplied, and then uses Rx's FromEventPattern to convert the FileSystemWatcher "classic" events into an IObservable<T> based event stream. The Finally ensures that the FileSystemWatcher is disposed when the observable is unsubscribed from. The Publish and RefCount combination ensures that multiple subscribers will share a single FileSystemWatcher, but that it gets disposed if and when all subscribers unsubscribe.
public static IObservable<FileSystemEventArgs> ObserveFileSystem(DirectoryPath directoryPath, string filter, bool includeSubdirectories)
{
    return
        // Observable.Defer enables us to avoid doing any work
        // until we have a subscriber.
        Observable.Defer(() =>
            {
                FileSystemWatcher fsw = string.IsNullOrEmpty(filter)
                    ? new(directoryPath.FullPath)
                    : new(directoryPath.FullPath, filter);
                fsw.EnableRaisingEvents = true;
                fsw.IncludeSubdirectories = includeSubdirectories;

                return Observable.Return(fsw);
            })
        // Once the preceding part emits the FileSystemWatcher
        // (which will happen when someone first subscribes), we
        // want to wrap all the events as IObservable<T>s, for which
        // we'll use a projection. To avoid ending up with an
        // IObservable<IObservable<FileSystemEventArgs>>, we use
        // SelectMany, which effectively flattens it by one level.
        .SelectMany(fsw =>
            Observable.Merge(new[]
                {
                    Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                        h => fsw.Created += h, h => fsw.Created -= h),
                    Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                        h => fsw.Changed += h, h => fsw.Changed -= h),
                    Observable.FromEventPattern<RenamedEventHandler, FileSystemEventArgs>(
                        h => fsw.Renamed += h, h => fsw.Renamed -= h),
                    Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                        h => fsw.Deleted += h, h => fsw.Deleted -= h)
                })
            // FromEventPattern supplies both the sender and the event
            // args. Extract just the latter.
            .Select(ep => ep.EventArgs)
            // The Finally here ensures the watcher gets shut down once
            // we have no subscribers.
            .Finally(fsw.Dispose))
        // This combination of Publish and RefCount means that multiple
        // subscribers will get to share a single FileSystemWatcher,
        // but that it gets shut down if all subscribers unsubscribe.
        .Publish()
        .RefCount();
}
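The lifetime behaviour of Defer, Finally, Publish and RefCount is easier to see with the FileSystemWatcher taken out of the picture. This sketch assumes the System.Reactive package is referenced and swaps the watcher for a Subject<string>; the `created` and `disposed` counters are invented stand-ins for "a watcher was constructed" and "a watcher was disposed".

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

int created = 0;   // how many times the deferred factory ran
int disposed = 0;  // how many times the Finally callback ran
var events = new Subject<string>(); // stand-in for the watcher's events
var received = new List<string>();

IObservable<string> shared = Observable
    .Defer(() =>
    {
        created++;
        return Observable.Return(events);
    })
    .SelectMany(source => source.Finally(() => disposed++))
    .Publish()
    .RefCount();

Console.WriteLine($"Before subscribing: created={created}");

// Two subscribers share one underlying subscription.
IDisposable first = shared.Subscribe(x => received.Add($"first:{x}"));
IDisposable second = shared.Subscribe(x => received.Add($"second:{x}"));
events.OnNext("a.md");
Console.WriteLine($"Two subscribers: created={created}, received={received.Count}");

// Dropping the last subscriber tears the shared subscription down.
first.Dispose();
second.Dispose();
Console.WriteLine($"All unsubscribed: disposed={disposed}");

// A later subscriber starts a fresh one.
IDisposable third = shared.Subscribe(_ => { });
third.Dispose();
Console.WriteLine($"After resubscribing: created={created}, disposed={disposed}");
```

Nothing is created until the first Subscribe call, the factory runs only once while both subscribers are attached, and the cleanup runs only when the last of them goes away, which is the behaviour you want for an expensive resource like a FileSystemWatcher.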
Finally, the complete code listing:
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Note: DirectoryPath comes from a path-handling library referenced by the
// project; add the matching using directive for whichever one you use.
public static class RxFs
{
    public static IObservable<FileSystemEventArgs> ObserveFileSystem(DirectoryPath directoryPath, string filter, bool includeSubdirectories)
    {
        return
            // Observable.Defer enables us to avoid doing any work
            // until we have a subscriber.
            Observable.Defer(() =>
                {
                    FileSystemWatcher fsw = string.IsNullOrEmpty(filter)
                        ? new(directoryPath.FullPath)
                        : new(directoryPath.FullPath, filter);
                    fsw.EnableRaisingEvents = true;
                    fsw.IncludeSubdirectories = includeSubdirectories;

                    return Observable.Return(fsw);
                })
            // Once the preceding part emits the FileSystemWatcher
            // (which will happen when someone first subscribes), we
            // want to wrap all the events as IObservable<T>s, for which
            // we'll use a projection. To avoid ending up with an
            // IObservable<IObservable<FileSystemEventArgs>>, we use
            // SelectMany, which effectively flattens it by one level.
            .SelectMany(fsw =>
                Observable.Merge(new[]
                    {
                        Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                            h => fsw.Created += h, h => fsw.Created -= h),
                        Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                            h => fsw.Changed += h, h => fsw.Changed -= h),
                        Observable.FromEventPattern<RenamedEventHandler, FileSystemEventArgs>(
                            h => fsw.Renamed += h, h => fsw.Renamed -= h),
                        Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                            h => fsw.Deleted += h, h => fsw.Deleted -= h)
                    })
                // FromEventPattern supplies both the sender and the event
                // args. Extract just the latter.
                .Select(ep => ep.EventArgs)
                // The Finally here ensures the watcher gets shut down once
                // we have no subscribers.
                .Finally(fsw.Dispose))
            // This combination of Publish and RefCount means that multiple
            // subscribers will get to share a single FileSystemWatcher,
            // but that it gets shut down if all subscribers unsubscribe.
            .Publish()
            .RefCount();
    }

    public static IObservable<IList<T>> Quiescent<T>(this IObservable<T> src, TimeSpan minimumInactivityPeriod)
    {
        return Quiescent(src, minimumInactivityPeriod, DefaultScheduler.Instance);
    }

    public static IObservable<IList<T>> Quiescent<T>(this IObservable<T> src, TimeSpan minimumInactivityPeriod, IScheduler scheduler)
    {
        IObservable<int> onoffs =
            from _ in src
            from delta in
                Observable.Return(1, scheduler)
                    .Concat(Observable.Return(-1, scheduler)
                        .Delay(minimumInactivityPeriod, scheduler))
            select delta;

        IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
        IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);

        return src.Buffer(zeroCrossings);
    }

    public static IObservable<IEnumerable<FileSystemEventArgs>> ObserveUniqueChanges(IEnumerable<DirectoryPath> directoryPaths)
    {
        IObservable<FileSystemEventArgs> observables = directoryPaths.Select(directoryPath => ObserveFileSystem(directoryPath, "*.*", true)).Merge();

        return observables.Quiescent(TimeSpan.FromSeconds(1)).Select(changes => changes.DistinctBy(x => (x.ChangeType, x.FullPath)));
    }
}

IObservable<IEnumerable<FileSystemEventArgs>> changes = RxFs.ObserveUniqueChanges(
[
    new(@"C:\website\endjin-com\Content"),
    new(@"C:\website\endjin-com\Taxonomy"),
    new(@"C:\website\endjin-com\Theme")
]);

changes.Subscribe(x =>
{
    foreach (FileSystemEventArgs fileSystemEventArgs in x)
    {
        Console.WriteLine($"{fileSystemEventArgs.FullPath} - {fileSystemEventArgs.ChangeType}");
    }
});

Console.ReadKey();
Rx.NET provides not only a suite of powerful built-in operators and simple extensibility points for creating custom operators, but also APIs for wrapping classic event patterns and for dealing with object lifetime management and concurrency, all of which allow you to write complex compensating logic in a cleaner, clearer, more modern functional style.
I hope you find this code sample useful if you face a similar problem. If not, I hope it inspires you to investigate Rx.NET; a good way to start is to read the FREE Introduction to Reactive Extensions for .NET book.