Skip to main content

Overview

Preprocessing transforms raw documents into chunks before extraction and embedding. REMem provides a flexible TextPreprocessor class that supports multiple chunking strategies and custom text preprocessing functions.

Base Preprocessor Interface

All preprocessors inherit from BasePreprocessor (graph/preprocessing/base.py:20-45):
from abc import ABC
from remem.utils.config_utils import BaseConfig

class BasePreprocessor(ABC):
    """Abstract base class for all preprocessors.

    Resolves a configuration object and a working directory, creating the
    directory on disk if it does not already exist.
    """

    # Shared run configuration (string annotation: avoids a hard import-time
    # dependency on the config module).
    global_config: "BaseConfig"
    # Directory where this preprocessor writes its artifacts.
    working_dir: str

    def __init__(self, working_dir: Optional[str] = None, global_config: Optional["BaseConfig"] = None):
        """Initialize config and working directory.

        Args:
            working_dir: Directory for artifacts; when omitted, a timestamped
                directory is created under ``global_config.save_dir``.
            global_config: Run configuration; a default ``BaseConfig`` is
                constructed when omitted.
        """
        self.global_config = BaseConfig() if global_config is None else global_config

        if working_dir is None:
            # Use '-' (not ':') in the timestamp so the directory name is
            # valid on Windows as well as POSIX filesystems.
            self.working_dir = os.path.join(
                self.global_config.save_dir,
                f"Preprocessor_{datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}",
            )
        else:
            self.working_dir = working_dir

        # exist_ok=True already tolerates a pre-existing directory, so the
        # former os.path.exists() pre-check was redundant (and racy).
        os.makedirs(self.working_dir, exist_ok=True)

Built-in Chunking Strategies

Configure via BaseConfig.preprocess_chunk_func:
| Strategy | Function | Use Case |
| --- | --- | --- |
| `by_token` | `chunk_by_token_count()` | Fixed token-size chunks with overlap |
| `by_word` | `chunk_by_word_count()` | Word-based chunks respecting sentence boundaries |
| `by_message` | `chunk_by_message_and_token_count()` | Chat/conversation data |
| `by_session` | `chunk_by_session()` | Session-based grouping by date |
| `none` | No chunking | Single chunk per document |

Example: Token-based Chunking

From text_preprocessing.py:28-63:
def chunk_by_token_count(
    content: str, 
    overlap_token_size=128, 
    max_token_size=1024, 
    encoder=None
) -> List[Dict[str, Any]]:
    """
    Splits the input content into chunks based on token size limits.
    
    Args:
        content: The input text content to be chunked
        overlap_token_size: Number of tokens to overlap between chunks
        max_token_size: Maximum number of tokens per chunk, or None to
            return the whole content as a single chunk
        encoder: Tokenizer (e.g., tiktoken encoder) exposing
            ``encode``/``decode``; required
    
    Returns:
        List of chunk dicts with keys: 'num_tokens', 'content', 'chunk_order'

    Raises:
        ValueError: If ``encoder`` is None, or if ``overlap_token_size`` is
            not smaller than ``max_token_size`` (the window would never
            advance).
    """
    if encoder is None:
        # Fail fast with a clear message instead of an AttributeError on
        # encoder.encode below.
        raise ValueError("chunk_by_token_count requires an encoder (e.g. a tiktoken encoding)")

    tokens = encoder.encode(content)
    
    if max_token_size is None:
        # No size limit: the whole document is a single chunk.
        return [{"num_tokens": len(tokens), "content": content, "chunk_order": 0}]
    
    step = max_token_size - overlap_token_size
    if step <= 0:
        # A zero/negative step would make range() empty or invalid and
        # silently drop content.
        raise ValueError("overlap_token_size must be smaller than max_token_size")

    results = []
    for index, start in enumerate(range(0, len(tokens), step)):
        chunk_content = encoder.decode(tokens[start : start + max_token_size])
        results.append({
            "num_tokens": min(max_token_size, len(tokens) - start),
            "content": chunk_content,
            "chunk_order": index,
        })
    return results
Configure:
# Select token-based chunking and its parameters via BaseConfig.
config = BaseConfig(
    preprocess_chunk_func="by_token",
    preprocess_chunk_max_token_size=1024,     # upper bound on tokens per chunk
    preprocess_chunk_overlap_token_size=128,  # tokens shared by adjacent chunks
    preprocess_encoder_name="gpt-4o",         # tokenizer used for counting
)

Example: Word-based Chunking

