Skip to content

QueryAsyncEnumerable() now supports FluxRecord streaming #705

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

⚠️ Important Notice: Starting from this release, we won’t be listing every dependency change in our changelog. This helps us maintain the project faster and focus on important features for our InfluxDB client.

### Features:
1. [#705](https://github.com/influxdata/influxdb-client-csharp/pull/705): `QueryAsyncEnumerable()` now supports `FluxRecord` streaming.

### CI
1. [#681](https://github.com/influxdata/influxdb-client-csharp/pull/681): Add build for `dotnet8`

Expand Down
7 changes: 4 additions & 3 deletions Client.Core/Internal/AbstractQueryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ protected async IAsyncEnumerable<T> QueryEnumerable<T>(
)
};

var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false);
using var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false);
using var sr = new StreamReader(stream);

await foreach (var (_, record) in _csvParser
.ParseFluxResponseAsync(new StreamReader(stream), cancellationToken)
.ConfigureAwait(false))
.ParseFluxResponseAsync(sr, cancellationToken).ConfigureAwait(false))
if (!(record is null))
{
yield return convert.Invoke(record);
Expand Down
25 changes: 25 additions & 0 deletions Client.Test/QueryApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,31 @@ public async Task QueryAsyncEnumerable()
await foreach (var item in measurements.ConfigureAwait(false)) list.Add(item);

Assert.AreEqual(2, list.Count);
Assert.AreEqual("mem", list[0].Measurement);
Assert.AreEqual("mem", list[1].Measurement);
Assert.AreEqual(12.25, list[0].Value);
Assert.AreEqual(13.00, list[1].Value);
}

[Test]
public async Task QueryAsyncEnumerableOfFluxRecords()
{
MockServer
.Given(Request.Create().WithPath("/api/v2/query").UsingPost())
.RespondWith(CreateResponse(Data));

var measurements = _queryApi.QueryAsyncEnumerable(
new Query(null, "from(...)"),
"my-org");

var list = new List<FluxRecord>();
await foreach (var item in measurements.ConfigureAwait(false)) list.Add(item);

Assert.AreEqual(2, list.Count);
Assert.AreEqual("mem", list[0].GetMeasurement());
Assert.AreEqual("mem", list[1].GetMeasurement());
Assert.AreEqual(12.25, list[0].GetValue());
Assert.AreEqual(13.00, list[1].GetValue());
}

