僅傳送一次

本頁說明如何使用 Pub/Sub 的「只處理一次」功能接收及確認訊息,以便追蹤及防止重複處理訊息。啟用這項功能後,Pub/Sub 會提供下列語意:

  • 訂閱者可以判斷訊息確認是否成功。

  • 訊息成功確認後,系統不會重新傳送訊息。

  • 訊息待處理時,系統不會重新傳送。在確認期限到期或訊息獲得確認前,訊息都會視為未解決。

  • 如果有多個有效傳送作業 (因為確認期限到期或用戶端啟動否定確認),只有最新的確認 ID 可用於確認訊息。如果要求含有先前的確認 ID,就會失敗。

啟用「僅處理一次」功能後,訂閱者可以按照下列指南,確保訊息只會處理一次:

  • 在確認期限內確認訊息。

  • 維護處理訊息進度的相關資訊,直到訊息成功確認為止。

  • 如果確認失敗,請使用處理訊息進度的相關資訊,避免重複作業。

只有提取訂閱類型支援「只傳送一次」傳送機制,包括使用 StreamingPull API 的訂閱者。推送和匯出訂閱項目不支援「僅傳送一次」功能。

Pub/Sub 支援在雲端區域內僅傳送一次訊息,依據的是 Pub/Sub 定義的專屬訊息 ID

重新傳送與重複傳送

請務必瞭解預期和非預期重新遞送的差異。

  • 如果用戶端對訊息發出負面確認,或未在確認期限前延長訊息的確認期限,系統就會重新傳送訊息。重新遞送視為有效,系統運作正常。

    如要排解重新傳送的問題,請參閱「處理重複項目」。

  • 如果訊息在成功確認後或確認期限到期前重新傳送,即為重複訊息。

  • 重新傳送的郵件在每次嘗試重新傳送時,都會保留相同的郵件 ID。

啟用「僅傳送一次」的訂閱項目不會收到重複傳送的訊息。

用戶端程式庫支援「僅傳送一次」

  • 支援的用戶端程式庫具有確認介面,可搭配回應使用 (例如 Go)。您可以使用這個介面檢查確認要求是否成功。 如果確認要求成功,用戶端保證不會收到重新傳送的訊息。如果確認要求失敗,用戶端應會重新傳送訊息。

  • 用戶端也可以使用支援的用戶端程式庫,不必使用確認介面。不過,在這種情況下,確認失敗可能會導致系統在無聲無息中重新傳送訊息。

  • 支援的用戶端程式庫具有介面,可設定最短租約延長時間 (例如:Go)。您必須將最低租約延期值設為較大的數字,以免發生任何網路相關的確認逾期問題。上限為 600 秒。

  • 如果您使用 Java 用戶端程式庫,並透過 setChannelProvider() 方法,使用自訂 gRPC 管道初始化訂閱者,建議您在建構 TransportChannelProvider 時,也將 maxInboundMetadataSize 設為至少 1 MB。如要進行這項設定,可以使用 InstantiatingGrpcChannelProvider.Builder.setMaxInboundMetadataSize()ManagedChannelBuilder.maxInboundMetadataSize() 方法。

與「只傳送一次」相關的變數預設值和範圍,以及變數名稱,在不同用戶端程式庫中可能有所不同。舉例來說,在 Java 用戶端程式庫中,下列變數會控管「只傳送一次」的傳送作業。

變數 說明
setEnableExactlyOnceDelivery 啟用或停用「僅傳送一次」傳送功能。 true 或 false 預設值=false
minDurationPerAckExtension 延長修改確認期限的最短時間 (以秒為單位)。 範圍=0 至 600 預設值=無
maxDurationPerAckExtension 延長修改確認期限的時間上限 (以秒為單位)。 範圍=0 至 600 預設值=無

如果是只傳送一次,當確認 ID 已過期時,對 Pub/Sub 的 modifyAckDeadlineacknowledgment 要求就會失敗。在這種情況下,由於較新的遞送作業可能已在進行中,因此服務會將過期的確認 ID 視為無效。這是為了確保訊息只傳送一次而設計。接著,您會看到 acknowledgmentModifyAckDeadline 要求傳回 INVALID_ARGUMENT 回應。如果停用「只傳送一次」功能,這些要求會在確認 ID 過期時傳回 OK

