Document processors transform documents and updates as they flow through the document processing chain during feeding. They enable validation, enrichment, transformation, and custom processing logic before documents reach the content nodes.
Overview
Document processors can:
Validate document structure and content
Enrich documents with computed fields or external data
Transform document structure or field values
Filter or reject invalid documents
Route documents to different chains based on content
Basic DocumentProcessor
All document processors extend com.yahoo.docproc.DocumentProcessor:
package com.example;
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentOperation;
/**
 * Marks every document put in the batch by setting its {@code processed} field to true.
 * Updates and removes pass through untouched.
 * Assumes the document type declares a boolean {@code processed} field — TODO confirm per schema.
 */
public class MyDocumentProcessor extends DocumentProcessor {

    @Override
    public Progress process(Processing processing) {
        for (DocumentOperation operation : processing.getDocumentOperations()) {
            if (!(operation instanceof DocumentPut put)) {
                continue; // only puts carry a full document that can be modified here
            }
            put.getDocument().setFieldValue("processed", true);
        }
        return Progress.DONE;
    }
}
Progress States
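Document processors return a Progress value to control processing flow:
Progress.DONE: Processing completed successfully
Progress.LATER: Processing is not finished; the processor is called again later (used for asynchronous work)
Progress.FAILED: Processing failed
Progress.INVALID_INPUT: The operation was rejected as invalid input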
// Processing completed successfully
return Progress . DONE ;
Additional Progress States
Progress.PERMANENT_FAILURE: Disables the service until it is reconfigured
Progress.OVERLOAD: Operation rejected due to rate limiting (returns HTTP 429)
Progress.TIMEOUT: Processing timed out (non-retryable)
SimpleDocumentProcessor
For straightforward use cases, extend SimpleDocumentProcessor to handle operations individually:
import com.yahoo.docproc.SimpleDocumentProcessor;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentRemove;
import com.yahoo.document.DocumentUpdate;
import java.util.logging.Logger;

/**
 * Handles each operation type individually: enriches puts and logs the ids of updates and removes.
 */
public class SimpleProcessor extends SimpleDocumentProcessor {

    // Class-local logger; no getLogger() accessor is assumed on the base class.
    private static final Logger log = Logger.getLogger(SimpleProcessor.class.getName());

    /**
     * Stamps the document with the current wall-clock time and the name of this processor.
     * Assumes the schema declares a long {@code timestamp} and a string {@code processor} field — TODO confirm.
     */
    @Override
    public void process(DocumentPut put) {
        var doc = put.getDocument();
        doc.setFieldValue("timestamp", System.currentTimeMillis());
        doc.setFieldValue("processor", "simple");
    }

    /** Logs the id of the update; the update itself is not modified. */
    @Override
    public void process(DocumentUpdate update) {
        // Supplier form: the message is only built when INFO is enabled.
        log.info(() -> "Processing update: " + update.getId());
    }

    /** Logs the id of the remove; the remove itself is not modified. */
    @Override
    public void process(DocumentRemove remove) {
        log.info(() -> "Processing remove: " + remove.getId());
    }
}
SimpleDocumentProcessor cannot return Progress.LATER for asynchronous processing: its process(Processing) method is final and always returns Progress.DONE. For asynchronous operations, extend DocumentProcessor directly.
Document Processing Patterns
Field Validation
import com.yahoo.document.Document;
import com.yahoo.document.datatypes.StringFieldValue;
import java.util.regex.Pattern;

/**
 * Rejects puts that lack a title or carry a malformed email address.
 * The first invalid document ends processing of the whole batch with INVALID_INPUT.
 */
public class ValidationProcessor extends DocumentProcessor {

    // Compiled once; String.matches would recompile the regex for every document processed.
    private static final Pattern EMAIL = Pattern.compile("^[A-Za-z0-9+_.-]+@(.+)$");

    @Override
    public Progress process(Processing processing) {
        for (DocumentOperation op : processing.getDocumentOperations()) {
            if (op instanceof DocumentPut put) {
                Document doc = put.getDocument();

                // Validate required fields
                if (doc.getFieldValue("title") == null) {
                    return Progress.INVALID_INPUT.withReason("Missing required field: title");
                }

                // Validate field format; assumes "email" is declared as a string field — TODO confirm
                StringFieldValue email = (StringFieldValue) doc.getFieldValue("email");
                if (email != null && !isValidEmail(email.getString())) {
                    return Progress.INVALID_INPUT.withReason("Invalid email format");
                }
            }
        }
        return Progress.DONE;
    }

    /**
     * Loose syntactic check: a non-empty local part of letters, digits and {@code +_.-},
     * an {@code @}, then at least one character.
     */
    private boolean isValidEmail(String email) {
        return EMAIL.matcher(email).matches();
    }
}
Document Enrichment
import com.yahoo.document.datatypes.Array;
import com.yahoo.document.datatypes.StringFieldValue;
import java.util.List;
/**
 * Adds derived and externally fetched fields to every document put:
 * {@code title_length} from the title, and {@code categories} from an external categorization service.
 */
public class EnrichmentProcessor extends DocumentProcessor {

    private final ExternalService externalService;

    public EnrichmentProcessor(ExternalService service) {
        this.externalService = service;
    }

    @Override
    public Progress process(Processing processing) {
        for (DocumentOperation op : processing.getDocumentOperations()) {
            if (op instanceof DocumentPut put) {
                enrich(put.getDocument());
            }
        }
        return Progress.DONE;
    }

    /** Enriches a single document; documents without a title are left unchanged. */
    private void enrich(Document doc) {
        var titleValue = doc.getFieldValue("title");
        if (titleValue == null) {
            // Nothing to derive from; skip instead of failing the whole batch with a NullPointerException.
            return;
        }

        // Add computed fields
        String title = titleValue.toString();
        doc.setFieldValue("title_length", title.length());

        // Fetch external data. This call runs synchronously on the processing thread;
        // for slow services, consider starting the call and returning Progress.LATER instead.
        List<String> categories = externalService.categorize(title);

        // Assumes the schema declares "categories" as array<string> — TODO confirm.
        Array<StringFieldValue> catArray = new Array<>(doc.getField("categories").getDataType());
        for (String category : categories) {
            catArray.add(new StringFieldValue(category));
        }
        doc.setFieldValue("categories", catArray);
    }
}
Asynchronous Processing
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
 * Enriches every document put with data from an asynchronous service without blocking a processing thread.
 * On the first call, a lookup is started for every put and Progress.LATER is returned.
 * On later calls, the processor keeps returning later(...) until every lookup has completed.
 * It then copies each result into the {@code enriched_data} field.
 * Futures are kept as Processing variables keyed by document id, so they survive between calls.
 */
public class AsyncProcessor extends DocumentProcessor {

    /** Delay (ms) requested while previously started lookups are still running. */
    private static final int POLL_DELAY_MS = 50;

    private final AsyncService asyncService;

    public AsyncProcessor(AsyncService service) {
        this.asyncService = service;
    }

    @Override
    public Progress process(Processing processing) {
        // Pass 1: start every missing lookup, so all documents in the batch are fetched
        // concurrently instead of one per LATER round-trip.
        boolean started = false;
        boolean pending = false;
        for (DocumentOperation op : processing.getDocumentOperations()) {
            if (op instanceof DocumentPut put) {
                Document doc = put.getDocument();
                String key = variableKey(doc);
                CompletableFuture<String> future = lookup(processing, key);
                if (future == null) {
                    processing.setVariable(key, asyncService.fetchDataAsync(doc));
                    started = true;
                } else if (!future.isDone()) {
                    pending = true;
                }
            }
        }
        if (started) {
            return Progress.LATER;
        }
        if (pending) {
            return Progress.later(POLL_DELAY_MS);
        }

        // Pass 2: every lookup has completed; copy the results into the documents.
        for (DocumentOperation op : processing.getDocumentOperations()) {
            if (op instanceof DocumentPut put) {
                Document doc = put.getDocument();
                CompletableFuture<String> future = lookup(processing, variableKey(doc));
                try {
                    // Does not block: pass 1 established that the future is done.
                    doc.setFieldValue("enriched_data", future.join());
                } catch (CompletionException | CancellationException e) {
                    // join() wraps the service's failure; report the underlying cause.
                    Throwable cause = e.getCause() != null ? e.getCause() : e;
                    return Progress.FAILED.withReason("Async operation failed: " + cause.getMessage());
                }
            }
        }
        return Progress.DONE;
    }

    /** Name of the Processing variable that holds the lookup future for {@code doc}. */
    private static String variableKey(Document doc) {
        return "async_" + doc.getId();
    }

    /** Returns the lookup future stored under {@code key}, or null if none has been started yet. */
    @SuppressWarnings("unchecked") // only this class writes these variables, always as CompletableFuture<String>
    private static CompletableFuture<String> lookup(Processing processing, String key) {
        return (CompletableFuture<String>) processing.getVariable(key);
    }
}
Document Type Routing
import com.yahoo.docproc.Call;
import com.yahoo.docproc.CallStack;
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Logger;

/**
 * Schedules a type-specific chain to run next, chosen from the document type of each put
 * ("article" or "product"). Unknown types are logged and left to the current chain.
 */
public class RoutingProcessor extends DocumentProcessor {

    private static final Logger log = Logger.getLogger(RoutingProcessor.class.getName());

    @Override
    public Progress process(Processing processing) {
        // The call stack belongs to the whole Processing, not to a single document, so each
        // target is scheduled at most once even when the batch holds several docs of one type.
        CallStack callStack = processing.callStack();
        Set<String> scheduled = new HashSet<>();

        for (DocumentOperation op : processing.getDocumentOperations()) {
            if (op instanceof DocumentPut put) {
                String docType = put.getDocument().getDataType().getName();
                String target = switch (docType) {
                    case "article" -> "article-chain";
                    case "product" -> "product-chain";
                    default -> null;
                };
                if (target == null) {
                    log.warning("Unknown type: " + docType);
                } else if (scheduled.add(target)) {
                    // NOTE(review): confirm that Call can be constructed from a chain name; in the Vespa
                    // docproc API, a Call appears to wrap a DocumentProcessor instance — TODO verify.
                    callStack.addNext(new Call(target));
                }
            }
        }
        return Progress.DONE;
    }
}
Field Mapping
Document processors can map field names using schema mapping:
public class MappingProcessor extends DocumentProcessor {
@ Override
public Progress process ( Processing processing ) {
for ( DocumentOperation op : processing . getDocumentOperations ()) {
if (op instanceof DocumentPut put) {
Document doc = put . getDocument ();
String docType = doc . getDataType (). getName ();
// Get field mappings for this document type
Map < String , String > fieldMap = getDocMap (docType);
// Apply mappings
for ( Map . Entry < String , String > entry : fieldMap . entrySet ()) {
String from = entry . getKey ();
String to = entry . getValue ();
var value = doc . getFieldValue (from);
if (value != null ) {
doc . setFieldValue (to, value);
doc . removeFieldValue (from);
}
}
}
}
return Progress . DONE ;
}
}
Configuration
Add document processors to services.xml:
<container id="default" version="1.0">
    <document-processing>
        <chain id="default">
            <documentprocessor id="com.example.ValidationProcessor"/>
            <documentprocessor id="com.example.EnrichmentProcessor" bundle="my-bundle"/>
        </chain>
    </document-processing>
</container>
Multiple Chains
<document-processing>
    <chain id="article-processing">
        <documentprocessor id="com.example.ArticleProcessor"/>
    </chain>
    <chain id="product-processing">
        <documentprocessor id="com.example.ProductProcessor"/>
    </chain>
    <!-- Inherits the processors of "article-processing" and adds CommonProcessor. -->
    <chain id="default" inherits="article-processing">
        <documentprocessor id="com.example.CommonProcessor"/>
    </chain>
</document-processing>
Access Control with @Accesses
Document processors can declare field access patterns:
import com.yahoo.docproc.Accesses;
@Accesses({
        @Accesses.Field(name = "title", dataType = "string"),
        @Accesses.Field(name = "timestamp", dataType = "long")
})
public class AnnotatedProcessor extends DocumentProcessor {
    // The @Accesses annotation declares which fields this processor accesses, and their data types.

    @Override
    public Progress process(Processing processing) {
        // ... work with the declared "title" and "timestamp" fields ...
        return Progress.DONE;
    }
}
Thread Safety
Document processors are called by multiple threads concurrently. Ensure thread-safe access to shared mutable state.
import java.util.concurrent.atomic.AtomicLong;
/**
 * Counts calls to {@code process()} safely across concurrent callers.
 * It counts calls, not individual document operations.
 * Its only state is a final configuration string and an atomic counter, so no locking is needed.
 */
public class ThreadSafeProcessor extends DocumentProcessor {

    // Immutable after construction, therefore safe to read from any thread.
    private final String configuration;

    // Atomic, so concurrent process() calls never lose an increment.
    private final AtomicLong processedCount;

    public ThreadSafeProcessor(MyConfig config) {
        configuration = config.value();
        processedCount = new AtomicLong();
    }

    @Override
    public Progress process(Processing processing) {
        processedCount.getAndIncrement();
        // Process documents
        return Progress.DONE;
    }

    /** Returns how many times {@code process()} has been called so far. */
    public long getProcessedCount() {
        return processedCount.get();
    }
}
Real-World Example from Vespa
Based on docproc/src/main/java/com/yahoo/docproc/SimpleDocumentProcessor.java in the Vespa source tree:
public abstract class SimpleDocumentProcessor extends DocumentProcessor {

    /** Hook for document puts; does nothing unless overridden. */
    public void process(DocumentPut put) {
    }

    /** Hook for document updates; does nothing unless overridden. */
    public void process(DocumentUpdate update) {
    }

    /** Hook for document removes; does nothing unless overridden. */
    public void process(DocumentRemove remove) {
    }

    /**
     * Routes every operation in the batch to the matching typed hook and always reports DONE.
     * Declared final, so subclasses can never return LATER.
     * A RuntimeException thrown by a hook is rethrown unchanged. When the batch held more than one
     * operation, it is first logged at FINE.
     */
    @Override
    public final Progress process(Processing processing) {
        int batchSize = processing.getDocumentOperations().size();
        for (DocumentOperation operation : processing.getDocumentOperations()) {
            try {
                dispatch(operation);
            } catch (RuntimeException failure) {
                if (log.isLoggable(Level.FINE) && batchSize != 1) {
                    log.log(Level.FINE,
                            "Processing failed for batch of " + batchSize + " operations.",
                            failure);
                }
                throw failure;
            }
        }
        return Progress.DONE;
    }

    /** Calls the typed hook for {@code operation}; other operation kinds are ignored. */
    private void dispatch(DocumentOperation operation) {
        if (operation instanceof DocumentPut put) {
            process(put);
        } else if (operation instanceof DocumentUpdate update) {
            process(update);
        } else if (operation instanceof DocumentRemove remove) {
            process(remove);
        }
    }
}
Testing
import com.yahoo.docproc.Processing;
import com.yahoo.document.Document;
import com.yahoo.document.DocumentType;
import com.yahoo.document.DocumentPut;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions. * ;
public class MyProcessorTest {
@ Test
public void testDocumentProcessing () {
MyDocumentProcessor processor = new MyDocumentProcessor ();
// Create test document
DocumentType type = new DocumentType ( "test" );
Document doc = new Document (type, "id:test:test::1" );
doc . setFieldValue ( "title" , "Test Document" );
DocumentPut put = new DocumentPut (doc);
Processing processing = new Processing ();
processing . addDocumentOperation (put);
// Process
DocumentProcessor . Progress result = processor . process (processing);
// Verify
assertEquals ( DocumentProcessor . Progress . DONE , result);
assertTrue ((Boolean) doc . getFieldValue ( "processed" ));
}
}
Best Practices
Batch operations: Process multiple operations together when possible
Minimize external calls: Cache data or use asynchronous processing
Avoid blocking: Use Progress.LATER for long-running operations
Monitor processing time: Document processors are on the critical feed path
Next Steps