Skip to main content

Overview

Docling is designed for efficient batch processing, supporting:
  • Streaming conversion: Process documents as an iterator without loading everything into memory
  • Parallel processing: Leverage multi-threading for faster throughput
  • Granular error handling: Continue processing even when individual documents fail
  • Resource management: Control memory usage and CPU utilization
This guide shows you how to efficiently process hundreds or thousands of documents.

Basic Batch Conversion

The convert_all() method processes multiple documents and returns an iterator:
from pathlib import Path
from docling.document_converter import DocumentConverter
from docling.datamodel.base_models import ConversionStatus

# Inputs may mix local paths and URLs; Docling detects each format.
sources = [
    Path("documents/report_2023.pdf"),
    Path("documents/presentation.pptx"),
    Path("documents/data.xlsx"),
    "https://example.com/whitepaper.pdf",
]

converter = DocumentConverter()

# raises_on_error=False keeps the iterator going past failed documents.
for res in converter.convert_all(sources, raises_on_error=False):
    if res.status != ConversionStatus.SUCCESS:
        print(f"✗ Failed: {res.input.file.name}")
        for err in res.errors:
            print(f"  Error: {err.error_message}")
        continue
    print(f"✓ Converted: {res.input.file.name}")
    # Persist the converted document as Markdown.
    res.document.save_as_markdown(f"output/{res.input.file.stem}.md")
convert_all() yields results as they’re ready, not all at once. This allows processing huge document collections without exhausting memory.

Processing Entire Directories

Use glob to find all documents in a directory:
from pathlib import Path
from docling.datamodel.base_models import ConversionStatus  # fix: was used below but never imported
from docling.document_converter import DocumentConverter

input_dir = Path("documents/")
output_dir = Path("output/")
output_dir.mkdir(parents=True, exist_ok=True)

# Find all PDFs recursively
pdf_files = list(input_dir.glob("**/*.pdf"))

print(f"Found {len(pdf_files)} PDF files")

converter = DocumentConverter()

for result in converter.convert_all(pdf_files, raises_on_error=False):
    if result.status == ConversionStatus.SUCCESS:
        # Preserve directory structure in output
        relative_path = result.input.file.relative_to(input_dir)
        output_path = output_dir / relative_path.with_suffix(".md")
        output_path.parent.mkdir(parents=True, exist_ok=True)

        result.document.save_as_markdown(output_path)
        print(f"Saved: {output_path}")

Multi-Format Batch Processing

Process mixed document types with format-specific configuration:
from pathlib import Path
from docling.datamodel.base_models import InputFormat, ConversionStatus
from docling.datamodel.pipeline_options import PdfPipelineOptions
from docling.document_converter import (
    DocumentConverter,
    PdfFormatOption,
    WordFormatOption,
    PowerpointFormatOption,
)

input_dir = Path("documents/")

# Gather every supported document type in one pass per pattern.
patterns = ("**/*.pdf", "**/*.docx", "**/*.pptx", "**/*.html")
sources = [path for pattern in patterns for path in input_dir.glob(pattern)]

print(f"Found {len(sources)} documents")

# PDF-specific pipeline: enable OCR and table-structure recognition.
pdf_options = PdfPipelineOptions(do_ocr=True, do_table_structure=True)

converter = DocumentConverter(
    allowed_formats=[
        InputFormat.PDF,
        InputFormat.DOCX,
        InputFormat.PPTX,
        InputFormat.HTML,
    ],
    format_options={
        InputFormat.PDF: PdfFormatOption(pipeline_options=pdf_options),
        InputFormat.DOCX: WordFormatOption(),
        InputFormat.PPTX: PowerpointFormatOption(),
    },
)

success_count = 0
failure_count = 0

for result in converter.convert_all(sources, raises_on_error=False):
    if result.status != ConversionStatus.SUCCESS:
        failure_count += 1
        continue
    success_count += 1
    # Write both Markdown and lossless JSON for each converted document.
    stem = result.input.file.stem
    result.document.save_as_markdown(f"output/{stem}.md")
    result.document.save_as_json(f"output/{stem}.json")

print(f"\nCompleted: {success_count} successful, {failure_count} failed")

Parallel Processing

Docling automatically parallelizes batch conversions. Control concurrency with Docling's performance settings:
from docling.datamodel.settings import settings