為確保 acknowledgmentModifyAckDeadline 要求具有有效的確認 ID,請考慮將 minDurationPerAckExtension 的值設為較大的數字。

區域注意事項

只有在訂閱者連線至相同區域的服務時,系統才會保證僅傳送一次訊息。如果訂閱者應用程式分散在多個區域,即使啟用「僅傳送一次」,也可能導致訊息重複傳送。發布者可以將訊息傳送至任何區域,且系統仍會維持「正好一次」的保證。

在 Google Cloud中執行應用程式時,應用程式預設會連線至同一區域的 Pub/Sub 端點。因此,在 Google Cloud內單一區域執行應用程式,通常可確保您與單一區域互動。

在 Google Cloud外部或多個區域中執行訂閱端應用程式時,您可以在設定 Pub/Sub 用戶端時使用位置端點,確保連線至單一區域。Pub/Sub 的所有位置端點都指向單一區域。如要進一步瞭解位置端點,請參閱 Pub/Sub 端點。如需 Pub/Sub 的所有位置端點清單,請參閱位置端點清單

建立「僅傳送一次」的訂閱項目

您可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫或 Pub/Sub API,建立具有「只傳送一次」傳送方式的訂閱項目。

提取訂閱項目

控制台

如要建立具有「只傳送一次」傳送機制的提取訂閱項目,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「Subscriptions」(訂閱項目) 頁面。

    前往「訂閱項目」頁面

  2. 按一下「Create Subscription」 (建立訂閱項目)

  3. 輸入「Subscription ID」(訂閱項目 ID)

  4. 從下拉式選單中選擇或建立主題。

    訂閱項目會接收來自主題的訊息。

  5. 在「僅傳送一次」部分,選取「啟用僅傳送一次」

  6. 點選「建立」

gcloud

如要建立採用「僅傳送一次」傳送方式的提取訂閱項目,請使用 gcloud pubsub subscriptions create 指令並加上 --enable-exactly-once-delivery 旗標:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-exactly-once-delivery

更改下列內容:

  • SUBSCRIPTION_ID:要建立的訂閱項目 ID
  • TOPIC_ID:要附加至訂閱項目的主題 ID

REST

如要建立「僅傳送一次」的訂閱項目,請使用 projects.subscriptions.create 方法。

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

更改下列內容:

  • PROJECT_ID:要在其中建立訂閱項目的專案 ID
  • SUBSCRIPTION_ID:要建立的訂閱項目 ID

如要建立「僅傳送一次」的提取訂閱項目,請在要求主體中指定這項設定:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "enableExactlyOnceDelivery": true,
}

更改下列內容:

  • PROJECT_ID:含有主題的專案 ID
  • TOPIC_ID:要附加至訂閱項目的主題 ID

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_exactly_once_delivery(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C# 設定操作說明進行操作。詳情請參閱 Pub/Sub C# API 參考說明文件


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithExactlyOnceDeliverySample
{
    public Subscription CreateSubscriptionWithExactlyOnceDelivery(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableExactlyOnceDelivery = true
        };

        Subscription subscription = null;

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createSubscriptionWithExactlyOnceDelivery(w io.Writer, projectID, topic, subscription string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbSub := &pubsubpb.Subscription{
		Name:                      subscription,
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	}
	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, pbSub)
	if err != nil {
		return fmt.Errorf("failed to create exactly once sub: %w", err)
	}
	fmt.Fprintf(w, "Created a subscription with exactly once delivery enabled: %v\n", sub)
	return nil
}

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithExactlyOnceDelivery {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithExactlyOnceDeliveryExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithExactlyOnceDeliveryExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Enable exactly once delivery in the subscription.
                  .setEnableExactlyOnceDelivery(true)
                  .build());

      System.out.println(
          "Created a subscription with exactly once delivery enabled: "
              + subscription.getAllFields());
    }
  }
}

Python

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_exactly_once_delivery": True,
        }
    )
    print(
        f"Created subscription with exactly once delivery enabled: {subscription}"
    )

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId,
  subscriptionNameOrId,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`,
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().',
  );
}

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId: string,
  subscriptionNameOrId: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`,
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().',
  );
}

Ruby

以下範例使用 Ruby Pub/Sub 用戶端程式庫 v3。如果您仍在使用第 2 版程式庫,請參閱 第 3 版遷移指南。如要查看 Ruby 第 2 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new project_id: project_id

subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  enable_exactly_once_delivery: true

puts "Created subscription with exactly once delivery enabled: #{subscription_id}"

PHP

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 PHP 設定說明進行操作。 詳情請參閱 Pub/Sub PHP API 參考說明文件

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function create_subscription_with_exactly_once_delivery(
    string $projectId,
    string $topicName,
    string $subscriptionName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $subscription->create([
        'enableExactlyOnceDelivery' => true
    ]);

    // Exactly Once Delivery status for the subscription
    $status = $subscription->info()['enableExactlyOnceDelivery'];

    printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
}

監控「僅傳送一次」訂閱項目

這項指標會記錄可能導致重新傳送的事件數 (有效或重複)。 subscription/exactly_once_warning_count這項指標會計算 Pub/Sub 無法處理與確認 ID 相關聯要求 (ModifyAckDeadlineacknowledgment 要求) 的次數。失敗原因可能是伺服器或用戶端問題。舉例來說,如果用於維護「只傳送一次」傳送資訊的持續性層無法使用,就會是伺服器型事件。如果用戶端嘗試使用無效的確認 ID 確認訊息,就會是基於用戶端的事件。

瞭解指標

subscription/exactly_once_warning_count 會擷取可能或可能不會導致實際重新遞送的事件,且可能會根據用戶端行為而產生雜訊。舉例來說,如果重複發出 acknowledgmentModifyAckDeadline 要求,但確認 ID 無效,指標就會重複遞增。

下列指標也有助於瞭解用戶端行為:

在大多數可重試的失敗事件中,支援的用戶端程式庫會自動重試要求。

配額

「僅傳送一次」訂閱項目須遵守額外的配額規定。這些配額的實施對象包括:

  • 每個區域中,從啟用僅傳送一次訊息的訂閱項目取用的訊息數。
  • 使用啟用「僅傳送一次」的區域訂閱項目時,已確認或延後期限的訊息數量。

如要進一步瞭解這些配額,請參閱「配額」主題中的表格。

僅傳送一次訊息和依序傳送訂閱項目

Pub/Sub 支援依序傳送,確保訊息只會傳送一次。

使用「僅傳送一次」的排序功能時,Pub/Sub 會依序等待確認。如果確認訊息順序有誤,服務會因暫時性錯誤而導致要求失敗。如果遞送的依序確認作業在確認期限前到期,用戶端會收到重新遞送的訊息。因此,當您使用「保證送達一次」的排序功能時,用戶端輸送量會限制為每秒一千則訊息。

「僅傳送一次」和推送訂閱項目

Pub/Sub 僅支援透過提取訂閱項目「僅傳送一次」。

從推送訂閱項目取用訊息的用戶端,會透過成功回應推送要求來確認訊息。不過,用戶端不知道 Pub/Sub 訂閱項目是否收到並處理了回應。這與提取訂閱不同,在提取訂閱中,確認要求是由用戶端發起,如果要求處理成功,Pub/Sub 訂閱會做出回應。因此,「僅傳送一次」傳送語意與推送訂閱項目不太相符。

注意事項

  • 如果在建立訂閱項目時未指定確認期限,啟用「僅傳送一次」的訂閱項目會將確認期限預設為 60 秒。

  • 較長的預設確認期限有助於避免因網路事件而重新傳送訊息。支援的用戶端程式庫不會使用預設的訂閱項目確認期限。

  • 與一般訂閱項目相比,「僅傳送一次」訂閱項目的發布端到訂閱端延遲時間明顯較長。

  • 如果需要高處理量,確切傳送一次的用戶端也必須使用串流提取

  • 即使啟用「僅傳送一次」,訂閱項目仍可能因發布端重複而收到多個相同訊息副本。發布端重複項目可能是因為發布用戶端或 Pub/Sub 服務多次重試發布作業所致。發布端用戶端在重試期間多次發布訊息,會導致系統重新傳送不同的訊息 IDPub/Sub 服務會多次發布不重複的訊息,以回應用戶端發布要求,導致系統重複傳送相同訊息 ID 的訊息。

  • 您可以在 subscription/exactly_once_warning_count 中重試失敗的作業,支援的用戶端程式庫會自動重試這些作業。不過,如果失敗原因與無效的確認 ID 相關,則無法重試。