Cloud Storage-Importthema erstellen

Mit einem Cloud Storage-Importthema können Sie Daten kontinuierlich aus Cloud Storage in Pub/Sub aufnehmen. Anschließend können Sie die Daten in eines der Ziele streamen, die von Pub/Sub unterstützt werden. Pub/Sub erkennt automatisch neue Objekte, die dem Cloud Storage-Bucket hinzugefügt werden, und nimmt sie auf.

Cloud Storage ist ein Dienst zum Speichern Ihrer Objekte inGoogle Cloud. Ein Objekt ist ein unveränderliches Datenelement, das aus einer Datei mit einem beliebigen Format besteht. Objekte werden in Containern gespeichert, die als Buckets bezeichnet werden. Buckets können auch verwaltete Ordner enthalten, mit denen Sie erweiterten Zugriff auf Gruppen von Objekten mit einem gemeinsamen Namenspräfix gewähren können.

Weitere Informationen zu Cloud Storage finden Sie in der Cloud Storage-Dokumentation.

Weitere Informationen zu Importthemen finden Sie unter Importthemen.

Hinweise

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Pub/Sub Editor (roles/pubsub.editor) für Ihr Thema oder Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen und Verwalten eines Cloud Storage-Importthemas benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen und Verwalten eines Cloud Storage-Importthemas erforderlich sind. Erweitern Sie den Abschnitt Erforderliche Berechtigungen, um die erforderlichen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind erforderlich, um ein Cloud Storage-Importthema zu erstellen und zu verwalten:

  • Importthema erstellen: pubsub.topics.create
  • Importthema löschen: pubsub.topics.delete
  • Importthema abrufen: pubsub.topics.get
  • Importthema auflisten: pubsub.topics.list
  • In einem Importthema veröffentlichen: pubsub.topics.publish
  • Importthema aktualisieren: pubsub.topics.update
  • IAM-Richtlinie für ein Importthema abrufen: pubsub.topics.getIamPolicy
  • Konfigurieren Sie die IAM-Richtlinie für ein Importthema: pubsub.topics.setIamPolicy

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Sie können die Zugriffssteuerung auf Projekt- und auf der Ebene einzelner Ressourcen konfigurieren.

Die Nachrichtenspeicherrichtlinie entspricht dem Bucket-Standort.

Die Nachrichtenspeicherrichtlinie des Pub/Sub-Themas muss sich mit den Regionen überschneiden, in denen sich Ihr Cloud Storage-Bucket befindet. Diese Richtlinie legt fest, wo Pub/Sub Ihre Nachrichtendaten speichern darf.

  • Für Buckets mit dem Standorttyp „Region“: Die Richtlinie muss diese bestimmte Region enthalten. Wenn sich Ihr Bucket beispielsweise in der Region us-central1 befindet, muss die Richtlinie für die Nachrichtenspeicherung auch us-central1 enthalten.

  • Für Buckets mit dem Standorttyp „Dual-Region“ oder „Multi-Region“: Die Richtlinie muss mindestens eine Region innerhalb des Dual-Region- oder Multi-Region-Standorts enthalten. Wenn sich Ihr Bucket beispielsweise in US multi-region befindet, kann die Nachrichtenspeicherrichtlinie us-central1, us-east1 oder eine andere Region in US multi-region enthalten.

    Wenn die Richtlinie die Region des Buckets nicht enthält, schlägt das Erstellen des Themas fehl. Wenn sich Ihr Bucket beispielsweise in europe-west1 befindet und Ihre Richtlinie zur Nachrichtenspeicherung nur asia-east1 umfasst, erhalten Sie eine Fehlermeldung.

    Wenn die Nachrichtenspeicherrichtlinie nur eine Region enthält, die sich mit dem Standort des Buckets überschneidet, kann die Multiregionenredundanz beeinträchtigt werden. Wenn diese einzelne Region nicht mehr verfügbar ist, sind Ihre Daten möglicherweise nicht mehr zugänglich. Um vollständige Redundanz zu gewährleisten, sollten Sie mindestens zwei Regionen in die Richtlinie für Nachrichtenspeicherung aufnehmen, die Teil des multiregionalen oder biregionalen Standorts des Buckets sind.

