Skip to main content
MarkItDown seamlessly integrates with LangChain to provide high-quality document loading and processing for RAG systems, question-answering, and document analysis applications.

Why MarkItDown with LangChain?

MarkItDown enhances LangChain workflows by:
  • Converting diverse formats (PDF, DOCX, PPTX, images) to clean, structured Markdown
  • Preserving document structure (headers, tables, lists) for better chunking
  • Providing LLM-optimized output that’s already in a format LLMs understand natively
  • Supporting formats that standard LangChain loaders may struggle with
MarkItDown’s Markdown output is ideal for LangChain because it preserves semantic structure while being highly token-efficient.

Installation

Install MarkItDown and LangChain:
pip install 'markitdown[all]' langchain langchain-openai langchain-community
For vector stores:
pip install chromadb faiss-cpu  # or faiss-gpu

Custom Document Loader

Create a LangChain document loader using MarkItDown:
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
from markitdown import MarkItDown
from typing import List
from pathlib import Path

class MarkItDownLoader(BaseLoader):
    """LangChain document loader backed by MarkItDown conversion."""

    def __init__(self, file_path: str, **markitdown_kwargs):
        # Path of the document to load plus a converter configured
        # with any MarkItDown keyword options the caller passed.
        self.file_path = file_path
        self.md = MarkItDown(**markitdown_kwargs)

    def load(self) -> List[Document]:
        """Convert the file to Markdown and wrap it in a single Document."""
        conversion = self.md.convert(self.file_path)

        doc_metadata = {
            "source": self.file_path,
            "file_name": Path(self.file_path).name,
        }
        # MarkItDown only sometimes extracts a title; attach it when present.
        if conversion.title:
            doc_metadata["title"] = conversion.title

        document = Document(
            page_content=conversion.text_content,
            metadata=doc_metadata,
        )
        return [document]

# Usage: convert a single file and inspect the resulting Markdown content
loader = MarkItDownLoader("document.pdf")
documents = loader.load()
print(documents[0].page_content)

Directory Loader

Load all documents from a directory:
from langchain.docstore.document import Document
from langchain.document_loaders.base import BaseLoader
from markitdown import MarkItDown
from typing import List
from pathlib import Path
import logging

class MarkItDownDirectoryLoader(BaseLoader):
    """Load every file matching a glob under a directory via MarkItDown."""

    def __init__(
        self,
        path: str,
        glob: str = "**/*",
        show_progress: bool = True,
        **markitdown_kwargs
    ):
        # Root directory, glob pattern, progress flag, and converter config.
        self.path = Path(path)
        self.glob = glob
        self.show_progress = show_progress
        self.md = MarkItDown(**markitdown_kwargs)

    def load(self) -> List[Document]:
        """Convert each matching file; failures are logged and skipped."""
        loaded: List[Document] = []

        for candidate in list(self.path.glob(self.glob)):
            # Glob patterns can also match directories — skip them.
            if not candidate.is_file():
                continue

            try:
                conversion = self.md.convert(str(candidate))

                meta = {
                    "source": str(candidate),
                    "file_name": candidate.name,
                    "file_type": candidate.suffix,
                }
                if conversion.title:
                    meta["title"] = conversion.title

                loaded.append(Document(
                    page_content=conversion.text_content,
                    metadata=meta,
                ))

                if self.show_progress:
                    print(f"Loaded: {candidate.name}")

            except Exception as e:
                # Best-effort directory scan: one bad file must not abort the rest.
                logging.warning(f"Failed to load {candidate}: {e}")

        return loaded

# Usage: recursively load every PDF under ./documents
loader = MarkItDownDirectoryLoader(
    "./documents",
    glob="**/*.pdf",
    show_progress=True
)
documents = loader.load()
print(f"Loaded {len(documents)} documents")

RAG Pipeline

Build a complete RAG system with MarkItDown and LangChain:
from langchain.text_splitter import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from markitdown import MarkItDown
from langchain.docstore.document import Document
import os

