From 7fbfdb07d87bd3b55da154722afa2bff2a0456f8 Mon Sep 17 00:00:00 2001 From: Mike Baker <1426795+mbaker3@users.noreply.github.com> Date: Thu, 20 Jul 2023 22:43:00 -0400 Subject: [PATCH] CancelRequests - Allow multiple cancel request writers in a single job Add the ability to require multiple cancel streams in a single job. Requiring is done as you would expect, just target the different streams on the JobConfig. Fulfilling the writer/instance is done by passing the stream instance into the `Fulfill` method. The access wrappers now represent access to the cancel type not a specific cancel stream instance. This works because all of the cancel streams share the same SharedWrite Pending collection so getting write access for one stream grants write access for all. The instance passed through `Fulfill` will get safety checked in the access wrapper and delivered back to be used for writer creation. Behaviour: - One cancel request stream required: `jobData.Fulfill` behaves as normal, no explicit instance required. - N cancel request stream required: `jobData.Fulfill` must be provided the instance for each stream to fulfill otherwise the safety system will throw an exception. Note: This is a workaround implementation. #224 will make this cleaner. This approach can be applied to all write wide (shard write) types. Support will be added in an upcoming commit. --- .../Job/JobConfig/AbstractJobConfig.cs | 6 +- .../TaskSet/Job/JobData/AbstractJobData.cs | 8 +- .../Job/Wrapper/AbstractAccessWrapper.cs | 31 ++++++++ .../CancelProgressLookupAccessWrapper.cs | 8 +- .../CancelRequestsPendingAccessWrapper.cs | 75 +++++++++++++++++-- .../Cancellation/CancelRequestsDataStream.cs | 8 +- 6 files changed, 118 insertions(+), 18 deletions(-) diff --git a/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/JobConfig/AbstractJobConfig.cs b/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/JobConfig/AbstractJobConfig.cs index 3411623d..89b4a79e 100644 --- a/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/JobConfig/AbstractJobConfig.cs +++ b/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/JobConfig/AbstractJobConfig.cs @@ -124,12 +124,14 @@ private void ResolveDuplicateAccessWrappers(AbstractAccessWrapper accessWrapper) // If the existing access wrapper facilitates the needs of the new one then just keep the existing one. if (existingAccessWrapper.AccessType.IsCompatibleWith(accessWrapper.AccessType)) { + existingAccessWrapper.MergeStateFrom(accessWrapper); accessWrapper.Dispose(); } // If the new access wrapper facilitates the needs of the existing one then dispose the existing wrapper and // use the new access wrapper. else if (accessWrapper.AccessType.IsCompatibleWith(existingAccessWrapper.AccessType)) { + accessWrapper.MergeStateFrom(existingAccessWrapper); existingAccessWrapper.Dispose(); m_AccessWrappers[accessWrapper.ID] = accessWrapper; } @@ -438,12 +440,12 @@ CancelProgressLookupAccessWrapper cancelProgressLookupAccessWrapper return cancelProgressLookupAccessWrapper.ProgressLookup; } - internal CancelRequestsDataStream GetCancelRequestsDataStream() + internal CancelRequestsDataStream GetCancelRequestsDataStream(IAbstractCancelRequestDataStream explicitSource = null) { CancelRequestsPendingAccessWrapper cancelRequestsPendingAccessWrapper = GetAccessWrapper(Usage.RequestCancel); - return cancelRequestsPendingAccessWrapper.CancelRequestsDataStream; + return cancelRequestsPendingAccessWrapper.GetInstance(explicitSource); } internal EntityProxyDataStream GetPendingDataStream(Usage usage) diff --git a/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/JobData/AbstractJobData.cs b/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/JobData/AbstractJobData.cs index 5b7dd0d7..45c3e189 100644 --- a/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/JobData/AbstractJobData.cs +++ b/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/JobData/AbstractJobData.cs @@ -34,9 +34,9 @@ protected AbstractJobData(IJobConfig jobConfig) /// /// Fulfills an instance of the provided type for the job. /// - public void Fulfill(out CancelRequestsWriter instance) + public void Fulfill(out CancelRequestsWriter instance, IAbstractCancelRequestDataStream explicitSource = null) { - CancelRequestsDataStream cancelRequestDataStream = m_JobConfig.GetCancelRequestsDataStream(); + CancelRequestsDataStream cancelRequestDataStream = m_JobConfig.GetCancelRequestsDataStream(explicitSource); instance = cancelRequestDataStream.CreateCancelRequestsWriter(); } @@ -60,11 +60,11 @@ public void Fulfill(out DataStreamActiveReader instance) instance = dataStream.CreateDataStreamActiveReader(); } - + //************************************************************************************************************* // ENTITY SPAWNER //************************************************************************************************************* - + public void Fulfill(out EntitySpawner entitySpawner) { entitySpawner = m_JobConfig.GetEntitySpawner(); diff --git a/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/AbstractAccessWrapper.cs b/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/AbstractAccessWrapper.cs index c3049d79..8a5f728f 100644 --- a/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/AbstractAccessWrapper.cs +++ b/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/AbstractAccessWrapper.cs @@ -1,4 +1,5 @@ using Anvil.CSharp.Core; +using Anvil.CSharp.Logging; using Anvil.Unity.DOTS.Jobs; using System; using System.Diagnostics; @@ -23,6 +24,16 @@ protected AbstractAccessWrapper(AccessType accessType, AbstractJobConfig.Usage u public abstract JobHandle AcquireAsync(); public abstract void ReleaseAsync(JobHandle dependsOn); + /// + /// Merge the state from another wrapper instance of the same type + /// + /// + public virtual void MergeStateFrom(AbstractAccessWrapper other) + { + Debug_EnsureSameID(other); + Debug_EnsureSameType(other); + } + //************************************************************************************************************* // SAFETY //************************************************************************************************************* @@ -38,5 +49,25 @@ private void Debug_SetWrapperType() ? type.GetGenericTypeDefinition() : type; } + + [Conditional("ANVIL_DEBUG_SAFETY")] + private void Debug_EnsureSameID(AbstractAccessWrapper wrapper) + { + if (ID != wrapper.ID) + { + throw new Exception($"Cannot merge two wrappers with different IDs. This:{ID}, Other:{wrapper.ID}"); + } + } + + [Conditional("ANVIL_DEBUG_SAFETY")] + private void Debug_EnsureSameType(AbstractAccessWrapper wrapper) + { + Type thisType = GetType(); + Type otherType = wrapper.GetType(); + if (thisType != otherType) + { + throw new Exception($"Cannot merge two wrappers of different types. This:{thisType.GetReadableName()}, Other:{otherType.GetReadableName()}"); + } + } } } \ No newline at end of file diff --git a/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/CancelProgressLookupAccessWrapper.cs b/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/CancelProgressLookupAccessWrapper.cs index c3730a95..c2153879 100644 --- a/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/CancelProgressLookupAccessWrapper.cs +++ b/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/CancelProgressLookupAccessWrapper.cs @@ -11,7 +11,11 @@ internal class CancelProgressLookupAccessWrapper : AbstractAccessWrapper public UnsafeParallelHashMap ProgressLookup { get; } - public CancelProgressLookupAccessWrapper(ActiveLookupData cancelProgressLookupData, AccessType accessType, AbstractJobConfig.Usage usage) : base(accessType, usage) + public CancelProgressLookupAccessWrapper( + ActiveLookupData cancelProgressLookupData, + AccessType accessType, + AbstractJobConfig.Usage usage) + : base(accessType, usage) { m_CancelProgressLookupData = cancelProgressLookupData; ProgressLookup = m_CancelProgressLookupData.Lookup; @@ -27,4 +31,4 @@ public override void ReleaseAsync(JobHandle dependsOn) m_CancelProgressLookupData.ReleaseAsync(dependsOn); } } -} +} \ No newline at end of file diff --git a/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/CancelRequestsPendingAccessWrapper.cs b/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/CancelRequestsPendingAccessWrapper.cs index 0866d6c5..85a9e6d5 100644 --- a/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/CancelRequestsPendingAccessWrapper.cs +++ b/Scripts/Runtime/Entities/TaskDriver/TaskSet/Job/Wrapper/CancelRequestsPendingAccessWrapper.cs @@ -1,29 +1,92 @@ +using Anvil.CSharp.Logging; using Anvil.Unity.DOTS.Jobs; +using System; +using System.Collections.Generic; +using System.Diagnostics; using Unity.Jobs; namespace Anvil.Unity.DOTS.Entities.TaskDriver { internal class CancelRequestsPendingAccessWrapper : AbstractAccessWrapper { - public CancelRequestsDataStream CancelRequestsDataStream { get; } + private readonly CancelRequestsDataStream m_DefaultStream; public CancelRequestsPendingAccessWrapper( - CancelRequestsDataStream cancelRequestsDataStream, + CancelRequestsDataStream defaultStream, AccessType accessType, AbstractJobConfig.Usage usage) : base(accessType, usage) { - CancelRequestsDataStream = cancelRequestsDataStream; + m_DefaultStream = defaultStream; + DEBUG_TrackRequiredStream(defaultStream); } public override JobHandle AcquireAsync() { - return CancelRequestsDataStream.AcquirePendingAsync(AccessType); + return m_DefaultStream.AcquirePendingAsync(AccessType); } public override void ReleaseAsync(JobHandle dependsOn) { - CancelRequestsDataStream.ReleasePendingAsync(dependsOn); + m_DefaultStream.ReleasePendingAsync(dependsOn); + } + + public CancelRequestsDataStream GetInstance(IAbstractCancelRequestDataStream explicitStream = null) + { + DEBUG_EnforceExplicitStream(explicitStream); + DEBUG_EnsureStreamWasRequired(explicitStream); + + return (CancelRequestsDataStream)explicitStream ?? m_DefaultStream; + } + + public override void MergeStateFrom(AbstractAccessWrapper other) + { + base.MergeStateFrom(other); + + CancelRequestsPendingAccessWrapper otherTyped = (CancelRequestsPendingAccessWrapper)other; + m_DEBUG_RequiredStreams.UnionWith(otherTyped.m_DEBUG_RequiredStreams); + DEBUG_TrackRequiredStream(otherTyped.m_DefaultStream); + } + + //************************************************************************************************************* + // SAFETY + //************************************************************************************************************* + + // #if ANVIL_DEBUG_SAFETY + private HashSet m_DEBUG_RequiredStreams; + + [Conditional("ANVIL_DEBUG_SAFETY")] + public void DEBUG_TrackRequiredStream(CancelRequestsDataStream stream) + { + m_DEBUG_RequiredStreams ??= new HashSet(1); + m_DEBUG_RequiredStreams.Add(stream); + } + + [Conditional("ANVIL_DEBUG_SAFETY")] + private void DEBUG_EnsureStreamWasRequired(IAbstractCancelRequestDataStream stream) + { + CancelRequestsDataStream cancelStream = (CancelRequestsDataStream)stream; + if (stream == null || m_DEBUG_RequiredStreams.Contains(cancelStream)) + { + return; + } + + throw new Exception($"The explicit stream instance requested was not set as required. DataTargetID:{cancelStream.DataTargetID}"); + } + + [Conditional("ANVIL_DEBUG_SAFETY")] + private void DEBUG_EnforceExplicitStream(IAbstractCancelRequestDataStream stream) + { + int requiredStreamCount = m_DEBUG_RequiredStreams.Count; + if (stream == null && requiredStreamCount > 1) + { + throw new Exception($"More than one stream has set this type as a requirement. The exact stream must be provided on retrieval. Type:{typeof(CancelRequestsDataStream).GetReadableName()}"); + } + + if (stream != null && requiredStreamCount == 1) + { + Logger.Warning($"An explicit stream was provided when not required. Consider using default fulfillment. Type:{typeof(CancelRequestsDataStream).GetReadableName()}"); + } } } -} +} \ No newline at end of file diff --git a/Scripts/Runtime/Entities/TaskDriver/TaskSet/TaskData/DataStream/Cancellation/CancelRequestsDataStream.cs b/Scripts/Runtime/Entities/TaskDriver/TaskSet/TaskData/DataStream/Cancellation/CancelRequestsDataStream.cs index a0cd1590..e62bebec 100644 --- a/Scripts/Runtime/Entities/TaskDriver/TaskSet/TaskData/DataStream/Cancellation/CancelRequestsDataStream.cs +++ b/Scripts/Runtime/Entities/TaskDriver/TaskSet/TaskData/DataStream/Cancellation/CancelRequestsDataStream.cs @@ -9,7 +9,7 @@ internal class CancelRequestsDataStream : AbstractDataStream, ISystemCancelRequestDataStream { private const string UNIQUE_CONTEXT_IDENTIFIER = "CANCEL_REQUEST"; - + private readonly CancelRequestsDataSource m_DataSource; public ActiveLookupData ActiveLookupData { get; } @@ -22,7 +22,7 @@ public override IDataSource DataSource { get => m_DataSource; } - + /// public uint ActiveDataVersion { @@ -88,11 +88,11 @@ public void ReleaseCancelRequestsWriter() { ReleasePending(); } - + /// public bool IsActiveDataInvalidated(uint lastVersion) { return ActiveLookupData.IsDataInvalidated(lastVersion); } } -} +} \ No newline at end of file