|
#if NET5_0_OR_GREATER |
|
/// <summary>
/// Runs a query asynchronously and exposes the rows as an unbuffered <see cref="IAsyncEnumerable{T}"/> of <c>dynamic</c>.
/// </summary>
/// <param name="cnn">The connection the query is issued on.</param>
/// <param name="sql">The SQL text of the query.</param>
/// <param name="param">The query parameters, or <c>null</c> when there are none.</param>
/// <param name="transaction">The transaction to enlist in, or <c>null</c> for none.</param>
/// <param name="commandTimeout">The command timeout, in seconds.</param>
/// <param name="commandType">The kind of command being executed.</param>
/// <returns>
/// An asynchronous sequence of rows, each exposed as <c>dynamic</c>.
/// </returns>
public static IAsyncEnumerable<dynamic> QueryUnbufferedAsync(this DbConnection cnn, string sql, object? param = null, DbTransaction? transaction = null, int? commandTimeout = null, CommandType? commandType = null)
{
    // There is deliberately no CancellationToken parameter here: for async iterators the
    // caller expresses cancellation through WithCancellation on the returned sequence.
    var definition = new CommandDefinition(sql, param, transaction, commandTimeout, commandType, CommandFlags.None, default);

    // Dynamic rows are materialized with object as the effective type.
    return QueryUnbufferedAsync<dynamic>(cnn, typeof(object), definition);
}
|
|
|
/// <summary>
/// Runs a query asynchronously and exposes the rows as an unbuffered <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
/// <typeparam name="T">The type each result row is returned as.</typeparam>
/// <param name="cnn">The connection the query is issued on.</param>
/// <param name="sql">The SQL text of the query.</param>
/// <param name="param">The query parameters, or <c>null</c> when there are none.</param>
/// <param name="transaction">The transaction to enlist in, or <c>null</c> for none.</param>
/// <param name="commandTimeout">The command timeout, in seconds.</param>
/// <param name="commandType">The kind of command being executed.</param>
/// <returns>
/// An asynchronous sequence of <typeparamref name="T"/>. When a basic type (int, string, etc) is requested,
/// the value is taken from the first column; otherwise one instance is created per row, with columns mapped
/// directly to members of the same name (case insensitive).
/// </returns>
public static IAsyncEnumerable<T> QueryUnbufferedAsync<T>(this DbConnection cnn, string sql, object? param = null, DbTransaction? transaction = null, int? commandTimeout = null, CommandType? commandType = null)
{
    // There is deliberately no CancellationToken parameter here: for async iterators the
    // caller expresses cancellation through WithCancellation on the returned sequence.
    var definition = new CommandDefinition(sql, param, transaction, commandTimeout, commandType, CommandFlags.None, default);

    return QueryUnbufferedAsync<T>(cnn, typeof(T), definition);
}
|
|
|
/// <summary>
/// Shared implementation behind the public <c>QueryUnbufferedAsync</c> overloads: executes the command
/// and yields one deserialized value per row as the caller enumerates.
/// </summary>
/// <typeparam name="T">The element type yielded to the caller.</typeparam>
/// <param name="cnn">The connection to query on. If it is closed when enumeration starts it is opened here and closed again when enumeration ends.</param>
/// <param name="effectiveType">The type used for the cache identity and for building the row deserializer.</param>
/// <param name="command">The command to execute; its cancellation token is passed to the iterator.</param>
/// <returns>An asynchronous sequence of <typeparamref name="T"/>; the sequence is empty when the reader reports no columns.</returns>
private static IAsyncEnumerable<T> QueryUnbufferedAsync<T>(this IDbConnection cnn, Type effectiveType, CommandDefinition command)
{
    return Impl(cnn, effectiveType, command, command.CancellationToken); // proxy to allow CT expression

    // Local async iterator: being a separate function lets the token parameter carry
    // [EnumeratorCancellation], so a token supplied via WithCancellation reaches it too.
    static async IAsyncEnumerable<T> Impl(IDbConnection cnn, Type effectiveType, CommandDefinition command,
        [EnumeratorCancellation] CancellationToken cancel)
    {
        object? param = command.Parameters;
        var identity = new Identity(command.CommandText, command.CommandTypeDirect, cnn, effectiveType, param?.GetType());
        var info = GetCacheInfo(identity, param, command.AddToCache);

        // Remember the initial state so the connection is only closed if it was opened here.
        bool wasClosed = cnn.State == ConnectionState.Closed;
        using var cmd = command.TrySetupAsyncCommand(cnn, info.ParamReader);
        DbDataReader? reader = null;
        try
        {
            if (wasClosed)
            {
                await cnn.TryOpenAsync(cancel).ConfigureAwait(false);
            }

            reader = await ExecuteReaderWithFlagsFallbackAsync(cmd, wasClosed, CommandBehavior.SequentialAccess | CommandBehavior.SingleResult, cancel).ConfigureAwait(false);

            // Reuse the cached deserializer only when it was built for the same column layout.
            var tuple = info.Deserializer;
            int hash = GetColumnHash(reader);
            if (tuple.Func is null || tuple.Hash != hash)
            {
                if (reader.FieldCount == 0)
                {
                    // No columns at all: nothing to deserialize, so yield no rows.
                    yield break;
                }

                tuple = info.Deserializer = new DeserializerState(hash, GetDeserializer(effectiveType, reader, 0, -1, false));
                if (command.AddToCache)
                {
                    SetQueryCache(identity, info);
                }
            }

            var func = tuple.Func;

            while (await reader.ReadAsync(cancel).ConfigureAwait(false))
            {
                object val = func(reader);
                yield return GetValue<T>(reader, effectiveType, val);
            }

            while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }

            command.OnCompleted();
        }
        finally
        {
            if (reader is not null)
            {
                // NOTE(review): nothing above closes the reader explicitly, so this branch appears
                // to run on normal completion as well as on early exit - confirm that is intended.
                if (!reader.IsClosed)
                {
                    try { cmd?.Cancel(); }
                    catch { /* don't spoil any existing exception */ }
                }

                // Library code: do not resume on the captured context, matching the other awaits here.
                await reader.DisposeAsync().ConfigureAwait(false);
            }

            if (wasClosed)
            {
                cnn.Close();
            }
        }
    }
}
|
#endif |
Currently, `IAsyncEnumerable` support is conditioned on .NET 5 or greater. For example, see `Dapper/Dapper/SqlMapper.Async.cs`, lines 1252 to 1347 in commit 6434c69.
However, there shouldn't be any reason not to support `IAsyncEnumerable` on every target currently supported by Dapper, since there exists a well-maintained BCL package (`Microsoft.Bcl.AsyncInterfaces`) that adds the interface on older frameworks.

To allow `IAsyncEnumerable` to work everywhere, it would just be a matter of including a dependency on the package above for the `netstandard2.0` and `net462` targets. This approach is used heavily by other libraries for this exact same purpose.

We currently have a solution that targets NET472, and we wanted to be able to tap into `IAsyncEnumerable` in a couple of places where we are currently being forced to use `Task<IEnumerable<T>>` methods such as `QueryAsync`.

Could there be an update to the library that adds support for the `IAsyncEnumerable`-returning methods for all frameworks, based on the above suggestion?