From 1f1d67cea2575dc12b09ef899023d9f6fcdc27f7 Mon Sep 17 00:00:00 2001 From: Diego Tavares Date: Thu, 14 Sep 2023 10:45:50 -0700 Subject: [PATCH 1/6] Change OOM protection logic The current logic relies on hardcoded values which are not suitable for large hosts. The new logic takes into account the size of hosts and also tries to be more aggressive with misbehaving frames. Prevent host from entering an OOM state where oom-killer might start killing important OS processes. The kill logic will kick in one of the following conditions is met: - Host has less than OOM_MEMORY_LEFT_THRESHOLD_PERCENT memory available - A frame is taking more than OOM_FRAME_OVERBOARD_PERCENT of what it had reserved For frames that are using more than they had reserved but not above the threshold, negotiate expanding the reservations with other frames on the same host (cherry picked from commit e88a5295f23bd927614de6d5af6a09d496d3e6ac) Signed-off-by: Diego Tavares --- .../imageworks/spcue/PrometheusMetrics.java | 57 +++ .../com/imageworks/spcue/dao/HostDao.java | 9 - .../spcue/dao/postgres/HostDaoJdbc.java | 9 - .../spcue/dao/postgres/ProcDaoJdbc.java | 2 +- .../spcue/dispatcher/Dispatcher.java | 10 +- .../spcue/dispatcher/HostReportHandler.java | 245 ++++++++---- .../imageworks/spcue/service/HostManager.java | 9 +- .../spcue/service/HostManagerService.java | 5 +- .../spcue/test/dao/postgres/HostDaoTests.java | 18 - .../dispatcher/HostReportHandlerGpuTests.java | 15 +- .../dispatcher/HostReportHandlerTests.java | 372 ++++++++++++++++-- .../conf/jobspec/jobspec_multiple_frames.xml | 48 +++ cuebot/src/test/resources/opencue.properties | 2 +- 13 files changed, 628 insertions(+), 173 deletions(-) create mode 100644 cuebot/src/main/java/com/imageworks/spcue/PrometheusMetrics.java create mode 100644 cuebot/src/test/resources/conf/jobspec/jobspec_multiple_frames.xml diff --git a/cuebot/src/main/java/com/imageworks/spcue/PrometheusMetrics.java 
b/cuebot/src/main/java/com/imageworks/spcue/PrometheusMetrics.java new file mode 100644 index 000000000..14f79cea5 --- /dev/null +++ b/cuebot/src/main/java/com/imageworks/spcue/PrometheusMetrics.java @@ -0,0 +1,57 @@ +package com.imageworks.spcue; + +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.Histogram; + +public class PrometheusMetrics { + private static final Counter findJobsByShowQueryCountMetric = Counter.build() + .name("cue_find_jobs_by_show_count") + .help("Count the occurrences of the query FIND_JOBS_BY_SHOW.") + .labelNames("env", "cuebot_hosts") + .register(); + private static final Gauge bookingDurationMillisMetric = Gauge.build() + .name("cue_booking_durations_in_millis") + .help("Register duration of booking steps in milliseconds.") + .labelNames("env", "cuebot_host", "stage_desc") + .register(); + private static final Histogram bookingDurationMillisHistogramMetric = Histogram.build() + .name("cue_booking_durations_histogram_in_millis") + .help("Register a summary of duration of booking steps in milliseconds.") + .labelNames("env", "cuebot_host", "stage_desc") + .register(); + + private static final Counter frameOomKilledCounter = Counter.build() + .name("cue_frame_oom_killed_counter") + .help("Number of frames killed for being above memory on a host under OOM") + .labelNames("env", "cuebot_host", "render_node") + .register(); + + private String deployment_environment; + private String cuebot_host; + + public PrometheusMetrics() { + this.cuebot_host = System.getenv("NODE_HOSTNAME"); + if (this.cuebot_host == null) { + this.cuebot_host = "undefined"; + } + // Use the same environment set for SENTRY as the prometheus environment + this.deployment_environment = System.getenv("SENTRY_ENVIRONMENT"); + if (this.deployment_environment == null) { + this.deployment_environment = "undefined"; + } + } + + public void setBookingDurationMetric(String stage_desc, double value) { + 
bookingDurationMillisMetric.labels(this.deployment_environment, this.cuebot_host, stage_desc).set(value); + bookingDurationMillisHistogramMetric.labels(this.deployment_environment, this.cuebot_host, stage_desc).observe(value); + } + + public void incrementFindJobsByShowQueryCountMetric() { + findJobsByShowQueryCountMetric.labels(this.deployment_environment, this.cuebot_host).inc(); + } + + public void incrementFrameOomKilledCounter(String renderNode) { + frameOomKilledCounter.labels(this.deployment_environment, this.cuebot_host, renderNode).inc(); + } +} \ No newline at end of file diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java index 768bcdbd2..95036b15e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java @@ -244,15 +244,6 @@ public interface HostDao { */ void updateThreadMode(HostInterface host, ThreadMode mode); - /** - * When a host is in kill mode that means its 256MB+ into the swap and the - * the worst memory offender is killed. - * - * @param h HostInterface - * @return boolean - */ - boolean isKillMode(HostInterface h); - /** * Update the specified host's hardware information. * diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java index 5c106335c..8e626303d 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/HostDaoJdbc.java @@ -605,15 +605,6 @@ public void updateHostOs(HostInterface host, String os) { os, host.getHostId()); } - @Override - public boolean isKillMode(HostInterface h) { - return getJdbcTemplate().queryForObject( - "SELECT COUNT(1) FROM host_stat WHERE pk_host = ? " + - "AND int_swap_total - int_swap_free > ? 
AND int_mem_free < ?", - Integer.class, h.getHostId(), Dispatcher.KILL_MODE_SWAP_THRESHOLD, - Dispatcher.KILL_MODE_MEM_THRESHOLD) > 0; - } - @Override public int getStrandedCoreUnits(HostInterface h) { try { diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java index 5af292fb3..8f6322690 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/ProcDaoJdbc.java @@ -564,7 +564,7 @@ public boolean increaseReservedMemory(ProcInterface p, long value) { value, p.getProcId(), value) == 1; } catch (Exception e) { // check by trigger erify_host_resources - throw new ResourceReservationFailureException("failed to increase memory reserveration for proc " + throw new ResourceReservationFailureException("failed to increase memory reservation for proc " + p.getProcId() + " to " + value + ", proc does not have that much memory to spare."); } } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java index 6ac703a41..fa7679830 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java @@ -108,13 +108,11 @@ public interface Dispatcher { // without being penalized for it. public static final long VIRTUAL_MEM_THRESHHOLD = CueUtil.GB2; - // The amount of swap that must be used before a host can go - // into kill mode. - public static final long KILL_MODE_SWAP_THRESHOLD = CueUtil.MB128; + // Percentage of used memory to consider a risk for triggering oom-killer + public static final double OOM_MAX_SAFE_USED_MEMORY_THRESHOLD = 0.95; - // When the amount of free memory drops below this point, the - // host can go into kill mode. 
- public static final long KILL_MODE_MEM_THRESHOLD = CueUtil.MB512; + // How much can a frame exceed its reserved memory + public static final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = 0.25; // A higher number gets more deep booking but less spread on the cue. public static final int DEFAULT_MAX_FRAMES_PER_PASS = 4; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index d763cce53..e9b163b0c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -21,26 +21,19 @@ import java.sql.Timestamp; import java.util.ArrayList; -import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; +import com.imageworks.spcue.*; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.task.TaskRejectedException; import org.springframework.dao.DataAccessException; import org.springframework.dao.EmptyResultDataAccessException; -import com.imageworks.spcue.DispatchHost; -import com.imageworks.spcue.FrameInterface; -import com.imageworks.spcue.JobEntity; -import com.imageworks.spcue.LayerEntity; -import com.imageworks.spcue.LayerDetail; -import com.imageworks.spcue.LocalHostAssignment; -import com.imageworks.spcue.Source; -import com.imageworks.spcue.VirtualProc; import com.imageworks.spcue.dao.JobDao; import com.imageworks.spcue.dao.LayerDao; import com.imageworks.spcue.dispatcher.commands.DispatchBookHost; @@ -63,6 +56,8 @@ import com.imageworks.spcue.util.CueExceptionUtil; import com.imageworks.spcue.util.CueUtil; +import static com.imageworks.spcue.dispatcher.Dispatcher.*; + public class HostReportHandler { private static final Logger logger = 
LogManager.getLogger(HostReportHandler.class); @@ -199,9 +194,9 @@ public void handleHostReport(HostReport report, boolean isBoot) { killTimedOutFrames(report); /* - * Increase/decreased reserved memory. + * Prevent OOM (Out-Of-Memory) issues on the host and manage frame reserved memory */ - handleMemoryReservations(host, report); + handleMemoryUsage(host, report); /* * The checks are done in order of least CPU intensive to @@ -389,103 +384,187 @@ private void changeLockState(DispatchHost host, CoreDetail coreInfo) { } /** - * Handle memory reservations for the given host. This will re-balance memory - * reservations on the machine and kill and frames that are out of control. - * + * Prevent host from entering an OOM state where oom-killer might start killing important OS processes. + * The kill logic will kick in when one of the following conditions is met: + * - Host has less than OOM_MEMORY_LEFT_THRESHOLD_PERCENT memory available + * - A frame is taking more than OOM_FRAME_OVERBOARD_PERCENT of what it had reserved + * For frames that are using more than they had reserved but not above the threshold, negotiate expanding + * the reservations with other frames on the same host * @param host * @param report */ - private void handleMemoryReservations(final DispatchHost host, final HostReport report) { + private void handleMemoryUsage(final DispatchHost host, final HostReport report) { + RenderHost renderHost = report.getHost(); + List runningFrames = report.getFramesList(); + + boolean memoryWarning = renderHost.getTotalMem() > 0 && + ((double)renderHost.getFreeMem()/renderHost.getTotalMem() < + (1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD)); + + if (memoryWarning) { + VirtualProc killedProc = killWorstMemoryOffender(host); + long memoryAvailable = renderHost.getFreeMem(); + long minSafeMemoryAvailable = (long)(renderHost.getTotalMem() * (1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD)); + // Some extra protection for this possibly unbound loop + int unboundProtectionLimit = 10; + 
while (killedProc != null && unboundProtectionLimit > 0) { + memoryAvailable = memoryAvailable + killedProc.memoryUsed; + + // If killing this proc solved the memory issue, stop the attack + if (memoryAvailable >= minSafeMemoryAvailable) { + break; + } + killedProc = killWorstMemoryOffender(host); + unboundProtectionLimit -= 1; + } + } else { + // When no mass cleaning was required, check for frames going overboard + // if frames didn't go overboard, manage its reservations trying to increase + // them accordingly + for (final RunningFrameInfo frame: runningFrames) { + if (isFrameOverboard(frame)) { + if (!killFrame(frame, host.getName())) { + logger.warn("Frame " + frame.getJobName() + "." + frame.getFrameName() + + " is overboard but could not be killed"); + } + } else { + handleMemoryReservations(frame); + } + } + } + } - // TODO: GPU: Need to keep frames from growing into space reserved for GPU frames - // However all this is done in the database without a chance to edit the values here + private boolean killFrame(RunningFrameInfo frame, String hostName) { + try { + VirtualProc proc = hostManager.getVirtualProc(frame.getResourceId()); - /* - * Check to see if we enable kill mode to free up memory. - */ - boolean killMode = hostManager.isSwapping(host); + // Don't mess with localDispatch procs + if (proc.isLocalDispatch) { + return false; + } - for (final RunningFrameInfo f: report.getFramesList()) { + logger.info("Killing frame on " + frame.getJobName() + "." 
+ frame.getFrameName() + + ", using too much memory."); + DispatchSupport.killedOffenderProcs.incrementAndGet(); - VirtualProc proc = null; - try { - proc = hostManager.getVirtualProc(f.getResourceId()); + if (!dispatcher.isTestMode()) { + jobManagerSupport.kill(proc, new Source("This frame is using way more than it had reserved.")); + } + return true; + } catch (EmptyResultDataAccessException e) { + return false; + } + } - // TODO: handle memory management for local dispatches - // Skip local dispatches for now. - if (proc.isLocalDispatch) { - continue; - } + /** + * Kill proc with the worst user/reserved memory ratio. + * + * @param host + * @return killed proc, or null if none could be found + */ + private VirtualProc killWorstMemoryOffender(final DispatchHost host) { + VirtualProc proc; + try { + proc = hostManager.getWorstMemoryOffender(host); + } + catch (EmptyResultDataAccessException e) { + logger.error(host.name + " is under OOM and no proc is running on it."); + return null; + } + logger.info("Killing frame on " + proc.getName() + ", host is under stress."); + DispatchSupport.killedOffenderProcs.incrementAndGet(); - if (f.getRss() > host.memory) { - try{ - logger.info("Killing frame " + f.getJobName() + "/" + f.getFrameName() + ", " - + proc.getName() + " was OOM"); - try { - killQueue.execute(new DispatchRqdKillFrame(proc, "The frame required " + - CueUtil.KbToMb(f.getRss()) + " but the machine only has " + - CueUtil.KbToMb(host.memory), rqdClient)); - } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); - } - DispatchSupport.killedOomProcs.incrementAndGet(); - } catch (Exception e) { - logger.info("failed to kill frame on " + proc.getName() + - "," + e); - } - } + if (!dispatcher.isTestMode()) { + jobManagerSupport.kill(proc, new Source("The host was dangerously low on memory and swapping.")); + prometheusMetrics.incrementFrameOomKilledCounter(host.getName()); + } - if 
(dispatchSupport.increaseReservedMemory(proc, f.getRss())) { - proc.memoryReserved = f.getRss(); - logger.info("frame " + f.getFrameName() + " on job " + f.getJobName() - + " increased its reserved memory to " + - CueUtil.KbToMb(f.getRss())); - } + return proc; + } - } catch (ResourceReservationFailureException e) { + /** + * Check frame memory usage comparing the amount used with the amount it had reserved + * @param frame + * @return + */ + private boolean isFrameOverboard(final RunningFrameInfo frame) { + double rss = (double)frame.getRss(); + double maxRss = (double)frame.getMaxRss(); + final double MAX_RSS_OVERBOARD_THRESHOLD = OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD * 2; + final double RSS_AVAILABLE_FOR_MAX_RSS_TRIGGER = 0.1; + + try { + VirtualProc proc = hostManager.getVirtualProc(frame.getResourceId()); + double reserved = (double)proc.memoryReserved; + + // Last memory report is higher than the threshold + if (isOverboard(rss, reserved, OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD)) { + return true; + } + // If rss is not overboard, handle the situation where the frame might be going overboard from + // time to time but the last report wasn't during a spike. For this case, consider a combination + // of rss and maxRss. maxRss > 2 * threshold and rss > 0.9 + else { + return isOverboard(maxRss, reserved, MAX_RSS_OVERBOARD_THRESHOLD) && + isOverboard(rss, reserved, -RSS_AVAILABLE_FOR_MAX_RSS_TRIGGER); + } + } catch (EmptyResultDataAccessException e) { + logger.info("HostReportHandler(isFrameOverboard): Virtual proc for frame " + + frame.getFrameName() + " on job " + frame.getJobName() + " doesn't exist on the database"); + // Not able to mark the frame overboard as it couldn't be found on the db. 
+ // Proc accounting (verifyRunningProc) should take care of it + return false; + } + } + + private boolean isOverboard(double value, double total, double threshold) { + return value/total >= (1 + threshold); + } + + /** + * Handle memory reservations for the given frame + * + * @param frame + */ + private void handleMemoryReservations(final RunningFrameInfo frame) { + VirtualProc proc = null; + try { + proc = hostManager.getVirtualProc(frame.getResourceId()); - long memNeeded = f.getRss() - proc.memoryReserved; + if (proc.isLocalDispatch) { + return; + } - logger.info("frame " + f.getFrameName() + " on job " + f.getJobName() + if (dispatchSupport.increaseReservedMemory(proc, frame.getRss())) { + proc.memoryReserved = frame.getRss(); + logger.info("frame " + frame.getFrameName() + " on job " + frame.getJobName() + + " increased its reserved memory to " + + CueUtil.KbToMb(frame.getRss())); + } + } catch (ResourceReservationFailureException e) { + if (proc != null) { + long memNeeded = frame.getRss() - proc.memoryReserved; + logger.info("frame " + frame.getFrameName() + " on job " + frame.getJobName() + "was unable to reserve an additional " + CueUtil.KbToMb(memNeeded) + "on proc " + proc.getName() + ", " + e); - try { if (dispatchSupport.balanceReservedMemory(proc, memNeeded)) { - proc.memoryReserved = f.getRss(); + proc.memoryReserved = frame.getRss(); logger.info("was able to balance host: " + proc.getName()); - } - else { + } else { logger.info("failed to balance host: " + proc.getName()); } } catch (Exception ex) { logger.warn("failed to balance host: " + proc.getName() + ", " + e); } - } catch (EmptyResultDataAccessException e) { - logger.info("HostReportHandler: frame " + f.getFrameName() + - " on job " + f.getJobName() + - " was unable be processed" + - " because the proc could not be found"); - } - } - - if (killMode) { - VirtualProc proc; - try { - proc = hostManager.getWorstMemoryOffender(host); - } - catch (EmptyResultDataAccessException e) { - 
logger.info(host.name + " is swapping and no proc is running on it."); - return; + } else { + logger.info("frame " + frame.getFrameName() + " on job " + frame.getJobName() + + "was unable to reserve an additional memory. Proc could not be found"); } - - logger.info("Killing frame on " + - proc.getName() + ", host is distressed."); - - DispatchSupport.killedOffenderProcs.incrementAndGet(); - jobManagerSupport.kill(proc, new Source( - "The host was dangerously low on memory and swapping.")); + } catch (EmptyResultDataAccessException e) { + logger.info("HostReportHandler: Memory reservations for frame " + frame.getFrameName() + + " on job " + frame.getJobName() + " proc could not be found"); } } diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java index 8b176c77e..00b90654c 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java @@ -63,13 +63,12 @@ public interface HostManager { void setHostState(HostInterface host, HardwareState state); /** - * Return true if the host is swapping hard enough - * that killing frames will save the entire machine. + * Updates the freeMcp of a host. 
* - * @param host - * @return + * @param host HostInterface + * @param freeMcp Long */ - boolean isSwapping(HostInterface host); + void setHostFreeMcp(HostInterface host, Long freeMcp); DispatchHost createHost(HostReport report); DispatchHost createHost(RenderHost host); diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java index a7c5b0729..da17458e2 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java @@ -94,9 +94,8 @@ public void setHostState(HostInterface host, HardwareState state) { } @Override - @Transactional(propagation = Propagation.REQUIRED, readOnly=true) - public boolean isSwapping(HostInterface host) { - return hostDao.isKillMode(host); + public void setHostFreeMcp(HostInterface host, Long freeMcp) { + hostDao.updateHostFreeMcp(host, freeMcp); } public void rebootWhenIdle(HostInterface host) { diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java index df965893b..ea9fbe0c2 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dao/postgres/HostDaoTests.java @@ -330,24 +330,6 @@ public void testIsHostLocked() { assertEquals(hostDao.isHostLocked(host),true); } - @Test - @Transactional - @Rollback(true) - public void testIsKillMode() { - hostDao.insertRenderHost(buildRenderHost(TEST_HOST), - hostManager.getDefaultAllocationDetail(), - false); - - HostEntity host = hostDao.findHostDetail(TEST_HOST); - assertFalse(hostDao.isKillMode(host)); - - jdbcTemplate.update( - "UPDATE host_stat SET int_swap_free = ?, int_mem_free = ? 
WHERE pk_host = ?", - CueUtil.MB256, CueUtil.MB256, host.getHostId()); - - assertTrue(hostDao.isKillMode(host)); - } - @Test @Transactional @Rollback(true) diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java index dee9d0792..5daf14fe8 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerGpuTests.java @@ -81,13 +81,14 @@ private static RenderHost getRenderHost() { return RenderHost.newBuilder() .setName(HOSTNAME) .setBootTime(1192369572) - .setFreeMcp(76020) - .setFreeMem(53500) - .setFreeSwap(20760) + // The minimum amount of free space in the /mcp directory to book a host. + .setFreeMcp(CueUtil.GB) + .setFreeMem(CueUtil.GB8) + .setFreeSwap(CueUtil.GB2) .setLoad(0) - .setTotalMcp(195430) - .setTotalMem(1048576L * 4096) - .setTotalSwap(20960) + .setTotalMcp(CueUtil.GB4) + .setTotalMem(CueUtil.GB8) + .setTotalSwap(CueUtil.GB2) .setNimbyEnabled(false) .setNumProcs(2) .setCoresPerProc(100) @@ -114,7 +115,7 @@ public void testHandleHostReport() { hostReportHandler.handleHostReport(report, true); DispatchHost host = getHost(); assertEquals(host.lockState, LockState.OPEN); - assertEquals(host.memory, 4294443008L); + assertEquals(host.memory, CueUtil.GB8 - 524288); assertEquals(host.gpus, 64); assertEquals(host.idleGpus, 64); assertEquals(host.gpuMemory, 1048576L * 2048); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java index d27f76c32..93b943a3d 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java @@ -21,9 +21,12 @@ import 
java.io.File; import java.sql.Timestamp; +import java.util.Arrays; import java.util.List; import javax.annotation.Resource; +import com.imageworks.spcue.dispatcher.DispatchSupport; +import com.imageworks.spcue.dispatcher.HostReportQueue; import org.junit.Before; import org.junit.Test; import org.springframework.test.annotation.Rollback; @@ -31,6 +34,7 @@ import org.springframework.transaction.annotation.Transactional; import com.imageworks.spcue.AllocationEntity; +import com.imageworks.spcue.CommentDetail; import com.imageworks.spcue.DispatchHost; import com.imageworks.spcue.dispatcher.Dispatcher; import com.imageworks.spcue.dispatcher.HostReportHandler; @@ -43,6 +47,7 @@ import com.imageworks.spcue.grpc.report.RenderHost; import com.imageworks.spcue.grpc.report.RunningFrameInfo; import com.imageworks.spcue.service.AdminManager; +import com.imageworks.spcue.service.CommentManager; import com.imageworks.spcue.service.HostManager; import com.imageworks.spcue.service.JobLauncher; import com.imageworks.spcue.service.JobManager; @@ -50,7 +55,10 @@ import com.imageworks.spcue.util.CueUtil; import com.imageworks.spcue.VirtualProc; +import java.util.UUID; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; @ContextConfiguration public class HostReportHandlerTests extends TransactionalTest { @@ -73,8 +81,16 @@ public class HostReportHandlerTests extends TransactionalTest { @Resource JobManager jobManager; + @Resource + CommentManager commentManager; + private static final String HOSTNAME = "beta"; private static final String NEW_HOSTNAME = "gamma"; + private String hostname; + private String hostname2; + private static final String SUBJECT_COMMENT_FULL_MCP_DIR = "Host set to REPAIR for not having enough storage " + + "space on /mcp"; + private static final String CUEBOT_COMMENT_USER = "cuebot"; @Before public void setTestMode() { @@ -83,7 +99,11 @@ public void setTestMode() { @Before public void createHost() { - 
hostManager.createHost(getRenderHost(), + hostname = UUID.randomUUID().toString().substring(0, 8); + hostname2 = UUID.randomUUID().toString().substring(0, 8); + hostManager.createHost(getRenderHost(hostname), + adminManager.findAllocationDetail("spi","general")); + hostManager.createHost(getRenderHost(hostname2), adminManager.findAllocationDetail("spi","general")); } @@ -96,44 +116,50 @@ private static CoreDetail getCoreDetail(int total, int idle, int booked, int loc .build(); } - private DispatchHost getHost() { - return hostManager.findDispatchHost(HOSTNAME); + private DispatchHost getHost(String hostname) { + return hostManager.findDispatchHost(hostname); } - private static RenderHost getRenderHost() { + private static RenderHost.Builder getRenderHostBuilder(String hostname) { return RenderHost.newBuilder() - .setName(HOSTNAME) + .setName(hostname) .setBootTime(1192369572) - .setFreeMcp(76020) - .setFreeMem((int) CueUtil.GB8) - .setFreeSwap(20760) + // The minimum amount of free space in the /mcp directory to book a host. + .setFreeMcp(CueUtil.GB) + .setFreeMem(CueUtil.GB8) + .setFreeSwap(CueUtil.GB2) .setLoad(0) - .setTotalMcp(195430) + .setTotalMcp(CueUtil.GB4) .setTotalMem(CueUtil.GB8) .setTotalSwap(CueUtil.GB2) .setNimbyEnabled(false) - .setNumProcs(2) + .setNumProcs(16) .setCoresPerProc(100) .addTags("test") .setState(HardwareState.UP) .setFacility("spi") .putAttributes("SP_OS", "Linux") - .setFreeGpuMem((int) CueUtil.MB512) - .setTotalGpuMem((int) CueUtil.MB512) - .build(); + .setNumGpus(0) + .setFreeGpuMem(0) + .setTotalGpuMem(0); + } + + private static RenderHost getRenderHost(String hostname) { + return getRenderHostBuilder(hostname).build(); } private static RenderHost getNewRenderHost(String tags) { return RenderHost.newBuilder() .setName(NEW_HOSTNAME) .setBootTime(1192369572) - .setFreeMcp(76020) - .setFreeMem(53500) - .setFreeSwap(20760) + // The minimum amount of free space in the /mcp directory to book a host. 
+ .setFreeMcp(CueUtil.GB) + .setFreeMem(CueUtil.GB8) + .setFreeSwap(CueUtil.GB2) .setLoad(0) .setTotalMcp(195430) - .setTotalMem(8173264) - .setTotalSwap(20960) + .setTotalMem(CueUtil.GB8) + .setTotalSwap(CueUtil.GB2) .setNimbyEnabled(false) .setNumProcs(2) .setCoresPerProc(100) @@ -149,17 +175,40 @@ private static RenderHost getNewRenderHost(String tags) { @Test @Transactional @Rollback(true) - public void testHandleHostReport() { - boolean isBoot = false; + public void testHandleHostReport() throws InterruptedException { CoreDetail cores = getCoreDetail(200, 200, 0, 0); - HostReport report = HostReport.newBuilder() - .setHost(getRenderHost()) + HostReport report1 = HostReport.newBuilder() + .setHost(getRenderHost(hostname)) .setCoreInfo(cores) .build(); + HostReport report2 = HostReport.newBuilder() + .setHost(getRenderHost(hostname2)) + .setCoreInfo(cores) + .build(); + HostReport report1_2 = HostReport.newBuilder() + .setHost(getRenderHost(hostname)) + .setCoreInfo(getCoreDetail(200, 200, 100, 0)) + .build(); - hostReportHandler.handleHostReport(report, isBoot); - DispatchHost host = getHost(); - assertEquals(host.lockState, LockState.OPEN); + hostReportHandler.handleHostReport(report1, false, System.currentTimeMillis()); + DispatchHost host = getHost(hostname); + assertEquals(LockState.OPEN, host.lockState); + assertEquals(HardwareState.UP, host.hardwareState); + hostReportHandler.handleHostReport(report1_2, false, System.currentTimeMillis()); + host = getHost(hostname); + assertEquals(HardwareState.UP, host.hardwareState); + + // Test Queue thread handling + HostReportQueue queue = hostReportHandler.getReportQueue(); + // Make sure jobs flow normally without any nullpointer exception + // Expecting results from a ThreadPool based class on JUnit is tricky + // A future test will be developed in the future to better address the behavior of + // this feature + hostReportHandler.queueHostReport(report1); // HOSTNAME + hostReportHandler.queueHostReport(report2); 
// HOSTNAME2 + hostReportHandler.queueHostReport(report1); // HOSTNAME + hostReportHandler.queueHostReport(report1); // HOSTNAME + hostReportHandler.queueHostReport(report1_2); // HOSTNAME } @Test @@ -183,7 +232,7 @@ public void testHandleHostReportWithNewAllocation() { .setCoreInfo(cores) .build(); - hostReportHandler.handleHostReport(report, isBoot); + hostReportHandler.handleHostReport(report, isBoot, System.currentTimeMillis()); DispatchHost host = hostManager.findDispatchHost(NEW_HOSTNAME); assertEquals(host.getAllocationId(), detail.id); } @@ -203,7 +252,7 @@ public void testHandleHostReportWithExistentAllocation() { .setCoreInfo(cores) .build(); - hostReportHandler.handleHostReport(report, isBoot); + hostReportHandler.handleHostReport(report, isBoot, System.currentTimeMillis()); DispatchHost host = hostManager.findDispatchHost(NEW_HOSTNAME); assertEquals(host.getAllocationId(), alloc.id); } @@ -223,11 +272,143 @@ public void testHandleHostReportWithNonExistentTags() { .setCoreInfo(cores) .build(); - hostReportHandler.handleHostReport(report, isBoot); + hostReportHandler.handleHostReport(report, isBoot, System.currentTimeMillis()); DispatchHost host = hostManager.findDispatchHost(NEW_HOSTNAME); assertEquals(host.getAllocationId(), alloc.id); } + @Test + @Transactional + @Rollback(true) + public void testHandleHostReportWithFullMCPDirectories() { + // Create CoreDetail + CoreDetail cores = getCoreDetail(200, 200, 0, 0); + + /* + * Test 1: + * Precondition: + * - HardwareState=UP + * Action: + * - Receives a HostReport with freeMCP < dispatcher.min_bookable_free_mcp_kb (opencue.properties) + * Postcondition: + * - Host hardwareState changes to REPAIR + * - A comment is created with subject=SUBJECT_COMMENT_FULL_MCP_DIR and user=CUEBOT_COMMENT_USER + * */ + // Create HostReport + HostReport report1 = HostReport.newBuilder() + .setHost(getRenderHostBuilder(hostname).setFreeMcp(1024L).build()) + .setCoreInfo(cores) + .build(); + // Call handleHostReport() => Create 
the comment with subject=SUBJECT_COMMENT_FULL_MCP_DIR and change the host's + // hardwareState to REPAIR + hostReportHandler.handleHostReport(report1, false, System.currentTimeMillis()); + // Get host + DispatchHost host = getHost(hostname); + // Get list of comments by host, user, and subject + List comments = commentManager.getCommentsByHostUserAndSubject(host, CUEBOT_COMMENT_USER, + SUBJECT_COMMENT_FULL_MCP_DIR); + // Check if there is 1 comment + assertEquals(comments.size(), 1); + // Get host comment + CommentDetail comment = comments.get(0); + // Check if the comment has the user = CUEBOT_COMMENT_USER + assertEquals(comment.user, CUEBOT_COMMENT_USER); + // Check if the comment has the subject = SUBJECT_COMMENT_FULL_MCP_DIR + assertEquals(comment.subject, SUBJECT_COMMENT_FULL_MCP_DIR); + // Check host lock state + assertEquals(LockState.OPEN, host.lockState); + // Check if host hardware state is REPAIR + assertEquals(HardwareState.REPAIR, host.hardwareState); + // Test Queue thread handling + HostReportQueue queue = hostReportHandler.getReportQueue(); + // Make sure jobs flow normally without any nullpointer exception + hostReportHandler.queueHostReport(report1); // HOSTNAME + hostReportHandler.queueHostReport(report1); // HOSTNAME + + /* + * Test 2: + * Precondition: + * - HardwareState=REPAIR + * - There is a comment for the host with subject=SUBJECT_COMMENT_FULL_MCP_DIR and user=CUEBOT_COMMENT_USER + * Action: + * - Receives a HostReport with freeMCP >= dispatcher.min_bookable_free_mcp_kb (opencue.properties) + * Postcondition: + * - Host hardwareState changes to UP + * - Comment with subject=SUBJECT_COMMENT_FULL_MCP_DIR and user=CUEBOT_COMMENT_USER gets deleted + * */ + // Set the host freeMcp to the minimum size required = 1GB (1048576 KB) + HostReport report2 = HostReport.newBuilder() + .setHost(getRenderHostBuilder(hostname).setFreeMcp(CueUtil.GB).build()) + .setCoreInfo(cores) + .build(); + // Call handleHostReport() => Delete the comment with 
subject=SUBJECT_COMMENT_FULL_MCP_DIR and change the host's + // hardwareState to UP + hostReportHandler.handleHostReport(report2, false, System.currentTimeMillis()); + // Get host + host = getHost(hostname); + // Get list of comments by host, user, and subject + comments = commentManager.getCommentsByHostUserAndSubject(host, CUEBOT_COMMENT_USER, + SUBJECT_COMMENT_FULL_MCP_DIR); + // Check if there is no comment associated with the host + assertEquals(comments.size(), 0); + // Check host lock state + assertEquals(LockState.OPEN, host.lockState); + // Check if host hardware state is UP + assertEquals(HardwareState.UP, host.hardwareState); + // Test Queue thread handling + queue = hostReportHandler.getReportQueue(); + // Make sure jobs flow normally without any nullpointer exception + hostReportHandler.queueHostReport(report1); // HOSTNAME + hostReportHandler.queueHostReport(report1); // HOSTNAME + } + + @Test + @Transactional + @Rollback(true) + public void testHandleHostReportWithHardwareStateRepairNotRelatedToFullMCPdirectories() { + // Create CoreDetail + CoreDetail cores = getCoreDetail(200, 200, 0, 0); + + /* + * Test if host.hardwareState == HardwareState.REPAIR + * (Not related to freeMcp < dispatcher.min_bookable_free_mcp_kb (opencue.properties)) + * + * - There is no comment with subject=SUBJECT_COMMENT_FULL_MCP_DIR and user=CUEBOT_COMMENT_USER associated with + * the host + * The host.hardwareState continue as HardwareState.REPAIR + * */ + // Create HostReport + HostReport report = HostReport.newBuilder() + .setHost(getRenderHostBuilder(hostname).setFreeMcp(CueUtil.GB).build()) + .setCoreInfo(cores) + .build(); + // Get host + DispatchHost host = getHost(hostname); + // Host's HardwareState set to REPAIR + hostManager.setHostState(host, HardwareState.REPAIR); + host.hardwareState = HardwareState.REPAIR; + // Get list of comments by host, user, and subject + List hostComments = commentManager.getCommentsByHostUserAndSubject(host, CUEBOT_COMMENT_USER, + 
SUBJECT_COMMENT_FULL_MCP_DIR); + // Check if there is no comment + assertEquals(hostComments.size(), 0); + // There is no comment to delete + boolean commentsDeleted = commentManager.deleteCommentByHostUserAndSubject(host, + CUEBOT_COMMENT_USER, SUBJECT_COMMENT_FULL_MCP_DIR); + assertFalse(commentsDeleted); + // Call handleHostReport() + hostReportHandler.handleHostReport(report, false, System.currentTimeMillis()); + // Check host lock state + assertEquals(LockState.OPEN, host.lockState); + // Check if host hardware state is REPAIR + assertEquals(HardwareState.REPAIR, host.hardwareState); + // Test Queue thread handling + HostReportQueue queueThread = hostReportHandler.getReportQueue(); + // Make sure jobs flow normally without any nullpointer exception + hostReportHandler.queueHostReport(report); // HOSTNAME + hostReportHandler.queueHostReport(report); // HOSTNAME + } + @Test @Transactional @Rollback(true) @@ -235,7 +416,7 @@ public void testMemoryAndLlu() { jobLauncher.testMode = true; jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_simple.xml")); - DispatchHost host = getHost(); + DispatchHost host = getHost(hostname); List procs = dispatcher.dispatchHost(host); assertEquals(1, procs.size()); VirtualProc proc = procs.get(0); @@ -252,16 +433,145 @@ public void testMemoryAndLlu() { .setMaxRss(420000) .build(); HostReport report = HostReport.newBuilder() - .setHost(getRenderHost()) + .setHost(getRenderHost(hostname)) .setCoreInfo(cores) .addFrames(info) .build(); - hostReportHandler.handleHostReport(report, false); + hostReportHandler.handleHostReport(report, false, System.currentTimeMillis()); FrameDetail frame = jobManager.getFrameDetail(proc.getFrameId()); assertEquals(frame.dateLLU, new Timestamp(now / 1000 * 1000)); assertEquals(420000, frame.maxRss); } + + @Test + @Transactional + @Rollback(true) + public void testMemoryAggressionRss() { + jobLauncher.testMode = true; + jobLauncher.launch(new 
File("src/test/resources/conf/jobspec/jobspec_simple.xml")); + + DispatchHost host = getHost(hostname); + List procs = dispatcher.dispatchHost(host); + assertEquals(1, procs.size()); + VirtualProc proc = procs.get(0); + + long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved * + (1.0 + Dispatcher.OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD)); + + // Test rss overboard + RunningFrameInfo info = RunningFrameInfo.newBuilder() + .setJobId(proc.getJobId()) + .setLayerId(proc.getLayerId()) + .setFrameId(proc.getFrameId()) + .setResourceId(proc.getProcId()) + .setRss(memoryOverboard) + .setMaxRss(memoryOverboard) + .build(); + HostReport report = HostReport.newBuilder() + .setHost(getRenderHost(hostname)) + .setCoreInfo(getCoreDetail(200, 200, 0, 0)) + .addFrames(info) + .build(); + + long killCount = DispatchSupport.killedOffenderProcs.get(); + hostReportHandler.handleHostReport(report, false, System.currentTimeMillis()); + assertEquals(killCount + 1, DispatchSupport.killedOffenderProcs.get()); + } + + @Test + @Transactional + @Rollback(true) + public void testMemoryAggressionMaxRss() { + jobLauncher.testMode = true; + jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_simple.xml")); + + DispatchHost host = getHost(hostname); + List procs = dispatcher.dispatchHost(host); + assertEquals(1, procs.size()); + VirtualProc proc = procs.get(0); + + long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved * + (1.0 + (2 * Dispatcher.OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD))); + + // Test rss>90% and maxRss overboard + RunningFrameInfo info = RunningFrameInfo.newBuilder() + .setJobId(proc.getJobId()) + .setLayerId(proc.getLayerId()) + .setFrameId(proc.getFrameId()) + .setResourceId(proc.getProcId()) + .setRss((long)Math.ceil(0.95 * proc.memoryReserved)) + .setMaxRss(memoryOverboard) + .build(); + HostReport report = HostReport.newBuilder() + .setHost(getRenderHost(hostname)) + .setCoreInfo(getCoreDetail(200, 200, 0, 0)) + .addFrames(info) + 
.build(); + + long killCount = DispatchSupport.killedOffenderProcs.get(); + hostReportHandler.handleHostReport(report, false, System.currentTimeMillis()); + assertEquals(killCount + 1, DispatchSupport.killedOffenderProcs.get()); + } + + @Test + @Transactional + @Rollback(true) + public void testMemoryAggressionMemoryWarning() { + jobLauncher.testMode = true; + dispatcher.setTestMode(true); + jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_multiple_frames.xml")); + + DispatchHost host = getHost(hostname); + List procs = dispatcher.dispatchHost(host); + assertEquals(3, procs.size()); + VirtualProc proc1 = procs.get(0); + VirtualProc proc2 = procs.get(1); + VirtualProc proc3 = procs.get(2); + + // Ok + RunningFrameInfo info1 = RunningFrameInfo.newBuilder() + .setJobId(proc1.getJobId()) + .setLayerId(proc1.getLayerId()) + .setFrameId(proc1.getFrameId()) + .setResourceId(proc1.getProcId()) + .setRss(CueUtil.GB2) + .setMaxRss(CueUtil.GB2) + .build(); + + // Overboard Rss + RunningFrameInfo info2 = RunningFrameInfo.newBuilder() + .setJobId(proc2.getJobId()) + .setLayerId(proc2.getLayerId()) + .setFrameId(proc2.getFrameId()) + .setResourceId(proc2.getProcId()) + .setRss(CueUtil.GB4) + .setMaxRss(CueUtil.GB4) + .build(); + + // Overboard Rss + RunningFrameInfo info3 = RunningFrameInfo.newBuilder() + .setJobId(proc3.getJobId()) + .setLayerId(proc3.getLayerId()) + .setFrameId(proc3.getFrameId()) + .setResourceId(proc3.getProcId()) + .setRss(CueUtil.GB4) + .setMaxRss(CueUtil.GB4) + .build(); + + RenderHost hostAfterUpdate = getRenderHostBuilder(hostname).setFreeMem(0).build(); + + HostReport report = HostReport.newBuilder() + .setHost(hostAfterUpdate) + .setCoreInfo(getCoreDetail(200, 200, 0, 0)) + .addAllFrames(Arrays.asList(info1, info2, info3)) + .build(); + + // In this case, killing one job should be enough to ge the machine to a safe state + long killCount = DispatchSupport.killedOffenderProcs.get(); + hostReportHandler.handleHostReport(report, 
false, System.currentTimeMillis()); + assertEquals(killCount + 1, DispatchSupport.killedOffenderProcs.get()); + } } diff --git a/cuebot/src/test/resources/conf/jobspec/jobspec_multiple_frames.xml b/cuebot/src/test/resources/conf/jobspec/jobspec_multiple_frames.xml new file mode 100644 index 000000000..3baa0b22b --- /dev/null +++ b/cuebot/src/test/resources/conf/jobspec/jobspec_multiple_frames.xml @@ -0,0 +1,48 @@ + + + + + + + + + spi + pipe + default + testuser + 9860 + + + False + 2 + False + + + + echo hello + 1-3 + 1 + 2gb + + + shell + + + + + + diff --git a/cuebot/src/test/resources/opencue.properties b/cuebot/src/test/resources/opencue.properties index 334408470..0203ee6f7 100644 --- a/cuebot/src/test/resources/opencue.properties +++ b/cuebot/src/test/resources/opencue.properties @@ -38,7 +38,7 @@ dispatcher.job_query_max=20 dispatcher.job_lock_expire_seconds=2 dispatcher.job_lock_concurrency_level=3 dispatcher.frame_query_max=10 -dispatcher.job_frame_dispatch_max=2 +dispatcher.job_frame_dispatch_max=3 dispatcher.host_frame_dispatch_max=12 dispatcher.launch_queue.core_pool_size=1 From 8138e1226e358ce95b6643eaec3faaa34db13887 Mon Sep 17 00:00:00 2001 From: Diego Tavares Date: Thu, 14 Sep 2023 15:47:06 -0700 Subject: [PATCH 2/6] Remove wrongly commited file Signed-off-by: Diego Tavares --- .../imageworks/spcue/PrometheusMetrics.java | 57 ------------------- 1 file changed, 57 deletions(-) delete mode 100644 cuebot/src/main/java/com/imageworks/spcue/PrometheusMetrics.java diff --git a/cuebot/src/main/java/com/imageworks/spcue/PrometheusMetrics.java b/cuebot/src/main/java/com/imageworks/spcue/PrometheusMetrics.java deleted file mode 100644 index 14f79cea5..000000000 --- a/cuebot/src/main/java/com/imageworks/spcue/PrometheusMetrics.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.imageworks.spcue; - -import io.prometheus.client.Counter; -import io.prometheus.client.Gauge; -import io.prometheus.client.Histogram; - -public class PrometheusMetrics { - private static 
final Counter findJobsByShowQueryCountMetric = Counter.build() - .name("cue_find_jobs_by_show_count") - .help("Count the occurrences of the query FIND_JOBS_BY_SHOW.") - .labelNames("env", "cuebot_hosts") - .register(); - private static final Gauge bookingDurationMillisMetric = Gauge.build() - .name("cue_booking_durations_in_millis") - .help("Register duration of booking steps in milliseconds.") - .labelNames("env", "cuebot_host", "stage_desc") - .register(); - private static final Histogram bookingDurationMillisHistogramMetric = Histogram.build() - .name("cue_booking_durations_histogram_in_millis") - .help("Register a summary of duration of booking steps in milliseconds.") - .labelNames("env", "cuebot_host", "stage_desc") - .register(); - - private static final Counter frameOomKilledCounter = Counter.build() - .name("cue_frame_oom_killed_counter") - .help("Number of frames killed for being above memory on a host under OOM") - .labelNames("env", "cuebot_host", "render_node") - .register(); - - private String deployment_environment; - private String cuebot_host; - - public PrometheusMetrics() { - this.cuebot_host = System.getenv("NODE_HOSTNAME"); - if (this.cuebot_host == null) { - this.cuebot_host = "undefined"; - } - // Use the same environment set for SENTRY as the prometheus environment - this.deployment_environment = System.getenv("SENTRY_ENVIRONMENT"); - if (this.deployment_environment == null) { - this.deployment_environment = "undefined"; - } - } - - public void setBookingDurationMetric(String stage_desc, double value) { - bookingDurationMillisMetric.labels(this.deployment_environment, this.cuebot_host, stage_desc).set(value); - bookingDurationMillisHistogramMetric.labels(this.deployment_environment, this.cuebot_host, stage_desc).observe(value); - } - - public void incrementFindJobsByShowQueryCountMetric() { - findJobsByShowQueryCountMetric.labels(this.deployment_environment, this.cuebot_host).inc(); - } - - public void incrementFrameOomKilledCounter(String 
renderNode) { - frameOomKilledCounter.labels(this.deployment_environment, this.cuebot_host, renderNode).inc(); - } -} \ No newline at end of file From f1e282439657d8adcff6b2b3598864876bd5e7da Mon Sep 17 00:00:00 2001 From: Diego Tavares Date: Mon, 18 Sep 2023 16:53:08 -0700 Subject: [PATCH 3/6] Frames killed for OOM should be retried (cherry picked from commit b88f7bcb1ad43f83fb8357576c33483dc2bf4952) --- .../com/imageworks/spcue/dao/FrameDao.java | 7 ++ .../spcue/dao/postgres/FrameDaoJdbc.java | 18 ++++ .../spcue/dispatcher/DispatchSupport.java | 7 ++ .../dispatcher/DispatchSupportService.java | 6 ++ .../dispatcher/FrameCompleteHandler.java | 72 +++++++-------- .../spcue/dispatcher/HostReportHandler.java | 88 ++++++++++--------- .../commands/DispatchRqdKillFrame.java | 20 +---- .../commands/DispatchRqdKillFrameMemory.java | 69 +++++++++++++++ .../spring/applicationContext-service.xml | 1 - .../dispatcher/FrameCompleteHandlerTests.java | 6 +- .../dispatcher/HostReportHandlerTests.java | 43 ++++++++- 11 files changed, 237 insertions(+), 100 deletions(-) create mode 100644 cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java b/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java index 7c2e3c050..8b68f970e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/FrameDao.java @@ -202,6 +202,13 @@ boolean updateFrameStopped(FrameInterface frame, FrameState state, int exitStatu * @return */ boolean updateFrameCleared(FrameInterface frame); + /** + * Sets a frame exitStatus to EXIT_STATUS_MEMORY_FAILURE + * + * @param frame + * @return + */ + boolean updateFrameMemoryError(FrameInterface frame); /** * Sets a frame to an unreserved waiting state. 
diff --git a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java index 3c752ad96..21c197a3b 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dao/postgres/FrameDaoJdbc.java @@ -155,6 +155,24 @@ public boolean updateFrameCleared(FrameInterface frame) { return updateFrame(frame, Dispatcher.EXIT_STATUS_FRAME_CLEARED) > 0; } + private static final String UPDATE_FRAME_MEMORY_ERROR = + "UPDATE "+ + "frame "+ + "SET " + + "int_exit_status = ?, " + + "int_version = int_version + 1 " + + "WHERE " + + "frame.pk_frame = ? "; + @Override + public boolean updateFrameMemoryError(FrameInterface frame) { + int result = getJdbcTemplate().update( + UPDATE_FRAME_MEMORY_ERROR, + Dispatcher.EXIT_STATUS_MEMORY_FAILURE, + frame.getFrameId()); + + return result > 0; + } + private static final String UPDATE_FRAME_STARTED = "UPDATE " + "frame " + diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java index 4accd0f8d..eca3939ed 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java @@ -415,6 +415,13 @@ List findNextDispatchFrames(LayerInterface layer, VirtualProc pro */ void clearFrame(DispatchFrame frame); + /** + * Sets the frame state exitStatus to EXIT_STATUS_MEMORY_FAILURE + * + * @param frame + */ + void updateFrameMemoryError(FrameInterface frame); + /** * Update Memory usage data and LLU time for the given frame. 
* diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index 887d0c29e..1d2fead26 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -343,6 +343,12 @@ public void clearFrame(DispatchFrame frame) { frameDao.updateFrameCleared(frame); } + @Override + @Transactional(propagation = Propagation.REQUIRED) + public void updateFrameMemoryError(FrameInterface frame) { + frameDao.updateFrameMemoryError(frame); + } + @Transactional(propagation = Propagation.SUPPORTS) public RunFrame prepareRqdRunFrame(VirtualProc proc, DispatchFrame frame) { int threads = proc.coresReserved / 100; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java index 55336aaf4..6e2653ac8 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java @@ -143,49 +143,35 @@ public void handleFrameCompleteReport(final FrameCompleteReport report) { } try { - - final VirtualProc proc; - - try { - - proc = hostManager.getVirtualProc( - report.getFrame().getResourceId()); - } - catch (EmptyResultDataAccessException e) { - /* - * Do not propagate this exception to RQD. This - * usually means the cue lost connectivity to - * the host and cleared out the record of the proc. - * If this is propagated back to RQD, RQD will - * keep retrying the operation forever. 
- */ - logger.info("failed to acquire data needed to " + - "process completed frame: " + - report.getFrame().getFrameName() + " in job " + - report.getFrame().getJobName() + "," + e); - return; - } - + final VirtualProc proc = hostManager.getVirtualProc(report.getFrame().getResourceId()); final DispatchJob job = jobManager.getDispatchJob(proc.getJobId()); final LayerDetail layer = jobManager.getLayerDetail(report.getFrame().getLayerId()); + final FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId()); final DispatchFrame frame = jobManager.getDispatchFrame(report.getFrame().getFrameId()); final FrameState newFrameState = determineFrameState(job, layer, frame, report); final String key = proc.getJobId() + "_" + report.getFrame().getLayerId() + "_" + report.getFrame().getFrameId(); + if (dispatchSupport.stopFrame(frame, newFrameState, report.getExitStatus(), report.getFrame().getMaxRss())) { - dispatchQueue.execute(new KeyRunnable(key) { - @Override - public void run() { - try { - handlePostFrameCompleteOperations(proc, report, job, frame, - newFrameState); - } catch (Exception e) { - logger.warn("Exception during handlePostFrameCompleteOperations " + - "in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e)); + if (dispatcher.isTestMode()) { + // Database modifications on a threadpool cannot be captured by the test thread + handlePostFrameCompleteOperations(proc, report, job, frame, + newFrameState, frameDetail); + } else { + dispatchQueue.execute(new KeyRunnable(key) { + @Override + public void run() { + try { + handlePostFrameCompleteOperations(proc, report, job, frame, + newFrameState, frameDetail); + } catch (Exception e) { + logger.warn("Exception during handlePostFrameCompleteOperations " + + "in handleFrameCompleteReport" + CueExceptionUtil.getStackTrace(e)); + } } - } - }); + }); + } } else { /* @@ -222,6 +208,19 @@ public void run() { } } } + catch (EmptyResultDataAccessException e) { + /* + * Do not propagate this 
exception to RQD. This + * usually means the cue lost connectivity to + * the host and cleared out the record of the proc. + * If this is propagated back to RQD, RQD will + * keep retrying the operation forever. + */ + logger.info("failed to acquire data needed to " + + "process completed frame: " + + report.getFrame().getFrameName() + " in job " + + report.getFrame().getJobName() + "," + e); + } catch (Exception e) { /* @@ -259,7 +258,7 @@ public void run() { */ public void handlePostFrameCompleteOperations(VirtualProc proc, FrameCompleteReport report, DispatchJob job, DispatchFrame frame, - FrameState newFrameState) { + FrameState newFrameState, FrameDetail frameDetail) { try { /* @@ -313,7 +312,8 @@ public void handlePostFrameCompleteOperations(VirtualProc proc, * specified in the show's service override, service or 2GB. */ if (report.getExitStatus() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE - || report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) { + || report.getExitSignal() == Dispatcher.EXIT_STATUS_MEMORY_FAILURE + || frameDetail.exitStatus == Dispatcher.EXIT_STATUS_MEMORY_FAILURE) { long increase = CueUtil.GB2; // since there can be multiple services, just going for the diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index e9b163b0c..2a3218e1d 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -30,6 +30,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; import org.springframework.core.task.TaskRejectedException; import org.springframework.dao.DataAccessException; import org.springframework.dao.EmptyResultDataAccessException; @@ -50,9 +51,9 @@ import 
com.imageworks.spcue.rqd.RqdClient; import com.imageworks.spcue.rqd.RqdClientException; import com.imageworks.spcue.service.BookingManager; +import com.imageworks.spcue.service.CommentManager; import com.imageworks.spcue.service.HostManager; import com.imageworks.spcue.service.JobManager; -import com.imageworks.spcue.service.JobManagerSupport; import com.imageworks.spcue.util.CueExceptionUtil; import com.imageworks.spcue.util.CueUtil; @@ -72,7 +73,6 @@ public class HostReportHandler { private Dispatcher localDispatcher; private RqdClient rqdClient; private JobManager jobManager; - private JobManagerSupport jobManagerSupport; private JobDao jobDao; private LayerDao layerDao; @@ -163,9 +163,8 @@ public void handleHostReport(HostReport report, boolean isBoot) { } dispatchSupport.determineIdleCores(host, report.getHost().getLoad()); - } catch (DataAccessException dae) { - logger.warn("Unable to find host " + rhost.getName() + "," + logger.info("Unable to find host " + rhost.getName() + "," + dae + " , creating host."); // TODO: Skip adding it if the host name is over 30 characters @@ -281,8 +280,8 @@ else if (!dispatchSupport.isCueBookable(host)) { } } finally { - if (reportQueue.getQueue().size() > 0 || - System.currentTimeMillis() - startTime > 100) { + long duration = System.currentTimeMillis() - startTime; + if (duration > 100) { /* * Write a log if the host report takes a long time to process. */ @@ -423,7 +422,7 @@ private void handleMemoryUsage(final DispatchHost host, final HostReport report) // them accordingly for (final RunningFrameInfo frame: runningFrames) { if (isFrameOverboard(frame)) { - if (!killFrame(frame, host.getName())) { + if (!killFrameOverusingMemory(frame, host.getName())) { logger.warn("Frame " + frame.getJobName() + "." 
+ frame.getFrameName() + " is overboard but could not be killed"); } @@ -434,7 +433,7 @@ private void handleMemoryUsage(final DispatchHost host, final HostReport report) } } - private boolean killFrame(RunningFrameInfo frame, String hostName) { + private boolean killFrameOverusingMemory(RunningFrameInfo frame, String hostname) { try { VirtualProc proc = hostManager.getVirtualProc(frame.getResourceId()); @@ -445,13 +444,30 @@ private boolean killFrame(RunningFrameInfo frame, String hostName) { logger.info("Killing frame on " + frame.getJobName() + "." + frame.getFrameName() + ", using too much memory."); - DispatchSupport.killedOffenderProcs.incrementAndGet(); + return killProcForMemory(proc, hostname, "This frame is using way more than it had reserved."); + } catch (EmptyResultDataAccessException e) { + return false; + } + } - if (!dispatcher.isTestMode()) { - jobManagerSupport.kill(proc, new Source("This frame is using way more than it had reserved.")); + private boolean killProcForMemory(VirtualProc proc, String hostname, String reason) { + try { + FrameInterface frame = jobManager.getFrame(proc.frameId); + + if (dispatcher.isTestMode()) { + // For testing, don't run on a different threadpool, as different threads don't share + // the same database state + (new DispatchRqdKillFrameMemory(proc.hostName, frame, reason, rqdClient, + dispatchSupport, dispatcher.isTestMode())).run(); + } else { + killQueue.execute(new DispatchRqdKillFrameMemory(proc.hostName, frame, reason, rqdClient, + dispatchSupport, dispatcher.isTestMode())); + prometheusMetrics.incrementFrameOomKilledCounter(hostname); } + DispatchSupport.killedOffenderProcs.incrementAndGet(); return true; - } catch (EmptyResultDataAccessException e) { + } catch (TaskRejectedException e) { + logger.warn("Unable to queue RQD kill, task rejected, " + e); return false; } } @@ -460,27 +476,23 @@ private boolean killFrame(RunningFrameInfo frame, String hostName) { * Kill proc with the worst user/reserved memory ratio. 
* * @param host - * @return killed proc, or null if none could be found + * @return killed proc, or null if none could be found or failed to be killed */ private VirtualProc killWorstMemoryOffender(final DispatchHost host) { - VirtualProc proc; try { - proc = hostManager.getWorstMemoryOffender(host); + VirtualProc proc = hostManager.getWorstMemoryOffender(host); + logger.info("Killing frame on " + proc.getName() + ", host is under stress."); + + if (!killProcForMemory(proc, host.getName(), "The host was dangerously low on memory and swapping.")) { + // Returning null will prevent the caller from overflowing the kill queue with more messages + proc = null; + } + return proc; } catch (EmptyResultDataAccessException e) { logger.error(host.name + " is under OOM and no proc is running on it."); return null; } - - logger.info("Killing frame on " + proc.getName() + ", host is under stress."); - DispatchSupport.killedOffenderProcs.incrementAndGet(); - - if (!dispatcher.isTestMode()) { - jobManagerSupport.kill(proc, new Source("The host was dangerously low on memory and swapping.")); - prometheusMetrics.incrementFrameOomKilledCounter(host.getName()); - } - - return proc; } /** @@ -556,7 +568,7 @@ private void handleMemoryReservations(final RunningFrameInfo frame) { logger.info("failed to balance host: " + proc.getName()); } } catch (Exception ex) { - logger.warn("failed to balance host: " + proc.getName() + ", " + e); + logger.info("failed to balance host: " + proc.getName() + ", " + e); } } else { logger.info("frame " + frame.getFrameName() + " on job " + frame.getJobName() @@ -589,7 +601,7 @@ private void killTimedOutFrames(HostReport report) { "This frame has reached it timeout.", rqdClient)); } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); + logger.info("Unable to queue RQD kill, task rejected, " + e); } } @@ -611,7 +623,7 @@ private void killTimedOutFrames(HostReport report) { "This frame has reached it LLU timeout.", 
rqdClient)); } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); + logger.info("Unable to queue RQD kill, task rejected, " + e); } } } @@ -750,10 +762,11 @@ public void verifyRunningFrameInfo(HostReport report) { String msg; VirtualProc proc = null; + boolean procExists = true; try { proc = hostManager.getVirtualProc(runningFrame.getResourceId()); - msg = "Virutal proc " + proc.getProcId() + + msg = "Virtual proc " + proc.getProcId() + "is assigned to " + proc.getFrameId() + " not " + runningFrame.getFrameId(); } @@ -765,6 +778,7 @@ public void verifyRunningFrameInfo(HostReport report) { * do however kill the proc. */ msg = "Virtual proc did not exist."; + procExists = false; } logger.info("warning, the proc " + @@ -802,17 +816,17 @@ public void verifyRunningFrameInfo(HostReport report) { if (rqd_kill) { try { - killQueue.execute(new DispatchRqdKillFrame(report.getHost().getName(), + killQueue.execute(new DispatchRqdKillFrame(report.getHost().getName(), runningFrame.getFrameId(), "OpenCue could not verify this frame.", rqdClient)); } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); + logger.info("Unable to queue RQD kill, task rejected, " + e); } } } catch (RqdClientException rqde) { - logger.warn("failed to kill " + + logger.info("failed to kill " + runningFrame.getJobName() + "/" + runningFrame.getFrameName() + " when trying to clear a failed " + @@ -820,7 +834,7 @@ public void verifyRunningFrameInfo(HostReport report) { } catch (Exception e) { CueExceptionUtil.logStackTrace("failed", e); - logger.warn("failed to verify " + + logger.info("failed to verify " + runningFrame.getJobName() + "/" + runningFrame.getFrameName() + " was running but the frame was " + @@ -889,14 +903,6 @@ public void setJobManager(JobManager jobManager) { this.jobManager = jobManager; } - public JobManagerSupport getJobManagerSupport() { - return jobManagerSupport; - } - - public void 
setJobManagerSupport(JobManagerSupport jobManagerSupport) { - this.jobManagerSupport = jobManagerSupport; - } - public JobDao getJobDao() { return jobDao; } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java index 61258a824..fe9bde60e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrame.java @@ -31,9 +31,7 @@ public class DispatchRqdKillFrame extends KeyRunnable { private static final Logger logger = LogManager.getLogger(DispatchRqdKillFrame.class); - private VirtualProc proc = null; private String message; - private String hostname; private String frameId; @@ -47,28 +45,14 @@ public DispatchRqdKillFrame(String hostname, String frameId, String message, Rqd this.rqdClient = rqdClient; } - public DispatchRqdKillFrame(VirtualProc proc, String message, RqdClient rqdClient) { - super("disp_rqd_kill_frame_" + proc.getProcId() + "_" + rqdClient.toString()); - this.proc = proc; - this.hostname = proc.hostName; - this.message = message; - this.rqdClient = rqdClient; - } - @Override public void run() { long startTime = System.currentTimeMillis(); try { - if (proc != null) { - rqdClient.killFrame(proc, message); - } - else { - rqdClient.killFrame(hostname, frameId, message); - } + rqdClient.killFrame(hostname, frameId, message); } catch (RqdClientException e) { logger.info("Failed to contact host " + hostname + ", " + e); - } - finally { + } finally { long elapsedTime = System.currentTimeMillis() - startTime; logger.info("RQD communication with " + hostname + " took " + elapsedTime + "ms"); diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java new file mode 
100644 index 000000000..83d5f188c --- /dev/null +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java @@ -0,0 +1,69 @@ + +/* + * Copyright Contributors to the OpenCue Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package com.imageworks.spcue.dispatcher.commands; + +import com.imageworks.spcue.FrameInterface; +import com.imageworks.spcue.VirtualProc; +import com.imageworks.spcue.dispatcher.DispatchSupport; +import com.imageworks.spcue.rqd.RqdClient; +import com.imageworks.spcue.rqd.RqdClientException; +import org.apache.log4j.Logger; + +public class DispatchRqdKillFrameMemory extends KeyRunnable { + + private static final Logger logger = Logger.getLogger(DispatchRqdKillFrameMemory.class); + + private String message; + private String hostname; + private DispatchSupport dispatchSupport; + private final RqdClient rqdClient; + private final boolean isTestMode; + + private FrameInterface frame; + + public DispatchRqdKillFrameMemory(String hostname, FrameInterface frame, String message, RqdClient rqdClient, + DispatchSupport dispatchSupport, boolean isTestMode) { + super("disp_rqd_kill_frame_" + frame.getFrameId() + "_" + rqdClient.toString()); + this.frame = frame; + this.hostname = hostname; + this.message = message; + this.rqdClient = rqdClient; + this.dispatchSupport = dispatchSupport; + this.isTestMode = isTestMode; + } + + @Override + public void run() { + long startTime = 
System.currentTimeMillis(); + try { + if (!isTestMode) { + rqdClient.killFrame(hostname, frame.getFrameId(), message); + } + dispatchSupport.updateFrameMemoryError(frame); + } catch (RqdClientException e) { + logger.info("Failed to contact host " + hostname + ", " + e); + } finally { + long elapsedTime = System.currentTimeMillis() - startTime; + logger.info("RQD communication with " + hostname + + " took " + elapsedTime + "ms"); + } + } +} + diff --git a/cuebot/src/main/resources/conf/spring/applicationContext-service.xml b/cuebot/src/main/resources/conf/spring/applicationContext-service.xml index 35bd5cbb8..5aedc91b3 100644 --- a/cuebot/src/main/resources/conf/spring/applicationContext-service.xml +++ b/cuebot/src/main/resources/conf/spring/applicationContext-service.xml @@ -384,7 +384,6 @@ - diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java index f022fc687..c83eee88e 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/FrameCompleteHandlerTests.java @@ -308,10 +308,11 @@ private void executeDepend( DispatchJob dispatchJob = jobManager.getDispatchJob(proc.getJobId()); DispatchFrame dispatchFrame = jobManager.getDispatchFrame(report.getFrame().getFrameId()); + FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId()); dispatchSupport.stopFrame(dispatchFrame, frameState, report.getExitStatus(), report.getFrame().getMaxRss()); frameCompleteHandler.handlePostFrameCompleteOperations(proc, - report, dispatchJob, dispatchFrame, frameState); + report, dispatchJob, dispatchFrame, frameState, frameDetail); assertTrue(jobManager.isLayerComplete(layerFirst)); assertFalse(jobManager.isLayerComplete(layerSecond)); @@ -399,10 +400,11 @@ private void executeMinMemIncrease(int expected, boolean override) 
{ DispatchJob dispatchJob = jobManager.getDispatchJob(proc.getJobId()); DispatchFrame dispatchFrame = jobManager.getDispatchFrame(report.getFrame().getFrameId()); + FrameDetail frameDetail = jobManager.getFrameDetail(report.getFrame().getFrameId()); dispatchSupport.stopFrame(dispatchFrame, FrameState.DEAD, report.getExitStatus(), report.getFrame().getMaxRss()); frameCompleteHandler.handlePostFrameCompleteOperations(proc, - report, dispatchJob, dispatchFrame, FrameState.WAITING); + report, dispatchJob, dispatchFrame, FrameState.WAITING, frameDetail); assertFalse(jobManager.isLayerComplete(layer)); diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java index 93b943a3d..4ab7f1646 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java @@ -27,6 +27,8 @@ import com.imageworks.spcue.dispatcher.DispatchSupport; import com.imageworks.spcue.dispatcher.HostReportQueue; +import com.imageworks.spcue.dispatcher.FrameCompleteHandler; +import com.imageworks.spcue.grpc.job.FrameState; import org.junit.Before; import org.junit.Test; import org.springframework.test.annotation.Rollback; @@ -46,6 +48,7 @@ import com.imageworks.spcue.grpc.report.HostReport; import com.imageworks.spcue.grpc.report.RenderHost; import com.imageworks.spcue.grpc.report.RunningFrameInfo; +import com.imageworks.spcue.grpc.report.FrameCompleteReport; import com.imageworks.spcue.service.AdminManager; import com.imageworks.spcue.service.CommentManager; import com.imageworks.spcue.service.HostManager; @@ -54,6 +57,7 @@ import com.imageworks.spcue.test.TransactionalTest; import com.imageworks.spcue.util.CueUtil; import com.imageworks.spcue.VirtualProc; +import com.imageworks.spcue.LayerDetail; import java.util.UUID; @@ -72,6 +76,9 @@ public class 
HostReportHandlerTests extends TransactionalTest { @Resource HostReportHandler hostReportHandler; + @Resource + FrameCompleteHandler frameCompleteHandler; + @Resource Dispatcher dispatcher; @@ -551,13 +558,14 @@ public void testMemoryAggressionMemoryWarning() { .build(); // Overboard Rss + long memoryUsedProc3 = CueUtil.GB8; RunningFrameInfo info3 = RunningFrameInfo.newBuilder() .setJobId(proc3.getJobId()) .setLayerId(proc3.getLayerId()) .setFrameId(proc3.getFrameId()) .setResourceId(proc3.getProcId()) - .setRss(CueUtil.GB4) - .setMaxRss(CueUtil.GB4) + .setRss(memoryUsedProc3) + .setMaxRss(memoryUsedProc3) .build(); RenderHost hostAfterUpdate = getRenderHostBuilder(hostname).setFreeMem(0).build(); @@ -568,10 +576,41 @@ public void testMemoryAggressionMemoryWarning() { .addAllFrames(Arrays.asList(info1, info2, info3)) .build(); + // Get layer state before report gets sent + LayerDetail layerBeforeIncrease = jobManager.getLayerDetail(proc3.getLayerId()); + // In this case, killing one job should be enough to ge the machine to a safe state long killCount = DispatchSupport.killedOffenderProcs.get(); hostReportHandler.handleHostReport(report, false, System.currentTimeMillis()); assertEquals(killCount + 1, DispatchSupport.killedOffenderProcs.get()); + + // Confirm the frame will be set to retry after it's completion has been processed + + RunningFrameInfo runningFrame = RunningFrameInfo.newBuilder() + .setFrameId(proc3.getFrameId()) + .setFrameName("frame_name") + .setLayerId(proc3.getLayerId()) + .setRss(memoryUsedProc3) + .setMaxRss(memoryUsedProc3) + .setResourceId(proc3.id) + .build(); + FrameCompleteReport completeReport = FrameCompleteReport.newBuilder() + .setHost(hostAfterUpdate) + .setFrame(runningFrame) + .setExitSignal(9) + .setRunTime(1) + .setExitStatus(1) + .build(); + + frameCompleteHandler.handleFrameCompleteReport(completeReport); + FrameDetail killedFrame = jobManager.getFrameDetail(proc3.getFrameId()); + LayerDetail layer = 
jobManager.getLayerDetail(proc3.getLayerId()); + assertEquals(FrameState.WAITING, killedFrame.state); + // Memory increases are processed in two different places one will set the new value to proc.reserved + 2GB + // and the other will set to the maximum reported proc.maxRss the end value will be whoever is higher. + // In this case, proc.maxRss + assertEquals(Math.max(memoryUsedProc3, layerBeforeIncrease.getMinimumMemory() + CueUtil.GB2), + layer.getMinimumMemory()); } } From 6d1e5d97f2c08f7d185484adc1631796b9f3bf3f Mon Sep 17 00:00:00 2001 From: Diego Tavares Date: Tue, 19 Sep 2023 11:48:36 -0700 Subject: [PATCH 4/6] OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD can be deactivated with -1 (cherry picked from commit 647e75e2254c7a7ff68c544e438080f412bf04c1) --- .../java/com/imageworks/spcue/dispatcher/Dispatcher.java | 7 ++++--- .../com/imageworks/spcue/dispatcher/HostReportHandler.java | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java index fa7679830..2e83d1b6b 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java @@ -1,4 +1,3 @@ - /* * Copyright Contributors to the OpenCue Project * @@ -111,8 +110,10 @@ public interface Dispatcher { // Percentage of used memory to consider a risk for triggering oom-killer public static final double OOM_MAX_SAFE_USED_MEMORY_THRESHOLD = 0.95; - // How much can a frame exceed its reserved memory - public static final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = 0.25; + // How much can a frame exceed its reserved memory. + // - 0.5 means 50% above reserve + // - -1.0 makes the feature inactive + public static final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = 0.6; // A higher number gets more deep booking but less spread on the cue. 
public static final int DEFAULT_MAX_FRAMES_PER_PASS = 4; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index 2a3218e1d..42c28d45a 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -420,8 +420,8 @@ private void handleMemoryUsage(final DispatchHost host, final HostReport report) // When no mass cleaning was required, check for frames going overboard // if frames didn't go overboard, manage its reservations trying to increase // them accordingly - for (final RunningFrameInfo frame: runningFrames) { - if (isFrameOverboard(frame)) { + for (final RunningFrameInfo frame : runningFrames) { + if (OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD > 0 && isFrameOverboard(frame)) { if (!killFrameOverusingMemory(frame, host.getName())) { logger.warn("Frame " + frame.getJobName() + "." 
+ frame.getFrameName() + " is overboard but could not be killed"); From 55283c53780338462d63229d2d8a46314772776b Mon Sep 17 00:00:00 2001 From: Diego Tavares Date: Wed, 4 Oct 2023 11:18:45 -0700 Subject: [PATCH 5/6] Limit the number of kill retries (cherry picked from commit aea4864ef66aca494fb455a7c103e4a832b63d41) --- .../com/imageworks/spcue/FrameInterface.java | 1 - .../spcue/dispatcher/DispatchSupport.java | 2 +- .../dispatcher/DispatchSupportService.java | 10 +- .../spcue/dispatcher/Dispatcher.java | 9 +- .../spcue/dispatcher/HostReportHandler.java | 311 ++++++++++-------- .../commands/DispatchRqdKillFrameMemory.java | 8 +- cuebot/src/main/resources/opencue.properties | 15 + .../dispatcher/HostReportHandlerTests.java | 10 +- cuebot/src/test/resources/opencue.properties | 4 + rqd/rqd/rqdservicers.py | 2 + rqd/rqd/rqnetwork.py | 1 + 11 files changed, 209 insertions(+), 164 deletions(-) diff --git a/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java b/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java index eff22768f..945685444 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java +++ b/cuebot/src/main/java/com/imageworks/spcue/FrameInterface.java @@ -1,4 +1,3 @@ - /* * Copyright Contributors to the OpenCue Project * diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java index eca3939ed..6d142e89d 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupport.java @@ -420,7 +420,7 @@ List findNextDispatchFrames(LayerInterface layer, VirtualProc pro * * @param frame */ - void updateFrameMemoryError(FrameInterface frame); + boolean updateFrameMemoryError(FrameInterface frame); /** * Update Memory usage data and LLU time for the given frame. 
diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index 1d2fead26..018296265 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -184,7 +184,11 @@ public boolean increaseReservedMemory(ProcInterface p, long value) { @Override public boolean clearVirtualProcAssignement(ProcInterface proc) { - return procDao.clearVirtualProcAssignment(proc); + try { + return procDao.clearVirtualProcAssignment(proc); + } catch (DataAccessException e) { + return false; + } } @Transactional(propagation = Propagation.REQUIRED) @@ -345,8 +349,8 @@ public void clearFrame(DispatchFrame frame) { @Override @Transactional(propagation = Propagation.REQUIRED) - public void updateFrameMemoryError(FrameInterface frame) { - frameDao.updateFrameMemoryError(frame); + public boolean updateFrameMemoryError(FrameInterface frame) { + return frameDao.updateFrameMemoryError(frame); } @Transactional(propagation = Propagation.SUPPORTS) diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java index 2e83d1b6b..072b04113 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/Dispatcher.java @@ -107,13 +107,8 @@ public interface Dispatcher { // without being penalized for it. public static final long VIRTUAL_MEM_THRESHHOLD = CueUtil.GB2; - // Percentage of used memory to consider a risk for triggering oom-killer - public static final double OOM_MAX_SAFE_USED_MEMORY_THRESHOLD = 0.95; - - // How much can a frame exceed its reserved memory. 
- // - 0.5 means 50% above reserve - // - -1.0 makes the feature inactive - public static final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = 0.6; + // How long to keep track of a frame kill request + public static final int FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES = 3; // A higher number gets more deep booking but less spread on the cue. public static final int DEFAULT_MAX_FRAMES_PER_PASS = 4; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index 42c28d45a..57e8f50c2 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -24,8 +24,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.imageworks.spcue.*; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -76,6 +80,10 @@ public class HostReportHandler { private JobDao jobDao; private LayerDao layerDao; + Cache killRequestCounterCache = CacheBuilder.newBuilder() + .expireAfterWrite(FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES, TimeUnit.MINUTES) + .build(); + /** * Boolean to toggle if this class is accepting data or not. 
*/ @@ -393,6 +401,10 @@ private void changeLockState(DispatchHost host, CoreDetail coreInfo) { * @param report */ private void handleMemoryUsage(final DispatchHost host, final HostReport report) { + final double OOM_MAX_SAFE_USED_MEMORY_THRESHOLD = + env.getRequiredProperty("dispatcher.oom_max_safe_used_memory_threshold", Double.class); + final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = + env.getRequiredProperty("dispatcher.oom_frame_overboard_allowed_threshold", Double.class); RenderHost renderHost = report.getHost(); List runningFrames = report.getFramesList(); @@ -401,21 +413,17 @@ private void handleMemoryUsage(final DispatchHost host, final HostReport report) (1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD)); if (memoryWarning) { - VirtualProc killedProc = killWorstMemoryOffender(host); long memoryAvailable = renderHost.getFreeMem(); long minSafeMemoryAvailable = (long)(renderHost.getTotalMem() * (1.0 - OOM_MAX_SAFE_USED_MEMORY_THRESHOLD)); - // Some extra protection for this possibly unbound loop - int unboundProtectionLimit = 10; - while (killedProc != null && unboundProtectionLimit > 0) { - memoryAvailable = memoryAvailable + killedProc.memoryUsed; - - // If killing this proc solved the memory issue, stop the attack - if (memoryAvailable >= minSafeMemoryAvailable) { - break; + // Only allow killing up to 10 frames at a time + int killAttemptsRemaining = 10; + do { + VirtualProc killedProc = killWorstMemoryOffender(host); + killAttemptsRemaining -= 1; + if (killedProc != null) { + memoryAvailable = memoryAvailable + killedProc.memoryUsed; } - killedProc = killWorstMemoryOffender(host); - unboundProtectionLimit -= 1; - } + } while (killAttemptsRemaining > 0 && memoryAvailable < minSafeMemoryAvailable); } else { // When no mass cleaning was required, check for frames going overboard // if frames didn't go overboard, manage its reservations trying to increase @@ -433,6 +441,23 @@ private void handleMemoryUsage(final DispatchHost host, final HostReport report) } } 
+ public enum KillCause { + FrameOverboard("This frame is using way more than it had reserved."), + HostUnderOom("Frame killed by host under Oom pressure"), + FrameTimedOut("Frame timed out"), + FrameLluTimedOut("Frame LLU timed out"), + FrameVerificationFailure("Frame failed to be verified on the database"); + private final String message; + + private KillCause(String message) { + this.message = message; + } + @Override + public String toString() { + return message; + } + } + private boolean killFrameOverusingMemory(RunningFrameInfo frame, String hostname) { try { VirtualProc proc = hostManager.getVirtualProc(frame.getResourceId()); @@ -444,32 +469,87 @@ private boolean killFrameOverusingMemory(RunningFrameInfo frame, String hostname logger.info("Killing frame on " + frame.getJobName() + "." + frame.getFrameName() + ", using too much memory."); - return killProcForMemory(proc, hostname, "This frame is using way more than it had reserved."); + return killProcForMemory(proc, hostname, KillCause.FrameOverboard); } catch (EmptyResultDataAccessException e) { return false; } } - private boolean killProcForMemory(VirtualProc proc, String hostname, String reason) { + private boolean getKillClearance(String hostname, String frameId) { + String cacheKey = hostname + "-" + frameId; + final int FRAME_KILL_RETRY_LIMIT = + env.getRequiredProperty("dispatcher.frame_kill_retry_limit", Integer.class); + + // Cache frame+host receiving a killRequest and count how many times the request is being retried + // meaning rqd is probably failing at attempting to kill the related proc + long cachedCount; try { - FrameInterface frame = jobManager.getFrame(proc.frameId); + cachedCount = 1 + killRequestCounterCache.get(cacheKey, () -> 0L); + } catch (ExecutionException e) { + return false; + } + killRequestCounterCache.put(cacheKey, cachedCount); + if (cachedCount > FRAME_KILL_RETRY_LIMIT) { + // If the kill retry limit has been reached, notify prometheus of the issue and give up + if 
(!dispatcher.isTestMode()) { + FrameInterface frame = jobManager.getFrame(frameId); + JobInterface job = jobManager.getJob(frame.getJobId()); + prometheusMetrics.incrementFrameKillFailureCounter( + hostname, + job.getName(), + frame.getName(), + frameId); + } + return false; + } + return true; + } - if (dispatcher.isTestMode()) { - // For testing, don't run on a different threadpool, as different threads don't share - // the same database state - (new DispatchRqdKillFrameMemory(proc.hostName, frame, reason, rqdClient, - dispatchSupport, dispatcher.isTestMode())).run(); - } else { - killQueue.execute(new DispatchRqdKillFrameMemory(proc.hostName, frame, reason, rqdClient, + private boolean killProcForMemory(VirtualProc proc, String hostname, KillCause killCause) { + if (!getKillClearance(hostname, proc.frameId)) { + return false; + } + + FrameInterface frame = jobManager.getFrame(proc.frameId); + if (dispatcher.isTestMode()) { + // Different threads don't share the same database state on the test environment + (new DispatchRqdKillFrameMemory(hostname, frame, killCause.toString(), rqdClient, + dispatchSupport, dispatcher.isTestMode())).run(); + } else { + try { + killQueue.execute(new DispatchRqdKillFrameMemory(hostname, frame, killCause.toString(), rqdClient, dispatchSupport, dispatcher.isTestMode())); - prometheusMetrics.incrementFrameOomKilledCounter(hostname); + prometheusMetrics.incrementFrameKilledCounter(hostname, killCause); + } catch (TaskRejectedException e) { + logger.warn("Unable to add a DispatchRqdKillFrame request, task rejected, " + e); + return false; } - DispatchSupport.killedOffenderProcs.incrementAndGet(); - return true; - } catch (TaskRejectedException e) { - logger.warn("Unable to queue RQD kill, task rejected, " + e); + } + DispatchSupport.killedOffenderProcs.incrementAndGet(); + return true; + } + + private boolean killFrame(String frameId, String hostname, KillCause killCause) { + if (!getKillClearance(hostname, frameId)) { return false; } + + 
if (dispatcher.isTestMode()) { + // Different threads don't share the same database state on the test environment + (new DispatchRqdKillFrame(hostname, frameId, killCause.toString(), rqdClient)).run(); + } else { + try { + killQueue.execute(new DispatchRqdKillFrame(hostname, + frameId, + killCause.toString(), + rqdClient)); + prometheusMetrics.incrementFrameKilledCounter(hostname, killCause); + } catch (TaskRejectedException e) { + logger.warn("Unable to add a DispatchRqdKillFrame request, task rejected, " + e); + } + } + DispatchSupport.killedOffenderProcs.incrementAndGet(); + return true; } /** @@ -483,8 +563,7 @@ private VirtualProc killWorstMemoryOffender(final DispatchHost host) { VirtualProc proc = hostManager.getWorstMemoryOffender(host); logger.info("Killing frame on " + proc.getName() + ", host is under stress."); - if (!killProcForMemory(proc, host.getName(), "The host was dangerously low on memory and swapping.")) { - // Returning null will prevent the caller from overflowing the kill queue with more messages + if (!killProcForMemory(proc, host.getName(), KillCause.HostUnderOom)) { proc = null; } return proc; @@ -503,6 +582,8 @@ private VirtualProc killWorstMemoryOffender(final DispatchHost host) { private boolean isFrameOverboard(final RunningFrameInfo frame) { double rss = (double)frame.getRss(); double maxRss = (double)frame.getMaxRss(); + final double OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD = + env.getRequiredProperty("dispatcher.oom_frame_overboard_allowed_threshold", Double.class); final double MAX_RSS_OVERBOARD_THRESHOLD = OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD * 2; final double RSS_AVAILABLE_FOR_MAX_RSS_TRIGGER = 0.1; @@ -586,7 +667,6 @@ private void handleMemoryReservations(final RunningFrameInfo frame) { * @param rFrames */ private void killTimedOutFrames(HostReport report) { - final Map layers = new HashMap(5); for (RunningFrameInfo frame: report.getFramesList()) { @@ -594,36 +674,16 @@ private void killTimedOutFrames(HostReport report) { 
LayerDetail layer = layerDao.getLayerDetail(layerId); long runtimeMinutes = ((System.currentTimeMillis() - frame.getStartTime()) / 1000l) / 60; - if (layer.timeout != 0 && runtimeMinutes > layer.timeout){ - try { - killQueue.execute(new DispatchRqdKillFrame(report.getHost().getName(), - frame.getFrameId(), - "This frame has reached it timeout.", - rqdClient)); - } catch (TaskRejectedException e) { - logger.info("Unable to queue RQD kill, task rejected, " + e); - } - } - - if (layer.timeout_llu == 0){ - continue; - } - - if (frame.getLluTime() == 0){ - continue; - } + String hostname = report.getHost().getName(); - long r = System.currentTimeMillis() / 1000; - long lastUpdate = (r - frame.getLluTime()) / 60; + if (layer.timeout != 0 && runtimeMinutes > layer.timeout){ + killFrame(frame.getFrameId(), hostname, KillCause.FrameTimedOut); + } else if (layer.timeout_llu != 0 && frame.getLluTime() != 0) { + long r = System.currentTimeMillis() / 1000; + long lastUpdate = (r - frame.getLluTime()) / 60; - if (layer.timeout_llu != 0 && lastUpdate > (layer.timeout_llu -1)){ - try { - killQueue.execute(new DispatchRqdKillFrame(report.getHost().getName(), - frame.getFrameId(), - "This frame has reached it LLU timeout.", - rqdClient)); - } catch (TaskRejectedException e) { - logger.info("Unable to queue RQD kill, task rejected, " + e); + if (layer.timeout_llu != 0 && lastUpdate > (layer.timeout_llu - 1)){ + killFrame(frame.getFrameId(), hostname, KillCause.FrameLluTimedOut); } } } @@ -749,100 +809,59 @@ public void verifyRunningFrameInfo(HostReport report) { continue; } + if (hostManager.verifyRunningProc(runningFrame.getResourceId(), runningFrame.getFrameId())) { + runningFrames.add(runningFrame); + continue; + } - if (!hostManager.verifyRunningProc(runningFrame.getResourceId(), - runningFrame.getFrameId())) { - - /* - * The frame this proc is running is no longer - * assigned to this proc. Don't ever touch - * the frame record. 
If we make it here that means - * the proc has been running for over 2 min. - */ - - String msg; - VirtualProc proc = null; - boolean procExists = true; + /* + * The frame this proc is running is no longer + * assigned to this proc. Don't ever touch + * the frame record. If we make it here that means + * the proc has been running for over 2 min. + */ + String msg; + VirtualProc proc = null; - try { - proc = hostManager.getVirtualProc(runningFrame.getResourceId()); - msg = "Virtual proc " + proc.getProcId() + + try { + proc = hostManager.getVirtualProc(runningFrame.getResourceId()); + msg = "Virtual proc " + proc.getProcId() + "is assigned to " + proc.getFrameId() + " not " + runningFrame.getFrameId(); - } - catch (Exception e) { - /* - * This will happen if the host goes off line and then - * comes back. In this case, we don't touch the frame - * since it might already be running somewhere else. We - * do however kill the proc. - */ - msg = "Virtual proc did not exist."; - procExists = false; - } - - logger.info("warning, the proc " + - runningFrame.getResourceId() + " on host " + - report.getHost().getName() + " was running for " + - (runtimeSeconds / 60.0f) + " minutes " + - runningFrame.getJobName() + "/" + runningFrame.getFrameName() + - "but the DB did not " + - "reflect this " + - msg); - - DispatchSupport.accountingErrors.incrementAndGet(); - - try { - /* - * If the proc did exist unbook it if we can't - * verify its running something. - */ - boolean rqd_kill = false; - if (proc != null) { - - /* - * Check to see if the proc is an orphan. 
- */ - if (hostManager.isOprhan(proc)) { - dispatchSupport.clearVirtualProcAssignement(proc); - dispatchSupport.unbookProc(proc); - rqd_kill = true; - } - } - else { - /* Proc doesn't exist so a kill won't hurt */ - rqd_kill = true; - } - - if (rqd_kill) { - try { - killQueue.execute(new DispatchRqdKillFrame(report.getHost().getName(), - runningFrame.getFrameId(), - "OpenCue could not verify this frame.", - rqdClient)); - } catch (TaskRejectedException e) { - logger.info("Unable to queue RQD kill, task rejected, " + e); - } - } + } + catch (Exception e) { + /* + * This will happen if the host goes offline and then + * comes back. In this case, we don't touch the frame + * since it might already be running somewhere else. We + * do however kill the proc. + */ + msg = "Virtual proc did not exist."; + } - } catch (RqdClientException rqde) { - logger.info("failed to kill " + - runningFrame.getJobName() + "/" + - runningFrame.getFrameName() + - " when trying to clear a failed " + - " frame verification, " + rqde); - - } catch (Exception e) { - CueExceptionUtil.logStackTrace("failed", e); - logger.info("failed to verify " + - runningFrame.getJobName() + "/" + - runningFrame.getFrameName() + - " was running but the frame was " + - " unable to be killed, " + e); - } + DispatchSupport.accountingErrors.incrementAndGet(); + if (proc != null && hostManager.isOprhan(proc)) { + dispatchSupport.clearVirtualProcAssignement(proc); + dispatchSupport.unbookProc(proc); + proc = null; } - else { - runningFrames.add(runningFrame); + if (proc == null) { + if (killFrame(runningFrame.getFrameId(), + report.getHost().getName(), + KillCause.FrameVerificationFailure)) { + logger.info("FrameVerificationError, the proc " + + runningFrame.getResourceId() + " on host " + + report.getHost().getName() + " was running for " + + (runtimeSeconds / 60.0f) + " minutes " + + runningFrame.getJobName() + "/" + runningFrame.getFrameName() + + "but the DB did not " + + "reflect this. 
" + + msg); + } else { + logger.warn("FrameStuckWarning: frameId=" + runningFrame.getFrameId() + + " render_node=" + report.getHost().getName() + " - " + + runningFrame.getJobName() + "/" + runningFrame.getFrameName()); + } } } } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java index 83d5f188c..53aff120e 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java @@ -53,12 +53,14 @@ public DispatchRqdKillFrameMemory(String hostname, FrameInterface frame, String public void run() { long startTime = System.currentTimeMillis(); try { - if (!isTestMode) { + if (dispatchSupport.updateFrameMemoryError(frame) && !isTestMode) { rqdClient.killFrame(hostname, frame.getFrameId(), message); + } else { + logger.warn("Could not update frame " + frame.getFrameId() + + " status to EXIT_STATUS_MEMORY_FAILURE. 
Canceling kill request!"); } - dispatchSupport.updateFrameMemoryError(frame); } catch (RqdClientException e) { - logger.info("Failed to contact host " + hostname + ", " + e); + logger.warn("Failed to contact host " + hostname + ", " + e); } finally { long elapsedTime = System.currentTimeMillis() - startTime; logger.info("RQD communication with " + hostname + diff --git a/cuebot/src/main/resources/opencue.properties b/cuebot/src/main/resources/opencue.properties index a08522eb1..788f75fb0 100644 --- a/cuebot/src/main/resources/opencue.properties +++ b/cuebot/src/main/resources/opencue.properties @@ -125,6 +125,21 @@ dispatcher.booking_queue.max_pool_size=6 dispatcher.booking_queue.queue_capacity=1000 # Whether or not to satisfy dependents (*_ON_FRAME and *_ON_LAYER) only on Frame success +# Percentage of used memory to consider a risk for triggering oom-killer +dispatcher.oom_max_safe_used_memory_threshold=0.95 + +# How much can a frame exceed its reserved memory. +# - 0.5 means 50% above reserve +# - -1.0 makes the feature inactive +# This feature is being kept inactive for now as we work on improving the +# frame retry logic (See commit comment for more details). +dispatcher.oom_frame_overboard_allowed_threshold=-1.0 + +# How many times should cuebot send a kill request for the same frame-host before reporting +# the frame as stuck +dispatcher.frame_kill_retry_limit=3 + +# Whether to satisfy dependents (*_ON_FRAME and *_ON_LAYER) only on Frame success depend.satisfy_only_on_frame_success=true # Jobs will be archived to the history tables after being completed for this long. 
diff --git a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java index 4ab7f1646..2e6367efe 100644 --- a/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java +++ b/cuebot/src/test/java/com/imageworks/spcue/test/dispatcher/HostReportHandlerTests.java @@ -457,6 +457,8 @@ public void testMemoryAndLlu() { @Rollback(true) public void testMemoryAggressionRss() { jobLauncher.testMode = true; + dispatcher.setTestMode(true); + jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_simple.xml")); DispatchHost host = getHost(hostname); @@ -464,8 +466,8 @@ public void testMemoryAggressionRss() { assertEquals(1, procs.size()); VirtualProc proc = procs.get(0); - long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved * - (1.0 + Dispatcher.OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD)); + // 1.6 = 1 + dispatcher.oom_frame_overboard_allowed_threshold + long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved * 1.6); // Test rss overboard RunningFrameInfo info = RunningFrameInfo.newBuilder() @@ -492,6 +494,7 @@ public void testMemoryAggressionRss() { @Rollback(true) public void testMemoryAggressionMaxRss() { jobLauncher.testMode = true; + dispatcher.setTestMode(true); jobLauncher.launch(new File("src/test/resources/conf/jobspec/jobspec_simple.xml")); DispatchHost host = getHost(hostname); @@ -499,8 +502,9 @@ public void testMemoryAggressionMaxRss() { assertEquals(1, procs.size()); VirtualProc proc = procs.get(0); + // 0.6 = dispatcher.oom_frame_overboard_allowed_threshold long memoryOverboard = (long) Math.ceil((double) proc.memoryReserved * - (1.0 + (2 * Dispatcher.OOM_FRAME_OVERBOARD_ALLOWED_THRESHOLD))); + (1.0 + (2 * 0.6))); // Test rss>90% and maxRss overboard RunningFrameInfo info = RunningFrameInfo.newBuilder() diff --git a/cuebot/src/test/resources/opencue.properties 
b/cuebot/src/test/resources/opencue.properties index 0203ee6f7..f6ea7fd44 100644 --- a/cuebot/src/test/resources/opencue.properties +++ b/cuebot/src/test/resources/opencue.properties @@ -64,3 +64,7 @@ dispatcher.kill_queue.queue_capacity=1000 dispatcher.booking_queue.core_pool_size=6 dispatcher.booking_queue.max_pool_size=6 dispatcher.booking_queue.queue_capacity=1000 + +dispatcher.oom_max_safe_used_memory_threshold=0.95 +dispatcher.oom_frame_overboard_allowed_threshold=0.6 +dispatcher.frame_kill_retry_limit=3 \ No newline at end of file diff --git a/rqd/rqd/rqdservicers.py b/rqd/rqd/rqdservicers.py index 98ab358ac..b736ef43b 100644 --- a/rqd/rqd/rqdservicers.py +++ b/rqd/rqd/rqdservicers.py @@ -67,6 +67,8 @@ def KillRunningFrame(self, request, context): frame = self.rqCore.getRunningFrame(request.frame_id) if frame: frame.kill(message=request.message) + else: + log.warning("Wasn't able to find frame(%s) to kill", request.frame_id) return rqd.compiled_proto.rqd_pb2.RqdStaticKillRunningFrameResponse() def ShutdownRqdNow(self, request, context): diff --git a/rqd/rqd/rqnetwork.py b/rqd/rqd/rqnetwork.py index 9f33e64a2..9d3591fdd 100644 --- a/rqd/rqd/rqnetwork.py +++ b/rqd/rqd/rqnetwork.py @@ -162,6 +162,7 @@ def kill(self, message=""): else: os.killpg(self.pid, rqd.rqconstants.KILL_SIGNAL) finally: + log.warning("kill() successfully killed frameId=%s pid=%s", self.frameId, self.pid) rqd.rqutil.permissionsLow() except OSError as e: log.warning( From 250caca8ac31a877e7626bcc7753a88b16f9b928 Mon Sep 17 00:00:00 2001 From: DiegoTavares Date: Wed, 4 Oct 2023 11:34:26 -0700 Subject: [PATCH 6/6] Fix merge conflicts --- .../spcue/dispatcher/DispatchSupportService.java | 2 ++ .../spcue/dispatcher/FrameCompleteHandler.java | 1 + .../spcue/dispatcher/HostReportHandler.java | 12 ++++-------- .../commands/DispatchRqdKillFrameMemory.java | 6 ++++-- .../com/imageworks/spcue/service/HostManager.java | 8 -------- .../imageworks/spcue/service/HostManagerService.java | 5 ----- 6 files 
changed, 11 insertions(+), 23 deletions(-) diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java index 018296265..b86faaea0 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/DispatchSupportService.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; + import com.imageworks.spcue.AllocationInterface; import com.imageworks.spcue.DispatchFrame; import com.imageworks.spcue.DispatchHost; @@ -42,6 +43,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.dao.DataAccessException; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java index 6e2653ac8..145953c46 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/FrameCompleteHandler.java @@ -35,6 +35,7 @@ import com.imageworks.spcue.DispatchJob; import com.imageworks.spcue.JobDetail; import com.imageworks.spcue.LayerDetail; +import com.imageworks.spcue.FrameDetail; import com.imageworks.spcue.LayerInterface; import com.imageworks.spcue.Source; import com.imageworks.spcue.VirtualProc; diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java index 57e8f50c2..0c9f37ac2 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java +++ 
b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/HostReportHandler.java @@ -45,6 +45,7 @@ import com.imageworks.spcue.dispatcher.commands.DispatchBookHostLocal; import com.imageworks.spcue.dispatcher.commands.DispatchHandleHostReport; import com.imageworks.spcue.dispatcher.commands.DispatchRqdKillFrame; +import com.imageworks.spcue.dispatcher.commands.DispatchRqdKillFrameMemory; import com.imageworks.spcue.grpc.host.HardwareState; import com.imageworks.spcue.grpc.host.LockState; import com.imageworks.spcue.grpc.report.BootReport; @@ -79,6 +80,8 @@ public class HostReportHandler { private JobManager jobManager; private JobDao jobDao; private LayerDao layerDao; + @Autowired + private Environment env; Cache killRequestCounterCache = CacheBuilder.newBuilder() .expireAfterWrite(FRAME_KILL_CACHE_EXPIRE_AFTER_WRITE_MINUTES, TimeUnit.MINUTES) @@ -490,15 +493,10 @@ private boolean getKillClearance(String hostname, String frameId) { } killRequestCounterCache.put(cacheKey, cachedCount); if (cachedCount > FRAME_KILL_RETRY_LIMIT) { - // If the kill retry limit has been reached, notify prometheus of the issue and give up + // If the kill retry limit has been reached if (!dispatcher.isTestMode()) { FrameInterface frame = jobManager.getFrame(frameId); JobInterface job = jobManager.getJob(frame.getJobId()); - prometheusMetrics.incrementFrameKillFailureCounter( - hostname, - job.getName(), - frame.getName(), - frameId); } return false; } @@ -519,7 +517,6 @@ private boolean killProcForMemory(VirtualProc proc, String hostname, KillCause k try { killQueue.execute(new DispatchRqdKillFrameMemory(hostname, frame, killCause.toString(), rqdClient, dispatchSupport, dispatcher.isTestMode())); - prometheusMetrics.incrementFrameKilledCounter(hostname, killCause); } catch (TaskRejectedException e) { logger.warn("Unable to add a DispatchRqdKillFrame request, task rejected, " + e); return false; @@ -543,7 +540,6 @@ private boolean killFrame(String frameId, String hostname, KillCause 
killCause) frameId, killCause.toString(), rqdClient)); - prometheusMetrics.incrementFrameKilledCounter(hostname, killCause); } catch (TaskRejectedException e) { logger.warn("Unable to add a DispatchRqdKillFrame request, task rejected, " + e); } diff --git a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java index 53aff120e..f50017361 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java +++ b/cuebot/src/main/java/com/imageworks/spcue/dispatcher/commands/DispatchRqdKillFrameMemory.java @@ -24,11 +24,13 @@ import com.imageworks.spcue.dispatcher.DispatchSupport; import com.imageworks.spcue.rqd.RqdClient; import com.imageworks.spcue.rqd.RqdClientException; -import org.apache.log4j.Logger; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.LogManager; + public class DispatchRqdKillFrameMemory extends KeyRunnable { - private static final Logger logger = Logger.getLogger(DispatchRqdKillFrameMemory.class); + private static final Logger logger = LogManager.getLogger(DispatchRqdKillFrameMemory.class); private String message; private String hostname; diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java index 00b90654c..bba2e0401 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManager.java @@ -62,14 +62,6 @@ public interface HostManager { */ void setHostState(HostInterface host, HardwareState state); - /** - * Updates the freeMcp of a host. 
- * - * @param host HostInterface - * @param freeMcp Long - */ - void setHostFreeMcp(HostInterface host, Long freeMcp); - DispatchHost createHost(HostReport report); DispatchHost createHost(RenderHost host); diff --git a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java index da17458e2..74c8fa2d1 100644 --- a/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java +++ b/cuebot/src/main/java/com/imageworks/spcue/service/HostManagerService.java @@ -93,11 +93,6 @@ public void setHostState(HostInterface host, HardwareState state) { hostDao.updateHostState(host, state); } - @Override - public void setHostFreeMcp(HostInterface host, Long freeMcp) { - hostDao.updateHostFreeMcp(host, freeMcp); - } - public void rebootWhenIdle(HostInterface host) { try { hostDao.updateHostState(host, HardwareState.REBOOT_WHEN_IDLE);