diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java index b73357524e3f4..be1b9be801cb5 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java @@ -56,6 +56,9 @@ * use {@link ExchangeSourceHandler#addRemoteSink(RemoteSink, boolean, Runnable, int, ActionListener)}. */ public final class ExchangeService extends AbstractLifecycleComponent { + + private static final Logger logger = LogManager.getLogger(ExchangeService.class); + // TODO: Make this a child action of the data node transport to ensure that exchanges // are accessed only by the user initialized the session. public static final String EXCHANGE_ACTION_NAME = "internal:data/read/esql/exchange"; @@ -71,8 +74,6 @@ public final class ExchangeService extends AbstractLifecycleComponent { public static final String INACTIVE_SINKS_INTERVAL_SETTING = "esql.exchange.sink_inactive_interval"; public static final TimeValue INACTIVE_SINKS_INTERVAL_DEFAULT = TimeValue.timeValueMinutes(5); - private static final Logger LOGGER = LogManager.getLogger(ExchangeService.class); - private final ThreadPool threadPool; private final Executor executor; private final BlockFactory blockFactory; @@ -87,7 +88,7 @@ public ExchangeService(Settings settings, ThreadPool threadPool, String executor final var inactiveInterval = settings.getAsTime(INACTIVE_SINKS_INTERVAL_SETTING, INACTIVE_SINKS_INTERVAL_DEFAULT); // Run the reaper every half of the keep_alive interval this.threadPool.scheduleWithFixedDelay( - new InactiveSinksReaper(LOGGER, threadPool, inactiveInterval), + new InactiveSinksReaper(threadPool, inactiveInterval), TimeValue.timeValueMillis(Math.max(1, inactiveInterval.millis() / 2)), executor ); @@ -249,12 +250,10 @@ public void messageReceived(ExchangeRequest request, TransportChannel channel, T } private final class InactiveSinksReaper extends AbstractRunnable { - private final Logger logger; private final TimeValue keepAlive; private final ThreadPool threadPool; - InactiveSinksReaper(Logger logger, ThreadPool threadPool, TimeValue keepAlive) { - this.logger = logger; + InactiveSinksReaper(ThreadPool threadPool, TimeValue keepAlive) { this.keepAlive = keepAlive; this.threadPool = threadPool; }