[Test]
Expand Down
18 changes: 6 additions & 12 deletions Client/InvokableScriptsApi.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using InfluxDB.Client.Api.Domain;
Expand Down Expand Up @@ -248,15 +246,13 @@ await QueryRaw(CreateRequest(scriptId, bindParams), Consumer, ErrorConsumer, Emp
/// <param name="bindParams">Represent key/value pairs parameters to be injected into script</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>stream of FluxRecord</returns>
public async IAsyncEnumerable<FluxRecord> InvokeScriptEnumerableAsync(string scriptId,
public IAsyncEnumerable<FluxRecord> InvokeScriptEnumerableAsync(string scriptId,
Dictionary<string, object> bindParams = default,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
CancellationToken cancellationToken = default)
{
var requestMessage = CreateRequest(scriptId, bindParams);

await foreach (var record in QueryEnumerable(requestMessage, it => it, cancellationToken)
.ConfigureAwait(false))
yield return record;
return QueryEnumerable(requestMessage, r => r, cancellationToken);
}

/// <summary>
Expand All @@ -266,15 +262,13 @@ public async IAsyncEnumerable<FluxRecord> InvokeScriptEnumerableAsync(string scr
/// <param name="bindParams">Represent key/value pairs parameters to be injected into script</param>
/// <param name="cancellationToken">Cancellation token</param>
/// <returns>stream of Measurement</returns>
public async IAsyncEnumerable<T> InvokeScriptMeasurementsEnumerableAsync<T>(string scriptId,
public IAsyncEnumerable<T> InvokeScriptMeasurementsEnumerableAsync<T>(string scriptId,
Dictionary<string, object> bindParams = default,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
CancellationToken cancellationToken = default)
{
var requestMessage = CreateRequest(scriptId, bindParams);

await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity<T>(it),
cancellationToken).ConfigureAwait(false))
yield return record;
return QueryEnumerable(requestMessage, r => Mapper.ConvertToEntity<T>(r), cancellationToken);
}

protected override void BeforeIntercept(RestRequest request)
Expand Down
83 changes: 59 additions & 24 deletions Client/QueryApi.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using InfluxDB.Client.Api.Domain;
Expand Down Expand Up @@ -236,6 +235,28 @@ IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string org = null,
IAsyncEnumerable<T> QueryAsyncEnumerable<T>(Query query, string org = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Executes the Flux query against the InfluxDB 2.x and returns
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
/// </summary>
/// <param name="query">the flux query to execute</param>
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
/// <param name="cancellationToken">cancellation token</param>
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(string query, string org = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Executes the Flux query against the InfluxDB 2.x and returns
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
/// </summary>
/// <param name="query">the flux query to execute</param>
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
/// <param name="cancellationToken">cancellation token</param>
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(Query query, string org = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Executes the Flux query against the InfluxDB and synchronously map whole response to <see cref="string"/> result.
///
Expand Down Expand Up @@ -442,16 +463,10 @@ await QueryAsync(query, consumer, ErrorConsumer, EmptyAction, org, cancellationT
/// <param name="cancellationToken">cancellation token</param>
/// <typeparam name="T">the type of measurement</typeparam>
/// <returns>Measurements which are matched the query</returns>
public async IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string org = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
public IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string org = null,
CancellationToken cancellationToken = default)
{
Arguments.CheckNonEmptyString(query, nameof(query));

var requestMessage = CreateRequest(CreateQuery(query), org);

await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity<T>(it),
cancellationToken).ConfigureAwait(false))
yield return record;
return QueryAsyncEnumerable<T>(CreateQuery(query), org, cancellationToken);
}

/// <summary>
Expand All @@ -463,16 +478,40 @@ public async IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string or
/// <param name="cancellationToken">cancellation token</param>
/// <typeparam name="T">the type of measurement</typeparam>
/// <returns>Measurements which are matched the query</returns>
public async IAsyncEnumerable<T> QueryAsyncEnumerable<T>(Query query, string org = null,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
public IAsyncEnumerable<T> QueryAsyncEnumerable<T>(Query query, string org = null,
CancellationToken cancellationToken = default)
{
Arguments.CheckNotNull(query, nameof(query));
var request = CreateRequest(query, org);
return QueryEnumerable(request, r => Mapper.ConvertToEntity<T>(r), cancellationToken);
}

var requestMessage = CreateRequest(query, org);
/// <summary>
/// Executes the Flux query against the InfluxDB 2.x and returns
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
/// </summary>
/// <param name="query">the flux query to execute</param>
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
/// <param name="cancellationToken">cancellation token</param>
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
public IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(string query, string org = null,
CancellationToken cancellationToken = default)
{
return QueryAsyncEnumerable(CreateQuery(query), org, cancellationToken);
}

await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity<T>(it),
cancellationToken).ConfigureAwait(false))
yield return record;
/// <summary>
/// Executes the Flux query against the InfluxDB 2.x and returns
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
/// </summary>
/// <param name="query">the flux query to execute</param>
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
/// <param name="cancellationToken">cancellation token</param>
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
public IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(Query query, string org = null,
CancellationToken cancellationToken = default)
{
var request = CreateRequest(query, org);
return QueryEnumerable(request, r => r, cancellationToken);
}

/// <summary>
Expand All @@ -494,8 +533,7 @@ public Task QueryAsync(string query, Action<FluxRecord> onNext, Action<Exception

var consumer = new FluxResponseConsumerRecord(onNext);

return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org,
cancellationToken);
return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org, cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -540,8 +578,7 @@ public Task QueryAsync<T>(string query, Action<T> onNext, Action<Exception> onEr

var consumer = new FluxResponseConsumerPoco<T>(onNext, Mapper);

return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org,
cancellationToken);
return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org, cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -801,9 +838,7 @@ internal static Query CreateQuery(string query, Dialect dialect = null)
{
Arguments.CheckNonEmptyString(query, nameof(query));

var created = new Query(query: query, dialect: dialect ?? Dialect);

return created;
return new Query(query: query, dialect: dialect ?? Dialect);
}
}
}