[Feature Request] Add support for IAsyncEnumerable (QueryUnbufferedAsync) on all .NET targets #2143

@julealgon

Currently, IAsyncEnumerable support is only compiled for .NET 5 or greater. For example:

#if NET5_0_OR_GREATER
/// <summary>
/// Execute a query asynchronously using <see cref="IAsyncEnumerable{dynamic}"/>.
/// </summary>
/// <param name="cnn">The connection to query on.</param>
/// <param name="sql">The SQL to execute for the query.</param>
/// <param name="param">The parameters to pass, if any.</param>
/// <param name="transaction">The transaction to use, if any.</param>
/// <param name="commandTimeout">The command timeout (in seconds).</param>
/// <param name="commandType">The type of command to execute.</param>
/// <returns>
/// A sequence of data of dynamic data
/// </returns>
public static IAsyncEnumerable<dynamic> QueryUnbufferedAsync(this DbConnection cnn, string sql, object? param = null, DbTransaction? transaction = null, int? commandTimeout = null, CommandType? commandType = null)
{
    // note: in many cases of adding a new async method I might add a CancellationToken - however, cancellation is expressed via WithCancellation on iterators
    return QueryUnbufferedAsync<dynamic>(cnn, typeof(object), new CommandDefinition(sql, param, transaction, commandTimeout, commandType, CommandFlags.None, default));
}

/// <summary>
/// Execute a query asynchronously using <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
/// <typeparam name="T">The type of results to return.</typeparam>
/// <param name="cnn">The connection to query on.</param>
/// <param name="sql">The SQL to execute for the query.</param>
/// <param name="param">The parameters to pass, if any.</param>
/// <param name="transaction">The transaction to use, if any.</param>
/// <param name="commandTimeout">The command timeout (in seconds).</param>
/// <param name="commandType">The type of command to execute.</param>
/// <returns>
/// A sequence of data of <typeparamref name="T"/>; if a basic type (int, string, etc) is queried then the data from the first column is assumed, otherwise an instance is
/// created per row, and a direct column-name===member-name mapping is assumed (case insensitive).
/// </returns>
public static IAsyncEnumerable<T> QueryUnbufferedAsync<T>(this DbConnection cnn, string sql, object? param = null, DbTransaction? transaction = null, int? commandTimeout = null, CommandType? commandType = null)
{
    // note: in many cases of adding a new async method I might add a CancellationToken - however, cancellation is expressed via WithCancellation on iterators
    return QueryUnbufferedAsync<T>(cnn, typeof(T), new CommandDefinition(sql, param, transaction, commandTimeout, commandType, CommandFlags.None, default));
}

private static IAsyncEnumerable<T> QueryUnbufferedAsync<T>(this IDbConnection cnn, Type effectiveType, CommandDefinition command)
{
    return Impl(cnn, effectiveType, command, command.CancellationToken); // proxy to allow CT expression

    static async IAsyncEnumerable<T> Impl(IDbConnection cnn, Type effectiveType, CommandDefinition command,
        [EnumeratorCancellation] CancellationToken cancel)
    {
        object? param = command.Parameters;
        var identity = new Identity(command.CommandText, command.CommandTypeDirect, cnn, effectiveType, param?.GetType());
        var info = GetCacheInfo(identity, param, command.AddToCache);
        bool wasClosed = cnn.State == ConnectionState.Closed;
        using var cmd = command.TrySetupAsyncCommand(cnn, info.ParamReader);
        DbDataReader? reader = null;
        try
        {
            if (wasClosed) await cnn.TryOpenAsync(cancel).ConfigureAwait(false);
            reader = await ExecuteReaderWithFlagsFallbackAsync(cmd, wasClosed, CommandBehavior.SequentialAccess | CommandBehavior.SingleResult, cancel).ConfigureAwait(false);
            var tuple = info.Deserializer;
            int hash = GetColumnHash(reader);
            if (tuple.Func is null || tuple.Hash != hash)
            {
                if (reader.FieldCount == 0)
                {
                    yield break;
                }
                tuple = info.Deserializer = new DeserializerState(hash, GetDeserializer(effectiveType, reader, 0, -1, false));
                if (command.AddToCache) SetQueryCache(identity, info);
            }
            var func = tuple.Func;
            var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
            while (await reader.ReadAsync(cancel).ConfigureAwait(false))
            {
                object val = func(reader);
                yield return GetValue<T>(reader, effectiveType, val);
            }
            while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
            command.OnCompleted();
        }
        finally
        {
            if (reader is not null)
            {
                if (!reader.IsClosed)
                {
                    try { cmd?.Cancel(); }
                    catch { /* don't spoil any existing exception */ }
                }
                await reader.DisposeAsync();
            }
            if (wasClosed) cnn.Close();
        }
    }
}
#endif

However, there shouldn't be any reason not to support IAsyncEnumerable on every target currently supported by Dapper, since there exists a well-maintained BCL package that adds the interface on older frameworks:

To allow for IAsyncEnumerable to work everywhere, it would just be a matter of including a dependency on the package above for the netstandard2.0 and net462 targets. This approach is used heavily by other libraries for this exact same purpose.
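
As a rough illustration of the dependency change described above, the project file could reference the package only for the older targets. This is a sketch under assumptions, not Dapper's actual build configuration: it assumes the package in question is Microsoft.Bcl.AsyncInterfaces, the version number is only illustrative, and the target names are the ones mentioned in this issue.

```xml
<!-- Sketch only. Assumptions: the package is Microsoft.Bcl.AsyncInterfaces,
     and the version shown is illustrative rather than a recommendation. -->
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0' or '$(TargetFramework)' == 'net462'">
  <!-- Supplies IAsyncEnumerable<T>, IAsyncEnumerator<T> and IAsyncDisposable
       on targets whose base class library does not include them. -->
  <PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="8.0.0" />
</ItemGroup>
```

The NET5_0_OR_GREATER guard around the quoted methods would then need to be widened or removed. The method body may also need small adjustments on the older targets. For example, reader.DisposeAsync() is, as far as I know, not available on DbDataReader before .NET Core 3.0 / .NET Standard 2.1, so a synchronous Dispose fallback would likely be required there.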

We currently have a solution that targets net472, and we would like to use IAsyncEnumerable in a couple of places where we are currently forced to use Task<IEnumerable<T>> methods such as QueryAsync.

Could there be an update to the library that adds support for the IAsyncEnumerable-returning methods for all frameworks based on the above suggestion?
