42
42
import com .google .common .base .MoreObjects ;
43
43
import com .google .common .collect .ImmutableList ;
44
44
import com .google .common .collect .ImmutableSet ;
45
+ import com .google .common .collect .Iterables ;
45
46
import com .google .common .util .concurrent .MoreExecutors ;
46
47
import com .google .monitoring .v3 .CreateTimeSeriesRequest ;
47
48
import com .google .monitoring .v3 .ProjectName ;
53
54
import io .opentelemetry .sdk .metrics .data .MetricData ;
54
55
import io .opentelemetry .sdk .metrics .export .MetricExporter ;
55
56
import java .io .IOException ;
57
+ import java .util .ArrayList ;
56
58
import java .util .Arrays ;
57
59
import java .util .Collection ;
58
60
import java .util .List ;
@@ -85,6 +87,10 @@ public final class BigtableCloudMonitoringExporter implements MetricExporter {
85
87
86
88
private static final String APPLICATION_RESOURCE_PROJECT_ID = "project_id" ;
87
89
90
+ // This the quota limit from Cloud Monitoring. More details in
91
+ // https://cloud.google.com/monitoring/quotas#custom_metrics_quotas.
92
+ private static final int EXPORT_BATCH_SIZE_LIMIT = 200 ;
93
+
88
94
private final MetricServiceClient client ;
89
95
90
96
private final String bigtableProjectId ;
@@ -216,19 +222,12 @@ private CompletableResultCode exportBigtableResourceMetrics(Collection<MetricDat
216
222
}
217
223
218
224
ProjectName projectName = ProjectName .of (bigtableProjectId );
219
- CreateTimeSeriesRequest bigtableRequest =
220
- CreateTimeSeriesRequest .newBuilder ()
221
- .setName (projectName .toString ())
222
- .addAllTimeSeries (bigtableTimeSeries )
223
- .build ();
224
-
225
- ApiFuture <Empty > future =
226
- this .client .createServiceTimeSeriesCallable ().futureCall (bigtableRequest );
225
+ ApiFuture <List <Empty >> future = exportTimeSeries (projectName , bigtableTimeSeries );
227
226
228
227
CompletableResultCode bigtableExportCode = new CompletableResultCode ();
229
228
ApiFutures .addCallback (
230
229
future ,
231
- new ApiFutureCallback <Empty >() {
230
+ new ApiFutureCallback <List < Empty > >() {
232
231
@ Override
233
232
public void onFailure (Throwable throwable ) {
234
233
if (bigtableExportFailureLogged .compareAndSet (false , true )) {
@@ -245,7 +244,7 @@ public void onFailure(Throwable throwable) {
245
244
}
246
245
247
246
@ Override
248
- public void onSuccess (Empty empty ) {
247
+ public void onSuccess (List < Empty > emptyList ) {
249
248
// When an export succeeded reset the export failure flag to false so if there's a
250
249
// transient failure it'll be logged.
251
250
bigtableExportFailureLogged .set (false );
@@ -290,22 +289,17 @@ private CompletableResultCode exportApplicationResourceMetrics(
290
289
291
290
// Construct the request. The project id will be the project id of the detected monitored
292
291
// resource.
293
- ApiFuture <Empty > gceOrGkeFuture ;
292
+ ApiFuture <List < Empty > > gceOrGkeFuture ;
294
293
CompletableResultCode exportCode = new CompletableResultCode ();
295
294
try {
296
295
ProjectName projectName =
297
296
ProjectName .of (applicationResource .getLabelsOrThrow (APPLICATION_RESOURCE_PROJECT_ID ));
298
- CreateTimeSeriesRequest request =
299
- CreateTimeSeriesRequest .newBuilder ()
300
- .setName (projectName .toString ())
301
- .addAllTimeSeries (timeSeries )
302
- .build ();
303
297
304
- gceOrGkeFuture = this . client . createServiceTimeSeriesCallable (). futureCall ( request );
298
+ gceOrGkeFuture = exportTimeSeries ( projectName , timeSeries );
305
299
306
300
ApiFutures .addCallback (
307
301
gceOrGkeFuture ,
308
- new ApiFutureCallback <Empty >() {
302
+ new ApiFutureCallback <List < Empty > >() {
309
303
@ Override
310
304
public void onFailure (Throwable throwable ) {
311
305
if (applicationExportFailureLogged .compareAndSet (false , true )) {
@@ -322,7 +316,7 @@ public void onFailure(Throwable throwable) {
322
316
}
323
317
324
318
@ Override
325
- public void onSuccess (Empty empty ) {
319
+ public void onSuccess (List < Empty > emptyList ) {
326
320
// When an export succeeded reset the export failure flag to false so if there's a
327
321
// transient failure it'll be logged.
328
322
applicationExportFailureLogged .set (false );
@@ -341,6 +335,23 @@ public void onSuccess(Empty empty) {
341
335
return exportCode ;
342
336
}
343
337
338
+ private ApiFuture <List <Empty >> exportTimeSeries (
339
+ ProjectName projectName , List <TimeSeries > timeSeries ) {
340
+ List <ApiFuture <Empty >> batchResults = new ArrayList <>();
341
+
342
+ for (List <TimeSeries > batch : Iterables .partition (timeSeries , EXPORT_BATCH_SIZE_LIMIT )) {
343
+ CreateTimeSeriesRequest req =
344
+ CreateTimeSeriesRequest .newBuilder ()
345
+ .setName (projectName .toString ())
346
+ .addAllTimeSeries (batch )
347
+ .build ();
348
+ ApiFuture <Empty > f = this .client .createServiceTimeSeriesCallable ().futureCall (req );
349
+ batchResults .add (f );
350
+ }
351
+
352
+ return ApiFutures .allAsList (batchResults );
353
+ }
354
+
344
355
@ Override
345
356
public CompletableResultCode flush () {
346
357
if (lastExportCode != null ) {
0 commit comments