class MarkItDownRAG:
    """RAG system using MarkItDown for document loading.

    Converts source files to Markdown, splits them along Markdown headers
    (falling back to character splitting for oversized sections), indexes
    the chunks in Chroma, and answers questions with a RetrievalQA chain.

    Note: the original snippet annotated methods with ``List[...]`` but never
    imported ``typing.List``, which raises NameError at class-definition time;
    builtin generics (``list[...]``, Python 3.9+) are used instead.
    """

    def __init__(self, openai_api_key: str, collection_name: str = "documents"):
        # Expose the key via the environment so the OpenAI-backed
        # LangChain components below can pick it up implicitly.
        os.environ["OPENAI_API_KEY"] = openai_api_key

        self.md = MarkItDown(
            llm_client=None  # Can add OpenAI client for image descriptions
        )

        # Initialize embeddings and LLM
        self.embeddings = OpenAIEmbeddings()
        self.llm = ChatOpenAI(model="gpt-4", temperature=0)

        # Initialize vector store
        self.vectorstore = Chroma(
            collection_name=collection_name,
            embedding_function=self.embeddings
        )

        # Header levels the Markdown splitter keys on; the labels become
        # metadata keys on each resulting chunk.
        self.headers_to_split_on = [
            ("#", "Header 1"),
            ("##", "Header 2"),
            ("###", "Header 3"),
        ]

        self.markdown_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=self.headers_to_split_on
        )

        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

    def load_document(self, file_path: str) -> list[Document]:
        """Convert one file to Markdown and split it into chunk Documents.

        Each returned chunk carries the originating file path in its
        ``source`` metadata.
        """
        # Convert to Markdown
        result = self.md.convert(file_path)

        # Split by headers first to preserve document structure
        header_splits = self.markdown_splitter.split_text(result.text_content)

        # Further split sections that exceed the chunk budget
        final_docs = []
        for doc in header_splits:
            # Add source metadata
            doc.metadata["source"] = file_path

            if len(doc.page_content) > 1000:
                final_docs.extend(self.text_splitter.split_documents([doc]))
            else:
                final_docs.append(doc)

        return final_docs

    def index_documents(self, file_paths: list[str]) -> None:
        """Convert, split, and add each file's chunks to the vector store."""
        all_docs = []

        for path in file_paths:
            print(f"Processing {path}...")
            all_docs.extend(self.load_document(path))

        # Add to vector store in a single batch
        self.vectorstore.add_documents(all_docs)
        print(f"Indexed {len(all_docs)} chunks from {len(file_paths)} documents")

    def query(self, question: str, k: int = 4):
        """Answer a question from the top-k retrieved chunks.

        Returns a dict with the answer text and the source paths of the
        chunks the chain used.
        """
        qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(search_kwargs={"k": k}),
            return_source_documents=True
        )

        result = qa_chain({"query": question})

        return {
            "answer": result["result"],
            "sources": [
                doc.metadata.get("source", "Unknown")
                for doc in result["source_documents"]
            ]
        }

# Usage
rag = MarkItDownRAG(openai_api_key="your-api-key")

# Index documents of mixed formats (PDF, DOCX, PPTX)
rag.index_documents([
    "company_policy.pdf",
    "employee_handbook.docx",
    "org_chart.pptx"
])

# Query the indexed knowledge base
result = rag.query("What is the remote work policy?")
print(f"Answer: {result['answer']}")
print(f"Sources: {result['sources']}")

Document Q&A Chain

Build a simple question-answering chain:
from langchain_openai import ChatOpenAI
from langchain.chains.question_answering import load_qa_chain
from langchain.docstore.document import Document
from markitdown import MarkItDown

def document_qa(file_path: str, question: str, api_key: str):
    """Answer a single question about one document.

    Converts the file to Markdown with MarkItDown, wraps it in a LangChain
    Document, and runs a "stuff" QA chain over it.
    """
    # Convert the source file to Markdown text
    converter = MarkItDown()
    conversion = converter.convert(file_path)

    # Wrap the full Markdown in one LangChain document
    document = Document(
        page_content=conversion.text_content,
        metadata={"source": file_path}
    )

    # Build the QA chain and ask the question
    llm = ChatOpenAI(model="gpt-4", temperature=0, api_key=api_key)
    qa_chain = load_qa_chain(llm, chain_type="stuff")
    return qa_chain.run(input_documents=[document], question=question)

# Usage: ask one question about a single contract
answer = document_qa(
    "contract.pdf",
    "What is the termination notice period?",
    "your-api-key"
)
print(answer)

Multi-Document QA with Sources

from langchain.chains import RetrievalQAWithSourcesChain
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import FAISS
from langchain.docstore.document import Document
from markitdown import MarkItDown
from pathlib import Path

def multi_document_qa(directory: str, question: str, api_key: str):
    """Answer questions across multiple documents with source attribution."""

    # Convert every readable file under the directory; skip failures.
    converter = MarkItDown()
    corpus = []

    for candidate in Path(directory).rglob("*"):
        if not candidate.is_file():
            continue
        try:
            conversion = converter.convert(str(candidate))
            corpus.append(Document(
                page_content=conversion.text_content,
                metadata={
                    "source": candidate.name,
                    "path": str(candidate)
                }
            ))
        except Exception as e:
            print(f"Failed to load {candidate}: {e}")

    # Embed the corpus and index it in an in-memory FAISS store
    embeddings = OpenAIEmbeddings(api_key=api_key)
    vectorstore = FAISS.from_documents(corpus, embeddings)

    # Chain variant that cites its sources alongside the answer
    llm = ChatOpenAI(model="gpt-4", temperature=0, api_key=api_key)
    chain = RetrievalQAWithSourcesChain.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=vectorstore.as_retriever()
    )

    response = chain({"question": question})

    return {
        "answer": response["answer"],
        "sources": response["sources"]
    }

