Chapter 5 – Concurrency in C# – System.Reactive Basics

Today let us look at the System.Reactive together. System.Reactive (Rx) treats events as a sequence of data arriving over time. Therefore, you can consider Rx as a LINQ to Events (based on IObservable<t>). The principal difference between observables and other LINQ providers is that Rx is a “push” model, meaning that queries define how the program reacts when events arrive.

We will cover how to use the System.Reactive extension to deal with events asynchronously. To use it, install the System.Reactive into your application from the NuGet package.

How to Convert .NET System.Reactive Events

When you want to use the OnNext each time whenever the event is arrived to deal with some data, you have to convert whatever events to the System.Reactive.

FromEventPattern method is our solution. It works well with EventHandler<T>, and also we can use it on older types like ElapsedEventHandler. Let’s checkout some examples:
				
					/*
ProgressChanged event type: EventHandler<T>
So it's easy to wrap it by FromeEventPattern: FromEventPattern<int>
*/
var progress = new Progress<int>();
IObservable<EventPattern<int>> progressReports =
    Observable.FromEventPattern<int>(
        handler => progress.ProgressChanged += handler,
        handler => progress.ProgressChanged -= handler);
progressReports.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs));

/*
Elapsed event type is ElapsedEventHandler,
so the FromeEventPattern converts the EventHandler<ElapsedEventArgs> to ElapsedEventHandler
*/
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable<EventPattern<ElapsedEventArgs>> ticks =
    Observable.FromEventPattern<ElapsedEventHandler, ElapsedEventArgs>(
        handler => (s, a) => handler(s, a),
        handler => timer.Elapsed += handler,
        handler => timer.Elapsed -= handler);
ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));
				
			

Once events have been wrapped into an observable, OnNext will be called each time the event arrived. When dealing with AsyncCompletedEventArgs, any data or exception is passed along as data to OnNext. Therefore, you need to handle error data in OnNext. Checkout this example see how to deal with exceptions:

				
					var client = new WebClient();
IObservable<EventPattern<object>> downloadedStrings =
    Observable.
    FromEventPattern(client, nameof(WebClient.DownloadStringCompleted));
downloadedStrings.Subscribe(
    data =>
    {
        var eventArgs = (DownloadStringCompletedEventArgs)data.EventArgs;
        // exception check
        if (eventArgs.Error != null)
            Trace.WriteLine("OnNext: (Error) " + eventArgs.Error);
        else
            Trace.WriteLine("OnNext: " + eventArgs.Result);
    },
    ex => Trace.WriteLine("OnError: " + ex.ToString()),
    () => Trace.WriteLine("OnCompleted"));
client.DownloadStringAsync(new Uri("http://invalid.example.com/"));
				
			

Sending Notifications to a Context

System.Reactive raises notifications in whatever thread happens to be arrived. Regularly, the notification event can occupy UI thread. We can use a particular thread to raise a particular context. Think about the following example:
				
					private void Button_Click(object sender, RoutedEventArgs e)
{
    Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
    Observable.Interval(TimeSpan.FromSeconds(1))
        .Subscribe(x => Trace.WriteLine(
            $"Interval {x} on thread {Environment.CurrentManagedThreadId}"));
}
				
			

If you need to update a UI element, you can trigger notifications through ObserveOn and pass a synchronization context representing the UI thread:

				
					private void Button_Click(object sender, RoutedEventArgs e)
{
    SynchronizationContext uiContext = SynchronizationContext.Current;
    Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
    Observable.Interval(TimeSpan.FromSeconds(1))
        .ObserveOn(uiContext)
        .Subscribe(x => Trace.WriteLine(
            $"Interval {x} on thread {Environment.CurrentManagedThreadId}"));
}
				
			

Another common usage is moving off the UI thread when necessary like some CPU-intensive computation. All mouse moves are raised on the UI thread, so you can use ObserveOn to move those notifications to a threadpool thread, do the computation, and then move the result notifications back to the UI thread.

				
					SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
        handler => (s, a) => handler(s, a),
        handler => MouseMove += handler,
        handler => MouseMove -= handler)
    .Select(evt => evt.EventArgs.GetPosition(this))
    .ObserveOn(Scheduler.Default)
    .Select(position =>
    {
        // Complex calculation
        Thread.Sleep(100);
        var result = position.X + position.Y;
        var thread = Environment.CurrentManagedThreadId;
        Trace.WriteLine($"Calculated result {result} on thread {thread}");
        return result;
    })
    .ObserveOn(uiContext)
    .Subscribe(x => Trace.WriteLine(
        $"Result {x} on thread {Environment.CurrentManagedThreadId}"));
				
			

