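The Gemini API's Batch Mode is designed to process large volumes of requests asynchronously at 50% of the standard cost. The target turnaround time is 24 hours, but in most cases it is much shorter.
Use Batch Mode for large-scale, non-urgent tasks, such as data pre-processing or running evaluations, where an immediate response is not required.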
Getting started
This section helps you submit your first requests in Batch Mode.
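Creating a batch job
You can submit requests in Batch Mode in two ways:
- Inline requests: A list of GenerateContentRequest objects included directly in the batch creation request. This method suits smaller batches whose total request size does not exceed 20MB. The output returned from the model is a list of inlineResponse objects.
- Input file: A JSON Lines (JSONL) file in which each line contains a complete GenerateContentRequest object. This method is recommended for larger requests. The output returned from the model is a JSONL file in which each line is either a GenerateContentResponse or a status object.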
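As a rough illustration of the choice between the two methods, the sketch below estimates the serialized size of a list of request dictionaries and picks a submission method. The helper name `choose_submission_method` and the use of the UTF-8 JSON size as a proxy for the request size are assumptions made for illustration; they are not part of the Gemini API. Only the 20MB figure comes from the text above.

```python
import json

# 20MB limit for inline batches, taken from the text above.
# Reading "MB" as 10**6 bytes is an assumption (the conservative reading).
INLINE_LIMIT_BYTES = 20 * 1000 * 1000


def choose_submission_method(requests, limit_bytes=INLINE_LIMIT_BYTES):
    """Return "inline" or "file" based on the approximate serialized size."""
    # The UTF-8 JSON encoding is only an estimate of what is sent on the wire.
    total = sum(len(json.dumps(r).encode("utf-8")) for r in requests)
    return "inline" if total <= limit_bytes else "file"


small = [{"contents": [{"parts": [{"text": "Why is the sky blue?"}]}]}]
print(choose_submission_method(small))
```

Because the estimate ignores any envelope the SDK adds around the requests, it may be safer to switch to an input file well before the limit is reached.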
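Inline requests
For a small number of requests, you can embed the GenerateContentRequest objects directly in your BatchGenerateContentRequest. The following example calls the BatchGenerateContent method with inline requests: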
Python
from google import genai
from google.genai import types

client = genai.Client()

# A list of dictionaries, where each is a GenerateContentRequest
inline_requests = [
    {
        'contents': [{
            'parts': [{'text': 'Tell me a one-sentence joke.'}],
            'role': 'user'
        }]
    },
    {
        'contents': [{
            'parts': [{'text': 'Why is the sky blue?'}],
            'role': 'user'
        }]
    }
]

inline_batch_job = client.batches.create(
    model="models/gemini-2.5-flash",
    src=inline_requests,
    config={
        'display_name': "inlined-requests-job-1",
    },
)

print(f"Created batch job: {inline_batch_job.name}")
REST
curl https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:batchGenerateContent \
  -H "x-goog-api-key: $GEMINI_API_KEY" \
  -X POST \
  -H "Content-Type:application/json" \
  -d '{
    "batch": {
      "display_name": "my-batch-requests",
      "input_config": {
        "requests": {
          "requests": [
            {
              "request": {"contents": [{"parts": [{"text": "Describe the process of photosynthesis."}]}]},
              "metadata": {
                "key": "request-1"
              }
            },
            {
              "request": {"contents": [{"parts": [{"text": "Describe the process of photosynthesis."}]}]},
              "metadata": {
                "key": "request-2"
              }
            }
          ]
        }
      }
    }
  }'
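You can use any request that you would use in non-batch (interactive) mode. For example, you can specify the temperature, system instructions, or even pass in other modalities. The following example shows some inline requests, one of which includes a system instruction: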
# Per-request options such as a system instruction are passed under 'config'.
inline_requests_list = [
    {'contents': [{'parts': [{'text': 'Write a short poem about a cloud.'}]}]},
    {'contents': [{'parts': [{'text': 'Write a short poem about a cat.'}]}], 'config': {'system_instruction': {'parts': [{'text': 'You are a cat. Your name is Neko.'}]}}}
]
Similarly, you can specify the tools to use for a request. The following example shows a request that enables the Google Search tool:
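# Tools are also passed under 'config'; note there is no trailing space in 'google_search'.
inline_requests_list = [
    {'contents': [{'parts': [{'text': 'Who won the euro 1998?'}]}]},
    {'contents': [{'parts': [{'text': 'Who won the euro 2025?'}]}], 'config': {'tools': [{'google_search': {}}]}}
]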
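Input file
For larger sets of requests, prepare a JSON Lines (JSONL) file. Each line in this file must be a JSON object containing a user-defined key and a request object, where the request is a valid GenerateContentRequest object. The user-defined key is used in the response to indicate which output is the result of which request. For example, if a request is defined with the key request-1, the corresponding response is annotated with the same key name.
This file is uploaded using the File API. The maximum allowed size for an input file is 2GB.
The following is an example of a JSONL file. You can save it in a file named my-batch-requests.jsonl: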
{"key": "request-1", "request": {"contents": [{"parts": [{"text": "Describe the process of photosynthesis."}]}], "generation_config": {"temperature": 0.7}}}
{"key": "request-2", "request": {"contents": [{"parts": [{"text": "What are the main ingredients in a Margherita pizza?"}]}]}}
As with inline requests, you can specify other parameters in each JSON request, such as system instructions, tools, or other configuration.
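Since a single malformed line can be hard to spot in a large file, a local check before uploading may help. The sketch below verifies that every line parses as JSON and carries a string key and a request object, as described above. The helper name `validate_batch_jsonl` is hypothetical, and the uniqueness check on keys is an extra precaution added by this sketch, not a documented API requirement.

```python
import json


def validate_batch_jsonl(lines):
    """Check JSONL lines for the key/request shape; return a list of problems."""
    problems = []
    seen_keys = set()
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # ignore blank lines, e.g. a trailing newline
        try:
            entry = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append(f"line {number}: invalid JSON ({exc.msg})")
            continue
        if (not isinstance(entry, dict)
                or not isinstance(entry.get("key"), str)
                or "request" not in entry):
            problems.append(f"line {number}: expected an object with a string 'key' and a 'request'")
            continue
        if entry["key"] in seen_keys:
            # Duplicate keys would make outputs ambiguous to match up later.
            problems.append(f"line {number}: duplicate key {entry['key']!r}")
        seen_keys.add(entry["key"])
    return problems


sample = [
    '{"key": "request-1", "request": {"contents": [{"parts": [{"text": "Hi"}]}]}}',
    '{"key": "request-1", "request": {"contents": [{"parts": [{"text": "Again"}]}]}}',
    '{"contents": []}',
]
for problem in validate_batch_jsonl(sample):
    print(problem)
```

The check does not validate the request body itself against GenerateContentRequest; it only confirms the outer key/request wrapper.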
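You can upload this file using the File API, as shown in the following example. If you are working with multimodal input, you can reference other uploaded files within your JSONL file.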
Python
import json

from google import genai
from google.genai import types

client = genai.Client()

# Create a sample JSONL file
with open("my-batch-requests.jsonl", "w") as f:
    requests = [
        {"key": "request-1", "request": {"contents": [{"parts": [{"text": "Describe the process of photosynthesis."}]}]}},
        {"key": "request-2", "request": {"contents": [{"parts": [{"text": "What are the main ingredients in a Margherita pizza?"}]}]}}
    ]
    for req in requests:
        f.write(json.dumps(req) + "\n")

# Upload the file to the File API
uploaded_file = client.files.upload(
    file='my-batch-requests.jsonl',
    config=types.UploadFileConfig(display_name='my-batch-requests', mime_type='jsonl')
)

print(f"Uploaded file: {uploaded_file.name}")
REST
tmp_batch_input_file=batch_input.tmp
echo -e '{"key": "request-1", "request": {"contents": [{"parts": [{"text": "Describe the process of photosynthesis."}]}], "generationConfig": {"temperature": 0.7}}}\n{"key": "request-2", "request": {"contents": [{"parts": [{"text": "What are the main ingredients in a Margherita pizza?"}]}]}}' > batch_input.tmp
MIME_TYPE=$(file -b --mime-type "${tmp_batch_input_file}")
NUM_BYTES=$(wc -c < "${tmp_batch_input_file}")
DISPLAY_NAME=BatchInput
tmp_header_file=upload-header.tmp
# Initial resumable request defining metadata.
# The upload URL is in the response headers; dump them to a file.
curl "https://generativelanguage.googleapis.com/upload/v1beta/files" \
  -D "${tmp_header_file}" \
  -H "x-goog-api-key: $GEMINI_API_KEY" \
  -H "X-Goog-Upload-Protocol: resumable" \
  -H "X-Goog-Upload-Command: start" \
  -H "X-Goog-Upload-Header-Content-Length: ${NUM_BYTES}" \
  -H "X-Goog-Upload-Header-Content-Type: ${MIME_TYPE}" \
  -H "Content-Type: application/jsonl" \
  -d "{'file': {'display_name': '${DISPLAY_NAME}'}}" 2> /dev/null
upload_url=$(grep -i "x-goog-upload-url: " "${tmp_header_file}" | cut -d" " -f2 | tr -d "\r")
rm "${tmp_header_file}"
# Upload the actual bytes.
curl "${upload_url}" \
-H "Content-Length: ${NUM_BYTES}" \
-H "X-Goog-Upload-Offset: 0" \
-H "X-Goog-Upload-Command: upload, finalize" \
--data-binary "@${tmp_batch_input_file}" 2> /dev/null > file_info.json
file_uri=$(jq ".file.uri" file_info.json)
The following example calls the BatchGenerateContent method with the input file uploaded using the File API:
Python
# Assumes `uploaded_file` is the file object from the previous step
file_batch_job = client.batches.create(
    model="gemini-2.5-flash",
    src=uploaded_file.name,
    config={
        'display_name': "file-upload-job-1",
    },
)

print(f"Created batch job: {file_batch_job.name}")
REST
BATCH_INPUT_FILE='files/123456' # File ID
curl https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:batchGenerateContent \
  -X POST \
  -H "x-goog-api-key: $GEMINI_API_KEY" \
  -H "Content-Type:application/json" \
  -d "{
    'batch': {
      'display_name': 'my-batch-requests',
      'input_config': {
        'requests': {
          'file_name': '${BATCH_INPUT_FILE}'
        }
      }
    }
  }"
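When you create a batch job, a job name is returned. Use this name to monitor the job status and to retrieve the results once the job completes.
The following is an example of output that contains a job name:
Created batch job: batches/123456789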
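Monitoring job status
Use the operation name obtained when creating the batch job to poll its status. The state field of the batch job indicates its current status. A batch job can be in one of the following states:
- JOB_STATE_PENDING: The job has been created and is waiting to be processed by the service.
- JOB_STATE_SUCCEEDED: The job completed successfully. You can now retrieve the results.
- JOB_STATE_FAILED: The job failed. Check the error details for more information.
- JOB_STATE_CANCELLED: The job was cancelled by the user.
You can poll the job status periodically to check whether it has completed.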
Python
import time

# Assumes `client` is the genai.Client() created in the earlier examples.
# Use the name of the job you want to check
# e.g., inline_batch_job.name from the previous step
job_name = "YOUR_BATCH_JOB_NAME"  # (e.g. 'batches/your-batch-id')

completed_states = set([
    'JOB_STATE_SUCCEEDED',
    'JOB_STATE_FAILED',
    'JOB_STATE_CANCELLED',
])

print(f"Polling status for job: {job_name}")
batch_job = client.batches.get(name=job_name)  # Initial get
while batch_job.state.name not in completed_states:
    print(f"Current state: {batch_job.state.name}")
    time.sleep(30)  # Wait for 30 seconds before polling again
    batch_job = client.batches.get(name=job_name)

print(f"Job finished with state: {batch_job.state.name}")
if batch_job.state.name == 'JOB_STATE_FAILED':
    print(f"Error: {batch_job.error}")
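For jobs that may run for hours, a fixed 30-second interval produces many unnecessary calls. The sketch below shows one possible variation: polling with a capped exponential backoff and an overall timeout. The helper name `wait_for_terminal_state`, its parameters, and the default delays are all assumptions for illustration. The state lookup is passed in as a callable so that the loop can be exercised without network access.

```python
import time

# Terminal states listed in the section above.
TERMINAL_STATES = {"JOB_STATE_SUCCEEDED", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}


def wait_for_terminal_state(get_state, initial_delay=30.0, max_delay=600.0,
                            timeout=24 * 3600.0, sleep=time.sleep):
    """Poll get_state() until it returns a terminal state or the timeout elapses."""
    delay = initial_delay
    waited = 0.0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout:
            raise TimeoutError(f"job still in state {state} after {waited:.0f}s")
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay)  # double the wait, up to max_delay


# Usage with a stand-in for the real lookup; no actual sleeping takes place here.
fake_states = iter(["JOB_STATE_PENDING", "JOB_STATE_PENDING", "JOB_STATE_SUCCEEDED"])
print(wait_for_terminal_state(lambda: next(fake_states), sleep=lambda seconds: None))
```

With the SDK, the callable could be something like `lambda: client.batches.get(name=job_name).state.name`, mirroring the loop in the example above.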
Retrieving results
Once the job status indicates that your batch job has succeeded, the results are available in the response field.
Python
import json

# Use the name of the job you want to check
# e.g., inline_batch_job.name from the previous step
job_name = "YOUR_BATCH_JOB_NAME"
batch_job = client.batches.get(name=job_name)

if batch_job.state.name == 'JOB_STATE_SUCCEEDED':

    # If batch job was created with a file
    if batch_job.dest and batch_job.dest.file_name:
        # Results are in a file
        result_file_name = batch_job.dest.file_name
        print(f"Results are in file: {result_file_name}")

        print("Downloading result file content...")
        file_content = client.files.download(file=result_file_name)
        # Process file_content (bytes) as needed
        print(file_content.decode('utf-8'))

    # If batch job was created with inline request
    elif batch_job.dest and batch_job.dest.inlined_responses:
        # Results are inline
        print("Results are inline:")
        for i, inline_response in enumerate(batch_job.dest.inlined_responses):
            print(f"Response {i+1}:")
            if inline_response.response:
                # Accessing response, structure may vary.
                try:
                    print(inline_response.response.text)
                except AttributeError:
                    print(inline_response.response)  # Fallback
            elif inline_response.error:
                print(f"Error: {inline_response.error}")
    else:
        print("No results found (neither file nor inline).")
else:
    print(f"Job did not succeed. Final state: {batch_job.state.name}")
    if batch_job.error:
        print(f"Error: {batch_job.error}")
REST
BATCH_NAME="batches/123456" # Your batch job name
curl https://generativelanguage.googleapis.com/v1beta/$BATCH_NAME \
  -H "x-goog-api-key: $GEMINI_API_KEY" \
  -H "Content-Type:application/json" 2> /dev/null > batch_status.json

if jq -r '.done' batch_status.json | grep -q "false"; then
    echo "Batch has not finished processing"
fi

batch_state=$(jq -r '.metadata.state' batch_status.json)
if [[ $batch_state = "JOB_STATE_SUCCEEDED" ]]; then
    if [[ $(jq '.response | has("inlinedResponses")' batch_status.json) = "true" ]]; then
        jq -r '.response.inlinedResponses' batch_status.json
        exit
    fi
    responses_file_name=$(jq -r '.response.responsesFile' batch_status.json)
    curl "https://generativelanguage.googleapis.com/download/v1beta/${responses_file_name}:download?alt=media" \
      -H "x-goog-api-key: $GEMINI_API_KEY" 2> /dev/null
elif [[ $batch_state = "JOB_STATE_FAILED" ]]; then
    jq '.error' batch_status.json
elif [[ $batch_state == "JOB_STATE_CANCELLED" ]]; then
    echo "Batch was cancelled by the user"
fi
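When results arrive as a JSONL file, it is usually convenient to index them by the user-defined key. The sketch below is one way to do that under stated assumptions: each output line is taken to be a JSON object with a "key" field and either a "response" field (a GenerateContentResponse) or an "error" field (a status object). Those two field names are assumptions, as is the helper name `index_results`; inspect a real output file before relying on them.

```python
import json


def index_results(jsonl_text):
    """Map each user-defined key to ("ok", text) or ("error", status_object)."""
    results = {}
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        entry = json.loads(line)
        key = entry.get("key")
        if "error" in entry:  # assumed field name for the status object
            results[key] = ("error", entry["error"])
            continue
        response = entry.get("response", {})  # assumed field name
        # Concatenate the text parts of the first candidate, if there is one.
        parts = [
            part.get("text", "")
            for candidate in response.get("candidates", [])[:1]
            for part in candidate.get("content", {}).get("parts", [])
        ]
        results[key] = ("ok", "".join(parts))
    return results


sample_output = "\n".join([
    '{"key": "request-1", "response": {"candidates": [{"content": {"parts": [{"text": "Plants turn light into sugar."}]}}]}}',
    '{"key": "request-2", "error": {"code": 400, "message": "Invalid request"}}',
])
for key, (status, payload) in index_results(sample_output).items():
    print(key, status, payload)
```

In the Python example above, the input to this helper would be `file_content.decode('utf-8')`.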
Cancelling a batch job
You can cancel an ongoing batch job using its name. When a job is cancelled, it stops processing new requests.
Python
# Cancel a batch job
client.batches.cancel(name=batch_job_to_cancel.name)
REST
BATCH_NAME="batches/123456" # Your batch job name

# Cancel the batch
curl "https://generativelanguage.googleapis.com/v1beta/${BATCH_NAME}:cancel" \
  -H "x-goog-api-key: $GEMINI_API_KEY"

# Confirm that the status of the batch after cancellation is JOB_STATE_CANCELLED
curl "https://generativelanguage.googleapis.com/v1beta/${BATCH_NAME}" \
  -H "x-goog-api-key: $GEMINI_API_KEY" \
  -H "Content-Type:application/json" 2> /dev/null | jq -r '.metadata.state'
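Deleting a batch job
You can delete an existing batch job using its name. When a job is deleted, it stops processing new requests and is removed from the list of batch jobs.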
Python
# Delete a batch job
client.batches.delete(name=batch_job_to_delete.name)
REST
BATCH_NAME="batches/123456" # Your batch job name

# Delete the batch
curl "https://generativelanguage.googleapis.com/v1beta/${BATCH_NAME}:delete" \
  -H "x-goog-api-key: $GEMINI_API_KEY"
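Technical details
- Supported models: Batch Mode supports a range of Gemini models. Refer to the Models page for the latest list of compatible models. The modalities supported in Batch Mode are the same as those supported by the interactive (non-batch) API.
- Pricing: Batch Mode usage is priced at 50% of the standard interactive API cost for the equivalent model.
- Service level objective (SLO): Batch jobs are designed to complete within 24 hours. Many jobs may complete much faster, depending on their size and the current system load.
- Caching: Context caching is enabled for batch requests. If a request in your batch results in a cache hit, the cached tokens are priced the same as for non-batch traffic.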
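To make the pricing rule concrete, the following sketch compares the estimated cost of the same workload in interactive and batch mode. The price of 1.00 per million tokens is a hypothetical figure chosen only for easy arithmetic, and `estimate_cost` is an illustrative helper, not an API call. The sketch also ignores the separate pricing of cached tokens mentioned above.

```python
BATCH_DISCOUNT = 0.5  # batch mode is billed at 50% of the interactive price


def estimate_cost(tokens, price_per_million_tokens, batch=False):
    """Estimated cost in the same currency unit as price_per_million_tokens."""
    standard = tokens / 1_000_000 * price_per_million_tokens
    return standard * BATCH_DISCOUNT if batch else standard


# Hypothetical price of 1.00 per million tokens.
tokens = 40_000_000
print(estimate_cost(tokens, 1.00))              # interactive: 40.0
print(estimate_cost(tokens, 1.00, batch=True))  # batch: 20.0
```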
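Best practices
- Use input files for large requests: For a large number of requests, always use the file input method for better manageability and to avoid hitting the request size limit of the BatchGenerateContent call itself. Note that each input file is limited to 2GB.
- Error handling: After a job completes, check the batchStats for failedRequestCount. If you use file output, parse each line to check whether it is a GenerateContentResponse or a status object indicating an error for that specific request.
- Submit jobs only once: Creating a batch job is not idempotent. If you send the same creation request twice, two separate batch jobs are created.
- Split very large batches: Although the target turnaround time is 24 hours, the actual processing time can vary with system load and job size. For large jobs, consider splitting them into smaller batches if you need intermediate results sooner.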
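The advice on splitting very large batches can be sketched as a small generator that groups requests by count and by approximate serialized size. The helper name `split_into_batches` and the default of 1000 requests per batch are arbitrary illustrative choices. The byte limit defaults to the 2GB input-file limit, read here as 2 * 10**9 bytes, which is an assumption.

```python
import json


def split_into_batches(requests, max_requests=1000, max_bytes=2 * 1000**3):
    """Yield lists of requests that stay within both limits."""
    batch, batch_bytes = [], 0
    for request in requests:
        # +1 accounts for the newline that ends each JSONL line.
        size = len(json.dumps(request).encode("utf-8")) + 1
        if batch and (len(batch) >= max_requests or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(request)
        batch_bytes += size
    if batch:
        yield batch


requests = [
    {"key": f"request-{i}", "request": {"contents": [{"parts": [{"text": f"Item {i}"}]}]}}
    for i in range(2500)
]
print([len(b) for b in split_into_batches(requests)])  # [1000, 1000, 500]
```

A single request larger than `max_bytes` still ends up in a batch of its own, so oversized requests need separate handling. Because job creation is not idempotent, each resulting batch should be submitted exactly once.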
What's next
For more examples, see the Batch Mode notebook.