
Commit 04ec45f

Add DataprocCreateWorkflowTemplateOperator (#13338)
* Add DataprocCreateWorkflowTemplateOperator
* fixup! Add DataprocCreateWorkflowTemplateOperator
* fixup! fixup! Add DataprocCreateWorkflowTemplateOperator
1 parent f7d354d commit 04ec45f

File tree

5 files changed: +160 -3 lines changed


airflow/providers/google/cloud/example_dags/example_dataproc.py

Lines changed: 31 additions & 1 deletion
@@ -25,15 +25,17 @@
 from airflow import models
 from airflow.providers.google.cloud.operators.dataproc import (
     DataprocCreateClusterOperator,
+    DataprocCreateWorkflowTemplateOperator,
     DataprocDeleteClusterOperator,
+    DataprocInstantiateWorkflowTemplateOperator,
     DataprocSubmitJobOperator,
     DataprocUpdateClusterOperator,
 )
 from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
 from airflow.utils.dates import days_ago
 
 PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
-CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-project")
+CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-cluster")
 REGION = os.environ.get("GCP_LOCATION", "europe-west1")
 ZONE = os.environ.get("GCP_REGION", "europe-west1-b")
 BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests")
@@ -136,6 +138,18 @@
     },
 }
 # [END how_to_cloud_dataproc_hadoop_config]
+WORKFLOW_NAME = "airflow-dataproc-test"
+WORKFLOW_TEMPLATE = {
+    "id": WORKFLOW_NAME,
+    "placement": {
+        "managed_cluster": {
+            "cluster_name": CLUSTER_NAME,
+            "config": CLUSTER_CONFIG,
+        }
+    },
+    "jobs": [{"step_id": "pig_job_1", "pig_job": PIG_JOB["pig_job"]}],
+}
+
 
 with models.DAG("example_gcp_dataproc", start_date=days_ago(1), schedule_interval=None) as dag:
     # [START how_to_cloud_dataproc_create_cluster_operator]
@@ -160,6 +174,21 @@
     )
     # [END how_to_cloud_dataproc_update_cluster_operator]
 
+    # [START how_to_cloud_dataproc_create_workflow_template]
+    create_workflow_template = DataprocCreateWorkflowTemplateOperator(
+        task_id="create_workflow_template",
+        template=WORKFLOW_TEMPLATE,
+        project_id=PROJECT_ID,
+        location=REGION,
+    )
+    # [END how_to_cloud_dataproc_create_workflow_template]
+
+    # [START how_to_cloud_dataproc_trigger_workflow_template]
+    trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
+        task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
+    )
+    # [END how_to_cloud_dataproc_trigger_workflow_template]
+
     pig_task = DataprocSubmitJobOperator(
         task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
     )
@@ -210,6 +239,7 @@
     # [END how_to_cloud_dataproc_delete_cluster_operator]
 
     create_cluster >> scale_cluster
+    scale_cluster >> create_workflow_template >> trigger_workflow >> delete_cluster
     scale_cluster >> hive_task >> delete_cluster
     scale_cluster >> pig_task >> delete_cluster
     scale_cluster >> spark_sql_task >> delete_cluster
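
For readers who want the new pattern without the surrounding example DAG, here is a minimal self-contained sketch: create a workflow template, then trigger it. The project, region, cluster config, and Pig query below are illustrative placeholders, not values taken from this commit.

import os

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateWorkflowTemplateOperator,
    DataprocInstantiateWorkflowTemplateOperator,
)
from airflow.utils.dates import days_ago

# Placeholder values; replace with your own project and region.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "my-project")
REGION = "europe-west1"
WORKFLOW_NAME = "airflow-dataproc-test"

# Illustrative template: a managed cluster that runs one Pig step.
WORKFLOW_TEMPLATE = {
    "id": WORKFLOW_NAME,
    "placement": {
        "managed_cluster": {
            "cluster_name": "example-cluster",
            "config": {"master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"}},
        }
    },
    "jobs": [{"step_id": "pig_job_1", "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}}}],
}

with models.DAG("example_workflow_template", start_date=days_ago(1), schedule_interval=None) as dag:
    create_workflow_template = DataprocCreateWorkflowTemplateOperator(
        task_id="create_workflow_template",
        template=WORKFLOW_TEMPLATE,
        project_id=PROJECT_ID,
        location=REGION,
    )
    # Instantiating the template runs the embedded jobs on the managed cluster.
    trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
        task_id="trigger_workflow",
        region=REGION,
        project_id=PROJECT_ID,
        template_id=WORKFLOW_NAME,
    )
    create_workflow_template >> trigger_workflow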

airflow/providers/google/cloud/operators/dataproc.py

Lines changed: 66 additions & 1 deletion
@@ -1545,6 +1545,70 @@ def execute(self, context):
         super().execute(context)
 
 
+class DataprocCreateWorkflowTemplateOperator(BaseOperator):
+    """
+    Creates a new workflow template.
+
+    :param project_id: Required. The ID of the Google Cloud project the cluster belongs to.
+    :type project_id: str
+    :param location: Required. The Cloud Dataproc region in which to handle the request.
+    :type location: str
+    :param template: The Dataproc workflow template to create. If a dict is provided,
+        it must be of the same form as the protobuf message WorkflowTemplate.
+    :type template: Union[dict, WorkflowTemplate]
+    :param retry: A retry object used to retry requests. If ``None`` is specified, requests will not be
+        retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: The amount of time, in seconds, to wait for the request to complete. Note that if
+        ``retry`` is specified, the timeout applies to each individual attempt.
+    :type timeout: float
+    :param metadata: Additional metadata that is provided to the method.
+    :type metadata: Sequence[Tuple[str, str]]
+    """
+
+    template_fields = ("location", "template")
+    template_fields_renderers = {"template": "json"}
+
+    def __init__(
+        self,
+        *,
+        location: str,
+        template: Dict,
+        project_id: str,
+        retry: Optional[Retry] = None,
+        timeout: Optional[float] = None,
+        metadata: Optional[Sequence[Tuple[str, str]]] = None,
+        gcp_conn_id: str = "google_cloud_default",
+        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.location = location
+        self.template = template
+        self.project_id = project_id
+        self.retry = retry
+        self.timeout = timeout
+        self.metadata = metadata
+        self.gcp_conn_id = gcp_conn_id
+        self.impersonation_chain = impersonation_chain
+
+    def execute(self, context):
+        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
+        self.log.info("Creating template")
+        try:
+            workflow = hook.create_workflow_template(
+                location=self.location,
+                template=self.template,
+                project_id=self.project_id,
+                retry=self.retry,
+                timeout=self.timeout,
+                metadata=self.metadata,
+            )
+            self.log.info("Workflow %s created", workflow.name)
+        except AlreadyExists:
+            self.log.info("Workflow with given id already exists")
+
+
 class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
     """
     Instantiate a WorkflowTemplate on Google Cloud Dataproc. The operator will wait
@@ -1596,7 +1660,8 @@ class DataprocInstantiateWorkflowTemplateOperator(BaseOperator):
     :type impersonation_chain: Union[str, Sequence[str]]
     """
 
-    template_fields = ['template_id', 'impersonation_chain']
+    template_fields = ['template_id', 'impersonation_chain', 'request_id', 'parameters']
+    template_fields_renderers = {"parameters": "json"}
 
     @apply_defaults
     def __init__(  # pylint: disable=too-many-arguments
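
Note the error handling in the new operator's execute(): AlreadyExists is treated as success, so retries and DAG re-runs are idempotent with respect to an existing template. For reference, a sketch of the same call made directly through DataprocHook, the hook method the operator forwards to; the project, region, and template id below are placeholders:

from airflow.providers.google.cloud.hooks.dataproc import DataprocHook

hook = DataprocHook(gcp_conn_id="google_cloud_default")

# Placeholder template in WorkflowTemplate protobuf form; a real template
# needs a valid placement and at least one job (see the example DAG above).
workflow = hook.create_workflow_template(
    location="europe-west1",  # placeholder region
    project_id="my-project",  # placeholder project
    template={"id": "airflow-dataproc-test"},
    retry=None,               # same defaults the operator forwards
    timeout=None,
    metadata=None,
)
print(workflow.name)  # fully qualified resource name returned by the API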

docs/apache-airflow-providers-google/operators/cloud/dataproc.rst

Lines changed: 24 additions & 0 deletions
@@ -180,6 +180,30 @@ Example of the configuration for a SparkR:
     :start-after: [START how_to_cloud_dataproc_sparkr_config]
     :end-before: [END how_to_cloud_dataproc_sparkr_config]
 
+Working with workflow templates
+-------------------------------
+
+Dataproc supports creating workflow templates that can be triggered later on.
+
+A workflow template can be created using:
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateWorkflowTemplateOperator`.
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_create_workflow_template]
+    :end-before: [END how_to_cloud_dataproc_create_workflow_template]
+
+Once a workflow is created, users can trigger it using
+:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator`:
+
+.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dataproc.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_dataproc_trigger_workflow_template]
+    :end-before: [END how_to_cloud_dataproc_trigger_workflow_template]
+
 References
 ^^^^^^^^^^
 For further information, take a look at:
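
A side effect of the template_fields change in dataproc.py above is that request_id and parameters on DataprocInstantiateWorkflowTemplateOperator are now Jinja-templated. A hedged sketch of a parameterized trigger; it assumes, hypothetically, that the stored template declares a parameter named ZONE, and all IDs are placeholders:

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocInstantiateWorkflowTemplateOperator,
)
from airflow.utils.dates import days_ago

with models.DAG("example_trigger_parameterized", start_date=days_ago(1), schedule_interval=None) as dag:
    # Illustrative only: assumes the template declares a parameter named "ZONE".
    trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
        task_id="trigger_workflow",
        region="europe-west1",                 # placeholder region
        project_id="my-project",               # placeholder project
        template_id="airflow-dataproc-test",
        parameters={"ZONE": "europe-west1-b"},
        request_id="trigger-{{ ds_nodash }}",  # Jinja-rendered now that request_id is a template field
    )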

tests/always/test_project_structure.py

Lines changed: 0 additions & 1 deletion
@@ -173,7 +173,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
         # Deprecated operator. Ignore it
         'airflow.providers.google.cloud.operators.dataproc.DataprocSubmitHadoopJobOperator',
         'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator',
-        'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateWorkflowTemplateOperator',
         # Deprecated operator. Ignore it
         'airflow.providers.google.cloud.operators.dataproc.DataprocScaleClusterOperator',
         # Base operator. Ignore it

tests/providers/google/cloud/operators/test_dataproc.py

Lines changed: 39 additions & 0 deletions
@@ -28,6 +28,7 @@
 from airflow.providers.google.cloud.operators.dataproc import (
     ClusterGenerator,
     DataprocCreateClusterOperator,
+    DataprocCreateWorkflowTemplateOperator,
     DataprocDeleteClusterOperator,
     DataprocInstantiateInlineWorkflowTemplateOperator,
     DataprocInstantiateWorkflowTemplateOperator,
@@ -115,6 +116,18 @@
 METADATA = [("key", "value")]
 REQUEST_ID = "request_id_uuid"
 
+WORKFLOW_NAME = "airflow-dataproc-test"
+WORKFLOW_TEMPLATE = {
+    "id": WORKFLOW_NAME,
+    "placement": {
+        "managed_cluster": {
+            "cluster_name": CLUSTER_NAME,
+            "config": CLUSTER,
+        }
+    },
+    "jobs": [{"step_id": "pig_job_1", "pig_job": {}}],
+}
+
 
 def assert_warning(msg: str, warning: Any):
     assert any(msg in str(w) for w in warning.warnings)
@@ -914,3 +927,29 @@ def test_execute(self, mock_hook, mock_uuid):
         )
         job = op.generate_job()
         self.assertDictEqual(self.job, job)
+
+
+class TestDataprocCreateWorkflowTemplateOperator:
+    @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+    def test_execute(self, mock_hook):
+        op = DataprocCreateWorkflowTemplateOperator(
+            task_id=TASK_ID,
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+            location=GCP_LOCATION,
+            project_id=GCP_PROJECT,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            template=WORKFLOW_TEMPLATE,
+        )
+        op.execute(context={})
+        mock_hook.assert_called_once_with(gcp_conn_id=GCP_CONN_ID, impersonation_chain=IMPERSONATION_CHAIN)
+        mock_hook.return_value.create_workflow_template.assert_called_once_with(
+            location=GCP_LOCATION,
+            project_id=GCP_PROJECT,
+            retry=RETRY,
+            timeout=TIMEOUT,
+            metadata=METADATA,
+            template=WORKFLOW_TEMPLATE,
+        )