Weitere Informationen zu Bucket-Standorten finden Sie in der Dokumentation.

Veröffentlichung aktivieren

Damit die Veröffentlichung möglich ist, müssen Sie dem Pub/Sub-Dienstkonto die Rolle „Pub/Sub-Publisher“ zuweisen, damit Pub/Sub im Cloud Storage-Importthema veröffentlichen kann.

Veröffentlichung für alle Cloud Storage-Importthemen aktivieren

Wählen Sie diese Option aus, wenn in Ihrem Projekt kein Cloud Storage-Importthema verfügbar ist.

  1. Rufen Sie in der Google Cloud Console die Seite IAM auf.

    IAM aufrufen

  2. Klicken Sie auf das Kästchen Von Googlebereitgestellte Rollenzuweisungen einschließen.

  3. Suchen Sie nach dem Pub/Sub-Dienstkonto im folgenden Format:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.iam.gserviceaccount.com

  4. Klicken Sie für dieses Dienstkonto auf die Schaltfläche Hauptkonto bearbeiten.

  5. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

  6. Suchen Sie nach der Pub/Sub-Publisher-Rolle (roles/pubsub.publisher) und wählen Sie sie aus.

  7. Klicken Sie auf Speichern.

Veröffentlichung in einem einzelnen Cloud Storage-Importthema aktivieren

Wenn Sie Pub/Sub die Berechtigung zum Veröffentlichen in einem bestimmten Cloud Storage-Importthema gewähren möchten, das bereits vorhanden ist, gehen Sie so vor:

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud pubsub topics add-iam-policy-binding aus:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die ID oder der Name des Cloud Storage-Importthemas.

    • ist PROJECT_NUMBER die Projektnummer. Informationen zum Aufrufen der Projektnummer finden Sie unter Projekte identifizieren.

Cloud Storage-Rollen dem Pub/Sub-Dienstkonto zuweisen

Damit Sie ein Cloud Storage-Importthema erstellen können, muss das Pub/Sub-Dienstkonto die Berechtigung zum Lesen aus dem jeweiligen Cloud Storage-Bucket haben. Folgende Berechtigungen sind erforderlich:

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

Wählen Sie eines der folgenden Verfahren aus, um dem Pub/Sub-Dienstkonto diese Berechtigungen zuzuweisen:

  • Berechtigungen auf Bucket-Ebene erteilen. Weisen Sie dem Pub/Sub-Dienstkonto im jeweiligen Cloud Storage-Bucket die Rollen „Storage Legacy Object Reader“ (roles/storage.legacyObjectReader) und „Storage Legacy Bucket Reader“ (roles/storage.legacyBucketReader) zu.

  • Wenn Sie Rollen auf Projektebene zuweisen müssen, können Sie stattdessen die Rolle „Storage-Administrator“ (roles/storage.admin) für das Projekt zuweisen, das den Cloud Storage-Bucket enthält. Weisen Sie dem Pub/Sub-Dienstkonto diese Rolle zu.

Bucket-Berechtigungen

Führen Sie die folgenden Schritte aus, um dem Pub/Sub-Dienstkonto auf Bucket-Ebene die Rollen „Storage Legacy Object Reader“ (roles/storage.legacyObjectReader) und „Storage Legacy Bucket Reader“ (roles/storage.legacyBucketReader) zuzuweisen:

  1. Rufen Sie in der Google Cloud -Console die Seite Cloud Storage auf.

    Cloud Storage aufrufen

  2. Klicken Sie auf den Cloud Storage-Bucket, aus dem Sie Nachrichten lesen und in das Cloud Storage-Importthema importieren möchten.

    Die Seite Bucket-Details wird geöffnet.

  3. Klicken Sie auf der Seite Bucket-Details auf den Tab Berechtigungen.

  4. Klicken Sie auf dem Tab Berechtigungen > Nach Hauptkonten ansehen auf Zugriff gewähren.

    Die Seite Zugriff erlauben wird geöffnet.

  5. Geben Sie im Bereich Hauptkonten hinzufügen den Namen Ihres Pub/Sub-Dienstkontos ein.

    Das Dienstkonto hat das Format service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Für ein Projekt mit PROJECT_NUMBER=112233445566 hat das Dienstkonto beispielsweise das Format [email protected].

  6. Geben Sie unter Rollen zuweisen > Rolle auswählen Object Reader ein und wählen Sie die Rolle Storage Legacy Object Reader aus.

  7. Klicken Sie auf Weitere Rolle hinzufügen.

  8. Geben Sie im Drop-down-Menü Rolle auswählen die Option Bucket Reader ein und wählen Sie die Rolle Storage Legacy Bucket Reader aus.

  9. Klicken Sie auf Speichern.

