Observe File System Changes with Reactive Extensions for .NET
By Howard van Rooijen, Co-Founder

TL;DR: FileSystemWatcher has a classic .NET event-handler-based API, which makes it difficult to write compensating logic for some of the idiosyncratic behaviours that it (and the underlying file system) can exhibit. Thankfully, the Reactive Extensions for .NET (Rx .NET) provide powerful APIs that convert these events into an observable event stream, which can easily be filtered and manipulated to give a more modern, high-level and easy-to-use API.

Note: FileSystemWatcher has a number of known issues and (cross-platform) limitations, and you should be aware of them before using it in a production system. Our use case is a CLI tool for Static Site Generation, which watches for changes and then triggers a cache invalidation; so far we haven't come across any of those issues. YMMV.

When we redesigned endjin.com in 2020 we wanted to simplify our web stack. The site was a combination of an ASP.NET Core app for the main site and a WordPress site for our blog. The site has a lot of content (1000+ pages); the ASP.NET Core app was clunky for content editing, and WordPress needed constant maintenance to fend off daily automated exploit-based attacks.

I'm not a fan of the NPM ecosystem and its endless package dependency (software supply chain security) nightmare, so we built a custom .NET Static Site Generator to suit our specific needs. It's packaged as a .NET Global Tool for ease of use (by a human or inside a GitHub Action), and lets users focus on writing good content in Markdown while the SSG stays out of the way.

Although the tool is very fast, multiplying the duration of a set of complex operations by 1000 pages results in a generation process that takes many tens of seconds. The more we used the tool to build the site, the more we wanted to improve the inner dev loop. Rather than regenerating the entire site every time a file changed, we wanted to regenerate just the pages affected by the change.

We created a caching graph that lets us work out the dependency tree of pages affected by a content change, which we use to build an optimised execution plan. We then needed a way to trigger the cache invalidation process whenever a file changed: in other words, the equivalent of a "watch" command.
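
We don't show our caching graph in this post, but the core idea of working out which pages a change affects can be sketched as a breadth-first walk over a reverse-dependency map. This is a minimal, hypothetical illustration (the AffectedPages name, the map shape and the file names are all assumptions), not the SSG's actual implementation.

```csharp
using System.Collections.Generic;

// Hypothetical sketch, not the SSG's real caching graph.
public static class AffectedPages
{
    // dependents maps an item (a content file, taxonomy file or page) to the
    // pages that depend on it directly.
    public static HashSet<string> Find(
        IReadOnlyDictionary<string, string[]> dependents,
        IEnumerable<string> changedItems)
    {
        var affected = new HashSet<string>();
        var pending = new Queue<string>(changedItems);

        // Breadth-first walk: a page that depends on an affected page is affected too.
        while (pending.Count > 0)
        {
            string item = pending.Dequeue();
            if (!dependents.TryGetValue(item, out var pages))
            {
                continue;
            }

            foreach (string page in pages)
            {
                // HashSet.Add returns false for pages we've already seen,
                // which also stops the walk looping forever on cycles.
                if (affected.Add(page))
                {
                    pending.Enqueue(page);
                }
            }
        }

        return affected;
    }
}
```

For example, if post.md feeds post.html, and post.html is listed on blog-index.html, a change to post.md marks both pages as affected, while a file with no dependents marks nothing.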

This is where the fun and games dealing with the idiosyncrasies of file system operations began.

If you save a single file, the FileSystemWatcher will raise a Changed event:

c:\endjin-com\content\blog\howard.vanrooijen\rxfs.md - Changed

If you rename a file, the FileSystemWatcher will raise a Renamed event for the file and then a Changed event at the directory level:

c:\endjin-com\content\blog\howard.vanrooijen\rxfs2.md - Renamed
c:\endjin-com\content\blog\howard.vanrooijen - Changed

If you perform a "Save All" operation in VS Code, you'll get a Changed event for every file that was saved:

c:\endjin-com\content\blog\ian.griffiths\csharp-11-parameter-null-check.md - Changed
c:\endjin-com\content\blog\ian.griffiths\csharp-11-pattern-matching-span-char.md - Changed
c:\endjin-com\content\blog\ian.griffiths\csharp-11-raw-string-literals.md - Changed
c:\endjin-com\content\blog\ian.griffiths\csharp-11-string-interpolation-newline.md - Changed
c:\endjin-com\content\blog\ian.griffiths\csharp-11-utf8-string-literals.md - Changed

This stampede of events can arrive over a seemingly random period of time (I've seen gaps of 75 to 1250 ms between events). That could be due to any number of factors outside your control: VS Code, the OS dealing with other tasks, the file system buffer, real-time virus scanning, the speed of the SSD or HDD, and possibly a hundred other reasons hidden from view. FileSystemWatcher's "classic" event-based API doesn't make it easy to model compensating behaviours such as waiting and buffering.
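
For contrast, this is roughly what the "classic" API looks like when used directly. It's a minimal sketch using only the BCL (the ClassicWatcherSketch name and the log callback are hypothetical): each handler sees one event at a time, typically on a thread-pool thread, so any waiting or batching would have to be bolted on by hand with timers, locks and shared lists.

```csharp
using System;
using System.IO;

// Hypothetical sketch of the classic, event-handler-based usage.
public static class ClassicWatcherSketch
{
    public static FileSystemWatcher Start(string directory, Action<string> log)
    {
        var fsw = new FileSystemWatcher(directory)
        {
            IncludeSubdirectories = true,
        };

        // One handler per event; Renamed has its own delegate and args types.
        FileSystemEventHandler onChange = (sender, e) => log($"{e.FullPath} - {e.ChangeType}");
        fsw.Created += onChange;
        fsw.Changed += onChange;
        fsw.Deleted += onChange;
        fsw.Renamed += (sender, e) => log($"{e.OldFullPath} -> {e.FullPath} - {e.ChangeType}");

        // Raised when the watcher can't continue, e.g. its internal buffer overflowed.
        fsw.Error += (sender, e) => log($"Watcher error: {e.GetException().Message}");

        // Start raising events only once every handler is attached.
        fsw.EnableRaisingEvents = true;
        return fsw;
    }
}
```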

So I nerd-sniped Ian Griffiths with this problem, and he came up with an elegant solution: using Rx .NET and LINQ, he wrote a Quiescent<T> extension method that buffers events until a period of inactivity occurs, and then emits a single event containing a collection of all the events that arrived before that period of inactivity.

Figure: an Rx marble diagram. The 'source' observable emits events 1 and 2 close together, then after a gap 3, 4 and 5 close together, then after another gap a lone 6. The observable 'source.Quiescent(TimeSpan.FromSeconds(2), Scheduler.Default)' emits three lists, '1, 2', '3, 4, 5' and '6', each shortly after the last event of the corresponding burst: whenever the source produces some events and then stops, Quiescent emits one list containing everything from that burst.
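
To make those semantics concrete, here's an offline model of what Quiescent produces, written as plain C# over a list of timestamps rather than as an Rx operator (BurstGrouping is a hypothetical helper, not part of Rx). It starts a new batch whenever the gap since the previous event is at least the minimum inactivity period; in the real operator, what happens when a gap exactly equals the period depends on scheduling.

```csharp
using System.Collections.Generic;

// Hypothetical offline model of Quiescent's output, not an Rx operator.
public static class BurstGrouping
{
    // timestampsMs must be sorted in ascending order.
    public static List<List<double>> GroupBursts(IReadOnlyList<double> timestampsMs, double minimumInactivityMs)
    {
        var bursts = new List<List<double>>();
        double previous = 0;

        foreach (double t in timestampsMs)
        {
            // The first event, or one after a long enough quiet gap, starts a new burst.
            if (bursts.Count == 0 || t - previous >= minimumInactivityMs)
            {
                bursts.Add(new List<double>());
            }

            bursts[^1].Add(t);
            previous = t;
        }

        return bursts;
    }
}
```

With timestamps 0, 100, 1500, 1600, 1700 and 3500 ms and a 1000 ms period, it returns [0, 100], [1500, 1600, 1700] and [3500]: three bursts, just like the diagram.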

This scenario is sufficiently complex and interesting that Ian covers it in some depth in the Key Types: LINQ Operators and Composition chapter of his excellent FREE book Introduction to Reactive Extensions for .NET. I won't rehash the details here, but will instead skip to the solution.

public static IObservable<IList<T>> Quiescent<T>(this IObservable<T> src, TimeSpan minimumInactivityPeriod, IScheduler scheduler)
{
    IObservable<int> onoffs =
        from _ in src
        from delta in
            Observable.Return(1, scheduler)
                .Concat(Observable.Return(-1, scheduler)
                    .Delay(minimumInactivityPeriod, scheduler))
        select delta;

    IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
    IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);

    return src.Buffer(zeroCrossings);
}

public static IObservable<IList<T>> Quiescent<T>(this IObservable<T> src, TimeSpan minimumInactivityPeriod)
{
    return Quiescent(src, minimumInactivityPeriod, DefaultScheduler.Instance);
}

The second overload simply supplies a sensible default scheduler (DefaultScheduler.Instance) for the timing.
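
The trick at the heart of Quiescent is a counter: each source event contributes +1 straight away and -1 one inactivity period later (onoffs); Scan keeps a running total of events that are still "recent"; and each time that total drops back to zero, the source has been quiet for a whole period, so zeroCrossings fires and Buffer closes the current batch. The following offline trace computes those zero crossings for a list of event times. OnOffTrace is a hypothetical helper, and it uses plain timestamps instead of a scheduler.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical offline trace of the onoffs/Scan/Where logic in Quiescent.
public static class OnOffTrace
{
    public static List<double> ZeroCrossings(IReadOnlyList<double> eventTimesMs, double periodMs)
    {
        // Every event yields +1 at its own time and -1 one period later.
        var deltas = eventTimesMs
            .SelectMany(t => new[] { (Time: t, Delta: 1), (Time: t + periodMs, Delta: -1) })
            .OrderBy(d => d.Time)
            // Assumption: on a tie, apply the -1 first (the real operator's
            // ordering on ties depends on the scheduler).
            .ThenBy(d => d.Delta);

        var crossings = new List<double>();
        int outstanding = 0;
        foreach (var (time, delta) in deltas)
        {
            // Running total, like Scan(0, (total, delta) => total + delta).
            outstanding += delta;

            // Like Where(total => total == 0): the source has gone quiet.
            if (outstanding == 0)
            {
                crossings.Add(time);
            }
        }

        return crossings;
    }
}
```

For events at 0, 100, 1500, 1600, 1700 and 3500 ms with a 1000 ms period, the total returns to zero at 1100, 2700 and 4500 ms: one period after the last event of each burst, which is when Quiescent would emit each batch.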

While Quiescent tames the sporadic stampede of events, we still need a higher-level API that lets us specify which directories to watch and notifies us when anything in them changes. In our CMS, the Content directory contains the Markdown files that make up the site; the Taxonomy directory contains the YAML files that define the site's structure and which content is included on which pages; and the Theme directory contains the Razor, CSS and JavaScript files that define the site's theme and interactivity.

IObservable<IEnumerable<FileSystemEventArgs>> changes = RxFs.ObserveUniqueChanges(
[
    new(@"C:\website\endjin-com\Content"),
    new(@"C:\website\endjin-com\Taxonomy"),
    new(@"C:\website\endjin-com\Theme")
]);

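// Disposing the subscription unsubscribes, which (via RefCount and Finally) disposes the watchers.
using IDisposable subscription = changes.Subscribe(x =>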
{
    foreach (FileSystemEventArgs fileSystemEventArgs in x)
    {
        Console.WriteLine($"{fileSystemEventArgs.FullPath} - {fileSystemEventArgs.ChangeType}");
    }
});

Console.ReadKey();

This API supports watching several directories at once, which covers common editing scenarios such as changing a Markdown file, a CSS file and a taxonomy file, and then performing a Save All that updates all of them in a single batch operation.

The RxFs.ObserveUniqueChanges method deals with all the underlying complexity and will just emit a single batch of distinct events containing information about the changes, which, in our case, can then be passed into the cache invalidation process.

ObserveUniqueChanges calls ObserveFileSystem for each directory passed in and merges the resulting observables into a single observable stream. It then applies Quiescent to the merged stream, which buffers events until the specified period of inactivity (1 second here) elapses, and then emits a single list containing all the events that occurred before that period of inactivity.

Finally, we step from the world of Rx into LINQ to Objects: each buffered batch is an IList<FileSystemEventArgs>, so we use LINQ's DistinctBy operator (available from .NET 6) to drop duplicate events, treating two events as duplicates when they have the same ChangeType and FullPath. The composability of Rx and LINQ is incredibly powerful, and allows us to build complex event processing pipelines with very little code.

public static IObservable<IEnumerable<FileSystemEventArgs>> ObserveUniqueChanges(IEnumerable<DirectoryPath> directoryPaths)
{
    IObservable<FileSystemEventArgs> observables = directoryPaths.Select(directoryPath => ObserveFileSystem(directoryPath, "*.*", true)).Merge();
    return observables.Quiescent(TimeSpan.FromSeconds(1)).Select(changes => changes.DistinctBy(x => (x.ChangeType, x.FullPath)));
}
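
The de-duplication step can be tried on its own, outside Rx. This sketch applies the same DistinctBy key to a hand-built batch; the DedupeSketch name, the file names and the directory are hypothetical, and Enumerable.DistinctBy needs .NET 6 or later.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical sketch of the DistinctBy step from ObserveUniqueChanges.
public static class DedupeSketch
{
    // Same key as ObserveUniqueChanges: keep the first event per (ChangeType, FullPath).
    public static List<FileSystemEventArgs> Dedupe(IEnumerable<FileSystemEventArgs> batch) =>
        batch.DistinctBy(x => (x.ChangeType, x.FullPath)).ToList();

    // A hand-built batch with hypothetical file names.
    public static List<FileSystemEventArgs> SampleBatch()
    {
        string dir = Path.Combine(Path.GetTempPath(), "content");
        return new List<FileSystemEventArgs>
        {
            // A single save can raise more than one Changed event for the same file.
            new FileSystemEventArgs(WatcherChangeTypes.Changed, dir, "rxfs.md"),
            new FileSystemEventArgs(WatcherChangeTypes.Changed, dir, "rxfs.md"),
            new FileSystemEventArgs(WatcherChangeTypes.Created, dir, "other.md"),
            // Same path but a different ChangeType, so it is kept.
            new FileSystemEventArgs(WatcherChangeTypes.Changed, dir, "other.md"),
        };
    }
}
```

Because ChangeType is part of the key, a Created followed by a Changed for the same file both survive; only exact repeats are collapsed.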

ObserveFileSystem lazily creates (using Observable.Defer) a FileSystemWatcher for the specified directory, applying a filter if one is supplied, and then uses Rx's FromEventPattern to convert the FileSystemWatcher's "classic" events into an IObservable<T> event stream. The Finally operator ensures that the FileSystemWatcher is disposed when the observable is unsubscribed from. The Publish and RefCount combination ensures that multiple subscribers share a single FileSystemWatcher, which is disposed if and when all subscribers unsubscribe. This sharing matters here: Quiescent subscribes to its source twice (once to count activity and once for Buffer), so without it each directory would get two watchers.

public static IObservable<FileSystemEventArgs> ObserveFileSystem(DirectoryPath directoryPath, string filter, bool includeSubdirectories)
{
    return
        // Observable.Defer enables us to avoid doing any work
        // until we have a subscriber.
        Observable.Defer(() =>
        {
            FileSystemWatcher fsw = string.IsNullOrEmpty(filter)
                ? new(directoryPath.FullPath)
                : new(directoryPath.FullPath, filter);
            fsw.IncludeSubdirectories = includeSubdirectories;
            // Configure the watcher fully before enabling it.
            fsw.EnableRaisingEvents = true;

            return Observable.Return(fsw);
        })
        // Once the preceding part emits the FileSystemWatcher
        // (which will happen when someone first subscribes), we
        // want to wrap all the events as IObservable<T>s, for which
        // we'll use a projection. To avoid ending up with an
        // IObservable<IObservable<FileSystemEventArgs>>, we use
        // SelectMany, which effectively flattens it by one level.
        .SelectMany(fsw =>
            Observable.Merge(new[]
            {
                Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                    h => fsw.Created += h, h => fsw.Created -= h),
                Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                    h => fsw.Changed += h, h => fsw.Changed -= h),
                Observable.FromEventPattern<RenamedEventHandler, FileSystemEventArgs>(
                    h => fsw.Renamed += h, h => fsw.Renamed -= h),
                Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                    h => fsw.Deleted += h, h => fsw.Deleted -= h)
            })
            // FromEventPattern supplies both the sender and the event
            // args. Extract just the latter.
            .Select(ep => ep.EventArgs)
            // The Finally here ensures the watcher gets shut down once
            // we have no subscribers.
            .Finally(fsw.Dispose))
        // This combination of Publish and RefCount means that multiple
        // subscribers will get to share a single FileSystemWatcher,
        // but that it gets shut down if all subscribers unsubscribe.
        .Publish()
        .RefCount();
}
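
Publish().RefCount() is essentially reference counting applied to a subscription. As a rough analogy only (this is a hand-rolled, hypothetical class, not how Rx implements RefCount, and it shares a disposable object rather than a stream of events), the same lifetime rules look like this: the first user creates the resource, later users share it, and the last one out disposes it.

```csharp
using System;

// Hypothetical, hand-rolled analogy for Publish().RefCount(); not Rx's implementation.
public sealed class SharedResource<T> where T : class, IDisposable
{
    private readonly Func<T> factory;
    private readonly object gate = new object();
    private T resource;
    private int users;

    public SharedResource(Func<T> factory) => this.factory = factory;

    public int Users { get { lock (gate) { return users; } } }

    // Like subscribing: the first caller creates the resource; later callers share it.
    public T Acquire()
    {
        lock (gate)
        {
            if (users == 0)
            {
                resource = factory();
            }

            users++;
            return resource;
        }
    }

    // Like unsubscribing: the last caller to release disposes the resource.
    public void Release()
    {
        lock (gate)
        {
            if (users == 0)
            {
                throw new InvalidOperationException("Release called more times than Acquire.");
            }

            users--;
            if (users == 0)
            {
                resource.Dispose();
            }
        }
    }
}
```

In ObserveFileSystem, the shared resource is the FileSystemWatcher created inside Defer, and its disposal happens through Finally when RefCount tears down the shared subscription.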

Finally, the complete code listing:

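using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

// Requires the System.Reactive NuGet package, and .NET 6+ for Enumerable.DistinctBy.
// DirectoryPath isn't shown here; the code only relies on its FullPath property.
public static class RxFs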
{
    public static IObservable<FileSystemEventArgs> ObserveFileSystem(DirectoryPath directoryPath, string filter, bool includeSubdirectories)
    {
        return
            // Observable.Defer enables us to avoid doing any work
            // until we have a subscriber.
            Observable.Defer(() =>
                {
                    FileSystemWatcher fsw = string.IsNullOrEmpty(filter)
                        ? new(directoryPath.FullPath)
                        : new(directoryPath.FullPath, filter);
                    fsw.IncludeSubdirectories = includeSubdirectories;
                    // Configure the watcher fully before enabling it.
                    fsw.EnableRaisingEvents = true;

                    return Observable.Return(fsw);
                })
                // Once the preceding part emits the FileSystemWatcher
                // (which will happen when someone first subscribes), we
                // want to wrap all the events as IObservable<T>s, for which
                // we'll use a projection. To avoid ending up with an
                // IObservable<IObservable<FileSystemEventArgs>>, we use
                // SelectMany, which effectively flattens it by one level.
                .SelectMany(fsw =>
                    Observable.Merge(new[]
                        {
                            Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                                h => fsw.Created += h, h => fsw.Created -= h),
                            Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                                h => fsw.Changed += h, h => fsw.Changed -= h),
                            Observable.FromEventPattern<RenamedEventHandler, FileSystemEventArgs>(
                                h => fsw.Renamed += h, h => fsw.Renamed -= h),
                            Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                                h => fsw.Deleted += h, h => fsw.Deleted -= h)
                        })
                        // FromEventPattern supplies both the sender and the event
                        // args. Extract just the latter.
                        .Select(ep => ep.EventArgs)
                        // The Finally here ensures the watcher gets shut down once
                        // we have no subscribers.
                        .Finally(fsw.Dispose))
                // This combination of Publish and RefCount means that multiple
                // subscribers will get to share a single FileSystemWatcher,
                // but that it gets shut down if all subscribers unsubscribe.
                .Publish()
                .RefCount();
    }

    public static IObservable<IList<T>> Quiescent<T>(this IObservable<T> src, TimeSpan minimumInactivityPeriod)
    {
        return Quiescent(src, minimumInactivityPeriod, DefaultScheduler.Instance);
    }

    public static IObservable<IList<T>> Quiescent<T>(this IObservable<T> src, TimeSpan minimumInactivityPeriod, IScheduler scheduler)
    {
        IObservable<int> onoffs =
            from _ in src
            from delta in
                Observable.Return(1, scheduler)
                    .Concat(Observable.Return(-1, scheduler)
                        .Delay(minimumInactivityPeriod, scheduler))
            select delta;

        IObservable<int> outstanding = onoffs.Scan(0, (total, delta) => total + delta);
        IObservable<int> zeroCrossings = outstanding.Where(total => total == 0);

        return src.Buffer(zeroCrossings);
    }

    public static IObservable<IEnumerable<FileSystemEventArgs>> ObserveUniqueChanges(IEnumerable<DirectoryPath> directoryPaths)
    {
        IObservable<FileSystemEventArgs> observables = directoryPaths.Select(directoryPath => ObserveFileSystem(directoryPath, "*.*", true)).Merge();
        return observables.Quiescent(TimeSpan.FromSeconds(1)).Select(changes => changes.DistinctBy(x => (x.ChangeType, x.FullPath)));
    }
}

IObservable<IEnumerable<FileSystemEventArgs>> changes = RxFs.ObserveUniqueChanges(
[
    new(@"C:\website\endjin-com\Content"),
    new(@"C:\website\endjin-com\Taxonomy"),
    new(@"C:\website\endjin-com\Theme")
]);

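// Disposing the subscription unsubscribes, which (via RefCount and Finally) disposes the watchers.
using IDisposable subscription = changes.Subscribe(x =>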
{
    foreach (FileSystemEventArgs fileSystemEventArgs in x)
    {
        Console.WriteLine($"{fileSystemEventArgs.FullPath} - {fileSystemEventArgs.ChangeType}");
    }
});

Console.ReadKey();
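
The listings use a DirectoryPath type that isn't defined in this post; the code only relies on a constructor that takes a string and a FullPath property. If you don't already have such a type in your project, a minimal, hypothetical stand-in is enough to run the sample:

```csharp
using System.IO;

// Hypothetical stand-in for DirectoryPath: RxFs only needs a string
// constructor and a FullPath property.
public sealed record DirectoryPath
{
    // Normalise to an absolute path so FullPath is always fully qualified.
    public DirectoryPath(string path) => FullPath = Path.GetFullPath(path);

    public string FullPath { get; }
}
```

Being a record, two instances for the same path compare equal, which is handy if you ever want to de-duplicate the list of directories before watching them.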

Rx.NET not only provides a suite of powerful built-in operators and simple extensibility points for creating custom ones, but also APIs for wrapping classic event patterns, managing object lifetimes and handling concurrency. Together, these let you write complex compensating logic in a cleaner, clearer, more modern functional style.

I hope you find this code sample useful if you face a similar problem. If not, I hope it inspires you to investigate Rx.NET; a good way to start is to read the FREE Introduction to Reactive Extensions for .NET book.

Howard van Rooijen, Co-Founder

Howard spent 10 years as a technology consultant helping some of the UK's best known organisations work smarter, before founding endjin in 2010. He's a Microsoft ScaleUp Mentor, and a Microsoft MVP for Azure and Developer Technologies, and helps small teams achieve big things using data, AI and Microsoft Azure.