From text_preprocessing.py:66-133:
def chunk_by_word_count(
    content: str, 
    max_words_per_chunk: int = 256, 
    overlap_words: int = 50,
    keep_first_line: bool = True
) -> List[Dict[str, Any]]:
    """
    Splits content into chunks based on word count and sentence boundaries.

    Args:
        content: Text to split; sentence boundaries come from NLTK.
        max_words_per_chunk: Soft cap on words per chunk — a single
            sentence longer than this still becomes its own chunk.
        overlap_words: Words of overlap between chunks (applied in the
            elided post-processing step at the end).
        keep_first_line: Whether to carry the document's first line into
            chunks (applied in the elided post-processing step at the end).

    Returns:
        List of chunk dicts with keys: 'num_words', 'content', 'chunk_order'
    """
    import nltk  # local import: NLTK (and its punkt data) is an optional dependency
    sentences = nltk.sent_tokenize(content)
    
    chunks = []
    current_chunk = []
    current_word_count = 0
    first_line = content.split("\n")[0]  # consumed by the keep_first_line step below
    
    for sentence in sentences:
        words_in_sentence = sentence.split()
        word_count = len(words_in_sentence)
        
        if current_chunk and current_word_count + word_count > max_words_per_chunk:
            # Finalize current chunk. The `current_chunk` guard avoids
            # emitting an empty, zero-word chunk when the very first
            # sentence already exceeds max_words_per_chunk.
            chunks.append({
                "num_words": current_word_count,
                "content": " ".join(current_chunk),
                "chunk_order": len(chunks)
            })
            # Start new chunk
            current_chunk = words_in_sentence
            current_word_count = word_count
        else:
            current_chunk.extend(words_in_sentence)
            current_word_count += word_count
    
    # Add last chunk
    if current_chunk:
        chunks.append({
            "num_words": current_word_count,
            "content": " ".join(current_chunk),
            "chunk_order": len(chunks)
        })
    
    # Apply overlap and keep first line if requested
    # ... (see source for full implementation)
    
    return chunks

TextPreprocessor Class

The main preprocessor (text_preprocessing.py:207-353):
class TextPreprocessor(BasePreprocessor):
    """Preprocessor that splits raw text documents into chunk dicts.

    The chunking strategy is selected once, at construction time, from
    ``global_config.preprocess_chunk_func`` and bound to
    ``self.chunking_func``; subsequent calls chunk documents with it.
    """

    def __init__(
        self,
        working_dir: Optional[str] = None,
        global_config: Optional[BaseConfig] = None,
        text_preprocessing_func: Any = None,
    ):
        """Bind the configured chunking strategy and text normalizer.

        Args:
            working_dir: Artifact directory (see ``BasePreprocessor``).
            global_config: Configuration whose ``preprocess_chunk_func``
                field names the chunking strategy.
            text_preprocessing_func: Optional text-normalizing callable;
                defaults to ``remem_text_processing`` when None.
        """
        super().__init__(working_dir, global_config)
        
        # Select chunking function based on config
        # NOTE(review): if preprocess_chunk_func matches none of the
        # branches, self.chunking_func is never assigned and
        # batch_preprocess_doc later raises AttributeError — confirm the
        # elided branches below cover every strategy, or add an explicit
        # error for unknown names.
        if self.global_config.preprocess_chunk_func == "by_word":
            self.chunking_func = partial(chunk_by_word_count)
        elif self.global_config.preprocess_chunk_func == "by_token":
            # Token chunking needs a tokenizer; resolve it once up front.
            self.encoder = _get_text_encoder(self.global_config.preprocess_encoder_name)
            self.chunking_func = partial(
                chunk_by_token_count,
                overlap_token_size=self.global_config.preprocess_chunk_overlap_token_size,
                max_token_size=self.global_config.preprocess_chunk_max_token_size,
                encoder=self.encoder,
            )
        elif self.global_config.preprocess_chunk_func == "by_message":
            self.chunking_func = partial(chunk_by_message_and_token_count)
        # ... more options
        
        # Set text preprocessing function
        if text_preprocessing_func is None:
            self.text_preprocessing_func = remem_text_processing
        else:
            self.text_preprocessing_func = text_preprocessing_func
    
    def preprocess_doc(self, input: str) -> List[Dict[str, Any]]:
        """Preprocess a single document.

        Convenience wrapper around :meth:`batch_preprocess_doc`; returns
        the one document's list of chunk dicts.
        """
        results = self.batch_preprocess_doc([input])
        return results[0]
    
    def batch_preprocess_doc(self, input: List[str]) -> List[List[Dict[str, Any]]]:
        """Preprocess a batch of documents.

        Returns one list of chunk dicts per input document, in order.
        """
        results = []
        for doc in input:
            doc_chunks = self.chunking_func(doc)
            results.append(doc_chunks)
        return results

Creating Custom Chunking Strategies

1. Define Your Chunking Function

# my_custom_chunking.py
from typing import Any, Dict, List

