Skip to content

Commit c73963f

Browse files
authored
fix(bigquery): handle storage read api Recv call errors (#8666)
The initial bug was found when the Storage Read API is called under a more restrictive IAM role, which can leave a user able to create a ReadSession but not read from it (missing the `bigquery.readsessions.getData` permission). This made the process of reading the `read_streams` enter a retry loop, because errors returned by the `Recv` calls were not handled properly — only errors from the `ReadRows` call were. This PR fixes that behavior. Was reported on #8660 and tested locally by creating a custom role with the given configuration: ![image](https://togithub.com/googleapis/google-cloud-go/assets/1615543/b6dfdecf-5bb0-497f-8fcb-df8a8bdf1e3b) Example of error: ``` failed to fetch via storage API: failed to read rows on stream projects/xxx/locations/us/sessions/yyy/streams/zzz: failed to consume rows on stream projects/xxx/locations/us/sessions/yyy/streams/zzz: rpc error: code = PermissionDenied desc = there was an error operating on 'projects/xxx/locations/us/sessions/yyy/streams/zzz': the user does not have 'bigquery.readsessions.getData' permission for 'projects/xxx/locations/us/sessions/yyy/streams/zzz ``` With the fix in this PR, processing of the stream now stops and errors can be returned (like the PERMISSION_DENIED error in this scenario).
1 parent 65cb8bd commit c73963f

File tree

2 files changed

+63
-32
lines changed

2 files changed

+63
-32
lines changed

bigquery/storage_iterator.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,14 @@ func (it *arrowIterator) processStream(readStream string) {
270270
if it.session.ctx.Err() != nil { // context cancelled, don't queue error
271271
return
272272
}
273+
backoff, shouldRetry := retryReadRows(bo, err)
274+
if shouldRetry {
275+
if err := gax.Sleep(it.ctx, backoff); err != nil {
276+
return // context cancelled
277+
}
278+
continue
279+
}
280+
it.errs <- fmt.Errorf("failed to read rows on stream %s: %w", readStream, err)
273281
// try to re-open row stream with updated offset
274282
}
275283
}

bigquery/storage_iterator_test.go

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestStorageIteratorRetry(t *testing.T) {
3939
}{
4040
{
4141
desc: "no error",
42-
errors: []error{nil},
42+
errors: []error{},
4343
wantFail: false,
4444
},
4545
{
@@ -49,10 +49,16 @@ func TestStorageIteratorRetry(t *testing.T) {
4949
status.Errorf(codes.Unavailable, "try 2"),
5050
status.Errorf(codes.Canceled, "try 3"),
5151
status.Errorf(codes.Internal, "try 4"),
52-
nil,
5352
},
5453
wantFail: false,
5554
},
55+
{
56+
desc: "not enough permission",
57+
errors: []error{
58+
status.Errorf(codes.PermissionDenied, "the user does not have 'bigquery.readsessions.getData' permission"),
59+
},
60+
wantFail: true,
61+
},
5662
{
5763
desc: "permanent error",
5864
errors: []error{
@@ -71,18 +77,12 @@ func TestStorageIteratorRetry(t *testing.T) {
7177
wantFail: true,
7278
},
7379
}
74-
7580
for _, tc := range testCases {
76-
baseCtx := tc.ctx
77-
if baseCtx == nil {
78-
baseCtx = context.Background()
81+
rrc := &testReadRowsClient{
82+
errors: tc.errors,
7983
}
80-
ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second)
81-
defer cancel()
82-
it, err := newRawStorageRowIterator(&readSession{
83-
ctx: ctx,
84-
settings: defaultReadClientSettings(),
85-
readRowsFunc: func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
84+
readRowsFuncs := map[string]func(context.Context, *storagepb.ReadRowsRequest, ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error){
85+
"readRows fail on first call": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
8686
if len(tc.errors) == 0 {
8787
return &testReadRowsClient{}, nil
8888
}
@@ -93,39 +93,62 @@ func TestStorageIteratorRetry(t *testing.T) {
9393
}
9494
return &testReadRowsClient{}, nil
9595
},
96-
bqSession: &storagepb.ReadSession{},
97-
})
98-
if err != nil {
99-
t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err)
96+
"readRows fails on Recv": func(ctx context.Context, req *storagepb.ReadRowsRequest, opts ...gax.CallOption) (storagepb.BigQueryRead_ReadRowsClient, error) {
97+
return rrc, nil
98+
},
10099
}
100+
for readRowsFuncType, readRowsFunc := range readRowsFuncs {
101+
baseCtx := tc.ctx
102+
if baseCtx == nil {
103+
baseCtx = context.Background()
104+
}
105+
ctx, cancel := context.WithTimeout(baseCtx, 5*time.Second)
106+
defer cancel()
101107

102-
it.processStream("test-stream")
108+
it, err := newRawStorageRowIterator(&readSession{
109+
ctx: ctx,
110+
settings: defaultReadClientSettings(),
111+
readRowsFunc: readRowsFunc,
112+
bqSession: &storagepb.ReadSession{},
113+
})
114+
if err != nil {
115+
t.Fatalf("case %s: newRawStorageRowIterator: %v", tc.desc, err)
116+
}
117+
118+
it.processStream("test-stream")
103119

104-
if errors.Is(it.ctx.Err(), context.Canceled) || errors.Is(it.ctx.Err(), context.DeadlineExceeded) {
105-
if tc.wantFail {
106-
continue
120+
if errors.Is(it.ctx.Err(), context.Canceled) || errors.Is(it.ctx.Err(), context.DeadlineExceeded) {
121+
if tc.wantFail {
122+
continue
123+
}
124+
t.Fatalf("case %s(%s): deadline exceeded", tc.desc, readRowsFuncType)
125+
}
126+
if tc.wantFail && len(it.errs) == 0 {
127+
t.Fatalf("case %s(%s):want test to fail, but found no errors", tc.desc, readRowsFuncType)
128+
}
129+
if !tc.wantFail && len(it.errs) > 0 {
130+
t.Fatalf("case %s(%s):test should not fail, but found %d errors", tc.desc, readRowsFuncType, len(it.errs))
107131
}
108-
t.Fatalf("case %s: deadline exceeded", tc.desc)
109-
}
110-
if tc.wantFail && len(it.errs) == 0 {
111-
t.Fatalf("case %s:want test to fail, but found no errors", tc.desc)
112-
}
113-
if !tc.wantFail && len(it.errs) > 0 {
114-
t.Fatalf("case %s:test should not fail, but found %d errors", tc.desc, len(it.errs))
115132
}
116133
}
117134
}
118135

119136
type testReadRowsClient struct {
120137
storagepb.BigQueryRead_ReadRowsClient
121138
responses []*storagepb.ReadRowsResponse
139+
errors []error
122140
}
123141

124142
func (trrc *testReadRowsClient) Recv() (*storagepb.ReadRowsResponse, error) {
125-
if len(trrc.responses) == 0 {
126-
return nil, io.EOF
143+
if len(trrc.errors) > 0 {
144+
err := trrc.errors[0]
145+
trrc.errors = trrc.errors[1:]
146+
return nil, err
147+
}
148+
if len(trrc.responses) > 0 {
149+
r := trrc.responses[0]
150+
trrc.responses = trrc.responses[:1]
151+
return r, nil
127152
}
128-
r := trrc.responses[0]
129-
trrc.responses = trrc.responses[:1]
130-
return r, nil
153+
return nil, io.EOF
131154
}

0 commit comments

Comments
 (0)