Alcuni modelli Dataflow forniti da Google supportano le funzioni definite dall'utente (UDF). Le funzioni definite dall'utente ti consentono di estendere la funzionalità di un modello senza modificarne il codice.
Panoramica
Per creare una UDF, scrivi una funzione JavaScript o Python, a seconda del modello. Archivi il file di codice della funzione definita dall'utente in Cloud Storage e specifichi la posizione come parametro del modello. Per ogni elemento di input, il modello chiama la tua funzione. La funzione trasforma l'elemento o esegue altre operazioni personalizzate e restituisce il risultato al modello.
Ad esempio, potresti utilizzare una UDF per:
- Riformatta i dati di input in modo che corrispondano a uno schema di destinazione.
- Oscura i dati sensibili.
- Filtra alcuni elementi dall'output.
L'input della funzione UDF è un singolo elemento di dati, serializzato come stringa JSON. La funzione restituisce una stringa JSON serializzata come output. Il formato dei dati dipende dal modello. Ad esempio, nel modello Sottoscrizione Pub/Sub a BigQuery, l'input sono i dati dei messaggi Pub/Sub serializzati come oggetto JSON e l'output è un oggetto JSON serializzato che rappresenta una riga della tabella BigQuery. Per saperne di più, consulta la documentazione di ogni modello.
Esegui un modello con una funzione definita dall'utente
Per eseguire un modello con una funzione definita dall'utente, specifica la posizione Cloud Storage del file JavaScript e il nome della funzione come parametri del modello.
Con alcuni modelli forniti da Google, puoi anche creare la funzione definita dall'utente direttamente nella consoleGoogle Cloud , come segue:
Vai alla pagina Dataflow nella console Google Cloud .
Fai clic su add_boxCrea job da modello.
Seleziona il modello fornito da Google che vuoi eseguire.
Espandi Parametri facoltativi. Se il modello supporta le funzioni definite dall'utente, ha un parametro per la posizione Cloud Storage della funzione definita dall'utente e un altro parametro per il nome della funzione.
Accanto al parametro del modello, fai clic su Crea UDF.
Nel riquadro Seleziona o crea una funzione definita dall'utente:
- Inserisci un nome file. Esempio:
my_udf.js
. - Seleziona una cartella Cloud Storage.
Esempio:
gs://your-bucket/your-folder
. - Utilizza l'editor di codice in linea per scrivere la funzione. L'editor è precompilato con un codice standard che puoi utilizzare come punto di partenza.
Fai clic su Crea UDF.
La console Google Cloud salva il file della funzione definita dall'utente e compila la posizione di Cloud Storage.
Inserisci il nome della funzione nel campo corrispondente.
- Inserisci un nome file. Esempio:
Scrivere una funzione JavaScript definita dall'utente
Il seguente codice mostra una UDF JavaScript no-op da cui puoi iniziare:
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
Il codice JavaScript viene eseguito sul
motore JavaScript Nashorn. Ti consigliamo di testare la UDF sul motore Nashorn prima di eseguirne il deployment. Il motore Nashorn
non corrisponde esattamente all'implementazione di JavaScript di Node.js. Un problema
comune è l'utilizzo di console.log()
o Number.isNaN()
, nessuno dei quali è
definito nel motore Nashorn.
Puoi testare la tua UDF sul motore Nashorn utilizzando Cloud Shell, in cui è preinstallato JDK 11. Avvia Nashorn in modalità interattiva nel seguente modo:
jjs --language=es6
Nella shell interattiva Nashorn, esegui questi passaggi:
- Chiama
load
per caricare il file JavaScript della funzione definita dall'utente. - Definisci un oggetto JSON di input a seconda dei messaggi previsti della pipeline.
- Utilizza la funzione
JSON.stringify
per serializzare l'input in una stringa JSON. - Chiama la funzione UDF per elaborare la stringa JSON.
- Chiama
JSON.parse
per deserializzare l'output. - Verifica il risultato.
Esempio:
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
Scrivi una funzione definita dall'utente Python
Il seguente codice mostra una UDF Python no-op da cui puoi iniziare:
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
Le UDF Python supportano pacchetti di dipendenze standard per Python e Apache Beam. Non possono utilizzare pacchetti di terze parti.
Gestione degli errori
In genere, quando si verifica un errore durante l'esecuzione dell'UDF, l'errore viene scritto in una
posizione di messaggi non recapitabili. I dettagli dipendono dal modello. Ad esempio, il modello
Sottoscrizione Pub/Sub a BigQuery
crea una tabella _error_records
e scrive gli errori al suo interno. Gli errori UDF di runtime
possono verificarsi a causa di errori di sintassi o eccezioni non rilevate. Per verificare la presenza di errori di sintassi, testa la tua UDF localmente.
Puoi generare un'eccezione a livello di programmazione per un elemento che non deve essere elaborato. In questo caso, l'elemento viene scritto nella posizione di messaggi non recapitabili, se il modello ne supporta una. Per un esempio che mostra questo approccio, consulta Eventi di routing.
Esempi di casi d'uso
Questa sezione descrive alcuni pattern comuni per le UDF, basati su casi d'uso reali.
Arricchire gli eventi
Utilizza una UDF per arricchire gli eventi con nuovi campi per informazioni più contestuali.
Esempio:
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
Trasformare gli eventi
Utilizza una funzione definita dall'utente per trasformare l'intero formato dell'evento a seconda di ciò che si aspetta la destinazione.
L'esempio seguente ripristina una voce di log di Cloud Logging
(LogEntry
) alla stringa di log originale
quando disponibile. A seconda dell'origine log, la stringa di log originale viene
a volte compilata nel campo textPayload
. Puoi utilizzare questo pattern per inviare i log non elaborati nel formato originale, anziché inviare l'intero LogEntry
da Cloud Logging.
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
Oscurare o rimuovere i dati sugli eventi
Utilizza una UDF per oscurare o rimuovere una parte dell'evento.
L'esempio seguente redige il nome del campo sensitiveField
sostituendone il
valore e rimuove completamente il campo denominato redundantField
.
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
Eventi di route
Utilizza una UDF per indirizzare gli eventi a destinazioni separate nel sink downstream.
Il seguente esempio, basato sul modello Pub/Sub-Splunk, instrada ogni evento all'indice Splunk corretto. Chiama una funzione locale definita dall'utente per mappare gli eventi agli indici.
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
Il seguente esempio indirizza gli eventi non riconosciuti alla coda di messaggi non recapitabili, supponendo che il modello supporti una coda di messaggi non recapitabili. Ad esempio, consulta il modello Da Pub/Sub a JDBC. Puoi utilizzare questo pattern per filtrare le voci impreviste prima di scrivere nella destinazione.
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
Filtra eventi
Utilizza una funzione definita dall'utente per filtrare gli eventi indesiderati o non riconosciuti dall'output.
Il seguente esempio elimina gli eventi in cui data.severity
è uguale a "DEBUG"
.
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
Passaggi successivi
- Modelli forniti da Google
- Crea ed esegui un modello flessibile
- Esecuzione di modelli classici
- Estendi il modello Dataflow con le UDF (post del blog)
- UDF di esempio (GitHub)