Skip to content

Commit de65a5c

Browse files
Lukasz Wyszomirskipotiuk
authored andcommitted
Support serviceAccount attr for dataflow in the Apache beam
1 parent 76dc737 commit de65a5c

File tree

2 files changed

+6
-0
lines changed

2 files changed

+6
-0
lines changed

airflow/providers/apache/beam/operators/beam.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ def __get_dataflow_pipeline_options(
8989
pipeline_options = copy.deepcopy(pipeline_options)
9090
if job_name_key is not None:
9191
pipeline_options[job_name_key] = job_name
92+
if self.dataflow_config.service_account:
93+
pipeline_options["serviceAccount"] = self.dataflow_config.service_account
9294
pipeline_options["project"] = self.dataflow_config.project_id
9395
pipeline_options["region"] = self.dataflow_config.location
9496
pipeline_options.setdefault("labels", {}).update(
@@ -182,6 +184,7 @@ def _init_pipeline_options(
182184
pipeline_options=pipeline_options,
183185
job_name_variable_key=job_name_variable_key,
184186
)
187+
self.log.info(pipeline_options)
185188

186189
pipeline_options.update(self.pipeline_options)
187190

airflow/providers/google/cloud/operators/dataflow.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ class DataflowConfiguration:
124124
WaitForRun = wait until job finished and the run job.
125125
Supported only by:
126126
:py:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`
127+
:param service_account: Run the job as a specific service account, instead of the default GCE robot.
127128
"""
128129

129130
template_fields: Sequence[str] = ("job_name", "location")
@@ -144,6 +145,7 @@ def __init__(
144145
wait_until_finished: Optional[bool] = None,
145146
multiple_jobs: Optional[bool] = None,
146147
check_if_running: CheckJobRunning = CheckJobRunning.WaitForRun,
148+
service_account: Optional[str] = None,
147149
) -> None:
148150
self.job_name = job_name
149151
self.append_job_name = append_job_name
@@ -158,6 +160,7 @@ def __init__(
158160
self.wait_until_finished = wait_until_finished
159161
self.multiple_jobs = multiple_jobs
160162
self.check_if_running = check_if_running
163+
self.service_account = service_account
161164

162165

163166
class DataflowCreateJavaJobOperator(BaseOperator):

0 commit comments

Comments
 (0)