Skip to content

Commit 54fcf36

Browse files
authored
feat(bigquery/storage/managedwriter): refine connection metrics (#8324)
This PR changes metrics instrumentation in two ways: The AppendClientOpenView is now tagged with an error dimension, so that failures to open a connection are clearer. We use rpc status for the value, with the expectation that non-rpc errors are tagged as Unknown. A new metric (and view), AppendRequestReconnectsView provides additional visibility into when errors during Send trigger reconnection of the underlying connection. We attempt to also attribute this to the origin writer, so it may be tagged by error and source stream. Towards: #8311
1 parent 04d6264 commit 54fcf36

File tree

3 files changed

+30
-3
lines changed

3 files changed

+30
-3
lines changed

bigquery/storage/managedwriter/connection.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,18 @@ func (cp *connectionPool) mergeCallOptions(co *connection) []gax.CallOption {
136136
func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite_AppendRowsClient, chan *pendingWrite, error) {
137137
r := &unaryRetryer{}
138138
for {
139-
recordStat(cp.ctx, AppendClientOpenCount, 1)
140139
arc, err := cp.open(co.ctx, cp.mergeCallOptions(co)...)
140+
metricCtx := cp.ctx
141+
if err == nil {
142+
// accumulate AppendClientOpenCount for the success case.
143+
recordStat(metricCtx, AppendClientOpenCount, 1)
144+
}
141145
if err != nil {
146+
if tagCtx, tagErr := tag.New(cp.ctx, tag.Insert(keyError, grpcstatus.Code(err).String())); tagErr == nil {
147+
metricCtx = tagCtx
148+
}
149+
// accumulate AppendClientOpenCount for the error case.
150+
recordStat(metricCtx, AppendClientOpenCount, 1)
142151
bo, shouldRetry := r.Retry(err)
143152
if shouldRetry {
144153
recordStat(cp.ctx, AppendClientOpenRetryCount, 1)
@@ -396,6 +405,14 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
396405
}
397406
if err != nil {
398407
if shouldReconnect(err) {
408+
metricCtx := co.ctx // start with the ctx that must be present
409+
if pw.writer != nil {
410+
metricCtx = pw.writer.ctx // the writer ctx bears the stream/origin tagging, so prefer it.
411+
}
412+
if tagCtx, tagErr := tag.New(metricCtx, tag.Insert(keyError, grpcstatus.Code(err).String())); tagErr == nil {
413+
metricCtx = tagCtx
414+
}
415+
recordStat(metricCtx, AppendRequestReconnects, 1)
399416
// if we think this connection is unhealthy, force a reconnect on the next send.
400417
co.reconnect = true
401418
}

bigquery/storage/managedwriter/instrumentation.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ var (
6161
// It is EXPERIMENTAL and subject to change or removal without notice.
6262
AppendRequestErrors = stats.Int64(statsPrefix+"append_request_errors", "Number of append requests that yielded immediate error", stats.UnitDimensionless)
6363

64+
// AppendRequestReconnects is a measure of the number of times that sending an append request triggered reconnect.
65+
// It is EXPERIMENTAL and subject to change or removal without notice.
66+
AppendRequestReconnects = stats.Int64(statsPrefix+"append_reconnections", "Number of append rows reconnections", stats.UnitDimensionless)
67+
6468
// AppendRequestRows is a measure of the number of append rows sent.
6569
// It is EXPERIMENTAL and subject to change or removal without notice.
6670
AppendRequestRows = stats.Int64(statsPrefix+"append_rows", "Number of append rows sent", stats.UnitDimensionless)
@@ -105,6 +109,10 @@ var (
105109
// It is EXPERIMENTAL and subject to change or removal without notice.
106110
AppendRequestErrorsView *view.View
107111

112+
// AppendRequestReconnectsView is a cumulative sum of AppendRequestReconnects.
113+
// It is EXPERIMENTAL and subject to change or removal without notice.
114+
AppendRequestReconnectsView *view.View
115+
108116
// AppendRequestRowsView is a cumulative sum of AppendRows.
109117
// It is EXPERIMENTAL and subject to change or removal without notice.
110118
AppendRequestRowsView *view.View
@@ -127,12 +135,13 @@ var (
127135
)
128136

129137
func init() {
130-
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount))
138+
AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyError)
131139
AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount))
132140

133141
AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
134142
AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin)
135143
AppendRequestErrorsView = createSumView(stats.Measure(AppendRequestErrors), keyStream, keyDataOrigin, keyError)
144+
AppendRequestReconnectsView = createSumView(stats.Measure(AppendRequestReconnects), keyStream, keyDataOrigin, keyError)
136145
AppendRequestRowsView = createSumView(stats.Measure(AppendRequestRows), keyStream, keyDataOrigin)
137146

138147
AppendResponsesView = createSumView(stats.Measure(AppendResponses), keyStream, keyDataOrigin)
@@ -147,6 +156,7 @@ func init() {
147156
AppendRequestsView,
148157
AppendRequestBytesView,
149158
AppendRequestErrorsView,
159+
AppendRequestReconnectsView,
150160
AppendRequestRowsView,
151161

152162
AppendResponsesView,

bigquery/storage/managedwriter/integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1066,7 +1066,7 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
10661066

10671067
// metric to key tag names
10681068
wantTags := map[string][]string{
1069-
"cloud.google.com/go/bigquery/storage/managedwriter/stream_open_count": nil,
1069+
"cloud.google.com/go/bigquery/storage/managedwriter/stream_open_count": {"error"},
10701070
"cloud.google.com/go/bigquery/storage/managedwriter/stream_open_retry_count": nil,
10711071
"cloud.google.com/go/bigquery/storage/managedwriter/append_requests": {"streamID"},
10721072
"cloud.google.com/go/bigquery/storage/managedwriter/append_request_bytes": {"streamID"},

0 commit comments

Comments
 (0)