Projektberechtigungen

Führen Sie die folgenden Schritte aus, um die Rolle „Storage-Administrator“ (roles/storage.admin) auf Projektebene zuzuweisen:

  1. Rufen Sie in der Google Cloud Console die Seite IAM auf.

    IAM aufrufen

  2. Klicken Sie auf dem Tab Berechtigungen > Nach Hauptkonten ansehen auf Zugriff gewähren.

    Die Seite Zugriff erlauben wird geöffnet.

  3. Geben Sie im Bereich Hauptkonten hinzufügen den Namen Ihres Pub/Sub-Dienstkontos ein.

    Das Dienstkonto hat das Format service-PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com. Für ein Projekt mit PROJECT_NUMBER=112233445566 hat das Dienstkonto beispielsweise das Format [email protected].

  4. Geben Sie unter Rollen zuweisen > Rolle auswählen Storage Admin ein und wählen Sie die Rolle Storage-Administrator aus.

  5. Klicken Sie auf Speichern.

Weitere Informationen zu Cloud Storage IAM finden Sie unter Cloud Storage Identity and Access Management.

Eigenschaften von Cloud Storage-Importthemen

Weitere Informationen zu den gemeinsamen Attributen für alle Themen finden Sie unter Attribute eines Themas.

Bucket-Name

Dies ist der Name des Cloud Storage-Bucket, aus dem Pub/Sub die Daten liest, die in einem Cloud Storage-Importthema veröffentlicht werden.

Eingabeformat

Wenn Sie ein Cloud Storage-Importthema erstellen, können Sie das Format der aufzunehmenden Objekte als Text, Avro oder Pub/Sub-Avro angeben.

  • Text Es wird davon ausgegangen, dass Objekte Daten mit Nur-Text enthalten. Bei diesem Eingabeformat werden alle Objekte im Bucket aufgenommen, sofern das Objekt die Mindesterstellungszeit der Objekte erfüllt und dem Glob-Muster entspricht.

    Trennzeichen. Sie können auch ein Trennzeichen angeben, mit dem Objekte in Nachrichten aufgeteilt werden. Wenn keine Angabe erfolgt, wird standardmäßig das Zeilenvorschubzeichen (\n) verwendet. Das Trennzeichen darf nur ein einzelnes Zeichen sein.

  • Avro Objekte haben das binäre Apache Avro-Format. Alle Objekte, die nicht im gültigen Apache Avro-Format vorliegen, werden nicht aufgenommen. Hier sind die Einschränkungen in Bezug auf Avro:

    • Avro-Versionen 1.1.0 und 1.2.0 werden nicht unterstützt.
    • Die maximale Größe eines Avro-Blocks beträgt 16 MB.
  • Pub/Sub Avro. Objekte haben das binäre Apache Avro-Format mit einem Schema, das dem eines Objekts entspricht, das mit einem Pub/Sub-Cloud Storage-Abo mit dem Avro-Dateiformat in Cloud Storage geschrieben wurde. Hier sind einige wichtige Richtlinien für Pub/Sub Avro:

    • Das Datenfeld des Avro-Datensatzes wird verwendet, um das Datenfeld der generierten Pub/Sub-Nachricht zu füllen.

    • Wenn die Option „Metadaten schreiben“ für das Cloud Storage-Abo angegeben ist, werden alle Werte im Feld „attributes“ als Attribute der generierten Pub/Sub-Nachricht eingefügt.

    • Wenn in der ursprünglichen Nachricht, die in Cloud Storage geschrieben wurde, ein Sortierschlüssel angegeben ist, wird dieses Feld in der generierten Pub/Sub-Nachricht als Attribut mit dem Namen original_message_ordering_key eingefügt.

