IAsyncEnumerable – Migrating to .NET Core 3.0

C# 8 and .NET Core 3.0 introduce async streams, which “are asynchronous variants of enumerables, where getting the next element may involve an asynchronous operation”. This feature is built around the IAsyncEnumerable/IAsyncEnumerator interfaces and the new await foreach statement. A lot has already been written about the feature since its preview releases, so I’d like to focus on a different thing: migrating from a previous version of IAsyncEnumerable.

Prior to C# 8, there was already a version of IAsyncEnumerable/IAsyncEnumerator defined in the System.Interactive.Async package (version <= 3.2.0), which is part of the Reactive Extensions for .NET. The package also included LINQ-like extension methods for async enumerables and helpers to create them. You could be using the package and its facilities, for instance, when using gRPC streaming (version < 2).

If you want to migrate a project like the one described above to .NET Core 3.0 (or .NET Standard 2.1), you’ll get a conflict between the old (System.Interactive.Async) and new (.NET Standard 2.1) types. If the project at hand is a deployable (e.g. an ASP.NET Core application), you can simply update the System.Interactive.Async package to a version >= 4.0.0, which no longer includes its own versions of IAsyncEnumerable/IAsyncEnumerator. It’s worth noting, however, that the old and new interfaces are a bit different, so some tweaks are needed (more on this later).

Multi-targeting on shared packages for an easy migration

Another possible scenario is that you have a shared .NET Standard 2.0 library that depends on IAsyncEnumerable/IAsyncEnumerator from System.Interactive.Async and is used by multiple clients which you may not want (or be able) to upgrade to .NET Core 3.0 immediately. This was the case in my current project, where we had a shared library of gRPC utilities being used in different .NET Core 2.2 micro-services. We wanted to start migrating the services to .NET Core 3.0 and the type conflict arose. However, we didn’t want to migrate ALL the services at once, and we still wanted to keep updating the shared libraries.

Along with the recent releases of .NET Core there is the Microsoft.Bcl.AsyncInterfaces package, which includes the IAsyncEnumerable/IAsyncEnumerator interfaces as a standalone release. The solution we found was multi-targeting both .NET Standard 2.0 and 2.1 in the shared library, referencing this package only for .NET Standard 2.0. Here’s the csproj of such a shared library:

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFrameworks>netstandard2.0;netstandard2.1</TargetFrameworks>
  </PropertyGroup>
  <ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
    <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="1.0.0" />
  </ItemGroup>
</Project>

This allows consuming the library from projects on both .NET Standard versions and lets you start using await foreach in the shared code. This is actually what the System.Interactive.Async package does internally (version >= 4.0.0).

Code changes

Besides picking types from the correct assembly, this migration also means making some code changes, as the old and new interfaces are a bit different.

// Old (System.Interactive.Async)
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetEnumerator();
}
public interface IAsyncEnumerator<out T> : IDisposable
{
    T Current { get; }
    Task<bool> MoveNext(CancellationToken cancellationToken);
}

// New (.NET Standard 2.1 / Microsoft.Bcl.AsyncInterfaces)
public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    T Current { get; }
    ValueTask<bool> MoveNextAsync();
}

The main difference is in how cancellation is handled: in the old version, each call to MoveNext would get a cancellation token, while in the new version the token is supplied to GetAsyncEnumerator. This means that the cancellation token is captured when an enumeration starts, and the async enumerator is responsible for honoring it on the subsequent calls to MoveNextAsync. The other visible changes are that the enumerator is now IAsyncDisposable instead of IDisposable and that MoveNextAsync returns ValueTask<bool> instead of Task<bool>.

Cancellation tokens that were supplied to MoveNext should now be supplied to GetAsyncEnumerator. If you’re consuming async enumerables with await foreach, no cancellation token is supplied by default. To supply one, use the WithCancellation method:

await foreach(var i in someAsyncEnumerable.WithCancellation(token))
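To make the change concrete, here is a minimal sketch of the same loop written twice against the new interfaces: once by hand and once with await foreach. CountAsync, SumManuallyAsync and SumWithForeachAsync are names I made up for this illustration, and the old-style loop appears only as a comment for comparison.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumptionSketch
{
    // Hypothetical source used only for this example. It has no token
    // parameter, so the token given to GetAsyncEnumerator is simply ignored.
    public static async IAsyncEnumerable<int> CountAsync(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Delay(10);
            yield return i;
        }
    }

    // Old shape (System.Interactive.Async <= 3.2.0), for comparison:
    //   using (var e = source.GetEnumerator())
    //       while (await e.MoveNext(token)) { total += e.Current; }
    public static async Task<int> SumManuallyAsync(IAsyncEnumerable<int> source, CancellationToken token)
    {
        var total = 0;
        // New shape: the token is handed over once, when the enumeration starts,
        // and the enumerator is disposed asynchronously.
        await using (var e = source.GetAsyncEnumerator(token))
        {
            while (await e.MoveNextAsync())
            {
                total += e.Current;
            }
        }
        return total;
    }

    // Same loop with await foreach; WithCancellation forwards the token
    // to GetAsyncEnumerator behind the scenes.
    public static async Task<int> SumWithForeachAsync(IAsyncEnumerable<int> source, CancellationToken token)
    {
        var total = 0;
        await foreach (var i in source.WithCancellation(token))
        {
            total += i;
        }
        return total;
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        // Both calls add up 1 + 2 + 3 + 4.
        Console.WriteLine(await SumManuallyAsync(CountAsync(4), cts.Token));
        Console.WriteLine(await SumWithForeachAsync(CountAsync(4), cts.Token));
    }
}
```

In most code the await foreach form is all you need; the manual form is mainly useful when porting code that drove the old enumerator by hand.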

More on cancellation

The previous example illustrates cancellation from the consumer perspective. What about using cancellation tokens when implementing async streams with the await foreach statement? Let’s say we have the following method:

public async IAsyncEnumerable<int> GetSomeAsyncEnumerable()
{
    await foreach (var i in GetOtherAsyncEnumerable())
    {
        await Task.Delay(i);
        yield return i * i;
    }
}

The first impulse would be to add a cancellation token as a parameter of the method and use it on the call to Delay. However, this would mean that the cancellation token is bundled with the async enumerable itself and that it is used on different enumerations of the enumerable! That would be incorrect.

Keep in mind that the compiler will generate an implementation of the async enumerable which has a GetAsyncEnumerator(CancellationToken cancellationToken) method. What we actually need is a way to access the token that will be supplied by the consumers of our enumerable. Meet the [EnumeratorCancellation] attribute! The previous method can be re-written as:

public static async IAsyncEnumerable<int> GetSomeAsyncEnumerable([EnumeratorCancellation] CancellationToken token = default)
{
    await foreach (var i in GetOtherAsyncEnumerable())
    {
        await Task.Delay(i, token);
        yield return i * i;
    }
}

You can think of this attribute as something that turns the token parameter into a placeholder for the cancellation tokens supplied to GetAsyncEnumerator by the consumers of the enumerable. In addition, the compiler does a bit more magic:

  • If no cancellation token is supplied to GetSomeAsyncEnumerable, then token represents the cancellation token supplied to each enumeration.
  • If a cancellation token is supplied to GetSomeAsyncEnumerable, then token is a new instance created by the compiler-generated code which combines the initial parameter and the token supplied to each enumeration. This means that the enumeration is cancelled when either of those two is cancelled.
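The second bullet can be observed with a small experiment. The following is only a sketch: TicksAsync and DrainAsync are hypothetical helpers written for this illustration, and the delays are arbitrary. The stream never ends on its own, so each enumeration can only finish through cancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CombinedTokenSketch
{
    // Hypothetical endless stream; it only stops when its token is cancelled.
    public static async IAsyncEnumerable<int> TicksAsync(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (var i = 0; ; i++)
        {
            await Task.Delay(20, token);
            yield return i;
        }
    }

    // Enumerates the stream and reports whether it ended through cancellation.
    public static async Task<bool> DrainAsync(IAsyncEnumerable<int> stream, CancellationToken enumerationToken)
    {
        try
        {
            await foreach (var tick in stream.WithCancellation(enumerationToken))
            {
                // Nothing to do with the values; we only wait for cancellation.
            }
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }

    public static async Task Main()
    {
        // Case 1: cancel the token passed when the enumerable is created.
        using (var creation = new CancellationTokenSource())
        using (var enumeration = new CancellationTokenSource())
        {
            var stream = TicksAsync(creation.Token);
            creation.CancelAfter(100);
            Console.WriteLine(await DrainAsync(stream, enumeration.Token));
        }

        // Case 2: cancel the token passed when the enumeration starts.
        using (var creation = new CancellationTokenSource())
        using (var enumeration = new CancellationTokenSource())
        {
            var stream = TicksAsync(creation.Token);
            enumeration.CancelAfter(100);
            Console.WriteLine(await DrainAsync(stream, enumeration.Token));
        }
    }
}
```

Both lines should print True, since cancelling either token makes the pending Task.Delay throw and ends the enumeration.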

Summary

In this post I discussed a possible migration path – and code changes – for projects that want to upgrade to .NET Core 3.0 / .NET Standard 2.1 and were using the IAsyncEnumerable/IAsyncEnumerator interfaces defined in the System.Interactive.Async package. The easiest way to migrate is to multi-target shared libraries and depend on the new Microsoft.Bcl.AsyncInterfaces package.

I also discussed some of the required code changes as well as the new [EnumeratorCancellation] attribute, which can be used to easily support cancellation on async enumerables implemented with async iterators.

Hope this helps!


JSON.NET converter for type hierarchies

By default, JSON.NET supports serialization of type hierarchies – or, more generically, runtime types different from the ones declared in the respective properties – through the usage of TypeNameHandling. The serializer can add a $type property whose value is the fully qualified name of the serialized type. While this works, it can be a problem for interop and code maintainability. There are ways to customize the type name that gets serialized, but you’re still stuck with the $type property and with global resolution of types from the serialized name (collisions are possible if one isn’t careful).

I wrote the following converter that handles a class hierarchy by adding a discriminator property with configurable name and values.


using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

// Note: the new() constraint means T itself must be a concrete type
// with a public parameterless constructor.
class SubTypesConverter<T> : JsonConverter<T> where T: new()
{
    // Set while WriteJson is running on the current thread, so that the nested
    // serialization of the same value doesn't re-enter this converter.
    [ThreadStatic]
    private static bool isWriting;

    private readonly string discriminatorName;
    private readonly Dictionary<string, Func<T>> factories;
    private readonly Dictionary<Type, string> discriminators;

    public override bool CanRead => true;
    public override bool CanWrite => !isWriting;

    public SubTypesConverter(string discriminatorName)
    {
        this.discriminatorName = discriminatorName;
        this.factories = new Dictionary<string, Func<T>>();
        this.discriminators = new Dictionary<Type, string>();

        // Register every concrete type assignable to T found in T's assembly.
        var types = typeof(T).Assembly
            .GetTypes()
            .Where(t => typeof(T).IsAssignableFrom(t) && !t.IsAbstract);
        foreach (var t in types)
        {
            var discriminator = this.GetDiscriminator(t);
            this.factories.Add(discriminator, CreateFactory(t));
            this.discriminators.Add(t, discriminator);
        }
    }

    public override T ReadJson(JsonReader reader, Type objectType, T existingValue, bool hasExistingValue, JsonSerializer serializer)
    {
        if (hasExistingValue)
        {
            throw new NotSupportedException($"{nameof(SubTypesConverter<T>)} does not allow reading into an existing instance");
        }

        // Pick the concrete type from the discriminator, then let the
        // serializer fill in the remaining properties.
        var jsonObject = JObject.Load(reader);
        var discriminator = jsonObject[this.discriminatorName].Value<string>();
        var value = this.factories[discriminator]();
        serializer.Populate(jsonObject.CreateReader(), value);
        return value;
    }

    public override void WriteJson(JsonWriter writer, T value, JsonSerializer serializer)
    {
        try
        {
            isWriting = true;
            // Serialize with the default logic, then prepend the discriminator.
            var jsonObject = JObject.FromObject(value, serializer);
            jsonObject.AddFirst(new JProperty(this.discriminatorName, this.discriminators[value.GetType()]));
            jsonObject.WriteTo(writer);
        }
        finally
        {
            isWriting = false;
        }
    }

    // Override to customize the discriminator value written for each type.
    protected virtual string GetDiscriminator(Type type)
    {
        return type.Name;
    }

    // Compiles a delegate that calls the public parameterless constructor of t.
    private static Func<T> CreateFactory(Type t)
    {
        var newExp = Expression.New(t.GetConstructor(Type.EmptyTypes));
        return Expression.Lambda<Func<T>>(newExp).Compile();
    }
}

You just need to create an instance passing the base class of your hierarchy as the generic argument and add it to JsonSerializerSettings.Converters. By default the converter uses the type name as the value of the discriminator, but you can change this by subclassing and overriding the GetDiscriminator method. Also, it assumes that all the types in the hierarchy starting at T are in the same assembly as T and that each of them has a public parameterless constructor.
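As a rough usage sketch, assuming the SubTypesConverter<T> class above is compiled into the same project: the Animal/Dog/Cat hierarchy, the "kind" property name and the LowerCaseSubTypesConverter subclass are all made up for this example.

```csharp
using System;
using Newtonsoft.Json;

// Hypothetical hierarchy; the base class is concrete because of the new() constraint.
public class Animal { public string Name { get; set; } }
public class Dog : Animal { public bool Barks { get; set; } }
public class Cat : Animal { public int Lives { get; set; } }

// Hypothetical subclass that writes lower-case discriminator values.
class LowerCaseSubTypesConverter<T> : SubTypesConverter<T> where T : new()
{
    public LowerCaseSubTypesConverter(string discriminatorName) : base(discriminatorName) { }

    protected override string GetDiscriminator(Type type) => type.Name.ToLowerInvariant();
}

public static class SubTypesConverterUsage
{
    public static void Main()
    {
        var settings = new JsonSerializerSettings();
        settings.Converters.Add(new LowerCaseSubTypesConverter<Animal>("kind"));

        // The declared type is the base class; the runtime type is Dog.
        Animal pet = new Dog { Name = "Rex", Barks = true };

        // The JSON should start with the "kind" discriminator property.
        var json = JsonConvert.SerializeObject(pet, settings);
        Console.WriteLine(json);

        // Reading back through the base type should yield a Dog instance again.
        var roundTripped = JsonConvert.DeserializeObject<Animal>(json, settings);
        Console.WriteLine(roundTripped.GetType().Name);
    }
}
```

Note that the converter is registered for the base type only; it applies to the derived types because JsonConverter<T> handles any type assignable to T.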

Hope this helps!