Skip to main content
A batch write groups multiple Firestore operations into a single atomic commit. Either every operation in the batch succeeds, or none of them do — there is no partial state. Batch writes also reduce network round-trips: regardless of how many operations you include, only one RPC is sent to Firestore. This makes them ideal for tasks like seeding data, bulk updates, or any workflow where you need a consistent snapshot after multiple mutations.

The BatchOperation Enum

The BatchOperation enum defines the three operations a batch can perform:
from firestore_pydantic_odm import BatchOperation

BatchOperation.CREATE   # Insert a new document
BatchOperation.UPDATE   # Update fields on an existing document
BatchOperation.DELETE   # Remove an existing document
MemberValueDescription
CREATE"create"Create a new document. Auto-generates an ID if the instance has none.
UPDATE"update"Merge the provided fields into an existing document. Requires an id.
DELETE"delete"Delete the document. Requires an id.

batch_write() — Executing the Batch

batch_write() is a class method on any model that inherits BaseFirestoreModel. Pass a list of (BatchOperation, model_instance) tuples:
await User.batch_write([
    (BatchOperation.CREATE, user_a),
    (BatchOperation.UPDATE, user_b),
    (BatchOperation.DELETE, user_c),
])
The method commits all operations in a single Firestore batch.commit() call.
UPDATE and DELETE operations require the model instance to have an id set. Passing an instance without an id for these operations will raise a ValueError before the batch is submitted.

Batch Create

Create multiple documents in one call. Instances without an id are assigned auto-generated IDs before the batch is committed, so you can read instance.id immediately after batch_write() returns:
from firestore_pydantic_odm import BatchOperation

users = [
    User(name="Alice", email="[email protected]", age=30),
    User(name="Bob",   email="[email protected]",   age=25),
    User(name="Carol", email="[email protected]",  age=35),
]

await User.batch_write([(BatchOperation.CREATE, u) for u in users])

# IDs are now set on each instance
for u in users:
    print(u.id, u.name)

Batch Update

Update multiple documents atomically. Each instance must already have an id:
# Fetch documents to update
alice = await User.find_one(filters=[User.name == "Alice"])
bob   = await User.find_one(filters=[User.name == "Bob"])

# Mutate in memory
alice.email = "[email protected]"
bob.age = 26

await User.batch_write([
    (BatchOperation.UPDATE, alice),
    (BatchOperation.UPDATE, bob),
])

Batch Delete

Delete multiple documents in one atomic operation. Each instance must have an id:
users_to_remove = [alice, bob, carol]  # instances with ids set

await User.batch_write([
    (BatchOperation.DELETE, u) for u in users_to_remove
])

Mixed Operations in One Batch

You can combine CREATE, UPDATE, and DELETE operations in a single batch call. All three happen atomically:
from firestore_pydantic_odm import BatchOperation

# Pre-fetch existing documents
to_update = await User.find_one(filters=[User.name == "WillUpdate"])
to_delete = await User.find_one(filters=[User.name == "WillDelete"])

# Mutate the update target
to_update.name = "WasUpdated"

# New document to create
to_create = User(name="NewUser", email="[email protected]")

await User.batch_write([
    (BatchOperation.CREATE, to_create),
    (BatchOperation.UPDATE, to_update),
    (BatchOperation.DELETE, to_delete),
])

print(to_create.id)   # auto-assigned ID
print(to_update.id)   # unchanged

Auto-ID Assignment

When a CREATE operation is added to the batch with an instance that has no id, the ODM calls collection_ref.document() to pre-allocate a Firestore document reference and assigns its auto-generated ID to model_instance.id before batch.commit() is called. This means IDs are available synchronously in your code as soon as batch_write() returns:
user_a = User(name="AutoID-A", email="[email protected]")
user_b = User(name="AutoID-B", email="[email protected]")

assert user_a.id is None
assert user_b.id is None

await User.batch_write([
    (BatchOperation.CREATE, user_a),
    (BatchOperation.CREATE, user_b),
])

assert user_a.id is not None  # e.g. "xK3mP9..."
assert user_b.id is not None  # e.g. "zR7qL2..."
assert user_a.id != user_b.id

Batch with Subcollections

To batch-write subcollection documents, set the _parent_path private attribute on each instance before passing it to batch_write(). This tells the ODM which parent document path to write under without requiring a live parent instance:
import asyncio
from firestore_pydantic_odm import BatchOperation

# Create the parent document first
user = User(name="BatchParent", email="[email protected]")
await user.save()

# Build subcollection posts and set their parent path
posts = [
    Post(title="Batch Post 1", body="Content 1"),
    Post(title="Batch Post 2", body="Content 2"),
]
for post in posts:
    object.__setattr__(post, "_parent_path", f"users/{user.id}")

await Post.batch_write([(BatchOperation.CREATE, p) for p in posts])

for post in posts:
    print(post.id, post.title)
    # documents live at: users/{user.id}/posts/{post.id}
object.__setattr__ is used because _parent_path is a Pydantic private attribute (PrivateAttr). Direct assignment via post._parent_path = ... is also valid in Pydantic v2, but object.__setattr__ works across both Pydantic v1 and v2.

Firestore’s 500-Operation Limit

Firestore enforces a hard limit of 500 operations per batch. If you need to write more than 500 documents, split your operations into chunks:
from itertools import islice

def chunked(iterable, size):
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk

all_ops = [(BatchOperation.CREATE, User(name=f"User{i}", email=f"u{i}@example.com")) for i in range(1200)]

for chunk in chunked(all_ops, 500):
    await User.batch_write(chunk)
Exceeding 500 operations in a single batch raises an error from the Firestore backend. Always chunk large batches before calling batch_write().

Complete Batch Example

import asyncio
from firestore_pydantic_odm import BaseFirestoreModel, FirestoreDB, BatchOperation, init_firestore_odm


class User(BaseFirestoreModel):
    class Settings:
        name = "users"

    name: str
    email: str
    age: int = 0


async def main():
    db = FirestoreDB(project_id="my-project")
    init_firestore_odm(db, [User])

    # Create three users in one batch
    new_users = [
        User(name="Alice", email="[email protected]", age=30),
        User(name="Bob",   email="[email protected]",   age=25),
        User(name="Carol", email="[email protected]",  age=35),
    ]
    await User.batch_write([(BatchOperation.CREATE, u) for u in new_users])
    print("Created:", [u.id for u in new_users])

    # Update one and delete another in the same batch
    new_users[0].age = 31
    await User.batch_write([
        (BatchOperation.UPDATE, new_users[0]),
        (BatchOperation.DELETE, new_users[2]),
    ])
    print("Alice's new age:", new_users[0].age)


asyncio.run(main())

Querying

Fetch, filter, and paginate documents with the expressive query API.

Models

Learn how to define models and collection settings.