import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem';
Prepare and upload your batch
A batch is composed of a list of API requests. The structure of an individual request includes:
- A unique
custom_id
for identifying each request and referencing results after completion - A
body
object with message information
Here's an example of how to structure a batch request:
1{"custom_id": "0", "body": {"max_tokens": 100, "messages": [{"role": "user", "content": "What is the best French cheese?"}]}}
2{"custom_id": "1", "body": {"max_tokens": 100, "messages": [{"role": "user", "content": "What is the best French wine?"}]}}
Save your batch into a .jsonl file. Once saved, you can upload your batch input file to ensure it is correctly referenced when initiating batch processes:
1from mistralai import Mistral
2import os
3
4api_key = os.environ["MISTRAL_API_KEY"]
5
6client = Mistral(api_key=api_key)
7
8batch_data = client.files.upload(
9 file={
10 "file_name": "test.jsonl",
11 "content": open("test.jsonl", "rb")
12 },
13 purpose = "batch"
14)
1import { Mistral } from '@mistralai/mistralai';
2import fs from 'fs';
3
4const apiKey = process.env.MISTRAL_API_KEY;
5
6const client = new Mistral({ apiKey: apiKey });
7
8const batchFile = fs.readFileSync('batch_input_file.jsonl');
9const batchData = await client.files.upload({
10 file: {
11 fileName: 'batch_input_file.jsonl',
12 content: batchFile,
13 },
14 purpose: 'batch',
15});
1curl https://api.mistral.ai/v1/files \
2 -H "Authorization: Bearer $MISTRAL_API_KEY" \
3 -F purpose="batch" \
4 -F file="@batch_input_file.jsonl"
Create a new batch job
Create a new batch job, it will be queued for processing.
input_files
: a list of the batch input file IDs.model
: you can only use one model (e.g.,codestral-latest
) per batch. However, you can run multiple batches on the same files with different models if you want to compare outputs.endpoint
: we currently support/v1/embeddings
,/v1/chat/completions
,/v1/fim/completions
,/v1/moderations
,/v1/chat/moderations
.metadata
: optional custom metadata for the batch.
1import { Mistral } from '@mistralai/mistralai';
2
3const apiKey = process.env.MISTRAL_API_KEY;
4
5const client = new Mistral({ apiKey: apiKey });
6
7const createdJob = await client.batch.jobs.create({
8 inputFiles: [batchData.id],
9 model: 'mistral-small-latest',
10 endpoint: '/v1/chat/completions',
11 metadata: { jobType: 'testing' },
12});
1curl --location "https://api.mistral.ai/v1/batch/jobs" \
2--header "Authorization: Bearer $MISTRAL_API_KEY" \
3--header "Content-Type: application/json" \
4--header "Accept: application/json" \
5--data '{
6 "model": "mistral-small-latest",
7 "input_files": [
8 "<uuid>"
9 ],
10 "endpoint": "/v1/chat/completions",
11 "metadata": {
12 "job_type": "testing"
13 }
14}'
Get a batch job details
1retrieved_job = client.batch.jobs.get(job_id=created_job.id)
1const retrievedJob = await client.batch.jobs.get({ jobId: createdJob.id });
1curl https://api.mistral.ai/v1/batch/jobs/<jobid> \
2--header "Authorization: Bearer $MISTRAL_API_KEY"
Get batch job results
1output_file_stream = client.files.download(file_id=retrieved_job.output_file)
2
3# Write and save the file
4with open('batch_results.jsonl', 'wb') as f:
5 f.write(output_file_stream.read())
1import fs from 'fs';
2
3const outputFileStream = await client.files.download({
4 fileId: retrievedJob.outputFile,
5});
6
7// Write the stream to a file
8const writeStream = fs.createWriteStream('batch_results.jsonl');
9outputFileStream.pipeTo(
10 new WritableStream({
11 write(chunk) {
12 writeStream.write(chunk);
13 },
14 close() {
15 writeStream.end();
16 },
17 })
18);
1curl 'https://api.mistral.ai/v1/files/<uuid>/content' \
2--header "Authorization: Bearer $MISTRAL_API_KEY" \
List batch jobs
You can view a list of your batch jobs and filter them by various criteria, including:
- Status:
QUEUED
,RUNNING
,SUCCESS
,FAILED
,TIMEOUT_EXCEEDED
,CANCELLATION_REQUESTED
andCANCELLED
- Metadata: custom metadata key and value for the batch
1list_job = client.batch.jobs.list(
2 status="RUNNING",
3 metadata={"job_type": "testing"}
4)
1const listJob = await client.batch.jobs.list({
2 status: 'RUNNING',
3 metadata: {
4 jobType: 'testing',
5 },
6});
1curl 'https://api.mistral.ai/v1/batch/jobs?status=RUNNING&job_type=testing'\
2--header 'x-api-key: $MISTRAL_API_KEY'
Request the cancellation of a batch job
1canceled_job = client.batch.jobs.cancel(job_id=created_job.id)
1const canceledJob = await mistral.batch.jobs.cancel({
2 jobId: createdJob.id,
3});
1curl -X POST https://api.mistral.ai/v1/batch/jobs/<jobid>/cancel \
2--header "Authorization: Bearer $MISTRAL_API_KEY"
An end-to-end example
1import argparse
2import json
3import os
4import random
5import time
6from io import BytesIO
7
8import httpx
9from mistralai import File, Mistral
10
11
12def create_client():
13 """
14 Create a Mistral client using the API key from environment variables.
15
16 Returns:
17 Mistral: An instance of the Mistral client.
18 """
19 return Mistral(api_key=os.environ["MISTRAL_API_KEY"])
20
21def generate_random_string(start, end):
22 """
23 Generate a random string of variable length.
24
25 Args:
26 start (int): Minimum length of the string.
27 end (int): Maximum length of the string.
28
29 Returns:
30 str: A randomly generated string.
31 """
32 length = random.randrange(start, end)
33 return ' '.join(random.choices('abcdefghijklmnopqrstuvwxyz', k=length))
34
35def print_stats(batch_job):
36 """
37 Print the statistics of the batch job.
38
39 Args:
40 batch_job: The batch job object containing job statistics.
41 """
42 print(f"Total requests: {batch_job.total_requests}")
43 print(f"Failed requests: {batch_job.failed_requests}")
44 print(f"Successful requests: {batch_job.succeeded_requests}")
45 print(
46 f"Percent done: {round((batch_job.succeeded_requests + batch_job.failed_requests) / batch_job.total_requests, 4) * 100}")
47
48
49def create_input_file(client, num_samples):
50 """
51 Create an input file for the batch job.
52
53 Args:
54 client (Mistral): The Mistral client instance.
55 num_samples (int): Number of samples to generate.
56
57 Returns:
58 File: The uploaded input file object.
59 """
60 buffer = BytesIO()
61 for idx in range(num_samples):
62 request = {
63 "custom_id": str(idx),
64 "body": {
65 "max_tokens": random.randint(10, 1000),
66 "messages": [{"role": "user", "content": generate_random_string(100, 5000)}]
67 }
68 }
69 buffer.write(json.dumps(request).encode("utf-8"))
70 buffer.write("\n".encode("utf-8"))
71 return client.files.upload(file=File(file_name="file.jsonl", content=buffer.getvalue()), purpose="batch")
72
73
74def run_batch_job(client, input_file, model):
75 """
76 Run a batch job using the provided input file and model.
77
78 Args:
79 client (Mistral): The Mistral client instance.
80 input_file (File): The input file object.
81 model (str): The model to use for the batch job.
82
83 Returns:
84 BatchJob: The completed batch job object.
85 """
86 batch_job = client.batch.jobs.create(
87 input_files=[input_file.id],
88 model=model,
89 endpoint="/v1/chat/completions",
90 metadata={"job_type": "testing"}
91 )
92
93 while batch_job.status in ["QUEUED", "RUNNING"]:
94 batch_job = client.batch.jobs.get(job_id=batch_job.id)
95 print_stats(batch_job)
96 time.sleep(1)
97
98 print(f"Batch job {batch_job.id} completed with status: {batch_job.status}")
99 return batch_job
100
101
102def download_file(client, file_id, output_path):
103 """
104 Download a file from the Mistral server.
105
106 Args:
107 client (Mistral): The Mistral client instance.
108 file_id (str): The ID of the file to download.
109 output_path (str): The path where the file will be saved.
110 """
111 if file_id is not None:
112 print(f"Downloading file to {output_path}")
113 output_file = client.files.download(file_id=file_id)
114 with open(output_path, "w") as f:
115 for chunk in output_file.stream:
116 f.write(chunk.decode("utf-8"))
117 print(f"Downloaded file to {output_path}")
118
119
120def main(num_samples, success_path, error_path, model):
121 """
122 Main function to run the batch job.
123
124 Args:
125 num_samples (int): Number of samples to process.
126 success_path (str): Path to save successful outputs.
127 error_path (str): Path to save error outputs.
128 model (str): Model name to use.
129 """
130 client = create_client()
131 input_file = create_input_file(client, num_samples)
132 print(f"Created input file {input_file}")
133
134 batch_job = run_batch_job(client, input_file, model)
135 print(f"Job duration: {batch_job.completed_at - batch_job.created_at} seconds")
136 download_file(client, batch_job.error_file, error_path)
137 download_file(client, batch_job.output_file, success_path)
138
139
140if __name__ == "__main__":
141 parser = argparse.ArgumentParser(description="Run Mistral AI batch job")
142 parser.add_argument("--num_samples", type=int, default=100, help="Number of samples to process")
143 parser.add_argument("--success_path", type=str, default="output.jsonl", help="Path to save successful outputs")
144 parser.add_argument("--error_path", type=str, default="error.jsonl", help="Path to save error outputs")
145 parser.add_argument("--model", type=str, default="codestral-latest", help="Model name to use")
146
147 args = parser.parse_args()
148
149 main(args.num_samples, args.success_path, args.error_path, args.model)
FAQ
Is the batch API available for all models?
Yes, batch API is available for all models including user fine-tuned models.
Does the batch API affect pricing?
We offer a 50% discount on batch API. Learn more about our pricing.
Does the batch API affect rate limits?
No
What's the max number of requests in a batch?
Currently, there is a maximum limit of 1 million pending requests per workspace. This means you cannot submit a job with more than 1 million requests. Additionally, you cannot submit two jobs with 600,000 requests each at the same time. You would need to wait until the first job has processed at least 200,000 requests, reducing its pending count to 400,000. At that point, the new job with 600,000 requests would fit within the limit.
What's the max number of batch jobs one can create?
Currently, there is no maximum limit.
How long does the batch API take to process?
Processing speeds may be adjusted based on current demand and the volume of your request. Your batch results will only be accessible once the entire batch processing is complete.
Users can set timeout_hours
when creating a job, which specifies the number of hours after which the job should expire. It defaults to 24 hours and should be lower than 7 days. A batch will expire if processing does not complete within the specified timeout.
Can I view batch results from my workspace?
Yes, batches are specific to a workspace. You can see all batches and their results that were created within the workspace associated with your API key.
Will batch results ever expire?
No, the results do not expire at this time.
Can batches exceed the spend limit?
Yes, due to high throughput and concurrent processing, batches may slightly exceed your workspace's configured spend limit.