IAsyncEnumerable – Migrating to .NET Core 3.0

C# 8 and .NET Core 3.0 introduce async streams, which “are asynchronous variants of enumerables, where getting the next element may involve an asynchronous operation”. This feature is built around the IAsyncEnumerable/IAsyncEnumerator interfaces and the new await foreach statement. A lot has already been written about the feature since its preview releases, so I’d like to focus on something different: migrating from a previous version of IAsyncEnumerable.

Prior to C# 8, there was already a version of IAsyncEnumerable/IAsyncEnumerator defined in the System.Interactive.Async package (version <= 3.2.0), which is part of the Reactive Extensions for .NET. The package also included LINQ-like extension methods for async enumerables and helpers to create them. You could be using the package and its facilities, for instance, when using gRPC streaming (version < 2).

If you want to migrate such a project to .NET Core 3.0 (or .NET Standard 2.1), you’ll get a conflict between the old (System.Interactive.Async) and new (.NET Standard 2.1) types. If the project at hand is a deployable (e.g. an ASP.NET Core application), you can simply update the System.Interactive.Async package to a version >= 4.0.0, which no longer includes its own versions of IAsyncEnumerable/IAsyncEnumerator. It’s worth noting, however, that the old and new interfaces are a bit different, so some tweaks are needed (more on this later).

Multi-targeting on shared packages for an easy migration

Another possible scenario is that you have a shared .NET Standard 2.0 library that depends on IAsyncEnumerable/IAsyncEnumerator from System.Interactive.Async and is used by multiple clients which you may not want (or be able) to upgrade to .NET Core 3.0 immediately. This was the case in my current project, where we had a shared library of gRPC utilities being used in different .NET Core 2.2 micro-services. We wanted to start migrating the services to .NET Core 3.0 and the type conflict arose. However, we didn’t want to migrate ALL the services at once, and we still wanted to keep updating the shared libraries.

Along with the recent releases of .NET Core there is the Microsoft.Bcl.AsyncInterfaces package, which includes the IAsyncEnumerable/IAsyncEnumerator interfaces as a standalone release. The solution we found was multi-targeting both .NET Standard 2.0 and 2.1 in the shared library, referencing this package only for .NET Standard 2.0. Here’s the csproj of such a shared library:

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
  </PropertyGroup>
  <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
    <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.0.0" />
  </ItemGroup>
</Project>

This allows the library to be consumed from projects targeting either .NET Standard version and lets you start using await foreach in the shared code. This is actually what the System.Interactive.Async package does internally (version > 4.0.0).
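As a rough sketch of what shared code can look like after this change, here is a small helper that consumes an async stream with await foreach. CollectAsync is a hypothetical name I made up for illustration; it is not part of any package. One assumption to be aware of: as far as I know, the netstandard2.0 target defaults to C# 7.3, so the project would probably also need <LangVersion>8.0</LangVersion> for await foreach to compile on both targets.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SharedAsyncEnumerableHelpers
{
    // Hypothetical helper (name chosen for illustration): buffers an async stream into a list.
    // The interfaces come from the framework on netstandard2.1 and from
    // Microsoft.Bcl.AsyncInterfaces on netstandard2.0; the source code is the same.
    public static async Task<List<T>> CollectAsync<T>(
        this IAsyncEnumerable<T> source,
        CancellationToken cancellationToken = default)
    {
        var result = new List<T>();

        // WithCancellation forwards the token to GetAsyncEnumerator;
        // ConfigureAwait(false) is the usual choice for library code.
        await foreach (var item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            result.Add(item);
        }

        return result;
    }
}
```

A consumer on either target could then write something like var items = await stream.CollectAsync(token);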

Code changes

Besides picking types from the correct assembly, this migration also means making some code changes, as the old and new interfaces are a bit different.

// Old (System.Interactive.Async)
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator();
}
public interface IAsyncEnumerator<out T> : IDisposable
{
    T Current { get; }
    Task<bool> MoveNext(CancellationToken cancellationToken);
}

// New (.NET Standard 2.1 / Microsoft.Bcl.AsyncInterfaces)
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    T Current { get; }
    ValueTask<bool> MoveNextAsync();
}

The main difference is in how cancellation is handled: in the old version, each call to MoveNext received a cancellation token, while in the new version the token is supplied to GetAsyncEnumerator. This means that the token is captured when an enumeration starts, and the async enumerator is responsible for honoring it on every call to MoveNextAsync.
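To make the "captured when an enumeration starts" part concrete, here is a minimal hand-written implementation of the new interfaces. DelayedRange and its 10 ms delay are hypothetical, invented only for this sketch; the point is that the token arrives once, in GetAsyncEnumerator, and the enumerator stores it for later calls to MoveNextAsync.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example type: yields 0..count-1 with a short delay before each item.
public sealed class DelayedRange : IAsyncEnumerable<int>
{
    private readonly int _count;

    public DelayedRange(int count) => _count = count;

    // The token is supplied here, once per enumeration.
    public IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        => new Enumerator(_count, cancellationToken);

    private sealed class Enumerator : IAsyncEnumerator<int>
    {
        private readonly int _count;
        private readonly CancellationToken _token; // captured at enumeration start
        private int _current = -1;

        public Enumerator(int count, CancellationToken token)
        {
            _count = count;
            _token = token;
        }

        public int Current => _current;

        // No token parameter any more: the enumerator honors the captured one.
        public async ValueTask<bool> MoveNextAsync()
        {
            if (_current + 1 >= _count)
            {
                return false;
            }

            await Task.Delay(10, _token).ConfigureAwait(false);
            _current++;
            return true;
        }

        // Nothing to release in this sketch, so return a completed ValueTask.
        public ValueTask DisposeAsync() => default;
    }
}
```

Two enumerations of the same DelayedRange instance can therefore run with two different tokens, which is exactly what the old MoveNext(CancellationToken) signature did not express.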

Cancellation tokens that were supplied to MoveNext should now be supplied to GetAsyncEnumerator. If you’re consuming async enumerables with await foreach, no cancellation token is supplied by default. To supply one, use the WithCancellation method:

await foreach(var i in someAsyncEnumerable.WithCancellation(token))
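If your old code enumerated manually (GetEnumerator plus a loop over MoveNext(token)), the migrated version could look roughly like the sketch below. ConsumerExample, Numbers, and the two Sum methods are hypothetical names used only for illustration. Both methods pass the token to GetAsyncEnumerator: the first lets await foreach do it through WithCancellation, the second does it by hand.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumerExample
{
    // Hypothetical source used only for illustration.
    public static async IAsyncEnumerable<int> Numbers()
    {
        for (var i = 1; i <= 3; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Preferred form: await foreach + WithCancellation.
    public static async Task<int> SumWithAwaitForeach(IAsyncEnumerable<int> source, CancellationToken token)
    {
        var sum = 0;
        await foreach (var i in source.WithCancellation(token))
        {
            sum += i;
        }
        return sum;
    }

    // Manual form, close to what the old MoveNext(token) loops looked like.
    // The token moves from MoveNext to GetAsyncEnumerator, and disposal becomes asynchronous.
    public static async Task<int> SumManually(IAsyncEnumerable<int> source, CancellationToken token)
    {
        var sum = 0;
        await using (var enumerator = source.GetAsyncEnumerator(token))
        {
            while (await enumerator.MoveNextAsync())
            {
                sum += enumerator.Current;
            }
        }
        return sum;
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        Console.WriteLine(await SumWithAwaitForeach(Numbers(), cts.Token)); // prints 6
        Console.WriteLine(await SumManually(Numbers(), cts.Token));         // prints 6
    }
}
```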

More on cancellation

The previous example illustrates cancellation from the consumer’s perspective. What about using cancellation tokens when implementing async streams with async iterators and the await foreach statement? Let’s say we have the following method:

public async IAsyncEnumerable<int> GetSomeAsyncEnumerable()
{
    await foreach (var i in GetOtherAsyncEnumerable())
    {
        await Task.Delay(i);
        yield return i * i;
    }
}

The first impulse would be to add a cancellation token as a parameter of the method and use it on the call to Delay. However, this would mean that the cancellation token is bundled with the async enumerable itself and that it is used on different enumerations of the enumerable! That would be incorrect.
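Here is a small sketch of that first impulse and of what goes wrong with it, under the assumption that the method is a plain async iterator with an ordinary token parameter. UnconsumedTokenDemo, Squares, and CountAsync are hypothetical names. The consumer passes an already-cancelled token through WithCancellation, yet the enumeration runs to completion, because that token never reaches Task.Delay. Recent compilers should also warn about a CancellationToken parameter like this one.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class UnconsumedTokenDemo
{
    // The token is an ordinary parameter: it is fixed when the method is called
    // and shared by every enumeration of the returned enumerable.
    public static async IAsyncEnumerable<int> Squares(CancellationToken token = default)
    {
        for (var i = 1; i <= 3; i++)
        {
            await Task.Delay(1, token);
            yield return i * i;
        }
    }

    public static async Task<int> CountAsync(IAsyncEnumerable<int> source, CancellationToken token)
    {
        var count = 0;
        await foreach (var item in source.WithCancellation(token))
        {
            count++;
        }
        return count;
    }

    public static async Task Main()
    {
        var alreadyCancelled = new CancellationToken(canceled: true);

        // The consumer's token is ignored by the iterator, so all items are still produced.
        Console.WriteLine(await CountAsync(Squares(), alreadyCancelled)); // prints 3
    }
}
```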

Keep in mind that the compiler will generate an implementation of the async enumerable which has a GetAsyncEnumerator(CancellationToken cancellationToken) method. What we actually need is a way to access the token that will be supplied by the consumers of our enumerable. Meet the [EnumeratorCancellation] attribute! The previous method can be re-written as:

public static async IAsyncEnumerable<int> GetSomeAsyncEnumerable([EnumeratorCancellation] CancellationToken token = default)
{
    await foreach (var i in GetOtherAsyncEnumerable())
    {
        await Task.Delay(i, token);
        yield return i * i;
    }
}

You can think of this attribute as something that turns the token parameter into a placeholder for the cancellation tokens supplied to GetAsyncEnumerator by the consumers of the enumerable. In addition, the compiler does a bit more magic:

  • If no cancellation token is supplied to GetSomeAsyncEnumerable, then token represents the cancellation token supplied to each enumeration.
  • If a cancellation token is supplied to GetSomeAsyncEnumerable, then token is a new instance created by the compiler which combines the initial parameter and the token supplied to each enumeration. This means that the enumeration is cancelled when either of those two is cancelled.
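The two cases above can be checked with a small sketch like the following. CancellationDemo, Ticks, and EndsInCancellation are hypothetical names; already-cancelled tokens are used so that the result does not depend on timing. In each case the stream is endless, so the only way the enumeration can end is through cancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // Hypothetical endless stream that honors the token seen by the enumerator.
    public static async IAsyncEnumerable<int> Ticks(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (var i = 0; ; i++)
        {
            await Task.Delay(10, token);
            yield return i;
        }
    }

    // Returns true when the enumeration ends with an OperationCanceledException.
    public static async Task<bool> EndsInCancellation(
        IAsyncEnumerable<int> source, CancellationToken enumerationToken)
    {
        try
        {
            await foreach (var tick in source.WithCancellation(enumerationToken))
            {
            }
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }

    public static async Task Main()
    {
        var cancelled = new CancellationToken(canceled: true);
        using var live = new CancellationTokenSource(); // never cancelled here

        // No token passed to the method: the per-enumeration token is used.
        Console.WriteLine(await EndsInCancellation(Ticks(), cancelled));

        // Both supplied: cancelling the per-enumeration token is enough.
        Console.WriteLine(await EndsInCancellation(Ticks(live.Token), cancelled));

        // Both supplied: cancelling the method-level token is enough too.
        Console.WriteLine(await EndsInCancellation(Ticks(cancelled), live.Token));
    }
}
```

All three calls should report that the enumeration was cancelled.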

Summary

In this post I discussed a possible migration path, along with the code changes it requires, for projects that want to upgrade to .NET Core 3.0 / .NET Standard 2.1 and were using the IAsyncEnumerable/IAsyncEnumerator interfaces defined in the System.Interactive.Async package. The easiest way to migrate is to multi-target the shared libraries and depend on the new Microsoft.Bcl.AsyncInterfaces package.

I also discussed some of the required code changes, as well as the new [EnumeratorCancellation] attribute, which makes it easy to support cancellation in async enumerables that are implemented as async iterators and consumed with await foreach.

Hope this helps!
