Skip to content

Commit 33cb1ec

Browse files
committed
feat: add support for FluxRecord streaming
1 parent d292ca7 commit 33cb1ec

File tree

4 files changed

+97
-44
lines changed

4 files changed

+97
-44
lines changed

Client.Core/Internal/AbstractQueryClient.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -203,14 +203,14 @@ protected async IAsyncEnumerable<T> QueryEnumerable<T>(
203203
)
204204
};
205205

206-
var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false);
207-
await foreach (var (_, record) in _csvParser
208-
.ParseFluxResponseAsync(new StreamReader(stream), cancellationToken)
209-
.ConfigureAwait(false))
206+
using var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false);
207+
using var sr = new StreamReader(stream);
208+
209+
await foreach (var (_, record) in _csvParser.ParseFluxResponseAsync(sr, cancellationToken).ConfigureAwait(false))
210+
{
210211
if (!(record is null))
211-
{
212212
yield return convert.Invoke(record);
213-
}
213+
}
214214
}
215215

216216
protected abstract void BeforeIntercept(RestRequest query);

Client.Test/QueryApiTest.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,31 @@ public async Task QueryAsyncEnumerable()
9898
await foreach (var item in measurements.ConfigureAwait(false)) list.Add(item);
9999

100100
Assert.AreEqual(2, list.Count);
101+
Assert.AreEqual("mem", list[0].Measurement);
102+
Assert.AreEqual("mem", list[1].Measurement);
103+
Assert.AreEqual(12.25, list[0].Value);
104+
Assert.AreEqual(13.00, list[1].Value);
105+
}
106+
107+
[Test]
108+
public async Task QueryAsyncEnumerableOfFluxRecords()
109+
{
110+
MockServer
111+
.Given(Request.Create().WithPath("/api/v2/query").UsingPost())
112+
.RespondWith(CreateResponse(Data));
113+
114+
var measurements = _queryApi.QueryAsyncEnumerable(
115+
new Query(null, "from(...)"),
116+
"my-org");
117+
118+
var list = new List<FluxRecord>();
119+
await foreach (var item in measurements.ConfigureAwait(false)) list.Add(item);
120+
121+
Assert.AreEqual(2, list.Count);
122+
Assert.AreEqual("mem", list[0].GetMeasurement());
123+
Assert.AreEqual("mem", list[1].GetMeasurement());
124+
Assert.AreEqual(12.25, list[0].GetValue());
125+
Assert.AreEqual(13.00, list[1].GetValue());
101126
}
102127

103128
[Test]

Client/InvokableScriptsApi.cs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
using System;
22
using System.Collections.Generic;
3-
using System.Net.Http;
4-
using System.Runtime.CompilerServices;
53
using System.Threading;
64
using System.Threading.Tasks;
75
using InfluxDB.Client.Api.Domain;
@@ -248,15 +246,13 @@ await QueryRaw(CreateRequest(scriptId, bindParams), Consumer, ErrorConsumer, Emp
248246
/// <param name="bindParams">Represent key/value pairs parameters to be injected into script</param>
249247
/// <param name="cancellationToken">Cancellation token</param>
250248
/// <returns>stream of FluxRecord</returns>
251-
public async IAsyncEnumerable<FluxRecord> InvokeScriptEnumerableAsync(string scriptId,
249+
public IAsyncEnumerable<FluxRecord> InvokeScriptEnumerableAsync(string scriptId,
252250
Dictionary<string, object> bindParams = default,
253-
[EnumeratorCancellation] CancellationToken cancellationToken = default)
251+
CancellationToken cancellationToken = default)
254252
{
255253
var requestMessage = CreateRequest(scriptId, bindParams);
256254

257-
await foreach (var record in QueryEnumerable(requestMessage, it => it, cancellationToken)
258-
.ConfigureAwait(false))
259-
yield return record;
255+
return QueryEnumerable(requestMessage, r => r, cancellationToken);
260256
}
261257

262258
/// <summary>
@@ -266,15 +262,13 @@ public async IAsyncEnumerable<FluxRecord> InvokeScriptEnumerableAsync(string scr
266262
/// <param name="bindParams">Represent key/value pairs parameters to be injected into script</param>
267263
/// <param name="cancellationToken">Cancellation token</param>
268264
/// <returns>stream of Measurement</returns>
269-
public async IAsyncEnumerable<T> InvokeScriptMeasurementsEnumerableAsync<T>(string scriptId,
265+
public IAsyncEnumerable<T> InvokeScriptMeasurementsEnumerableAsync<T>(string scriptId,
270266
Dictionary<string, object> bindParams = default,
271-
[EnumeratorCancellation] CancellationToken cancellationToken = default)
267+
CancellationToken cancellationToken = default)
272268
{
273269
var requestMessage = CreateRequest(scriptId, bindParams);
274270

275-
await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity<T>(it),
276-
cancellationToken).ConfigureAwait(false))
277-
yield return record;
271+
return QueryEnumerable(requestMessage, r => Mapper.ConvertToEntity<T>(r), cancellationToken);
278272
}
279273

280274
protected override void BeforeIntercept(RestRequest request)

Client/QueryApi.cs

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Net.Http;
4-
using System.Runtime.CompilerServices;
54
using System.Threading;
65
using System.Threading.Tasks;
76
using InfluxDB.Client.Api.Domain;
@@ -236,6 +235,28 @@ IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string org = null,
236235
IAsyncEnumerable<T> QueryAsyncEnumerable<T>(Query query, string org = null,
237236
CancellationToken cancellationToken = default);
238237

