I google around and found many articles or posts regarding concurrency or multithread in C#. However, those articles only cover small pieces of the iceberg. Most articles mention multithreading. In C#, the namespace, System.Threading, allows you to deal with multithreading development like this:
class Program
{
static void Main()
{
// initialize a new thread
var a = new Thread(FnA);
// execute of the function, fnA()
a.Start();
// implement multiple functions
var b = new Thread(FnB);
b.Start();
var c = new Thread(FnC);
c.Start();
Console.ReadKey();
}
static void FnA()
{
// do sth
}
static void FnB()
{
// do sth
}
static void FnC()
{
// do sth
}
}
Or using Task to deal with concurrency scenarios. For example:
using System;
using System.Threading;
using System.Threading.Tasks;
public class Example
{
public static void Main()
{
Thread.CurrentThread.Name = "Main";
// Create a task and supply a user delegate by using a lambda expression.
Task taskA = new Task( () => Console.WriteLine("Hello from taskA."));
// Start the task.
taskA.Start();
// Output a message from the calling thread.
Console.WriteLine("Hello from thread '{0}'.",
Thread.CurrentThread.Name);
taskA.Wait();
}
}
// The example displays output like the following:
// Hello from thread 'Main'.
// Hello from taskA.
The above example is from: Task-based asynchronous programming
However, concurrency is about the thread or task library in C# and Asynchronous Programming, Parallel Programming, Reactive Programming (Rx), and Dataflows.
Today, I will start with Concurrency Overview to talk about concurrency and use code examples of how to code with different concurrency programming.
Concurrency Types
What is Concurrency? Its concept is straightforward: Doing many things at a time! Like people mention multi-tasks. Therefore, pretty much we can say 99% of applications in the world can take advantage of concurrency to get benefits.
At the beginning of this article, as I mentioned, most developers think “multithreading” is concurrency (multithreading == concurrency). But they are not the same.
Multithreading: one of concurrency types uses multiple threads when running. If a developer use: new Thread(), like the above example. It’s over! new Thread() means using a low-level threading way which is less efficient than higher-level abstractions of multithreading.
Thread pool is a place where the multithreading lives. Because of that, we have another important type of concurrency: parallel processing.
Parallel processing: Dividing lots of work up among many threads and run them concurrently. Parallel programming takes maximum advantage of using multiple processor cores to use multithreading.
Asynchronous programming: is another type of concurrency and using the same threads to call applications to get results. It’s very important to modern applications, but not many developers are familiar with it in C#. The idea is an asynchronous operation will complete sometime later once it starts.
Reactive Programming: This type of concurrency is very close to asynchronous programming. The difference is it’s built on asynchronous events instead of asynchronous operations. Its concept is that applications react to events and are not limited to threads or abstract solutions.
Asynchronous Programming
There are two major benefits in Asynchronous Programming: It enables responsiveness, and it enables scalability. When using an application with asynchronous programming, the application can remain responsive to users while it’s running. And the application can take advantage of thread pool to use more threads.
In most modern asynchronous .NET applications, developers can use async and await to implement the asynchronous programming. When an async method returns a value, the type is Task<TResult>. But if it doesn’t return a value or “task-like” value, it will return a Task. When an async method returns multiple values, the returned type is IAsyncEnumerable<T> or IAsyncEnumerator<T>.
Warning: As mentioned earlier, please return Task instead of void when an async method doesn’t return a value. Therefore, please avoid async avoid method all the time!
Here is an asynchronous programming code example:
async Task DoSomethingAsync()
{
int value = 13;
// Asynchronously wait 1 second.
await Task.Delay(TimeSpan.FromSeconds(1));
value *= 2;
// Asynchronously wait 1 second.
await Task.Delay(TimeSpan.FromSeconds(1));
Trace.WriteLine(value);
}
In an async method, await breaks up several synchronous portions. When calling await, the method will be paused. A context will be captured when implementing await. The context is based on where you call the async method. For example, it could be the UI context if the method called is from a UI thread. If it’s a UI thread, it will pause UI since the await part is a synchronous portion. To avoid this situation, you can pass false for the continueOnCapturedContext parameter, the ConfigreAwait extension, to call a threadpool thread to resume the method. Example code:
async Task DoSomethingAsync()
{
int value = 13;
// Asynchronously wait 1 second.
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
value *= 2;
// Asynchronously wait 1 second.
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
Trace.WriteLine(value);
}
Tip: There is no hurt to call ConfigureAwait in your application. It’s good practice to resume the context whenever needed outside of the UI context.
Error Handling: For the async and await, using the regular try…catch is good enough!
async Task TrySomethingAsync()
{
try
{
await PossibleExceptionAsync();
}
catch (NotSupportedException ex)
{
LogException(ex);
throw;
}
}
The PossibleExceptionAsync method may throw a NotSupportedException. But the TrysomethingAsync method can catch it naturally.
In an async method, the exception is placed on its returned Task when it’s complete, if the async method throws an exception. Therefore, we can refactor the code above a little bit. We can just try catch the await Task. See the following code:
async Task TrySomethingAsync()
{
// The exception will end up on the Task, not thrown directly.
Task task = PossibleExceptionAsync();
try
{
// The Task's exception will be raised here, at the await.
await task;
}
catch (NotSupportedException ex)
{
LogException(ex);
throw;
}
}
The best practice is to allow use await to get the Task results returned. And try your best to avoid to use Task.Wait, Task<TResult>.Result, or GetAwaiter().GetResult(); Those methods could cause a deadlock. Fortunately, there are two ways to prevent the deadlock: 1) use ConfigureAwait(false) within WaitAsync; 2) use await the call to WaitAsync.
Parallel Programming
Parallel programming: whenever you have several computation works, which can be broken up into multiple independent chunks, you should use parallel programming. It has two types of parallelism: data parallelism and task parallelism. Data parallelism focus on processing each separate piece of data when you have a bunch of data. Task parallelism focus on a pool of work. Each piece of work is independent of others. See the following code – how to handle data parallelism:
void RotateMatrices(IEnumerable matrices, float degrees)
{
Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}
// Another way to do it: Parallel LINQ (PLINQ) -- using AsParallel extension
IEnumerable PrimalityTest(IEnumerable values)
{
return values.AsParallel().Select(value => IsPrime(value));
}
Task parallelism: Let take a look at this method, Parallel.Invoke, to implement multiple tasks:
void ProcessArray(double[] array)
{
Parallel.Invoke(
() => ProcessPartialArray(array, 0, array.Length / 2),
() => ProcessPartialArray(array, array.Length / 2, array.Length)
);
}
void ProcessPartialArray(double[] array, int begin, int end)
{
// CPU-intensive processing...
}
Error handling: It’s similar with the Asynchronous programming. You can just throw exception in each task:
try
{
Parallel.Invoke(() => { throw new Exception(); },
() => { throw new Exception(); });
}
catch (AggregateException ex)
{
ex.Handle(exception =>
{
Trace.WriteLine(exception);
return true; // "handled"
});
}
Tip: Tasks should neither be too long, nor too short. If tasks are too long, you need to think about how to break them up into small task because the thread pool can not maximize its benefits. If tasks are too short, running those tasks will cost much performance in thread pool.
Reactive Programming (Rx)
Reactive programming: It’s more complicated than other concurrency types. You need to keep maintaining your code and keep up your skills. Reactive programming is like observable streams of subscription. The application needs to observe steam data to react.
The way how to code it is much like LINQ in C#. See the following example code:
Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(x => x.Value % 2 == 0)
.Select(x => x.Timestamp) // 1. select the Timestamp value
.Subscribe(x => Trace.WriteLine(x)); // writes it to the Subscribe
For subscriptions behaviors, it has two types: hot observables and cold observables. Hot observable: an events stream is always going on, and those events are lost if there are no subscribers when events come in. Code observable: incoming events are not incoming all the time. For example, a request of download event is a cold observable.
Error handling: see the following code:
Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Where(x => x.Value % 2 == 0)
.Select(x => x.Timestamp)
.Subscribe(x => Trace.WriteLine(x),
ex => Trace.WriteLine(ex));
Dataflows
TPL Dataflow: It’s a combination of asynchronous and parallel technologies. A dataflow block is the basic unit of a dataflow mesh. To summarize how it works: TPL dataflow creates all the blocks, and then link them together, and then start putting data in. I don’t have a good example here. But in the future chapter, we definitely will talk about it.
Let’s see the following code. It has dataflow blocks and try catch.
try
{
var multiplyBlock = new TransformBlock(item =>
{
if (item == 1)
throw new InvalidOperationException("Blech.");
return item * 2;
});
var subtractBlock = new TransformBlock(item => item - 2);
multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });
multiplyBlock.Post(1);
subtractBlock.Completion.Wait();
}
catch (AggregateException exception)
{
AggregateException ex = exception.Flatten();
Trace.WriteLine(ex.InnerException);
}
Concurrent Applications Collections
There are two collection categories: concurrent collections and immutable collections. Concurrent collections: multiple threads can update ;the collections simultaneously safely. Immutable collections: Know as its name, this type collections cannot be modified. The only to change immutable collections is create new collections to replace old ones.
My knowledge is from this book: Concurrency In C# Cookbook – Asynchronous, Parallel, and Multithreaded Programming 2019
Please feel free to read online! It’s free!
Next chapter, I will cover Async Basics!
Views: 41