Atomic Batch Write Operations in Firestore Pydantic ODM
Execute multiple Firestore create, update, and delete operations atomically in one round-trip using the batch_write() class method.
A batch write groups multiple Firestore operations into a single atomic commit. Either every operation in the batch succeeds, or none of them do — there is no partial state. Batch writes also reduce network round-trips: regardless of how many operations you include, only one RPC is sent to Firestore. This makes them ideal for tasks like seeding data, bulk updates, or any workflow where you need a consistent snapshot after multiple mutations.
The BatchOperation enum defines the three operations a batch can perform:
    from firestore_pydantic_odm import BatchOperation

    BatchOperation.CREATE  # Insert a new document
    BatchOperation.UPDATE  # Update fields on an existing document
    BatchOperation.DELETE  # Remove an existing document
| Member | Value | Description |
| --- | --- | --- |
| CREATE | "create" | Create a new document. Auto-generates an ID if the instance has none. |
| UPDATE | "update" | Merge the provided fields into an existing document. Requires an id. |
batch_write() commits all operations in a single Firestore batch.commit() call.
UPDATE and DELETE operations require the model instance to have an id set. Passing an instance without an id for these operations will raise a ValueError before the batch is submitted.
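The id rule can be illustrated without a Firestore connection. The sketch below is not the library's implementation: `Op`, `Doc`, and `validate_ops` are hypothetical stand-ins that mimic the behaviour described above, rejecting the whole batch before anything would be sent.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional


class Op(Enum):
    """Hypothetical stand-in for BatchOperation."""
    CREATE = auto()
    UPDATE = auto()
    DELETE = auto()


@dataclass
class Doc:
    """Hypothetical stand-in for a model instance."""
    name: str
    id: Optional[str] = None


def validate_ops(ops):
    """Raise ValueError if any UPDATE or DELETE targets an instance without an id."""
    for index, (op, doc) in enumerate(ops):
        if op in (Op.UPDATE, Op.DELETE) and doc.id is None:
            raise ValueError(f"operation {index} ({op.name}) requires an id")


ops = [
    (Op.CREATE, Doc(name="Alice")),             # allowed: CREATE may omit the id
    (Op.UPDATE, Doc(name="Bob", id="abc123")),  # allowed: id is set
    (Op.DELETE, Doc(name="Carol")),             # rejected: no id
]

error_message = None
try:
    validate_ops(ops)
except ValueError as exc:
    error_message = str(exc)

print(error_message)  # operation 2 (DELETE) requires an id
```

Running a check like this over the full list first means one invalid entry stops the batch before any operation is staged.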
Create multiple documents in one call. Instances without an id are assigned auto-generated IDs before the batch is committed, so you can read instance.id immediately after batch_write() returns:
    from firestore_pydantic_odm import BatchOperation

    users = [
        User(name="Alice", email="[email protected]", age=30),
        User(name="Bob", email="[email protected]", age=25),
        User(name="Carol", email="[email protected]", age=35),
    ]

    await User.batch_write([(BatchOperation.CREATE, u) for u in users])

    # IDs are now set on each instance
    for u in users:
        print(u.id, u.name)
When a CREATE operation is added to the batch with an instance that has no id, the ODM calls collection_ref.document() to pre-allocate a Firestore document reference and assigns its auto-generated ID to model_instance.id before batch.commit() is called. This means IDs are available synchronously in your code as soon as batch_write() returns:
    user_a = User(name="AutoID-A", email="[email protected]")
    user_b = User(name="AutoID-B", email="[email protected]")

    assert user_a.id is None
    assert user_b.id is None

    await User.batch_write([
        (BatchOperation.CREATE, user_a),
        (BatchOperation.CREATE, user_b),
    ])

    assert user_a.id is not None  # e.g. "xK3mP9..."
    assert user_b.id is not None  # e.g. "zR7qL2..."
    assert user_a.id != user_b.id
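The ordering described above (assign the ID first, commit second) can be modelled in plain Python. This is a sketch under assumptions, not the ODM's code: `FakeBatch` and `Doc` are hypothetical, and `auto_id` only imitates the 20-character alphanumeric IDs that Firestore client libraries generate locally.

```python
import secrets
import string
from dataclasses import dataclass
from typing import Optional

_ALPHABET = string.ascii_letters + string.digits


def auto_id(length: int = 20) -> str:
    """Generate a random ID locally; no network call is involved."""
    return "".join(secrets.choice(_ALPHABET) for _ in range(length))


@dataclass
class Doc:
    """Hypothetical stand-in for a model instance."""
    name: str
    id: Optional[str] = None


class FakeBatch:
    """Hypothetical in-memory batch: stages writes, applies them on commit."""

    def __init__(self, store: dict):
        self._store = store
        self._staged = []

    def create(self, doc: Doc) -> None:
        if doc.id is None:
            doc.id = auto_id()  # ID assigned at staging time, before commit
        self._staged.append(doc)

    def commit(self) -> None:
        for doc in self._staged:
            self._store[doc.id] = doc.name
        self._staged.clear()


store = {}
batch = FakeBatch(store)
a, b = Doc("AutoID-A"), Doc("AutoID-B")
batch.create(a)
batch.create(b)

ids_before_commit = (a.id, b.id)     # already populated
store_before_commit = dict(store)    # still empty: nothing written yet
batch.commit()
```

Because the ID is generated on the client, it does not depend on the commit response, which is why it can be read from the instance right away.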
To batch-write subcollection documents, set the _parent_path private attribute on each instance before passing it to batch_write(). This tells the ODM which parent document path to write under without requiring a live parent instance:
    import asyncio

    from firestore_pydantic_odm import BatchOperation

    # Create the parent document first
    user = User(name="BatchParent", email="[email protected]")
    await user.save()

    # Build subcollection posts and set their parent path
    posts = [
        Post(title="Batch Post 1", body="Content 1"),
        Post(title="Batch Post 2", body="Content 2"),
    ]
    for post in posts:
        object.__setattr__(post, "_parent_path", f"users/{user.id}")

    await Post.batch_write([(BatchOperation.CREATE, p) for p in posts])

    for post in posts:
        print(post.id, post.title)
        # documents live at: users/{user.id}/posts/{post.id}
object.__setattr__ is used because _parent_path is a Pydantic private attribute (PrivateAttr). Direct assignment via post._parent_path = ... is also valid in Pydantic v2, but object.__setattr__ works across both Pydantic v1 and v2.
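How a parent path combines with a collection name and document ID can be sketched as a small helper. `subcollection_doc_path` is a hypothetical function, not part of the ODM. It relies only on the Firestore convention that paths alternate collection and document IDs, so a valid parent document path has an even number of segments.

```python
def subcollection_doc_path(parent_path: str, collection: str, doc_id: str) -> str:
    """Build '<parent document path>/<collection>/<doc_id>'."""
    segments = [s for s in parent_path.strip("/").split("/") if s]
    # A document path looks like collection/doc[/collection/doc...]: even length.
    if not segments or len(segments) % 2 != 0:
        raise ValueError(f"not a document path: {parent_path!r}")
    return "/".join(segments + [collection, doc_id])


path = subcollection_doc_path("users/u123", "posts", "p456")
print(path)  # users/u123/posts/p456
```

A check like this catches a common mistake, passing a collection path such as "users" where a document path such as "users/u123" is expected.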
Firestore enforces a hard limit of 500 operations per batch. If you need to write more than 500 documents, split your operations into chunks:
    from itertools import islice

    def chunked(iterable, size):
        it = iter(iterable)
        while chunk := list(islice(it, size)):
            yield chunk

    all_ops = [
        (BatchOperation.CREATE, User(name=f"User{i}", email=f"u{i}@example.com"))
        for i in range(1200)
    ]

    for chunk in chunked(all_ops, 500):
        await User.batch_write(chunk)
Exceeding 500 operations in a single batch raises an error from the Firestore backend. Always chunk large batches before calling batch_write().
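One consequence of chunking is worth making explicit: each chunk is committed as its own batch, so atomicity holds within a chunk but not across chunks. The sketch below uses a hypothetical `write_in_chunks` helper with an injectable `commit` coroutine (standing in for a call such as `User.batch_write`). If a chunk fails, it attaches the number of operations already committed to the exception so the caller can decide how to resume.

```python
import asyncio
from itertools import islice

MAX_BATCH_SIZE = 500  # the per-batch limit stated above


def chunked(iterable, size):
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk


async def write_in_chunks(ops, commit, size=MAX_BATCH_SIZE):
    """Commit ops chunk by chunk; return the number of ops committed.

    If `commit` raises, earlier chunks stay committed. The exception is
    re-raised with the committed count attached as `committed_ops`.
    """
    committed = 0
    for chunk in chunked(ops, size):
        try:
            await commit(chunk)
        except Exception as exc:
            exc.committed_ops = committed
            raise
        committed += len(chunk)
    return committed


async def demo():
    sizes = []

    async def fake_commit(chunk):  # hypothetical stand-in for batch_write
        sizes.append(len(chunk))

    total = await write_in_chunks(range(1200), fake_commit)
    return total, sizes


total, sizes = asyncio.run(demo())
print(total, sizes)  # 1200 [500, 500, 200]
```

If the whole set of writes must succeed or fail together, it has to fit in a single batch. Chunking trades that guarantee for the ability to write more documents.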
    import asyncio

    from firestore_pydantic_odm import BaseFirestoreModel, FirestoreDB, BatchOperation, init_firestore_odm


    class User(BaseFirestoreModel):
        class Settings:
            name = "users"

        name: str
        email: str
        age: int = 0


    async def main():
        db = FirestoreDB(project_id="my-project")
        init_firestore_odm(db, [User])

        # Create three users in one batch
        new_users = [
            User(name="Alice", email="[email protected]", age=30),
            User(name="Bob", email="[email protected]", age=25),
            User(name="Carol", email="[email protected]", age=35),
        ]
        await User.batch_write([(BatchOperation.CREATE, u) for u in new_users])
        print("Created:", [u.id for u in new_users])

        # Update one and delete another in the same batch
        new_users[0].age = 31
        await User.batch_write([
            (BatchOperation.UPDATE, new_users[0]),
            (BatchOperation.DELETE, new_users[2]),
        ])
        print("Alice's new age:", new_users[0].age)


    asyncio.run(main())