Skip to content

Commit

Permalink
#19 lock文の箇所を調整
Browse files Browse the repository at this point in the history
  • Loading branch information
miyaji255 committed Mar 16, 2024
1 parent 119de8e commit d63ea33
Showing 1 changed file with 76 additions and 61 deletions.
137 changes: 76 additions & 61 deletions Epub/KoeBook.Epub/Services/ScrapingClientService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public sealed class ScrapingClientService : IScrapingClientService, IDisposable
{
private readonly IHttpClientFactory _httpClientFactory;
private readonly PeriodicTimer _periodicTimer;
private readonly Queue<Func<HttpClient, ValueTask>> _actionQueue = [];
private readonly Queue<Func<HttpClient, Task>> _actionQueue = [];
private bool _workerActivated;

public ScrapingClientService(IHttpClientFactory httpClientFactory, TimeProvider timeProvider)
Expand All @@ -18,86 +18,101 @@ public ScrapingClientService(IHttpClientFactory httpClientFactory, TimeProvider
Worker();
}

public void Dispose()
public Task<string> GetAsStringAsync(string url, CancellationToken ct)
{
_periodicTimer.Dispose();
}
var taskCompletion = new TaskCompletionSource<string>();

private async void Worker()
{
lock (_actionQueue)
{
_workerActivated = true;
}

while (await _periodicTimer.WaitForNextTickAsync().ConfigureAwait(false) && _actionQueue.Count > 0)
{
if (_actionQueue.TryDequeue(out var action))
_actionQueue.Enqueue(async httpClient =>
{
await action(_httpClientFactory.CreateClient()).ConfigureAwait(false);
}
}
if (ct.IsCancellationRequested)
taskCompletion.SetCanceled(ct);
try
{
var response = await httpClient.GetAsync(url, ct).ConfigureAwait(false);
taskCompletion.SetResult(await response.Content.ReadAsStringAsync(ct).ConfigureAwait(false));
}
catch (Exception ex)
{
taskCompletion.SetException(ex);
}
});

EnsureWorkerActivated();

lock (_actionQueue)
{
_workerActivated = false;
}
return taskCompletion.Task;
}

public Task<string> GetAsStringAsync(string url, CancellationToken ct)
public Task<ContentDispositionHeaderValue?> GetAsStreamAsync(string url, Stream destination, CancellationToken ct)
{
var taskCompletion = new TaskCompletionSource<string>();
_actionQueue.Enqueue(async httpClient =>
{
if (ct.IsCancellationRequested)
taskCompletion.SetCanceled(ct);
try
{
var response = await httpClient.GetAsync(url, ct).ConfigureAwait(false);
taskCompletion.SetResult(await response.Content.ReadAsStringAsync(ct).ConfigureAwait(false));
}
catch (Exception ex)
{
taskCompletion.SetException(ex);
}
});
var taskCompletion = new TaskCompletionSource<ContentDispositionHeaderValue?>();

lock (_actionQueue)
{
if (!_workerActivated)
Worker();
}
_actionQueue.Enqueue(async httpClient =>
{
if (ct.IsCancellationRequested)
taskCompletion.SetCanceled(ct);
try
{
var response = await httpClient.GetAsync(url, ct).ConfigureAwait(false);
await response.Content.CopyToAsync(destination, ct).ConfigureAwait(false);
taskCompletion.SetResult(response.Content.Headers.ContentDisposition);
}
catch (Exception ex)
{
taskCompletion.SetException(ex);
}
});

EnsureWorkerActivated();

return taskCompletion.Task;
}

public Task<ContentDispositionHeaderValue?> GetAsStreamAsync(string url, Stream destination, CancellationToken ct)
/// <summary>
/// <see cref="Worker"/>が起動していない場合は起動します
/// </summary>
private void EnsureWorkerActivated()
{
var taskCompletion = new TaskCompletionSource<ContentDispositionHeaderValue?>();
_actionQueue.Enqueue(async httpClient =>
{
if (ct.IsCancellationRequested)
taskCompletion.SetCanceled(ct);
bool activateWorker;
lock (_actionQueue) activateWorker = !_workerActivated;

try
{
var response = await httpClient.GetAsync(url, ct).ConfigureAwait(false);
await response.Content.CopyToAsync(destination, ct).ConfigureAwait(false);
taskCompletion.SetResult(response.Content.Headers.ContentDisposition);
}
catch (Exception ex)
{
taskCompletion.SetException(ex);
}
});
if (activateWorker)
Worker();
}

/// <summary>
/// <see cref="_actionQueue"/>のConsumer
/// 別スレッドでループさせるためにvoid
/// </summary>
private async void Worker()
{
lock (_actionQueue)
_workerActivated = true;

try
{
if (!_workerActivated)
Worker();
while (await _periodicTimer.WaitForNextTickAsync().ConfigureAwait(false) && _actionQueue.Count > 0)
{
Func<HttpClient, Task>? action;
lock (_actionQueue)
if (!_actionQueue.TryDequeue(out action))
continue;

await action(_httpClientFactory.CreateClient()).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}
}
finally
{
lock (_actionQueue)
_workerActivated = false;
}
}

return taskCompletion.Task;
public void Dispose()
{
_periodicTimer.Dispose();
}
}

0 comments on commit d63ea33

Please sign in to comment.