
Commit 95b5a0a

Refactor dataproc system tests (#40720)
1 parent 10f2503 commit 95b5a0a

35 files changed: +264 −2431 lines

airflow/providers/google/cloud/hooks/dataproc.py

Lines changed: 1 addition & 1 deletion
@@ -274,7 +274,7 @@ def wait_for_operation(
         self,
         operation: Operation,
         timeout: float | None = None,
-        result_retry: AsyncRetry | _MethodDefault = DEFAULT,
+        result_retry: AsyncRetry | _MethodDefault | Retry = DEFAULT,
     ) -> Any:
         """Wait for a long-lasting operation to complete."""
         try:
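
Note on the widened annotation: google-api-core ships both a synchronous Retry and an asynchronous AsyncRetry, and this hook is reached from both the sync and the deferrable code paths, so result_retry now accepts either. A minimal sketch of constructing both (the backoff values mirror the updated system test below and are otherwise arbitrary):

    # Both retry flavors are now valid result_retry values; initial,
    # multiplier, and maximum are backoff knobs in seconds.
    from google.api_core.retry import Retry
    from google.api_core.retry_async import AsyncRetry

    sync_retry = Retry(initial=10.0, multiplier=1.0, maximum=100.0)
    async_retry = AsyncRetry(initial=10.0, multiplier=1.0, maximum=10.0)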

airflow/providers/google/cloud/operators/dataproc.py

Lines changed: 1 addition & 1 deletion
@@ -2995,7 +2995,7 @@ def __init__(
         metadata: Sequence[tuple[str, str]] = (),
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        result_retry: AsyncRetry | _MethodDefault = DEFAULT,
+        result_retry: AsyncRetry | _MethodDefault | Retry = DEFAULT,
         asynchronous: bool = False,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         polling_interval_seconds: int = 5,
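
The same widening on the operator side means DAG authors can pass a plain synchronous Retry. A hedged sketch (the project id and batch config are placeholders for illustration, not taken from this commit):

    from google.api_core.retry import Retry
    from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

    create_batch = DataprocCreateBatchOperator(
        task_id="create_batch",
        project_id="my-project",  # placeholder
        region="europe-west3",
        batch={
            "spark_batch": {
                "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
                "main_class": "org.apache.spark.examples.SparkPi",
            }
        },
        batch_id="example-batch",
        # A synchronous Retry is now accepted here, not only AsyncRetry:
        result_retry=Retry(initial=10.0, multiplier=1.0, maximum=100.0),
    )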

airflow/providers/google/cloud/operators/dataproc_metastore.py

Lines changed: 3 additions & 7 deletions
@@ -22,12 +22,12 @@
 import time
 from typing import TYPE_CHECKING, Sequence
 
+from google.api_core.exceptions import AlreadyExists
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
 from google.api_core.retry import Retry, exponential_sleep_generator
 from google.cloud.metastore_v1 import MetadataExport, MetadataManagementActivity
 from google.cloud.metastore_v1.types import Backup, MetadataImport, Service
 from google.cloud.metastore_v1.types.metastore import DatabaseDumpSpec, Restore
-from googleapiclient.errors import HttpError
 
 from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, BaseOperatorLink

@@ -242,9 +242,7 @@ def execute(self, context: Context) -> dict:
             )
             backup = hook.wait_for_operation(self.timeout, operation)
             self.log.info("Backup %s created successfully", self.backup_id)
-        except HttpError as err:
-            if err.resp.status not in (409, "409"):
-                raise
+        except AlreadyExists:
             self.log.info("Backup %s already exists", self.backup_id)
             backup = hook.get_backup(
                 project_id=self.project_id,

@@ -448,9 +446,7 @@ def execute(self, context: Context) -> dict:
             )
             service = hook.wait_for_operation(self.timeout, operation)
             self.log.info("Service %s created successfully", self.service_id)
-        except HttpError as err:
-            if err.resp.status not in (409, "409"):
-                raise
+        except AlreadyExists:
             self.log.info("Instance %s already exists", self.service_id)
             service = hook.get_service(
                 region=self.region,
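
This swap works because the GAPIC metastore client raises google.api_core.exceptions.AlreadyExists (HTTP 409 / gRPC ALREADY_EXISTS) for duplicate resources, so the manual status-code inspection on googleapiclient's HttpError is no longer needed. A minimal sketch of the control flow both operators now share, with create and get standing in for the hook calls:

    from google.api_core.exceptions import AlreadyExists

    def create_or_get(create, get):
        # Idempotent create: on a duplicate-resource error, fetch the
        # existing resource instead of failing the task.
        try:
            return create()
        except AlreadyExists:
            return get()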

scripts/ci/pre_commit/check_system_tests.py

Lines changed: 2 additions & 1 deletion
@@ -35,6 +35,7 @@
 errors: list[str] = []
 
 WATCHER_APPEND_INSTRUCTION = "list(dag.tasks) >> watcher()"
+WATCHER_APPEND_INSTRUCTION_SHORT = " >> watcher()"
 
 PYTEST_FUNCTION = """
 from tests.system.utils import get_test_run  # noqa: E402

@@ -52,7 +53,7 @@
 def _check_file(file: Path):
     content = file.read_text()
     if "from tests.system.utils.watcher import watcher" in content:
-        index = content.find(WATCHER_APPEND_INSTRUCTION)
+        index = content.find(WATCHER_APPEND_INSTRUCTION_SHORT)
         if index == -1:
             errors.append(
                 f"[red]The example {file} imports tests.system.utils.watcher "

tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py

Lines changed: 22 additions & 13 deletions
@@ -23,7 +23,7 @@
 import os
 from datetime import datetime
 
-from google.api_core.retry_async import AsyncRetry
+from google.api_core.retry import Retry
 
 from airflow.models.dag import DAG
 from airflow.providers.google.cloud.operators.dataproc import (

@@ -37,10 +37,10 @@
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "dataproc_batch"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
-REGION = "europe-west1"
+DAG_ID = "dataproc_batch"
+REGION = "europe-west3"
 
 BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
 BATCH_ID_2 = f"batch-{ENV_ID}-{DAG_ID}-2".replace("_", "-")

@@ -77,7 +77,7 @@
         region=REGION,
         batch=BATCH_CONFIG,
         batch_id=BATCH_ID_2,
-        result_retry=AsyncRetry(maximum=10.0, initial=10.0, multiplier=1.0),
+        result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
     )
 
     create_batch_3 = DataprocCreateBatchOperator(

@@ -104,10 +104,6 @@
     get_batch = DataprocGetBatchOperator(
         task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
     )
-
-    get_batch_2 = DataprocGetBatchOperator(
-        task_id="get_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
-    )
     # [END how_to_cloud_dataproc_get_batch_operator]
 
     # [START how_to_cloud_dataproc_list_batches_operator]

@@ -136,6 +132,14 @@
     )
     # [END how_to_cloud_dataproc_cancel_operation_operator]
 
+    batch_cancelled_sensor = DataprocBatchSensor(
+        task_id="batch_cancelled_sensor",
+        region=REGION,
+        project_id=PROJECT_ID,
+        batch_id=BATCH_ID_4,
+        poke_interval=10,
+    )
+
     # [START how_to_cloud_dataproc_delete_batch_operator]
     delete_batch = DataprocDeleteBatchOperator(
         task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID

@@ -153,25 +157,30 @@
     delete_batch.trigger_rule = TriggerRule.ALL_DONE
     delete_batch_2.trigger_rule = TriggerRule.ALL_DONE
     delete_batch_3.trigger_rule = TriggerRule.ALL_DONE
-    delete_batch_4.trigger_rule = TriggerRule.ALL_DONE
+    delete_batch_4.trigger_rule = TriggerRule.ALL_FAILED
 
     (
         # TEST SETUP
         [create_batch, create_batch_2, create_batch_3]
         # TEST BODY
         >> batch_async_sensor
-        >> [get_batch, get_batch_2, list_batches]
+        >> get_batch
+        >> list_batches
         >> create_batch_4
         >> cancel_operation
         # TEST TEARDOWN
-        >> [delete_batch, delete_batch_2, delete_batch_3, delete_batch_4]
+        >> [delete_batch, delete_batch_2, delete_batch_3]
+        >> batch_cancelled_sensor
+        >> delete_batch_4
     )
 
     from tests.system.utils.watcher import watcher
 
     # This test needs watcher in order to properly mark success/failure
     # when "teardown" task with trigger rule is part of the DAG
-    list(dag.tasks) >> watcher()
+
+    # Excluding sensor because we expect it to fail due to cancelled operation
+    [task for task in dag.tasks if task.task_id != "batch_cancelled_sensor"] >> watcher()
 
 
 from tests.system.utils import get_test_run  # noqa: E402
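
Note on the teardown change: batch_cancelled_sensor polls the batch that cancel_operation has just cancelled, so the sensor is expected to fail. delete_batch_4 therefore switches to TriggerRule.ALL_FAILED so the cancelled batch still gets cleaned up, and the sensor is excluded from the watcher hookup so that its expected failure does not mark the whole test run as failed.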

tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
 DAG_ID = "dataproc_batch_deferrable"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
-REGION = "europe-west1"
+REGION = "europe-north1"
 BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
 BATCH_CONFIG = {
     "spark_batch": {

tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py

Lines changed: 18 additions & 5 deletions
@@ -28,17 +28,21 @@
     ClusterGenerator,
     DataprocCreateBatchOperator,
     DataprocCreateClusterOperator,
+    DataprocDeleteBatchOperator,
     DataprocDeleteClusterOperator,
 )
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 DAG_ID = "dataproc_batch_ps"
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
-REGION = "europe-west1"
-CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("-", "_")
+REGION = "europe-north1"
+CLUSTER_NAME_BASE = f"cluster-{DAG_ID}".replace("_", "-")
+CLUSTER_NAME_FULL = CLUSTER_NAME_BASE + f"-{ENV_ID}".replace("_", "-")
+CLUSTER_NAME = CLUSTER_NAME_BASE if len(CLUSTER_NAME_FULL) >= 33 else CLUSTER_NAME_FULL
 BATCH_ID = f"batch-{ENV_ID}-{DAG_ID}".replace("_", "-")
 
 CLUSTER_GENERATOR_CONFIG_FOR_PHS = ClusterGenerator(

@@ -98,6 +102,14 @@
     )
     # [END how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server]
 
+    delete_batch = DataprocDeleteBatchOperator(
+        task_id="delete_batch",
+        project_id=PROJECT_ID,
+        region=REGION,
+        batch_id=BATCH_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
     delete_cluster = DataprocDeleteClusterOperator(
         task_id="delete_cluster",
         project_id=PROJECT_ID,

@@ -117,6 +129,7 @@
         # TEST BODY
         >> create_batch
         # TEST TEARDOWN
+        >> delete_batch
         >> delete_cluster
         >> delete_bucket
     )
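
The three-line cluster-name construction recurs across these test files. A sketch of the intent (the helper name is hypothetical; the 33-character bound is taken verbatim from the tests):

    def safe_cluster_name(base: str, env_id: str, limit: int = 33) -> str:
        # Append the environment id only while the combined name stays under
        # the limit; otherwise fall back to the bare base name.
        full = base + f"-{env_id}".replace("_", "-")
        return base if len(full) >= limit else full

    # For this file, "cluster-dataproc-batch-ps" plus "-default" is exactly
    # 33 characters, so the bare base name is used.
    print(safe_cluster_name("cluster-dataproc-batch-ps", "default"))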

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py

Lines changed: 9 additions & 5 deletions
@@ -32,14 +32,18 @@
     DataprocStopClusterOperator,
 )
 from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
 
-DAG_ID = "example_dataproc_cluster_create_existing_stopped_cluster"
+DAG_ID = "dataproc_create_existing_stopped_cluster"
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-PROJECT_ID = os.environ.get("SYSTEMS_TESTS_GCP_PROJECTS") or ""
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
 
-CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
-REGION = "europe-west1"
+
+CLUSTER_NAME_BASE = f"{DAG_ID}".replace("_", "-")
+CLUSTER_NAME_FULL = CLUSTER_NAME_BASE + f"-{ENV_ID}".replace("_", "-")
+CLUSTER_NAME = CLUSTER_NAME_BASE if len(CLUSTER_NAME_FULL) >= 33 else CLUSTER_NAME_FULL
+REGION = "europe-north1"
 
 # Cluster definition
 CLUSTER_CONFIG = {
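
Worth noting: besides the DAG rename and region move, this hunk fixes a misspelled environment variable. The old code read SYSTEMS_TESTS_GCP_PROJECTS, which is never set, so PROJECT_ID silently fell back to an empty string; it now reads SYSTEM_TESTS_GCP_PROJECT with the shared project-id fallback.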

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py

Lines changed: 5 additions & 3 deletions
@@ -33,12 +33,14 @@
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 DAG_ID = "dataproc_cluster_def"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
 
-CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
-REGION = "europe-west1"
+CLUSTER_NAME_BASE = f"cluster-{DAG_ID}".replace("_", "-")
+CLUSTER_NAME_FULL = CLUSTER_NAME_BASE + f"-{ENV_ID}".replace("_", "-")
+CLUSTER_NAME = CLUSTER_NAME_BASE if len(CLUSTER_NAME_FULL) >= 33 else CLUSTER_NAME_FULL
+REGION = "europe-north1"
 
 
 # Cluster definition

tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py

Lines changed: 7 additions & 4 deletions
@@ -33,12 +33,15 @@
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 DAG_ID = "dataproc_diagnose_cluster"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
 
-CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
-REGION = "europe-west1"
+CLUSTER_NAME_BASE = f"cluster-{DAG_ID}".replace("_", "-")
+CLUSTER_NAME_FULL = CLUSTER_NAME_BASE + f"-{ENV_ID}".replace("_", "-")
+CLUSTER_NAME = CLUSTER_NAME_BASE if len(CLUSTER_NAME_FULL) >= 33 else CLUSTER_NAME_FULL
+
+REGION = "europe-north1"
 
 
 # Cluster definition

@@ -102,7 +105,7 @@
         # TEST SETUP
         create_cluster
         # TEST BODY
-        >> diagnose_cluster
+        >> [diagnose_cluster, diagnose_cluster_deferrable]
         # TEST TEARDOWN
         >> delete_cluster
     )
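
The flow now exercises a deferrable diagnose task alongside the synchronous one. Its definition falls outside this hunk; presumably it mirrors diagnose_cluster with deferrable=True, along these lines (an assumption, not shown in the diff):

    from airflow.providers.google.cloud.operators.dataproc import DataprocDiagnoseClusterOperator

    diagnose_cluster_deferrable = DataprocDiagnoseClusterOperator(
        task_id="diagnose_cluster_deferrable",
        region="europe-north1",
        project_id="my-project",  # placeholder
        cluster_name="cluster-dataproc-diagnose-cluster",  # per the naming scheme above
        deferrable=True,  # polling is handed off to the triggerer
    )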
