
Prepare and upload your batch

A batch is composed of a list of API requests. The structure of an individual request includes:

  • A unique custom_id for identifying each request and referencing results after completion
  • A body object with message information

Here's an example of how to structure a batch request:

1{"custom_id": "0", "body": {"max_tokens": 100, "messages": [{"role": "user", "content": "What is the best French cheese?"}]}}
2{"custom_id": "1", "body": {"max_tokens": 100, "messages": [{"role": "user", "content": "What is the best French wine?"}]}}

Save your batch into a .jsonl file. Once saved, upload your batch input file so it can be referenced when you create a batch job:

python:
from mistralai import Mistral
import os

api_key = os.environ["MISTRAL_API_KEY"]

client = Mistral(api_key=api_key)

batch_data = client.files.upload(
    file={
        "file_name": "test.jsonl",
        "content": open("test.jsonl", "rb")
    },
    purpose="batch"
)

typescript:
import { Mistral } from '@mistralai/mistralai';
import fs from 'fs';

const apiKey = process.env.MISTRAL_API_KEY;

const client = new Mistral({ apiKey: apiKey });

const batchFile = fs.readFileSync('batch_input_file.jsonl');
const batchData = await client.files.upload({
  file: {
    fileName: 'batch_input_file.jsonl',
    content: batchFile,
  },
  purpose: 'batch',
});

curl:
curl https://api.mistral.ai/v1/files \
  -H "Authorization: Bearer $MISTRAL_API_KEY" \
  -F purpose="batch" \
  -F file="@batch_input_file.jsonl"

Create a new batch job

Create a new batch job; it will be queued for processing.

  • input_files: a list of the batch input file IDs.
  • model: you can only use one model (e.g., codestral-latest) per batch. However, you can run multiple batches on the same files with different models if you want to compare outputs.
  • endpoint: we currently support /v1/embeddings, /v1/chat/completions, /v1/fim/completions, /v1/moderations, /v1/chat/moderations.
  • metadata: optional custom metadata for the batch.
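
For instance, in Python (a sketch that mirrors the end-to-end example at the bottom of this page, reusing the batch_data file uploaded earlier):

python:
created_job = client.batch.jobs.create(
    input_files=[batch_data.id],
    model="mistral-small-latest",
    endpoint="/v1/chat/completions",
    metadata={"job_type": "testing"}
)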
typescript:
import { Mistral } from '@mistralai/mistralai';

const apiKey = process.env.MISTRAL_API_KEY;

const client = new Mistral({ apiKey: apiKey });

const createdJob = await client.batch.jobs.create({
  inputFiles: [batchData.id],
  model: 'mistral-small-latest',
  endpoint: '/v1/chat/completions',
  metadata: { jobType: 'testing' },
});

curl:
curl --location "https://api.mistral.ai/v1/batch/jobs" \
--header "Authorization: Bearer $MISTRAL_API_KEY" \
--header "Content-Type: application/json" \
--header "Accept: application/json" \
--data '{
    "model": "mistral-small-latest",
    "input_files": [
        "<uuid>"
    ],
    "endpoint": "/v1/chat/completions",
    "metadata": {
        "job_type": "testing"
    }
}'

Get batch job details

python:
retrieved_job = client.batch.jobs.get(job_id=created_job.id)

typescript:
const retrievedJob = await client.batch.jobs.get({ jobId: createdJob.id });

curl:
curl https://api.mistral.ai/v1/batch/jobs/<jobid> \
--header "Authorization: Bearer $MISTRAL_API_KEY"

Get batch job results

python:
output_file_stream = client.files.download(file_id=retrieved_job.output_file)

# Write and save the file
with open('batch_results.jsonl', 'wb') as f:
    f.write(output_file_stream.read())

typescript:
import fs from 'fs';

const outputFileStream = await client.files.download({
  fileId: retrievedJob.outputFile,
});

// Write the stream to a file
const writeStream = fs.createWriteStream('batch_results.jsonl');
outputFileStream.pipeTo(
  new WritableStream({
    write(chunk) {
      writeStream.write(chunk);
    },
    close() {
      writeStream.end();
    },
  })
);
curl:
curl 'https://api.mistral.ai/v1/files/<uuid>/content' \
--header "Authorization: Bearer $MISTRAL_API_KEY"

List batch jobs

You can view a list of your batch jobs and filter them by various criteria, including:

  • Status: QUEUED, RUNNING, SUCCESS, FAILED, TIMEOUT_EXCEEDED, CANCELLATION_REQUESTED and CANCELLED
  • Metadata: custom metadata key and value for the batch
python:
list_job = client.batch.jobs.list(
    status="RUNNING",
    metadata={"job_type": "testing"}
)

typescript:
const listJob = await client.batch.jobs.list({
  status: 'RUNNING',
  metadata: {
    jobType: 'testing',
  },
});
curl:
curl 'https://api.mistral.ai/v1/batch/jobs?status=RUNNING&job_type=testing' \
--header "x-api-key: $MISTRAL_API_KEY"

Request the cancellation of a batch job

python:
canceled_job = client.batch.jobs.cancel(job_id=created_job.id)

typescript:
const canceledJob = await client.batch.jobs.cancel({
  jobId: createdJob.id,
});

curl:
curl -X POST https://api.mistral.ai/v1/batch/jobs/<jobid>/cancel \
--header "Authorization: Bearer $MISTRAL_API_KEY"

An end-to-end example
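
The Python script below generates a synthetic .jsonl input file, uploads it, creates a batch job, polls the job while printing progress statistics, and finally downloads the error and output files.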

import argparse
import json
import os
import random
import time
from io import BytesIO

from mistralai import File, Mistral


def create_client():
    """
    Create a Mistral client using the API key from environment variables.

    Returns:
        Mistral: An instance of the Mistral client.
    """
    return Mistral(api_key=os.environ["MISTRAL_API_KEY"])


def generate_random_string(start, end):
    """
    Generate a random string of variable length.

    Args:
        start (int): Minimum length of the string.
        end (int): Maximum length of the string.

    Returns:
        str: A randomly generated string.
    """
    length = random.randrange(start, end)
    return ' '.join(random.choices('abcdefghijklmnopqrstuvwxyz', k=length))


def print_stats(batch_job):
    """
    Print the statistics of the batch job.

    Args:
        batch_job: The batch job object containing job statistics.
    """
    print(f"Total requests: {batch_job.total_requests}")
    print(f"Failed requests: {batch_job.failed_requests}")
    print(f"Successful requests: {batch_job.succeeded_requests}")
    print(
        f"Percent done: {round((batch_job.succeeded_requests + batch_job.failed_requests) / batch_job.total_requests, 4) * 100}")


def create_input_file(client, num_samples):
    """
    Create an input file for the batch job.

    Args:
        client (Mistral): The Mistral client instance.
        num_samples (int): Number of samples to generate.

    Returns:
        File: The uploaded input file object.
    """
    buffer = BytesIO()
    for idx in range(num_samples):
        request = {
            "custom_id": str(idx),
            "body": {
                "max_tokens": random.randint(10, 1000),
                "messages": [{"role": "user", "content": generate_random_string(100, 5000)}]
            }
        }
        buffer.write(json.dumps(request).encode("utf-8"))
        buffer.write("\n".encode("utf-8"))
    return client.files.upload(file=File(file_name="file.jsonl", content=buffer.getvalue()), purpose="batch")


def run_batch_job(client, input_file, model):
    """
    Run a batch job using the provided input file and model.

    Args:
        client (Mistral): The Mistral client instance.
        input_file (File): The input file object.
        model (str): The model to use for the batch job.

    Returns:
        BatchJob: The completed batch job object.
    """
    batch_job = client.batch.jobs.create(
        input_files=[input_file.id],
        model=model,
        endpoint="/v1/chat/completions",
        metadata={"job_type": "testing"}
    )

    while batch_job.status in ["QUEUED", "RUNNING"]:
        batch_job = client.batch.jobs.get(job_id=batch_job.id)
        print_stats(batch_job)
        time.sleep(1)

    print(f"Batch job {batch_job.id} completed with status: {batch_job.status}")
    return batch_job


def download_file(client, file_id, output_path):
    """
    Download a file from the Mistral server.

    Args:
        client (Mistral): The Mistral client instance.
        file_id (str): The ID of the file to download.
        output_path (str): The path where the file will be saved.
    """
    if file_id is not None:
        print(f"Downloading file to {output_path}")
        output_file = client.files.download(file_id=file_id)
        with open(output_path, "w") as f:
            for chunk in output_file.stream:
                f.write(chunk.decode("utf-8"))
        print(f"Downloaded file to {output_path}")


def main(num_samples, success_path, error_path, model):
    """
    Main function to run the batch job.

    Args:
        num_samples (int): Number of samples to process.
        success_path (str): Path to save successful outputs.
        error_path (str): Path to save error outputs.
        model (str): Model name to use.
    """
    client = create_client()
    input_file = create_input_file(client, num_samples)
    print(f"Created input file {input_file}")

    batch_job = run_batch_job(client, input_file, model)
    print(f"Job duration: {batch_job.completed_at - batch_job.created_at} seconds")
    download_file(client, batch_job.error_file, error_path)
    download_file(client, batch_job.output_file, success_path)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run Mistral AI batch job")
    parser.add_argument("--num_samples", type=int, default=100, help="Number of samples to process")
    parser.add_argument("--success_path", type=str, default="output.jsonl", help="Path to save successful outputs")
    parser.add_argument("--error_path", type=str, default="error.jsonl", help="Path to save error outputs")
    parser.add_argument("--model", type=str, default="codestral-latest", help="Model name to use")

    args = parser.parse_args()

    main(args.num_samples, args.success_path, args.error_path, args.model)

FAQ

Is the batch API available for all models?

Yes, the batch API is available for all models, including user fine-tuned models.

Does the batch API affect pricing?

Yes, we offer a 50% discount on the batch API. Learn more about our pricing.

Does the batch API affect rate limits?

No.

What's the max number of requests in a batch?

Currently, there is a maximum limit of 1 million pending requests per workspace, so you cannot submit a job with more than 1 million requests. You also cannot submit two jobs of 600,000 requests each at the same time: you would need to wait until the first job has processed at least 200,000 requests, reducing its pending count to 400,000, before the second job of 600,000 requests fits within the limit.

What's the max number of batch jobs one can create?

Currently, there is no maximum limit.

How long does the batch API take to process?

Processing speed may vary based on current demand and the volume of your request. Your batch results are only accessible once the entire batch has finished processing.

You can set timeout_hours when creating a job, which specifies the number of hours after which the job expires. It defaults to 24 hours and must be less than 7 days. A batch expires if processing does not complete within the specified timeout.
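
For example, assuming the Python SDK accepts the setting as a timeout_hours keyword argument on job creation (a sketch, not a verified signature):

created_job = client.batch.jobs.create(
    input_files=[batch_data.id],
    model="mistral-small-latest",
    endpoint="/v1/chat/completions",
    timeout_hours=48,  # assumed keyword; expires the job after 48 hours
)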

Can I view batch results from my workspace?

Yes, batches are specific to a workspace. You can see all batches and their results that were created within the workspace associated with your API key.

Will batch results ever expire?

No, the results do not expire at this time.

Can batches exceed the spend limit?

Yes, due to high throughput and concurrent processing, batches may slightly exceed your workspace's configured spend limit.