本文說明如何在 Dataform 中執行下列操作:
事前準備
如要使用工作流程設定排定執行時間 或使用工作流程和 Cloud Scheduler 排定執行時間,請務必執行下列操作:
如要使用 Cloud Composer 安排執行時間,請務必完成下列步驟:
- 選取或建立 Dataform 存放區。
- 將 BigQuery 存取權授予 Dataform。
- 選取或建立 Dataform 工作區。
- 至少建立一個資料表。
- 建立 Cloud Composer 2 環境。
必要的角色
如要取得完成本文工作所需的權限,請要求管理員授予下列 IAM 角色:
-
Dataform 管理員 (
roles/dataform.admin
) 存放區 -
Composer Worker (
roles/composer.worker
) 在 Cloud Composer 環境的服務帳戶上
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。
如要使用預設 Dataform 服務帳戶以外的服務帳戶,請授予自訂服務帳戶存取權。
如要啟用工作流程設定的排程執行作業,且已啟用嚴格的「以服務帳戶身分執行」模式,您必須將 iam.serviceAccounts.actAs
權限授予Dataform 服務帳戶,以供工作流程設定中使用的服務帳戶使用。「服務帳戶使用者」角色 (roles/iam.serviceAccountUser
) 具備這項權限。
如要在建立工作流程設定 (預覽版) 時使用 Google 帳戶使用者憑證,請授予 Google 帳戶存取權。
使用工作流程設定排定執行時間
本節說明如何在 Dataform 中建立工作流程設定,以排定及設定工作流程執行作業。您可以運用工作流程設定,讓系統按照排程執行 Dataform 工作流程。
關於工作流程設定
如要排定在 BigQuery 中執行所有或所選工作流程動作的 Dataform 執行作業,可以建立工作流程設定。在工作流程設定中,選取編譯版本設定、要執行的工作流程動作,以及執行時間表。
接著,在排定的工作流程設定執行期間,Dataform 會將您在發布設定中選取的動作,從最新的編譯結果部署至 BigQuery。您也可以使用 Dataform API workflowConfigs 手動觸發工作流程設定的執行作業。
Dataform 工作流程設定包含下列執行設定:
- 工作流程設定的 ID。
- 版本設定。
服務帳戶。
這是與工作流程設定相關聯的服務帳戶。您可以選取預設的 Dataform 服務帳戶或與 Google Cloud 專案相關聯的服務帳戶,也可以手動輸入其他服務帳戶。根據預設,工作流程設定會使用與存放區相同的服務帳戶。
服務帳戶憑證是建立及執行排定工作流程設定時的預設授權方法。
Google 帳戶使用者憑證 (預先發布版)
Google 帳戶使用者憑證是手動建立及執行非排程工作流程設定時的預設授權方法。詳情請參閱「授權給您的 Google 帳戶」。
要執行的工作流程動作:
- 所有動作。
- 選取動作。
- 選取標記。
執行時間表和時區。
建立工作流程設定
如要建立 Dataform 工作流程設定,請按照下列步驟操作:
- 在存放區中,前往「發行內容和排程」。
- 在「Workflow configurations」(工作流程設定) 部分,按一下「Create」(建立)。
在「建立工作流程設定」窗格的「設定 ID」欄位中,輸入工作流程設定的專屬 ID。
ID 只能使用數字、英文字母、連字號和底線。
在「Release configuration」(版本設定) 選單中,選取編譯版本設定。
在「驗證」部分,使用 Google 帳戶使用者憑證或服務帳戶授權工作流程設定。
- 如要使用 Google 帳戶使用者憑證 (預覽),請選取「以我的使用者憑證執行」。
- 如要使用服務帳戶,請選取「Execute with selected service account」(使用所選服務帳戶執行),然後選取預設 Dataform 服務帳戶,或與您有權存取的Google Cloud 專案相關聯的任何服務帳戶。如未選取服務帳戶,工作流程設定會使用存放區的服務帳戶。
選用:在「Schedule frequency」(排程頻率) 欄位中,以 Unix-Cron 格式輸入執行頻率。
為確保 Dataform 在對應的版本設定中執行最新的編譯結果,請在建立編譯結果的時間與排定執行時間之間,至少間隔一小時。
選用:在「時區」選單中,選取執行作業的時區。
預設時區為世界標準時間。
選取要執行的工作流程動作:
- 如要執行整個工作流程,請按一下「所有動作」。
- 如要執行工作流程中選取的動作,請按一下「選取動作」,然後選取動作。
- 如要對所選標記執行動作,請按一下「選取標記」,然後選取標記。
- 選用:如要執行所選動作或代碼及其依附元件,請選取「Include dependencies」選項。
- 選用:如要執行所選動作或代碼及其依附元件,請選取「Include dependents」選項。
- 選用:如要從頭重建所有表格,請選取「以完整重新整理執行」選項。
如果沒有這個選項,Dataform 會更新遞增資料表,但不會從頭重建。
按一下「建立」,如果驗證方法選取「使用我的使用者憑證執行」,您必須授權 Google 帳戶 (預覽)。
舉例來說,下列工作流程設定會以 CEST 時區每小時執行一次標記為 hourly
的動作:
- 設定 ID:
production-hourly
- 版本設定:-
- 頻率:
0 * * * *
- 時區:
Central European Summer Time (CEST)
- 選取工作流程動作:選取標記、
hourly
標記
授權給您的 Google 帳戶
如要使用Google 帳戶使用者憑證驗證資源,您必須手動授予 BigQuery 管道權限,才能取得 Google 帳戶的存取權杖,並代表您存取來源資料。您可以使用 OAuth 對話方塊介面手動核准。
您只需要授予 BigQuery 管道一次權限。
如要撤銷授予的權限,請按照下列步驟操作:
- 前往 Google 帳戶頁面。
- 按一下「BigQuery Pipelines」。
- 按一下 [移除存取權]。
如果新的 Google 帳戶擁有者從未建立工作流程設定,更新憑證以變更工作流程設定擁有者時,也需要手動核准。
編輯工作流程設定
如要編輯工作流程設定,請按照下列步驟操作:
- 在存放區中,前往「發行內容和排程」。
- 在要編輯的工作流程設定中,按一下 「更多」選單,然後按一下「編輯」。
- 在「Edit workflow configuration」(編輯工作流程設定) 窗格中,編輯版本設定,然後按一下「Save」(儲存)。
刪除工作流程設定
如要刪除工作流程設定,請按照下列步驟操作:
- 在存放區中,前往「發行內容和排程」。
- 在要刪除的工作流程設定中,按一下 「More」(更多) 選單,然後按一下「Delete」(刪除)。
- 在「Delete release configuration」(刪除發布版本設定) 對話方塊中,按一下「Delete」(刪除)。
使用 Workflows 和 Cloud Scheduler 排定執行作業
本節說明如何使用 Workflows 和 Cloud Scheduler,排定 Dataform 工作流程的執行時間。
關於排定的工作流程執行作業
您可以建立 Cloud Scheduler 工作來觸發 Workflows 工作流程,藉此設定 Dataform 工作流程的執行頻率。Workflows 會在您定義的自動化調度管理工作流程中執行服務。
工作流程會以兩步驟程序執行 Dataform 工作流程。首先,系統會從 Git 供應商提取 Dataform 存放區程式碼,並編譯成編譯結果。接著,系統會使用編譯結果建立 Dataform 工作流程,並以您設定的頻率執行該工作流程。
建立排定時間的自動化調度管理工作流程
如要排定 Dataform 工作流程的執行時間,請使用 Workflows 建立協調工作流程,並新增 Cloud Scheduler 工作做為觸發條件。
Workflows 會使用服務帳戶,授予工作流程資源存取權。Google Cloud 建立服務帳戶,並授予「Dataform 編輯者」 (
roles/dataform.editor
) Identity and Access Management 角色,以及管理協調工作流程所需的最低權限。詳情請參閱「授予工作流程權限,以便存取 Google Cloud 資源」。建立自動化調度管理工作流程,並使用下列 YAML 原始碼做為工作流程定義:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
更改下列內容:
- PROJECT_ID:您的 Google Cloud 專案 ID。
- REPOSITORY_LOCATION:Dataform 存放區的位置。
- REPOSITORY_ID:Dataform 存放區的名稱。
- GIT_COMMITISH:您要從中執行 Dataform 程式碼的 Git 分支。如果是新建立的存放區,請替換為
main
。
自訂 Dataform 工作流程建立編譯結果要求
您可以更新現有的協調工作流程,並以 YAML 格式定義 Dataform 工作流程建立編譯結果要求設定。如要進一步瞭解相關設定,請參閱 projects.locations.repositories.compilationResults
REST 資源參考資料。
舉例來說,如要在編譯期間將 _dev
schemaSuffix
設定新增至所有動作,請將 createCompilationResult
步驟主體替換為下列程式碼片段:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
您也可以在 Workflows 執行要求中,以執行階段引數的形式傳遞其他設定,並使用變數存取這些引數。詳情請參閱「在執行要求中傳遞執行階段引數」。
自訂 Dataform 工作流程叫用要求
您可以更新現有的協調工作流程,並以 YAML 格式定義 Dataform 工作流程叫用要求設定。如要進一步瞭解叫用要求設定,請參閱 projects.locations.repositories.workflowInvocations
REST 資源參考資料。
舉例來說,如要只執行含有 hourly
標記的動作,並納入所有遞移依附元件,請將 createWorkflowInvocation
主體替換為下列程式碼片段:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
您也可以在 Workflows 執行要求中,以執行階段引數的形式傳遞其他設定,並使用變數存取這些引數。詳情請參閱「在執行要求中傳遞執行階段引數」。
使用 Cloud Composer 排定執行作業
您可以使用 Cloud Composer 2 安排 Dataform 執行作業。Dataform 不支援 Cloud Composer 1。
如要使用 Cloud Composer 2 管理 Dataform 執行的排程,可以在 Airflow 有向非循環圖 (DAG) 中使用 Dataform 運算子。您可以建立 Airflow DAG,排定 Dataform 工作流程的呼叫時間。
Dataform 提供各種 Airflow 運算子。包括取得編譯結果、取得工作流程叫用項目,以及取消工作流程叫用項目的運算子。如要查看可用的 Dataform Airflow 運算子完整清單,請參閱「Google Dataform 運算子」。
安裝 google-cloud-dataform
PyPi 套件
如果您使用 Cloud Composer 2 2.0.25
以上版本,環境中會預先安裝這個套件。你不需要安裝。
如果您使用較舊的 Cloud Composer 2 版本,請安裝 google-cloud-dataform
PyPi 套件。
在 PyPI 套件部分中,指定版本 ==0.2.0
。
建立 Airflow DAG,排定 Dataform 工作流程呼叫作業
如要使用 Cloud Composer 2 管理 Dataform 工作流程的排定執行作業,請使用 Dataform Airflow 運算子編寫 DAG,然後將其上傳至環境的 bucket。
下列程式碼範例顯示 Airflow DAG,可建立 Dataform 編譯結果,並啟動 Dataform 工作流程調用:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
更改下列內容:
- PROJECT_ID:您的 Dataform Google Cloud 專案 ID。
- REPOSITORY_ID:Dataform 存放區的名稱。
- REGION:Dataform 存放區所在的區域。
- COMPILATION_RESULT:您要用於這項工作流程調用的編譯結果名稱。
- GIT_COMMITISH:遠端 Git 存放區中的 Git commitish,代表您要使用的程式碼版本,例如分支或 Git SHA。
以下程式碼範例顯示 Airflow DAG,可執行下列作業:
- 建立 Dataform 編譯結果。
- 啟動非同步 Dataform 工作流程叫用。
- 使用
DataformWorkflowInvocationStateSensor
輪詢工作流程的狀態,直到工作流程進入預期狀態為止。
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
更改下列內容:
- PROJECT_ID:您的 Dataform Google Cloud projectID。
- REPOSITORY_ID:Dataform 存放區的名稱。
- REGION:Dataform 存放區所在的區域。
- COMPILATION_RESULT:您要用於這項工作流程調用的編譯結果名稱。
- GIT_COMMITISH:遠端 Git 存放區中的 Git commitish,代表您要使用的程式碼版本,例如分支或 Git SHA。
- COMPILATION_RESULT:您要用於這項工作流程調用的編譯結果名稱。
新增編譯設定參數
您可以將其他編譯設定參數新增至 create_compilation_result
Airflow DAG 物件。如要進一步瞭解可用參數,請參閱 CodeCompilationConfig
Dataform API 參考資料。
如要將編譯設定參數新增至
create_compilation_result
Airflow DAG 物件,請以以下格式將所選參數新增至code_compilation_config
欄位:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
更改下列內容:
- PROJECT_ID:您的 Dataform Google Cloud 專案 ID。
- REPOSITORY_ID:Dataform 存放區的名稱。
- REGION:Dataform 存放區所在的區域。
- GIT_COMMITISH:遠端 Git 存放區中的 Git commitish,代表您要使用的程式碼版本,例如分支或 Git SHA。
- PARAMETER:所選
CodeCompilationConfig
參數。您可以新增多個參數。 - PARAMETER_VALUE:所選參數的值。
下列程式碼範例顯示新增至 create_compilation_result
Airflow DAG 物件的 defaultDatabase
參數:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
新增工作流程叫用設定參數
您可以將其他工作流程叫用設定參數新增至 create_workflow_invocation
Airflow DAG 物件。如要進一步瞭解可用參數,請參閱 InvocationConfig
Dataform API 參考資料。
如要將工作流程叫用設定參數新增至
create_workflow_invocation
Airflow DAG 物件,請以以下格式將所選參數新增至invocation_config
欄位:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
更改下列內容:
- PROJECT_ID:您的 Dataform Google Cloud 專案 ID。
- REPOSITORY_ID:Dataform 存放區的名稱。
- REGION:Dataform 存放區所在的區域。
- PARAMETER:所選
InvocationConfig
參數。您可以新增多個參數。 - PARAMETER_VALUE:所選參數的值。
下列程式碼範例顯示新增至 create_workflow_invocation
Airflow DAG 物件的 includedTags[]
和 transitiveDependenciesIncluded
參數:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
後續步驟
- 如要瞭解如何設定 Dataform 編譯版本設定,請參閱建立版本設定。
- 如要進一步瞭解 Dataform 中的程式碼生命週期,請參閱「Dataform 中的程式碼生命週期簡介」。
- 如要進一步瞭解 Dataform API,請參閱 Dataform API。
- 如要進一步瞭解 Cloud Composer 環境,請參閱「Cloud Composer 總覽」。
- 如要進一步瞭解 Workflows 定價,請參閱這篇文章。