Skip to content

Commit ae25253

Browse files
authored
feat(bigquery/storage/managedwriter/adapt): add RANGE support to adapt (#9836)
This PR adds support for schema conversions and proto conversions to the adapt subpackage. For proto conversion, we build a static name for range message types rather than using the normal first-encountered-message, as the schema for a range field will also be the consistent for a given element type. This PR also moves adapt to using subtests for table-driven testing to make it easier to isolate specific test cases. Towards: https://togithub.com/googleapis/google-cloud-go/issues/9017
1 parent bd18728 commit ae25253

File tree

4 files changed

+449
-80
lines changed

4 files changed

+449
-80
lines changed

bigquery/storage/managedwriter/adapt/protoconversion.go

Lines changed: 123 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ var bqTypeToFieldTypeMap = map[storagepb.TableFieldSchema_Type]descriptorpb.Fiel
6262
storagepb.TableFieldSchema_STRUCT: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE,
6363
storagepb.TableFieldSchema_TIME: descriptorpb.FieldDescriptorProto_TYPE_INT64,
6464
storagepb.TableFieldSchema_TIMESTAMP: descriptorpb.FieldDescriptorProto_TYPE_INT64,
65+
storagepb.TableFieldSchema_RANGE: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE,
66+
}
67+
68+
var allowedRangeTypes = []storagepb.TableFieldSchema_Type{
69+
storagepb.TableFieldSchema_DATE,
70+
storagepb.TableFieldSchema_DATETIME,
71+
storagepb.TableFieldSchema_TIMESTAMP,
6572
}
6673

