
Commit c051d0a

Add ability to create Flink Jobs in dataproc cluster (#42342)
1 parent: 178503a

4 files changed: 146 additions, 3 deletions

airflow/providers/google/provider.yaml

Lines changed: 1 addition & 1 deletion
@@ -125,7 +125,7 @@ dependencies:
   - google-cloud-dataflow-client>=0.8.6
   - google-cloud-dataform>=0.5.0
   - google-cloud-dataplex>=1.10.0
-  - google-cloud-dataproc>=5.8.0
+  - google-cloud-dataproc>=5.12.0
   - google-cloud-dataproc-metastore>=1.12.0
   - google-cloud-dlp>=3.12.0
   - google-cloud-kms>=2.15.0

docs/apache-airflow-providers-google/operators/cloud/dataproc.rst

Lines changed: 9 additions & 1 deletion
@@ -249,7 +249,7 @@ Submit a job to a cluster
 -------------------------
 
 Dataproc supports submitting jobs of different big data components.
-The list currently includes Spark, Hadoop, Pig and Hive.
+The list currently includes Spark, PySpark, Hadoop, Trino, Pig, Flink and Hive.
 For more information on versions and images take a look at `Cloud Dataproc Image version list <https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions>`__
 
 To submit a job to the cluster you need to provide a job source file. The job source file can be on GCS, the cluster or on your local

@@ -351,6 +351,14 @@ Example of the configuration for a Trino Job:
     :start-after: [START how_to_cloud_dataproc_trino_config]
     :end-before: [END how_to_cloud_dataproc_trino_config]
 
+Example of the configuration for a Flink Job:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_flink_config]
+    :end-before: [END how_to_cloud_dataproc_flink_config]
+
 Working with workflows templates
 --------------------------------
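Before the full example file added further down, here is a minimal sketch of what submitting this job type looks like from a DAG. The project, region and cluster names are placeholders, and the cluster is assumed to have been created with the FLINK optional component:

    from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

    PROJECT_ID = "my-project"          # placeholder
    REGION = "europe-west1"            # placeholder
    CLUSTER_NAME = "my-flink-cluster"  # placeholder; the cluster needs the FLINK optional component

    # Job dictionary in the same shape as the how_to_cloud_dataproc_flink_config snippet
    FLINK_JOB = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "flink_job": {
            "main_class": "org.apache.flink.examples.java.wordcount.WordCount",
            "jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
        },
    }

    flink_task = DataprocSubmitJobOperator(
        task_id="flink_task", job=FLINK_JOB, region=REGION, project_id=PROJECT_ID
    )

Used inside a DAG definition, this task behaves like the other Dataproc job submission examples in the same document.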

generated/provider_dependencies.json

Lines changed: 1 addition & 1 deletion
@@ -645,7 +645,7 @@
       "google-cloud-dataform>=0.5.0",
       "google-cloud-dataplex>=1.10.0",
       "google-cloud-dataproc-metastore>=1.12.0",
-      "google-cloud-dataproc>=5.8.0",
+      "google-cloud-dataproc>=5.12.0",
       "google-cloud-dlp>=3.12.0",
       "google-cloud-kms>=2.15.0",
       "google-cloud-language>=2.9.0",
tests/system/providers/google/cloud/dataproc/example_dataproc_flink.py

Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG for DataprocSubmitJobOperator with a Flink job.
+"""
+
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from google.api_core.retry import Retry
+
+from airflow.models.dag import DAG
+from airflow.providers.google.cloud.operators.dataproc import (
+    DataprocCreateClusterOperator,
+    DataprocDeleteClusterOperator,
+    DataprocSubmitJobOperator,
+)
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "dataproc_flink"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+CLUSTER_NAME_BASE = f"cluster-{DAG_ID}".replace("_", "-")
+CLUSTER_NAME_FULL = CLUSTER_NAME_BASE + f"-{ENV_ID}".replace("_", "-")
+CLUSTER_NAME = CLUSTER_NAME_BASE if len(CLUSTER_NAME_FULL) >= 33 else CLUSTER_NAME_FULL
+REGION = "europe-west1"
+
+OUTPUT_FOLDER = "wordcount"
+OUTPUT_PATH = f"gs://{BUCKET_NAME}/{OUTPUT_FOLDER}/"
+
+# Cluster definition (the FLINK optional component is required to run Flink jobs)
+CLUSTER_CONFIG = {
+    "master_config": {
+        "num_instances": 1,
+        "machine_type_uri": "n1-standard-4",
+        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
+    },
+    "software_config": {"image_version": "2.2-debian12", "properties": {}, "optional_components": ["FLINK"]},
+    "worker_config": {
+        "num_instances": 3,
+        "machine_type_uri": "n1-standard-4",
+        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
+    },
+}
+
+# Jobs definitions
+# [START how_to_cloud_dataproc_flink_config]
+FLINK_JOB = {
+    "reference": {"project_id": PROJECT_ID},
+    "placement": {"cluster_name": CLUSTER_NAME},
+    "flink_job": {
+        "main_class": "org.apache.flink.examples.java.wordcount.WordCount",
+        "jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
+    },
+}
+# [END how_to_cloud_dataproc_flink_config]
+
+
+with DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "dataproc", "flink"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+    )
+
+    create_cluster = DataprocCreateClusterOperator(
+        task_id="create_cluster",
+        project_id=PROJECT_ID,
+        cluster_config=CLUSTER_CONFIG,
+        region=REGION,
+        cluster_name=CLUSTER_NAME,
+        retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
+    )
+
+    flink_task = DataprocSubmitJobOperator(
+        task_id="flink_task", job=FLINK_JOB, region=REGION, project_id=PROJECT_ID
+    )
+
+    delete_cluster = DataprocDeleteClusterOperator(
+        task_id="delete_cluster",
+        project_id=PROJECT_ID,
+        cluster_name=CLUSTER_NAME,
+        region=REGION,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        [create_bucket, create_cluster]
+        # TEST BODY
+        >> flink_task
+        # TEST TEARDOWN
+        >> [delete_cluster, delete_bucket]
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "teardown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
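The example DAG above waits for the Flink job to finish inside the worker. Recent google provider releases also expose a deferrable mode on DataprocSubmitJobOperator, which is not part of this commit; a minimal sketch, reusing FLINK_JOB, REGION and PROJECT_ID from the example above and assuming the installed provider version supports it:

    flink_task_deferrable = DataprocSubmitJobOperator(
        task_id="flink_task_deferrable",
        job=FLINK_JOB,
        region=REGION,
        project_id=PROJECT_ID,
        deferrable=True,  # assumption: completion is awaited in the triggerer rather than the worker
    )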
