Chapter 4 - Concurrency in C#: Parallel Basics

Parallel programming splits CPU-bound work into pieces and distributes them across multiple threads. The parallel processing techniques in this chapter apply only to CPU-bound work.

In this chapter, we will talk about how to use the Parallel class to solve some common problems.

Parallel Processing of Data

Assume you need to perform the same operation on every element of a data collection. For this scenario, we can use the Parallel.ForEach method. Please see the example code:
				
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)
{
    // Rotate every matrix; iterations may run in parallel and in any order.
    Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}
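
To try the pattern without a Matrix type, here is a minimal self-contained sketch. The Vector2D class and the RotateDemo.RotateAll method are hypothetical names made up for this illustration; only Parallel.ForEach comes from the framework.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the Matrix type used above; not a framework type.
public sealed class Vector2D
{
    public double X { get; private set; }
    public double Y { get; private set; }

    public Vector2D(double x, double y) { X = x; Y = y; }

    // Rotates this vector in place, counter-clockwise, by the given angle in degrees.
    public void Rotate(double degrees)
    {
        double radians = degrees * Math.PI / 180.0;
        double cos = Math.Cos(radians);
        double sin = Math.Sin(radians);
        double newX = X * cos - Y * sin;
        double newY = X * sin + Y * cos;
        X = newX;
        Y = newY;
    }
}

public static class RotateDemo
{
    public static void RotateAll(IEnumerable<Vector2D> vectors, double degrees)
    {
        // Each element is modified by exactly one iteration, so no locking is needed.
        Parallel.ForEach(vectors, vector => vector.Rotate(degrees));
    }
}
```

This works without synchronization only because the iterations are independent: no iteration reads or writes an element owned by another iteration.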
				
			

In some situations, you may want to stop the loop early. The loop body can accept a ParallelLoopState parameter and use it to stop the loop:

				
void InvertMatrices(IEnumerable<Matrix> matrices)
{
    Parallel.ForEach(matrices, (matrix, state) =>
    {
        if (!matrix.IsInvertible)
            state.Stop();
        else
            matrix.Invert();
    });
}
				
			

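The call to state.Stop() uses ParallelLoopState.Stop, which tells the loop not to start any further iterations. It does not interrupt iterations that are already running. Because the loop runs in parallel, other matrices may already have been inverted by the time Stop is called, including ones that come after the non-invertible matrix in the sequence.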
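
The caller can find out whether a loop was stopped by inspecting the ParallelLoopResult that Parallel.ForEach returns. The following is a minimal sketch; StopDemo and AllNonNegative are hypothetical names chosen for this example.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class StopDemo
{
    // Returns true if the loop ran to completion, false if an iteration called Stop().
    public static bool AllNonNegative(IEnumerable<int> values)
    {
        ParallelLoopResult result = Parallel.ForEach(values, (value, state) =>
        {
            if (value < 0)
                state.Stop(); // ask the loop not to start any more iterations
        });

        // IsCompleted is false when the loop was ended early by Stop() or Break().
        return result.IsCompleted;
    }
}
```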

Another common situation is cancelling a parallel loop. The difference is that stopping happens from inside the loop, while cancelling happens from outside it. When the token is cancelled, Parallel.ForEach throws an OperationCanceledException. Here is an example of how to support cancellation from outside the loop:

				
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees, CancellationToken token)
{
    // Passing the token through ParallelOptions lets code outside the loop cancel it.
    Parallel.ForEach(matrices,
        new ParallelOptions { CancellationToken = token },
        matrix => matrix.Rotate(degrees));
}
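
On the calling side, the token usually comes from a CancellationTokenSource, and the caller should be prepared for OperationCanceledException. Here is a minimal sketch of that side, assuming a placeholder workload (Thread.SpinWait) in place of real CPU-bound work; CancelDemo and RunWithCancellation are hypothetical names.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancelDemo
{
    // Returns true if the loop was cancelled, false if it ran to completion.
    public static bool RunWithCancellation(CancellationToken token)
    {
        try
        {
            Parallel.ForEach(
                Enumerable.Range(0, 1000),
                new ParallelOptions { CancellationToken = token },
                i => Thread.SpinWait(1000)); // placeholder for CPU-bound work
            return false;
        }
        catch (OperationCanceledException)
        {
            // Thrown by Parallel.ForEach once it observes the cancelled token.
            return true;
        }
    }
}
```

A caller would create a CancellationTokenSource, pass its Token to the method, and call Cancel() on the source from another thread (or before the call) to request cancellation.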
				
			

Parallel Aggregation

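In this section, let’s talk about how to aggregate results at the conclusion of a parallel operation. To support aggregation, the Parallel class uses local values: variables that exist locally within a parallel loop. The loop body can access its local value directly without synchronization. When the loop is ready to aggregate each of its local results, it uses the localFinally delegate to do so. The localFinally delegate does need to synchronize access to the variable that holds the final result.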
				
// Note: this is not the most efficient implementation.
// This is just an example of using a lock to protect shared state.
int ParallelSum(IEnumerable<int> values)
{
    object mutex = new object();
    int result = 0;
    Parallel.ForEach(source: values,
        localInit: () => 0, 
        body: (item, state, localValue) => localValue + item,
        localFinally: localValue =>
        {
            lock (mutex)
            result += localValue;
        });
    return result;
}
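
As a variation on the lock above, the final merge can also be done with Interlocked.Add. This is only a sketch of an alternative, not a claim that it is the best way to sum a sequence; AggregationDemo and ParallelSumInterlocked are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AggregationDemo
{
    public static int ParallelSumInterlocked(IEnumerable<int> values)
    {
        int result = 0;
        Parallel.ForEach(
            source: values,
            localInit: () => 0,                                    // starting local value for each worker
            body: (item, state, localValue) => localValue + item,  // no synchronization needed here
            localFinally: localValue =>
            {
                // Several workers may finish at the same time, so the merge must be atomic.
                Interlocked.Add(ref result, localValue);
            });
        return result;
    }
}
```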
				
			

Parallel LINQ (PLINQ) supports aggregation more naturally than the Parallel class does:

				
int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Aggregate(
        seed: 0,
        func: (sum, item) => sum + item
    );
}
				
			

Parallel Invocation

In some scenarios, we need to call a number of methods that are largely independent of each other. In that case, we can invoke those methods in parallel.

The Parallel class has a method named Invoke that is designed for this kind of problem.
				
void ProcessArray(double[] array)
{
    Parallel.Invoke(
        () => ProcessPartialArray(array, 0, array.Length / 2),
        () => ProcessPartialArray(array, array.Length / 2, array.Length)
    );
}
void ProcessPartialArray(double[] array, int begin, int end)
{
    // CPU-intensive processing...
}
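
For a version that can be run as-is, here is a minimal sketch in which the CPU-intensive processing is replaced by squaring each element. InvokeDemo, SquareAll, and SquareRange are hypothetical names; SquareRange only stands in for ProcessPartialArray.

```csharp
using System.Threading.Tasks;

public static class InvokeDemo
{
    public static void SquareAll(double[] array)
    {
        int middle = array.Length / 2;

        // The two ranges do not overlap, so the delegates can safely run at the same time.
        Parallel.Invoke(
            () => SquareRange(array, 0, middle),
            () => SquareRange(array, middle, array.Length));
    }

    // Hypothetical stand-in for ProcessPartialArray: squares the elements in [begin, end).
    private static void SquareRange(double[] array, int begin, int end)
    {
        for (int i = begin; i < end; i++)
            array[i] *= array[i];
    }
}
```

Parallel.Invoke returns only after all of the delegates have finished, so the caller can use the array immediately afterwards.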
				
			

Parallel.Invoke also supports cancellation, in the same way as Parallel.ForEach:

				
void DoAction20Times(Action action, CancellationToken token)
{
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
    Parallel.Invoke(new ParallelOptions { CancellationToken = token }, actions);
}
				
			

Dynamic Parallelism

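Dynamic parallelism covers more complex situations, where the structure and number of parallel tasks depend on information that is only known at runtime. Parallel.Invoke and Parallel.ForEach fit best when the work is known before the call; here, new work is discovered while earlier work is still running.

For this scenario we can use the Task Parallel Library (TPL) directly. The TPL is centered on the Task type, and the Parallel class and Parallel LINQ are convenience wrappers around it. When you need dynamic parallelism, it is easiest to use the Task type directly. In the example below, the AttachedToParent flag links each child task to the task that created it, so the single Wait call in ProcessTree waits for the whole tree to be processed: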

				
void Traverse(Node current)
{
    DoExpensiveActionOnNode(current);
    if (current.Left != null)
    {
        Task.Factory.StartNew(
            () => Traverse(current.Left),
            CancellationToken.None,
            TaskCreationOptions.AttachedToParent,
            TaskScheduler.Default
        );
    }
    if (current.Right != null)
    {
        Task.Factory.StartNew(
            () => Traverse(current.Right),
            CancellationToken.None,
            TaskCreationOptions.AttachedToParent,
            TaskScheduler.Default
        );
    }
}
void ProcessTree(Node root)
{
    Task task = Task.Factory.StartNew(
        () => Traverse(root),
        CancellationToken.None,
        TaskCreationOptions.None,
        TaskScheduler.Default
    );
    task.Wait();
}
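
The traversal above depends on Node and DoExpensiveActionOnNode, which are not shown. Here is a minimal self-contained sketch of the same parent/child pattern, where the expensive action is replaced by adding each node's value to a running total. TreeNode, TreeSumDemo, and Accumulator are hypothetical names introduced for the illustration, and the sketch assumes a non-null root.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical binary tree node; the Node type used above is not shown in the text.
public sealed class TreeNode
{
    public int Value;
    public TreeNode Left;
    public TreeNode Right;
}

public static class TreeSumDemo
{
    // Shared state for one traversal; updated only through Interlocked.
    private sealed class Accumulator { public long Total; }

    public static long SumTree(TreeNode root)
    {
        var acc = new Accumulator();
        Task task = Task.Factory.StartNew(
            () => Traverse(root, acc),
            CancellationToken.None,
            TaskCreationOptions.None,
            TaskScheduler.Default);
        task.Wait(); // completes only after every attached child task has finished
        return Interlocked.Read(ref acc.Total);
    }

    private static void Traverse(TreeNode current, Accumulator acc)
    {
        Interlocked.Add(ref acc.Total, current.Value); // stands in for the expensive action
        if (current.Left != null) StartChild(current.Left, acc);
        if (current.Right != null) StartChild(current.Right, acc);
    }

    private static void StartChild(TreeNode child, Accumulator acc)
    {
        // Called from inside a running task, so the new task attaches to that task.
        Task.Factory.StartNew(
            () => Traverse(child, acc),
            CancellationToken.None,
            TaskCreationOptions.AttachedToParent,
            TaskScheduler.Default);
    }
}
```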
				
			

If the tasks do not have a parent/child relationship, you can use a task continuation to schedule any task to run after another task has completed.

				
Task task = Task.Factory.StartNew(
    () => Thread.Sleep(TimeSpan.FromSeconds(2)),
    CancellationToken.None,
    TaskCreationOptions.None,
    TaskScheduler.Default
);

Task continuation = task.ContinueWith(
    t => Trace.WriteLine("Task is done"),
    CancellationToken.None,
    TaskContinuationOptions.None,
    TaskScheduler.Default
);
// The "t" argument to the continuation is the same as "task".
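
Continuations can also pass a result along. The following minimal sketch chains two computations and reads the antecedent's result through the t argument; ContinuationDemo and ComputeThenDouble are hypothetical names.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    public static int ComputeThenDouble(int input)
    {
        Task<int> first = Task.Factory.StartNew(
            () => input + 1,
            CancellationToken.None,
            TaskCreationOptions.None,
            TaskScheduler.Default);

        // "t" is the antecedent task ("first"); it has already completed when this runs,
        // so reading t.Result here does not block.
        Task<int> continuation = first.ContinueWith(
            t => t.Result * 2,
            CancellationToken.None,
            TaskContinuationOptions.None,
            TaskScheduler.Default);

        return continuation.Result; // blocks until the continuation has finished
    }
}
```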
				
			

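WARNING: Parallel processing with tasks is quite different from asynchronous processing with tasks. In parallel code, tasks run CPU-bound work and it is fine to block on them with members such as Task.Wait and Task.Result. Asynchronous code should avoid those blocking members and use await instead.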

Parallel LINQ

Parallel LINQ is also known as PLINQ. Use it when you need to perform parallel processing on one data sequence to generate another data sequence or a summary of that data.

PLINQ works well in streaming scenarios, where you have a sequence of inputs and produce a sequence of outputs. Here are some examples:
				
IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
{
    return values.AsParallel().Select(value => value * 2);
}

// You can also specify the order to be preserved.
// (Named differently only so that both versions can live in the same class.)
IEnumerable<int> MultiplyBy2Ordered(IEnumerable<int> values)
{
    return values.AsParallel().AsOrdered().Select(value => value * 2);
}

int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}
				
			
  • The Parallel class is suitable for many scenarios, but PLINQ code is simpler when aggregating or transforming one sequence into another.

PLINQ provides parallel versions of several operators, including filtering (Where), projection (Select), and various aggregations, such as Sum, Average, and general Aggregate. In general, anything you can do with regular LINQ can be done in parallel with PLINQ. Thus, if the existing LINQ code can be run in parallel, PLINQ is a good choice.
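
As a small illustration of combining these operators, here is a minimal sketch that filters, projects, and averages in parallel. PlinqDemo, SquaresOfEvens, and AverageOf are hypothetical names; the operators themselves (AsParallel, AsOrdered, Where, Select, ToList, Average) are standard PLINQ.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class PlinqDemo
{
    // Keeps the even values and squares them; AsOrdered preserves the source order.
    public static List<int> SquaresOfEvens(IEnumerable<int> values)
    {
        return values.AsParallel()
            .AsOrdered()
            .Where(value => value % 2 == 0)
            .Select(value => value * value)
            .ToList();
    }

    // Aggregates the whole sequence into a single number.
    public static double AverageOf(IEnumerable<int> values)
    {
        return values.AsParallel().Average();
    }
}
```

Without AsOrdered, the same elements would be produced but their order would not be guaranteed.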

In the next chapter, we will talk about Dataflow Basics. If you are not familiar with dataflow concurrency, please read Chapter 1: Concurrency Overview in C#.
