Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、Delta Lake などの追加コンポーネントをインストールできます。このページでは、必要に応じて Dataproc クラスタに Delta Lake コンポーネントをインストールする方法について説明します。
Dataproc クラスタにインストールすると、Delta Lake コンポーネントは Delta Lake ライブラリをインストールし、Delta Lake と連携するようにクラスタ内の Spark と Hive を構成します。
互換性のある Dataproc イメージ バージョン
Delta Lake コンポーネントは、Dataproc イメージ バージョン 2.2.46 以降のイメージ バージョンで作成された Dataproc クラスタにインストールできます。
Dataproc イメージ リリースに含まれる Delta Lake コンポーネント バージョンについては、サポートされる Dataproc バージョンをご覧ください。
Delta Lake 関連のプロパティ
Delta Lake コンポーネントを有効にして Dataproc クラスタを作成すると、次の Spark プロパティが Delta Lake と連携するように構成されます。
構成ファイル | プロパティ | デフォルト値 |
---|---|---|
/etc/spark/conf/spark-defaults.conf |
spark.sql.extensions |
io.delta.sql.DeltaSparkSessionExtension |
/etc/spark/conf/spark-defaults.conf |
spark.sql.catalog.spark_catalog |
org.apache.spark.sql.delta.catalog.DeltaCatalog |
コンポーネントをインストールする
Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して Dataproc クラスタを作成するときにコンポーネントをインストールします。
コンソール
- Google Cloud コンソールで、Dataproc の [クラスタの作成] ページに移動します。
[クラスターを設定] パネルが選択されています。
- [コンポーネント] セクションの [オプション コンポーネント] で、クラスタにインストールする Delta Lake やその他のオプション コンポーネントを選択します。
gcloud CLI
Jupyter コンポーネントを含む Dataproc クラスタを作成するには、--optional-components
フラグを指定した gcloud dataproc clusters create コマンドを使用します。
gcloud dataproc clusters create CLUSTER_NAME \ --optional-components=DELTA \ --region=REGION \ ... other flags
注:
- CLUSTER_NAME: クラスタの名前を指定します。
- REGION: クラスタが配置される Compute Engine リージョンを指定します。
REST API
Delta Lake コンポーネントは、clusters.create リクエストの一部として SoftwareConfig.Component を使用して Dataproc API で指定できます。
使用例
このセクションでは、Delta Lake テーブルを使用したデータの読み取りと書き込みの例を示します。
Delta Lake テーブル
Delta Lake テーブルに書き込む
Spark DataFrame を使用して、Delta Lake テーブルにデータを書き込むことができます。次の例では、サンプルデータを使用して DataFrame
を作成し、Cloud Storage に my_delta_table
Delta Lake テーブルを作り、データを Delta Lake テーブルに書き込みます。
PySpark
# Create a DataFrame with sample data.
data = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
# Create a Delta Lake table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS my_delta_table (
id integer,
name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table'""")
# Write the DataFrame to the Delta Lake table in Cloud Storage.
data.writeTo("my_delta_table").append()
Scala
// Create a DataFrame with sample data.
val data = Seq((1, "Alice"), (2, "Bob")).toDF("id", "name")
// Create a Delta Lake table in Cloud Storage.
spark.sql("""CREATE TABLE IF NOT EXISTS my_delta_table (
id integer,
name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table'""")
// Write the DataFrame to the Delta Lake table in Cloud Storage.
data.write.format("delta").mode("append").saveAsTable("my_delta_table")
Spark SQL
CREATE TABLE IF NOT EXISTS my_delta_table (
id integer,
name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';
INSERT INTO my_delta_table VALUES ("1", "Alice"), ("2", "Bob");
Delta Lake テーブルから読み取る
次の例では、my_delta_table
を読み取り、その内容を表示します。
PySpark
# Read the Delta Lake table into a DataFrame.
df = spark.table("my_delta_table")
# Display the data.
df.show()
Scala
// Read the Delta Lake table into a DataFrame.
val df = spark.table("my_delta_table")
// Display the data.
df.show()
Spark SQL
SELECT * FROM my_delta_table;
Delta Lake を使用した Hive
Hive の Delta テーブルに書き込みます。
Dataproc Delta Lake オプション コンポーネントは、Hive 外部テーブルと連携するように事前構成されています。
詳細については、Hive コネクタをご覧ください。
beeline クライアントでサンプルを実行します。
beeline -u jdbc:hive2://
Spark Delta Lake テーブルを作成します。
Hive 外部テーブルが Delta Lake テーブルを参照するには、Spark を使用して Delta Lake テーブルを作成する必要があります。
CREATE TABLE IF NOT EXISTS my_delta_table (
id integer,
name string)
USING delta
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';
INSERT INTO my_delta_table VALUES ("1", "Alice"), ("2", "Bob");
Hive 外部テーブルを作成します。
SET hive.input.format=io.delta.hive.HiveInputFormat;
SET hive.tez.input.format=io.delta.hive.HiveInputFormat;
CREATE EXTERNAL TABLE deltaTable(id INT, name STRING)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION 'gs://delta-gcs-demo/example-prefix/default/my_delta_table';
注:
io.delta.hive.DeltaStorageHandler
クラスは、Hive データソース API を実装します。Delta テーブルを読み込んでメタデータを抽出できます。CREATE TABLE
ステートメントのテーブル スキーマが基盤となる Delta Lake メタデータと一致しない場合、エラーがスローされます。
Hive の Delta Lake テーブルから読み取ります。
Delta テーブルからデータを読み取るには、SELECT
ステートメントを使用します。
SELECT * FROM deltaTable;
Delta Lake テーブルを削除します。
Delta テーブルを削除するには、DROP TABLE
ステートメントを使用します。
DROP TABLE deltaTable;