diff --git a/src/core/fetchers/README.md b/src/core/fetchers/README.md index e24abcf6642..1227b41aac3 100644 --- a/src/core/fetchers/README.md +++ b/src/core/fetchers/README.md @@ -18,8 +18,8 @@ This directory actually exports two completely isolated type of fetchers: - The **Manifest fetcher** is used to download and parse the manifest file. -- The **SegmentFetcherCreator** is used to create Segment fetchers, allowing to download - and parse media segments. +- The **SegmentQueueCreator** is used to create `SegmentQueue` objects, allowing to + download and parse media segments. ## The Manifest fetcher @@ -32,22 +32,23 @@ parsing of the Manifest file. It also regularly refreshes the Manifest, based on its attributes and other criteria, like performances when doing that. -## The SegmentFetcherCreator +## The SegmentQueueCreator -The SegmentFetcherCreator allows to easily perform segment downloads for the rest of the +The SegmentQueueCreator allows to easily perform segment downloads for the rest of the code. This is the part of the code that interacts with the transport protocols - defined in `stc/transports` - to load and parse media segments. -To do so, the SegmentFetcherCreator creates "segment fetchers" of different types -(example: a video or audio segment fetcher) when you ask for it. Through those fetchers, -you can then schedule various segment requests with a given priority. +To do so, the SegmentQueueCreator creates `SegmentQueue` classes of different types +(example: a video or audio `SegmentQueue`) when you ask for it. Through those +`SegmentQueue`, you can then schedule various a FIFO queue of segment requests each with a +given priority. The priority of this request is then corroborated with the priority of all requests -currently pending in the SegmentFetcherCreator (and not only with those on the current -segment fetcher) to know when the request should effectively be done - more prioritary +currently pending in the SegmentQueueCreator (and not only with those on the current +`SegmentQueue`) to know when the request should effectively be done - more prioritary requests will be done first. -During the lifecycle of the request, the segment fetcher will communicate about data and +During the lifecycle of the request, the `SegmentQueue` will communicate about data and metrics through several means - documented in the code. ### Priorization @@ -64,7 +65,7 @@ If the request has no priorization number, the lowest priorization number (the h priority) will be set on it: `0` Basically, any new request will have their priorization number compared to the one of the -current request(s) done by the SegmentFetcherCreator: +current request(s) done by the SegmentQueueCreator: - if no request is already pending, we perform the request immediately diff --git a/src/core/fetchers/index.ts b/src/core/fetchers/index.ts index 8f34f4b32c9..85c6951f9af 100644 --- a/src/core/fetchers/index.ts +++ b/src/core/fetchers/index.ts @@ -20,17 +20,14 @@ import type { IManifestRefreshSettings, } from "./manifest"; import ManifestFetcher from "./manifest"; -import type { - IPrioritizedSegmentFetcher, - ISegmentFetcherCreatorBackoffOptions, -} from "./segment"; -import SegmentFetcherCreator from "./segment"; +import type { SegmentQueue, ISegmentQueueCreatorBackoffOptions } from "./segment"; +import SegmentQueueCreator from "./segment"; export type { IManifestFetcherSettings, IManifestFetcherEvent, IManifestRefreshSettings, - IPrioritizedSegmentFetcher, - ISegmentFetcherCreatorBackoffOptions, + ISegmentQueueCreatorBackoffOptions, + SegmentQueue, }; -export { ManifestFetcher, SegmentFetcherCreator }; +export { ManifestFetcher, SegmentQueueCreator }; diff --git a/src/core/fetchers/segment/index.ts b/src/core/fetchers/segment/index.ts index 195dd9171f3..0423d25ca76 100644 --- a/src/core/fetchers/segment/index.ts +++ b/src/core/fetchers/segment/index.ts @@ -14,9 +14,10 @@ * limitations under the License. */ -import type { IPrioritizedSegmentFetcher } from "./prioritized_segment_fetcher"; -import type { ISegmentFetcherCreatorBackoffOptions } from "./segment_fetcher_creator"; -import SegmentFetcherCreator from "./segment_fetcher_creator"; +import type SegmentQueue from "./segment_queue"; +import type { ISegmentQueueContext } from "./segment_queue"; +import type { ISegmentQueueCreatorBackoffOptions } from "./segment_queue_creator"; +import SegmentQueueCreator from "./segment_queue_creator"; -export default SegmentFetcherCreator; -export type { IPrioritizedSegmentFetcher, ISegmentFetcherCreatorBackoffOptions }; +export default SegmentQueueCreator; +export type { SegmentQueue, ISegmentQueueCreatorBackoffOptions, ISegmentQueueContext }; diff --git a/src/core/stream/representation/utils/downloading_queue.ts b/src/core/fetchers/segment/segment_queue.ts similarity index 68% rename from src/core/stream/representation/utils/downloading_queue.ts rename to src/core/fetchers/segment/segment_queue.ts index 94b9c048709..a0cdba5ff88 100644 --- a/src/core/stream/representation/utils/downloading_queue.ts +++ b/src/core/fetchers/segment/segment_queue.ts @@ -14,117 +14,58 @@ * limitations under the License. */ -import log from "../../../../log"; +import log from "../../../log"; import type { IManifest, IAdaptation, ISegment, IPeriod, IRepresentation, -} from "../../../../manifest"; -import type { IPlayerError } from "../../../../public_types"; +} from "../../../manifest"; +import type { IPlayerError } from "../../../public_types"; import type { ISegmentParserParsedInitChunk, ISegmentParserParsedMediaChunk, -} from "../../../../transports"; -import assert from "../../../../utils/assert"; -import EventEmitter from "../../../../utils/event_emitter"; -import noop from "../../../../utils/noop"; -import objectAssign from "../../../../utils/object_assign"; -import type { IReadOnlySharedReference } from "../../../../utils/reference"; -import SharedReference from "../../../../utils/reference"; -import TaskCanceller from "../../../../utils/task_canceller"; -import type { IPrioritizedSegmentFetcher } from "../../../fetchers"; -import type { IQueuedSegment } from "../types"; +} from "../../../transports"; +import assert from "../../../utils/assert"; +import EventEmitter from "../../../utils/event_emitter"; +import noop from "../../../utils/noop"; +import objectAssign from "../../../utils/object_assign"; +import SharedReference from "../../../utils/reference"; +import TaskCanceller from "../../../utils/task_canceller"; +import type { IPrioritizedSegmentFetcher } from "./prioritized_segment_fetcher"; + +/** Information about a Segment waiting to be loaded by the Stream. */ +export interface IQueuedSegment { + /** Priority of the segment request (lower number = higher priority). */ + priority: number; + /** Segment wanted. */ + segment: ISegment; +} /** - * Class scheduling segment downloads for a single Representation. - * - * TODO The request scheduling abstractions might be simplified by integrating - * the `DownloadingQueue` in the segment fetchers code, instead of having it as - * an utilis of the `RepresentationStream` like here. - * @class DownloadingQueue + * Class scheduling segment downloads as a FIFO queue. */ -export default class DownloadingQueue extends EventEmitter> { - /** Context of the Representation that will be loaded through this DownloadingQueue. */ - private _content: IDownloadingQueueContext; - /** - * Current queue of segments scheduled for download. - * - * Segments whose request are still pending are still in that queue. Segments - * are only removed from it once their request has succeeded. - */ - private _downloadQueue: IReadOnlySharedReference; - /** - * Allows to stop listening to queue updates and stop performing requests. - * Set to `null` if the DownloadingQueue is not started right now. - */ - private _currentCanceller: TaskCanceller | null; - /** - * Pending request for the initialization segment. - * `null` if no request is pending for it. - */ - private _initSegmentRequest: ISegmentRequestObject | null; - /** - * Pending request for a media (i.e. non-initialization) segment. - * `null` if no request is pending for it. - */ - private _mediaSegmentRequest: ISegmentRequestObject | null; +export default class SegmentQueue extends EventEmitter> { /** Interface used to load segments. */ private _segmentFetcher: IPrioritizedSegmentFetcher; + /** - * Emit the timescale anounced in the initialization segment once parsed. - * `undefined` when this is not yet known. - * `null` when no initialization segment or timescale exists. - */ - private _initSegmentInfoRef: SharedReference; - /** - * Some media segment might have been loaded and are only awaiting for the - * initialization segment to be parsed before being parsed themselves. - * This string will contain the `id` property of that segment if one exist or - * `null` if no segment is awaiting an init segment. + * Metadata on the content for which segments are currently loaded. + * `null` if no queue is active. */ - private _mediaSegmentAwaitingInitMetadata: string | null; + private _currentContentInfo: ISegmentQueueContentInfo | null; /** - * Create a new `DownloadingQueue`. + * Create a new `SegmentQueue`. * - * @param {Object} content - The context of the Representation you want to - * load segments for. - * @param {Object} downloadQueue - Queue of segments you want to load. * @param {Object} segmentFetcher - Interface to facilitate the download of * segments. - * @param {boolean} hasInitSegment - Declare that an initialization segment - * will need to be downloaded. - * - * A `DownloadingQueue` ALWAYS wait for the initialization segment to be - * loaded and parsed before parsing a media segment. - * - * In cases where no initialization segment exist, this would lead to the - * `DownloadingQueue` waiting indefinitely for it. - * - * By setting that value to `false`, you anounce to the `DownloadingQueue` - * that it should not wait for an initialization segment before parsing a - * media segment. */ - constructor( - content: IDownloadingQueueContext, - downloadQueue: IReadOnlySharedReference, - segmentFetcher: IPrioritizedSegmentFetcher, - hasInitSegment: boolean, - ) { + constructor(segmentFetcher: IPrioritizedSegmentFetcher) { super(); - this._content = content; - this._currentCanceller = null; - this._downloadQueue = downloadQueue; - this._initSegmentRequest = null; - this._mediaSegmentRequest = null; this._segmentFetcher = segmentFetcher; - this._initSegmentInfoRef = new SharedReference(undefined); - this._mediaSegmentAwaitingInitMetadata = null; - if (!hasInitSegment) { - this._initSegmentInfoRef.setValue(null); - } + this._currentContentInfo = null; } /** @@ -133,7 +74,7 @@ export default class DownloadingQueue extends EventEmitter extends EventEmitter { + this._currentContentInfo?.currentCanceller.cancel(); + const downloadQueue = new SharedReference({ + initSegment: null, + segmentQueue: [], + }); + const currentCanceller = new TaskCanceller(); + currentCanceller.signal.register(() => { + downloadQueue.finish(); + }); + const currentContentInfo: ISegmentQueueContentInfo = { + content, + downloadQueue, + initSegmentInfoRef: hasInitSegment + ? new SharedReference(undefined) + : new SharedReference(null), + currentCanceller, + initSegmentRequest: null, + mediaSegmentRequest: null, + mediaSegmentAwaitingInitMetadata: null, + }; + this._currentContentInfo = currentContentInfo; // Listen for asked media segments - this._downloadQueue.onUpdate( + downloadQueue.onUpdate( (queue) => { const { segmentQueue } = queue; if ( segmentQueue.length > 0 && - segmentQueue[0].segment.id === this._mediaSegmentAwaitingInitMetadata + segmentQueue[0].segment.id === + currentContentInfo.mediaSegmentAwaitingInitMetadata ) { // The most needed segment is still the same one, and there's no need to // update its priority as the request already ended, just quit. return; } - const currentSegmentRequest = this._mediaSegmentRequest; + const currentSegmentRequest = currentContentInfo.mediaSegmentRequest; if (segmentQueue.length === 0) { if (currentSegmentRequest === null) { // There's nothing to load but there's already no request pending. @@ -177,18 +162,18 @@ export default class DownloadingQueue extends EventEmitter extends EventEmitter extends EventEmitter { - const initSegmentRequest = this._initSegmentRequest; + const initSegmentRequest = currentContentInfo.initSegmentRequest; if (next.initSegment !== null && initSegmentRequest !== null) { if (next.initSegment.priority !== initSegmentRequest.priority) { this._segmentFetcher.updatePriority( @@ -238,49 +223,60 @@ export default class DownloadingQueue extends EventEmitter { - if (this._currentCanceller !== null && this._currentCanceller.isUsed()) { - this._mediaSegmentRequest = null; + if (currentCanceller !== null && currentCanceller.isUsed()) { + contentInfo.mediaSegmentRequest = null; return; } if (startingSegment === undefined) { - this._mediaSegmentRequest = null; + contentInfo.mediaSegmentRequest = null; this.trigger("emptyQueue", null); return; } const canceller = new TaskCanceller(); const unlinkCanceller = - this._currentCanceller === null + currentCanceller === null ? noop - : canceller.linkToSignal(this._currentCanceller.signal); + : canceller.linkToSignal(currentCanceller.signal); const { segment, priority } = startingSegment; - const context = objectAssign({ segment }, this._content); + const context = objectAssign({ segment }, content); /** * If `true` , the current task has either errored, finished, or was @@ -295,12 +291,12 @@ export default class DownloadingQueue extends EventEmitter { - this._mediaSegmentRequest = null; + contentInfo.mediaSegmentRequest = null; if (isComplete) { return; } - if (this._mediaSegmentAwaitingInitMetadata === segment.id) { - this._mediaSegmentAwaitingInitMetadata = null; + if (contentInfo.mediaSegmentAwaitingInitMetadata === segment.id) { + contentInfo.mediaSegmentAwaitingInitMetadata = null; } isComplete = true; isWaitingOnInitSegment = false; @@ -313,7 +309,7 @@ export default class DownloadingQueue extends EventEmitter { - const lastQueue = this._downloadQueue.getValue().segmentQueue; + const lastQueue = downloadQueue.getValue().segmentQueue; if (lastQueue.length === 0) { isComplete = true; this.trigger("emptyQueue", null); @@ -359,7 +355,7 @@ export default class DownloadingQueue extends EventEmitter ISegmentParserParsedInitChunk | ISegmentParserParsedMediaChunk, ): void => { - const initTimescale = this._initSegmentInfoRef.getValue(); + const initTimescale = initSegmentInfoRef.getValue(); if (initTimescale !== undefined) { emitChunk(parse(initTimescale ?? undefined)); } else { @@ -369,7 +365,7 @@ export default class DownloadingQueue extends EventEmitter { emitChunk(parse(actualTimescale ?? undefined)); }, @@ -383,10 +379,10 @@ export default class DownloadingQueue extends EventEmitter { - this._mediaSegmentAwaitingInitMetadata = null; + contentInfo.mediaSegmentAwaitingInitMetadata = null; isWaitingOnInitSegment = false; this.trigger("fullyLoadedSegment", segment); }, @@ -401,10 +397,10 @@ export default class DownloadingQueue extends EventEmitter { unlinkCanceller(); - this._mediaSegmentRequest = null; + contentInfo.mediaSegmentRequest = null; if (isWaitingOnInitSegment) { - this._initSegmentInfoRef.waitUntilDefined(continueToNextSegment, { + initSegmentInfoRef.waitUntilDefined(continueToNextSegment, { clearSignal: canceller.signal, }); } else { @@ -424,23 +420,24 @@ export default class DownloadingQueue extends EventEmitter extends EventEmitter extends EventEmitter { unlinkCanceller(); - this._initSegmentRequest = null; + contentInfo.initSegmentRequest = null; isComplete = true; }, onChunk: ( @@ -484,7 +481,7 @@ export default class DownloadingQueue extends EventEmitter { @@ -504,31 +501,31 @@ export default class DownloadingQueue extends EventEmitter { - this._initSegmentRequest = null; + contentInfo.initSegmentRequest = null; if (isComplete) { return; } isComplete = true; }); - this._initSegmentRequest = { segment, priority, request, canceller }; + contentInfo.initSegmentRequest = { segment, priority, request, canceller }; } } /** - * Events sent by the `DownloadingQueue`. + * Events sent by the `SegmentQueue`. * * The key is the event's name and the value the format of the corresponding * event's payload. */ -export interface IDownloadingQueueEvent { +export interface ISegmentQueueEvent { /** * Notify that the initialization segment has been fully loaded and parsed. * * You can now push that segment to its corresponding buffer and use its parsed * metadata. * - * Only sent if an initialization segment exists (when the `DownloadingQueue`'s + * Only sent if an initialization segment exists (when the `SegmentQueue`'s * `hasInitSegment` constructor option has been set to `true`). * In that case, an `IParsedInitSegmentEvent` will always be sent before any * `IParsedSegmentEvent` event is sent. @@ -538,7 +535,7 @@ export interface IDownloadingQueueEvent { * Notify that a media chunk (decodable sub-part of a media segment) has been * loaded and parsed. * - * If an initialization segment exists (when the `DownloadingQueue`'s + * If an initialization segment exists (when the `SegmentQueue`'s * `hasInitSegment` constructor option has been set to `true`), an * `IParsedSegmentEvent` will always be sent AFTER the `IParsedInitSegmentEvent` * event. @@ -586,10 +583,10 @@ export interface IRequestRetryPayload { } /** - * Structure of the object that has to be emitted through the `downloadQueue` + * Structure of the object that has to be emitted through the `SegmentQueue` * shared reference, to signal which segments are currently needed. */ -export interface IDownloadQueueItem { +export interface ISegmentQueueItem { /** * A potential initialization segment that needs to be loaded and parsed. * It will generally be requested in parralel of the first media segments. @@ -597,7 +594,7 @@ export interface IDownloadQueueItem { * Can be set to `null` if you don't need to load the initialization segment * for now. * - * If the `DownloadingQueue`'s `hasInitSegment` constructor option has been + * If the `SegmentQueue`'s `hasInitSegment` constructor option has been * set to `true`, no media segment will be parsed before the initialization * segment has been loaded and parsed. */ @@ -612,8 +609,8 @@ export interface IDownloadQueueItem { * Note that any media segments in the segment queue will only be parsed once * either of these is true: * - An initialization segment has been loaded and parsed by this - * `DownloadingQueue` instance. - * - The `DownloadingQueue`'s `hasInitSegment` constructor option has been + * `SegmentQueue` instance. + * - The `SegmentQueue`'s `hasInitSegment` constructor option has been * set to `false`. */ segmentQueue: IQueuedSegment[]; @@ -631,8 +628,8 @@ interface ISegmentRequestObject { canceller: TaskCanceller; } -/** Context for segments downloaded through the DownloadingQueue. */ -export interface IDownloadingQueueContext { +/** Context for segments downloaded through the SegmentQueue. */ +export interface ISegmentQueueContext { /** Adaptation linked to the segments you want to load. */ adaptation: IAdaptation; /** Manifest linked to the segments you want to load. */ @@ -642,3 +639,44 @@ export interface IDownloadingQueueContext { /** Representation linked to the segments you want to load. */ representation: IRepresentation; } + +interface ISegmentQueueContentInfo { + /** Context of the Representation that will be loaded through this SegmentQueue. */ + content: ISegmentQueueContext; + /** + * Current queue of segments scheduled for download. + * + * Segments whose request are still pending are still in that queue. Segments + * are only removed from it once their request has succeeded. + */ + downloadQueue: SharedReference; + /** + * Allows to stop listening to queue updates and stop performing requests. + * Set to `null` if the SegmentQueue is not started right now. + */ + currentCanceller: TaskCanceller; + /** + * Pending request for the initialization segment. + * `null` if no request is pending for it. + */ + initSegmentRequest: ISegmentRequestObject | null; + /** + * Pending request for a media (i.e. non-initialization) segment. + * `null` if no request is pending for it. + */ + mediaSegmentRequest: ISegmentRequestObject | null; + /** + * Emit the timescale anounced in the initialization segment once parsed. + * Emit `undefined` when this is not yet known. + * Emit `null` when no initialization segment or timescale exists. + */ + initSegmentInfoRef: SharedReference; + + /** + * Some media segment might have been loaded and are only awaiting for the + * initialization segment to be parsed before being parsed themselves. + * This string will contain the `id` property of that segment if one exist or + * `null` if no segment is awaiting an init segment. + */ + mediaSegmentAwaitingInitMetadata: string | null; +} diff --git a/src/core/fetchers/segment/segment_fetcher_creator.ts b/src/core/fetchers/segment/segment_queue_creator.ts similarity index 71% rename from src/core/fetchers/segment/segment_fetcher_creator.ts rename to src/core/fetchers/segment/segment_queue_creator.ts index 21f166ef61e..7602adc7bc6 100644 --- a/src/core/fetchers/segment/segment_fetcher_creator.ts +++ b/src/core/fetchers/segment/segment_queue_creator.ts @@ -19,26 +19,26 @@ import type { ISegmentPipeline, ITransportPipelines } from "../../../transports" import type { CancellationSignal } from "../../../utils/task_canceller"; import type { IBufferType } from "../../segment_sinks"; import CdnPrioritizer from "../cdn_prioritizer"; -import type { IPrioritizedSegmentFetcher } from "./prioritized_segment_fetcher"; import applyPrioritizerToSegmentFetcher from "./prioritized_segment_fetcher"; import type { ISegmentFetcherLifecycleCallbacks } from "./segment_fetcher"; import createSegmentFetcher, { getSegmentFetcherOptions } from "./segment_fetcher"; +import SegmentQueue from "./segment_queue"; import TaskPrioritizer from "./task_prioritizer"; /** * Interact with the transport pipelines to download segments with the right * priority. * - * @class SegmentFetcherCreator + * @class SegmentQueueCreator */ -export default class SegmentFetcherCreator { +export default class SegmentQueueCreator { /** * Transport pipelines of the currently choosen streaming protocol (e.g. DASH, * Smooth etc.). */ private readonly _transport: ITransportPipelines; /** - * `TaskPrioritizer` linked to this SegmentFetcherCreator. + * `TaskPrioritizer` linked to this SegmentQueueCreator. * * Note: this is typed as `any` because segment loaders / parsers can use * different types depending on the type of buffer. We could maybe be smarter @@ -47,19 +47,22 @@ export default class SegmentFetcherCreator { */ private readonly _prioritizer: TaskPrioritizer; /** - * Options used by the SegmentFetcherCreator, e.g. to allow configuration on + * Options used by the SegmentQueueCreator, e.g. to allow configuration on * segment retries (number of retries maximum, default delay and so on). */ - private readonly _backoffOptions: ISegmentFetcherCreatorBackoffOptions; + private readonly _backoffOptions: ISegmentQueueCreatorBackoffOptions; + /** Class allowing to select a CDN when multiple are available for a given resource. */ private readonly _cdnPrioritizer: CdnPrioritizer; /** * @param {Object} transport + * @param {Object} options + * @param {Object} cancelSignal */ constructor( transport: ITransportPipelines, - options: ISegmentFetcherCreatorBackoffOptions, + options: ISegmentQueueCreatorBackoffOptions, cancelSignal: CancellationSignal, ) { const cdnPrioritizer = new CdnPrioritizer(cancelSignal); @@ -77,16 +80,18 @@ export default class SegmentFetcherCreator { } /** - * Create a segment fetcher, allowing to easily perform segment requests. + * Create a `SegmentQueue`, allowing to easily perform segment requests. * @param {string} bufferType - The type of buffer concerned (e.g. "audio", * "video", etc.) * @param {Object} callbacks - * @returns {Object} + * @returns {Object} - `SegmentQueue`, which is an abstraction allowing to + * perform a queue of segment requests for a given media type (here defined by + * `bufferType`) with associated priorities. */ - createSegmentFetcher( + public createSegmentQueue( bufferType: IBufferType, callbacks: ISegmentFetcherLifecycleCallbacks, - ): IPrioritizedSegmentFetcher { + ): SegmentQueue { const backoffOptions = getSegmentFetcherOptions(this._backoffOptions); const pipelines = this._transport[bufferType]; @@ -98,12 +103,16 @@ export default class SegmentFetcherCreator { callbacks, backoffOptions, ); - return applyPrioritizerToSegmentFetcher(this._prioritizer, segmentFetcher); + const prioritizedSegmentFetcher = applyPrioritizerToSegmentFetcher( + this._prioritizer, + segmentFetcher, + ); + return new SegmentQueue(prioritizedSegmentFetcher); } } -/** Options used by the `SegmentFetcherCreator`. */ -export interface ISegmentFetcherCreatorBackoffOptions { +/** Options used by the `SegmentQueueCreator`. */ +export interface ISegmentQueueCreatorBackoffOptions { /** * Whether the content is played in a low-latency mode. * This has an impact on default backoff delays. @@ -118,5 +127,13 @@ export interface ISegmentFetcherCreatorBackoffOptions { * `undefined` will lead to a default, large, timeout being used. */ requestTimeout: number | undefined; + /** + * Timeout for just the "connection" part of the request, before data is + * actually being transferred. + * + * Setting a lower `connectionTimeout` than a `requestTimeout` allows to + * fail faster without having to take into account a potentially low + * bandwidth. + */ connectionTimeout: number | undefined; } diff --git a/src/core/fetchers/segment/task_prioritizer.ts b/src/core/fetchers/segment/task_prioritizer.ts index b673d231524..2850c1dfffd 100644 --- a/src/core/fetchers/segment/task_prioritizer.ts +++ b/src/core/fetchers/segment/task_prioritizer.ts @@ -4,6 +4,15 @@ import createCancellablePromise from "../../../utils/create_cancellable_promise" import type { CancellationSignal } from "../../../utils/task_canceller"; import TaskCanceller, { CancellationError } from "../../../utils/task_canceller"; +/** + * Utilitary class which allows to perform multiple tasks at once each with an + * associated priority. + * + * This class will then schedule the given tasks in the right order based on the + * priorities. + * + * @class TaskPrioritizer + */ export default class TaskPrioritizer { /** * Priority of the most prioritary task currently running. diff --git a/src/core/main/worker/content_preparer.ts b/src/core/main/worker/content_preparer.ts index 5c52970073c..33f7765fc8d 100644 --- a/src/core/main/worker/content_preparer.ts +++ b/src/core/main/worker/content_preparer.ts @@ -22,7 +22,7 @@ import TaskCanceller from "../../../utils/task_canceller"; import type { IRepresentationEstimator } from "../../adaptive"; import createAdaptiveRepresentationSelector from "../../adaptive"; import type { IManifestRefreshSettings } from "../../fetchers"; -import { ManifestFetcher, SegmentFetcherCreator } from "../../fetchers"; +import { ManifestFetcher, SegmentQueueCreator } from "../../fetchers"; import SegmentSinksStore from "../../segment_sinks"; import type { INeedsMediaSourceReloadPayload } from "../../stream"; import DecipherabilityFreezeDetector from "../common/DecipherabilityFreezeDetector"; @@ -122,7 +122,7 @@ export default class ContentPreparer { }, ); - const segmentFetcherCreator = new SegmentFetcherCreator( + const segmentQueueCreator = new SegmentQueueCreator( dashPipelines, context.segmentRetryOptions, contentCanceller.signal, @@ -151,7 +151,7 @@ export default class ContentPreparer { manifestFetcher, representationEstimator, segmentSinksStore, - segmentFetcherCreator, + segmentQueueCreator, workerTextSender, trackChoiceSetter, }; @@ -344,10 +344,10 @@ export interface IPreparedContentData { /** Allows to send timed text media data so it can be rendered. */ workerTextSender: WorkerTextDisplayerInterface | null; /** - * Allows to create `SegmentFetcher` which simplifies complex media segment + * Allows to create `SegmentQueue` which simplifies complex media segment * fetching. */ - segmentFetcherCreator: SegmentFetcherCreator; + segmentQueueCreator: SegmentQueueCreator; /** * Allows to store and update the wanted tracks and Representation inside that * track. diff --git a/src/core/main/worker/worker_main.ts b/src/core/main/worker/worker_main.ts index b902a1a749d..92ac5611bf5 100644 --- a/src/core/main/worker/worker_main.ts +++ b/src/core/main/worker/worker_main.ts @@ -512,7 +512,7 @@ function loadOrReloadPreparedContent( mediaSource, representationEstimator, segmentSinksStore, - segmentFetcherCreator, + segmentQueueCreator, } = preparedContent; const { drmSystemId, enableFastSwitching, initialTime, onCodecSwitch } = val; playbackObservationRef.onUpdate((observation) => { @@ -583,7 +583,7 @@ function loadOrReloadPreparedContent( playbackObserver, representationEstimator, segmentSinksStore, - segmentFetcherCreator, + segmentQueueCreator, { wantedBufferAhead, maxVideoBufferSize, diff --git a/src/core/stream/adaptation/adaptation_stream.ts b/src/core/stream/adaptation/adaptation_stream.ts index 6ba8b16daf0..2bcee06812d 100644 --- a/src/core/stream/adaptation/adaptation_stream.ts +++ b/src/core/stream/adaptation/adaptation_stream.ts @@ -58,7 +58,7 @@ export default function AdaptationStream( options, representationEstimator, segmentSink, - segmentFetcherCreator, + segmentQueueCreator, wantedBufferAhead, maxVideoBufferSize, }: IAdaptationStreamArguments, @@ -117,7 +117,7 @@ export default function AdaptationStream( ); /** Allows a `RepresentationStream` to easily fetch media segments. */ - const segmentFetcher = segmentFetcherCreator.createSegmentFetcher( + const segmentQueue = segmentQueueCreator.createSegmentQueue( adaptation.type, /* eslint-disable @typescript-eslint/unbound-method */ { @@ -435,7 +435,7 @@ export default function AdaptationStream( playbackObserver, content: { representation, adaptation, period, manifest }, segmentSink, - segmentFetcher, + segmentQueue, terminate: terminateCurrentStream, options: { bufferGoal, diff --git a/src/core/stream/adaptation/types.ts b/src/core/stream/adaptation/types.ts index f1708127d7e..862466a416e 100644 --- a/src/core/stream/adaptation/types.ts +++ b/src/core/stream/adaptation/types.ts @@ -6,7 +6,7 @@ import type { } from "../../../public_types"; import type { IReadOnlySharedReference } from "../../../utils/reference"; import type { IRepresentationEstimator } from "../../adaptive"; -import type { SegmentFetcherCreator } from "../../fetchers"; +import type { SegmentQueueCreator } from "../../fetchers"; import type { IBufferType, SegmentSink } from "../../segment_sinks"; import type { IRepresentationsChoice, @@ -141,7 +141,7 @@ export interface IAdaptationStreamArguments { /** SourceBuffer wrapper - needed to push media segments. */ segmentSink: SegmentSink; /** Module used to fetch the wanted media segments. */ - segmentFetcherCreator: SegmentFetcherCreator; + segmentQueueCreator: SegmentQueueCreator; /** * "Buffer goal" wanted, or the ideal amount of time ahead of the current * position in the current SegmentSink. When this amount has been reached diff --git a/src/core/stream/orchestrator/stream_orchestrator.ts b/src/core/stream/orchestrator/stream_orchestrator.ts index c3c30cfcb8f..23b76bd747b 100644 --- a/src/core/stream/orchestrator/stream_orchestrator.ts +++ b/src/core/stream/orchestrator/stream_orchestrator.ts @@ -32,7 +32,7 @@ import type { CancellationSignal } from "../../../utils/task_canceller"; import TaskCanceller from "../../../utils/task_canceller"; import WeakMapMemory from "../../../utils/weak_map_memory"; import type { IRepresentationEstimator } from "../../adaptive"; -import type { SegmentFetcherCreator } from "../../fetchers"; +import type { SegmentQueueCreator } from "../../fetchers"; import type { IBufferType, SegmentSink } from "../../segment_sinks"; import type SegmentSinksStore from "../../segment_sinks"; import { BufferGarbageCollector } from "../../segment_sinks"; @@ -68,7 +68,7 @@ import getTimeRangesForContent from "./get_time_ranges_for_content"; * Representation to play. * @param {Object} segmentSinksStore - Will be used to lazily create * SegmentSink instances associated with the current content. - * @param {Object} segmentFetcherCreator - Allow to download segments. + * @param {Object} segmentQueueCreator - Allow to download segments. * @param {Object} options * @param {Object} callbacks - The `StreamOrchestrator` relies on a system of * callbacks that it will call on various events. @@ -94,7 +94,7 @@ export default function StreamOrchestrator( playbackObserver: IReadOnlyPlaybackObserver, representationEstimator: IRepresentationEstimator, segmentSinksStore: SegmentSinksStore, - segmentFetcherCreator: SegmentFetcherCreator, + segmentQueueCreator: SegmentQueueCreator, options: IStreamOrchestratorOptions, callbacks: IStreamOrchestratorCallbacks, orchestratorCancelSignal: CancellationSignal, @@ -499,7 +499,7 @@ export default function StreamOrchestrator( content: { manifest, period: basePeriod }, garbageCollectors, maxVideoBufferSize, - segmentFetcherCreator, + segmentQueueCreator, segmentSinksStore, options, playbackObserver, diff --git a/src/core/stream/period/period_stream.ts b/src/core/stream/period/period_stream.ts index 9667758f74f..531c1813ec8 100644 --- a/src/core/stream/period/period_stream.ts +++ b/src/core/stream/period/period_stream.ts @@ -83,7 +83,7 @@ export default function PeriodStream( garbageCollectors, playbackObserver, representationEstimator, - segmentFetcherCreator, + segmentQueueCreator, segmentSinksStore, options, wantedBufferAhead, @@ -351,7 +351,7 @@ export default function PeriodStream( playbackObserver: adaptationPlaybackObserver, representationEstimator, segmentSink, - segmentFetcherCreator, + segmentQueueCreator, wantedBufferAhead, maxVideoBufferSize, }, diff --git a/src/core/stream/period/types.ts b/src/core/stream/period/types.ts index 3d6951e2fd2..67dbd7c756b 100644 --- a/src/core/stream/period/types.ts +++ b/src/core/stream/period/types.ts @@ -10,7 +10,7 @@ import type SharedReference from "../../../utils/reference"; import type { CancellationSignal } from "../../../utils/task_canceller"; import type WeakMapMemory from "../../../utils/weak_map_memory"; import type { IRepresentationEstimator } from "../../adaptive"; -import type { SegmentFetcherCreator } from "../../fetchers"; +import type { SegmentQueueCreator } from "../../fetchers"; import type { IBufferType, SegmentSink } from "../../segment_sinks"; import type SegmentSinksStore from "../../segment_sinks"; import type { @@ -106,7 +106,7 @@ export interface IPeriodStreamArguments { SegmentSink, (cancelSignal: CancellationSignal) => void >; - segmentFetcherCreator: SegmentFetcherCreator; + segmentQueueCreator: SegmentQueueCreator; segmentSinksStore: SegmentSinksStore; playbackObserver: IReadOnlyPlaybackObserver; options: IPeriodStreamOptions; diff --git a/src/core/stream/representation/representation_stream.ts b/src/core/stream/representation/representation_stream.ts index 6bf6b57d90a..c8fee252f21 100644 --- a/src/core/stream/representation/representation_stream.ts +++ b/src/core/stream/representation/representation_stream.ts @@ -27,20 +27,17 @@ import config from "../../../config"; import log from "../../../log"; import type { ISegment } from "../../../manifest"; import objectAssign from "../../../utils/object_assign"; -import SharedReference from "../../../utils/reference"; import type { CancellationSignal } from "../../../utils/task_canceller"; import TaskCanceller, { CancellationError } from "../../../utils/task_canceller"; +import type { + IParsedInitSegmentPayload, + IParsedSegmentPayload, +} from "../../fetchers/segment/segment_queue"; import type { IQueuedSegment, IRepresentationStreamArguments, IRepresentationStreamCallbacks, } from "./types"; -import type { - IDownloadQueueItem, - IParsedInitSegmentPayload, - IParsedSegmentPayload, -} from "./utils/downloading_queue"; -import DownloadingQueue from "./utils/downloading_queue"; import getBufferStatus from "./utils/get_buffer_status"; import getSegmentPriority from "./utils/get_segment_priority"; import pushInitSegment from "./utils/push_init_segment"; @@ -87,7 +84,7 @@ export default function RepresentationStream( options, playbackObserver, segmentSink, - segmentFetcher, + segmentQueue, terminate, }: IRepresentationStreamArguments, callbacks: IRepresentationStreamCallbacks, @@ -122,15 +119,6 @@ export default function RepresentationStream( } }); - /** Emit the last scheduled downloading queue for segments. */ - const lastSegmentQueue = new SharedReference( - { - initSegment: null, - segmentQueue: [], - }, - segmentsLoadingCanceller.signal, - ); - /** If `true`, the current Representation has a linked initialization segment. */ const hasInitSegment = initSegmentState.segment !== null; @@ -167,45 +155,60 @@ export default function RepresentationStream( } } - /** Will load every segments in `lastSegmentQueue` */ - const downloadingQueue = new DownloadingQueue( - content, - lastSegmentQueue, - segmentFetcher, - hasInitSegment, - ); - downloadingQueue.addEventListener("error", (err) => { + segmentQueue.addEventListener("error", (err) => { if (segmentsLoadingCanceller.signal.isCancelled()) { return; // ignore post requests-cancellation loading-related errors, } globalCanceller.cancel(); // Stop every operations callbacks.error(err); }); - downloadingQueue.addEventListener("parsedInitSegment", onParsedChunk); - downloadingQueue.addEventListener("parsedMediaSegment", onParsedChunk); - downloadingQueue.addEventListener("emptyQueue", checkStatus); - downloadingQueue.addEventListener("requestRetry", (payload) => { - callbacks.warning(payload.error); - if (segmentsLoadingCanceller.signal.isCancelled()) { - return; // If the previous callback led to loading operations being stopped, skip - } - const retriedSegment = payload.segment; - const { index } = representation; - if (index.isSegmentStillAvailable(retriedSegment) === false) { - checkStatus(); - } else if (index.canBeOutOfSyncError(payload.error, retriedSegment)) { - callbacks.manifestMightBeOufOfSync(); - } - }); - downloadingQueue.addEventListener("fullyLoadedSegment", (segment) => { - segmentSink - .signalSegmentComplete(objectAssign({ segment }, content)) - .catch(onFatalBufferError); - }); - downloadingQueue.start(); + segmentQueue.addEventListener( + "parsedInitSegment", + onParsedChunk, + segmentsLoadingCanceller.signal, + ); + segmentQueue.addEventListener( + "parsedMediaSegment", + onParsedChunk, + segmentsLoadingCanceller.signal, + ); + segmentQueue.addEventListener( + "emptyQueue", + checkStatus, + segmentsLoadingCanceller.signal, + ); + segmentQueue.addEventListener( + "requestRetry", + (payload) => { + callbacks.warning(payload.error); + if (segmentsLoadingCanceller.signal.isCancelled()) { + return; // If the previous callback led to loading operations being stopped, skip + } + const retriedSegment = payload.segment; + const { index } = representation; + if (index.isSegmentStillAvailable(retriedSegment) === false) { + checkStatus(); + } else if (index.canBeOutOfSyncError(payload.error, retriedSegment)) { + callbacks.manifestMightBeOufOfSync(); + } + }, + segmentsLoadingCanceller.signal, + ); + segmentQueue.addEventListener( + "fullyLoadedSegment", + (segment) => { + segmentSink + .signalSegmentComplete(objectAssign({ segment }, content)) + .catch(onFatalBufferError); + }, + segmentsLoadingCanceller.signal, + ); + + /** Emit the last scheduled downloading queue for segments. */ + const segmentsToLoadRef = segmentQueue.resetForContent(content, hasInitSegment); + segmentsLoadingCanceller.signal.register(() => { - downloadingQueue.removeEventListener(); - downloadingQueue.stop(); + segmentQueue.stop(); }); playbackObserver.listen(checkStatus, { @@ -286,14 +289,14 @@ export default function RepresentationStream( const terminateVal = terminate.getValue(); if (terminateVal === null) { - lastSegmentQueue.setValue({ + segmentsToLoadRef.setValue({ initSegment: neededInitSegment, segmentQueue: neededSegments, }); } else if (terminateVal.urgent) { log.debug("Stream: Urgent switch, terminate now.", bufferType); - lastSegmentQueue.setValue({ initSegment: null, segmentQueue: [] }); - lastSegmentQueue.finish(); + segmentsToLoadRef.setValue({ initSegment: null, segmentQueue: [] }); + segmentsToLoadRef.finish(); segmentsLoadingCanceller.cancel(); callbacks.terminating(); return; @@ -304,8 +307,8 @@ export default function RepresentationStream( // is wanted instead, whichever comes first. const mostNeededSegment = neededSegments[0]; - const initSegmentRequest = downloadingQueue.getRequestedInitSegment(); - const currentSegmentRequest = downloadingQueue.getRequestedMediaSegment(); + const initSegmentRequest = segmentQueue.getRequestedInitSegment(); + const currentSegmentRequest = segmentQueue.getRequestedMediaSegment(); const nextQueue = currentSegmentRequest === null || @@ -315,13 +318,13 @@ export default function RepresentationStream( : [mostNeededSegment]; const nextInit = initSegmentRequest === null ? null : neededInitSegment; - lastSegmentQueue.setValue({ + segmentsToLoadRef.setValue({ initSegment: nextInit, segmentQueue: nextQueue, }); if (nextQueue.length === 0 && nextInit === null) { log.debug("Stream: No request left, terminate", bufferType); - lastSegmentQueue.finish(); + segmentsToLoadRef.finish(); segmentsLoadingCanceller.cancel(); callbacks.terminating(); return; diff --git a/src/core/stream/representation/types.ts b/src/core/stream/representation/types.ts index da5d4389b83..4ab8a3fa178 100644 --- a/src/core/stream/representation/types.ts +++ b/src/core/stream/representation/types.ts @@ -18,7 +18,7 @@ import type { } from "../../../public_types"; import type { IRange } from "../../../utils/ranges"; import type { IReadOnlySharedReference } from "../../../utils/reference"; -import type { IPrioritizedSegmentFetcher } from "../../fetchers"; +import type { SegmentQueue } from "../../fetchers"; import type { IBufferType, SegmentSink } from "../../segment_sinks"; /** Callbacks called by the `RepresentationStream` on various events. */ @@ -257,7 +257,7 @@ export interface IRepresentationStreamArguments { /** The `SegmentSink` on which segments will be pushed. */ segmentSink: SegmentSink; /** Interface used to load new segments. */ - segmentFetcher: IPrioritizedSegmentFetcher; + segmentQueue: SegmentQueue; /** * Reference emitting when the RepresentationStream should "terminate". * diff --git a/src/core/types.ts b/src/core/types.ts index 7f9e1ba9e18..cb7e6212629 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -5,7 +5,7 @@ import type { } from "./adaptive"; import type { IManifestFetcherSettings, - ISegmentFetcherCreatorBackoffOptions, + ISegmentQueueCreatorBackoffOptions, } from "./fetchers"; import type { IBufferedChunk, @@ -36,7 +36,7 @@ export type { // Fetchers Metadata IManifestFetcherSettings, - ISegmentFetcherCreatorBackoffOptions, + ISegmentQueueCreatorBackoffOptions, // Media Sinks Metadata IBufferType, diff --git a/src/main_thread/init/media_source_content_initializer.ts b/src/main_thread/init/media_source_content_initializer.ts index 4693a479fde..79ed37119aa 100644 --- a/src/main_thread/init/media_source_content_initializer.ts +++ b/src/main_thread/init/media_source_content_initializer.ts @@ -23,7 +23,7 @@ import type { } from "../../core/adaptive"; import AdaptiveRepresentationSelector from "../../core/adaptive"; import type { IManifestFetcherSettings } from "../../core/fetchers"; -import { ManifestFetcher, SegmentFetcherCreator } from "../../core/fetchers"; +import { ManifestFetcher, SegmentQueueCreator } from "../../core/fetchers"; import createContentTimeBoundariesObserver from "../../core/main/common/create_content_time_boundaries_observer"; import DecipherabilityFreezeDetector from "../../core/main/common/DecipherabilityFreezeDetector"; import SegmentSinksStore from "../../core/segment_sinks"; @@ -362,7 +362,7 @@ export default class MediaSourceContentInitializer extends ContentInitializer { bufferOptions, ); - const segmentFetcherCreator = new SegmentFetcherCreator( + const segmentQueueCreator = new SegmentQueueCreator( transport, segmentRequestOptions, initCanceller.signal, @@ -409,7 +409,7 @@ export default class MediaSourceContentInitializer extends ContentInitializer { autoPlay: shouldPlay, manifest, representationEstimator, - segmentFetcherCreator, + segmentQueueCreator, speed, protectionRef, bufferOptions: subBufferOptions, @@ -471,7 +471,7 @@ export default class MediaSourceContentInitializer extends ContentInitializer { playbackObserver, protectionRef, representationEstimator, - segmentFetcherCreator, + segmentQueueCreator, speed, } = args; @@ -696,7 +696,7 @@ export default class MediaSourceContentInitializer extends ContentInitializer { coreObserver, representationEstimator, segmentSinksStore, - segmentFetcherCreator, + segmentQueueCreator, bufferOptions, handleStreamOrchestratorCallbacks(), cancelSignal, @@ -1039,7 +1039,7 @@ interface IBufferingMediaSettings { /** Estimate the right Representation. */ representationEstimator: IRepresentationEstimator; /** Module to facilitate segment fetching. */ - segmentFetcherCreator: SegmentFetcherCreator; + segmentQueueCreator: SegmentQueueCreator; /** Last wanted playback rate. */ speed: IReadOnlySharedReference; /** diff --git a/src/multithread_types.ts b/src/multithread_types.ts index 1938065a792..2ead85a1532 100644 --- a/src/multithread_types.ts +++ b/src/multithread_types.ts @@ -8,7 +8,7 @@ import type { ISegmentSinkMetrics } from "./core/segment_sinks/segment_buffers_s import type { IResolutionInfo, IManifestFetcherSettings, - ISegmentFetcherCreatorBackoffOptions, + ISegmentQueueCreatorBackoffOptions, IInbandEvent, IPausedPlaybackObservation, IRepresentationsChoice, @@ -130,7 +130,7 @@ export interface IContentInitializationData { */ manifestRetryOptions: IManifestFetcherSettings; /** Options relative to the fetching of media segments. */ - segmentRetryOptions: ISegmentFetcherCreatorBackoffOptions; + segmentRetryOptions: ISegmentQueueCreatorBackoffOptions; } export interface ILogLevelUpdateMessage {