def chunk_by_paragraph(
    content: str,
    min_words_per_chunk: int = 100,
    **kwargs
) -> List[Dict[str, Any]]:
    """
    Split content by paragraphs, ensuring minimum word count.

    Paragraphs (separated by blank lines) are accumulated until the running
    word count reaches ``min_words_per_chunk``; the accumulated paragraphs
    are then emitted as one chunk. Blank paragraphs are skipped. A trailing
    group smaller than the minimum is still emitted so no text is lost.

    Args:
        content: Text whose paragraphs are separated by blank lines.
        min_words_per_chunk: Minimum words per emitted chunk (except
            possibly the final one).
        **kwargs: Ignored; accepted for interface compatibility with the
            other chunking functions.

    Returns:
        List of chunk dicts with 'content', 'num_words', 'chunk_order' keys
    """
    chunks: List[Dict[str, Any]] = []
    current_chunk: List[str] = []
    current_word_count = 0

    for para in content.split("\n\n"):
        words = para.split()
        if not words:
            # Skip empty paragraphs (e.g. runs of blank lines).
            continue

        # Include the paragraph first, THEN check the threshold.
        # Finalizing before adding (the previous logic) emitted an empty
        # zero-word chunk whenever the first paragraph alone met the minimum.
        current_chunk.append(para)
        current_word_count += len(words)

        if current_word_count >= min_words_per_chunk:
            chunks.append({
                "content": "\n\n".join(current_chunk),
                "num_words": current_word_count,
                "chunk_order": len(chunks)
            })
            current_chunk = []
            current_word_count = 0

    # Emit any remaining paragraphs as a final (possibly short) chunk.
    if current_chunk:
        chunks.append({
            "content": "\n\n".join(current_chunk),
            "num_words": current_word_count,
            "chunk_order": len(chunks)
        })

    return chunks

2. Integrate into TextPreprocessor

Extend the TextPreprocessor class:
from remem.graph.preprocessing import TextPreprocessor
from functools import partial
from my_custom_chunking import chunk_by_paragraph

class CustomPreprocessor(TextPreprocessor):
    """TextPreprocessor extension that recognizes the "by_paragraph" strategy."""

    def __init__(self, working_dir=None, global_config=None, text_preprocessing_func=None):
        super().__init__(working_dir, global_config, text_preprocessing_func)

        # Replace whatever strategy the base class selected when paragraph
        # chunking was requested in the config.
        requested = self.global_config.preprocess_chunk_func
        if requested == "by_paragraph":
            self.chunking_func = partial(chunk_by_paragraph, min_words_per_chunk=200)

3. Use Your Custom Preprocessor

from remem.remem import ReMem
from my_preprocessor import CustomPreprocessor

# Name the custom strategy in the config so CustomPreprocessor picks it up.
config = BaseConfig(
    preprocess_chunk_func="by_paragraph",
    dataset="test"
)

rag = ReMem(global_config=config)
# Replace default preprocessor
rag.preprocessor = CustomPreprocessor(global_config=config)

docs = ["Paragraph 1\n\nParagraph 2\n\nParagraph 3"]
rag.index(docs)

Custom Text Preprocessing Functions

The default function (text_preprocessing.py:201-205):
def remem_text_processing(text):
    """Default normalizer: lowercase the text and blank out special characters.

    Every character outside ``A-Za-z0-9`` and space is replaced with a
    space, then leading/trailing whitespace is stripped. Non-string input
    is coerced via ``str()`` first.
    """
    text = text if isinstance(text, str) else str(text)
    lowered = text.lower()
    cleaned = re.sub("[^A-Za-z0-9 ]", " ", lowered)
    return cleaned.strip()

Define Custom Normalization

import unicodedata
import re

def custom_text_processing(text: str) -> str:
    """Custom normalization: preserve case, keep hyphens.

    Coerces non-strings via ``str()``, applies NFKD unicode normalization,
    blanks out everything except alphanumerics, whitespace and hyphens,
    then collapses whitespace runs to single spaces.
    """
    if not isinstance(text, str):
        text = str(text)

    normalized = unicodedata.normalize('NFKD', text)
    # Blank out anything that is not alphanumeric, whitespace, or a hyphen.
    kept = re.sub(r"[^A-Za-z0-9\s-]", " ", normalized)
    # Squeeze whitespace runs down to single spaces and trim the edges.
    return re.sub(r"\s+", " ", kept).strip()

# Use it: pass the custom normalizer at construction time so it replaces
# the default remem_text_processing for this preprocessor instance.
preprocessor = TextPreprocessor(
    global_config=config,
    text_preprocessing_func=custom_text_processing
)

Async Preprocessing

For large-scale processing:
async def apreprocess_doc(self, input: str) -> List[Dict[str, Any]]:
    """Asynchronously preprocess one document into its list of chunk dicts."""
    return self.chunking_func(content=input)

async def abatch_preprocess_doc(self, input: List[str]) -> List[List[Dict[str, Any]]]:
    """Asynchronously preprocess every document in *input*, preserving order."""
    pending = [self.apreprocess_doc(input=doc) for doc in input]
    return await asyncio.gather(*pending, return_exceptions=False)

Configuration Reference

Token-based chunking:
# Token-based chunking: fixed-size windows measured in tokens.
BaseConfig(
    preprocess_chunk_func="by_token",
    preprocess_chunk_max_token_size=1024,     # upper bound on tokens per chunk
    preprocess_chunk_overlap_token_size=128,  # tokens shared by adjacent chunks
    preprocess_encoder_name="gpt-4o",         # tokenizer used for counting
)
Word-based chunking:
# Word-based chunking: sentence-aware chunks measured in words.
BaseConfig(
    preprocess_chunk_func="by_word",
    # max_words_per_chunk and overlap_words set in chunking function
)
No chunking:
# Disable chunking: each document becomes a single chunk.
BaseConfig(
    preprocess_chunk_func="none",
)

Next Steps

Build docs developers (and LLMs) love