MongoDB 到 BigQuery 範本 (串流)

這個範本會建立串流管道,用於處理 MongoDB 變更串流。如要使用這個範本,請將變更串流資料發布至 Pub/Sub。管道會從 Pub/Sub 讀取 JSON 記錄,並將其寫入 BigQuery。寫入 BigQuery 的記錄格式與 MongoDB 至 BigQuery 批次範本相同。

管道相關規定

  • 目標 BigQuery 資料集必須存在。
  • Dataflow 工作站機器必須能夠存取來源 MongoDB 執行個體。
  • 您必須建立 Pub/Sub 主題,才能讀取變更串流。 管道執行期間,請監聽 MongoDB 變更串流中的變更資料擷取 (CDC) 事件,並以 JSON 記錄的形式發布至 Pub/Sub。如要進一步瞭解如何將訊息發布至 Pub/Sub,請參閱將訊息發布至主題
  • 這個範本使用 MongoDB 變更串流。不支援 BigQuery 變更資料擷取。

範本參數

必要參數

  • mongoDbUri:MongoDB 連線 URI,格式為 mongodb+srv://:@.
  • 資料庫:要從中讀取集合的 MongoDB 資料庫。例如:my-db
  • collection:MongoDB 資料庫中的集合名稱。例如:my-collection
  • userOptionFLATTENJSONNONEFLATTEN 會將文件扁平化為單一層級。JSON 會以 BigQuery JSON 格式儲存文件。NONE 會將整份文件儲存為 JSON 格式的字串。預設值為 NONE。
  • inputTopic:要讀取的 Pub/Sub 輸入主題,格式為 projects/<PROJECT_ID>/topics/<TOPIC_NAME>
  • outputTableSpec:要寫入的 BigQuery 資料表。例如:bigquery-project:dataset.output_table

選用參數

使用者定義函式

您也可以選擇以 JavaScript 編寫使用者定義函式 (UDF) 來擴充這個範本。範本會針對每個輸入元素呼叫 UDF。 元素酬載會序列化為 JSON 字串。

如要使用 UDF,請將 JavaScript 檔案上傳至 Cloud Storage,並設定下列範本參數:

參數說明
javascriptDocumentTransformGcsPath JavaScript 檔案的 Cloud Storage 位置。
javascriptDocumentTransformFunctionName JavaScript 函式的名稱。

詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

函式規格

UDF 的規格如下:

  • 輸入:MongoDB 文件。
  • 輸出:序列化為 JSON 字串的物件。
  • 執行範本

    控制台

    1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
    2. 前往「依據範本建立工作」
    3. 在「工作名稱」欄位中,輸入專屬工作名稱。
    4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

      如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

    5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the MongoDB (CDC) to BigQuery template。
    6. 在提供的參數欄位中輸入參數值。
    7. 按一下「Run Job」(執行工作)

    gcloud

    在殼層或終端機中執行範本:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC

    更改下列內容:

    • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
    • JOB_NAME: 您選擇的不重複工作名稱
    • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • OUTPUT_TABLE_SPEC:目標 BigQuery 資料表名稱。
    • MONGO_DB_URI:您的 MongoDB URI。
    • DATABASE:您的 MongoDB 資料庫。
    • COLLECTION:您的 MongoDB 集合。
    • USER_OPTION:FLATTEN、JSON 或 NONE。
    • INPUT_TOPIC:您的 Pub/Sub 輸入主題。

    API

    如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    更改下列內容:

    • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
    • JOB_NAME: 您選擇的不重複工作名稱
    • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • OUTPUT_TABLE_SPEC:目標 BigQuery 資料表名稱。
    • MONGO_DB_URI:您的 MongoDB URI。
    • DATABASE:您的 MongoDB 資料庫。
    • COLLECTION:您的 MongoDB 集合。
    • USER_OPTION:FLATTEN、JSON 或 NONE。
    • INPUT_TOPIC:您的 Pub/Sub 輸入主題。

    後續步驟