# Tune batching before running any conversions.
settings.perf.doc_batch_size = 10        # documents handed to each batch
settings.perf.doc_batch_concurrency = 4  # parallel workers per batch

# Now run batch conversion
from pathlib import Path
from docling.document_converter import DocumentConverter

pdf_paths = list(Path("documents/").glob("*.pdf"))

converter = DocumentConverter()

for result in converter.convert_all(pdf_paths):
    pass  # process each result here
Higher concurrency increases CPU and memory usage. For resource-constrained environments, use doc_batch_concurrency=1 to process sequentially.

Export Pipeline

Build a complete batch processing pipeline with multiple export formats:
import json
import logging
from pathlib import Path
from collections.abc import Iterable

import yaml
from docling_core.types.doc import ImageRefMode

from docling.datamodel.base_models import ConversionStatus
from docling.datamodel.document import ConversionResult
from docling.document_converter import DocumentConverter

# Configure root logging once; the export pipeline reports per-document
# warnings/errors and a final summary through this module-level logger.
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger(__name__)


def export_documents(
    conv_results: Iterable[ConversionResult],
    output_dir: Path,
):
    """Write each successful conversion to JSON, HTML, Markdown, DocTags,
    and YAML files under *output_dir*.

    Partially converted documents are logged (with their errors) but not
    exported; failures are logged as errors.

    Returns a ``(success, partial_success, failure)`` count tuple.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    ok = 0
    failed = 0
    partial = 0

    for res in conv_results:
        if res.status == ConversionStatus.SUCCESS:
            ok += 1
            stem = res.input.file.stem

            # One output file per export format, all sharing the source stem.
            res.document.save_as_json(
                output_dir / f"{stem}.json",
                image_mode=ImageRefMode.PLACEHOLDER,
            )
            res.document.save_as_html(
                output_dir / f"{stem}.html",
                image_mode=ImageRefMode.EMBEDDED,
            )
            res.document.save_as_markdown(
                output_dir / f"{stem}.md",
                image_mode=ImageRefMode.PLACEHOLDER,
            )
            res.document.save_as_doctags(output_dir / f"{stem}.doctags.txt")

            # No dedicated YAML saver; dump the dict export ourselves.
            with (output_dir / f"{stem}.yaml").open("w") as fp:
                fp.write(yaml.safe_dump(res.document.export_to_dict()))
        elif res.status == ConversionStatus.PARTIAL_SUCCESS:
            partial += 1
            _log.warning(
                f"Document {res.input.file} partially converted:"
            )
            for item in res.errors:
                _log.warning(f"  {item.error_message}")
        else:
            failed += 1
            _log.error(f"Document {res.input.file} failed to convert.")

    _log.info(
        f"Processed {ok + partial + failed} docs: "
        f"{ok} successful, {partial} partial, "
        f"{failed} failed"
    )
    return ok, partial, failed


# Usage
input_doc_paths = [Path(f"documents/doc{i}.pdf") for i in (1, 2, 3)]

converter = DocumentConverter()

# Lazy iterator: conversion work happens as export_documents consumes it.
results_iter = converter.convert_all(
    input_doc_paths,
    raises_on_error=False,
)

success, partial, failure = export_documents(
    results_iter,
    output_dir=Path("output"),
)

# Surface hard failures to the caller / CI once everything has been tried.
if failure > 0:
    raise RuntimeError(f"Failed to convert {failure} documents")

Progress Tracking

Track progress with a progress bar:
from pathlib import Path
from tqdm import tqdm
from docling.document_converter import DocumentConverter
from docling.datamodel.base_models import ConversionStatus

pdf_paths = list(Path("documents/").glob("**/*.pdf"))

converter = DocumentConverter()

success = 0
failed = 0

# Total is known up front; each yielded result advances the bar by one.
with tqdm(total=len(pdf_paths), desc="Converting documents") as pbar:
    for res in converter.convert_all(pdf_paths, raises_on_error=False):
        if res.status == ConversionStatus.SUCCESS:
            success += 1
            res.document.save_as_markdown(
                f"output/{res.input.file.stem}.md"
            )
        else:
            failed += 1

        pbar.update(1)
        pbar.set_postfix({"success": success, "failed": failed})

Memory-Efficient Processing

For very large batches, process in chunks to control memory usage:
from pathlib import Path
from docling.datamodel.base_models import ConversionStatus  # fix: was used below but never imported
from docling.document_converter import DocumentConverter

def process_in_chunks(files: list[Path], chunk_size: int = 50):
    """Process *files* in chunks of *chunk_size* to limit memory usage.

    Each converted document is written to disk immediately and then
    released, so only one chunk's worth of results is alive at a time.
    """
    converter = DocumentConverter()

    for i in range(0, len(files), chunk_size):
        chunk = files[i:i + chunk_size]
        print(f"Processing chunk {i//chunk_size + 1}: {len(chunk)} files")

        for result in converter.convert_all(chunk, raises_on_error=False):
            if result.status == ConversionStatus.SUCCESS:
                # Process and immediately save/discard
                result.document.save_as_markdown(
                    f"output/{result.input.file.stem}.md"
                )
                # Document is garbage collected after this iteration

input_files = list(Path("documents/").glob("**/*.pdf"))
print(f"Total files: {len(input_files)}")

process_in_chunks(input_files, chunk_size=50)

Error Summary Report

Collect detailed error information for troubleshooting:
import json
from pathlib import Path
from collections import defaultdict
from docling.document_converter import DocumentConverter
from docling.datamodel.base_models import ConversionStatus

input_files = list(Path("documents/").glob("**/*.pdf"))
converter = DocumentConverter()

# Failed conversions grouped by their status value.
error_report = defaultdict(list)
success_count = 0

for res in converter.convert_all(input_files, raises_on_error=False):
    if res.status == ConversionStatus.SUCCESS:
        success_count += 1
        res.document.save_as_markdown(
            f"output/{res.input.file.stem}.md"
        )
        continue

    # Record every error attached to the failed conversion.
    error_report[res.status.value].append(
        {
            "file": str(res.input.file),
            "status": res.status.value,
            "errors": [
                {
                    "component": err.component_type.value,
                    "module": err.module_name,
                    "message": err.error_message,
                }
                for err in res.errors
            ],
        }
    )

failed_count = len(input_files) - success_count

# Persist a machine-readable summary for later troubleshooting.
report = {
    "summary": {
        "total": len(input_files),
        "successful": success_count,
        "failed": failed_count,
    },
    "errors_by_status": dict(error_report),
}
with open("error_report.json", "w") as f:
    json.dump(report, f, indent=2)

print(f"\nProcessed {len(input_files)} files")
print(f"Successful: {success_count}")
print(f"Failed: {failed_count}")
print("Error report saved to error_report.json")

Performance Optimization Tips

1. Disable unnecessary features

Only enable features you need:
from docling.datamodel.pipeline_options import PdfPipelineOptions

# Start from defaults and switch off everything this workload doesn't use.
pipeline_options = PdfPipelineOptions()
pipeline_options.do_ocr = False                   # PDFs already have a text layer
pipeline_options.do_table_structure = False       # not extracting tables
pipeline_options.generate_page_images = False     # page renders not needed
pipeline_options.generate_picture_images = False  # figure crops not needed
2. Use appropriate batch settings

Balance throughput and resource usage:
from docling.datamodel.settings import settings

# For CPU-bound workloads
settings.perf.doc_batch_size = 10
settings.perf.doc_batch_concurrency = 4

# For memory-constrained environments
# NOTE(review): these assignments override the CPU-bound values above —
# in real code apply only one of the two profiles.
settings.perf.doc_batch_size = 5
settings.perf.doc_batch_concurrency = 2
3. Set document timeouts

Prevent individual documents from blocking the pipeline:
from docling.datamodel.pipeline_options import PdfPipelineOptions  # fix: PdfPipelineOptions was not imported in this snippet

# Abort any single document that exceeds the timeout so one pathological
# file cannot stall the whole batch.
pipeline_options = PdfPipelineOptions(
    document_timeout=120.0  # 2 minutes max per document
)
4. Filter documents upfront

Skip obviously problematic files:
# Pre-filter with a single stat() call per file (the original called it
# twice); keep only non-empty files under 50 MB — oversized or zero-byte
# PDFs tend to fail or stall conversion.
input_files = [
    f for f in Path("documents/").glob("**/*.pdf")
    if 0 < f.stat().st_size < 50_000_000
]

Next Steps

Advanced Options

Fine-tune conversion behavior with pipeline and backend options

Export Formats

Learn about all available export formats and their options

PDF Processing

Optimize PDF-specific processing for better batch performance

OCR Configuration

Configure OCR engines for scanned document batches

Build docs developers (and LLMs) love