# Usage: search across every document in ./company_docs
result = multi_document_qa(
    "./company_docs",
    "What are the vacation policies?",
    "your-api-key"
)

print(f"Answer: {result['answer']}")
print(f"\nSources: {result['sources']}")

Conversational Retrieval

Build a chatbot that remembers conversation history:
from langchain.chains import ConversationalRetrievalChain
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.memory import ConversationBufferMemory
from langchain.docstore.document import Document
from markitdown import MarkItDown

class DocumentChatbot:
    """Conversational chatbot for document Q&A with chat history."""

    def __init__(self, api_key: str):
        self.md = MarkItDown()

        # OpenAI-backed chat model and embeddings
        self.llm = ChatOpenAI(model="gpt-4", temperature=0, api_key=api_key)
        self.embeddings = OpenAIEmbeddings(api_key=api_key)

        # Buffer memory so follow-up questions keep their context
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer"
        )

        # Both are created lazily on the first load_document() call
        self.vectorstore = None
        self.chain = None

    def load_document(self, file_path: str):
        """Convert a file and add it to the chatbot's knowledge base."""
        conversion = self.md.convert(file_path)

        document = Document(
            page_content=conversion.text_content,
            metadata={"source": file_path}
        )

        # First document creates the store; later ones are appended
        if self.vectorstore is None:
            self.vectorstore = Chroma.from_documents(
                [document],
                self.embeddings
            )
        else:
            self.vectorstore.add_documents([document])

        # (Re)build the conversational chain over the current store
        self.chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.vectorstore.as_retriever(),
            memory=self.memory,
            return_source_documents=True
        )

        print(f"Loaded {file_path}")

    def chat(self, message: str):
        """Send one message; returns the answer, or a prompt to load a doc."""
        if self.chain is None:
            return "Please load a document first."
        return self.chain({"question": message})["answer"]

    def reset(self):
        """Reset conversation history."""
        self.memory.clear()

# Usage: load a manual, then ask follow-up questions in one conversation
chatbot = DocumentChatbot(api_key="your-api-key")
chatbot.load_document("user_manual.pdf")

print(chatbot.chat("What is this document about?"))
print(chatbot.chat("Can you tell me more about the installation process?"))
print(chatbot.chat("What are the system requirements?"))

Summarization Chain

Summarize long documents:
from langchain.chains.summarize import load_summarize_chain
from langchain_openai import ChatOpenAI
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from markitdown import MarkItDown

def summarize_document(
    file_path: str,
    api_key: str,
    chain_type: str = "map_reduce"
):
    """Summarize a document using the given LangChain summarization strategy."""
    # Convert the source file to Markdown
    converter = MarkItDown()
    conversion = converter.convert(file_path)

    # Chunk the text so long documents fit within the model context
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=4000,
        chunk_overlap=200
    )
    chunks = splitter.create_documents([conversion.text_content])

    # Build the summarization chain and produce the summary
    llm = ChatOpenAI(model="gpt-4", temperature=0, api_key=api_key)
    summarizer = load_summarize_chain(
        llm,
        chain_type=chain_type,
        verbose=True
    )
    return summarizer.run(chunks)

# Usage: map_reduce scales to long reports
summary = summarize_document(
    "annual_report.pdf",
    "your-api-key",
    chain_type="map_reduce"  # or "stuff" for short docs, "refine" for detailed summaries
)
print(summary)

With Image Descriptions

Enhance document processing with AI-generated image descriptions:
from markitdown import MarkItDown
from openai import OpenAI
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.docstore.document import Document

def load_with_image_descriptions(file_path: str, api_key: str):
    """Load a document, describing embedded images with GPT-4o vision."""
    # MarkItDown configured with an OpenAI client for image captioning
    openai_client = OpenAI(api_key=api_key)
    converter = MarkItDown(
        llm_client=openai_client,
        llm_model="gpt-4o"
    )

    # Conversion output includes AI-generated image descriptions
    conversion = converter.convert(file_path)

    document = Document(
        page_content=conversion.text_content,
        metadata={"source": file_path}
    )
    return document

# Usage - great for presentations, infographics, etc., where images carry meaning
doc = load_with_image_descriptions("presentation.pptx", "your-api-key")
print(doc.page_content)  # Markdown now includes AI-generated image descriptions

Advanced: Custom Chunking Strategy

