Skip to content

Commit 8676885

Browse files
authored
Fix bigquery type error when export format is parquet (#16027)
1 parent faf4caf commit 8676885

File tree

2 files changed

+11
-5
lines changed

2 files changed

+11
-5
lines changed

airflow/providers/google/cloud/transfers/sql_to_gcs.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import json
2121
import warnings
2222
from tempfile import NamedTemporaryFile
23-
from typing import Optional, Sequence, Union
23+
from typing import Dict, Optional, Sequence, Union
2424

2525
import pyarrow as pa
2626
import pyarrow.parquet as pq
@@ -278,7 +278,8 @@ def _convert_parquet_schema(self, cursor):
278278
}
279279

280280
columns = [field[0] for field in cursor.description]
281-
bq_types = [self.field_to_bigquery(field) for field in cursor.description]
281+
bq_fields = [self.field_to_bigquery(field) for field in cursor.description]
282+
bq_types = [bq_field.get('type') if bq_field is not None else None for bq_field in bq_fields]
282283
pq_types = [type_map.get(bq_type, pa.string()) for bq_type in bq_types]
283284
parquet_schema = pa.schema(zip(columns, pq_types))
284285
return parquet_schema
@@ -288,7 +289,7 @@ def query(self):
288289
"""Execute DBAPI query."""
289290

290291
@abc.abstractmethod
291-
def field_to_bigquery(self, field):
292+
def field_to_bigquery(self, field) -> Dict[str, str]:
292293
"""Convert a DBAPI field to BigQuery schema format."""
293294

294295
@abc.abstractmethod

tests/providers/google/cloud/transfers/test_sql_to_gcs.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import json
1919
import unittest
20+
from typing import Dict
2021
from unittest import mock
2122
from unittest.mock import MagicMock, Mock
2223

@@ -62,8 +63,12 @@
6263

6364

6465
class DummySQLToGCSOperator(BaseSQLToGCSOperator):
65-
def field_to_bigquery(self, field):
66-
pass
66+
def field_to_bigquery(self, field) -> Dict[str, str]:
67+
return {
68+
'name': field[0],
69+
'type': 'STRING',
70+
'mode': 'NULLABLE',
71+
}
6772

6873
def convert_type(self, value, schema_type):
6974
return 'convert_type_return_value'

0 commit comments

Comments
 (0)