Concurrency in C#- Asynchronous Streams

Asynchronous Streams: It is a method of receiving multiple data items asynchronously, which are built on an asynchronous enumerable (IAsyncEnumerable< T >). Asynchronous enumerable objects are asynchronous versions of enumerable objects. Based on needs of consumers, those objects can be implemented asynchronously. Let me introduce some concepts briefly.

Asynchronous Streams and Task<T>

Task<T> means an asynchronous method only can provide the value once, even the T is a collection. But the asynchronous streams are more like enumerables. 

Asynchronous Streams and IEnumerable<T>

In asynchronous world, IAsyncEnumerable<T> is just like synchronous IEnumerable<T>. IAsyncEnumberable<T> can retrieve elements one at a time asynchronously. When IEnumerable<T> is running some I/O related operation, such as API calls, it will block on I/O while it’s running. However, IAsyncEnumerable<T> can do the same thing without blocking on I/O, because it retrieves every item asynchronously.

Asynchronous Streams and Task<IEnumerable<T>>

Task<List<T>> is an example of Task<IEnumerable<T>>. The value must wait for the async method completed before it is returned. This type, Task<IEnumerable<T>>, also has a limitation. The IEnumerable<T> items cannot be returned once get them. IAsyncEnumerable<T> is similar to Task<IEnumerable<T>>, but it’s better. IAsyncEnumerable<T> can return each item asynchronously, which is a true asynchronous stream. 

Asynchronous Streams and IObservable<T>

Observables are a true notion of asynchronous streams, which means the usage of IObservable is entirely different from the IAsyncEnumerable. Generally, using a LINQ-like query an IObservable returned value is required.

The IObservable also has a backpressure problem. System.Reactive is a synchronous namespace. Therefore, once an item notification is sent to its subscribers, the observable continues to execute and retrieves the next item to be published, possibly invoking API Streams again.

Another way of thinking regarding the difference between IObservable and IAsyncEnumerable is that the IObservable is push-based, and the IAsyncEnumerable is pull-based. Thus, an observable stream in your code will push notifications, but an asynchronous stream will passively let your code pull data items out of it asynchronously.

Here is the table regarding different roles of common types:

TypeSingle or multiple valueAsynchronous or synchronousPush or pull
TSingle valueSynchronousN/A
IEnumerableMultiple valuesSynchronousN/A
TaskSingle valueAsynchronousPull
IAsyncEnumerableMultiple valuesAsynchronousPull
IObservableSingle or multipleAsynchronousPush

Creating Asynchronous Steams

yield return is the solution for returning multiple values from a method. And also, it uses async and await in asynchronous methods. We can take advantage of these two ways regarding asynchronous streams and return an IAsyncEnumerable<T> type value.

				
					async IAsyncEnumerable<int> GetValuesAsync()
{
    await Task.Delay(1000); // some asynchronous work
    yield return 10;
    await Task.Delay(1000); // more asynchronous work
    yield return 13;
}
				
			

This is the normal pattern for asynchronous streams. For many streams, most asynchronous iterations are actually synchronous; Asynchronous streams only allow asynchronous retrieval of any next item. Asynchronous streams are designed with both asynchronous and synchronous code in mind; This is why asynchronous flows are built on ValueTask<T>. Asynchronous streams maximize their efficiency by using ValueTask<T> behind the scenes, regardless of whether items are retrieved synchronously or asynchronously.

Consuming Asynchronous Streams

This section we are going to talk about how to consume the results of an asynchronous stream, known as an asynchronous enumerable. To achieve this, we can use await and foreach. Here is an example of code:

				
					IAsyncEnumerable<string> GetValuesAsync(HttpClient client);
public async Task ProcessValueAsync(HttpClient client)
{
    await foreach (string value in GetValuesAsync(client))
    {
        Console.WriteLine(value);
    }
}
				
			

When the method GetValuesAsync got invoked, it returns an IAsyncEnumerable<T>. The results from await foreach have to wait for the await foreach complete.

We also can pass the ConfigureAwait(false) to avoid the implicitly captured context.

				
					IAsyncEnumerable<string> GetValuesAsync(HttpClient client);
public async Task ProcessValueAsync(HttpClient client)
{
    await foreach (string value in GetValuesAsync(client).ConfigureAwait(false))
    {
        await Task.Delay(100).ConfigureAwait(false); // asynchronous work
        Console.WriteLine(value);
    }
}
				
			

Using LINQ with Asynchronous Streams

await foreach is the most common way to consume asynchronous streams. But you also can use LINQ to do that. Using LINQ, IEnumerable<T> result can be returned. And also IObserveble<T> can be converted to event by LINQ. LINQ asynchronous streams support many useful operators such as WhereAwait:

				
					IAsyncEnumerable<int> values = SlowRange().WhereAwait(
async value =>
{
    // Do some asynchronous work to determine
    // if this element should be included.
    await Task.Delay(10);
    return value % 2 == 0;
});
await foreach (int result in values)
{
    Console.WriteLine(result);
}
// Produce sequence that slows down as it progresses.
async IAsyncEnumerable<int> SlowRange()
{
    for (int i = 0; i != 10; ++i)
    {
        await Task.Delay(i * 100);
        yield return i;
    }
}
				
			

Also, LINQ operators for asynchronous streams can convert synchronous results to asynchronous streams by Where, Select, or whatever:

				
					IAsyncEnumerable<int> values
    = SlowRange().Where(
        value => value % 2 == 0);
await foreach (int result in values)
{
    Console.WriteLine(result);
}
				
			

Asynchronous stream LINQ methods, which are very useful as well, can convert regular enumerables to asynchronous streams. When you meet this situation, you can call ToAsyncEnumerable() on any IEnumerable<T>, and then got an asynchronous stream interface that take advantage of WhereAwait, SelectAwait, and other operators that support asynchronous delegates.

Asynchronous Streams and Cancellation

This section we are talking about how to find a way to cancel asynchronous streams. It’s an option to cancel asynchronous streams when needed. The most common way to to do it is to pass cancellation tokens to their source streams:

				
					using var cts = new CancellationTokenSource(500);
CancellationToken token = cts.Token;
await foreach (int result in SlowRange(token))
{
    Console.WriteLine(result);
}
// Produce sequence that slows down as it progresses.
async IAsyncEnumerable<int> SlowRange(
    [EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i != 10; ++i)
    {
        await Task.Delay(i * 100, token);
        yield return i;
    }
}
				
			

Cancelable is not an enumerable object, but the enumerator created by that enumerable object. This is an uncommon but important use case, which is why asynchronous flows support the WithCancellation extension method, which you can use to append CancellationToken to a particular iteration of an asynchronous stream:

				
					async Task ConsumeSequence(IAsyncEnumerable<int> items)
{
    using var cts = new CancellationTokenSource(500);
    CancellationToken token = cts.Token;
    await foreach (int result in items.WithCancellation(token))
    {
        Console.WriteLine(result);
    }
}
// Produce sequence that slows down as it progresses.
async IAsyncEnumerable<int> SlowRange(
    [EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; i != 10; ++i)
    {
        await Task.Delay(i * 100, token);
        yield return i;
    }
}
await ConsumeSequence(SlowRange());
				
			

next Chapter we are going to talk about Parallel Basic. If you are interested in the topic, Concurrency in C#, please also check other chapters:

For more resources regarding Asynchronous Stream in C#, you can also read this article: Tutorial: Generate and consume async streams using C# 8.0 and .NET Core 3.0

 

Visits: 137

Leave a Reply

Your email address will not be published. Required fields are marked *