Preserve document structure during chunking:
from langchain.text_splitter import MarkdownHeaderTextSplitter
from markitdown import MarkItDown
from langchain.docstore.document import Document

def smart_chunk_document(file_path: str):
    """Chunk a document along its Markdown headers, tagging each chunk."""
    # Convert the file to Markdown
    converter = MarkItDown()
    conversion = converter.convert(file_path)

    # Header levels to split on, mapped to the metadata keys they populate
    header_levels = [
        ("#", "h1"),
        ("##", "h2"),
        ("###", "h3"),
    ]
    splitter = MarkdownHeaderTextSplitter(
        headers_to_split_on=header_levels
    )
    chunks = splitter.split_text(conversion.text_content)

    # Tag every chunk with its source and (when present) section title,
    # preferring the highest header level available.
    for chunk in chunks:
        chunk.metadata["source"] = file_path
        if "h1" in chunk.metadata:
            chunk.metadata["section"] = chunk.metadata["h1"]
        elif "h2" in chunk.metadata:
            chunk.metadata["section"] = chunk.metadata["h2"]

    return chunks

# Usage: inspect the section each chunk came from
chunks = smart_chunk_document("documentation.pdf")
for i, chunk in enumerate(chunks):
    print(f"Chunk {i}: {chunk.metadata.get('section', 'No section')}")
    print(f"Length: {len(chunk.page_content)}\n")

Best Practices

Smart Chunking: Use MarkdownHeaderTextSplitter to preserve document structure. MarkItDown’s header preservation makes this highly effective.
Metadata Enrichment: Add source, section titles, and document type to metadata for better retrieval.
Image Support: Enable LLM-powered image descriptions for documents with visual content (presentations, reports with charts).
Error Handling: Wrap conversions in try-except blocks when processing directories with mixed file types.

Complete Example: Production RAG System

from langchain.text_splitter import MarkdownHeaderTextSplitter, RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQAWithSourcesChain
from markitdown import MarkItDown
from openai import OpenAI
from pathlib import Path
import logging

class ProductionRAG:
    """Persistent RAG pipeline: MarkItDown ingestion plus Chroma retrieval."""

    def __init__(self, api_key: str, persist_directory: str = "./chroma_db"):
        # MarkItDown with vision support for image-bearing documents
        openai_client = OpenAI(api_key=api_key)
        self.md = MarkItDown(
            llm_client=openai_client,
            llm_model="gpt-4o"
        )

        # OpenAI-backed embeddings and chat model
        self.embeddings = OpenAIEmbeddings(api_key=api_key)
        self.llm = ChatOpenAI(model="gpt-4", temperature=0, api_key=api_key)

        # Vector store persisted on disk so the index survives restarts
        self.vectorstore = Chroma(
            persist_directory=persist_directory,
            embedding_function=self.embeddings
        )

        # Header-aware splitter first; character splitter for oversized sections
        self.header_splitter = MarkdownHeaderTextSplitter(
            headers_to_split_on=[("#", "h1"), ("##", "h2"), ("###", "h3")]
        )
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )

    def ingest_directory(self, directory: str, glob_pattern: str = "**/*"):
        """Convert, split, and index every matching file under a directory."""
        pending = []

        for candidate in Path(directory).glob(glob_pattern):
            if not candidate.is_file():
                continue

            try:
                # Convert the document to Markdown
                conversion = self.md.convert(str(candidate))

                # Header-aware split keeps logical sections together
                sections = self.header_splitter.split_text(conversion.text_content)

                for section in sections:
                    section.metadata.update({
                        "source": str(candidate),
                        "file_name": candidate.name,
                        "file_type": candidate.suffix
                    })

                    # Oversized sections get a second, character-based split
                    if len(section.page_content) > 1000:
                        pending.extend(self.text_splitter.split_documents([section]))
                    else:
                        pending.append(section)

                logging.info(f"Processed: {candidate.name}")

            except Exception as e:
                logging.error(f"Failed to process {candidate}: {e}")

        # Batch-add everything to the vector store
        if pending:
            self.vectorstore.add_documents(pending)
            logging.info(f"Indexed {len(pending)} chunks")

    def query(self, question: str, k: int = 4):
        """Answer a question using the top-k retrieved chunks, with sources."""
        chain = RetrievalQAWithSourcesChain.from_chain_type(
            llm=self.llm,
            retriever=self.vectorstore.as_retriever(search_kwargs={"k": k})
        )
        return chain({"question": question})

# Usage: ingest a knowledge base once, then query with source attribution
rag = ProductionRAG(api_key="your-api-key")
rag.ingest_directory("./company_knowledge_base")

result = rag.query("What is our data retention policy?")
print(result["answer"])

Resources

Build docs developers (and LLMs) love