Skip to content

Commit 8c4050f

Browse files
pujaganidiemol
andauthored
[grid] Purge timed out requests sitting the queue at regular intervals (#9283)
* [grid] Purge timed out requests sitting the queue at regular intervals * [grid] Add error message as a constant Co-authored-by: Diego Molina <[email protected]>
1 parent f1cb54f commit 8c4050f

File tree

3 files changed

+53
-4
lines changed

3 files changed

+53
-4
lines changed

java/server/src/org/openqa/selenium/grid/sessionqueue/local/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ java_library(
1212
deps = [
1313
"//java/client/src/org/openqa/selenium/remote",
1414
"//java/server/src/org/openqa/selenium/events",
15+
"//java/server/src/org/openqa/selenium/concurrent",
1516
"//java/server/src/org/openqa/selenium/grid/config",
1617
"//java/server/src/org/openqa/selenium/grid/data",
1718
"//java/server/src/org/openqa/selenium/grid/jmx",

java/server/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.openqa.selenium.grid.sessionqueue.local;
1919

20+
import org.openqa.selenium.concurrent.Regularly;
2021
import org.openqa.selenium.events.EventBus;
2122
import org.openqa.selenium.grid.config.Config;
2223
import org.openqa.selenium.grid.data.NewSessionErrorResponse;
@@ -73,12 +74,20 @@ public class LocalNewSessionQueue extends NewSessionQueue {
7374
private final ScheduledExecutorService executorService =
7475
Executors.newSingleThreadScheduledExecutor();
7576
private final Thread shutdownHook = new Thread(this::callExecutorShutdown);
77+
private final String timedOutErrorMessage =
78+
String.format( "New session request rejected after being in the queue for more than %s",
79+
requestTimeout);
7680

7781
public LocalNewSessionQueue(Tracer tracer, EventBus bus, Duration retryInterval,
7882
Duration requestTimeout) {
7983
super(tracer, retryInterval, requestTimeout);
8084
this.bus = Require.nonNull("Event bus", bus);
8185
Runtime.getRuntime().addShutdownHook(shutdownHook);
86+
87+
Regularly regularly = new Regularly("New Session Queue Clean up");
88+
regularly.submit(this::purgeTimedOutRequests, Duration.ofSeconds(60),
89+
Duration.ofSeconds(30));
90+
8291
new JMXHelper().register(this);
8392
}
8493

@@ -194,8 +203,7 @@ private void retryRequest(SessionRequest sessionRequest) {
194203
LOG.log(Level.INFO, "Request {0} timed out", requestId);
195204
sessionRequests.remove(sessionRequest);
196205
bus.fire(new NewSessionRejectedEvent(
197-
new NewSessionErrorResponse(requestId, String.format(
198-
"New session request rejected after being in the queue for more than %s", requestTimeout))));
206+
new NewSessionErrorResponse(requestId, timedOutErrorMessage)));
199207
} else {
200208
LOG.log(Level.INFO,
201209
"Adding request back to the queue. All slots are busy. Request: {0}",
@@ -239,8 +247,7 @@ public Optional<HttpRequest> remove(RequestId id) {
239247
HttpRequest request = httpRequest.get();
240248
if (hasRequestTimedOut(request)) {
241249
bus.fire(new NewSessionRejectedEvent(
242-
new NewSessionErrorResponse(id, String.format(
243-
"New session request rejected after being in the queue for more than %s", requestTimeout))));
250+
new NewSessionErrorResponse(id, timedOutErrorMessage)));
244251
return Optional.empty();
245252
}
246253
}
@@ -272,6 +279,24 @@ public int clear() {
272279
}
273280
}
274281

282+
private void purgeTimedOutRequests() {
283+
Lock writeLock = lock.writeLock();
284+
writeLock.lock();
285+
try {
286+
Iterator<SessionRequest> iterator = sessionRequests.iterator();
287+
while (iterator.hasNext()) {
288+
SessionRequest sessionRequest = iterator.next();
289+
if (hasRequestTimedOut(sessionRequest.getHttpRequest())) {
290+
iterator.remove();
291+
bus.fire(new NewSessionRejectedEvent(
292+
new NewSessionErrorResponse(sessionRequest.getRequestId(), timedOutErrorMessage)));
293+
}
294+
}
295+
} finally {
296+
writeLock.unlock();
297+
}
298+
}
299+
275300
public void callExecutorShutdown() {
276301
LOG.info("Shutting down session queue executor service");
277302
executorService.shutdown();

java/server/test/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueueTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.openqa.selenium.ImmutableCapabilities;
2424
import org.openqa.selenium.events.EventBus;
2525
import org.openqa.selenium.events.local.GuavaEventBus;
26+
import org.openqa.selenium.grid.data.NewSessionErrorResponse;
27+
import org.openqa.selenium.grid.data.NewSessionRejectedEvent;
2628
import org.openqa.selenium.grid.data.NewSessionRequestEvent;
2729
import org.openqa.selenium.grid.data.RequestId;
2830
import org.openqa.selenium.grid.sessionqueue.NewSessionQueue;
@@ -259,6 +261,27 @@ public void shouldBeAbleToGetQueueContents() {
259261
assertEquals(firefoxCaps, response.get(1));
260262
}
261263

264+
@Test
265+
public void shouldBeAbleToRemoveRequestsOnTimeout() throws InterruptedException {
266+
NewSessionQueue localSessionQueue = new LocalNewSessionQueue(
267+
DefaultTestTracer.createTracer(),
268+
bus,
269+
Duration.ofSeconds(30),
270+
Duration.ofSeconds(1));
271+
272+
CountDownLatch latch = new CountDownLatch(1);
273+
274+
bus.addListener(NewSessionRejectedEvent.listener(reqId -> latch.countDown()));
275+
276+
boolean added = localSessionQueue.offerLast(expectedSessionRequest, requestId);
277+
assertTrue(added);
278+
279+
boolean requestExpired = latch.await(2, TimeUnit.MINUTES);
280+
281+
assertThat(requestExpired).isTrue();
282+
assertThat(localSessionQueue.getQueueSize()).isZero();
283+
}
284+
262285
private HttpRequest createRequest(NewSessionPayload payload, HttpMethod httpMethod, String uri) {
263286
StringBuilder builder = new StringBuilder();
264287
try {

0 commit comments

Comments
 (0)