Parallel programming splits CPU-bound work into pieces and distributes those pieces across multiple threads. The parallel processing techniques described here apply only to CPU-bound work.
In this chapter we will look at how to use the Parallel class and related types to solve some common problems.
Parallel Processing of Data
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees)
{
    // Rotate every matrix; iterations may run concurrently on several threads.
    Parallel.ForEach(matrices, matrix => matrix.Rotate(degrees));
}
In some situations you may want to stop the loop early. The loop body can take a ParallelLoopState argument and use it to stop the loop.
void InvertMatrices(IEnumerable<Matrix> matrices)
{
    Parallel.ForEach(matrices, (matrix, state) =>
    {
        if (!matrix.IsInvertible)
            state.Stop(); // ask the loop not to start any further iterations
        else
            matrix.Invert();
    });
}
The call to state.Stop() uses ParallelLoopState.Stop to stop the loop: once it has been called, no new iterations are started. Because the loop is parallel, other iterations may already be running at that moment, so matrices that come after a non-invertible one may already have been inverted.
Another common situation is cancelling a parallel loop. The difference is that a loop is stopped from inside the loop body, whereas it is cancelled from outside the loop. Here is an example of a loop that can be cancelled from outside:
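A caller can detect that a loop was stopped early by inspecting the ParallelLoopResult that Parallel.ForEach returns. The following is a minimal sketch; the StopDemo class and the AllPositive method are hypothetical names used only for illustration and are not part of the original example.

```csharp
using System;
using System.Threading.Tasks;

static class StopDemo
{
    // Hypothetical helper: returns true if every iteration ran,
    // false if some iteration called Stop().
    public static bool AllPositive(int[] values)
    {
        ParallelLoopResult result = Parallel.ForEach(values, (value, state) =>
        {
            if (value <= 0)
                state.Stop(); // no new iterations start after this call
        });

        // IsCompleted is false when the loop ended prematurely.
        return result.IsCompleted;
    }

    static void Main()
    {
        Console.WriteLine(AllPositive(new[] { 1, 2, 3 }));  // True
        Console.WriteLine(AllPositive(new[] { 1, -2, 3 })); // False
    }
}
```

IsCompleted only reports whether the loop ran to completion; it does not say which items were processed before the stop took effect.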
void RotateMatrices(IEnumerable<Matrix> matrices, float degrees, CancellationToken token)
{
    // The token is observed by the loop itself; the body does not need to check it.
    Parallel.ForEach(matrices,
        new ParallelOptions { CancellationToken = token },
        matrix => matrix.Rotate(degrees));
}
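On the calling side, the token usually comes from a CancellationTokenSource, and a cancelled loop surfaces as an OperationCanceledException. The sketch below illustrates this under simplifying assumptions: the CancelDemo class and its method are hypothetical, and the source is cancelled before the loop starts only to keep the outcome deterministic.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class CancelDemo
{
    // Hypothetical helper: returns true if the loop observed the cancellation.
    public static bool RunWithCancelledToken()
    {
        using (var cts = new CancellationTokenSource())
        {
            cts.Cancel(); // cancelled up front so the result is deterministic

            try
            {
                Parallel.ForEach(
                    Enumerable.Range(0, 1000),
                    new ParallelOptions { CancellationToken = cts.Token },
                    i => { /* CPU-bound work would go here */ });
                return false; // the loop ran to completion
            }
            catch (OperationCanceledException)
            {
                return true; // the loop was cancelled from outside
            }
        }
    }

    static void Main()
    {
        Console.WriteLine(RunWithCancelledToken()); // True
    }
}
```

In real code the Cancel call would typically come from another thread, for example in response to a user pressing a cancel button while the loop is running.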
Parallel Aggregation
// Note: this is not the most efficient implementation.
// This is just an example of using a lock to protect shared state.
int ParallelSum(IEnumerable<int> values)
{
    object mutex = new object();
    int result = 0;
    Parallel.ForEach(source: values,
        localInit: () => 0,                                    // per-thread starting value
        body: (item, state, localValue) => localValue + item,  // accumulate without locking
        localFinally: localValue =>
        {
            // Merge each thread's partial sum into the shared total.
            lock (mutex)
                result += localValue;
        });
    return result;
}
Parallel LINQ (PLINQ) supports aggregation more naturally than the Parallel class:
int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Aggregate(
        seed: 0,
        func: (sum, item) => sum + item
    );
}
Parallel Invocation
void ProcessArray(double[] array)
{
    // Process the two halves of the array in parallel.
    Parallel.Invoke(
        () => ProcessPartialArray(array, 0, array.Length / 2),
        () => ProcessPartialArray(array, array.Length / 2, array.Length)
    );
}

void ProcessPartialArray(double[] array, int begin, int end)
{
    // CPU-intensive processing...
}
Parallel.Invoke also supports cancellation:
void DoAction20Times(Action action, CancellationToken token)
{
    Action[] actions = Enumerable.Repeat(action, 20).ToArray();
    Parallel.Invoke(new ParallelOptions { CancellationToken = token }, actions);
}
Dynamic Parallelism
Parallel.Invoke and Parallel.ForEach work well when the shape of the work is known up front. In some scenarios, however, the structure and number of parallel tasks depend on data that is only discovered at runtime, for example while a method is in the middle of traversing a data structure.
For this scenario we can use the Task Parallel Library (TPL) directly. The TPL is built around the Task type; the Parallel class and Parallel LINQ are convenience wrappers around it. When you need dynamic parallelism, it is easiest to use the Task type directly. Here is an example:
void Traverse(Node current)
{
    DoExpensiveActionOnNode(current);
    if (current.Left != null)
    {
        // Child task attached to the task that is running this method.
        Task.Factory.StartNew(
            () => Traverse(current.Left),
            CancellationToken.None,
            TaskCreationOptions.AttachedToParent,
            TaskScheduler.Default
        );
    }
    if (current.Right != null)
    {
        Task.Factory.StartNew(
            () => Traverse(current.Right),
            CancellationToken.None,
            TaskCreationOptions.AttachedToParent,
            TaskScheduler.Default
        );
    }
}
void ProcessTree(Node root)
{
    Task task = Task.Factory.StartNew(
        () => Traverse(root),
        CancellationToken.None,
        TaskCreationOptions.None,
        TaskScheduler.Default
    );
    // The root task completes only after all attached child tasks complete.
    task.Wait();
}
When there is no parent/child relationship, you can use a task continuation to schedule a task to run after another task completes.
Task task = Task.Factory.StartNew(
    () => Thread.Sleep(TimeSpan.FromSeconds(2)),
    CancellationToken.None,
    TaskCreationOptions.None,
    TaskScheduler.Default
);
Task continuation = task.ContinueWith(
    t => Trace.WriteLine("Task is done"),
    CancellationToken.None,
    TaskContinuationOptions.None,
    TaskScheduler.Default
);
// The "t" argument to the continuation is the same as "task".
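WARNING: Parallel processing with tasks is quite different from asynchronous processing with tasks. Parallel tasks run CPU-bound code and may use blocking members such as Task.Wait and Task.Result; asynchronous tasks should avoid blocking members and use await instead.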
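To make the warning concrete, here is a minimal sketch that contrasts the two uses of a task. The TaskKindsDemo class and both method names are hypothetical and chosen only for illustration: the first method runs CPU-bound work and blocks on the result, while the second awaits a delay without blocking a thread.

```csharp
using System;
using System.Threading.Tasks;

static class TaskKindsDemo
{
    // Parallel (CPU-bound) task: blocking on the result is acceptable here.
    public static long SumOfSquares(int n)
    {
        Task<long> task = Task.Run(() =>
        {
            long sum = 0;
            for (int i = 1; i <= n; i++)
                sum += (long)i * i;
            return sum;
        });
        return task.Result; // blocks the caller until the computation finishes
    }

    // Asynchronous task: no thread is blocked while the delay is pending.
    public static async Task<string> WaitThenReportAsync()
    {
        await Task.Delay(TimeSpan.FromMilliseconds(50));
        return "done";
    }

    static async Task Main()
    {
        Console.WriteLine(SumOfSquares(10));            // 385
        Console.WriteLine(await WaitThenReportAsync()); // done
    }
}
```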
Parallel LINQ
PLINQ works well in streaming scenarios, where you have a sequence of inputs and produce a sequence of outputs. Here are some examples:
IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
{
    return values.AsParallel().Select(value => value * 2);
}

// You can also specify the order to be preserved.
IEnumerable<int> MultiplyBy2(IEnumerable<int> values)
{
    return values.AsParallel().AsOrdered().Select(value => value * 2);
}

int ParallelSum(IEnumerable<int> values)
{
    return values.AsParallel().Sum();
}
The Parallel class is suitable for many scenarios, but PLINQ code is simpler when aggregating or transforming one sequence into another.
PLINQ provides parallel versions of several operators, including filtering (Where), projection (Select), and various aggregations, such as Sum, Average, and general Aggregate. In general, anything you can do with regular LINQ can be done in parallel with PLINQ. Thus, if the existing LINQ code can be run in parallel, PLINQ is a good choice.
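As a short illustration of combining these operators, the sketch below chains Where, Select, and Average in a single PLINQ query. The PlinqDemo class and the AverageOfEvenSquares method are hypothetical names introduced for this example only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class PlinqDemo
{
    // Hypothetical helper: average of the squares of the even inputs.
    public static double AverageOfEvenSquares(IEnumerable<int> values)
    {
        return values.AsParallel()
            .Where(v => v % 2 == 0)    // filtering
            .Select(v => (long)v * v)  // projection
            .Average();                // aggregation
    }

    static void Main()
    {
        // Evens are 2 and 4, squares are 4 and 16, so the average is 10.
        Console.WriteLine(AverageOfEvenSquares(new[] { 1, 2, 3, 4 })); // 10
    }
}
```

Apart from the AsParallel call, the query reads the same as its sequential LINQ equivalent, which is what makes converting existing LINQ code straightforward.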
In the next chapter we will talk about Dataflow Basics. If you are not familiar with Dataflow concurrency, please read Chapter 1: Concurrency Overview in C#.