Chapter 5 - Concurrency in C#: Dataflow Basics

Dataflow is one of the TPL (Task Parallel Library) components, and it is a powerful way to add concurrency to an application. Dataflow follows a few steps: you define the mesh first, then each block processes data and passes its results to the next block. The data flows through the pipeline you define until it reaches the end of the mesh. Once you adopt this model, dataflow becomes a natural fit for a wide range of scenarios.

Before using TPL Dataflow, you need to install the System.Threading.Tasks.Dataflow NuGet package into your application.
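
If you use the .NET CLI, the package can be added with the following command (Visual Studio's NuGet Package Manager works just as well):

dotnet add package System.Threading.Tasks.Dataflow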

Linking Blocks

Suppose you have multiple tasks to complete, where each task takes its input from the results of the previous one. You can model this by linking dataflow blocks together.
				
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
// After linking, values that exit multiplyBlock will enter subtractBlock.
multiplyBlock.LinkTo(subtractBlock);
				
			

The code shows how to link multiple blocks together, and it's very easy. The data from multiplyBlock, the first block in the mesh, flows into the next block, subtractBlock, via the LinkTo call.
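
To see the flow end to end, here is a small usage sketch (the values are illustrative) that posts an item into the first block and receives the result from the second, using the Post and Receive helper methods:

using System;
using System.Threading.Tasks.Dataflow;

var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
multiplyBlock.LinkTo(subtractBlock);

multiplyBlock.Post(5);                // 5 * 2 = 10 flows across the link
int result = subtractBlock.Receive(); // 10 - 2 = 8
Console.WriteLine(result);            // prints 8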

By default, linked dataflow blocks propagate only data. To have them propagate completion (and errors) as well, pass the PropagateCompletion option to the LinkTo method.

				
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
var options = new DataflowLinkOptions { PropagateCompletion = true };
multiplyBlock.LinkTo(subtractBlock, options);
...
// The first block's completion is automatically propagated to the second block.
multiplyBlock.Complete();
await subtractBlock.Completion;
				
			

With PropagateCompletion set, errors from a block (if there are any) are propagated to the next block wrapped in an AggregateException. Unfortunately, as an error travels through a longer pipeline, it can easily end up nested in multiple AggregateException instances. Fortunately, AggregateException has several members, such as Flatten, that simplify this kind of error handling.

You can set options of type DataflowLinkOptions on the link (such as the PropagateCompletion option used in this solution), and the LinkTo overloads can also accept a predicate that filters which data can pass through the link.

Data that does not pass the filter is not discarded. Data that passes the filter is propagated over the link; data that fails the filter is offered to any alternate links, and if no other link will take it, the item stays in the block. If a data item gets stuck in a block this way, the block will not produce any further items; the whole block stalls until that item is removed.
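
As a minimal sketch (the block names here are illustrative), the predicate is passed as the last argument to LinkTo, and DataflowBlock.NullTarget can serve as a catch-all link so filtered-out items don't stall the source block:

using System;
using System.Threading.Tasks.Dataflow;

var sourceBlock = new BufferBlock<int>();
var evenBlock = new ActionBlock<int>(item => Console.WriteLine($"even: {item}"));

// Only even values pass through this link.
sourceBlock.LinkTo(evenBlock,
    new DataflowLinkOptions { PropagateCompletion = true },
    item => item % 2 == 0);

// Send everything else to a null target so odd values don't stall sourceBlock.
sourceBlock.LinkTo(DataflowBlock.NullTarget<int>());

sourceBlock.Post(1); // goes to the null target
sourceBlock.Post(2); // printed by evenBlock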

Propagating Errors

In this section we look at how to handle errors in a dataflow mesh. Catching the exception is easy, but it is observed through a block's Completion property, which reflects the block's status; typically you catch it by awaiting the Completion of the last block in the pipeline.
				
try
{
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
        if (item == 1)
            throw new InvalidOperationException("Blech.");
        return item * 2;
    });
    var subtractBlock = new TransformBlock<int, int>(item => item - 2);
    multiplyBlock.LinkTo(subtractBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    multiplyBlock.Post(1);
    await subtractBlock.Completion;
}
catch (AggregateException)
{
    // The exception is caught here.
}
				
			

Each block wraps incoming errors in an AggregateException. If an error occurs early in the pipeline and travels through several links before it is observed, the original error ends up wrapped in multiple layers of AggregateException. The AggregateException.Flatten method simplifies error handling in this scenario.
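
Building on the snippet above, here is a sketch of the same pipeline where the catch clause flattens the nested exceptions and logs each original error (ObserveMeshErrorsAsync is just an illustrative name):

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static async Task ObserveMeshErrorsAsync()
{
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
        if (item == 1)
            throw new InvalidOperationException("Blech.");
        return item * 2;
    });
    var subtractBlock = new TransformBlock<int, int>(item => item - 2);
    multiplyBlock.LinkTo(subtractBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
    multiplyBlock.Post(1);

    try
    {
        await subtractBlock.Completion;
    }
    catch (AggregateException ex)
    {
        // Flatten collapses nested AggregateExceptions into a single level,
        // so each original error appears once in InnerExceptions.
        foreach (var inner in ex.Flatten().InnerExceptions)
            Console.WriteLine(inner.Message);
    }
}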

Also, exceptions are in a sense just another kind of data; if you prefer, you can catch them inside a block and let them flow through the mesh along with your regular data.
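
One possible pattern (the TryResult wrapper type here is illustrative, not part of TPL Dataflow) is to catch the exception inside the block's delegate and emit it as part of the output item:

using System;
using System.Threading.Tasks.Dataflow;

var parseBlock = new TransformBlock<string, TryResult<int>>(text =>
{
    try
    {
        return new TryResult<int>(int.Parse(text), null);
    }
    catch (Exception ex)
    {
        // The exception becomes ordinary data and flows downstream.
        return new TryResult<int>(default, ex);
    }
});

parseBlock.Post("42");
parseBlock.Post("not a number"); // produces a TryResult carrying the exception

// Illustrative wrapper type; not part of the library.
public record TryResult<T>(T? Value, Exception? Error);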

Unlinking Blocks

This section is short and straightforward. Sometimes you may want to unlink blocks during processing so you can change the mesh dynamically.
The key concept is that you can link or unlink dataflow blocks at any time; the only caveat is that data in flight at that moment may or may not have already passed through the link. Both linking and unlinking are entirely threadsafe.
				
var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
IDisposable link = multiplyBlock.LinkTo(subtractBlock);
multiplyBlock.Post(1);
multiplyBlock.Post(2);
// Unlink the blocks.
// The data posted above may or may not have already gone through the link.
// In real-world code, consider a using block rather than calling Dispose.
link.Dispose();
				
			

Throttling Blocks

In this section we look at how to make data flow in a load-balancing way. Picture a fork in the mesh: one source block linked to two target blocks, where we want the data shared across both links. The BoundedCapacity block option is the key to this scenario.
				
var sourceBlock = new BufferBlock<int>();
var options = new DataflowBlockOptions { BoundedCapacity = 1 };
var targetBlockA = new BufferBlock<int>(options);
var targetBlockB = new BufferBlock<int>(options);
sourceBlock.LinkTo(targetBlockA);
sourceBlock.LinkTo(targetBlockB);
				
			

This throttling behavior is useful in many places. For example, if your pipeline performs a lot of I/O, you can apply BoundedCapacity to the blocks in your mesh. That limits how much data is queued around the I/O stages, so your mesh won't end up buffering all the input data before it's able to process it.
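
As a rough sketch (the download stage and its URLs are hypothetical), an I/O-bound block with a small BoundedCapacity stops accepting new items while it is full, which pushes back on upstream blocks instead of queueing everything in memory:

using System.Net.Http;
using System.Threading.Tasks.Dataflow;

var httpClient = new HttpClient();

// Hypothetical I/O-bound stage: at most one URL is buffered at a time,
// so upstream blocks are throttled rather than flooding this stage.
var downloadBlock = new TransformBlock<string, string>(
    url => httpClient.GetStringAsync(url),
    new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });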

Parallel Processing with Dataflow Blocks

When you want some parallel processing inside your dataflow mesh, this section is what you need.
Before going into more detail, you should know that two linked blocks already process independently of each other. To get parallel processing within a single block, set the MaxDegreeOfParallelism option; by default it is 1.

MaxDegreeOfParallelism can be set to DataflowBlockOptions.Unbounded or to any value greater than zero.

				
var multiplyBlock = new TransformBlock<int, int>(
    item => item * 2,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var subtractBlock = new TransformBlock<int, int>(item => item - 2);
multiplyBlock.LinkTo(subtractBlock);
				
			

Creating Custom Blocks

Whenever you have a more complex scenario and want to reuse a piece of mesh as a single block, this is for you. The DataflowBlock.Encapsulate method combines a separate input endpoint and output endpoint into one block; the mesh in between (here, the PropagateCompletion links) is responsible for propagating data and completion between those endpoints. Here is an example:
				
IPropagatorBlock<int, int> CreateMyCustomBlock()
{
    var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
    var addBlock = new TransformBlock<int, int>(item => item + 2);
    var divideBlock = new TransformBlock<int, int>(item => item / 2);
    var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };
    multiplyBlock.LinkTo(addBlock, flowCompletion);
    addBlock.LinkTo(divideBlock, flowCompletion);
    return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);
}
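
A brief usage sketch: the encapsulated block behaves like any single propagator block, so callers can post to it and receive from it without knowing about the inner mesh.

var customBlock = CreateMyCustomBlock();
customBlock.Post(5);
int result = customBlock.Receive(); // (5 * 2 + 2) / 2 = 6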
				
			