Grouping Event Data with Window and Buffers

In this section, let us talk about how to combine the incoming a sequence of events whenever they arrive. They System.Reactive has two ways to do it: Buffer and Window. Buffer will hold on to the incoming events until the group is complete, at which time it forwards them all at once as a collection of events. Window will logically group the incoming events but will pass them along as they arrive.
				
					// Buffer example
Observable.Interval(TimeSpan.FromSeconds(1))
    .Buffer(2)
    .Subscribe(x => Trace.WriteLine(
        $"{DateTime.Now.Second}: Got {x[0]} and {x[1]}"));

// Window example
Observable.Interval(TimeSpan.FromSeconds(1))
    .Window(2)
    .Subscribe(group =>
    {
        Trace.WriteLine($"{DateTime.Now.Second}: Starting new group");
        group.Subscribe(
            x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x}"),
            () => Trace.WriteLine($"{DateTime.Now.Second}: Ending group"));
    });
				
			

Taming Event Streams with Throttling and Sampling

A common problem with writing reactive code is when the events come in too quickly. A fast-moving stream of events can overwhelm your program’s processing.

The Throttle operator establishes a sliding timeout window. When an incoming event arrives, it resets the timeout window. When the timeout window expires, it publishes the last event value that arrived within the window.
				
					private void Button_Click(object sender, RoutedEventArgs e)
{
    Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
            handler => (s, a) => handler(s, a),
            handler => MouseMove += handler,
            handler => MouseMove -= handler)
        .Select(x => x.EventArgs.GetPosition(this))
        .Throttle(TimeSpan.FromSeconds(1))
        .Subscribe(x => Trace.WriteLine(
            $"{DateTime.Now.Second}: Saw {x.X + x.Y}"));
}
				
			

Timeouts

Sometime events cannot arrive in time. You need to setup the Timeout to response requests. The Timeout method resets the timeout window whenever a new event arrived. If the Timeout method gets triggered, the process will end an OnError notification containing a TimeoutException. Let’s take look at this example:
				
					private void Button_Click(object sender, RoutedEventArgs e)
{
    Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
            handler => (s, a) => handler(s, a),
            handler => MouseMove += handler,
            handler => MouseMove -= handler)
        .Select(x => x.EventArgs.GetPosition(this))
        .Timeout(TimeSpan.FromSeconds(1))
        .Subscribe(
            x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X + x.Y}"),
            ex => Trace.WriteLine(ex));
}

/*
output:
16: Saw 180
16: Saw 178
16: Saw 177
16: Saw 176
System.TimeoutException: The operation has timed out.
*/
				
			

Let’s change it a little bit to avoid the TimeoutException when the Timeout gets triggered. We can use the Timeout to overload that substituted a second stream when the timeout occurs.

				
					private void Button_Click(object sender, RoutedEventArgs e)
{
    IObservable<Point> clicks =
        Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(
            handler => (s, a) => handler(s, a),
            handler => MouseDown += handler,
            handler => MouseDown -= handler)
        .Select(x => x.EventArgs.GetPosition(this));

    Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(
            handler => (s, a) => handler(s, a),
            handler => MouseMove += handler,
            handler => MouseMove -= handler)
        .Select(x => x.EventArgs.GetPosition(this))
        .Timeout(TimeSpan.FromSeconds(1), clicks)
        .Subscribe(
            x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X},{x.Y}"),
            ex => Trace.WriteLine(ex));
}

/*
output:
49: Saw 95,39
49: Saw 94,39
49: Saw 94,38
49: Saw 94,37
53: Saw 130,141
55: Saw 469,4
*/
				
			

Hope you like this chapter. So far, we complete all types of concurrency basics. Next chapter I am going to talk about the Testing.

Reactive extension: https://github.com/dotnet/reactive

Here are other chapters:

Views: 216

Leave a Reply

Your email address will not be published. Required fields are marked *