diff --git a/CHANGELOG.md b/CHANGELOG.md index 5683da693..a21807380 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ ⚠️ Important Notice: Starting from this release, we won’t be listing every dependency change in our changelog. This helps us maintain the project faster and focus on important features for our InfluxDB client. +### Features: +1. [#705](https://github.com/influxdata/influxdb-client-csharp/pull/705): `QueryAsyncEnumerable()` now supports `FluxRecord` streaming. + ### CI 1. [#681](https://github.com/influxdata/influxdb-client-csharp/pull/681): Add build for `dotnet8` diff --git a/Client.Core/Internal/AbstractQueryClient.cs b/Client.Core/Internal/AbstractQueryClient.cs index b184be46b..806ff1d18 100644 --- a/Client.Core/Internal/AbstractQueryClient.cs +++ b/Client.Core/Internal/AbstractQueryClient.cs @@ -203,10 +203,11 @@ protected async IAsyncEnumerable QueryEnumerable( ) }; - var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false); + using var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false); + using var sr = new StreamReader(stream); + await foreach (var (_, record) in _csvParser - .ParseFluxResponseAsync(new StreamReader(stream), cancellationToken) - .ConfigureAwait(false)) + .ParseFluxResponseAsync(sr, cancellationToken).ConfigureAwait(false)) if (!(record is null)) { yield return convert.Invoke(record); diff --git a/Client.Test/QueryApiTest.cs b/Client.Test/QueryApiTest.cs index ec3cc5417..b76752722 100644 --- a/Client.Test/QueryApiTest.cs +++ b/Client.Test/QueryApiTest.cs @@ -98,6 +98,31 @@ public async Task QueryAsyncEnumerable() await foreach (var item in measurements.ConfigureAwait(false)) list.Add(item); Assert.AreEqual(2, list.Count); + Assert.AreEqual("mem", list[0].Measurement); + Assert.AreEqual("mem", list[1].Measurement); + Assert.AreEqual(12.25, list[0].Value); + Assert.AreEqual(13.00, list[1].Value); + } + + [Test] + public async Task QueryAsyncEnumerableOfFluxRecords() + { + MockServer + .Given(Request.Create().WithPath("/api/v2/query").UsingPost()) + .RespondWith(CreateResponse(Data)); + + var measurements = _queryApi.QueryAsyncEnumerable( + new Query(null, "from(...)"), + "my-org"); + + var list = new List(); + await foreach (var item in measurements.ConfigureAwait(false)) list.Add(item); + + Assert.AreEqual(2, list.Count); + Assert.AreEqual("mem", list[0].GetMeasurement()); + Assert.AreEqual("mem", list[1].GetMeasurement()); + Assert.AreEqual(12.25, list[0].GetValue()); + Assert.AreEqual(13.00, list[1].GetValue()); } [Test] diff --git a/Client/InvokableScriptsApi.cs b/Client/InvokableScriptsApi.cs index 9e603d29d..3b5aff90d 100644 --- a/Client/InvokableScriptsApi.cs +++ b/Client/InvokableScriptsApi.cs @@ -1,7 +1,5 @@ using System; using System.Collections.Generic; -using System.Net.Http; -using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using InfluxDB.Client.Api.Domain; @@ -248,15 +246,13 @@ await QueryRaw(CreateRequest(scriptId, bindParams), Consumer, ErrorConsumer, Emp /// Represent key/value pairs parameters to be injected into script /// Cancellation token /// stream of FluxRecord - public async IAsyncEnumerable InvokeScriptEnumerableAsync(string scriptId, + public IAsyncEnumerable InvokeScriptEnumerableAsync(string scriptId, Dictionary bindParams = default, - [EnumeratorCancellation] CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default) { var requestMessage = CreateRequest(scriptId, bindParams); - await foreach (var record in QueryEnumerable(requestMessage, it => it, cancellationToken) - .ConfigureAwait(false)) - yield return record; + return QueryEnumerable(requestMessage, r => r, cancellationToken); } /// @@ -266,15 +262,13 @@ public async IAsyncEnumerable InvokeScriptEnumerableAsync(string scr /// Represent key/value pairs parameters to be injected into script /// Cancellation token /// stream of Measurement - public async IAsyncEnumerable InvokeScriptMeasurementsEnumerableAsync(string scriptId, + public IAsyncEnumerable InvokeScriptMeasurementsEnumerableAsync(string scriptId, Dictionary bindParams = default, - [EnumeratorCancellation] CancellationToken cancellationToken = default) + CancellationToken cancellationToken = default) { var requestMessage = CreateRequest(scriptId, bindParams); - await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity(it), - cancellationToken).ConfigureAwait(false)) - yield return record; + return QueryEnumerable(requestMessage, r => Mapper.ConvertToEntity(r), cancellationToken); } protected override void BeforeIntercept(RestRequest request) diff --git a/Client/QueryApi.cs b/Client/QueryApi.cs index 2946ff26b..cbfd1ab7b 100644 --- a/Client/QueryApi.cs +++ b/Client/QueryApi.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Net.Http; -using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using InfluxDB.Client.Api.Domain; @@ -236,6 +235,28 @@ IAsyncEnumerable QueryAsyncEnumerable(string query, string org = null, IAsyncEnumerable QueryAsyncEnumerable(Query query, string org = null, CancellationToken cancellationToken = default); + /// + /// Executes the Flux query against the InfluxDB 2.x and returns + /// an asynchronous enumerable of . + /// + /// the flux query to execute + /// specifies the source organization. If the org is not specified then is used config from . + /// cancellation token + /// An asynchronous enumerable of . + IAsyncEnumerable QueryAsyncEnumerable(string query, string org = null, + CancellationToken cancellationToken = default); + + /// + /// Executes the Flux query against the InfluxDB 2.x and returns + /// an asynchronous enumerable of . + /// + /// the flux query to execute + /// specifies the source organization. If the org is not specified then is used config from . + /// cancellation token + /// An asynchronous enumerable of . + IAsyncEnumerable QueryAsyncEnumerable(Query query, string org = null, + CancellationToken cancellationToken = default); + /// /// Executes the Flux query against the InfluxDB and synchronously map whole response to result. /// @@ -442,16 +463,10 @@ await QueryAsync(query, consumer, ErrorConsumer, EmptyAction, org, cancellationT /// cancellation token /// the type of measurement /// Measurements which are matched the query - public async IAsyncEnumerable QueryAsyncEnumerable(string query, string org = null, - [EnumeratorCancellation] CancellationToken cancellationToken = default) + public IAsyncEnumerable QueryAsyncEnumerable(string query, string org = null, + CancellationToken cancellationToken = default) { - Arguments.CheckNonEmptyString(query, nameof(query)); - - var requestMessage = CreateRequest(CreateQuery(query), org); - - await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity(it), - cancellationToken).ConfigureAwait(false)) - yield return record; + return QueryAsyncEnumerable(CreateQuery(query), org, cancellationToken); } /// @@ -463,16 +478,40 @@ public async IAsyncEnumerable QueryAsyncEnumerable(string query, string or /// cancellation token /// the type of measurement /// Measurements which are matched the query - public async IAsyncEnumerable QueryAsyncEnumerable(Query query, string org = null, - [EnumeratorCancellation] CancellationToken cancellationToken = default) + public IAsyncEnumerable QueryAsyncEnumerable(Query query, string org = null, + CancellationToken cancellationToken = default) { - Arguments.CheckNotNull(query, nameof(query)); + var request = CreateRequest(query, org); + return QueryEnumerable(request, r => Mapper.ConvertToEntity(r), cancellationToken); + } - var requestMessage = CreateRequest(query, org); + /// + /// Executes the Flux query against the InfluxDB 2.x and returns + /// an asynchronous enumerable of . + /// + /// the flux query to execute + /// specifies the source organization. If the org is not specified then is used config from . + /// cancellation token + /// An asynchronous enumerable of . + public IAsyncEnumerable QueryAsyncEnumerable(string query, string org = null, + CancellationToken cancellationToken = default) + { + return QueryAsyncEnumerable(CreateQuery(query), org, cancellationToken); + } - await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity(it), - cancellationToken).ConfigureAwait(false)) - yield return record; + /// + /// Executes the Flux query against the InfluxDB 2.x and returns + /// an asynchronous enumerable of . + /// + /// the flux query to execute + /// specifies the source organization. If the org is not specified then is used config from . + /// cancellation token + /// An asynchronous enumerable of . + public IAsyncEnumerable QueryAsyncEnumerable(Query query, string org = null, + CancellationToken cancellationToken = default) + { + var request = CreateRequest(query, org); + return QueryEnumerable(request, r => r, cancellationToken); } /// @@ -494,8 +533,7 @@ public Task QueryAsync(string query, Action onNext, Action @@ -540,8 +578,7 @@ public Task QueryAsync(string query, Action onNext, Action onEr var consumer = new FluxResponseConsumerPoco(onNext, Mapper); - return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org, - cancellationToken); + return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org, cancellationToken); } /// @@ -801,9 +838,7 @@ internal static Query CreateQuery(string query, Dialect dialect = null) { Arguments.CheckNonEmptyString(query, nameof(query)); - var created = new Query(query: query, dialect: dialect ?? Dialect); - - return created; + return new Query(query: query, dialect: dialect ?? Dialect); } } } \ No newline at end of file