Today let us look at the System.Reactive together. System.Reactive (Rx) treats events as a sequence of data arriving over time. Therefore, you can consider Rx as a LINQ to Events (based on IObservable<t>). The principal difference between observables and other LINQ providers is that Rx is a “push” model, meaning that queries define how the program reacts when events arrive.
We will cover how to use the System.Reactive extension to deal with events asynchronously. To use it, install the System.Reactive into your application from the NuGet package.
How to Convert .NET System.Reactive Events
/*
ProgressChanged event type: EventHandler
So it's easy to wrap it by FromeEventPattern: FromEventPattern
*/
var progress = new Progress();
IObservable> progressReports =
Observable.FromEventPattern(
handler => progress.ProgressChanged += handler,
handler => progress.ProgressChanged -= handler);
progressReports.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs));
/*
Elapsed event type is ElapsedEventHandler,
so the FromeEventPattern converts the EventHandler to ElapsedEventHandler
*/
var timer = new System.Timers.Timer(interval: 1000) { Enabled = true };
IObservable> ticks =
Observable.FromEventPattern(
handler => (s, a) => handler(s, a),
handler => timer.Elapsed += handler,
handler => timer.Elapsed -= handler);
ticks.Subscribe(data => Trace.WriteLine("OnNext: " + data.EventArgs.SignalTime));
Once events have been wrapped into an observable, OnNext will be called each time the event arrived. When dealing with AsyncCompletedEventArgs, any data or exception is passed along as data to OnNext. Therefore, you need to handle error data in OnNext. Checkout this example see how to deal with exceptions:
var client = new WebClient();
IObservable> downloadedStrings =
Observable.
FromEventPattern(client, nameof(WebClient.DownloadStringCompleted));
downloadedStrings.Subscribe(
data =>
{
var eventArgs = (DownloadStringCompletedEventArgs)data.EventArgs;
// exception check
if (eventArgs.Error != null)
Trace.WriteLine("OnNext: (Error) " + eventArgs.Error);
else
Trace.WriteLine("OnNext: " + eventArgs.Result);
},
ex => Trace.WriteLine("OnError: " + ex.ToString()),
() => Trace.WriteLine("OnCompleted"));
client.DownloadStringAsync(new Uri("http://invalid.example.com/"));
Sending Notifications to a Context
private void Button_Click(object sender, RoutedEventArgs e)
{
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
Observable.Interval(TimeSpan.FromSeconds(1))
.Subscribe(x => Trace.WriteLine(
$"Interval {x} on thread {Environment.CurrentManagedThreadId}"));
}
If you need to update a UI element, you can trigger notifications through ObserveOn and pass a synchronization context representing the UI thread:
private void Button_Click(object sender, RoutedEventArgs e)
{
SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
Observable.Interval(TimeSpan.FromSeconds(1))
.ObserveOn(uiContext)
.Subscribe(x => Trace.WriteLine(
$"Interval {x} on thread {Environment.CurrentManagedThreadId}"));
}
Another common usage is moving off the UI thread when necessary like some CPU-intensive computation. All mouse moves are raised on the UI thread, so you can use ObserveOn to move those notifications to a threadpool thread, do the computation, and then move the result notifications back to the UI thread.
SynchronizationContext uiContext = SynchronizationContext.Current;
Trace.WriteLine($"UI thread is {Environment.CurrentManagedThreadId}");
Observable.FromEventPattern(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(evt => evt.EventArgs.GetPosition(this))
.ObserveOn(Scheduler.Default)
.Select(position =>
{
// Complex calculation
Thread.Sleep(100);
var result = position.X + position.Y;
var thread = Environment.CurrentManagedThreadId;
Trace.WriteLine($"Calculated result {result} on thread {thread}");
return result;
})
.ObserveOn(uiContext)
.Subscribe(x => Trace.WriteLine(
$"Result {x} on thread {Environment.CurrentManagedThreadId}"));
Grouping Event Data with Window and Buffers
// Buffer example
Observable.Interval(TimeSpan.FromSeconds(1))
.Buffer(2)
.Subscribe(x => Trace.WriteLine(
$"{DateTime.Now.Second}: Got {x[0]} and {x[1]}"));
// Window example
Observable.Interval(TimeSpan.FromSeconds(1))
.Window(2)
.Subscribe(group =>
{
Trace.WriteLine($"{DateTime.Now.Second}: Starting new group");
group.Subscribe(
x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x}"),
() => Trace.WriteLine($"{DateTime.Now.Second}: Ending group"));
});
Taming Event Streams with Throttling and Sampling
The Throttle operator establishes a sliding timeout window. When an incoming event arrives, it resets the timeout window. When the timeout window expires, it publishes the last event value that arrived within the window.
private void Button_Click(object sender, RoutedEventArgs e)
{
Observable.FromEventPattern(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(x => x.EventArgs.GetPosition(this))
.Throttle(TimeSpan.FromSeconds(1))
.Subscribe(x => Trace.WriteLine(
$"{DateTime.Now.Second}: Saw {x.X + x.Y}"));
}
Timeouts
private void Button_Click(object sender, RoutedEventArgs e)
{
Observable.FromEventPattern(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(x => x.EventArgs.GetPosition(this))
.Timeout(TimeSpan.FromSeconds(1))
.Subscribe(
x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X + x.Y}"),
ex => Trace.WriteLine(ex));
}
/*
output:
16: Saw 180
16: Saw 178
16: Saw 177
16: Saw 176
System.TimeoutException: The operation has timed out.
*/
Let’s change it a little bit to avoid the TimeoutException when the Timeout gets triggered. We can use the Timeout to overload that substituted a second stream when the timeout occurs.
private void Button_Click(object sender, RoutedEventArgs e)
{
IObservable clicks =
Observable.FromEventPattern(
handler => (s, a) => handler(s, a),
handler => MouseDown += handler,
handler => MouseDown -= handler)
.Select(x => x.EventArgs.GetPosition(this));
Observable.FromEventPattern(
handler => (s, a) => handler(s, a),
handler => MouseMove += handler,
handler => MouseMove -= handler)
.Select(x => x.EventArgs.GetPosition(this))
.Timeout(TimeSpan.FromSeconds(1), clicks)
.Subscribe(
x => Trace.WriteLine($"{DateTime.Now.Second}: Saw {x.X},{x.Y}"),
ex => Trace.WriteLine(ex));
}
/*
output:
49: Saw 95,39
49: Saw 94,39
49: Saw 94,38
49: Saw 94,37
53: Saw 130,141
55: Saw 469,4
*/
Hope you like this chapter. So far, we complete all types of concurrency basics. Next chapter I am going to talk about the Testing.
Reactive extension: https://github.com/dotnet/reactive
Here are other chapters:
Views: 216