
Commit efcffa3

Add Dataproc SparkR Example (#8240)
* GCP SparkR example: allows you to schedule R and SparkR jobs on a Dataproc cluster. The functionality to run that kind of job already exists in Dataproc, but it was not clear how to do it from Airflow.

* Update airflow/providers/google/cloud/example_dags/example_dataproc.py

  Co-Authored-By: Tomek Urbaszek <[email protected]>

* Make sure the R file finds the correct library

Co-authored-by: Tomek Urbaszek <[email protected]>
1 parent befff3e commit efcffa3

2 files changed (+33, -3 lines)

airflow/providers/google/cloud/example_dags/example_dataproc.py

Lines changed: 13 additions & 1 deletion
@@ -38,7 +38,8 @@
 OUTPUT_PATH = "gs://{}/{}/".format(BUCKET, OUTPUT_FOLDER)
 PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py")
 PYSPARK_URI = "gs://{}/{}".format(BUCKET, PYSPARK_MAIN)
-
+SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
+SPARKR_URI = "gs://{}/{}".format(BUCKET, SPARKR_MAIN)
 
 # Cluster definition
 CLUSTER = {
@@ -104,6 +105,12 @@
     "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
 }
 
+SPARKR_JOB = {
+    "reference": {"project_id": PROJECT_ID},
+    "placement": {"cluster_name": CLUSTER_NAME},
+    "spark_r_job": {"main_r_file_uri": SPARKR_URI},
+}
+
 HIVE_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
@@ -157,6 +164,10 @@
         task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
     )
 
+    sparkr_task = DataprocSubmitJobOperator(
+        task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
+    )
+
     hive_task = DataprocSubmitJobOperator(
         task_id="hive_task", job=HIVE_JOB, location=REGION, project_id=PROJECT_ID
     )
@@ -178,4 +189,5 @@
     scale_cluster >> spark_sql_task >> delete_cluster
     scale_cluster >> spark_task >> delete_cluster
     scale_cluster >> pyspark_task >> delete_cluster
+    scale_cluster >> sparkr_task >> delete_cluster
     scale_cluster >> hadoop_task >> delete_cluster

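For context, a minimal sketch (not part of the commit) of how the new SPARKR_JOB config and sparkr_task from this diff would look in a standalone DAG. The project, region, cluster, and bucket values are placeholders, and an already-running Dataproc cluster is assumed; the full example DAG above creates and deletes the cluster itself.

# Minimal sketch, assuming placeholder project/region/cluster/bucket values
# and an existing Dataproc cluster.
import os
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
REGION = os.environ.get("GCP_REGION", "europe-west1")
CLUSTER_NAME = os.environ.get("GCP_DATAPROC_CLUSTER_NAME", "example-cluster")
SPARKR_URI = "gs://example-bucket/hello_world.R"

SPARKR_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_r_job": {"main_r_file_uri": SPARKR_URI},
}

with DAG(
    "example_sparkr_only", start_date=datetime(2020, 1, 1), schedule_interval=None
) as dag:
    sparkr_task = DataprocSubmitJobOperator(
        task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
    )

The job dictionary mirrors the Dataproc Job resource shape: the "spark_r_job" key with "main_r_file_uri" selects the SparkR job type, in the same way "pyspark_job" with "main_python_file_uri" selects PySpark in the existing example.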
tests/providers/google/cloud/operators/test_dataproc_system.py

Lines changed: 20 additions & 2 deletions
@@ -25,6 +25,8 @@
 BUCKET = os.environ.get("GCP_DATAPROC_BUCKET", "dataproc-system-tests")
 PYSPARK_MAIN = os.environ.get("PYSPARK_MAIN", "hello_world.py")
 PYSPARK_URI = "gs://{}/{}".format(BUCKET, PYSPARK_MAIN)
+SPARKR_MAIN = os.environ.get("SPARKR_MAIN", "hello_world.R")
+SPARKR_URI = "gs://{}/{}".format(BUCKET, SPARKR_MAIN)
 
 pyspark_file = """
 #!/usr/bin/python
@@ -35,16 +37,32 @@
 print(words)
 """
 
+sparkr_file = """
+#!/usr/bin/r
+if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
+    Sys.setenv(SPARK_HOME = "/home/spark")
+}
+library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
+sparkR.session()
+# Create the SparkDataFrame
+df <- as.DataFrame(faithful)
+head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
+"""
+
 
 @pytest.mark.backend("mysql", "postgres")
 @pytest.mark.credential_file(GCP_DATAPROC_KEY)
 class DataprocExampleDagsTest(GoogleSystemTest):
-
     @provide_gcp_context(GCP_DATAPROC_KEY)
     def setUp(self):
         super().setUp()
         self.create_gcs_bucket(BUCKET)
-        self.upload_content_to_gcs(lines=pyspark_file, bucket=PYSPARK_URI, filename=PYSPARK_MAIN)
+        self.upload_content_to_gcs(
+            lines=pyspark_file, bucket=PYSPARK_URI, filename=PYSPARK_MAIN
+        )
+        self.upload_content_to_gcs(
+            lines=sparkr_file, bucket=SPARKR_URI, filename=SPARKR_MAIN
+        )
 
     @provide_gcp_context(GCP_DATAPROC_KEY)
     def tearDown(self):

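create_gcs_bucket and upload_content_to_gcs are helpers provided by the GoogleSystemTest base class; the system test uses them to stage hello_world.R in GCS before running the example DAG. Outside the test harness, roughly the same staging step could be done with the google-cloud-storage client, as in the sketch below (bucket and object names, and the upload_script helper itself, are hypothetical and not from the commit).

# Sketch, assuming application default credentials and placeholder names:
# stage an R script in GCS so a Dataproc SparkR job can reference it by URI.
from google.cloud import storage


def upload_script(bucket_name: str, object_name: str, content: str) -> str:
    """Upload script text to a GCS bucket and return its gs:// URI."""
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(object_name)
    blob.upload_from_string(content)
    return "gs://{}/{}".format(bucket_name, object_name)


# e.g. upload_script("dataproc-system-tests", "hello_world.R", sparkr_file)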