spark-bigquery-connector を Apache Spark とともに使用すると、BigQuery との間でデータの読み書きを行えます。このコネクタは、BigQuery からデータを読み取るときに BigQuery Storage API を使用します。
このチュートリアルでは、プリインストールされたコネクタの可用性について説明し、特定のコネクタ バージョンを Spark ジョブで使用できるようにする方法について説明します。コードの例は、Spark アプリケーション内で Spark BigQuery コネクタを使用する方法を示しています。
プリインストールされているコネクタを使用する
Spark BigQuery コネクタは、イメージ バージョン 2.1
以降で作成された Dataproc クラスタで実行される Spark ジョブにプリインストールされています。プリインストールされたコネクタのバージョンは、各イメージ バージョンのリリースページに記載されています。たとえば、[2.2.x イメージ リリースのバージョン] ページの [BigQuery Connector] 行には、最新の 2.2 イメージ リリースにインストールされているコネクタ バージョンが表示されます。
特定のコネクタ バージョンを Spark ジョブで使用できるようにする
2.1
以降のイメージ バージョンのクラスタにプリインストールされたバージョンとは異なるコネクタ バージョンを使用する場合、または 2.1
より前のイメージ バージョンのクラスタにコネクタをインストールする場合は、このセクションの手順に沿って操作します。
重要: spark-bigquery-connector
のバージョンには、Dataproc クラスタ イメージ バージョンとの互換性がある必要があります。Dataproc イメージとのコネクタの互換性マトリックスをご覧ください。
2.1
以降のイメージ バージョンのクラスタ
2.1
以降のイメージ バージョンで Dataproc クラスタを作成する場合は、クラスタ メタデータとしてコネクタ バージョンを指定します。
gcloud CLI の例:
gcloud dataproc clusters create CLUSTER_NAME \ --region=REGION \ --image-version=2.2 \ --metadata=SPARK_BQ_CONNECTOR_VERSION or SPARK_BQ_CONNECTOR_URL\ other flags
注:
SPARK_BQ_CONNECTOR_VERSION: コネクタのバージョンを指定します。Spark BigQuery コネクタのバージョンは、GitHub の spark-bigquery-connector/releases ページに記載されています。
例:
--metadata=SPARK_BQ_CONNECTOR_VERSION=0.42.1
SPARK_BQ_CONNECTOR_URL: Cloud Storage 内の jar を指す URL を指定します。GitHub の コネクタのダウンロードと使用の [リンク] 列に記載されているコネクタの URL を指定するか、カスタム コネクタ jar を配置した Cloud Storage のロケーションのパスを指定できます。
例:
--metadata=SPARK_BQ_CONNECTOR_URL=gs://spark-lib/bigquery/spark-3.5-bigquery-0.42.1.jar --metadata=SPARK_BQ_CONNECTOR_URL=gs://PATH_TO_CUSTOM_JAR
2.0
以前のイメージ バージョンのクラスタ
次のいずれかの方法で、Spark BigQuery コネクタをアプリケーションで使用できるようにします。
クラスタの作成時に Dataproc コネクタ初期化アクションを使用して、すべてのノードの Spark の jars ディレクトリに spark-bigquery-connector をインストールします。
Google Cloud コンソール、gcloud CLI、または Dataproc API を使用してジョブをクラスタに送信するときに、コネクタ jar の URL を指定します。
コンソール
Dataproc の [ジョブの送信] ページで、Spark ジョブの [Jar ファイル] 項目を使用します。
gcloud
API
SparkJob.jarFileUris
フィールドを使用します。2.0 より前のバージョンのイメージ クラスタで Spark ジョブを実行するときにコネクタ JAR を指定する方法
- コネクタ jar は、次の URI 文字列の Scala とコネクタのバージョン情報を置き換えて指定します。
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
- Dataproc イメージ バージョン
1.5+
で Scala2.12
を使用する gcloud CLI の例:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job args
- Dataproc イメージ バージョン
1.4
以前には Scala2.11
を使用します。 gcloud CLI の例:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
- コネクタ jar は、次の URI 文字列の Scala とコネクタのバージョン情報を置き換えて指定します。
Scala または Java Spark アプリケーションに依存関係としてコネクタ jar を含める(コネクタのコンパイルをご覧ください)。
費用を計算する
このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。
- Dataproc
- BigQuery
- Cloud Storage
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
BigQuery との間でデータを読み書きする
このサンプルでは、BigQuery から Spark DataFrame にデータを読み込み、標準データソース API を使用してワード数をカウントします。
コネクタは、最初にデータを Cloud Storage の一時テーブルにバッファリングしてから、BigQuery にすべてのデータを書き込みます。次に、1 回のオペレーションですべてのデータを BigQuery にコピーします。BigQuery の読み込みオペレーションが成功した直後と、Spark アプリケーションが終了する際に、コネクタは一時ファイルの削除を試みます。ジョブが失敗した場合、残っている Cloud Storage の一時ファイルを削除します。通常、一時的な BigQuery ファイルは gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
にあります。
課金を構成する
デフォルトでは、認証情報またはサービス アカウントに関連付けられているプロジェクトには、API 使用量に対して課金されます。別のプロジェクトに請求するには、次のように構成します: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
また、読み取り/書き込みオペレーションに次のように追加することもできます: .option("parentProject", "<BILLED-GCP-PROJECT>")
。
コードを実行する
このサンプルを実行する場合は、事前に「wordcount_dataset」という名前のデータセットを作成するか、コード内の出力データセットをGoogle Cloud プロジェクトの既存の BigQuery データセットに変更します。
次のように bq コマンドを使用して wordcount_dataset
を作成します。
bq mk wordcount_dataset
Google Cloud CLI コマンドを使用して、BigQuery にエクスポートするために使用する Cloud Storage バケットを作成します。
gcloud storage buckets create gs://[bucket]
Scala
- コードを調査し、[bucket] プレースホルダを以前に作成した Cloud Storage バケットに置き換えます。
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- クラスタ上でコードを実行します
- SSH を使用して Dataproc クラスタ マスター ノードに接続します。
- Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
- [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスター ノード名の右側にある
SSH
をクリックします。
マスターノードのホーム ディレクトリでブラウザ ウィンドウが開きます。Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
- 事前にインストールされた
vi
、vim
、nano
テキスト エディタでwordcount.scala
を作成し、Scala コードの一覧から Scala コードを貼り付けます。nano wordcount.scala
spark-shell
REPL を起動します。$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
:load wordcount.scala
コマンドで wordcount.scala を実行して、BigQuery のwordcount_output
テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
出力テーブルをプレビューするには、BigQuery
ページを開いてwordcount_output
テーブルを選択し、[プレビュー] をクリックします。
- SSH を使用して Dataproc クラスタ マスター ノードに接続します。
PySpark
- コードを調査し、[bucket] プレースホルダを以前に作成した Cloud Storage バケットに置き換えます。
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- クラスタ上でコードを実行する
- SSH を使用して Dataproc クラスタ マスター ノードに接続します。
- Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
- [クラスタの詳細] ページで、[VM インスタンス] タブを選択します。次に、クラスタ マスター ノード名の右側にある
SSH
をクリックします。
マスターノードのホーム ディレクトリでブラウザ ウィンドウが開きます。Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Google Cloud コンソールで Dataproc の [クラスタ] ページに移動し、クラスタの名前をクリックします。
- 事前にインストールされた
vi
、vim
、またはnano
テキスト エディタでwordcount.py
を作成し、PySpark のコード一覧から PySpark コードを貼り付けます。nano wordcount.py
spark-submit
で wordcount を実行し、BigQuery のwordcount_output
テーブルを作成します。出力リストには、ワードカウント出力のうち 20 行が表示されます。spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
出力テーブルをプレビューするには、BigQuery
ページを開いてwordcount_output
テーブルを選択し、[プレビュー] をクリックします。
- SSH を使用して Dataproc クラスタ マスター ノードに接続します。
追加情報
- BigQuery ストレージと Spark SQL - Python
- 外部データソースに対するテーブル定義ファイルの作成
- 外部でパーティションに分割されたデータのクエリ
- Spark ジョブ調整のヒント