Mindesterstellungszeit der Objekte

Sie können optional eine Mindesterstellungszeit für Objekte angeben, wenn Sie ein Cloud Storage-Importthema erstellen. Es werden nur Objekte aufgenommen, die an oder nach diesem Zeitstempel erstellt wurden. Dieser Zeitstempel muss in einem Format wie YYYY-MM-DDThh:mm:ssZ angegeben werden. Jedes Datum in der Vergangenheit oder Zukunft zwischen 0001-01-01T00:00:00Z und 9999-12-31T23:59:59Z (einschließlich) ist gültig.

Glob-Muster abgleichen

Sie können optional ein Glob-Muster für den Abgleich angeben, wenn Sie ein Cloud Storage-Importthema erstellen. Es werden nur Objekte mit Namen aufgenommen, die diesem Muster entsprechen. Wenn Sie beispielsweise alle Objekte mit dem Suffix .txt aufnehmen möchten, können Sie das Glob-Muster als **.txt angeben.

Informationen zur unterstützten Syntax für Glob-Muster finden Sie in der Cloud Storage-Dokumentation.

Cloud Storage-Importthemen verwenden

Sie können ein neues Importthema erstellen oder ein vorhandenes Thema bearbeiten.

Hinweise

  • Wenn Sie das Thema und das Abo separat erstellen, kann es zu Datenverlusten kommen, auch wenn Sie die Schritte schnell nacheinander ausführen. Es gibt ein kurzes Zeitfenster, in dem das Thema ohne Abo verfügbar ist. Wenn während dieser Zeit Daten an das Thema gesendet werden, gehen sie verloren. Wenn Sie zuerst das Thema und dann das Abo erstellen und das Thema anschließend in ein Importthema umwandeln, stellen Sie sicher, dass während des Importvorgangs keine Nachrichten verloren gehen.

Cloud Storage-Importthema erstellen

