The Document Processor API enables custom processing of documents and document updates in a processing pipeline. Document processors are chained together to transform, validate, and enrich documents before they are indexed.
Overview
Document processors are asynchronous components that:
Process documents in feed operations (put, update, remove)
Chain together to form processing pipelines
Support asynchronous work: returning LATER makes the framework call the processor again later so it can resume processing
Transform data before indexing
Validate documents and reject invalid input
Source: docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java:15
Basic Document Processor
Every document processor extends DocumentProcessor and implements the process method:
package com.example;
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentPut;
/**
 * Minimal document processor template: passes the document of every put operation
 * in the processing to {@link #processDocument(Document)}, then reports completion.
 * Operations that are not puts are left untouched.
 */
public class SimpleDocumentProcessor extends DocumentProcessor {

    @Override
    public Progress process(Processing processing) {
        for (DocumentOperation operation : processing.getDocumentOperations()) {
            if (!(operation instanceof DocumentPut)) {
                continue; // only puts carry a complete document
            }
            processDocument(((DocumentPut) operation).getDocument());
        }
        return Progress.DONE;
    }

    /** Hook for per-document logic; intentionally empty in this template. */
    private void processDocument(Document document) {
        // Your processing logic here
    }
}
Source: docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java:73
Processing Return Values
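Document processors return a Progress value indicating the processing state:
DONE — processing is complete; move on to the next processor
LATER — not finished yet; the processor will be called again (use Progress.later(delay) to set the delay)
FAILED — a temporary failure, e.g. a service it depends on is unavailable
PERMANENT_FAILURE — an unrecoverable error
INVALID_INPUT — the document is invalid and is rejected
OVERLOAD — the processor or a service it depends on is overloaded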
// Processing is complete; hand the operation on to the next processor in the chain
return Progress . DONE ;
Source: docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java:107
Document Processing Examples
Transform document fields:
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.datatypes.StringFieldValue;
/**
 * Applies ROT13 to the "title" field of every put operation's document.
 * Puts without a title, and operations other than puts, are left unchanged.
 */
public class Rot13DocumentProcessor extends DocumentProcessor {

    private static final String FIELD_NAME = "title";

    @Override
    public Progress process(Processing processing) {
        for (DocumentOperation op : processing.getDocumentOperations()) {
            if (op instanceof DocumentPut) {
                Document document = ((DocumentPut) op).getDocument();
                StringFieldValue title = (StringFieldValue) document.getFieldValue(FIELD_NAME);
                if (title == null) {
                    continue; // nothing to rotate
                }
                document.setFieldValue(FIELD_NAME, rot13(title.getString()));
            }
        }
        return Progress.DONE;
    }

    /** Rotates ASCII letters by 13 places, keeping their case; every other char is kept as-is. */
    private String rot13(String s) {
        char[] chars = s.toCharArray();
        for (int i = 0; i < chars.length; i++) {
            char c = chars[i];
            boolean firstHalf = (c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M');
            boolean secondHalf = (c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z');
            if (firstHalf) {
                chars[i] = (char) (c + 13);
            } else if (secondHalf) {
                chars[i] = (char) (c - 13);
            }
        }
        return new String(chars);
    }
}
Example from: application/src/test/java/com/yahoo/application/container/docprocs/Rot13DocumentProcessor.java:16
Document Validation
Validate documents and reject invalid ones:
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentPut;
/**
 * Rejects put operations whose document has no "title" or has a malformed "category".
 * Processing stops at the first invalid document, returning INVALID_INPUT with a reason.
 */
public class ValidationProcessor extends DocumentProcessor {

    @Override
    public Progress process(Processing processing) {
        for (DocumentOperation op : processing.getDocumentOperations()) {
            if (op instanceof DocumentPut) {
                Document doc = ((DocumentPut) op).getDocument();
                // Validate required field
                if (doc.getFieldValue("title") == null) {
                    return Progress.INVALID_INPUT.withReason("Missing required field: title");
                }
                // Validate field constraints. getFieldValue returns a field-value wrapper,
                // not a java.lang.String, so a (String) cast does not compile; convert the
                // wrapper to text instead (for string fields this is expected to be the raw value).
                var categoryValue = doc.getFieldValue("category");
                if (categoryValue != null) {
                    String category = categoryValue.toString();
                    if (!isValidCategory(category)) {
                        return Progress.INVALID_INPUT.withReason("Invalid category: " + category);
                    }
                }
            }
        }
        return Progress.DONE;
    }

    /** A valid category is one or more lowercase ASCII letters or hyphens. */
    private boolean isValidCategory(String category) {
        return category.matches("^[a-z-]+$");
    }
}
Document Enrichment
Add fields based on external data:
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentPut;
/**
 * Stores data from an external service in each put's "enriched_field".
 *
 * <p>Note: {@code lookup} is called synchronously inside {@code process}, so a slow
 * service blocks the feed pipeline; for slow lookups prefer the LATER-based async pattern.
 */
public class EnrichmentProcessor extends DocumentProcessor {

    private final ExternalService externalService;

    // NOTE(review): @Inject must be imported; in recent Vespa versions this is
    // com.yahoo.component.annotation.Inject — confirm for your version.
    @Inject
    public EnrichmentProcessor(ExternalService externalService) {
        this.externalService = externalService;
    }

    @Override
    public Progress process(Processing processing) {
        for (DocumentOperation op : processing.getDocumentOperations()) {
            if (op instanceof DocumentPut) {
                Document doc = ((DocumentPut) op).getDocument();
                // Fetch additional data, keyed by the document id
                String enrichedData = externalService.lookup(doc.getId().toString());
                // Presumably lookup returns null when it has no data for the id — confirm.
                // Leave the field untouched in that case rather than trying to store null.
                if (enrichedData != null) {
                    doc.setFieldValue("enriched_field", enrichedData);
                }
            }
        }
        return Progress.DONE;
    }
}
Asynchronous Processing
Handle async operations with LATER:
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentPut;
import java.util.concurrent.CompletableFuture;
/**
 * Adds "async_data", fetched asynchronously, to the document of every put operation.
 *
 * <p>On the first call, one fetch is started per put. While any fetch is still running,
 * the processor returns LATER and is called again. Once all fetches have completed, each
 * result is written to the document it was fetched for. Every in-flight future is kept in
 * its own Processing variable (one per document id), so it survives between calls. These
 * variables are removed once processing has finished, whether it succeeded or failed.
 */
public class AsyncProcessor extends DocumentProcessor {

    private static final String FUTURE_KEY = "asyncFuture";

    @Override
    public Progress process(Processing processing) {
        boolean waiting = false;
        for (DocumentOperation op : processing.getDocumentOperations()) {
            if (op instanceof DocumentPut) {
                Document doc = ((DocumentPut) op).getDocument();
                CompletableFuture<String> future = pendingFetch(processing, doc);
                if (future == null) {
                    // First time we see this document - start its async fetch
                    future = fetchDataAsync(doc.getId().toString());
                    processing.setVariable(futureKey(doc), future);
                }
                waiting |= !future.isDone();
            }
        }
        if (waiting) {
            // Still waiting - return LATER, we'll be called again
            return Progress.later(50);
        }
        try {
            // All fetches complete - apply each result to the document it belongs to
            for (DocumentOperation op : processing.getDocumentOperations()) {
                if (op instanceof DocumentPut) {
                    Document doc = ((DocumentPut) op).getDocument();
                    doc.setFieldValue("async_data", pendingFetch(processing, doc).join());
                }
            }
            return Progress.DONE;
        } catch (RuntimeException e) {
            // join() wraps a failed fetch in a CompletionException; report the underlying cause
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            return Progress.FAILED.withReason("Async fetch failed: " + cause.getMessage());
        } finally {
            for (DocumentOperation op : processing.getDocumentOperations()) {
                if (op instanceof DocumentPut) {
                    processing.removeVariable(futureKey(((DocumentPut) op).getDocument()));
                }
            }
        }
    }

    /** Name of the Processing variable that holds the fetch for the given document. */
    private static String futureKey(Document doc) {
        return FUTURE_KEY + ":" + doc.getId();
    }

    /** Returns the fetch previously started for the document, or null if none was started. */
    @SuppressWarnings("unchecked") // only this class stores values under futureKey(...)
    private static CompletableFuture<String> pendingFetch(Processing processing, Document doc) {
        return (CompletableFuture<String>) processing.getVariable(futureKey(doc));
    }

    private CompletableFuture<String> fetchDataAsync(String id) {
        return CompletableFuture.supplyAsync(() -> {
            // Simulate async operation
            return "data-for-" + id;
        });
    }
}
Source: docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java:194
Processing Context
Use the Processing object to share data between calls:
@Override
public Progress process(Processing processing) {
    // Read a context variable left by an earlier call; "never set" counts as zero
    Integer counter = (Integer) processing.getVariable("counter");
    int next = (counter == null ? 0 : counter) + 1;

    // Store the updated value on this Processing
    processing.setVariable("counter", next);

    // Drop the variable once it is no longer needed
    processing.removeVariable("counter");

    // Test for presence without fetching the value
    if (processing.hasVariable("someKey")) {
        // ...
    }
    return Progress.DONE;
}
Source: docproc/src/main/java/com/yahoo/docproc/Processing.java:140
Working with Document Operations
Document processors handle different operation types:
import com.yahoo.document.DocumentOperation;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentUpdate;
import com.yahoo.document.DocumentRemove;
@Override
public Progress process(Processing processing) {
    for (DocumentOperation operation : processing.getDocumentOperations()) {
        if (operation instanceof DocumentPut) {
            // Put: a complete document (new, or a full replacement)
            processDocument(((DocumentPut) operation).getDocument());
        } else if (operation instanceof DocumentUpdate) {
            // Update: partial changes to a document
            processUpdate((DocumentUpdate) operation);
        } else if (operation instanceof DocumentRemove) {
            // Remove: document removal
            processRemove((DocumentRemove) operation);
        }
    }
    return Progress.DONE;
}
Source: docproc/src/main/java/com/yahoo/docproc/Processing.java:201
Configuration
Document processors can receive configuration through dependency injection:
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
/**
 * Template for a processor whose settings arrive as a config object
 * (MyProcessorConfig) injected into the constructor. The values are captured once,
 * at construction, into final fields.
 */
public class ConfiguredProcessor extends DocumentProcessor {

    private final String apiEndpoint;
    private final int maxRetries;

    // NOTE(review): @Inject needs an import (com.yahoo.component.annotation.Inject in
    // recent Vespa versions) — confirm for your version.
    @Inject
    public ConfiguredProcessor(MyProcessorConfig config) {
        apiEndpoint = config.apiEndpoint();
        maxRetries = config.maxRetries();
    }

    @Override
    public Progress process(Processing processing) {
        // Use apiEndpoint / maxRetries here
        return Progress.DONE;
    }
}
Timeout Handling
Check remaining time to avoid timeout:
import java.time.Duration;
@Override
public Progress process(Processing processing) {
    Duration timeLeft = processing.timeLeft();

    // Recognise the "no timeout" sentinel first, so that it is never compared against
    // the minimum budget below, whatever Duration value the sentinel happens to use.
    boolean hasDeadline = !timeLeft.equals(Processing.NO_TIMEOUT);
    if (hasDeadline && timeLeft.compareTo(Duration.ofMillis(100)) < 0) {
        // Not enough time left for processing
        return Progress.TIMEOUT.withReason("Insufficient time remaining");
    }

    // Do processing
    return Progress.DONE;
}
Source: docproc/src/main/java/com/yahoo/docproc/Processing.java:232
Thread Safety
Document processors are called by multiple threads concurrently. All mutable shared state must be thread-safe.
/**
 * Shows which state is safe to share between the threads that call process()
 * concurrently, and which is not.
 */
public class ThreadSafeProcessor extends DocumentProcessor {

    // Safe: filled once in the constructor, then exposed only as an unmodifiable view
    private final Map<String, String> categoryMapping;

    // Unsafe: mutable state shared by every calling thread
    private int processedCount = 0; // DON'T DO THIS

    public ThreadSafeProcessor(ProcessorConfig config) {
        Map<String, String> mapping = new HashMap<>();
        for (var entry : config.categoryMappings()) {
            mapping.put(entry.from(), entry.to());
        }
        categoryMapping = Collections.unmodifiableMap(mapping);
    }

    @Override
    public Progress process(Processing processing) {
        // Safe: read-only lookup in the unmodifiable map
        String mapped = categoryMapping.get("someKey");
        // Safe: the Processing context belongs to a single request
        processing.setVariable("localCounter", 1);
        return Progress.DONE;
    }
}
Source: docproc/src/main/java/com/yahoo/docproc/DocumentProcessor.java:37
Error Handling
Use Progress.FAILED for temporary failures:
@Override
public Progress process(Processing processing) {
    try {
        callExternalService();
    } catch (TemporaryException e) {
        // Transient problem: signal a temporary failure with the cause's message
        return Progress.FAILED.withReason("Service unavailable: " + e.getMessage());
    }
    return Progress.DONE;
}
Use Progress.INVALID_INPUT for bad input:
@Override
public Progress process(Processing processing) {
    boolean valid = isValid(processing);
    // Bad input is rejected; valid input continues down the chain
    return valid
            ? Progress.DONE
            : Progress.INVALID_INPUT.withReason("Document validation failed");
}
Use Progress.OVERLOAD for rate limiting:
@Override
public Progress process(Processing processing) {
    // Report overload instead of doing work the downstream service cannot absorb
    return rateLimiter.isOverloaded()
            ? Progress.OVERLOAD.withReason("Embedder service overloaded")
            : Progress.DONE;
}
Use Progress.PERMANENT_FAILURE for unrecoverable errors:
@Override
public Progress process(Processing processing) {
    if (isInitialized) {
        return Progress.DONE;
    }
    // Unrecoverable error: the processor never finished initializing
    return Progress.PERMANENT_FAILURE.withReason("Failed to initialize");
}
Best Practices
Document processors are in the critical path for feeding, so keep processing time minimal:
// Good - fast lookup
@Override
public Progress process(Processing processing) {
    String value = inMemoryCache.get(key); // cheap: answered from memory
    return Progress.DONE;
}
// Bad - slow blocking operation
@Override
public Progress process(Processing processing) {
    // The calling thread waits here until the database answers
    String value = slowDatabaseCall(key); // Blocks feed pipeline!
    return Progress.DONE;
}
Use LATER for Async Operations
For long-running operations, return Progress.LATER (or Progress.later(delayMillis)) and check again on the next call:
@Override
public Progress process(Processing processing) {
    CompletableFuture<Data> future = getOrStartAsync(processing);
    if (!future.isDone()) {
        return Progress.later(50); // Check again in 50ms
    }
    // join() rather than get(): the future is already complete, so this does not block,
    // and join() throws only unchecked exceptions. get() declares checked exceptions
    // that process() cannot throw, so it would not compile here.
    applyResult(future.join(), processing);
    return Progress.DONE;
}
Handle All Operation Types
Don’t assume that only DocumentPut operations arrive:
@Override
public Progress process(Processing processing) {
    for (DocumentOperation operation : processing.getDocumentOperations()) {
        if (operation instanceof DocumentPut) {
            // Handle put
        } else if (operation instanceof DocumentUpdate) {
            // Handle update
        } else if (operation instanceof DocumentRemove) {
            // Handle remove (or skip if not applicable)
        }
    }
    return Progress.DONE;
}
Clean Up Context Variables
Remove context variables when you are done with them:
@Override
public Progress process(Processing processing) {
    try {
        processing.setVariable("tempData", someData);
        // Process...
        return Progress.DONE;
    } finally {
        // Runs on every exit path, so the variable does not outlive this call
        processing.removeVariable("tempData");
    }
}
Testing Document Processors
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
import com.yahoo.document.DataType;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentType;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
/** Unit test driving a document processor directly with a single put operation. */
class MyProcessorTest {

    @Test
    void testProcessing() {
        MyProcessor processor = new MyProcessor();

        // Create a test document. A field must be declared on the document type
        // before a value can be set for it.
        DocumentType docType = new DocumentType("testdoc");
        docType.addField("title", DataType.STRING);
        Document doc = new Document(docType, "id:test:testdoc::1");
        doc.setFieldValue("title", "Test Title");

        // Wrap the document in a put operation and create a Processing for it
        Processing processing = Processing.of(new DocumentPut(doc));

        // Process
        DocumentProcessor.Progress result = processor.process(processing);

        // Assert
        assertEquals(DocumentProcessor.Progress.DONE, result);
        // NOTE(review): assumes MyProcessor rewrites the title to "Modified Title"
        assertEquals("Modified Title", doc.getFieldValue("title").toString());
    }
}