diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py index 431907430..75ba9c24e 100644 --- a/python/flink_agents/runtime/flink_runner_context.py +++ b/python/flink_agents/runtime/flink_runner_context.py @@ -219,13 +219,23 @@ def create_flink_runner_context( agent_plan_json: str, executor: ThreadPoolExecutor, j_resource_adapter: Any, - job_identifier: str, - key: int, ) -> FlinkRunnerContext: """Used to create a FlinkRunnerContext Python object in Pemja environment.""" - ctx = FlinkRunnerContext( + return FlinkRunnerContext( j_runner_context, agent_plan_json, executor, j_resource_adapter ) + + +def flink_runner_context_switch_action_context( + ctx: FlinkRunnerContext, + job_identifier: str, + key: int, +) -> None: + """Switch the context of the flink runner context. + + The ctx is reused across keyed partitions, the context related to + specific key should be switched when process new action. + """ backend = ctx.config.get(LongTermMemoryOptions.BACKEND) # use external vector store based long term memory if backend == LongTermMemoryBackend.EXTERNAL_VECTOR_STORE: @@ -240,19 +250,6 @@ def create_flink_runner_context( key=str(key), ) ) - return ctx - - -def create_long_term_memory( - j_runner_context: Any, - agent_plan_json: str, - executor: ThreadPoolExecutor, - j_resource_adapter: Any, -) -> FlinkRunnerContext: - """Used to create a FlinkRunnerContext Python object in Pemja environment.""" - return FlinkRunnerContext( - j_runner_context, agent_plan_json, executor, j_resource_adapter - ) def create_async_thread_pool() -> ThreadPoolExecutor: diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java index 6321d9854..743b4aacd 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java @@ -42,34 +42,61 @@ * actions. */ public class RunnerContextImpl implements RunnerContext { + public static class MemoryContext { + private final CachedMemoryStore sensoryMemStore; + private final CachedMemoryStore shortTermMemStore; + private final List sensoryMemoryUpdates; + private final List shortTermMemoryUpdates; + + public MemoryContext( + CachedMemoryStore sensoryMemStore, CachedMemoryStore shortTermMemStore) { + this.sensoryMemStore = sensoryMemStore; + this.shortTermMemStore = shortTermMemStore; + this.sensoryMemoryUpdates = new LinkedList<>(); + this.shortTermMemoryUpdates = new LinkedList<>(); + } + + public List getShortTermMemoryUpdates() { + return shortTermMemoryUpdates; + } + + public List getSensoryMemoryUpdates() { + return sensoryMemoryUpdates; + } + + public CachedMemoryStore getShortTermMemStore() { + return shortTermMemStore; + } + + public CachedMemoryStore getSensoryMemStore() { + return sensoryMemStore; + } + } protected final List pendingEvents = new ArrayList<>(); - protected final CachedMemoryStore sensoryMemStore; - protected final CachedMemoryStore shortTermMemStore; protected final FlinkAgentsMetricGroupImpl agentMetricGroup; protected final Runnable mailboxThreadChecker; protected final AgentPlan agentPlan; - protected final List sensoryMemoryUpdates; - protected final List shortTermMemoryUpdates; + + protected MemoryContext memoryContext; protected String actionName; public RunnerContextImpl( - CachedMemoryStore sensoryMemStore, - CachedMemoryStore shortTermMemStore, FlinkAgentsMetricGroupImpl agentMetricGroup, Runnable mailboxThreadChecker, AgentPlan agentPlan) { - this.sensoryMemStore = sensoryMemStore; - this.shortTermMemStore = shortTermMemStore; this.agentMetricGroup = agentMetricGroup; this.mailboxThreadChecker = mailboxThreadChecker; this.agentPlan = agentPlan; - this.sensoryMemoryUpdates = new LinkedList<>(); - this.shortTermMemoryUpdates = new LinkedList<>(); } - public void setActionName(String actionName) { + public void switchActionContext(String actionName, MemoryContext memoryContext) { this.actionName = actionName; + this.memoryContext = memoryContext; + } + + public MemoryContext getMemoryContext() { + return memoryContext; } @Override @@ -112,7 +139,7 @@ public void checkNoPendingEvents() { public List getSensoryMemoryUpdates() { mailboxThreadChecker.run(); - return List.copyOf(sensoryMemoryUpdates); + return List.copyOf(memoryContext.getSensoryMemoryUpdates()); } /** @@ -124,7 +151,7 @@ public List getSensoryMemoryUpdates() { */ public List getShortTermMemoryUpdates() { mailboxThreadChecker.run(); - return List.copyOf(shortTermMemoryUpdates); + return List.copyOf(memoryContext.getShortTermMemoryUpdates()); } @Override @@ -132,10 +159,10 @@ public MemoryObject getSensoryMemory() throws Exception { mailboxThreadChecker.run(); return new MemoryObjectImpl( MemoryObject.MemoryType.SENSORY, - sensoryMemStore, + memoryContext.getSensoryMemStore(), MemoryObjectImpl.ROOT_KEY, mailboxThreadChecker, - sensoryMemoryUpdates); + memoryContext.getSensoryMemoryUpdates()); } @Override @@ -143,10 +170,10 @@ public MemoryObject getShortTermMemory() throws Exception { mailboxThreadChecker.run(); return new MemoryObjectImpl( MemoryObject.MemoryType.SHORT_TERM, - shortTermMemStore, + memoryContext.getShortTermMemStore(), MemoryObjectImpl.ROOT_KEY, mailboxThreadChecker, - shortTermMemoryUpdates); + memoryContext.getShortTermMemoryUpdates()); } @Override @@ -177,11 +204,11 @@ public String getActionName() { } public void persistMemory() throws Exception { - sensoryMemStore.persistCache(); - shortTermMemStore.persistCache(); + memoryContext.getSensoryMemStore().persistCache(); + memoryContext.getShortTermMemStore().persistCache(); } public void clearSensoryMemory() throws Exception { - sensoryMemStore.clear(); + memoryContext.getSensoryMemStore().clear(); } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index 98ff14469..d827eb604 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -133,6 +133,9 @@ public class ActionExecutionOperator extends AbstractStreamOperator extends AbstractStreamOperator extends AbstractStreamOperator actionTaskRunnerContexts; + private final transient Map + actionTaskMemoryContexts; // Each job can only have one identifier and this identifier must be consistent across restarts. // We cannot use job id as the identifier here because user may change job id by @@ -198,7 +205,7 @@ public ActionExecutionOperator( this.eventListeners = new ArrayList<>(); this.actionStateStore = actionStateStore; this.checkpointIdToSeqNums = new HashMap<>(); - this.actionTaskRunnerContexts = new HashMap<>(); + this.actionTaskMemoryContexts = new HashMap<>(); } @Override @@ -443,12 +450,14 @@ private void processActionTaskForKey(Object key) throws Exception { } else { maybeInitActionState(key, sequenceNumber, actionTask.action, actionTask.event); ActionTask.ActionTaskResult actionTaskResult = - actionTask.invoke(getRuntimeContext().getUserCodeClassLoader()); + actionTask.invoke( + getRuntimeContext().getUserCodeClassLoader(), + this.pythonActionExecutor); // We remove the RunnerContext of the action task from the map after it is finished. The // RunnerContext will be added later if the action task has a generated action task, // meaning it is not finished. - actionTaskRunnerContexts.remove(actionTask); + actionTaskMemoryContexts.remove(actionTask); maybePersistTaskResult( key, sequenceNumber, @@ -483,7 +492,8 @@ private void processActionTaskForKey(Object key) throws Exception { // If the action task is not finished, we keep the runner context in the memory for the // next generated ActionTask to be invoked. - actionTaskRunnerContexts.put(generatedActionTask, actionTask.getRunnerContext()); + actionTaskMemoryContexts.put( + generatedActionTask, actionTask.getRunnerContext().getMemoryContext()); actionTasksKState.add(generatedActionTask); } @@ -552,6 +562,9 @@ private void initPythonEnvironment() throws Exception { pythonEnvironmentManager.open(); EmbeddedPythonEnvironment env = pythonEnvironmentManager.createEnvironment(); pythonInterpreter = env.getInterpreter(); + pythonRunnerContext = + new PythonRunnerContextImpl( + this.metricGroup, this::checkMailboxThread, this.agentPlan); if (containPythonAction) { initPythonActionExecutor(); } else { @@ -568,6 +581,7 @@ private void initPythonActionExecutor() throws Exception { pythonInterpreter, new ObjectMapper().writeValueAsString(agentPlan), javaResourceAdapter, + pythonRunnerContext, jobIdentifier); pythonActionExecutor.open(); } @@ -752,31 +766,28 @@ private void createAndSetRunnerContext(ActionTask actionTask) { } RunnerContextImpl runnerContext; - if (actionTaskRunnerContexts.containsKey(actionTask)) { - runnerContext = actionTaskRunnerContexts.get(actionTask); - } else if (actionTask.action.getExec() instanceof JavaFunction) { - runnerContext = - new RunnerContextImpl( - new CachedMemoryStore(sensoryMemState), - new CachedMemoryStore(shortTermMemState), - metricGroup, - this::checkMailboxThread, - agentPlan); + if (actionTask.action.getExec() instanceof JavaFunction) { + runnerContext = createOrGetRunnerContext(true); } else if (actionTask.action.getExec() instanceof PythonFunction) { - runnerContext = - new PythonRunnerContextImpl( - new CachedMemoryStore(sensoryMemState), - new CachedMemoryStore(shortTermMemState), - metricGroup, - this::checkMailboxThread, - agentPlan, - pythonActionExecutor); + runnerContext = createOrGetRunnerContext(false); } else { throw new IllegalStateException( "Unsupported action type: " + actionTask.action.getExec().getClass()); } - runnerContext.setActionName(actionTask.action.getName()); + RunnerContextImpl.MemoryContext memoryContext; + if (actionTaskMemoryContexts.containsKey(actionTask)) { + // action task for async execution action, should retrieve intermediate results from + // map. + memoryContext = actionTaskMemoryContexts.get(actionTask); + } else { + memoryContext = + new RunnerContextImpl.MemoryContext( + new CachedMemoryStore(sensoryMemState), + new CachedMemoryStore(shortTermMemState)); + } + + runnerContext.switchActionContext(actionTask.action.getName(), memoryContext); actionTask.setRunnerContext(runnerContext); } @@ -883,6 +894,24 @@ private void processEligibleWatermarks() throws Exception { } } + private RunnerContextImpl createOrGetRunnerContext(Boolean isJava) { + if (isJava) { + if (runnerContext == null) { + runnerContext = + new RunnerContextImpl( + this.metricGroup, this::checkMailboxThread, this.agentPlan); + } + return runnerContext; + } else { + if (pythonRunnerContext == null) { + pythonRunnerContext = + new PythonRunnerContextImpl( + this.metricGroup, this::checkMailboxThread, this.agentPlan); + } + return pythonRunnerContext; + } + } + /** Failed to execute Action task. */ public static class ActionTaskExecutionException extends Exception { public ActionTaskExecutionException(String message, Throwable cause) { diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java index 34f7850e8..053b9d955 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java @@ -20,6 +20,7 @@ import org.apache.flink.agents.api.Event; import org.apache.flink.agents.plan.actions.Action; import org.apache.flink.agents.runtime.context.RunnerContextImpl; +import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,7 +88,8 @@ public int hashCode() { } /** Invokes the action task. */ - public abstract ActionTaskResult invoke(ClassLoader userCodeClassLoader) throws Exception; + public abstract ActionTaskResult invoke( + ClassLoader userCodeClassLoader, PythonActionExecutor executor) throws Exception; public class ActionTaskResult { private final boolean finished; diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java index 9fc641b7a..65d8ef4ab 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/JavaActionTask.java @@ -20,6 +20,7 @@ import org.apache.flink.agents.api.Event; import org.apache.flink.agents.plan.JavaFunction; import org.apache.flink.agents.plan.actions.Action; +import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor; import static org.apache.flink.util.Preconditions.checkState; @@ -36,7 +37,8 @@ public JavaActionTask(Object key, Event event, Action action) { } @Override - public ActionTaskResult invoke(ClassLoader userCodeClassLoader) throws Exception { + public ActionTaskResult invoke(ClassLoader userCodeClassLoader, PythonActionExecutor executor) + throws Exception { LOG.debug( "Try execute java action {} for event {} with key {}.", action.getName(), diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java index 26d5a79a6..690d41240 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java @@ -21,10 +21,8 @@ import org.apache.flink.agents.api.context.RunnerContext; import org.apache.flink.agents.plan.AgentPlan; import org.apache.flink.agents.runtime.context.RunnerContextImpl; -import org.apache.flink.agents.runtime.memory.CachedMemoryStore; import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl; import org.apache.flink.agents.runtime.python.event.PythonEvent; -import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor; import org.apache.flink.util.Preconditions; import javax.annotation.concurrent.NotThreadSafe; @@ -32,23 +30,11 @@ /** A specialized {@link RunnerContext} that is specifically used when executing Python actions. */ @NotThreadSafe public class PythonRunnerContextImpl extends RunnerContextImpl { - - private final PythonActionExecutor pythonActionExecutor; - public PythonRunnerContextImpl( - CachedMemoryStore sensoryMemStore, - CachedMemoryStore shortTermMemStore, FlinkAgentsMetricGroupImpl agentMetricGroup, Runnable mailboxThreadChecker, - AgentPlan agentPlan, - PythonActionExecutor pythonActionExecutor) { - super( - sensoryMemStore, - shortTermMemStore, - agentMetricGroup, - mailboxThreadChecker, - agentPlan); - this.pythonActionExecutor = pythonActionExecutor; + AgentPlan agentPlan) { + super(agentMetricGroup, mailboxThreadChecker, agentPlan); } @Override @@ -62,8 +48,4 @@ public void sendEvent(String type, byte[] event, String eventString) { // this method will be invoked by PythonActionExecutor's python interpreter. sendEvent(new PythonEvent(event, type, eventString)); } - - public PythonActionExecutor getPythonActionExecutor() { - return pythonActionExecutor; - } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java index a03c4c808..8f1e2d53e 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java @@ -21,7 +21,6 @@ import org.apache.flink.agents.plan.PythonFunction; import org.apache.flink.agents.plan.actions.Action; import org.apache.flink.agents.runtime.operator.ActionTask; -import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl; import org.apache.flink.agents.runtime.python.event.PythonEvent; import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor; @@ -43,7 +42,8 @@ public PythonActionTask(Object key, Event event, Action action) { "Python action only accept python event, but got " + event); } - public ActionTaskResult invoke(ClassLoader userCodeClassLoader) throws Exception { + public ActionTaskResult invoke(ClassLoader userCodeClassLoader, PythonActionExecutor executor) + throws Exception { LOG.debug( "Try execute python action {} for event {} with key {}.", action.getName(), @@ -51,13 +51,9 @@ public ActionTaskResult invoke(ClassLoader userCodeClassLoader) throws Exception key); runnerContext.checkNoPendingEvents(); - PythonActionExecutor pythonActionExecutor = getPythonActionExecutor(); String pythonGeneratorRef = - pythonActionExecutor.executePythonFunction( - (PythonFunction) action.getExec(), - (PythonEvent) event, - runnerContext, - key.hashCode()); + executor.executePythonFunction( + (PythonFunction) action.getExec(), (PythonEvent) event, key.hashCode()); // If a user-defined action uses an interface to submit asynchronous tasks, it will return a // Python generator object instance upon its first execution. Otherwise, it means that no // asynchronous tasks were submitted and the action has already completed. @@ -67,14 +63,9 @@ public ActionTaskResult invoke(ClassLoader userCodeClassLoader) throws Exception ActionTask tempGeneratedActionTask = new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef); tempGeneratedActionTask.setRunnerContext(runnerContext); - return tempGeneratedActionTask.invoke(userCodeClassLoader); + return tempGeneratedActionTask.invoke(userCodeClassLoader, executor); } return new ActionTaskResult( true, runnerContext.drainEvents(event.getSourceTimestamp()), null); } - - protected PythonActionExecutor getPythonActionExecutor() { - checkState(runnerContext != null && runnerContext instanceof PythonRunnerContextImpl); - return ((PythonRunnerContextImpl) runnerContext).getPythonActionExecutor(); - } } diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java index 96afa19e1..969cb8f31 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java @@ -20,6 +20,7 @@ import org.apache.flink.agents.api.Event; import org.apache.flink.agents.plan.actions.Action; import org.apache.flink.agents.runtime.operator.ActionTask; +import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor; /** An {@link ActionTask} wrapper a Python Generator to represent a code block in Python action. */ public class PythonGeneratorActionTask extends PythonActionTask { @@ -32,13 +33,13 @@ public PythonGeneratorActionTask( } @Override - public ActionTaskResult invoke(ClassLoader userCodeClassLoader) { + public ActionTaskResult invoke(ClassLoader userCodeClassLoader, PythonActionExecutor executor) { LOG.debug( "Try execute python generator action {} for event {} with key {}.", action.getName(), event, key); - boolean finished = getPythonActionExecutor().callPythonGenerator(pythonGeneratorRef); + boolean finished = executor.callPythonGenerator(pythonGeneratorRef); ActionTask generatedActionTask = finished ? null : this; return new ActionTaskResult( finished, diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java index 4c087891a..e0108fb8e 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java @@ -18,10 +18,11 @@ package org.apache.flink.agents.runtime.python.utils; import org.apache.flink.agents.plan.PythonFunction; -import org.apache.flink.agents.runtime.context.RunnerContextImpl; +import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl; import org.apache.flink.agents.runtime.python.event.PythonEvent; import org.apache.flink.agents.runtime.utils.EventUtil; import pemja.core.PythonInterpreter; +import pemja.core.object.PyObject; import java.util.concurrent.atomic.AtomicLong; @@ -39,6 +40,9 @@ public class PythonActionExecutor { private static final String CREATE_FLINK_RUNNER_CONTEXT = "flink_runner_context.create_flink_runner_context"; + private static final String FLINK_RUNNER_CONTEXT_SWITCH_ACTION_CONTEXT = + "flink_runner_context.flink_runner_context_switch_action_context"; + // ========== ASYNC THREAD POOL =========== private static final String CREATE_ASYNC_THREAD_POOL = "flink_runner_context.create_async_thread_pool"; @@ -59,17 +63,21 @@ public class PythonActionExecutor { private final PythonInterpreter interpreter; private final String agentPlanJson; + private final PythonRunnerContextImpl runnerContext; private final JavaResourceAdapter javaResourceAdapter; private final String jobIdentifier; - private Object pythonAsyncThreadPool; + private PyObject pythonAsyncThreadPool; + private PyObject pythonRunnerContext; public PythonActionExecutor( PythonInterpreter interpreter, String agentPlanJson, JavaResourceAdapter javaResourceAdapter, + PythonRunnerContextImpl runnerContext, String jobIdentifier) { this.interpreter = interpreter; this.agentPlanJson = agentPlanJson; + this.runnerContext = runnerContext; this.javaResourceAdapter = javaResourceAdapter; this.jobIdentifier = jobIdentifier; } @@ -77,7 +85,16 @@ public PythonActionExecutor( public void open() throws Exception { interpreter.exec(PYTHON_IMPORTS); - pythonAsyncThreadPool = interpreter.invoke(CREATE_ASYNC_THREAD_POOL); + pythonAsyncThreadPool = (PyObject) interpreter.invoke(CREATE_ASYNC_THREAD_POOL); + + pythonRunnerContext = + (PyObject) + interpreter.invoke( + CREATE_FLINK_RUNNER_CONTEXT, + runnerContext, + agentPlanJson, + pythonAsyncThreadPool, + javaResourceAdapter); } /** @@ -90,29 +107,21 @@ public void open() throws Exception { * @return The name of the Python generator variable. It may be null if the Python function does * not return a generator. */ - public String executePythonFunction( - PythonFunction function, - PythonEvent event, - RunnerContextImpl runnerContext, - int hashOfKey) + public String executePythonFunction(PythonFunction function, PythonEvent event, int hashOfKey) throws Exception { runnerContext.checkNoPendingEvents(); function.setInterpreter(interpreter); - Object pythonRunnerContextObject = - interpreter.invoke( - CREATE_FLINK_RUNNER_CONTEXT, - runnerContext, - agentPlanJson, - pythonAsyncThreadPool, - javaResourceAdapter, - jobIdentifier, - hashOfKey); + interpreter.invoke( + FLINK_RUNNER_CONTEXT_SWITCH_ACTION_CONTEXT, + pythonRunnerContext, + jobIdentifier, + hashOfKey); Object pythonEventObject = interpreter.invoke(CONVERT_TO_PYTHON_OBJECT, event.getEvent()); try { - Object calledResult = function.call(pythonEventObject, pythonRunnerContextObject); + Object calledResult = function.call(pythonEventObject, pythonRunnerContext); if (calledResult == null) { return null; } else {