Commit 8b7ae76

Fixup docs and optimize system test for DataprocSubmitJobOperator (Hadoop job) (#32722)
1 parent 848c69a commit 8b7ae76

2 files changed: +17 -23 lines changed

airflow/providers/google/cloud/operators/dataproc.py

Lines changed: 7 additions & 1 deletion
@@ -1424,6 +1424,10 @@ def execute(self, context: Context):
 class DataprocSubmitHadoopJobOperator(DataprocJobBaseOperator):
     """Start a Hadoop Job on a Cloud DataProc cluster.
 
+    .. seealso::
+        This operator is deprecated, please use
+        :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`:
+
     :param main_jar: The HCFS URI of the jar file containing the main class
         (use this or the main_class, not both together).
     :param main_class: Name of the job class. (use this or the main_jar, not both
@@ -1931,7 +1935,9 @@ class DataprocSubmitJobOperator(GoogleCloudBaseOperator):
     :param region: Required. The Cloud Dataproc region in which to handle the request.
     :param job: Required. The job resource.
         If a dict is provided, it must be of the same form as the protobuf message
-        :class:`~google.cloud.dataproc_v1.types.Job`
+        :class:`~google.cloud.dataproc_v1.types.Job`.
+        For the complete list of supported job types please take a look here
+        https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs
 :param request_id: Optional. A unique id used to identify the request. If the server receives two
     ``SubmitJobRequest`` requests with the same id, then the second request will be ignored and the first
     ``Job`` created and stored in the backend is returned.
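
For context on the updated docstring, here is a minimal sketch of submitting a Hadoop job through DataprocSubmitJobOperator with a job dict shaped like the google.cloud.dataproc_v1.types.Job message. The project, region, cluster name, jar URI and arguments are illustrative placeholders, not values taken from this commit.

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

# Plain dict in the form of google.cloud.dataproc_v1.types.Job; the "hadoop_job"
# key selects the job type (see the job-types list linked in the docstring above).
# All identifiers and URIs below are placeholders for illustration only.
HADOOP_JOB = {
    "reference": {"project_id": "my-project"},
    "placement": {"cluster_name": "my-cluster"},
    "hadoop_job": {
        "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", "gs://my-bucket/output/"],
    },
}

hadoop_task = DataprocSubmitJobOperator(
    task_id="hadoop_task",
    job=HADOOP_JOB,
    region="europe-west1",
    project_id="my-project",
)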

tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py

Lines changed: 10 additions & 22 deletions
@@ -28,7 +28,6 @@
     DataprocCreateClusterOperator,
     DataprocDeleteClusterOperator,
     DataprocSubmitJobOperator,
-    DataprocUpdateClusterOperator,
 )
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.utils.trigger_rule import TriggerRule
@@ -53,20 +52,12 @@
         "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
     },
     "worker_config": {
-        "num_instances": 2,
+        "num_instances": 3,
         "machine_type_uri": "n1-standard-4",
         "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
     },
 }
 
-# Update options
-CLUSTER_UPDATE = {
-    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
-}
-UPDATE_MASK = {
-    "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
-}
-
 TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
 
 # Jobs definitions
@@ -87,7 +78,7 @@
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
-    tags=["example", "dataproc"],
+    tags=["example", "dataproc", "hadoop"],
 ) as dag:
     create_bucket = GCSCreateBucketOperator(
         task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
@@ -101,16 +92,6 @@
         cluster_name=CLUSTER_NAME,
     )
 
-    scale_cluster = DataprocUpdateClusterOperator(
-        task_id="scale_cluster",
-        cluster_name=CLUSTER_NAME,
-        cluster=CLUSTER_UPDATE,
-        update_mask=UPDATE_MASK,
-        graceful_decommission_timeout=TIMEOUT,
-        project_id=PROJECT_ID,
-        region=REGION,
-    )
-
     hadoop_task = DataprocSubmitJobOperator(
         task_id="hadoop_task", job=HADOOP_JOB, region=REGION, project_id=PROJECT_ID
     )
@@ -127,7 +108,14 @@
         task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
     )
 
-    create_bucket >> create_cluster >> scale_cluster >> hadoop_task >> delete_cluster >> delete_bucket
+    (
+        # TEST SETUP
+        [create_bucket, create_cluster]
+        # TEST BODY
+        >> hadoop_task
+        # TEST TEARDOWN
+        >> [delete_cluster, delete_bucket]
+    )
 
     from tests.system.utils.watcher import watcher
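
The visible hunks end at the watcher import. Airflow system tests typically close with a standard tail that wires every task to the watcher and exposes a pytest entry point; the lines below are a sketch of that common pattern (assumed, not part of this diff), reusing the dag and watcher names from the file above.

    # Wire all tasks (including the ALL_DONE teardown tasks) to the watcher so a
    # failed teardown still marks the whole system test run as failed.
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Entry point used by pytest to run the example DAG as a system test.
test_run = get_test_run(dag)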