6774
// Primitive types which can leverage packed encoding when repeated/arrays.
@@ -105,12 +112,26 @@ var bqTypeToWrapperMap = map[storagepb.TableFieldSchema_Type]string{
105112
// filename used by well known types proto
106113
var wellKnownTypesWrapperName = "google/protobuf/wrappers.proto"
107114

115+
var rangeTypesPrefix = "rangemessage_range_"
116+
108117
// dependencyCache is used to reduce the number of unique messages we generate by caching based on the tableschema.
109118
//
110119
// Keys are based on the base64-encoded serialized tableschema value.
111-
type dependencyCache map[string]protoreflect.MessageDescriptor
120+
type dependencyCache struct {
121+
// keyed by element type
122+
rangeTypes map[storagepb.TableFieldSchema_Type]protoreflect.MessageDescriptor
123+
// general cache
124+
msgs map[string]protoreflect.MessageDescriptor
125+
}
126+
127+
func newDependencyCache() *dependencyCache {
128+
return &dependencyCache{
129+
rangeTypes: make(map[storagepb.TableFieldSchema_Type]protoreflect.MessageDescriptor),
130+
msgs: make(map[string]protoreflect.MessageDescriptor),
131+
}
132+
}
112133

113-
func (dm dependencyCache) get(schema *storagepb.TableSchema) protoreflect.MessageDescriptor {
134+
func (dm *dependencyCache) get(schema *storagepb.TableSchema) protoreflect.MessageDescriptor {
114135
if dm == nil {
115136
return nil
116137
}
@@ -119,15 +140,23 @@ func (dm dependencyCache) get(schema *storagepb.TableSchema) protoreflect.Messag
119140
return nil
120141
}
121142
encoded := base64.StdEncoding.EncodeToString(b)
122-
if desc, ok := (dm)[encoded]; ok {
143+
if desc, ok := dm.msgs[encoded]; ok {
123144
return desc
124145
}
125146
return nil
126147
}
127148

128-
func (dm dependencyCache) getFileDescriptorProtos() []*descriptorpb.FileDescriptorProto {
149+
func (dm *dependencyCache) getFileDescriptorProtos() []*descriptorpb.FileDescriptorProto {
129150
var fdpList []*descriptorpb.FileDescriptorProto
130-
for _, d := range dm {
151+
// emit encountered messages.
152+
for _, d := range dm.msgs {
153+
if fd := d.ParentFile(); fd != nil {
154+
fdp := protodesc.ToFileDescriptorProto(fd)
155+
fdpList = append(fdpList, fdp)
156+
}
157+
}
158+
// emit any range value types used.
159+
for _, d := range dm.rangeTypes {
131160
if fd := d.ParentFile(); fd != nil {
132161
fdp := protodesc.ToFileDescriptorProto(fd)
133162
fdpList = append(fdpList, fdp)
@@ -136,7 +165,7 @@ func (dm dependencyCache) getFileDescriptorProtos() []*descriptorpb.FileDescript
136165
return fdpList
137166
}
138167

139-
func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoreflect.MessageDescriptor) error {
168+
func (dm *dependencyCache) add(schema *storagepb.TableSchema, descriptor protoreflect.MessageDescriptor) error {
140169
if dm == nil {
141170
return fmt.Errorf("cache is nil")
142171
}
@@ -145,24 +174,72 @@ func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoref
145174
return fmt.Errorf("failed to serialize tableschema: %w", err)
146175
}
147176
encoded := base64.StdEncoding.EncodeToString(b)
148-
(dm)[encoded] = descriptor
177+
dm.msgs[encoded] = descriptor
149178
return nil
150179
}
151180

181+
func (dm *dependencyCache) addRangeByElementType(typ storagepb.TableFieldSchema_Type, useProto3 bool) (protoreflect.MessageDescriptor, error) {
182+
if md, present := dm.rangeTypes[typ]; present {
183+
// already added, do nothing.
184+
return md, nil
185+
}
186+
// Not yet present. Build the message.
187+
allowed := false
188+
for _, a := range allowedRangeTypes {
189+
if typ == a {
190+
allowed = true
191+
}
192+
}
193+
if !allowed {
194+
return nil, fmt.Errorf("range does not support %q as a valid element type", typ.String())
195+
}
196+
ts := &storagepb.TableSchema{
197+
Fields: []*storagepb.TableFieldSchema{
198+
{
199+
Name: "start",
200+
Type: typ,
201+
Mode: storagepb.TableFieldSchema_NULLABLE,
202+
},
203+
{
204+
Name: "end",
205+
Type: typ,
206+
Mode: storagepb.TableFieldSchema_NULLABLE,
207+
},
208+
},
209+
}
210+
// we put the range types outside the hierarchical namespace as they're effectively BQ-specific well-known types.
211+
msgTypeName := fmt.Sprintf("%s%s", rangeTypesPrefix, strings.ToLower(typ.String()))
212+
// use a new dependency cache, as we don't want to taint the main one due to matching schema
213+
md, err := storageSchemaToDescriptorInternal(ts, msgTypeName, newDependencyCache(), useProto3)
214+
if err != nil {
215+
return nil, fmt.Errorf("failed to generate range descriptor %q: %v", msgTypeName, err)
216+
}
217+
dm.rangeTypes[typ] = md
218+
return md, nil
219+
}
220+
221+
func (dm *dependencyCache) getRange(typ storagepb.TableFieldSchema_Type) protoreflect.MessageDescriptor {
222+
md, ok := dm.rangeTypes[typ]
223+
if !ok {
224+
return nil
225+
}
226+
return md
227+
}
228+
152229
// StorageSchemaToProto2Descriptor builds a protoreflect.Descriptor for a given table schema using proto2 syntax.
153230
func StorageSchemaToProto2Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
154-
dc := make(dependencyCache)
231+
dc := newDependencyCache()
155232
// TODO: b/193064992 tracks support for wrapper types. In the interim, disable wrapper usage.
156-
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, false)
233+
return storageSchemaToDescriptorInternal(inSchema, scope, dc, false)
157234
}
158235

159236
// StorageSchemaToProto3Descriptor builds a protoreflect.Descriptor for a given table schema using proto3 syntax.
160237
//
161238
// NOTE: Currently the write API doesn't yet support proto3 behaviors (default value, wrapper types, etc), but this is provided for
162239
// completeness.
163240
func StorageSchemaToProto3Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
164-
dc := make(dependencyCache)
165-
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, true)
241+
dc := newDependencyCache()
242+
return storageSchemaToDescriptorInternal(inSchema, scope, dc, true)
166243
}
167244

168245
// Internal implementation of the conversion code.
@@ -178,10 +255,11 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
178255
for _, f := range inSchema.GetFields() {
179256
fNumber = fNumber + 1
180257
currentScope := fmt.Sprintf("%s__%s", scope, f.GetName())
181-
// If we're dealing with a STRUCT type, we must deal with sub messages.
182-
// As multiple submessages may share the same type definition, we use a dependency cache
183-
// and interrogate it / populate it as we're going.
258+
184259
if f.Type == storagepb.TableFieldSchema_STRUCT {
260+
// If we're dealing with a STRUCT type, we must deal with sub messages.
261+
// As multiple submessages may share the same type definition, we use a dependency cache
262+
// and interrogate it / populate it as we're going.
185263
foundDesc := cache.get(&storagepb.TableSchema{Fields: f.GetFields()})
186264
if foundDesc != nil {
187265
// check to see if we already have this in current dependency list
@@ -225,6 +303,30 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
225303
fields = append(fields, fdp)
226304
}
227305
} else {
306+
if f.Type == storagepb.TableFieldSchema_RANGE {
307+
// Range handling is a special case of general struct handling.
308+
ret := f.GetRangeElementType()
309+
if ret == nil {
310+
return nil, fmt.Errorf("field %q is a RANGE, but doesn't include RangeElementType info", f.GetName())
311+
}
312+
foundDesc, err := cache.addRangeByElementType(ret.GetType(), useProto3)
313+
if err != nil {
314+
return nil, err
315+
}
316+
if foundDesc != nil {
317+
haveDep := false
318+
for _, dep := range deps {
319+
if messageDependsOnFile(foundDesc, dep) {
320+
haveDep = true
321+
break
322+
}
323+
}
324+
// If dep is missing, add to current dependencies.
325+
if !haveDep {
326+
deps = append(deps, foundDesc.ParentFile())
327+
}
328+
}
329+
}
228330
fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
229331
if err != nil {
230332
return nil, newConversionError(currentScope, err)
@@ -318,6 +420,13 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
318420
TypeName: proto.String(scope),
319421
Label: convertModeToLabel(field.GetMode(), useProto3),
320422
}
423+
} else if field.GetType() == storagepb.TableFieldSchema_RANGE {
424+
fdp = &descriptorpb.FieldDescriptorProto{
425+
Name: proto.String(name),
426+
Number: proto.Int32(idx),
427+
TypeName: proto.String(fmt.Sprintf("%s%s", rangeTypesPrefix, strings.ToLower(field.GetRangeElementType().GetType().String()))),
428+
Label: convertModeToLabel(field.GetMode(), useProto3),
429+
}
321430
} else {
322431
// For (REQUIRED||REPEATED) fields for proto3, or all cases for proto2, we can use the expected scalar types.
323432
if field.GetMode() != storagepb.TableFieldSchema_NULLABLE || !useProto3 {

0 commit comments

Comments
 (0)