Skip to content

Commit ce1b3b1

Browse files
committed
[grid] Add batch size flag for session queue
1 parent 40669b6 commit ce1b3b1

File tree

15 files changed

+107
-43
lines changed

15 files changed

+107
-43
lines changed

java/src/org/openqa/selenium/grid/commands/Hub.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ protected Handlers createHandlers(Config config) {
151151
distributorOptions.getSlotMatcher(),
152152
newSessionRequestOptions.getSessionRequestTimeoutPeriod(),
153153
newSessionRequestOptions.getSessionRequestTimeout(),
154-
secret);
154+
secret,
155+
newSessionRequestOptions.getBatchSize());
155156
handler.addHandler(queue);
156157

157158
Distributor distributor = new LocalDistributor(

java/src/org/openqa/selenium/grid/commands/Standalone.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ protected Handlers createHandlers(Config config) {
150150
distributorOptions.getSlotMatcher(),
151151
newSessionRequestOptions.getSessionRequestTimeoutPeriod(),
152152
newSessionRequestOptions.getSessionRequestTimeout(),
153-
registrationSecret);
153+
registrationSecret,
154+
newSessionRequestOptions.getBatchSize());
154155
combinedHandler.addHandler(queue);
155156

156157
Distributor distributor = new LocalDistributor(

java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueFlags.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Set;
3131

3232
import static org.openqa.selenium.grid.config.StandardGridRoles.SESSION_QUEUE_ROLE;
33+
import static org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions.DEFAULT_BATCH_SIZE;
3334
import static org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions.DEFAULT_REQUEST_TIMEOUT;
3435
import static org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions.DEFAULT_REQUEST_TIMEOUT_PERIOD;
3536
import static org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions.DEFAULT_RETRY_INTERVAL;
@@ -77,6 +78,12 @@ public class NewSessionQueueFlags implements HasRoles {
7778
@ConfigValue(section = SESSION_QUEUE_SECTION, name = "session-retry-interval", example = "15")
7879
private int sessionRetryInterval = DEFAULT_RETRY_INTERVAL;
7980

81+
@Parameter(
82+
names = {"--sessionqueue-batch-size"},
83+
description = "Maximum batch size that can consumed from queue based on the available slots.")
84+
@ConfigValue(section = SESSION_QUEUE_SECTION, name = "sessionqueue-batch-size", example = "20")
85+
private int batchSize = DEFAULT_BATCH_SIZE;
86+
8087
@Override
8188
public Set<Role> getRoles() {
8289
return Collections.singleton(SESSION_QUEUE_ROLE);

java/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class NewSessionQueueOptions {
3737
static final int DEFAULT_REQUEST_TIMEOUT = 300;
3838
static final int DEFAULT_REQUEST_TIMEOUT_PERIOD = 10;
3939
static final int DEFAULT_RETRY_INTERVAL = 15;
40+
static final int DEFAULT_BATCH_SIZE = 10;
4041

4142
private final Config config;
4243

@@ -116,6 +117,16 @@ public Duration getSessionRequestRetryInterval() {
116117
return Duration.ofMillis(interval);
117118
}
118119

120+
public int getBatchSize() {
121+
// If the user sets 0 or less, we default to 10.
122+
int batchSize = Math.max(
123+
config.getInt(SESSION_QUEUE_SECTION, "sessionqueue-batch-size")
124+
.orElse(DEFAULT_BATCH_SIZE),
125+
1);
126+
127+
return batchSize;
128+
}
129+
119130
@ManagedAttribute(name = "RequestTimeoutSeconds")
120131
public long getRequestTimeoutSeconds() {
121132
return getSessionRequestTimeout().getSeconds();

java/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.collect.ImmutableSet;
2323

2424
import org.openqa.selenium.Capabilities;
25+
import org.openqa.selenium.ImmutableCapabilities;
2526
import org.openqa.selenium.SessionNotCreatedException;
2627
import org.openqa.selenium.concurrent.GuardedRunnable;
2728
import org.openqa.selenium.grid.config.Config;
@@ -99,6 +100,7 @@ public class LocalNewSessionQueue extends NewSessionQueue implements Closeable {
99100
private static final String NAME = "Local New Session Queue";
100101
private final SlotMatcher slotMatcher;
101102
private final Duration requestTimeout;
103+
private final int batchSize;
102104
private final Map<RequestId, Data> requests;
103105
private final Map<RequestId, TraceContext> contexts;
104106
private final Deque<SessionRequest> queue;
@@ -115,7 +117,8 @@ public LocalNewSessionQueue(
115117
SlotMatcher slotMatcher,
116118
Duration requestTimeoutCheck,
117119
Duration requestTimeout,
118-
Secret registrationSecret) {
120+
Secret registrationSecret,
121+
int batchSize) {
119122
super(tracer, registrationSecret);
120123

121124
this.slotMatcher = Require.nonNull("Slot matcher", slotMatcher);
@@ -127,6 +130,8 @@ public LocalNewSessionQueue(
127130
this.queue = new ConcurrentLinkedDeque<>();
128131
this.contexts = new ConcurrentHashMap<>();
129132

133+
this.batchSize = Require.positive("Batch size", batchSize);
134+
130135
service.scheduleAtFixedRate(
131136
GuardedRunnable.guard(this::timeoutSessions),
132137
requestTimeoutCheck.toMillis(),
@@ -149,7 +154,8 @@ public static NewSessionQueue create(Config config) {
149154
slotMatcher,
150155
newSessionQueueOptions.getSessionRequestTimeoutPeriod(),
151156
newSessionQueueOptions.getSessionRequestTimeout(),
152-
secretOptions.getRegistrationSecret());
157+
secretOptions.getRegistrationSecret(),
158+
newSessionQueueOptions.getBatchSize());
153159
}
154160

155161
private void timeoutSessions() {
@@ -316,7 +322,7 @@ public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes
316322
try {
317323
List<SessionRequest> availableRequests = queue.stream()
318324
.filter(req -> req.getDesiredCapabilities().stream().anyMatch(matchesStereotype))
319-
.limit(10) // TODO: Batch size should be configurable via a flag
325+
.limit(batchSize)
320326
.collect(Collectors.toList());
321327

322328
availableRequests.forEach(req -> this.remove(req.getRequestId()));

java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ public void setUpDistributor() throws MalformedURLException {
110110
new DefaultSlotMatcher(),
111111
Duration.ofSeconds(2),
112112
Duration.ofSeconds(2),
113-
registrationSecret);
113+
registrationSecret,
114+
5);
114115

115116
stereotype = new ImmutableCapabilities("browserName", "gouda");
116117

java/test/org/openqa/selenium/grid/distributor/DistributorTest.java

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ public void setUp() throws URISyntaxException {
136136
new DefaultSlotMatcher(),
137137
Duration.ofSeconds(2),
138138
Duration.ofSeconds(2),
139-
registrationSecret);
139+
registrationSecret,
140+
5);
140141

141142
stereotype = new ImmutableCapabilities("browserName", "cheese");
142143
caps = new ImmutableCapabilities("browserName", "cheese");
@@ -206,7 +207,8 @@ void shouldBeAbleToAddANodeAndCreateASession() {
206207
new DefaultSlotMatcher(),
207208
Duration.ofSeconds(2),
208209
Duration.ofSeconds(2),
209-
registrationSecret);
210+
registrationSecret,
211+
5);
210212
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
211213
.add(
212214
caps,
@@ -248,7 +250,8 @@ void creatingASessionAddsItToTheSessionMap() {
248250
new DefaultSlotMatcher(),
249251
Duration.ofSeconds(2),
250252
Duration.ofSeconds(2),
251-
registrationSecret);
253+
registrationSecret,
254+
5);
252255

253256
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
254257
.add(
@@ -292,7 +295,8 @@ void shouldBeAbleToRemoveANode() throws MalformedURLException {
292295
new DefaultSlotMatcher(),
293296
Duration.ofSeconds(2),
294297
Duration.ofSeconds(2),
295-
registrationSecret);
298+
registrationSecret,
299+
5);
296300

297301
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
298302
.add(
@@ -333,7 +337,8 @@ void testDrainingNodeDoesNotAcceptNewSessions() {
333337
new DefaultSlotMatcher(),
334338
Duration.ofSeconds(2),
335339
Duration.ofSeconds(2),
336-
registrationSecret);
340+
registrationSecret,
341+
5);
337342
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
338343
.add(
339344
caps,
@@ -369,7 +374,8 @@ void testDrainedNodeShutsDownOnceEmpty() throws InterruptedException {
369374
new DefaultSlotMatcher(),
370375
Duration.ofSeconds(2),
371376
Duration.ofSeconds(2),
372-
registrationSecret);
377+
registrationSecret,
378+
5);
373379
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
374380
.add(
375381
caps,
@@ -415,7 +421,8 @@ void drainedNodeDoesNotShutDownIfNotEmpty() throws InterruptedException {
415421
new DefaultSlotMatcher(),
416422
Duration.ofSeconds(2),
417423
Duration.ofSeconds(2),
418-
registrationSecret);
424+
registrationSecret,
425+
5);
419426
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
420427
.add(
421428
caps,
@@ -461,7 +468,8 @@ void drainedNodeShutsDownAfterSessionsFinish() throws InterruptedException {
461468
new DefaultSlotMatcher(),
462469
Duration.ofSeconds(2),
463470
Duration.ofSeconds(2),
464-
registrationSecret);
471+
registrationSecret,
472+
5);
465473
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
466474
.add(
467475
caps,
@@ -551,7 +559,8 @@ void theMostLightlyLoadedNodeIsSelectedFirst() {
551559
new DefaultSlotMatcher(),
552560
Duration.ofSeconds(2),
553561
Duration.ofSeconds(2),
554-
registrationSecret);
562+
registrationSecret,
563+
5);
555564

556565
Node lightest = createNode(caps, 10, 0);
557566
Node medium = createNode(caps, 10, 4);
@@ -600,7 +609,8 @@ void shouldUseLastSessionCreatedTimeAsTieBreaker() {
600609
new DefaultSlotMatcher(),
601610
Duration.ofSeconds(2),
602611
Duration.ofSeconds(2),
603-
registrationSecret);
612+
registrationSecret,
613+
5);
604614
Node leastRecent = createNode(caps, 5, 0);
605615

606616
CombinedHandler handler = new CombinedHandler();
@@ -679,7 +689,8 @@ void shouldIncludeHostsThatAreUpInHostList() {
679689
new DefaultSlotMatcher(),
680690
Duration.ofSeconds(2),
681691
Duration.ofSeconds(2),
682-
registrationSecret);
692+
registrationSecret,
693+
5);
683694
handler.addHandler(sessions);
684695

685696
URI uri = createUri();
@@ -737,7 +748,8 @@ void shouldNotScheduleAJobIfAllSlotsAreBeingUsed() {
737748
new DefaultSlotMatcher(),
738749
Duration.ofSeconds(2),
739750
Duration.ofSeconds(2),
740-
registrationSecret);
751+
registrationSecret,
752+
5);
741753

742754
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
743755
.add(caps, new TestSessionFactory((id, c) -> new Session(
@@ -777,7 +789,8 @@ void shouldReleaseSlotOnceSessionEnds() {
777789
new DefaultSlotMatcher(),
778790
Duration.ofSeconds(2),
779791
Duration.ofSeconds(2),
780-
registrationSecret);
792+
registrationSecret,
793+
5);
781794

782795
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
783796
.add(caps, new TestSessionFactory((id, c) -> new Session(
@@ -837,7 +850,8 @@ void shouldNotStartASessionIfTheCapabilitiesAreNotSupported() {
837850
new DefaultSlotMatcher(),
838851
Duration.ofSeconds(2),
839852
Duration.ofSeconds(2),
840-
registrationSecret);
853+
registrationSecret,
854+
5);
841855
handler.addHandler(sessions);
842856

843857
Distributor distributor = new LocalDistributor(
@@ -873,7 +887,8 @@ void attemptingToStartASessionWhichFailsMarksAsTheSlotAsAvailable() {
873887
new DefaultSlotMatcher(),
874888
Duration.ofSeconds(2),
875889
Duration.ofSeconds(2),
876-
registrationSecret);
890+
registrationSecret,
891+
5);
877892

878893
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
879894
.add(caps, new TestSessionFactory((id, caps) -> {
@@ -915,7 +930,8 @@ void shouldReturnNodesThatWereDownToPoolOfNodesOnceTheyMarkTheirHealthCheckPasse
915930
new DefaultSlotMatcher(),
916931
Duration.ofSeconds(2),
917932
Duration.ofSeconds(2),
918-
registrationSecret);
933+
registrationSecret,
934+
5);
919935

920936
URI uri = createUri();
921937
Node node = LocalNode.builder(tracer, bus, uri, uri, registrationSecret)
@@ -973,7 +989,8 @@ void shouldNotRemoveNodeWhoseHealthCheckPassesBeforeThreshold()
973989
new DefaultSlotMatcher(),
974990
Duration.ofSeconds(2),
975991
Duration.ofSeconds(2),
976-
registrationSecret);
992+
registrationSecret,
993+
5);
977994

978995
URI uri = createUri();
979996
Node node = LocalNode.builder(tracer, bus, uri, uri, registrationSecret)
@@ -1050,7 +1067,8 @@ void shouldPrioritizeHostsWithTheMostSlotsAvailableForASessionType() {
10501067
new DefaultSlotMatcher(),
10511068
Duration.ofSeconds(2),
10521069
Duration.ofSeconds(2),
1053-
registrationSecret);
1070+
registrationSecret,
1071+
5);
10541072

10551073
LocalDistributor distributor = new LocalDistributor(
10561074
tracer,

java/test/org/openqa/selenium/grid/distributor/local/LocalDistributorTest.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,8 @@ void testAddNodeToDistributor() {
113113
new DefaultSlotMatcher(),
114114
Duration.ofSeconds(2),
115115
Duration.ofSeconds(2),
116-
registrationSecret);
116+
registrationSecret,
117+
5);
117118
Distributor distributor = new LocalDistributor(
118119
tracer,
119120
bus,
@@ -146,7 +147,8 @@ void testRemoveNodeFromDistributor() {
146147
new DefaultSlotMatcher(),
147148
Duration.ofSeconds(2),
148149
Duration.ofSeconds(2),
149-
registrationSecret);
150+
registrationSecret,
151+
5);
150152
Distributor distributor = new LocalDistributor(
151153
tracer,
152154
bus,
@@ -180,7 +182,8 @@ void testAddSameNodeTwice() {
180182
new DefaultSlotMatcher(),
181183
Duration.ofSeconds(2),
182184
Duration.ofSeconds(2),
183-
registrationSecret);
185+
registrationSecret,
186+
5);
184187
Distributor distributor = new LocalDistributor(
185188
tracer,
186189
bus,
@@ -209,7 +212,8 @@ void shouldBeAbleToAddMultipleSessionsConcurrently() throws Exception {
209212
new DefaultSlotMatcher(),
210213
Duration.ofSeconds(2),
211214
Duration.ofSeconds(2),
212-
registrationSecret);
215+
registrationSecret,
216+
5);
213217

214218

215219
// Add one node to ensure that everything is created in that.
@@ -298,7 +302,8 @@ void testDrainNodeFromDistributor() {
298302
new DefaultSlotMatcher(),
299303
Duration.ofSeconds(2),
300304
Duration.ofSeconds(2),
301-
registrationSecret);
305+
registrationSecret,
306+
5);
302307
Distributor distributor = new LocalDistributor(
303308
tracer,
304309
bus,
@@ -339,7 +344,8 @@ void testDrainNodeFromNode() {
339344
new DefaultSlotMatcher(),
340345
Duration.ofSeconds(2),
341346
Duration.ofSeconds(2),
342-
registrationSecret);
347+
registrationSecret,
348+
5);
343349
Distributor distributor = new LocalDistributor(
344350
tracer,
345351
bus,
@@ -365,7 +371,8 @@ void slowStartingNodesShouldNotCauseReservationsToBeSerialized() {
365371
new DefaultSlotMatcher(),
366372
Duration.ofSeconds(2),
367373
Duration.ofSeconds(2),
368-
registrationSecret);
374+
registrationSecret,
375+
5);
369376

370377
LocalDistributor distributor = new LocalDistributor(
371378
tracer,

java/test/org/openqa/selenium/grid/graphql/GraphqlHandlerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ public void setupGrid() {
122122
new DefaultSlotMatcher(),
123123
Duration.ofSeconds(2),
124124
Duration.ofSeconds(2),
125-
registrationSecret);
125+
registrationSecret,
126+
5);
126127

127128
distributor = new LocalDistributor(
128129
tracer,

0 commit comments

Comments
 (0)