So erstellen Sie ein Cloud Storage-Importthema:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf Thema erstellen.

    Die Seite mit den Themendetails wird geöffnet.

  3. Geben Sie im Feld Themen-ID eine ID für das Cloud Storage-Importthema ein.

    Weitere Informationen zur Benennung von Themen finden Sie in den Benennungsrichtlinien.

  4. Wählen Sie Standardabo hinzufügen aus.

  5. Wählen Sie Aufnahme aktivieren aus.

  6. Wählen Sie als Erfassungsquelle Google Cloud Storage aus.

  7. Klicken Sie für den Cloud Storage-Bucket auf Durchsuchen.

    Die Seite Bucket auswählen wird geöffnet. Wählen Sie eine der folgenden Optionen aus:

    • Wählen Sie einen vorhandenen Bucket aus einem beliebigen geeigneten Projekt aus.

    • Klicken Sie auf das Symbol zum Erstellen und folgen Sie der Anleitung auf dem Bildschirm, um einen neuen Bucket zu erstellen. Nachdem Sie den Bucket erstellt haben, wählen Sie ihn für das Cloud Storage-Importthema aus.

  8. Wenn Sie den Bucket angeben, prüft Pub/Sub, ob das Pub/Sub-Dienstkonto die entsprechenden Berechtigungen für den Bucket hat. Wenn es Probleme mit Berechtigungen gibt, wird eine Meldung ähnlich der folgenden angezeigt:

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    Wenn Sie Probleme mit Berechtigungen haben, klicken Sie auf Berechtigungen festlegen. Weitere Informationen finden Sie unter Cloud Storage-Berechtigungen für das Pub/Sub-Dienstkonto gewähren.

  9. Wählen Sie für Objektformat die Option Text, Avro oder Pub/Sub-Avro aus.

    Wenn Sie Text auswählen, können Sie optional ein Trennzeichen angeben, mit dem Objekte in Nachrichten aufgeteilt werden.

    Weitere Informationen zu diesen Optionen finden Sie unter Eingabeformat.

  10. Optional. Sie können für Ihr Thema eine Mindesterstellungszeit der Objekte angeben. Wenn festgelegt, werden nur Objekte aufgenommen, die nach der Mindesterstellungszeit der Objekte erstellt wurden.

    Weitere Informationen finden Sie unter Mindesterstellungszeit der Objekte.

  11. Sie müssen ein Glob-Muster angeben. Wenn Sie alle Objekte im Bucket aufnehmen möchten, verwenden Sie ** als Glob-Muster. Wenn festgelegt, werden nur Objekte aufgenommen, die dem angegebenen Muster entsprechen.

    Weitere Informationen finden Sie unter Glob-Muster abgleichen.

  12. Behalten Sie die anderen Standardeinstellungen bei.
  13. Klicken Sie auf Thema erstellen.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud pubsub topics create aus:

    gcloud pubsub topics create TOPIC_ID\
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Im Befehl sind nur TOPIC_ID, das Flag --cloud-storage-ingestion-bucket und das Flag --cloud-storage-ingestion-input-format erforderlich. Die verbleibenden Flags sind optional und können weggelassen werden.

    Ersetzen Sie Folgendes:

    • TOPIC_ID: Der Name oder die ID Ihres Themas.

    • BUCKET_NAME: Gibt den Namen eines vorhandenen Buckets an. Beispiel: prod_bucket. Der Bucket-Name darf die Projekt-ID nicht enthalten. Informationen zum Erstellen eines Buckets finden Sie unter Buckets erstellen.

    • INPUT_FORMAT: Gibt das Format der aufgenommenen Objekte an. Das kann text, avro oder pubsub_avro sein. Weitere Informationen zu diesen Optionen finden Sie unter Eingabeformat.

    • TEXT_DELIMITER: Gibt das Trennzeichen an, mit dem Textobjekte in Pub/Sub-Nachrichten aufgeteilt werden. Dies sollte ein einzelnes Zeichen sein und nur festgelegt werden, wenn INPUT_FORMAT gleich text ist. Die Standardeinstellung ist das Zeilenumbruchzeichen (\n).

      Achten Sie bei der Angabe des Trennzeichens mit der gcloud CLI auf die Verarbeitung von Sonderzeichen wie dem Zeilenumbruch \n. Verwenden Sie das Format '\n', damit das Trennzeichen richtig interpretiert wird. Wenn Sie \n ohne Anführungszeichen oder Escapezeichen verwenden, wird "n" als Trennzeichen verwendet.

    • MINIMUM_OBJECT_CREATE_TIME: Gibt die Mindestzeit an, zu der ein Objekt erstellt wurde, damit es aufgenommen werden kann. Sie muss in UTC im Format YYYY-MM-DDThh:mm:ssZ angegeben werden. Beispiel: 2024-10-14T08:30:30Z

      Jedes Datum zwischen 0001-01-01T00:00:00Z und 9999-12-31T23:59:59Z (einschließlich), unabhängig davon, ob es in der Vergangenheit oder Zukunft liegt, ist gültig.

    • MATCH_GLOB: Gibt das Glob-Muster an, das abgeglichen werden muss, damit ein Objekt aufgenommen wird. Wenn Sie die gcloud CLI verwenden, muss ein Glob-Muster mit *-Zeichen das *-Zeichen als Escapezeichen in der Form \*\*.txt enthalten oder das gesamte Glob-Muster muss in Anführungszeichen "**.txt" oder '**.txt' stehen. Informationen zur unterstützten Syntax für Glob-Muster finden Sie in der Cloud Storage-Dokumentation.

Go

Folgen Sie der Einrichtungsanleitung für Go in der Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime, delimiter string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"
	// delimiter := ","

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	topicpb := &pubsubpb.Topic{
		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_CloudStorage_{
				CloudStorage: &pubsubpb.IngestionDataSourceSettings_CloudStorage{
					Bucket: bucket,
					// Alternatively, can be Avro or PubSubAvro formats. See
					InputFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
						TextFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat{
							Delimiter: &delimiter,
						},
					},
					MatchGlob:               matchGlob,
					MinimumObjectCreateTime: timestamppb.New(minCreateTime),
				},
			},
		},
	}
	t, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java in der Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

Folgen Sie der Einrichtungsanleitung für Node.js in der Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime,
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

Folgen Sie der Einrichtungsanleitung für Python in der Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

C++

Folgen Sie der Einrichtungsanleitung für C++ in der Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

