Skip to content

Commit

Permalink
CancelRequests - Allow multiple cancel request writers in a single job
Browse files Browse the repository at this point in the history
Add the ability to require multiple cancel streams in a single job. Requiring is done as you would expect, just target the different streams on the JobConfig. Fulfilling the writer/instance is done by passing the stream instance into the `Fulfill` method.

The access wrappers now represent access to the cancel type not a specific cancel stream instance. This works because all of the cancel streams share the same SharedWrite Pending collection so getting write access for one stream grants write access for all. The instance passed through `Fulfill` will get safety checked in the access wrapper and delivered back to be used for writer creation.

Behaviour:
 - One cancel request stream required: `jobData.Fulfill` behaves as normal, no explicit instance required.
 - N cancel request stream required: `jobData.Fulfill` must be provided the instance for each stream to fulfill otherwise the safety system will throw an exception.

Note: This is a workaround implementation. #224 will make this cleaner.

This approach can be applied to all write wide (shard write) types. Support will be added in an upcoming commit.
  • Loading branch information
mbaker3 committed Jul 21, 2023
1 parent e286531 commit 7fbfdb0
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,14 @@ private void ResolveDuplicateAccessWrappers(AbstractAccessWrapper accessWrapper)
// If the existing access wrapper facilitates the needs of the new one then just keep the existing one.
if (existingAccessWrapper.AccessType.IsCompatibleWith(accessWrapper.AccessType))
{
existingAccessWrapper.MergeStateFrom(accessWrapper);
accessWrapper.Dispose();
}
// If the new access wrapper facilitates the needs of the existing one then dispose the existing wrapper and
// use the new access wrapper.
else if (accessWrapper.AccessType.IsCompatibleWith(existingAccessWrapper.AccessType))
{
accessWrapper.MergeStateFrom(existingAccessWrapper);
existingAccessWrapper.Dispose();
m_AccessWrappers[accessWrapper.ID] = accessWrapper;
}
Expand Down Expand Up @@ -438,12 +440,12 @@ CancelProgressLookupAccessWrapper cancelProgressLookupAccessWrapper
return cancelProgressLookupAccessWrapper.ProgressLookup;
}

internal CancelRequestsDataStream GetCancelRequestsDataStream()
internal CancelRequestsDataStream GetCancelRequestsDataStream(IAbstractCancelRequestDataStream explicitSource = null)
{
CancelRequestsPendingAccessWrapper cancelRequestsPendingAccessWrapper
= GetAccessWrapper<CancelRequestsPendingAccessWrapper>(Usage.RequestCancel);

return cancelRequestsPendingAccessWrapper.CancelRequestsDataStream;
return cancelRequestsPendingAccessWrapper.GetInstance(explicitSource);
}

internal EntityProxyDataStream<TInstance> GetPendingDataStream<TInstance>(Usage usage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ protected AbstractJobData(IJobConfig jobConfig)
/// <summary>
/// Fulfills an instance of the provided type for the job.
/// </summary>
public void Fulfill(out CancelRequestsWriter instance)
public void Fulfill(out CancelRequestsWriter instance, IAbstractCancelRequestDataStream explicitSource = null)
{
CancelRequestsDataStream cancelRequestDataStream = m_JobConfig.GetCancelRequestsDataStream();
CancelRequestsDataStream cancelRequestDataStream = m_JobConfig.GetCancelRequestsDataStream(explicitSource);
instance = cancelRequestDataStream.CreateCancelRequestsWriter();
}

Expand All @@ -60,11 +60,11 @@ public void Fulfill<TInstance>(out DataStreamActiveReader<TInstance> instance)
instance = dataStream.CreateDataStreamActiveReader();
}


//*************************************************************************************************************
// ENTITY SPAWNER
//*************************************************************************************************************

