Overview
InterviewGuide uses Redis Streams for asynchronous processing of long-running AI tasks. This allows the API to return immediately while work continues in the background.
Why Redis Streams?
Consumer Groups: Multiple consumers can process tasks in parallel with automatic load balancing.
At-Least-Once Delivery: Messages are acknowledged (ACK) only after processing, ensuring no task is lost.
Retry Support: Failed tasks are automatically re-queued and retried up to a maximum retry count.
Persistence: Redis persists streams to disk, so queued tasks survive restarts.
Architecture
Producer-Consumer Pattern
Producers append task messages to a Redis Stream; consumers in a consumer group read them in batches, run the business logic, and acknowledge (ACK) each message.
Three Task Types
| Stream | Purpose | Producer | Consumer |
| --- | --- | --- | --- |
| resume:analyze:stream | AI resume analysis | AnalyzeStreamProducer | AnalyzeStreamConsumer |
| knowledgebase:vectorize:stream | Document vectorization | VectorizeStreamProducer | VectorizeStreamConsumer |
| interview:evaluate:stream | Interview evaluation | EvaluateStreamProducer | EvaluateStreamConsumer |
AbstractStreamProducer
Base class for all producers, providing a consistent message sending pattern.
Location : common/async/AbstractStreamProducer.java:1
Template Method Pattern
// common/async/AbstractStreamProducer.java:14
public abstract class AbstractStreamProducer<T> {

    private final RedisService redisService;

    protected void sendTask(T payload) {
        try {
            String messageId = redisService.streamAdd(
                    streamKey(),
                    buildMessage(payload),
                    AsyncTaskStreamConstants.STREAM_MAX_LEN
            );
            log.info("{}任务已发送到Stream: {}, messageId={}",
                    taskDisplayName(), payloadIdentifier(payload), messageId);
        } catch (Exception e) {
            log.error("发送{}任务失败: {}, error={}",
                    taskDisplayName(), payloadIdentifier(payload), e.getMessage(), e);
            onSendFailed(payload, "任务入队失败: " + e.getMessage());
        }
    }

    // Template methods for subclasses to implement
    protected abstract String taskDisplayName();
    protected abstract String streamKey();
    protected abstract Map<String, String> buildMessage(T payload);
    protected abstract String payloadIdentifier(T payload);
    protected abstract void onSendFailed(T payload, String error);
}
Subclasses only need to implement the abstract methods. The sendTask method handles Redis communication, logging, and error handling.
Message Structure
All messages include:
Entity ID: Resume ID, KB ID, or Session ID
Content: The data to process (resume text, document content, etc.)
Retry Count: Tracks how many times this task has been retried
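Concretely, a task message is a flat string-to-string map, because Redis Stream entries hold only string field-value pairs. The following is a minimal sketch (the class and method names here are illustrative; the real builder is `buildMessage` in the producers below):

```java
import java.util.Map;

// Hypothetical sketch of the message shape; not the actual producer code.
class AnalyzeMessageSketch {
    // Every value, including numeric IDs and counters, is sent as a string
    // because Redis Stream fields are string-valued.
    static Map<String, String> build(long resumeId, String content, int retryCount) {
        return Map.of(
                "resumeId", Long.toString(resumeId),
                "content", content,
                "retryCount", Integer.toString(retryCount));
    }
}
```

Keeping the message flat means consumers never need JSON parsing; they read fields directly from the stream entry.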
AnalyzeStreamProducer
Produces resume analysis tasks.
Location : modules/resume/listener/AnalyzeStreamProducer.java:1
Implementation
// modules/resume/listener/AnalyzeStreamProducer.java:19
@Component
public class AnalyzeStreamProducer extends AbstractStreamProducer<AnalyzeTaskPayload> {

    private final ResumeRepository resumeRepository;

    record AnalyzeTaskPayload(Long resumeId, String content) {}

    public void sendAnalyzeTask(Long resumeId, String content) {
        sendTask(new AnalyzeTaskPayload(resumeId, content));
    }

    @Override
    protected String taskDisplayName() {
        return "分析";
    }

    @Override
    protected String streamKey() {
        return AsyncTaskStreamConstants.RESUME_ANALYZE_STREAM_KEY;
    }

    @Override
    protected Map<String, String> buildMessage(AnalyzeTaskPayload payload) {
        return Map.of(
                AsyncTaskStreamConstants.FIELD_RESUME_ID, payload.resumeId().toString(),
                AsyncTaskStreamConstants.FIELD_CONTENT, payload.content(),
                AsyncTaskStreamConstants.FIELD_RETRY_COUNT, "0"
        );
    }

    @Override
    protected String payloadIdentifier(AnalyzeTaskPayload payload) {
        return "resumeId=" + payload.resumeId();
    }

    @Override
    protected void onSendFailed(AnalyzeTaskPayload payload, String error) {
        // Update database to mark task as failed
        resumeRepository.findById(payload.resumeId()).ifPresent(resume -> {
            resume.setAnalyzeStatus(AsyncTaskStatus.FAILED);
            resume.setAnalyzeError(truncateError(error));
            resumeRepository.save(resume);
        });
    }
}
Usage in Service Layer
// Called from ResumeUploadService after file upload
analyzeStreamProducer.sendAnalyzeTask(savedResume.getId(), resumeText);
AbstractStreamConsumer
Base class for all consumers, managing the consumption loop, ACK, and retry logic.
Location : common/async/AbstractStreamConsumer.java:1
Lifecycle Management
// common/async/AbstractStreamConsumer.java:22
public abstract class AbstractStreamConsumer<T> {

    private final RedisService redisService;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private ExecutorService executorService;
    private String consumerName;

    @PostConstruct
    public void init() {
        // Generate unique consumer name
        this.consumerName = consumerPrefix() + UUID.randomUUID().toString().substring(0, 8);

        // Create consumer group (idempotent)
        try {
            redisService.createStreamGroup(streamKey(), groupName());
            log.info("Redis Stream group created or exists: {}", groupName());
        } catch (Exception e) {
            log.warn("Group creation exception (may already exist): {}", e.getMessage());
        }

        // Start consumer thread
        this.executorService = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, threadName());
            t.setDaemon(true);
            return t;
        });
        running.set(true);
        executorService.submit(this::consumeLoop);
        log.info("{}消费者已启动: consumerName={}", taskDisplayName(), consumerName);
    }

    @PreDestroy
    public void shutdown() {
        running.set(false);
        if (executorService != null) {
            executorService.shutdown();
        }
        log.info("{}消费者已关闭: consumerName={}", taskDisplayName(), consumerName);
    }
}
Each consumer runs in a dedicated daemon thread. On application shutdown, the @PreDestroy hook gracefully stops the consumer loop.
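The same start/stop mechanics can be seen in isolation. The sketch below (illustrative names, no Redis involved) replaces the stream poll with a counter, so the flag-controlled daemon-thread pattern can be exercised on its own:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Self-contained sketch of the consumer lifecycle: a daemon worker thread
// that polls until the running flag is cleared.
class LifecycleSketch {
    final AtomicBoolean running = new AtomicBoolean(false);
    final AtomicInteger polls = new AtomicInteger();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-consumer");
        t.setDaemon(true); // daemon: never blocks JVM exit
        return t;
    });

    void start() {
        running.set(true);
        executor.submit(() -> {
            while (running.get()) {
                polls.incrementAndGet(); // stand-in for one XREADGROUP poll
                try {
                    Thread.sleep(5);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    void stop() throws InterruptedException {
        running.set(false); // loop exits after the current iteration
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

Because the flag is checked once per iteration, shutdown waits for the in-flight poll to finish rather than interrupting it mid-message, which matches the graceful-stop behavior described above.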
Consumption Loop
// common/async/AbstractStreamConsumer.java:64
private void consumeLoop() {
    while (running.get()) {
        try {
            redisService.streamConsumeMessages(
                    streamKey(),
                    groupName(),
                    consumerName,
                    AsyncTaskStreamConstants.BATCH_SIZE,        // 10 messages per poll
                    AsyncTaskStreamConstants.POLL_INTERVAL_MS,  // 1 second timeout
                    this::processMessage
            );
        } catch (Exception e) {
            if (Thread.currentThread().isInterrupted()) {
                log.info("Consumer thread interrupted");
                break;
            }
            log.error("Error consuming messages: {}", e.getMessage(), e);
        }
    }
}
Message Processing with Retry
// common/async/AbstractStreamConsumer.java:85
private void processMessage(StreamMessageId messageId, Map<String, String> data) {
    T payload = parsePayload(messageId, data);
    if (payload == null) {
        ackMessage(messageId);
        return;
    }

    int retryCount = parseRetryCount(data);
    log.info("Processing {}: {}, messageId={}, retryCount={}",
            taskDisplayName(), payloadIdentifier(payload), messageId, retryCount);

    try {
        markProcessing(payload);
        processBusiness(payload); // Subclass implements business logic
        markCompleted(payload);
        ackMessage(messageId);
        log.info("{}任务完成: {}", taskDisplayName(), payloadIdentifier(payload));
    } catch (Exception e) {
        log.error("{}任务失败: {}, error={}",
                taskDisplayName(), payloadIdentifier(payload), e.getMessage(), e);
        if (retryCount < AsyncTaskStreamConstants.MAX_RETRY_COUNT) {
            // Retry by re-adding to stream
            retryMessage(payload, retryCount + 1);
        } else {
            // Max retries exceeded, mark as failed
            markFailed(payload, truncateError(
                    taskDisplayName() + "失败(已重试" + retryCount + "次): " + e.getMessage()
            ));
        }
        ackMessage(messageId);
    }
}
ACK Always: Messages are acknowledged even on failure, which prevents unbounded re-delivery of the same entry. Retry is implemented by re-adding the message to the stream as a fresh entry with an incremented retry count.
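The branch logic above amounts to a small decision table; here it is as a pure-function sketch (the names are illustrative, not the actual API):

```java
// Hypothetical sketch of the retry decision made in processMessage.
class RetryPolicy {
    static final int MAX_RETRY_COUNT = 3; // mirrors AsyncTaskStreamConstants

    enum Action { ACK_ONLY, REQUEUE_AND_ACK, MARK_FAILED_AND_ACK }

    // The message is ACKed in every branch; "requeue" means adding a fresh
    // copy with an incremented retryCount, never re-delivering the original.
    static Action onResult(boolean succeeded, int retryCount) {
        if (succeeded) return Action.ACK_ONLY;
        return retryCount < MAX_RETRY_COUNT
                ? Action.REQUEUE_AND_ACK
                : Action.MARK_FAILED_AND_ACK;
    }
}
```

With MAX_RETRY_COUNT of 3, a task is attempted at most four times (retryCount 0 through 3) before being marked failed.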
AnalyzeStreamConsumer
Consumes resume analysis tasks and invokes AI grading service.
Location : modules/resume/listener/AnalyzeStreamConsumer.java:1
Implementation
// modules/resume/listener/AnalyzeStreamConsumer.java:24
@Component
public class AnalyzeStreamConsumer extends AbstractStreamConsumer<AnalyzePayload> {

    private final ResumeGradingService gradingService;
    private final ResumePersistenceService persistenceService;
    private final ResumeRepository resumeRepository;

    record AnalyzePayload(Long resumeId, String content) {}

    @Override
    protected String taskDisplayName() {
        return "简历分析";
    }

    @Override
    protected String streamKey() {
        return AsyncTaskStreamConstants.RESUME_ANALYZE_STREAM_KEY;
    }

    @Override
    protected String groupName() {
        return AsyncTaskStreamConstants.RESUME_ANALYZE_GROUP_NAME;
    }

    @Override
    protected AnalyzePayload parsePayload(StreamMessageId messageId, Map<String, String> data) {
        String resumeIdStr = data.get(AsyncTaskStreamConstants.FIELD_RESUME_ID);
        String content = data.get(AsyncTaskStreamConstants.FIELD_CONTENT);
        if (resumeIdStr == null || content == null) {
            log.warn("Invalid message format, skipping: messageId={}", messageId);
            return null;
        }
        return new AnalyzePayload(Long.parseLong(resumeIdStr), content);
    }

    @Override
    protected void markProcessing(AnalyzePayload payload) {
        updateAnalyzeStatus(payload.resumeId(), AsyncTaskStatus.PROCESSING, null);
    }

    @Override
    protected void processBusiness(AnalyzePayload payload) {
        Long resumeId = payload.resumeId();

        // Check if resume still exists (user may have deleted it)
        if (!resumeRepository.existsById(resumeId)) {
            log.warn("Resume deleted, skipping analysis: resumeId={}", resumeId);
            return;
        }

        // Call AI service to analyze resume
        ResumeAnalysisResponse analysis = gradingService.analyzeResume(payload.content());

        // Save results to database
        ResumeEntity resume = resumeRepository.findById(resumeId).orElse(null);
        if (resume == null) {
            log.warn("Resume deleted during analysis, skipping save: resumeId={}", resumeId);
            return;
        }
        persistenceService.saveAnalysis(resume, analysis);
    }

    @Override
    protected void markCompleted(AnalyzePayload payload) {
        updateAnalyzeStatus(payload.resumeId(), AsyncTaskStatus.COMPLETED, null);
    }

    @Override
    protected void markFailed(AnalyzePayload payload, String error) {
        updateAnalyzeStatus(payload.resumeId(), AsyncTaskStatus.FAILED, error);
    }

    @Override
    protected void retryMessage(AnalyzePayload payload, int retryCount) {
        try {
            Map<String, String> message = Map.of(
                    AsyncTaskStreamConstants.FIELD_RESUME_ID, payload.resumeId().toString(),
                    AsyncTaskStreamConstants.FIELD_CONTENT, payload.content(),
                    AsyncTaskStreamConstants.FIELD_RETRY_COUNT, String.valueOf(retryCount)
            );
            redisService().streamAdd(
                    AsyncTaskStreamConstants.RESUME_ANALYZE_STREAM_KEY,
                    message,
                    AsyncTaskStreamConstants.STREAM_MAX_LEN
            );
            log.info("Resume analysis task re-queued: resumeId={}, retryCount={}",
                    payload.resumeId(), retryCount);
        } catch (Exception e) {
            log.error("Retry enqueue failed: resumeId={}, error={}",
                    payload.resumeId(), e.getMessage(), e);
            updateAnalyzeStatus(payload.resumeId(), AsyncTaskStatus.FAILED,
                    truncateError("重试入队失败: " + e.getMessage()));
        }
    }

    private void updateAnalyzeStatus(Long resumeId, AsyncTaskStatus status, String error) {
        try {
            resumeRepository.findById(resumeId).ifPresent(resume -> {
                resume.setAnalyzeStatus(status);
                resume.setAnalyzeError(error);
                resumeRepository.save(resume);
                log.debug("Analysis status updated: resumeId={}, status={}", resumeId, status);
            });
        } catch (Exception e) {
            log.error("Failed to update analysis status: resumeId={}, status={}, error={}",
                    resumeId, status, e.getMessage(), e);
        }
    }
}
State Transitions
A resume's analyze status moves from PENDING (task queued) to PROCESSING (consumer working), then to COMPLETED on success or FAILED once all retries are exhausted.
VectorizeStreamProducer & Consumer
Handles asynchronous document vectorization for the knowledge base.
Producer
Location : modules/knowledgebase/listener/VectorizeStreamProducer.java:1
// modules/knowledgebase/listener/VectorizeStreamProducer.java:19
@Component
public class VectorizeStreamProducer extends AbstractStreamProducer<VectorizeTaskPayload> {

    record VectorizeTaskPayload(Long kbId, String content) {}

    public void sendVectorizeTask(Long kbId, String content) {
        sendTask(new VectorizeTaskPayload(kbId, content));
    }

    @Override
    protected String streamKey() {
        return AsyncTaskStreamConstants.KB_VECTORIZE_STREAM_KEY;
    }

    @Override
    protected Map<String, String> buildMessage(VectorizeTaskPayload payload) {
        return Map.of(
                AsyncTaskStreamConstants.FIELD_KB_ID, payload.kbId().toString(),
                AsyncTaskStreamConstants.FIELD_CONTENT, payload.content(),
                AsyncTaskStreamConstants.FIELD_RETRY_COUNT, "0"
        );
    }
}
Consumer
Location : modules/knowledgebase/listener/VectorizeStreamConsumer.java:1
// modules/knowledgebase/listener/VectorizeStreamConsumer.java:21
@Component
public class VectorizeStreamConsumer extends AbstractStreamConsumer<VectorizePayload> {

    private final KnowledgeBaseVectorService vectorService;

    record VectorizePayload(Long kbId, String content) {}

    @Override
    protected void processBusiness(VectorizePayload payload) {
        // Split document into chunks, generate embeddings, and store in pgvector
        vectorService.vectorizeAndStore(payload.kbId(), payload.content());
    }

    @Override
    protected void markProcessing(VectorizePayload payload) {
        updateVectorStatus(payload.kbId(), VectorStatus.PROCESSING, null);
    }

    @Override
    protected void markCompleted(VectorizePayload payload) {
        updateVectorStatus(payload.kbId(), VectorStatus.COMPLETED, null);
    }

    @Override
    protected void markFailed(VectorizePayload payload, String error) {
        updateVectorStatus(payload.kbId(), VectorStatus.FAILED, error);
    }
}
Configuration Constants
Location : common/constant/AsyncTaskStreamConstants.java:1
// common/constant/AsyncTaskStreamConstants.java:7
public final class AsyncTaskStreamConstants {

    // ========== General Configuration ==========
    public static final String FIELD_RETRY_COUNT = "retryCount";
    public static final String FIELD_CONTENT = "content";
    public static final int MAX_RETRY_COUNT = 3;
    public static final int BATCH_SIZE = 10;           // Messages per poll
    public static final long POLL_INTERVAL_MS = 1000;  // 1 second
    public static final int STREAM_MAX_LEN = 1000;     // Auto-trim old messages

    // ========== Resume Analysis Stream ==========
    public static final String RESUME_ANALYZE_STREAM_KEY = "resume:analyze:stream";
    public static final String RESUME_ANALYZE_GROUP_NAME = "analyze-group";
    public static final String RESUME_ANALYZE_CONSUMER_PREFIX = "analyze-consumer-";
    public static final String FIELD_RESUME_ID = "resumeId";

    // ========== Knowledge Base Vectorization Stream ==========
    public static final String KB_VECTORIZE_STREAM_KEY = "knowledgebase:vectorize:stream";
    public static final String KB_VECTORIZE_GROUP_NAME = "vectorize-group";
    public static final String KB_VECTORIZE_CONSUMER_PREFIX = "vectorize-consumer-";
    public static final String FIELD_KB_ID = "kbId";

    // ========== Interview Evaluation Stream ==========
    public static final String INTERVIEW_EVALUATE_STREAM_KEY = "interview:evaluate:stream";
    public static final String INTERVIEW_EVALUATE_GROUP_NAME = "evaluate-group";
    public static final String INTERVIEW_EVALUATE_CONSUMER_PREFIX = "evaluate-consumer-";
    public static final String FIELD_SESSION_ID = "sessionId";
}
Example: Resume Analysis Message
{
  "resumeId": "12345",
  "content": "张三,5年Java开发经验...",
  "retryCount": "0"
}
Example: Vectorization Message
{
  "kbId": "67890",
  "content": "Redis Streams provide a powerful...",
  "retryCount": "0"
}
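The processMessage method shown earlier calls a parseRetryCount helper that is not listed on this page. A defensive sketch of what such a helper likely does (assumed, not the actual implementation):

```java
import java.util.Map;

// Hypothetical sketch: treat a missing or malformed retryCount field as a
// first attempt instead of letting a NumberFormatException poison the consumer.
class RetryCountSketch {
    static int parseRetryCount(Map<String, String> data) {
        String raw = data.get("retryCount");
        if (raw == null) return 0;
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            return 0; // malformed count: fall back to first attempt
        }
    }
}
```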
Monitoring & Debugging
Redis CLI Commands
Stream Info
# View stream length and consumer groups
redis-cli XINFO STREAM resume:analyze:stream
Consumer Group Info
# List consumer groups, their consumers, and pending counts
redis-cli XINFO GROUPS resume:analyze:stream
Pending Messages
# Show messages delivered to the group but not yet acknowledged
redis-cli XPENDING resume:analyze:stream analyze-group
Read Messages
# Read up to 10 entries from the start of the stream
redis-cli XRANGE resume:analyze:stream - + COUNT 10
Application Logs
Look for these log patterns:
[INFO] 分析任务已发送到Stream: resumeId=12345, messageId=1678901234567-0
[INFO] 开始处理简历分析任务: resumeId=12345, messageId=1678901234567-0, retryCount=0
[INFO] 简历分析任务完成: resumeId=12345
Failure logs:
[ERROR] 简历分析任务失败: resumeId=12345, error=AI service timeout
[INFO] 简历分析任务已重新入队: resumeId=12345, retryCount=1
Best Practices
Avoid embedding large payloads in messages. Store large data in the database and pass only IDs in the message. Exception: resume text is passed directly because it is needed for AI analysis and may not be cached in the database.
Consumers should handle duplicate processing gracefully. Check entity state before processing:

if (!resumeRepository.existsById(resumeId)) {
    log.warn("Resume deleted, skipping");
    return;
}
Always update entity status in the database:
PENDING: Task queued
PROCESSING: Consumer working
COMPLETED: Success
FAILED: All retries exhausted
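These states can be made explicit as an enum with a transition check. The sketch below is hypothetical; the real AsyncTaskStatus may be a plain enum without such a method:

```java
// Hypothetical status lifecycle with a validity check on transitions.
enum TaskStatusSketch {
    PENDING, PROCESSING, COMPLETED, FAILED;

    boolean canTransitionTo(TaskStatusSketch next) {
        return switch (this) {
            // Enqueue itself can fail, so PENDING may go straight to FAILED.
            case PENDING -> next == PROCESSING || next == FAILED;
            // PROCESSING -> PROCESSING covers a retried message being picked up again.
            case PROCESSING -> next == PROCESSING || next == COMPLETED || next == FAILED;
            // Terminal states: no further transitions.
            case COMPLETED, FAILED -> false;
        };
    }
}
```

Making terminal states explicit guards against a late retry overwriting a COMPLETED or FAILED record.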
Limit error message length to avoid database column overflow:

protected String truncateError(String error) {
    if (error == null) return null;
    return error.length() > 500 ? error.substring(0, 500) : error;
}
Use @PreDestroy to stop consumers cleanly:

@PreDestroy
public void shutdown() {
    running.set(false);
    executorService.shutdown();
}
See Also
Service Layer: How services coordinate with Redis Streams
Vector Store: Async vectorization with VectorizeStreamConsumer
Async Processing: High-level architecture overview
Configuration: Redis connection and stream settings