
Commit 9279c44

mnojek and Lukasz Wyszomirski authored
Fix part of Google system tests (#18494)
* Fix CloudBuildExampleDagsSystemTest
* Fix DataCatalog tests
* Fix Dataprep test
* Fix GCS test
* Fix Dataflow test
* Fix GC Functions test
* Fix Cloud SQL tests
* Fix Data Fusion test
* Change env var in Dataprep system test
* Fix config for dataflow system tests
* Add unit test for datafusion
* Fix Data Fusion utest, add optional parameter to data fusion pipeline state sensor
* Update code with black suggestions
* Fix flake8 issue
* Revert changes to example_cloud_sql.py according to review feedback

Co-authored-by: Lukasz Wyszomirski <[email protected]>
1 parent 461ec4c commit 9279c44

12 files changed: 85 additions and 25 deletions


airflow/providers/google/cloud/example_dags/example_cloud_build.py

Lines changed: 1 addition & 1 deletion
@@ -97,7 +97,7 @@

 # [START howto_operator_create_build_from_repo_body]
 create_build_from_repo_body = {
-    "source": {"repo_source": {"repo_name": GCP_SOURCE_REPOSITORY_NAME, "branch_name": "master"}},
+    "source": {"repo_source": {"repo_name": GCP_SOURCE_REPOSITORY_NAME, "branch_name": "main"}},
     "steps": [
         {
             "name": "gcr.io/cloud-builders/docker",

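For context, a minimal sketch of how this request body is typically handed to the Cloud Build operator in this example DAG. The env-var defaults, the steps/images content, the task_id, and the request-body keyword (body=, which newer provider releases replace with build=) are assumptions, not taken from this diff:

# Sketch only: wiring the repo-based build body into CloudBuildCreateBuildOperator.
import os

from airflow.providers.google.cloud.operators.cloud_build import CloudBuildCreateBuildOperator

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
GCP_SOURCE_REPOSITORY_NAME = os.environ.get("GCP_SOURCE_REPOSITORY_NAME", "test-cloud-build-repo")

create_build_from_repo_body = {
    # The source repository's default branch is "main" now, matching the fix above.
    "source": {"repo_source": {"repo_name": GCP_SOURCE_REPOSITORY_NAME, "branch_name": "main"}},
    "steps": [{"name": "gcr.io/cloud-builders/docker", "args": ["build", "-t", "gcr.io/$PROJECT_ID/repo", "."]}],
    "images": ["gcr.io/$PROJECT_ID/repo"],
}

create_build_from_repo = CloudBuildCreateBuildOperator(
    task_id="create_build_from_repo",
    project_id=GCP_PROJECT_ID,
    body=create_build_from_repo_body,  # assumed keyword; newer provider releases use build=
)
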
airflow/providers/google/cloud/example_dags/example_cloud_sql_query.py

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@
 from airflow.utils.dates import days_ago

 GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
-GCP_REGION = os.environ.get('GCP_REGION', 'europe-west-1b')
+GCP_REGION = os.environ.get('GCP_REGION', 'europe-west1')

 GCSQL_POSTGRES_INSTANCE_NAME_QUERY = os.environ.get('GCSQL_POSTGRES_INSTANCE_NAME_QUERY', 'testpostgres')
 GCSQL_POSTGRES_DATABASE_NAME = os.environ.get('GCSQL_POSTGRES_DATABASE_NAME', 'postgresdb')

airflow/providers/google/cloud/example_dags/example_datacatalog.py

Lines changed: 5 additions & 2 deletions
@@ -19,6 +19,8 @@
 """
 Example Airflow DAG that interacts with Google Data Catalog service
 """
+import os
+
 from google.cloud.datacatalog_v1beta1 import FieldType, TagField, TagTemplateField

 from airflow import models
@@ -49,7 +51,8 @@
 )
 from airflow.utils.dates import days_ago

-PROJECT_ID = "polidea-airflow"
+PROJECT_ID = os.getenv("GCP_PROJECT_ID")
+BUCKET_ID = os.getenv("GCP_TEST_DATA_BUCKET", "INVALID BUCKET NAME")
 LOCATION = "us-central1"
 ENTRY_GROUP_ID = "important_data_jan_2019"
 ENTRY_ID = "python_files"
@@ -92,7 +95,7 @@
     entry={
         "display_name": "Wizard",
         "type_": "FILESET",
-        "gcs_fileset_spec": {"file_patterns": ["gs://INVALID BUCKET NAME/**"]},
+        "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_ID}/**"]},
     },
 )
 # [END howto_operator_gcp_datacatalog_create_entry_gcs]

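A minimal sketch of how the new BUCKET_ID variable feeds the GCS fileset entry, using the constants from the hunks above; the task_id and the exact operator call are assumptions, not copied from the file:

# Sketch only: creating a Data Catalog entry that points at the test bucket.
import os

from airflow.providers.google.cloud.operators.datacatalog import CloudDataCatalogCreateEntryOperator

PROJECT_ID = os.getenv("GCP_PROJECT_ID")
BUCKET_ID = os.getenv("GCP_TEST_DATA_BUCKET", "INVALID BUCKET NAME")

create_entry_gcs = CloudDataCatalogCreateEntryOperator(
    task_id="create_entry_gcs",  # assumed task_id
    location="us-central1",
    entry_group="important_data_jan_2019",
    entry_id="python_files",
    entry={
        "display_name": "Wizard",
        "type_": "FILESET",
        # The file pattern now comes from GCP_TEST_DATA_BUCKET instead of a hard-coded bucket name.
        "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_ID}/**"]},
    },
)
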
airflow/providers/google/cloud/example_dags/example_dataflow.py

Lines changed: 13 additions & 5 deletions
@@ -152,6 +152,7 @@
 # [START howto_operator_start_python_job_async]
 start_python_job_async = BeamRunPythonPipelineOperator(
     task_id="start-python-job-async",
+    runner="DataflowRunner",
     py_file=GCS_PYTHON,
     py_options=[],
     pipeline_options={
@@ -160,14 +161,18 @@
     py_requirements=['apache-beam[gcp]==2.25.0'],
     py_interpreter='python3',
     py_system_site_packages=False,
-    dataflow_config={"location": 'europe-west3', "wait_until_finished": False},
+    dataflow_config={
+        "job_name": "start-python-job-async",
+        "location": 'europe-west3',
+        "wait_until_finished": False,
+    },
 )
 # [END howto_operator_start_python_job_async]

 # [START howto_sensor_wait_for_job_status]
 wait_for_python_job_async_done = DataflowJobStatusSensor(
     task_id="wait-for-python-job-async-done",
-    job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
+    job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
     expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
     location='europe-west3',
 )
@@ -191,9 +196,10 @@ def callback(metrics: List[Dict]) -> bool:

 wait_for_python_job_async_metric = DataflowJobMetricsSensor(
     task_id="wait-for-python-job-async-metric",
-    job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
+    job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
     location='europe-west3',
     callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
+    fail_on_terminal_state=False,
 )
 # [END howto_sensor_wait_for_job_metric]

@@ -207,9 +213,10 @@ def check_message(messages: List[dict]) -> bool:

 wait_for_python_job_async_message = DataflowJobMessagesSensor(
     task_id="wait-for-python-job-async-message",
-    job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
+    job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
     location='europe-west3',
     callback=check_message,
+    fail_on_terminal_state=False,
 )
 # [END howto_sensor_wait_for_job_message]

@@ -223,9 +230,10 @@ def check_autoscaling_event(autoscaling_events: List[dict]) -> bool:

 wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
     task_id="wait-for-python-job-async-autoscaling-event",
-    job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",
+    job_id="{{task_instance.xcom_pull('start-python-job-async')['dataflow_job_id']}}",
     location='europe-west3',
     callback=check_autoscaling_event,
+    fail_on_terminal_state=False,
 )
 # [END howto_sensor_wait_for_job_autoscaling_event]


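The async Beam task pushes the Dataflow job metadata to XCom, and the downstream sensors read the id under the 'dataflow_job_id' key, which is why the templated job_id strings change above. A minimal sketch of that wiring; the DAG wrapper, schedule, and GCS path are illustrative assumptions:

# Sketch only: an async Dataflow run followed by a status sensor.
from airflow import models
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
from airflow.utils.dates import days_ago

with models.DAG("example_dataflow_async_sketch", start_date=days_ago(1), schedule_interval=None) as dag:
    start_python_job_async = BeamRunPythonPipelineOperator(
        task_id="start-python-job-async",
        runner="DataflowRunner",
        py_file="gs://example-bucket/wordcount_debugging.py",  # assumed path
        dataflow_config={
            "job_name": "start-python-job-async",
            "location": "europe-west3",
            "wait_until_finished": False,  # return right away so the sensor does the waiting
        },
    )

    wait_for_python_job_async_done = DataflowJobStatusSensor(
        task_id="wait-for-python-job-async-done",
        job_id="{{ task_instance.xcom_pull('start-python-job-async')['dataflow_job_id'] }}",
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
        location="europe-west3",
    )

    start_python_job_async >> wait_for_python_job_async_done
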
airflow/providers/google/cloud/example_dags/example_datafusion.py

Lines changed: 13 additions & 7 deletions
@@ -39,21 +39,26 @@
 from airflow.utils.state import State

 # [START howto_data_fusion_env_variables]
+SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
 LOCATION = "europe-north1"
 INSTANCE_NAME = "airflow-test-instance"
-INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
+INSTANCE = {
+    "type": "BASIC",
+    "displayName": INSTANCE_NAME,
+    "dataprocServiceAccount": SERVICE_ACCOUNT,
+}

 BUCKET_1 = os.environ.get("GCP_DATAFUSION_BUCKET_1", "test-datafusion-bucket-1")
 BUCKET_2 = os.environ.get("GCP_DATAFUSION_BUCKET_2", "test-datafusion-bucket-2")

-BUCKET_1_URI = f"gs//{BUCKET_1}"
-BUCKET_2_URI = f"gs//{BUCKET_2}"
+BUCKET_1_URI = f"gs://{BUCKET_1}"
+BUCKET_2_URI = f"gs://{BUCKET_2}"

 PIPELINE_NAME = os.environ.get("GCP_DATAFUSION_PIPELINE_NAME", "airflow_test")
 PIPELINE = {
     "name": "test-pipe",
     "description": "Data Pipeline Application",
-    "artifact": {"name": "cdap-data-pipeline", "version": "6.1.2", "scope": "SYSTEM"},
+    "artifact": {"name": "cdap-data-pipeline", "version": "6.4.1", "scope": "SYSTEM"},
     "config": {
         "resources": {"memoryMB": 2048, "virtualCores": 1},
         "driverResources": {"memoryMB": 2048, "virtualCores": 1},
@@ -72,7 +77,7 @@
     "label": "GCS",
     "artifact": {
         "name": "google-cloud",
-        "version": "0.14.2",
+        "version": "0.17.3",
         "scope": "SYSTEM",
     },
     "properties": {
@@ -105,7 +110,7 @@
     "label": "GCS2",
     "artifact": {
         "name": "google-cloud",
-        "version": "0.14.2",
+        "version": "0.17.3",
         "scope": "SYSTEM",
     },
     "properties": {
@@ -176,7 +181,7 @@
     location=LOCATION,
     instance_name=INSTANCE_NAME,
     instance=INSTANCE,
-    update_mask="instance.displayName",
+    update_mask="",
     task_id="update_instance",
 )
 # [END howto_cloud_data_fusion_update_instance_operator]
@@ -223,6 +228,7 @@
     pipeline_name=PIPELINE_NAME,
     pipeline_id=start_pipeline_async.output,
     expected_statuses=["COMPLETED"],
+    failure_statuses=["FAILED"],
     instance_name=INSTANCE_NAME,
     location=LOCATION,
 )

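A minimal sketch pulling the pieces above together: the instance body now carries the Dataproc service account from the environment, and the pipeline-state sensor fails fast on a FAILED run instead of poking until timeout. The task ids and the pipeline_id value below are illustrative assumptions:

# Sketch only: instance creation with a custom service account, plus a fail-fast sensor.
import os

from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionCreateInstanceOperator
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor

SERVICE_ACCOUNT = os.environ.get("GCP_DATAFUSION_SERVICE_ACCOUNT")
LOCATION = "europe-north1"
INSTANCE_NAME = "airflow-test-instance"
INSTANCE = {
    "type": "BASIC",
    "displayName": INSTANCE_NAME,
    "dataprocServiceAccount": SERVICE_ACCOUNT,  # service account used by the Dataproc clusters the pipeline spins up
}

create_instance = CloudDataFusionCreateInstanceOperator(
    task_id="create_instance",
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
)

start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="start_pipeline_sensor",
    pipeline_name="test-pipe",
    pipeline_id="{{ task_instance.xcom_pull('start_pipeline_async') }}",  # assumed upstream task
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],  # new parameter introduced by this commit
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)
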
airflow/providers/google/cloud/example_dags/example_functions.py

Lines changed: 1 addition & 1 deletion
@@ -67,7 +67,7 @@
 )
 GCF_ZIP_PATH = os.environ.get('GCF_ZIP_PATH', '')
 GCF_ENTRYPOINT = os.environ.get('GCF_ENTRYPOINT', 'helloWorld')
-GCF_RUNTIME = 'nodejs6'
+GCF_RUNTIME = 'nodejs14'
 GCP_VALIDATE_BODY = os.environ.get('GCP_VALIDATE_BODY', "True") == "True"

 # [START howto_operator_gcf_deploy_body]

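The Node.js 6 runtime is no longer accepted for new Cloud Functions deployments, hence the bump to nodejs14. A sketch of where GCF_RUNTIME lands in the deploy body; the field names follow the Cloud Functions v1 API, while the function name and trigger here are illustrative assumptions:

# Sketch only: the deploy body consumed by the deploy-function operator.
import os

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
GCP_LOCATION = os.environ.get("GCP_LOCATION", "europe-west1")
GCF_ENTRYPOINT = os.environ.get("GCF_ENTRYPOINT", "helloWorld")
GCF_RUNTIME = "nodejs14"  # nodejs6 is no longer deployable

deploy_body = {
    "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/functions/hello_world",  # assumed name
    "entryPoint": GCF_ENTRYPOINT,
    "runtime": GCF_RUNTIME,
    "httpsTrigger": {},
}
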
airflow/providers/google/cloud/hooks/datafusion.py

Lines changed: 3 additions & 1 deletion
@@ -333,7 +333,9 @@ def create_pipeline(
         url = os.path.join(self._base_url(instance_url, namespace), quote(pipeline_name))
         response = self._cdap_request(url=url, method="PUT", body=pipeline)
         if response.status != 200:
-            raise AirflowException(f"Creating a pipeline failed with code {response.status}")
+            raise AirflowException(
+                f"Creating a pipeline failed with code {response.status} while calling {url}"
+            )

     def delete_pipeline(
         self,

airflow/providers/google/cloud/sensors/datafusion.py

Lines changed: 10 additions & 0 deletions
@@ -33,6 +33,8 @@ class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
     :type pipeline_name: str
     :param expected_statuses: State that is expected
     :type expected_statuses: set[str]
+    :param failure_statuses: State that will terminate the sensor with an exception
+    :type failure_statuses: set[str]
     :param instance_name: The name of the instance.
     :type instance_name: str
     :param location: The Cloud Data Fusion location in which to handle the request.
@@ -70,6 +72,7 @@ def __init__(
         expected_statuses: Set[str],
         instance_name: str,
         location: str,
+        failure_statuses: Set[str] = None,
         project_id: Optional[str] = None,
         namespace: str = "default",
         gcp_conn_id: str = 'google_cloud_default',
@@ -81,6 +84,7 @@
         self.pipeline_name = pipeline_name
         self.pipeline_id = pipeline_id
         self.expected_statuses = expected_statuses
+        self.failure_statuses = failure_statuses
         self.instance_name = instance_name
         self.location = location
         self.project_id = project_id
@@ -119,6 +123,12 @@ def poke(self, context: dict) -> bool:
         except AirflowException:
             pass  # Because the pipeline may not be visible in system yet

+        if self.failure_statuses and pipeline_status in self.failure_statuses:
+            raise AirflowException(
+                f"Pipeline with id '{self.pipeline_id}' state is: {pipeline_status}. "
+                f"Terminating sensor..."
+            )
+
         self.log.debug(
             "Current status of the pipeline workflow for %s: %s.", self.pipeline_id, pipeline_status
         )

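The commit message mentions a new unit test for Data Fusion; a sketch of how the failure_statuses guard can be exercised in isolation is below. The mock target and the payload shapes returned by the hook (apiEndpoint, status) are assumptions about the sensor's internals, not copied from the test added in this commit:

# Sketch only: poking the sensor with a mocked hook that reports a FAILED pipeline.
from unittest import mock

import pytest

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor


@mock.patch("airflow.providers.google.cloud.sensors.datafusion.DataFusionHook")
def test_poke_raises_on_failure_status(mock_hook):
    # Assumed payload shapes: the sensor reads instance["apiEndpoint"] and workflow["status"].
    mock_hook.return_value.get_instance.return_value = {"apiEndpoint": "https://fake-endpoint"}
    mock_hook.return_value.get_pipeline_workflow.return_value = {"status": "FAILED"}

    sensor = CloudDataFusionPipelineStateSensor(
        task_id="pipeline_state_sensor",
        pipeline_name="test-pipe",
        pipeline_id="some-run-id",
        expected_statuses=["COMPLETED"],
        failure_statuses=["FAILED"],
        instance_name="airflow-test-instance",
        location="europe-north1",
    )

    with pytest.raises(AirflowException, match="FAILED"):
        sensor.poke(context={})
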
tests/providers/google/cloud/operators/test_cloud_build_system_helper.py

Lines changed: 1 addition & 1 deletion
@@ -79,7 +79,7 @@ def create_repository_and_bucket(self):
             GCP_PROJECT_ID, GCP_REPOSITORY_NAME
         )
         self.execute_cmd(["git", "remote", "add", "origin", repo_url], cwd=tmp_dir)
-        self.execute_cmd(["git", "push", "--force", "origin", "main"], cwd=tmp_dir)
+        self.execute_cmd(["git", "push", "--force", "origin", "master"], cwd=tmp_dir)

     def delete_repo(self):
         """Delete repository in Google Cloud Source Repository service"""

tests/providers/google/cloud/operators/test_dataprep_system.py

Lines changed: 2 additions & 2 deletions
@@ -25,11 +25,11 @@
 from tests.test_utils.db import clear_db_connections
 from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest

-TOKEN = environ.get("DATAPREP_TOKEN", "dataprep-system-test-token")
+TOKEN = environ.get("DATAPREP_TOKEN")
 EXTRA = {"extra__dataprep__token": TOKEN}


-@pytest.mark.skipif(environ.get("DATAPREP_TOKEN") is None, reason='Dataprep token not present')
+@pytest.mark.skipif(TOKEN is None, reason='Dataprep token not present')
 class DataprepExampleDagsTest(GoogleSystemTest):
     """
     System tests for Dataprep operators.

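For completeness, a sketch of how such a system test typically turns the DATAPREP_TOKEN environment variable into the Airflow connection the example DAG relies on; the conn_id, conn_type, and session handling here are assumptions, not taken from this diff:

# Sketch only: seeding the Dataprep connection from the environment before the DAG runs.
import json
from os import environ

from airflow.models import Connection
from airflow.utils.session import create_session

TOKEN = environ.get("DATAPREP_TOKEN")
EXTRA = {"extra__dataprep__token": TOKEN}


def create_dataprep_connection():
    # Store the token in the default Dataprep connection so the hook can pick it up.
    conn = Connection(conn_id="dataprep_default", conn_type="dataprep", extra=json.dumps(EXTRA))
    with create_session() as session:
        session.add(conn)
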