public void Fulfill(out EntitySpawner entitySpawner)
{
entitySpawner = m_JobConfig.GetEntitySpawner();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Anvil.CSharp.Core;
using Anvil.CSharp.Logging;
using Anvil.Unity.DOTS.Jobs;
using System;
using System.Diagnostics;
Expand All @@ -23,6 +24,16 @@ protected AbstractAccessWrapper(AccessType accessType, AbstractJobConfig.Usage u
public abstract JobHandle AcquireAsync();
public abstract void ReleaseAsync(JobHandle dependsOn);

/// <summary>
/// Merge the state from another wrapper instance of the same type
/// </summary>
/// <param name="other"></param>
public virtual void MergeStateFrom(AbstractAccessWrapper other)
{
Debug_EnsureSameID(other);
Debug_EnsureSameType(other);
}

//*************************************************************************************************************
// SAFETY
//*************************************************************************************************************
Expand All @@ -38,5 +49,25 @@ private void Debug_SetWrapperType()
? type.GetGenericTypeDefinition()
: type;
}

[Conditional("ANVIL_DEBUG_SAFETY")]
private void Debug_EnsureSameID(AbstractAccessWrapper wrapper)
{
if (ID != wrapper.ID)
{
throw new Exception($"Cannot merge two wrappers with different IDs. This:{ID}, Other:{wrapper.ID}");
}
}

[Conditional("ANVIL_DEBUG_SAFETY")]
private void Debug_EnsureSameType(AbstractAccessWrapper wrapper)
{
Type thisType = GetType();
Type otherType = wrapper.GetType();
if (thisType != otherType)
{
throw new Exception($"Cannot merge two wrappers of different types. This:{thisType.GetReadableName()}, Other:{otherType.GetReadableName()}");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ internal class CancelProgressLookupAccessWrapper : AbstractAccessWrapper
public UnsafeParallelHashMap<EntityKeyedTaskID, bool> ProgressLookup { get; }


public CancelProgressLookupAccessWrapper(ActiveLookupData<EntityKeyedTaskID> cancelProgressLookupData, AccessType accessType, AbstractJobConfig.Usage usage) : base(accessType, usage)
public CancelProgressLookupAccessWrapper(
ActiveLookupData<EntityKeyedTaskID> cancelProgressLookupData,
AccessType accessType,
AbstractJobConfig.Usage usage)
: base(accessType, usage)
{
m_CancelProgressLookupData = cancelProgressLookupData;
ProgressLookup = m_CancelProgressLookupData.Lookup;
Expand All @@ -27,4 +31,4 @@ public override void ReleaseAsync(JobHandle dependsOn)
m_CancelProgressLookupData.ReleaseAsync(dependsOn);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,92 @@
using Anvil.CSharp.Logging;
using Anvil.Unity.DOTS.Jobs;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using Unity.Jobs;

namespace Anvil.Unity.DOTS.Entities.TaskDriver
{
internal class CancelRequestsPendingAccessWrapper : AbstractAccessWrapper
{
public CancelRequestsDataStream CancelRequestsDataStream { get; }
private readonly CancelRequestsDataStream m_DefaultStream;

public CancelRequestsPendingAccessWrapper(
CancelRequestsDataStream cancelRequestsDataStream,
CancelRequestsDataStream defaultStream,
AccessType accessType,
AbstractJobConfig.Usage usage)
: base(accessType, usage)
{
CancelRequestsDataStream = cancelRequestsDataStream;
m_DefaultStream = defaultStream;
DEBUG_TrackRequiredStream(defaultStream);
}

public override JobHandle AcquireAsync()
{
return CancelRequestsDataStream.AcquirePendingAsync(AccessType);
return m_DefaultStream.AcquirePendingAsync(AccessType);
}

public override void ReleaseAsync(JobHandle dependsOn)
{
CancelRequestsDataStream.ReleasePendingAsync(dependsOn);
m_DefaultStream.ReleasePendingAsync(dependsOn);
}

public CancelRequestsDataStream GetInstance(IAbstractCancelRequestDataStream explicitStream = null)
{
DEBUG_EnforceExplicitStream(explicitStream);
DEBUG_EnsureStreamWasRequired(explicitStream);

return (CancelRequestsDataStream)explicitStream ?? m_DefaultStream;
}

public override void MergeStateFrom(AbstractAccessWrapper other)
{
base.MergeStateFrom(other);

CancelRequestsPendingAccessWrapper otherTyped = (CancelRequestsPendingAccessWrapper)other;
m_DEBUG_RequiredStreams.UnionWith(otherTyped.m_DEBUG_RequiredStreams);
DEBUG_TrackRequiredStream(otherTyped.m_DefaultStream);
}

//*************************************************************************************************************
// SAFETY
//*************************************************************************************************************

// #if ANVIL_DEBUG_SAFETY
private HashSet<CancelRequestsDataStream> m_DEBUG_RequiredStreams;

[Conditional("ANVIL_DEBUG_SAFETY")]
public void DEBUG_TrackRequiredStream(CancelRequestsDataStream stream)
{
m_DEBUG_RequiredStreams ??= new HashSet<CancelRequestsDataStream>(1);
m_DEBUG_RequiredStreams.Add(stream);
}

[Conditional("ANVIL_DEBUG_SAFETY")]
private void DEBUG_EnsureStreamWasRequired(IAbstractCancelRequestDataStream stream)
{
CancelRequestsDataStream cancelStream = (CancelRequestsDataStream)stream;
if (stream == null || m_DEBUG_RequiredStreams.Contains(cancelStream))
{
return;
}

throw new Exception($"The explicit stream instance requested was not set as required. DataTargetID:{cancelStream.DataTargetID}");
}

[Conditional("ANVIL_DEBUG_SAFETY")]
private void DEBUG_EnforceExplicitStream(IAbstractCancelRequestDataStream stream)
{
int requiredStreamCount = m_DEBUG_RequiredStreams.Count;
if (stream == null && requiredStreamCount > 1)
{
throw new Exception($"More than one stream has set this type as a requirement. The exact stream must be provided on retrieval. Type:{typeof(CancelRequestsDataStream).GetReadableName()}");
}

if (stream != null && requiredStreamCount == 1)
{
Logger.Warning($"An explicit stream was provided when not required. Consider using default fulfillment. Type:{typeof(CancelRequestsDataStream).GetReadableName()}");
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class CancelRequestsDataStream : AbstractDataStream,
ISystemCancelRequestDataStream
{
private const string UNIQUE_CONTEXT_IDENTIFIER = "CANCEL_REQUEST";

private readonly CancelRequestsDataSource m_DataSource;
public ActiveLookupData<EntityKeyedTaskID> ActiveLookupData { get; }

Expand All @@ -22,7 +22,7 @@ public override IDataSource DataSource
{
get => m_DataSource;
}

/// <inheritdoc cref="IAbstractDataStream.ActiveDataVersion"/>
public uint ActiveDataVersion
{
Expand Down Expand Up @@ -88,11 +88,11 @@ public void ReleaseCancelRequestsWriter()
{
ReleasePending();
}

/// <inheritdoc cref="IAbstractDataStream.IsActiveDataInvalidated"/>
public bool IsActiveDataInvalidated(uint lastVersion)
{
return ActiveLookupData.IsDataInvalidated(lastVersion);
}
}
}
}

0 comments on commit 7fbfdb0

Please sign in to comment.