Skip to content

Commit 60ad7f3

Browse files
authored
fix(bigquery): processStream check ctx done when queuing non retryable err (#10675)
1 parent 1afb9ee commit 60ad7f3

File tree

2 files changed

+41
-21
lines changed

2 files changed

+41
-21
lines changed

bigquery/storage_iterator.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -244,40 +244,49 @@ func (it *storageArrowIterator) processStream(readStream string) {
244244
Offset: offset,
245245
})
246246
if err != nil {
247-
if it.rs.ctx.Err() != nil { // context cancelled, don't try again
247+
serr := it.handleProcessStreamError(readStream, bo, err)
248+
if serr != nil {
248249
return
249250
}
250-
backoff, shouldRetry := retryReadRows(bo, err)
251-
if shouldRetry {
252-
if err := gax.Sleep(it.rs.ctx, backoff); err != nil {
253-
return // context cancelled
254-
}
255-
continue
256-
}
257-
it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err)
258251
continue
259252
}
260253
offset, err = it.consumeRowStream(readStream, rowStream, offset)
261254
if errors.Is(err, io.EOF) {
262255
return
263256
}
264257
if err != nil {
265-
if it.rs.ctx.Err() != nil { // context cancelled, don't queue error
258+
serr := it.handleProcessStreamError(readStream, bo, err)
259+
if serr != nil {
266260
return
267261
}
268-
backoff, shouldRetry := retryReadRows(bo, err)
269-
if shouldRetry {
270-
if err := gax.Sleep(it.rs.ctx, backoff); err != nil {
271-
return // context cancelled
272-
}
273-
continue
274-
}
275-
it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err)
276262
// try to re-open row stream with updated offset
277263
}
278264
}
279265
}
280266

267+
// handleProcessStreamError check if err is retryable,
268+
// waiting with exponential backoff in that scenario.
269+
// If error is not retryable, queue up err to be sent to user.
270+
// Return error if should exit the goroutine.
271+
func (it *storageArrowIterator) handleProcessStreamError(readStream string, bo gax.Backoff, err error) error {
272+
if it.rs.ctx.Err() != nil { // context cancelled, don't try again
273+
return it.rs.ctx.Err()
274+
}
275+
backoff, shouldRetry := retryReadRows(bo, err)
276+
if shouldRetry {
277+
if err := gax.Sleep(it.rs.ctx, backoff); err != nil {
278+
return err // context cancelled
279+
}
280+
return nil
281+
}
282+
select {
283+
case it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err):
284+
return nil
285+
case <-it.rs.ctx.Done():
286+
return context.Canceled
287+
}
288+
}
289+
281290
func retryReadRows(bo gax.Backoff, err error) (time.Duration, bool) {
282291
s, ok := status.FromError(err)
283292
if !ok {

bigquery/storage_iterator_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ import (
2929
)
3030

3131
func TestStorageIteratorRetry(t *testing.T) {
32+
settings := defaultReadClientSettings()
33+
randomErrors := []error{} // generate more errors than the # of workers
34+
for i := 0; i < settings.maxWorkerCount+2; i++ {
35+
randomErrors = append(randomErrors, fmt.Errorf("random error %d", i))
36+
}
3237
cancelledCtx, cancel := context.WithCancel(context.Background())
3338
cancel()
3439
testCases := []struct {
@@ -84,6 +89,11 @@ func TestStorageIteratorRetry(t *testing.T) {
8489
},
8590
wantFail: true,
8691
},
92+
{
93+
desc: "filled with non-retryable errors and context cancelled",
94+
errors: randomErrors,
95+
wantFail: true,
96+
},
8797
}
8898
for _, tc := range testCases {
8999
rrc := &testReadRowsClient{
@@ -115,17 +125,18 @@ func TestStorageIteratorRetry(t *testing.T) {
115125

116126
it, err := newRawStorageRowIterator(&readSession{
117127
ctx: ctx,
118-
settings: defaultReadClientSettings(),
119-
readRowsFunc: readRowsFunc,
128+
settings: settings,
120129
bqSession: &storagepb.ReadSession{},
130+
readRowsFunc: readRowsFunc,
121131
}, Schema{})
122132
if err != nil {
123133
t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err)
124134
}
125135

126136
it.processStream("test-stream")
127137

128-
if errors.Is(it.rs.ctx.Err(), context.Canceled) || errors.Is(it.rs.ctx.Err(), context.DeadlineExceeded) {
138+
if errors.Is(it.rs.ctx.Err(), context.Canceled) ||
139+
errors.Is(it.rs.ctx.Err(), context.DeadlineExceeded) {
129140
if tc.wantFail {
130141
continue
131142
}

0 commit comments

Comments
 (0)