Lesen Sie unter Pub/Sub-Kurzanleitung: Clientbibliotheken verwenden die Anleitung für die Einrichtung von Node.js, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie zur Authentifizierung bei Pub/Sub Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string,
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Falls Probleme auftreten, lesen Sie den Abschnitt Fehlerbehebung beim Importieren aus Cloud Storage.

Cloud Storage-Importthema bearbeiten

Sie können ein Cloud Storage-Importthema bearbeiten, um seine Eigenschaften zu aktualisieren.

Wenn Sie beispielsweise die Aufnahme neu starten möchten, können Sie den Bucket ändern oder die Mindestzeit für die Objekterstellung aktualisieren.

So bearbeiten Sie ein Cloud Storage-Importthema:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Themen auf.

    Themen aufrufen

  2. Klicken Sie auf das Thema „Cloud Storage-Import“.

  3. Klicken Sie auf der Seite mit den Themendetails auf Bearbeiten.

  4. Aktualisieren Sie die Felder, die Sie ändern möchten.

  5. Klicken Sie auf Aktualisieren.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Damit die Einstellungen für das Importthema nicht verloren gehen, müssen Sie sie bei jeder Aktualisierung des Themas angeben. Wenn Sie etwas weglassen, wird die Einstellung von Pub/Sub auf den ursprünglichen Standardwert zurückgesetzt.

    Führen Sie den Befehl gcloud pubsub topics update mit allen im folgenden Beispiel genannten Flags aus:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    Ersetzen Sie Folgendes:

    • TOPIC_ID ist die ID oder der Name des Themas. Dieses Feld kann nicht aktualisiert werden.

    • BUCKET_NAME: Gibt den Namen eines vorhandenen Buckets an. Beispiel: prod_bucket. Der Bucket-Name darf die Projekt-ID nicht enthalten. Informationen zum Erstellen eines Buckets finden Sie unter Buckets erstellen.

    • INPUT_FORMAT: Gibt das Format der aufgenommenen Objekte an. Das kann text, avro oder pubsub_avro sein. Weitere Informationen zu diesen Optionen finden Sie unter Eingabeformat.

    • TEXT_DELIMITER: Gibt das Trennzeichen an, mit dem Textobjekte in Pub/Sub-Nachrichten aufgeteilt werden. Dies sollte ein einzelnes Zeichen sein und nur festgelegt werden, wenn INPUT_FORMAT gleich text ist. Die Standardeinstellung ist das Zeilenumbruchzeichen (\n).

      Achten Sie bei der Angabe des Trennzeichens mit der gcloud CLI auf die Verarbeitung von Sonderzeichen wie dem Zeilenumbruch \n. Verwenden Sie das Format '\n', damit das Trennzeichen richtig interpretiert wird. Wenn Sie \n ohne Anführungszeichen oder Escapezeichen verwenden, wird "n" als Trennzeichen verwendet.

    • MINIMUM_OBJECT_CREATE_TIME: Gibt die Mindestzeit an, zu der ein Objekt erstellt wurde, damit es aufgenommen werden kann. Sie muss in UTC im Format YYYY-MM-DDThh:mm:ssZ angegeben werden. Beispiel: 2024-10-14T08:30:30Z

      Jedes Datum zwischen 0001-01-01T00:00:00Z und 9999-12-31T23:59:59Z (einschließlich), unabhängig davon, ob es in der Vergangenheit oder Zukunft liegt, ist gültig.

    • MATCH_GLOB: Gibt das Glob-Muster an, das abgeglichen werden muss, damit ein Objekt aufgenommen wird. Wenn Sie die gcloud CLI verwenden, muss ein Glob-Muster mit *-Zeichen das *-Zeichen als Escapezeichen in der Form \*\*.txt enthalten oder das gesamte Glob-Muster muss in Anführungszeichen "**.txt" oder '**.txt' stehen. Informationen zur unterstützten Syntax für Glob-Muster finden Sie in der Cloud Storage-Dokumentation.

Kontingente und Limits für Cloud Storage-Importthemen

Der Publisher-Durchsatz für Importthemen ist an das Veröffentlichungskontingent des Themas gebunden. Weitere Informationen finden Sie unter Pub/Sub-Kontingente und ‑Limits.

Nächste Schritte