diff --git a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/BaseHttpDownloader.cs b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/BaseHttpDownloader.cs index 7c33ea66..245d8c6c 100644 --- a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/BaseHttpDownloader.cs +++ b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/BaseHttpDownloader.cs @@ -1,6 +1,8 @@ using System; using System.IO; using System.Net; +using System.Collections.Generic; +using System.Threading; using JetBrains.Annotations; using PatchKit.Logging; using PatchKit.Network; @@ -15,9 +17,9 @@ public sealed class BaseHttpDownloader : IBaseHttpDownloader { private class Handler : DownloadHandlerScript { - Action _receiveData; + DataAvailableHandler _receiveData; - public Handler(Action receiveData) + public Handler(DataAvailableHandler receiveData) { _receiveData = receiveData; } @@ -32,25 +34,29 @@ protected override bool ReceiveData(byte[] data, int dataLength) private readonly ILogger _logger; - private static readonly int BufferSize = 5 * (int) Units.MB; + private static readonly ulong DefaultBufferSize = 5 * (ulong)Units.MB; private readonly string _url; private readonly int _timeout; + private readonly ulong _bufferSize; private readonly byte[] _buffer; private bool _downloadHasBeenCalled; private BytesRange? _bytesRange; - public event DataAvailableHandler DataAvailable; - public BaseHttpDownloader(string url, int timeout) : - this(url, timeout, PatcherLogManager.DefaultLogger) + this(url, timeout, PatcherLogManager.DefaultLogger, DefaultBufferSize) + { + } + + public BaseHttpDownloader([NotNull] string url, int timeout, ulong bufferSize) + : this(url, timeout, PatcherLogManager.DefaultLogger, bufferSize) { } public BaseHttpDownloader([NotNull] string url, int timeout, - [NotNull] ILogger logger) + [NotNull] ILogger logger, ulong bufferSize) { if (string.IsNullOrEmpty(url)) throw new ArgumentException("Value cannot be null or empty.", "url"); if (timeout <= 0) throw new ArgumentOutOfRangeException("timeout"); @@ -60,7 +66,8 @@ public BaseHttpDownloader([NotNull] string url, int timeout, _timeout = timeout; _logger = logger; - _buffer = new byte[BufferSize]; + _bufferSize = bufferSize; + _buffer = new byte[_bufferSize]; ServicePointManager.ServerCertificateValidationCallback = (sender, certificate, chain, errors) => true; @@ -77,13 +84,101 @@ public void SetBytesRange(BytesRange? range) } } - public void Download(CancellationToken cancellationToken) + public IEnumerable ReadPackets(CancellationToken cancellationToken) + { + CancellationTokenSource localCancel = new CancellationTokenSource(); + cancellationToken.Register(() => localCancel.Cancel()); + + var packetQueue = new Queue(); + bool isDone = false; + + Exception threadException = null; + + var threadHandle = new Thread(_ => + { + try + { + Download(localCancel.Token, (data, length) => + { + lock (packetQueue) + { + packetQueue.Enqueue(new DataPacket + { + Data = data, + Length = length, + }); + } + }); + } + catch (OperationCanceledException e) + { + if (cancellationToken.IsCancelled) + { + threadException = e; + } + } + catch (Exception e) + { + threadException = e; + } + + isDone = true; + }); + + Func safeCount = () => { + lock (packetQueue) + { + return packetQueue.Count; + } + }; + + try + { + threadHandle.Start(); + do + { + cancellationToken.ThrowIfCancellationRequested(); + if (safeCount() > 0) + { + lock (packetQueue) + { + foreach (var packet in packetQueue) + { + yield return packet; + } + packetQueue.Clear(); + } + } + else + { + Thread.Sleep(100); + } + } while (!isDone); + } + finally + { + localCancel.Cancel(); + threadHandle.Join(); + + if (threadException != null) + { + throw threadException; + } + } + } + + public void Download(CancellationToken cancellationToken, [NotNull] DataAvailableHandler onDataAvailable) { + if (onDataAvailable == null) + { + throw new ArgumentNullException("onDataAvailable"); + } + try { _logger.LogDebug("Downloading..."); _logger.LogTrace("url = " + _url); - _logger.LogTrace("bufferSize = " + BufferSize); + _logger.LogTrace("bufferSize = " + _bufferSize); _logger.LogTrace("bytesRange = " + (_bytesRange.HasValue ? _bytesRange.Value.Start + "-" + _bytesRange.Value.End : "(none)")); @@ -93,7 +188,7 @@ public void Download(CancellationToken cancellationToken) UnityWebRequest request = null; - UnityDispatcher.Invoke(() => + UnityDispatcher.Invoke(() => { request = new UnityWebRequest(); request.uri = new Uri(_url); @@ -101,25 +196,24 @@ public void Download(CancellationToken cancellationToken) if (_bytesRange.HasValue) { - var bytesRangeEndText = + var bytesRangeEndText = _bytesRange.Value.End >= 0L ? _bytesRange.Value.End.ToString() : string.Empty; request.SetRequestHeader( - "Range", + "Range", "bytes=" + _bytesRange.Value.Start + "-" + bytesRangeEndText); } - request.downloadHandler = new Handler(OnDataAvailable); - + request.downloadHandler = new Handler(onDataAvailable); }).WaitOne(); using (request) { - using(request.downloadHandler) + using (request.downloadHandler) { UnityWebRequestAsyncOperation op = null; - UnityDispatcher.Invoke(() => + UnityDispatcher.Invoke(() => { op = request.SendWebRequest(); }).WaitOne(); @@ -128,7 +222,7 @@ public void Download(CancellationToken cancellationToken) while (requestResponseCode <= 0) { - UnityDispatcher.Invoke(() => + UnityDispatcher.Invoke(() => { requestResponseCode = request.responseCode; }).WaitOne(); @@ -137,11 +231,11 @@ public void Download(CancellationToken cancellationToken) System.Threading.Thread.Sleep(100); } - + _logger.LogDebug("Received response from server."); _logger.LogTrace("statusCode = " + requestResponseCode); - if (Is2XXStatus((HttpStatusCode) requestResponseCode)) + if (Is2XXStatus((HttpStatusCode)requestResponseCode)) { _logger.LogDebug("Successful response. Reading response stream..."); @@ -149,7 +243,7 @@ public void Download(CancellationToken cancellationToken) while (!opIsDone) { - UnityDispatcher.Invoke(() => + UnityDispatcher.Invoke(() => { opIsDone = op.isDone; }).WaitOne(); @@ -161,16 +255,16 @@ public void Download(CancellationToken cancellationToken) _logger.LogDebug("Stream has been read."); } - else if (Is4XXStatus((HttpStatusCode) requestResponseCode)) + else if (Is4XXStatus((HttpStatusCode)requestResponseCode)) { throw new DataNotAvailableException(string.Format( - "Request data for {0} is not available (status: {1})", _url, (HttpStatusCode) request.responseCode)); + "Request data for {0} is not available (status: {1})", _url, (HttpStatusCode)request.responseCode)); } else { throw new ServerErrorException(string.Format( "Server has experienced some issues with request for {0} which resulted in {1} status code.", - _url, (HttpStatusCode) requestResponseCode)); + _url, (HttpStatusCode)requestResponseCode)); } } } @@ -190,22 +284,15 @@ public void Download(CancellationToken cancellationToken) } } - // ReSharper disable once InconsistentNaming private static bool Is2XXStatus(HttpStatusCode statusCode) { - return (int) statusCode >= 200 && (int) statusCode <= 299; + return (int)statusCode >= 200 && (int)statusCode <= 299; } // ReSharper disable once InconsistentNaming private static bool Is4XXStatus(HttpStatusCode statusCode) { - return (int) statusCode >= 400 && (int) statusCode <= 499; - } - - private void OnDataAvailable(byte[] data, int length) - { - var handler = DataAvailable; - if (handler != null) handler(data, length); + return (int)statusCode >= 400 && (int)statusCode <= 499; } } } \ No newline at end of file diff --git a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedFileStream.cs b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedFileStream.cs index 10649ff4..ef7211c3 100644 --- a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedFileStream.cs +++ b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedFileStream.cs @@ -9,16 +9,16 @@ namespace PatchKit.Unity.Patcher.AppData.Remote.Downloaders { /// /// Helps to save a file making a hash-checking of its chunks during the save process. - /// + /// /// ChunkedFileStream is a file that has hashes defined for its segments (chunks). Chunks size /// has to be predefined. - /// + /// /// Usage: /// Use Write() function as usuall to write bytes. As soon as you will catch InvalidChunkDataException stop copying /// procedure and restart it from next byte after VerifiedLength. If you will try to write /// bytes above the limit, you will get ArgumentOutOfRangeException. /// - public sealed class ChunkedFileStream : IDisposable + public sealed class ChunkedFileStream : ChunkedStream { [Flags] public enum WorkFlags @@ -29,78 +29,25 @@ public enum WorkFlags private readonly ILogger _logger; - public delegate byte[] HashFunction(byte[] buffer, int offset, int length); - - private readonly ChunksData _chunksData; - private readonly long _fileSize; - private readonly HashFunction _hashFunction; - - private readonly byte[] _buffer; - private int _bufferPos; - private int _chunkIndex; - - private int _startChunk; - private FileStream _fileStream; - private bool _disposed; - - public long VerifiedLength - { - get { return Math.Min(_chunkIndex * _chunksData.ChunkSize, _fileSize); } - } - - public long SavedLength - { - get { return VerifiedLength + _bufferPos; } - } - - public long RemainingLength - { - get { return _fileSize - VerifiedLength; } - } - - public long Length - { - get { return _fileSize; } - } - - public ChunkedFileStream([NotNull] string path, long fileSize, ChunksData chunksData, - [NotNull] HashFunction hashFunction, - WorkFlags workFlags = WorkFlags.None, int startChunk = 0, int endChunk = -1) + public static ChunkedFileStream CreateChunkedFileStream([NotNull] string path, long fileSize, ChunksData chunksData, + [NotNull] HashFunction hashFunction, WorkFlags workFlags = WorkFlags.None, int startChunk = 0, int endChunk = -1) { if (path == null) throw new ArgumentNullException("path"); if (fileSize <= 0) throw new ArgumentOutOfRangeException("fileSize"); if (hashFunction == null) throw new ArgumentNullException("hashFunction"); - _logger = PatcherLogManager.DefaultLogger; - _chunksData = chunksData; - _hashFunction = hashFunction; - - _buffer = new byte[_chunksData.ChunkSize]; + var logger = PatcherLogManager.DefaultLogger; - _logger.LogTrace("path = " + path); - _logger.LogTrace("chunksData.ChunkSize = " + chunksData.ChunkSize); + logger.LogTrace("path = " + path); - _startChunk = startChunk; - bool noEndChunk = endChunk == -1; - bool isLastChunkIncomplete = endChunk * chunksData.ChunkSize > fileSize; - - if (noEndChunk || isLastChunkIncomplete) - { - _fileSize = fileSize - (startChunk * chunksData.ChunkSize); - } - else - { - _fileSize = (endChunk - startChunk) * chunksData.ChunkSize; - } + bool preservePreviousFile = (workFlags | WorkFlags.PreservePreviousFile) != 0; - _logger.LogTrace("fileSize = " + fileSize); + logger.LogTrace("preservePreviousFile = " + preservePreviousFile); - bool preservePreviousFile = (workFlags | WorkFlags.PreservePreviousFile) != 0; - - _logger.LogTrace("preservePreviousFile = " + preservePreviousFile); + FileStream fileStream = null; if (preservePreviousFile) { @@ -109,147 +56,54 @@ public ChunkedFileStream([NotNull] string path, long fileSize, ChunksData chunks // It does not check the hash of the file. It trusts that the file is already valid up to that point. // Because the only way to download the file should be using Chunked Downloader. - _logger.LogDebug("Opening file stream..."); - _fileStream = new FileStream(path, FileMode.OpenOrCreate, FileAccess.Write, FileShare.None); - _fileStream.Seek(0, SeekOrigin.End); // seek and stay at the end, so we can append - long currentFileSize = _fileStream.Position; - _logger.LogDebug("File stream opened."); - _logger.LogTrace("currentFileSize = " + currentFileSize); + logger.LogDebug("Trying to preserve previous file, opening file stream..."); + fileStream = new FileStream(path, FileMode.OpenOrCreate, FileAccess.Write, FileShare.None); + fileStream.Seek(0, SeekOrigin.End); // seek and stay at the end, so we can append - _logger.LogDebug("Checking whether stream can append to current file..."); + long currentFileSize = fileStream.Position; - if (currentFileSize == 0) - { - _logger.LogDebug("File is new. Append is not possible."); - } - // Let's make sure that file size is a multiply of chunk size. - // If not, something is wrong with the file. - else if (currentFileSize % chunksData.ChunkSize == 0) - { - _chunkIndex = (int) (currentFileSize / chunksData.ChunkSize); - _logger.LogDebug(string.Format("Append is possible - starting from {0} chunk index.", _chunkIndex)); - } - else + logger.LogDebug("File stream opened."); + logger.LogTrace("currentFileSize = " + currentFileSize); + + logger.LogDebug("Checking whether stream can append to current file..."); + + if (currentFileSize != 0 && currentFileSize % chunksData.ChunkSize != 0) { - _logger.LogDebug(string.Format( + logger.LogDebug(string.Format( "File size {0} is not a multiply of chunk size: {1}. Append is not possible - recreating file.", currentFileSize, chunksData.ChunkSize)); - _logger.LogDebug("Closing previous file stream..."); - _fileStream.Close(); - _fileStream.Dispose(); - _logger.LogDebug("Previous file stream closed."); + logger.LogDebug("Closing previous file stream..."); + fileStream.Close(); + fileStream.Dispose(); + logger.LogDebug("Previous file stream closed."); - OpenNewFileStream(path); + logger.LogDebug("Opening new file stream"); + fileStream = OpenNewFileStream(path); } } else { - OpenNewFileStream(path); + logger.LogDebug("Not preserving previous file, opening new file stream"); + fileStream = OpenNewFileStream(path); } - } - private void OpenNewFileStream(string path) - { - _logger.LogDebug("Opening new file stream..."); - _fileStream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None); - _logger.LogDebug("New file stream opened."); + return new ChunkedFileStream(fileStream, fileSize, chunksData, hashFunction, startChunk, endChunk); } - /// - /// Writes buffer into the file. If false is returned, stop file transfer and resume it - /// starting from VerifiedLength + 1 byte. - /// - /// - /// - /// - /// - public void Write([NotNull] byte[] buffer, int offset, int count) + private static FileStream OpenNewFileStream(string path) { - if (buffer == null) throw new ArgumentNullException("buffer"); - if (offset < 0) throw new ArgumentOutOfRangeException("offset"); - if (count < 0) throw new ArgumentOutOfRangeException("count"); - - do - { - if (RemainingLength == 0) - { - throw new ArgumentOutOfRangeException( - "Cannot write bytes over the file size: " + _fileSize); - } - - int copyNum = (int) Math.Min(Math.Min(count, _chunksData.ChunkSize - _bufferPos), RemainingLength); - Array.Copy(buffer, offset, _buffer, _bufferPos, copyNum); - - count -= copyNum; - offset += copyNum; - _bufferPos += copyNum; - - if (ChunkFullyInBuffer()) - { - if (BufferedChunkValid()) - { - FlushBuffer(); - } - else - { - DiscardBuffer(); - throw new InvalidChunkDataException("Invalid chunk data."); - } - } - } while (count > 0); - } - - private bool ChunkFullyInBuffer() - { - return _bufferPos == Math.Min(_chunksData.ChunkSize, RemainingLength); - } - - private bool BufferedChunkValid() - { - byte[] bufferHash = _hashFunction(_buffer, 0, (int) Math.Min(_chunksData.ChunkSize, RemainingLength)); - byte[] chunkHash = _chunksData.Chunks[_chunkIndex + _startChunk].Hash; - - return bufferHash.SequenceEqual(chunkHash); - } - - private void FlushBuffer() - { - _fileStream.Write(_buffer, 0, (int) Math.Min(_chunksData.ChunkSize, RemainingLength)); - _bufferPos = 0; - _chunkIndex++; + return new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None); } - private void DiscardBuffer() + public ChunkedFileStream([NotNull] FileStream fileStream, long fileSize, ChunksData chunksData, + [NotNull] HashFunction hashFunction, int startChunk = 0, int endChunk = -1) + : base(fileStream, fileSize, chunksData, hashFunction, startChunk, endChunk) { - _bufferPos = 0; - } - - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - ~ChunkedFileStream() - { - Dispose(false); - } - - private void Dispose(bool disposing) - { - if (_disposed) - { - return; - } - - if (disposing) - { - _fileStream.Dispose(); - } - - _disposed = true; + if (fileSize <= 0) throw new ArgumentOutOfRangeException("fileSize"); + if (hashFunction == null) throw new ArgumentNullException("hashFunction"); } } } \ No newline at end of file diff --git a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedHttpDownloader.cs b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedHttpDownloader.cs index db42c800..ad8787ed 100644 --- a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedHttpDownloader.cs +++ b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedHttpDownloader.cs @@ -8,6 +8,7 @@ using PatchKit.Logging; using PatchKit.Network; using PatchKit.Unity.Patcher.AppData.FileSystem; +using PatchKit.Unity.Patcher.AppUpdater.Status; using PatchKit.Unity.Patcher.Debug; using PatchKit.Unity.Utilities; using CancellationToken = PatchKit.Unity.Patcher.Cancellation.CancellationToken; @@ -34,6 +35,18 @@ public DownloadJob(string url, long start = 0, long end = -1) public BytesRange Range; } + public struct UrlPair + { + public UrlPair(ResourceUrl primary, ResourceUrl? secondary = null) + { + Primary = primary; + Secondary = secondary; + } + + public readonly ResourceUrl Primary; + public readonly ResourceUrl? Secondary; + } + private readonly ILogger _logger; private readonly IRequestTimeoutCalculator _timeoutCalculator = new SimpleRequestTimeoutCalculator(); @@ -50,6 +63,8 @@ public DownloadJob(string url, long start = 0, long end = -1) private bool _downloadHasBeenCalled; + private bool _hasCheckedAnotherNode = false; + private BytesRange _range = new BytesRange(0, -1); public event DownloadProgressChangedHandler DownloadProgressChanged; @@ -92,7 +107,7 @@ private ChunkedFileStream OpenFileStream(CancellationToken cancellationToken) } _logger.LogTrace(string.Format("Opening chunked file stream for chunks {0}-{1}", startChunk, endChunk)); - return new ChunkedFileStream(_destinationFilePath, _size, _chunksData, + return ChunkedFileStream.CreateChunkedFileStream(_destinationFilePath, _size, _chunksData, HashFunction, ChunkedFileStream.WorkFlags.PreservePreviousFile, startChunk, endChunk); } @@ -125,8 +140,22 @@ public void Download(CancellationToken cancellationToken) do { + var urlsWithBackup = Enumerable.Range(0, _urls.Length) + // .Select(idx => _urls.Length - (idx + 1)) // Reverse, only for testing purposes + .Select(idx => { + var nextIdx = idx + 1; + if (nextIdx >= _urls.Length) + { + return new UrlPair(_urls[idx]); + } + else + { + return new UrlPair(_urls[idx], _urls[nextIdx]); + } + }); + bool success = - _urls.Any(url => TryDownload(url, fileStream, cancellationToken)); + urlsWithBackup.Any(urlPair => TryDownload(urlPair.Primary, urlPair.Secondary, fileStream, cancellationToken)); if (success) { @@ -161,62 +190,113 @@ public void Download(CancellationToken cancellationToken) } } - private bool TryDownload(ResourceUrl url, ChunkedFileStream fileStream, CancellationToken cancellationToken) + private bool TryDownload(ResourceUrl url, ResourceUrl? secondaryUrl, ChunkedFileStream fileStream, CancellationToken cancellationToken) { try { - _logger.LogDebug(string.Format("Trying to download from {0}", url.Url)); - _logger.LogTrace("fileStream.VerifiedLength = " + fileStream.VerifiedLength); + _logger.LogDebug(string.Format("Downloading from {0}", url)); + fileStream.ClearUnverified(); + if (!secondaryUrl.HasValue) + { + _logger.LogDebug("Secondary url is null"); + } + + var nodeTester = secondaryUrl.HasValue ? new NodeTester(secondaryUrl.Value) : null; + + var downloadLogIntervalStopwatch = new Stopwatch(); + + var calculator = new DownloadSpeedCalculator(); + IEnumerable downloadJobQueue = BuildDownloadJobQueue(url, fileStream.VerifiedLength); - var downloadJobQueue = BuildDownloadJobQueue(url, fileStream.VerifiedLength); foreach (var downloadJob in downloadJobQueue) { - _logger.LogDebug(string.Format("Executing download job {0} with offest {1}", downloadJob.Url, - downloadJob.Range.Start)); + _logger.LogDebug(string.Format("Executing download job {0} with offest {1}", + downloadJob.Url, downloadJob.Range.Start)); _logger.LogTrace("fileStream.VerifiedLength = " + fileStream.VerifiedLength); _logger.LogTrace("fileStream.SavedLength = " + fileStream.SavedLength); var baseHttpDownloader = new BaseHttpDownloader(downloadJob.Url, 30000); baseHttpDownloader.SetBytesRange(downloadJob.Range); - const long downloadStatusLogInterval = 5000L; - var stopwatch = Stopwatch.StartNew(); + downloadLogIntervalStopwatch.Start(); - long downloadedBytes = 0; + ulong totalBytesDownloaded = 0; - var job = downloadJob; - baseHttpDownloader.DataAvailable += (bytes, length) => + foreach (var dataPacket in baseHttpDownloader.ReadPackets(cancellationToken)) { - fileStream.Write(bytes, 0, length); + if (!_hasCheckedAnotherNode + && nodeTester != null + && nodeTester.CanStart(_size, calculator)) + { + _logger.LogDebug("Testing secondary url"); + Exception exception = nodeTester.TryStart(cancellationToken); + if (exception != null) + { + _logger.LogWarning(string.Format("Node tester failed to start: {}", exception), exception); + } + } - downloadedBytes += length; + if (nodeTester != null && nodeTester.IsDone) + { + _logger.LogDebug("Secondary url test finished."); + _logger.LogTrace(string.Format("Current download speed {0} bps", calculator.BytesPerSecond)); + _logger.LogTrace(string.Format("Secondary node download speed {0} bps", nodeTester.BytesPerSecond)); - if (stopwatch.ElapsedMilliseconds > downloadStatusLogInterval) + if (nodeTester.BytesPerSecond.HasValue && nodeTester.BytesPerSecond.Value > 2 * calculator.BytesPerSecond) + { + _logger.LogDebug("Secondary url download speed is 2 times faster, switching."); + _hasCheckedAnotherNode = true; + return false; + } + else if(!nodeTester.BytesPerSecond.HasValue) + { + _logger.LogWarning("Secondary node download speed was null. An error probably occured during testing, not switching."); + _hasCheckedAnotherNode = true; + nodeTester = null; + } + else + { + _logger.LogDebug("Secondary node download speed was not 2 times faster, not switching."); + _hasCheckedAnotherNode = true; + nodeTester = null; + } + } + + int length = dataPacket.Length; + + totalBytesDownloaded += (ulong) length; + + fileStream.Write(dataPacket.Data, 0, length); + + if (downloadLogIntervalStopwatch.Elapsed > TimeSpan.FromSeconds(5)) { - stopwatch.Reset(); - stopwatch.Start(); + downloadLogIntervalStopwatch.Reset(); + downloadLogIntervalStopwatch.Start(); - _logger.LogDebug(string.Format("Downloaded {0} from {1}", downloadedBytes, job.Url)); + _logger.LogDebug(string.Format("Downloaded {0} from {1}", totalBytesDownloaded, downloadJob.Url)); _logger.LogTrace("fileStream.VerifiedLength = " + fileStream.VerifiedLength); _logger.LogTrace("fileStream.SavedLength = " + fileStream.SavedLength); + _logger.LogTrace("calculator.BytesPerSecond = " + calculator.BytesPerSecond); } - OnDownloadProgressChanged(fileStream.VerifiedLength); - }; - - baseHttpDownloader.Download(cancellationToken); + if (!_hasCheckedAnotherNode) + { + calculator.AddSample((long) totalBytesDownloaded, DateTime.Now); + } - _logger.LogDebug("Download job execution success."); - _logger.LogTrace("fileStream.VerifiedLength = " + fileStream.VerifiedLength); - _logger.LogTrace("fileStream.SavedLength = " + fileStream.SavedLength); + OnDownloadProgressChanged(fileStream.VerifiedLength); + } } + _logger.LogDebug("Download job execution success."); + _logger.LogTrace("fileStream.VerifiedLength = " + fileStream.VerifiedLength); + _logger.LogTrace("fileStream.SavedLength = " + fileStream.SavedLength); + if (fileStream.RemainingLength != 0) { throw new IncompleteDataException("Chunks downloading must finish downloading whole file"); } - _logger.LogDebug(string.Format("Download from {0} has been successful.", url.Url)); return true; } catch (IncompleteDataException e) @@ -261,7 +341,7 @@ public static IEnumerable BuildDownloadJobQueue(ResourceUrl resourc // The effective range is the original range contained within multiples of chunk size BytesRange effectiveRange = range.Chunkify(chunksData); var dataBounds = new BytesRange(currentOffset, -1); - + BytesRange bounds = effectiveRange.ContainIn(dataBounds); // An uncommon edge case might occur, in which bounds.Start is equal to dataSize, @@ -298,7 +378,7 @@ public static IEnumerable BuildDownloadJobQueue(ResourceUrl resourc lastPart += 1; } } - + long lastByte = dataSize - 1; for (int i = firstPart; i < lastPart; i++) diff --git a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedStream.cs b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedStream.cs new file mode 100644 index 00000000..a777e089 --- /dev/null +++ b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedStream.cs @@ -0,0 +1,218 @@ +using System; +using System.IO; +using System.Linq; +using JetBrains.Annotations; +using PatchKit.Logging; +using PatchKit.Unity.Patcher.Debug; + +namespace PatchKit.Unity.Patcher.AppData.Remote.Downloaders +{ + public interface IChunkedStream + { + long VerifiedLength { get; } + long SavedLength { get; } + long RemainingLength { get; } + long Length { get; } + + void Write([NotNull] byte[] buffer, int offset, int count); + void ClearUnverified(); + } + + /// + /// A generic implementation of a chunked data stream. Can be used to save chunked data into any stream be it file or buffer. + /// + /// Be aware that this class doesn't implement the Stream interface. But you can use the Write method as if it did. + /// + public class ChunkedStream : IChunkedStream, IDisposable where T: Stream + { + private readonly ILogger _logger; + + public delegate byte[] HashFunction(byte[] buffer, int offset, int length); + + private readonly ChunksData _chunksData; + private readonly long _size; + private readonly HashFunction _hashFunction; + + private readonly byte[] _buffer; + private int _bufferPos; + protected int _chunkIndex; + + private int _startChunk; + + private T _targetStream; + + private bool _disposed; + + public long VerifiedLength + { + get { return Math.Min(_chunkIndex * _chunksData.ChunkSize, _size); } + } + + public long SavedLength + { + get { return VerifiedLength + _bufferPos; } + } + + public long RemainingLength + { + get { return _size - VerifiedLength; } + } + + public long Length + { + get { return _size; } + } + + public ChunkedStream([NotNull] T targetStream, long size, ChunksData chunksData, + [NotNull] HashFunction hashFunction, int startChunk = 0, int endChunk = -1) + { + if (size <= 0) throw new ArgumentOutOfRangeException("size"); + if (hashFunction == null) throw new ArgumentNullException("hashFunction"); + if (!targetStream.CanSeek) throw new ArgumentException("Target stream must support seeking", "targetStream"); + if (!targetStream.CanWrite) throw new ArgumentException("Target stream must support writing", "targetStream"); + + _targetStream = targetStream; + + _logger = PatcherLogManager.DefaultLogger; + _chunksData = chunksData; + _hashFunction = hashFunction; + + _buffer = new byte[_chunksData.ChunkSize]; + + _logger.LogTrace("chunksData.ChunkSize = " + chunksData.ChunkSize); + + _startChunk = startChunk; + + bool noEndChunk = endChunk == -1; + bool isLastChunkIncomplete = !noEndChunk && (endChunk * chunksData.ChunkSize > size); + + if (noEndChunk || isLastChunkIncomplete) + { + _size = size - (startChunk * chunksData.ChunkSize); + } + else + { + _size = (endChunk - startChunk) * chunksData.ChunkSize; + } + + _logger.LogTrace("size = " + size); + + long currentStreamSize = _targetStream.Seek(0, SeekOrigin.End); + + if (currentStreamSize == 0) + { + _logger.LogDebug("Stream is empty. Append is not possible"); + } + else if (currentStreamSize % chunksData.ChunkSize == 0) + { + _chunkIndex = (int) (currentStreamSize / chunksData.ChunkSize); + _logger.LogDebug(string.Format("Append is possible - starting from {0} chunk index", _chunkIndex)); + } + else + { + _logger.LogError(string.Format("Stream size {0} is not a multiple of chunk size {1}. Stream is invalid.")); + throw new ArgumentException("Stream is invalid", "targetStream"); + } + } + + /// + /// Writes buffer into the file. If false is returned, stop file transfer and resume it + /// starting from VerifiedLength + 1 byte. + /// + /// + /// + /// + /// + public void Write([NotNull] byte[] buffer, int offset, int count) + { + if (buffer == null) throw new ArgumentNullException("buffer"); + if (offset < 0) throw new ArgumentOutOfRangeException("offset"); + if (count < 0) throw new ArgumentOutOfRangeException("count"); + + do + { + if (RemainingLength == 0) + { + throw new ArgumentOutOfRangeException( + "Cannot write bytes over the file size: " + _size); + } + + int copyNum = (int) Math.Min(Math.Min(count, _chunksData.ChunkSize - _bufferPos), RemainingLength); + Array.Copy(buffer, offset, _buffer, _bufferPos, copyNum); + + count -= copyNum; + offset += copyNum; + _bufferPos += copyNum; + + if (ChunkFullyInBuffer()) + { + if (BufferedChunkValid()) + { + FlushBuffer(); + } + else + { + DiscardBuffer(); + throw new InvalidChunkDataException("Invalid chunk data."); + } + } + } while (count > 0); + } + + public void ClearUnverified() + { + DiscardBuffer(); + } + + private bool ChunkFullyInBuffer() + { + return _bufferPos == Math.Min(_chunksData.ChunkSize, RemainingLength); + } + + private bool BufferedChunkValid() + { + byte[] bufferHash = _hashFunction(_buffer, 0, (int) Math.Min(_chunksData.ChunkSize, RemainingLength)); + byte[] chunkHash = _chunksData.Chunks[_chunkIndex + _startChunk].Hash; + + return bufferHash.SequenceEqual(chunkHash); + } + + private void FlushBuffer() + { + _targetStream.Write(_buffer, 0, (int) Math.Min(_chunksData.ChunkSize, RemainingLength)); + _bufferPos = 0; + _chunkIndex++; + } + + private void DiscardBuffer() + { + _bufferPos = 0; + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + ~ChunkedStream() + { + Dispose(false); + } + + private void Dispose(bool disposing) + { + if (_disposed) + { + return; + } + + if (disposing) + { + _targetStream.Dispose(); + } + + _disposed = true; + } + } +} \ No newline at end of file diff --git a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedStream.cs.meta b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedStream.cs.meta new file mode 100644 index 00000000..c77278a9 --- /dev/null +++ b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/ChunkedStream.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: ab8c81913c8aa404f83ff2930302d856 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/HttpDownloader.cs b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/HttpDownloader.cs index fd88dd81..98025179 100644 --- a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/HttpDownloader.cs +++ b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/HttpDownloader.cs @@ -121,7 +121,7 @@ private bool TryDownload(string url, FileStream fileStream, CancellationToken ca var stopwatch = Stopwatch.StartNew(); var baseHttpDownloader = new BaseHttpDownloader(url, 30000); - baseHttpDownloader.DataAvailable += (bytes, length) => + baseHttpDownloader.Download(cancellationToken, (bytes, length) => { fileStream.Write(bytes, 0, length); @@ -135,9 +135,7 @@ private bool TryDownload(string url, FileStream fileStream, CancellationToken ca downloadedBytes += length; OnDownloadProgressChanged(downloadedBytes); - }; - - baseHttpDownloader.Download(cancellationToken); + }); _logger.LogDebug(string.Format("Download from {0} has been successful.", url)); diff --git a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/IBaseHttpDownloader.cs b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/IBaseHttpDownloader.cs index 4e5ebebb..ad766b59 100644 --- a/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/IBaseHttpDownloader.cs +++ b/Assets/PatchKit Patcher/Scripts/AppData/Remote/Downloaders/IBaseHttpDownloader.cs @@ -1,17 +1,25 @@ -using PatchKit.Network; +using System.Collections.Generic; +using PatchKit.Network; using PatchKit.Unity.Patcher.Cancellation; +using JetBrains.Annotations; namespace PatchKit.Unity.Patcher.AppData.Remote.Downloaders { + public struct DataPacket + { + public byte[] Data; + public int Length; + } + /// /// Base HTTP downloader. /// public interface IBaseHttpDownloader { - event DataAvailableHandler DataAvailable; - void SetBytesRange(BytesRange? range); - void Download(CancellationToken cancellationToken); + void Download(CancellationToken cancellationToken, [NotNull] DataAvailableHandler onDataAvailable); + + IEnumerable ReadPackets(CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/Assets/PatchKit Patcher/Scripts/AppData/Remote/GarbageNodeTester.cs b/Assets/PatchKit Patcher/Scripts/AppData/Remote/GarbageNodeTester.cs new file mode 100644 index 00000000..c24f71b5 --- /dev/null +++ b/Assets/PatchKit Patcher/Scripts/AppData/Remote/GarbageNodeTester.cs @@ -0,0 +1,143 @@ +using System; +using System.Diagnostics; +using System.Threading; +using PatchKit.Logging; +using PatchKit.Unity.Patcher.Debug; +using PatchKit.Unity.Patcher.AppUpdater.Status; +using PatchKit.Unity.Patcher.AppData.Remote.Downloaders; +using CancellationToken = PatchKit.Unity.Patcher.Cancellation.CancellationToken; +using CancellationTokenSource = PatchKit.Unity.Patcher.Cancellation.CancellationTokenSource; + +namespace PatchKit.Unity.Patcher.AppData.Remote +{ + public interface IGarbageNodeTester + { + void Start(CancellationToken cancellationToken); + + bool IsDone { get; } + + bool WasSuccess { get; } + + double BytesPerSecond { get; } + } + + public class GarbageNodeTester : IGarbageNodeTester + { + private static readonly int Timeout = 10000; + + private readonly string _url; + private readonly ulong _size; + private readonly double _seed; + + private Thread _thread = null; + + private DownloadSpeedCalculator _calculator = new DownloadSpeedCalculator(); + + private bool _isDone; + + private bool _wasSuccess; + + private static readonly ulong DefaultSize = 50; + private static readonly double DefaultSeed = 0.123; + + private static readonly TimeSpan MaxTestDuration = TimeSpan.FromSeconds(15.0); + + private static string PruneUrl(string url, ulong size, double seed) + { + var uri = new Uri(url); + return string.Format("{3}://{0}:8888/garbage.php?r={1}&ckSize={2}", uri.Host, seed, size, uri.Scheme); + } + + public GarbageNodeTester(string url) + : this(url, DefaultSize, DefaultSeed) + { + } + + public GarbageNodeTester(string url, ulong size, double seed) + { + if (string.IsNullOrEmpty(url)) + { + throw new ArgumentException("url cannot be null or empty", "url"); + } + + if (size == 0) + { + throw new ArgumentException("size must be a positive value", "size"); + } + + _url = url; + _size = size; + _seed = seed; + } + + public bool IsDone + { + get + { + return _isDone; + } + } + + public bool WasSuccess + { + get + { + return _wasSuccess; + } + } + + public double BytesPerSecond + { + get + { + if (!_isDone) + { + PatcherLogManager.DefaultLogger.LogError("Reading BPS from an unfinished GarbageNodeTester."); + } + lock (_calculator) + + { + return _calculator.BytesPerSecond; + } + } + } + + public void Start(CancellationToken cancellationToken) + { + _wasSuccess = false; + _isDone = false; + + _thread = new Thread(() => + { + CancellationTokenSource cancellationSource = new CancellationTokenSource(); + cancellationToken.Register(cancellationSource.Cancel); + + var stopwatch = new Stopwatch(); + stopwatch.Start(); + + var downloader = new BaseHttpDownloader(PruneUrl(_url, _size, _seed), Timeout); + + long totalBytes = 0; + + foreach (var packet in downloader.ReadPackets(cancellationSource.Token)) + { + totalBytes += packet.Length; + lock (_calculator) + { + _calculator.AddSample(totalBytes, DateTime.Now); + } + + if (stopwatch.Elapsed > MaxTestDuration) + { + break; + } + } + + _isDone = true; + _wasSuccess = true; + }); + + _thread.Start(); + } + } +} \ No newline at end of file diff --git a/Assets/PatchKit Patcher/Scripts/AppData/Remote/GarbageNodeTester.cs.meta b/Assets/PatchKit Patcher/Scripts/AppData/Remote/GarbageNodeTester.cs.meta new file mode 100644 index 00000000..9b151dba --- /dev/null +++ b/Assets/PatchKit Patcher/Scripts/AppData/Remote/GarbageNodeTester.cs.meta @@ -0,0 +1,12 @@ +fileFormatVersion: 2 +guid: 62aab5cc010522f4689e974716f4e52a +timeCreated: 1560936268 +licenseType: Free +MonoImporter: + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/PatchKit Patcher/Scripts/AppData/Remote/NodeTester.cs b/Assets/PatchKit Patcher/Scripts/AppData/Remote/NodeTester.cs new file mode 100644 index 00000000..66eee816 --- /dev/null +++ b/Assets/PatchKit Patcher/Scripts/AppData/Remote/NodeTester.cs @@ -0,0 +1,76 @@ +using System.Diagnostics; +using PatchKit.Unity.Patcher.AppUpdater.Status; +using PatchKit.Api.Models.Main; +using PatchKit.Unity.Patcher.Cancellation; +using System; + +namespace PatchKit.Unity.Patcher.AppData.Remote +{ + public class NodeTester + { + private static readonly TimeSpan MinTimeRemaining = TimeSpan.FromMinutes(2.0); + private static readonly TimeSpan MinDownloadTime = TimeSpan.FromSeconds(15.0); + + private readonly ResourceUrl _resourceUrl; + private GarbageNodeTester _garbageNodeTester; + private Stopwatch _testingStopwatch; + private DownloadSpeedCalculator _downloadSpeedCalculator; + + public NodeTester(ResourceUrl url) + { + _resourceUrl = url; + _testingStopwatch = new Stopwatch(); + _testingStopwatch.Start(); + } + + public Exception TryStart(CancellationToken cancellationToken) + { + try + { + Start(cancellationToken); + return null; + } + catch (ArgumentException e) + { + return e; + } + } + + public void Start(CancellationToken cancellationToken) + { + _garbageNodeTester = new GarbageNodeTester(_resourceUrl.Url); + _garbageNodeTester.Start(cancellationToken); + + _testingStopwatch.Stop(); + } + + public bool CanStart(long dataSize, DownloadSpeedCalculator calculator) + { + return calculator.TimeRemaining(dataSize) > MinTimeRemaining + && _garbageNodeTester == null + && _testingStopwatch.IsRunning + && _testingStopwatch.Elapsed > MinDownloadTime; + } + + public bool IsDone + { + get + { + return _garbageNodeTester != null && _garbageNodeTester.IsDone; + } + } + + public double? BytesPerSecond + { + get + { + if (_garbageNodeTester == null) + { + return null; + } + return _garbageNodeTester.BytesPerSecond; + } + } + + } +} \ No newline at end of file diff --git a/Assets/PatchKit Patcher/Scripts/AppData/Remote/NodeTester.cs.meta b/Assets/PatchKit Patcher/Scripts/AppData/Remote/NodeTester.cs.meta new file mode 100644 index 00000000..ded1c978 --- /dev/null +++ b/Assets/PatchKit Patcher/Scripts/AppData/Remote/NodeTester.cs.meta @@ -0,0 +1,12 @@ +fileFormatVersion: 2 +guid: b40bcb9a44b0f40af8d92aa7eb8bb35d +timeCreated: 1560936423 +licenseType: Free +MonoImporter: + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/PatchKit Patcher/Scripts/AppUpdater/Commands/CheckDiskSpaceCommand.cs b/Assets/PatchKit Patcher/Scripts/AppUpdater/Commands/CheckDiskSpaceCommand.cs index efe0c955..1bf50e43 100644 --- a/Assets/PatchKit Patcher/Scripts/AppUpdater/Commands/CheckDiskSpaceCommand.cs +++ b/Assets/PatchKit Patcher/Scripts/AppUpdater/Commands/CheckDiskSpaceCommand.cs @@ -53,6 +53,7 @@ private static extern bool GetDiskFreeSpaceEx(string directoryName, #elif UNITY_STANDALONE_LINUX + // NOTE: libgetdiskspace doesn't work on Linux (Manjaro x86_64) in Editor (2017 LTS), has to be changed to getdiskspace [DllImport("libgetdiskspace", SetLastError = true, CharSet = CharSet.Auto)] [return: MarshalAs(UnmanagedType.Bool)] private static extern bool getAvailableDiskSpace(string t_path, out long freeBytes); @@ -180,7 +181,7 @@ private long GetRequiredDiskSpaceForContent() // estimate the size uncompressedSize = (long) (_contentSummary.Value.Size * 1.4); } - + long requiredDiskSpace = _contentSummary.Value.Size + uncompressedSize + Reserve; return requiredDiskSpace; } @@ -193,7 +194,7 @@ private long GetRequiredDiskSpaceForDiff() // estimate the size uncompressedSize = (long) (_diffSummary.Value.Size * 1.4); } - + long requiredDiskSpace = _diffSummary.Value.Size + uncompressedSize + _bigestFileSize + Reserve; return requiredDiskSpace; } diff --git a/Assets/PatchKit Patcher/Scripts/AppUpdater/Status/DownloadSpeedCalculator.cs b/Assets/PatchKit Patcher/Scripts/AppUpdater/Status/DownloadSpeedCalculator.cs index 5492517f..b831d0ed 100644 --- a/Assets/PatchKit Patcher/Scripts/AppUpdater/Status/DownloadSpeedCalculator.cs +++ b/Assets/PatchKit Patcher/Scripts/AppUpdater/Status/DownloadSpeedCalculator.cs @@ -37,44 +37,56 @@ public void Restart(DateTime time) _samples.Clear(); } - public void AddSample(long? bytes, DateTime time) + public void AddSample(long? totalBytes, DateTime time) { - if (bytes.HasValue && _lastBytes > bytes) + if (totalBytes.HasValue && _lastBytes > totalBytes) { Restart(time); } var duration = time - _lastTime; - if (duration < MinimumDelayBetweenSamples) + if (_samples.Count > 0 && duration < MinimumDelayBetweenSamples) { return; } CleanOldSamples(time); - if (bytes.HasValue) + if (totalBytes.HasValue) { _samples.Add(new Sample { - Bytes = bytes.Value - _lastBytes, + Bytes = totalBytes.Value - _lastBytes, Duration = duration, AddTime = time }); - _lastBytes = bytes.Value; + _lastBytes = totalBytes.Value; } _lastTime = time; } - public double Calculate(long? bytes, DateTime time) + public double Calculate(long? bytes, DateTime? time = null) { - AddSample(bytes, DateTime.Now); + AddSample(bytes, time.HasValue ? time.Value : DateTime.Now); return BytesPerSecond; } + public TimeSpan TimeRemaining(long dataSize) + { + try + { + return TimeSpan.FromSeconds(dataSize / BytesPerSecond); + } + catch (OverflowException) + { + return new TimeSpan(Int64.MaxValue); + } + } + public double BytesPerSecond { get diff --git a/Assets/PatchKit Patcher/Scripts/UI/ProgressBar.cs b/Assets/PatchKit Patcher/Scripts/UI/ProgressBar.cs index b00d026c..aece2317 100644 --- a/Assets/PatchKit Patcher/Scripts/UI/ProgressBar.cs +++ b/Assets/PatchKit Patcher/Scripts/UI/ProgressBar.cs @@ -125,7 +125,7 @@ private void Start() { var progress = Patcher.Instance.UpdaterStatus.SelectSwitchOrDefault(p => p.Progress, -1.0); var isUpdatingIdle = Patcher.Instance.UpdaterStatus - .SelectSwitchOrDefault(p => (IObservable) p.LatestActiveOperation, (IReadOnlyOperationStatus) null) + .SelectSwitchOrDefault(p => (UniRx.IObservable) p.LatestActiveOperation, (IReadOnlyOperationStatus) null) .SelectSwitchOrDefault(p => p.IsIdle, false); Patcher.Instance.State diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ff194fe..8225285c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [3.14.0.0] ### Added +- Dynamic url testing for faster nodes (#1226) - Automated scripting runtime changing to .NET 3.5 on Unity 2017 or newer (#1241) - Automated Unity API compatiblity change from .NET 2.0 subset to .NET 2.0 (#1242) - Displaying proper message when in offline mode (#1217)