AI Model Inference Functions in Confluent Cloud for Apache Flink

Confluent Cloud for Apache Flink® provides the ML_PREDICT and ML_EVALUATE built-in functions to invoke remote AI/ML models in Flink SQL queries. These functions simplify developing and deploying AI applications by providing a unified platform for both data processing and AI/ML tasks.

For more information, see Run an AI Model.

ML_EVALUATE

Aggregate a table and return model evaluation metrics.

Syntax
ML_EVALUATE(`model_name`, label, col1, col2, ...) FROM `eval_data_table`

Description

The ML_EVALUATE function is a table aggregation function that takes an entire table and returns a single row of model evaluation metrics. If run on all versions of a model, the function returns one row for each model version. After comparing the metrics of the different versions, you can set the version with the best evaluation metrics as the default version for deployment.

Internally, the ML_EVALUATE function runs ML_PREDICT and processes the results.

Before using ML_EVALUATE, you must register the model by using the CREATE MODEL statement.

The first argument to the ML_EVALUATE table function is the model name. The second argument is the true label that the model's output is evaluated against. Its type depends on the model OUTPUT type and the model task. The remaining arguments are the columns used for prediction. They are defined in the model resource INPUT and may vary in number and type.

The return type of the ML_EVALUATE function is Map<String, Double> for all task types. The metric keys in the map depend on the task type of the specified model.

Metrics

The metric columns returned by ML_EVALUATE depend on the task type of the specified model.

Classification

Classification models assign each input to one of N possible classes. A classification model that returns only 2 possible values is called a binary classifier. If it returns more than 2 values, it is referred to as multi-class.

Classification models return these metrics:

  • Accuracy: Total fraction of correct predictions across all classes.
  • F1 Score: Harmonic mean of precision and recall.
  • Precision: (# of Class X correctly predicted) / (# of Class X predicted)
  • Recall: (# of Class X correctly predicted) / (# of actual Class X)
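
For example, if a binary classifier predicts Class X for 10 rows, 8 of them correctly, and the evaluation data contains 16 actual instances of Class X, precision is 8/10 = 0.8 and recall is 8/16 = 0.5.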

Clustering

Clustering models group input examples into K clusters. Metrics measure how compact and how well separated the clusters are.

Clustering models return these metrics:

  • Davies-Bouldin Index: A measure of how well separated and how compact the clusters are.
  • Intra-Cluster Variance (Mean Squared Distance): Average squared distance of each training point to the centroid of the cluster it was assigned to.
  • Silhouette Score: Compares how similar each point is to its own cluster with how dissimilar it is to other clusters.
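
Lower values of the Davies-Bouldin Index and intra-cluster variance indicate more compact, better-separated clusters, while silhouette scores closer to 1 indicate that points are well matched to their own cluster.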

Embedding

Embedding models map input data, such as text, to dense numeric vectors (embeddings) that capture semantic meaning.

Embedding models return these metrics:

  • Mean Cosine Similarity: Average cosine similarity between the predicted embeddings and the label embeddings.
  • Mean Jaccard Similarity: Average Jaccard similarity between the predicted embeddings and the label embeddings.
  • Mean Euclidean Distance: Average Euclidean distance between the predicted embeddings and the label embeddings.

Regression

Regression models predict a continuous output variable based on one or more input features.

Regression models return these metrics:

  • Mean Absolute Error: The average of the absolute differences between the predicted and actual values.
  • Mean Squared Error: The average of the squared differences between the predicted and actual values.
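
For example, if the actual values are 2 and 4 and the predicted values are 3 and 6, the Mean Absolute Error is (1 + 2) / 2 = 1.5 and the Mean Squared Error is (1 + 4) / 2 = 2.5.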

Text generation

Text generation models generate text based on a prompt.

Text generation models return these metrics:

  • Mean BLEU: Average BLEU score, which measures n-gram overlap between the generated text and the label text.
  • Mean ROUGE: Average ROUGE score, which measures recall-oriented overlap between the generated text and the label text.
  • Mean Semantic Similarity: Average semantic similarity between the generated text and the label text.

Example metrics

The following table shows example metrics for different task types.

Task type Example metrics
Classification {Accuracy=0.9999991465990892, Precision=0.9996998081063332, Recall=0.0013025368892873059, F1=0.0013025368892873059}
Clustering {Mean Davies-Bouldin Index=0.9999991465990892}
Embedding {Mean Cosine Similarity=0.9999991465990892, Mean Jaccard Similarity=0.9996998081063332, Mean Euclidean Distance=0.0013025368892873059}
Regression {MAE=0.9999991465990892, MSE=0.9996998081063332, RMSE=0.0013025368892873059, MAPE=0.0013025368892873059, R²=0.0043025368892873059}
Text generation {Mean BLEU=0.9999991465990892, Mean ROUGE=0.9996998081063332, Mean Semantic Similarity=0.0013025368892873059}

Example

After you have registered the AI model by using the CREATE MODEL statement, evaluate the model by using the ML_EVALUATE function in a SQL query.

The following example statement registers a remote OpenAI model for a classification task.

CREATE MODEL `my_remote_model`
INPUT (f1 INT, f2 STRING)
OUTPUT (output_label STRING)
WITH(
  'task' = 'classification',
  'type' = 'remote',
  'provider' = 'openai',
  'openai.endpoint' = 'https://api.openai.com/v1/llm/v1/chat',
  'openai.api_key' = '<api-key>'
);
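
The evaluation data must provide columns that match the model's INPUT, plus a column containing the true label. The following is a minimal sketch of such a table; the column names and types are assumptions chosen to match the model above.

CREATE TABLE `eval_data` (
  f1 INT,
  f2 STRING,
  label STRING
);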

The following statements show how to run the ML_EVALUATE function on various versions of my_remote_model using data in a table named eval_data.

-- Model evaluation with all versions
SELECT ML_EVALUATE(`my_remote_model$all`, label, f1, f2) FROM `eval_data`;

-- Model evaluation with default version
SELECT ML_EVALUATE(`my_remote_model`, label, f1, f2) FROM `eval_data`;

-- Model evaluation with specific version 2
SELECT ML_EVALUATE(`my_remote_model$2`, label, f1, f2) FROM `eval_data`;
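
Because the result of each statement is a Map<String, Double>, you can read an individual metric with map element access. The following query is an illustrative sketch that assumes the classification metric keys shown earlier.

-- Extract a single metric from the evaluation result.
SELECT metrics['Accuracy'] AS accuracy
FROM (
  SELECT ML_EVALUATE(`my_remote_model`, label, f1, f2) AS metrics
  FROM `eval_data`
);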

ML_PREDICT

Run a remote AI/ML model for tasks like predicting outcomes, generating text, and classifying data.

Syntax
ML_PREDICT(`model_name[$version_id]`, column);

-- map settings are optional
ML_PREDICT(`model_name[$version_id]`, column, map['async_enabled', [boolean], 'client_timeout', [int], 'max_parallelism', [int], 'retry_count', [int]])

Description

The ML_PREDICT function performs predictions using pre-trained machine learning models.

The first argument to the ML_PREDICT table function is the model name. The remaining arguments are the columns used for prediction. They are defined in the model resource INPUT and may vary in number and type.

Before using ML_PREDICT, you must register the model by using the CREATE MODEL statement.

Configuration

You can control how calls to the remote model execute with these optional parameters.

  • async_enabled: Calls to remote models are asynchronous and don’t block. The default is true.
  • client_timeout: Time, in seconds, after which the request to the model endpoint times out. The default is 30 seconds.
  • debug: Return a detailed stack trace in the API response. The default is false. Confluent Cloud for Apache Flink implements data masking for error messages to remove any secrets or customer input, but the stack trace may contain the prompt itself or some part of the response string.
  • retry_count: Maximum number of times the remote model request is retried if the request to the model fails. The default is 3.
  • max_parallelism: Maximum number of parallel requests that the function can make. Can be used only when async_enabled is true. The default is 10.

Example

After you have registered the AI model by using the CREATE MODEL statement, run the model by using the ML_PREDICT function in a SQL query.
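
For example, a model like the embeddingmodel used below might be registered with a statement similar to the following. The task value, OUTPUT type, and endpoint shown here are assumptions; adjust them for your model provider.

CREATE MODEL `embeddingmodel`
INPUT (text STRING)
OUTPUT (embedding ARRAY<FLOAT>)
WITH (
  'task' = 'embedding',
  'type' = 'remote',
  'provider' = 'openai',
  'openai.endpoint' = 'https://api.openai.com/v1/embeddings',
  'openai.api_key' = '<api-key>'
);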

The following example runs a model named embeddingmodel on the data in a table named text_stream.

SELECT id, text, embedding FROM text_stream, LATERAL TABLE(ML_PREDICT('embeddingmodel', text));

The following examples call the ML_PREDICT function with different configurations.

-- Specify the timeout.
SELECT * FROM `db1`.`tb1`, LATERAL TABLE(ML_PREDICT('md1', key, map['client_timeout', 60 ]));

-- Specify all configuration parameters.
SELECT * FROM `db1`.`tb1`, LATERAL TABLE(ML_PREDICT('md1', key, map['async_enabled', true, 'client_timeout', 60, 'max_parallelism', 20, 'retry_count', 5]));