Skip to content

Commit eb21c64

Browse files
authored
api, core: make scheduled executor service accessible for NameResolver.Args (#6455)
Added new API on NameResolver.Args to access ScheduledExecutorService, which is wrapped transport executor.
1 parent 81efecd commit eb21c64

File tree

3 files changed

+54
-12
lines changed

3 files changed

+54
-12
lines changed

api/src/main/java/io/grpc/NameResolver.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.List;
3131
import java.util.Map;
3232
import java.util.concurrent.Executor;
33+
import java.util.concurrent.ScheduledExecutorService;
3334
import javax.annotation.Nullable;
3435
import javax.annotation.concurrent.ThreadSafe;
3536

@@ -412,6 +413,7 @@ public static final class Args {
412413
private final ProxyDetector proxyDetector;
413414
private final SynchronizationContext syncContext;
414415
private final ServiceConfigParser serviceConfigParser;
416+
@Nullable private final ScheduledExecutorService scheduledExecutorService;
415417
@Nullable private final ChannelLogger channelLogger;
416418
@Nullable private final Executor executor;
417419

@@ -420,12 +422,14 @@ private Args(
420422
ProxyDetector proxyDetector,
421423
SynchronizationContext syncContext,
422424
ServiceConfigParser serviceConfigParser,
425+
@Nullable ScheduledExecutorService scheduledExecutorService,
423426
@Nullable ChannelLogger channelLogger,
424427
@Nullable Executor executor) {
425428
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
426429
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
427430
this.syncContext = checkNotNull(syncContext, "syncContext not set");
428431
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser not set");
432+
this.scheduledExecutorService = scheduledExecutorService;
429433
this.channelLogger = channelLogger;
430434
this.executor = executor;
431435
}
@@ -460,6 +464,25 @@ public SynchronizationContext getSynchronizationContext() {
460464
return syncContext;
461465
}
462466

467+
/**
468+
* Returns a {@link ScheduledExecutorService} for scheduling delayed tasks.
469+
*
470+
* <p>This service is a shared resource and is only meant for quick tasks. DO NOT block or run
471+
* time-consuming tasks.
472+
*
473+
* <p>The returned service doesn't support {@link ScheduledExecutorService#shutdown shutdown()}
474+
* and {@link ScheduledExecutorService#shutdownNow shutdownNow()}. They will throw if called.
475+
*
476+
* @since 1.26.0
477+
*/
478+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
479+
public ScheduledExecutorService getScheduledExecutorService() {
480+
if (scheduledExecutorService == null) {
481+
throw new IllegalStateException("ScheduledExecutorService not set in Builder");
482+
}
483+
return scheduledExecutorService;
484+
}
485+
463486
/**
464487
* Returns the {@link ServiceConfigParser}.
465488
*
@@ -501,6 +524,7 @@ public String toString() {
501524
.add("proxyDetector", proxyDetector)
502525
.add("syncContext", syncContext)
503526
.add("serviceConfigParser", serviceConfigParser)
527+
.add("scheduledExecutorService", scheduledExecutorService)
504528
.add("channelLogger", channelLogger)
505529
.add("executor", executor)
506530
.toString();
@@ -517,6 +541,7 @@ public Builder toBuilder() {
517541
builder.setProxyDetector(proxyDetector);
518542
builder.setSynchronizationContext(syncContext);
519543
builder.setServiceConfigParser(serviceConfigParser);
544+
builder.setScheduledExecutorService(scheduledExecutorService);
520545
builder.setChannelLogger(channelLogger);
521546
builder.setOffloadExecutor(executor);
522547
return builder;
@@ -541,6 +566,7 @@ public static final class Builder {
541566
private ProxyDetector proxyDetector;
542567
private SynchronizationContext syncContext;
543568
private ServiceConfigParser serviceConfigParser;
569+
private ScheduledExecutorService scheduledExecutorService;
544570
private ChannelLogger channelLogger;
545571
private Executor executor;
546572

@@ -577,6 +603,16 @@ public Builder setSynchronizationContext(SynchronizationContext syncContext) {
577603
return this;
578604
}
579605

606+
/**
607+
* See {@link Args#getScheduledExecutorService}.
608+
*/
609+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/6454")
610+
public Builder setScheduledExecutorService(
611+
ScheduledExecutorService scheduledExecutorService) {
612+
this.scheduledExecutorService = checkNotNull(scheduledExecutorService);
613+
return this;
614+
}
615+
580616
/**
581617
* See {@link Args#getServiceConfigParser}. This is a required field.
582618
*
@@ -618,7 +654,7 @@ public Args build() {
618654
return
619655
new Args(
620656
defaultPort, proxyDetector, syncContext, serviceConfigParser,
621-
channelLogger, executor);
657+
scheduledExecutorService, channelLogger, executor);
622658
}
623659
}
624660
}

api/src/test/java/io/grpc/NameResolverTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Map;
3232
import java.util.concurrent.Executor;
3333
import java.util.concurrent.Executors;
34+
import java.util.concurrent.ScheduledExecutorService;
3435
import java.util.concurrent.atomic.AtomicReference;
3536
import org.junit.Before;
3637
import org.junit.Test;
@@ -45,6 +46,8 @@ public class NameResolverTest {
4546
private final SynchronizationContext syncContext =
4647
new SynchronizationContext(mock(UncaughtExceptionHandler.class));
4748
private final ServiceConfigParser parser = mock(ServiceConfigParser.class);
49+
private final ScheduledExecutorService scheduledExecutorService =
50+
mock(ScheduledExecutorService.class);
4851
private final ChannelLogger channelLogger = mock(ChannelLogger.class);
4952
private final Executor executor = Executors.newSingleThreadExecutor();
5053
private URI uri;
@@ -62,6 +65,7 @@ public void args() {
6265
assertThat(args.getProxyDetector()).isSameInstanceAs(proxyDetector);
6366
assertThat(args.getSynchronizationContext()).isSameInstanceAs(syncContext);
6467
assertThat(args.getServiceConfigParser()).isSameInstanceAs(parser);
68+
assertThat(args.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
6569
assertThat(args.getChannelLogger()).isSameInstanceAs(channelLogger);
6670
assertThat(args.getOffloadExecutor()).isSameInstanceAs(executor);
6771

@@ -70,6 +74,7 @@ public void args() {
7074
assertThat(args2.getProxyDetector()).isSameInstanceAs(proxyDetector);
7175
assertThat(args2.getSynchronizationContext()).isSameInstanceAs(syncContext);
7276
assertThat(args2.getServiceConfigParser()).isSameInstanceAs(parser);
77+
assertThat(args2.getScheduledExecutorService()).isSameInstanceAs(scheduledExecutorService);
7378
assertThat(args2.getChannelLogger()).isSameInstanceAs(channelLogger);
7479
assertThat(args2.getOffloadExecutor()).isSameInstanceAs(executor);
7580

@@ -254,6 +259,7 @@ private NameResolver.Args createArgs() {
254259
.setProxyDetector(proxyDetector)
255260
.setSynchronizationContext(syncContext)
256261
.setServiceConfigParser(parser)
262+
.setScheduledExecutorService(scheduledExecutorService)
257263
.setChannelLogger(channelLogger)
258264
.setOffloadExecutor(executor)
259265
.build();

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
137137
private final NameResolver.Args nameResolverArgs;
138138
private final AutoConfiguredLoadBalancerFactory loadBalancerFactory;
139139
private final ClientTransportFactory transportFactory;
140-
private final ScheduledExecutorForBalancer scheduledExecutorForBalancer;
140+
private final RestrictedScheduledExecutor scheduledExecutor;
141141
private final Executor executor;
142142
private final ObjectPool<? extends Executor> executorPool;
143143
private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
@@ -562,6 +562,12 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
562562
this.target = checkNotNull(builder.target, "target");
563563
this.logId = InternalLogId.allocate("Channel", target);
564564
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
565+
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
566+
this.executor = checkNotNull(executorPool.getObject(), "executor");
567+
this.transportFactory =
568+
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
569+
this.scheduledExecutor =
570+
new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
565571
maxTraceEvents = builder.maxTraceEvents;
566572
channelTracer = new ChannelTracer(
567573
logId, builder.maxTraceEvents, timeProvider.currentTimeNanos(),
@@ -581,6 +587,7 @@ ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata new
581587
.setDefaultPort(builder.getDefaultPort())
582588
.setProxyDetector(proxyDetector)
583589
.setSynchronizationContext(syncContext)
590+
.setScheduledExecutorService(scheduledExecutor)
584591
.setServiceConfigParser(
585592
new ScParser(
586593
retryEnabled,
@@ -598,18 +605,11 @@ public void execute(Runnable command) {
598605
})
599606
.build();
600607
this.nameResolver = getNameResolver(target, nameResolverFactory, nameResolverArgs);
601-
this.executorPool = checkNotNull(builder.executorPool, "executorPool");
602608
this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
603609
this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
604-
this.executor = checkNotNull(executorPool.getObject(), "executor");
605610
this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
606611
this.delayedTransport.start(delayedTransportListener);
607612
this.backoffPolicyProvider = backoffPolicyProvider;
608-
this.transportFactory =
609-
new CallCredentialsApplyingTransportFactory(clientTransportFactory, this.executor);
610-
this.scheduledExecutorForBalancer =
611-
new ScheduledExecutorForBalancer(transportFactory.getScheduledExecutorService());
612-
613613
serviceConfigInterceptor = new ServiceConfigInterceptor(
614614
retryEnabled, builder.maxRetryAttempts, builder.maxHedgedAttempts);
615615
this.defaultServiceConfig = builder.defaultServiceConfig;
@@ -1269,7 +1269,7 @@ public SynchronizationContext getSynchronizationContext() {
12691269

12701270
@Override
12711271
public ScheduledExecutorService getScheduledExecutorService() {
1272-
return scheduledExecutorForBalancer;
1272+
return scheduledExecutor;
12731273
}
12741274

12751275
@Override
@@ -1736,10 +1736,10 @@ synchronized void release() {
17361736
}
17371737
}
17381738

1739-
private static final class ScheduledExecutorForBalancer implements ScheduledExecutorService {
1739+
private static final class RestrictedScheduledExecutor implements ScheduledExecutorService {
17401740
final ScheduledExecutorService delegate;
17411741

1742-
private ScheduledExecutorForBalancer(ScheduledExecutorService delegate) {
1742+
private RestrictedScheduledExecutor(ScheduledExecutorService delegate) {
17431743
this.delegate = checkNotNull(delegate, "delegate");
17441744
}
17451745

0 commit comments

Comments
 (0)