238+
/// <summary>
239+
/// Executes the Flux query against the InfluxDB 2.x and returns
240+
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
241+
/// </summary>
242+
/// <param name="query">the flux query to execute</param>
243+
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
244+
/// <param name="cancellationToken">cancellation token</param>
245+
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
246+
IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(string query, string org = null,
247+
CancellationToken cancellationToken = default);
248+
249+
/// <summary>
250+
/// Executes the Flux query against the InfluxDB 2.x and returns
251+
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
252+
/// </summary>
253+
/// <param name="query">the flux query to execute</param>
254+
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
255+
/// <param name="cancellationToken">cancellation token</param>
256+
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
257+
IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(Query query, string org = null,
258+
CancellationToken cancellationToken = default);
259+
239260
/// <summary>
240261
/// Executes the Flux query against the InfluxDB and synchronously map whole response to <see cref="string"/> result.
241262
///
@@ -442,16 +463,10 @@ await QueryAsync(query, consumer, ErrorConsumer, EmptyAction, org, cancellationT
442463
/// <param name="cancellationToken">cancellation token</param>
443464
/// <typeparam name="T">the type of measurement</typeparam>
444465
/// <returns>Measurements which are matched the query</returns>
445-
public async IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string org = null,
446-
[EnumeratorCancellation] CancellationToken cancellationToken = default)
466+
public IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string org = null,
467+
CancellationToken cancellationToken = default)
447468
{
448-
Arguments.CheckNonEmptyString(query, nameof(query));
449-
450-
var requestMessage = CreateRequest(CreateQuery(query), org);
451-
452-
await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity<T>(it),
453-
cancellationToken).ConfigureAwait(false))
454-
yield return record;
469+
return QueryAsyncEnumerable<T>(CreateQuery(query), org, cancellationToken);
455470
}
456471

457472
/// <summary>
@@ -463,16 +478,40 @@ public async IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string or
463478
/// <param name="cancellationToken">cancellation token</param>
464479
/// <typeparam name="T">the type of measurement</typeparam>
465480
/// <returns>Measurements which are matched the query</returns>
466-
public async IAsyncEnumerable<T> QueryAsyncEnumerable<T>(Query query, string org = null,
467-
[EnumeratorCancellation] CancellationToken cancellationToken = default)
481+
public IAsyncEnumerable<T> QueryAsyncEnumerable<T>(Query query, string org = null,
482+
CancellationToken cancellationToken = default)
468483
{
469-
Arguments.CheckNotNull(query, nameof(query));
484+
var request = CreateRequest(query, org);
485+
return QueryEnumerable(request, r => Mapper.ConvertToEntity<T>(r), cancellationToken);
486+
}
470487

471-
var requestMessage = CreateRequest(query, org);
488+
/// <summary>
489+
/// Executes the Flux query against the InfluxDB 2.x and returns
490+
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
491+
/// </summary>
492+
/// <param name="query">the flux query to execute</param>
493+
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
494+
/// <param name="cancellationToken">cancellation token</param>
495+
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
496+
public IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(string query, string org = null,
497+
CancellationToken cancellationToken = default)
498+
{
499+
return QueryAsyncEnumerable(CreateQuery(query), org, cancellationToken);
500+
}
472501

473-
await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity<T>(it),
474-
cancellationToken).ConfigureAwait(false))
475-
yield return record;
502+
/// <summary>
503+
/// Executes the Flux query against the InfluxDB 2.x and returns
504+
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
505+
/// </summary>
506+
/// <param name="query">the flux query to execute</param>
507+
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
508+
/// <param name="cancellationToken">cancellation token</param>
509+
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
510+
public IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(Query query, string org = null,
511+
CancellationToken cancellationToken = default)
512+
{
513+
var request = CreateRequest(query, org);
514+
return QueryEnumerable(request, r => r, cancellationToken);
476515
}
477516

478517
/// <summary>
@@ -494,8 +533,7 @@ public Task QueryAsync(string query, Action<FluxRecord> onNext, Action<Exception
494533

495534
var consumer = new FluxResponseConsumerRecord(onNext);
496535

497-
return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org,
498-
cancellationToken);
536+
return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org, cancellationToken);
499537
}
500538

501539
/// <summary>
@@ -540,8 +578,7 @@ public Task QueryAsync<T>(string query, Action<T> onNext, Action<Exception> onEr
540578

541579
var consumer = new FluxResponseConsumerPoco<T>(onNext, Mapper);
542580

543-
return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org,
544-
cancellationToken);
581+
return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org, cancellationToken);
545582
}
546583

547584
/// <summary>
@@ -779,8 +816,7 @@ private Task QueryAsync(Query query, FluxCsvParser.IFluxResponseConsumer consume
779816

780817
var requestMessage = CreateRequest(query, org);
781818

782-
return Query(requestMessage, consumer, onError ?? ErrorConsumer, onComplete ?? EmptyAction,
783-
cancellationToken);
819+
return Query(requestMessage, consumer, onError ?? ErrorConsumer, onComplete ?? EmptyAction, cancellationToken);
784820
}
785821

786822
private RestRequest CreateRequest(Query query, string org = null)
@@ -801,9 +837,7 @@ internal static Query CreateQuery(string query, Dialect dialect = null)
801837
{
802838
Arguments.CheckNonEmptyString(query, nameof(query));
803839

804-
var created = new Query(query: query, dialect: dialect ?? Dialect);
805-
806-
return created;
840+
return new Query(query: query, dialect: dialect ?? Dialect);
807841
}
808842
}
809843
}

0 commit comments

Comments
 (0)