insert()
Insert new documents into the collection. Documents must have unique IDs and conform to the collection schema.
Signature
def insert(self, docs: Union[Doc, list[Doc]]) -> Union[Status, list[Status]]
Parameters
docs
Union[Doc, list[Doc]]
required
One or more documents to insert. Each document must:
- Have a unique ID not already in the collection
- Include all required fields defined in the schema
- Have vectors matching the schema dimensions
Returns
Union[Status, list[Status]]
- If a single Doc is provided: returns a single Status object
- If a list is provided: returns a list[Status] with one status per document
Each Status indicates success or failure for that document.
Basic Example
import zvec
from zvec import Doc
# Insert a single document
doc = Doc(
    id="doc_001",
    vectors={"embedding": [0.1, 0.2, 0.3, 0.4]},
    fields={"title": "Getting Started", "category": "tutorial"}
)
status = collection.insert(doc)
if status.ok():
    print("Document inserted successfully")
else:
    print(f"Insert failed: {status.message()}")
Batch Insertion
For better performance, insert multiple documents at once:
docs = [
    Doc(
        id=f"doc_{i}",
        vectors={"embedding": [0.1 * i, 0.2 * i, 0.3 * i, 0.4 * i]},
        fields={"title": f"Document {i}", "score": i * 10}
    )
    for i in range(1000)
]
statuses = collection.insert(docs)
# Check results
success_count = sum(1 for s in statuses if s.ok())
print(f"Successfully inserted {success_count}/{len(docs)} documents")
Schema Validation
Documents must adhere to the collection's schema:
- Vector fields: Must match the defined dimension and data type
- Scalar fields: Must match the defined data type (INT64, STRING, etc.)
- Nullable fields: Can be omitted or set to None
- Non-nullable fields: Must be provided
# Schema definition
schema = zvec.CollectionSchema(
    name="articles",
    vectors=zvec.VectorSchema("embedding", zvec.DataType.VECTOR_FP32, dimension=128),
    fields=[
        zvec.FieldSchema("title", zvec.DataType.STRING, nullable=False),
        zvec.FieldSchema("views", zvec.DataType.INT64, nullable=True)
    ]
)
# Valid document
doc = Doc(
    id="1",
    vectors={"embedding": [0.1] * 128},
    fields={"title": "Required field", "views": 100}  # "views" is nullable and could be omitted
)
upsert()
Insert new documents or update existing ones by ID. If a document with the given ID exists, it will be updated; otherwise, it will be inserted.
Signature
def upsert(self, docs: Union[Doc, list[Doc]]) -> Union[Status, list[Status]]
Parameters
docs
Union[Doc, list[Doc]]
required
One or more documents to upsert. Documents with existing IDs will be updated; new IDs will be inserted.
Returns
Union[Status, list[Status]]
- Single Status if one document was provided
- list[Status] if multiple documents were provided
Example
# First insert
doc = Doc(id="user_123", vectors={"profile_emb": [0.1, 0.2]}, fields={"name": "Alice"})
collection.upsert(doc)
# Update the same document (overwrites all fields)
updated_doc = Doc(
    id="user_123",
    vectors={"profile_emb": [0.3, 0.4]},
    fields={"name": "Alice Smith"}
)
collection.upsert(updated_doc)
upsert() replaces the entire document. To update specific fields only, use update() instead.
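The difference between the two semantics can be illustrated with plain dictionaries (no zvec involved; update() here stands for any field-level merge):

existing = {"name": "Alice", "views": 100}
incoming = {"name": "Alice Smith"}

# upsert-style: the stored document is replaced wholesale,
# so any field absent from the new document is dropped
replaced = incoming

# update-style: only the supplied fields change, the rest survive
merged = {**existing, **incoming}

print(replaced)  # {'name': 'Alice Smith'}              -- 'views' is gone
print(merged)    # {'name': 'Alice Smith', 'views': 100}

In other words, an upsert must carry every field you want to keep, including vectors.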
Error Handling
Common Insert Errors
from zvec import StatusCode
status = collection.insert(doc)
if not status.ok():
    if status.code() == StatusCode.ALREADY_EXISTS:
        print("Document ID already exists")
    elif status.code() == StatusCode.INVALID_ARGUMENT:
        print("Invalid document format or schema mismatch")
    else:
        print(f"Error: {status.message()}")
Handling Batch Failures
docs = [...] # List of documents
statuses = collection.insert(docs)
# Find failed inserts
failed_docs = [
    (doc, status) for doc, status in zip(docs, statuses) if not status.ok()
]
if failed_docs:
    print(f"{len(failed_docs)} documents failed to insert:")
    for doc, status in failed_docs:
        print(f"  ID {doc.id}: {status.message()}")
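Transient failures can often simply be retried. Below is a generic retry sketch; it is not part of zvec and only assumes that the insert callable returns one status per document and that each status has an ok() method, as in the examples above:

def insert_with_retry(insert_fn, docs, max_attempts=3):
    """Insert docs, retrying only those whose status is not ok.

    insert_fn: callable taking a list of docs and returning a list of statuses
    Returns the docs that still failed after max_attempts rounds.
    """
    remaining = list(docs)
    for _ in range(max_attempts):
        if not remaining:
            break
        statuses = insert_fn(remaining)
        # Keep only the documents whose insert did not succeed
        remaining = [d for d, s in zip(remaining, statuses) if not s.ok()]
    return remaining

Usage would look like still_failed = insert_with_retry(collection.insert, docs). Note that retrying is only sensible for transient errors; a schema mismatch will fail identically every time, so inspecting status codes first (as shown above) is worthwhile.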
Performance Tips
Batch your inserts: Inserting 1000 documents in a single call is much faster than making 1000 individual calls.
# Good: Batch insertion
docs = [create_doc(i) for i in range(10000)]
collection.insert(docs)
# Bad: Individual insertions
for i in range(10000):
    collection.insert(create_doc(i))
Flush periodically: Call collection.flush() after large batches to ensure durability.
BATCH_SIZE = 1000
for batch_start in range(0, len(all_docs), BATCH_SIZE):
    batch = all_docs[batch_start:batch_start + BATCH_SIZE]
    collection.insert(batch)
    if (batch_start + BATCH_SIZE) % 10000 == 0:
        collection.flush()  # Flush every 10k documents
collection.flush()  # Final flush for any remaining documents
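The slicing pattern in the loop above can be factored into a small reusable helper (plain Python, no zvec dependency):

def chunked(items, size):
    """Yield successive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

With it, the batching loop reads as: for batch in chunked(all_docs, 1000): collection.insert(batch). The final, possibly smaller batch is yielded as well, so no documents are skipped.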
See Also