AsyncCache and AsyncLoadingCache
Why Async Caching?
Asynchronous caches provide several key benefits:
- Non-Blocking: threads aren't blocked waiting for cache operations
- Better Scalability: handle more concurrent requests with fewer threads
- Composable: chain operations using the CompletableFuture API
- Reactive Ready: integrates with reactive frameworks
AsyncCache Basics
Simple Async Operations
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
AsyncCache<String, User> cache = Caffeine.newBuilder()
.maximumSize(10_000)
.expireAfterWrite(Duration.ofMinutes(10))
.buildAsync();
// Non-blocking get
CompletableFuture<User> userFuture = cache.get(userId, key -> {
// This executes asynchronously
return database.loadUser(key);
});
// Process result when ready
userFuture.thenAccept(user -> {
System.out.println("Loaded: " + user.getName());
});
Providing CompletableFuture
For truly async loading, provide a CompletableFuture directly:
AsyncCache<String, User> cache = Caffeine.newBuilder()
.buildAsync();
// Mapping function receives an Executor
CompletableFuture<User> future = cache.get(userId,
(key, executor) -> {
// Use the executor for async operations
return CompletableFuture.supplyAsync(
() -> database.loadUser(key),
executor
);
}
);
Using the provided executor ensures cache operations use the configured thread pool.
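The effect of routing work through a supplied executor can be seen with plain CompletableFuture, independent of Caffeine (a standalone sketch; the thread name "cache-loader" is illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// A single-thread pool whose worker thread is named "cache-loader"
ExecutorService pool = Executors.newSingleThreadExecutor(
    r -> new Thread(r, "cache-loader"));

// supplyAsync(supplier, executor) runs the supplier on that pool
// rather than on the common ForkJoinPool
String threadName = CompletableFuture
    .supplyAsync(() -> Thread.currentThread().getName(), pool)
    .join();
// threadName is "cache-loader"

pool.shutdown();
```

The same mechanism applies inside the cache: work submitted through the provided executor stays on the pool you configured.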
AsyncLoadingCache Patterns
Basic Async Loading
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
AsyncLoadingCache<String, User> cache = Caffeine.newBuilder()
.maximumSize(10_000)
.buildAsync((key, executor) -> database.loadUserAsync(key));
// Automatically loads if not present
CompletableFuture<User> future = cache.get(userId);
User user = future.join(); // Block if needed
Advanced Async Loader
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
class UserAsyncLoader implements AsyncCacheLoader<String, User> {
    private final UserDatabase database;
    private final MetricsCollector metrics;
@Override
public CompletableFuture<User> asyncLoad(String key, Executor executor) {
metrics.recordCacheMiss(key);
return CompletableFuture.supplyAsync(() -> {
User user = database.loadUser(key);
metrics.recordLoadTime(key);
return user;
}, executor);
}
@Override
public CompletableFuture<Map<String, User>> asyncLoadAll(
Set<? extends String> keys,
Executor executor) {
metrics.recordBulkLoad(keys.size());
return CompletableFuture.supplyAsync(
() -> database.loadUsers(keys),
executor
);
}
@Override
public CompletableFuture<User> asyncReload(
String key,
User oldValue,
Executor executor) {
// Conditional reload based on old value
return CompletableFuture.supplyAsync(() -> {
if (oldValue.isExpired()) {
return database.loadUser(key);
}
return oldValue;
}, executor);
}
}
AsyncLoadingCache<String, User> cache = Caffeine.newBuilder()
.maximumSize(10_000)
.refreshAfterWrite(Duration.ofMinutes(5))
.buildAsync(new UserAsyncLoader());
Composing Async Operations
Chaining Operations
AsyncCache<String, User> cache = Caffeine.newBuilder().buildAsync();
CompletableFuture<String> result = cache
    .get(userId, (key, executor) -> database.loadUserAsync(key))
.thenApply(user -> user.getEmail())
.thenApply(email -> email.toLowerCase())
.thenCompose(email -> emailService.validateAsync(email));
result.thenAccept(validEmail ->
System.out.println("Valid email: " + validEmail)
);
Combining Multiple Futures
AsyncLoadingCache<String, User> userCache =
Caffeine.newBuilder().buildAsync(this::loadUser);
AsyncLoadingCache<String, Profile> profileCache =
Caffeine.newBuilder().buildAsync(this::loadProfile);
// Load user and profile in parallel
CompletableFuture<User> userFuture = userCache.get(userId);
CompletableFuture<Profile> profileFuture = profileCache.get(userId);
CompletableFuture<UserView> combined = userFuture.thenCombine(
profileFuture,
(user, profile) -> new UserView(user, profile)
);
Error Handling
AsyncCache<String, User> cache = Caffeine.newBuilder().buildAsync();
CompletableFuture<User> future = cache
    .get(userId, (key, executor) -> database.loadUserAsync(key))
    // Record metrics first, while the failure is still visible;
    // whenComplete propagates the exception downstream
    .whenComplete((user, throwable) -> {
        if (throwable != null) {
            metrics.recordFailure();
        } else {
            metrics.recordSuccess();
        }
    })
    .exceptionally(throwable -> {
        log.error("Failed to load user: " + userId, throwable);
        return getDefaultUser();
    });
Failed CompletableFutures are automatically removed from the cache. Ensure you have proper retry logic.
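Because a failed future is evicted, the next get will retry naturally; to retry within a single load, a small helper over CompletableFuture works. A sketch (Retry, withRetry, and maxAttempts are illustrative names, not Caffeine API):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class Retry {
    // Runs the async task, re-invoking it on failure up to maxAttempts times
    static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> task, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(task, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> task, int remaining,
            CompletableFuture<T> result) {
        task.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (remaining > 1) {
                attempt(task, remaining - 1, result);  // try again
            } else {
                result.completeExceptionally(error);   // give up
            }
        });
    }
}
```

Usage inside a load, assuming a database.loadUserAsync as in the examples above: `cache.get(userId, (key, executor) -> Retry.withRetry(() -> database.loadUserAsync(key), 3))`.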
Bulk Async Operations
Efficient Batch Loading
AsyncLoadingCache<String, User> cache = Caffeine.newBuilder()
.maximumSize(10_000)
.buildAsync(new AsyncCacheLoader<String, User>() {
@Override
public CompletableFuture<User> asyncLoad(
String key,
Executor executor) {
return CompletableFuture.supplyAsync(
() -> database.loadUser(key),
executor
);
}
@Override
public CompletableFuture<Map<String, User>> asyncLoadAll(
Set<? extends String> keys,
Executor executor) {
return CompletableFuture.supplyAsync(() -> {
// Single database query for efficiency
return database.loadUsersInBatch(keys);
}, executor);
}
});
// Load multiple users efficiently
Set<String> userIds = Set.of("user1", "user2", "user3");
CompletableFuture<Map<String, User>> future = cache.getAll(userIds);
future.thenAccept(users -> {
users.forEach((id, user) ->
System.out.println(id + ": " + user.getName())
);
});
Parallel Async Loads
public CompletableFuture<Dashboard> loadDashboard(String userId) {
AsyncLoadingCache<String, User> userCache = getUserCache();
AsyncLoadingCache<String, Stats> statsCache = getStatsCache();
AsyncLoadingCache<String, Activity> activityCache = getActivityCache();
// All load in parallel
CompletableFuture<User> userFuture = userCache.get(userId);
CompletableFuture<Stats> statsFuture = statsCache.get(userId);
CompletableFuture<Activity> activityFuture = activityCache.get(userId);
// Combine when all complete
return CompletableFuture.allOf(
userFuture, statsFuture, activityFuture
).thenApply(v -> new Dashboard(
userFuture.join(),
statsFuture.join(),
activityFuture.join()
));
}
Refresh Strategies
Automatic Background Refresh
AsyncLoadingCache<String, PriceData> cache = Caffeine.newBuilder()
.refreshAfterWrite(Duration.ofSeconds(30))
.buildAsync(new AsyncCacheLoader<String, PriceData>() {
@Override
public CompletableFuture<PriceData> asyncLoad(
String symbol,
Executor executor) {
return priceService.fetchPriceAsync(symbol);
}
@Override
public CompletableFuture<PriceData> asyncReload(
String symbol,
PriceData oldPrice,
Executor executor) {
// Fetch only if price changed
return priceService.fetchIfChangedAsync(symbol, oldPrice)
.exceptionally(throwable -> {
// On error, keep old value
log.warn("Refresh failed, using cached value", throwable);
return oldPrice;
});
}
});
// The first call triggers a load; once refreshAfterWrite elapses,
// callers receive the cached value immediately while the
// refresh runs in the background
CompletableFuture<PriceData> price = cache.get("AAPL");
During refresh, the old value is returned immediately while the new value loads in the background.
Manual Refresh Control
public class RefreshableCache<K, V> {
private final AsyncLoadingCache<K, V> cache;
private final ScheduledExecutorService scheduler;
public RefreshableCache(AsyncCacheLoader<K, V> loader) {
this.cache = Caffeine.newBuilder()
.maximumSize(10_000)
.buildAsync(loader);
this.scheduler = Executors.newScheduledThreadPool(1);
}
public void startPeriodicRefresh(Set<K> keys, Duration interval) {
scheduler.scheduleAtFixedRate(
() -> refreshKeys(keys),
interval.toMillis(),
interval.toMillis(),
TimeUnit.MILLISECONDS
);
}
private void refreshKeys(Set<K> keys) {
    // The synchronous view exposes refresh(), which forces an
    // asynchronous reload of each entry
    keys.forEach(key ->
        cache.synchronous().refresh(key)
            .exceptionally(throwable -> {
                log.error("Refresh failed for key: " + key, throwable);
                return null;
            }));
}
}
Integration Patterns
With Spring WebFlux
import reactor.core.publisher.Mono;
import org.springframework.web.bind.annotation.*;
@RestController
public class UserController {
private final AsyncLoadingCache<String, User> cache;
public UserController() {
this.cache = Caffeine.newBuilder()
.maximumSize(10_000)
.buildAsync(this::loadUser);
}
@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {
return Mono.fromFuture(cache.get(id));
}
@GetMapping("/users")
public Mono<Map<String, User>> getUsers(
@RequestParam Set<String> ids) {
return Mono.fromFuture(cache.getAll(ids));
}
private User loadUser(String id) {
return userRepository.findById(id)
.orElseThrow(() -> new UserNotFoundException(id));
}
}
With CompletableFuture Utilities
public class AsyncCacheService {
private final AsyncLoadingCache<String, Data> cache;
public CompletableFuture<List<Data>> getMultiple(List<String> ids) {
List<CompletableFuture<Data>> futures = ids.stream()
.map(cache::get)
.toList();
return CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
).thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.toList()
);
}
public CompletableFuture<Data> getWithTimeout(
String id,
Duration timeout) {
return cache.get(id)
.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
.exceptionally(throwable -> {
log.warn("Cache load timed out for: " + id);
return getDefaultData();
});
}
}
Coalescing Bulk Loader
Batch multiple individual requests into bulk operations:
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.function.Function;
public class CoalescingLoader<K, V> implements AsyncCacheLoader<K, V> {
    private final Function<Set<K>, Map<K, V>> bulkLoader;
    private final ConcurrentLinkedQueue<K> pendingKeys = new ConcurrentLinkedQueue<>();
    private final ConcurrentHashMap<K, CompletableFuture<V>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private final Duration maxWait;
    private final int maxBatchSize;

    public CoalescingLoader(Function<Set<K>, Map<K, V>> bulkLoader,
                            Duration maxWait, int maxBatchSize) {
        this.bulkLoader = bulkLoader;
        this.maxWait = maxWait;
        this.maxBatchSize = maxBatchSize;
    }

    @Override
    public CompletableFuture<V> asyncLoad(K key, Executor executor) {
        CompletableFuture<V> future = new CompletableFuture<>();
        pending.put(key, future);
        pendingKeys.add(key);
        // Flush immediately when the batch is full; otherwise a
        // scheduled flush picks up the key within maxWait
        if (pendingKeys.size() >= maxBatchSize) {
            executeBatchLoad(executor);
        } else {
            scheduler.schedule(() -> executeBatchLoad(executor),
                maxWait.toMillis(), TimeUnit.MILLISECONDS);
        }
        return future;
    }
private void executeBatchLoad(Executor executor) {
Set<K> keys = new HashSet<>();
Map<K, CompletableFuture<V>> futures = new HashMap<>();
// Collect pending requests
K key;
while ((key = pendingKeys.poll()) != null && keys.size() < maxBatchSize) {
keys.add(key);
futures.put(key, pending.remove(key));
}
if (keys.isEmpty()) return;
// Execute bulk load
CompletableFuture.supplyAsync(() -> bulkLoader.apply(keys), executor)
.thenAccept(results -> {
futures.forEach((k, f) -> f.complete(results.get(k)));
})
.exceptionally(throwable -> {
futures.values().forEach(f -> f.completeExceptionally(throwable));
return null;
});
}
}
Performance Considerations
- Executor Configuration
- Avoiding Blocking
- Memory Management
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
// Use custom executor for better control
Executor customExecutor = Executors.newFixedThreadPool(20);
AsyncLoadingCache<String, User> cache = Caffeine.newBuilder()
.executor(customExecutor)
.maximumSize(10_000)
.buildAsync(key -> loadUser(key));
// For CPU-bound operations, use ForkJoinPool
AsyncLoadingCache<String, Report> reportCache = Caffeine.newBuilder()
.executor(ForkJoinPool.commonPool())
.buildAsync(key -> generateReport(key));
// BAD: synchronous load inside the mapping function
AsyncCache<String, User> cache = Caffeine.newBuilder().buildAsync();
cache.get(userId, key -> {
    return database.loadUser(key); // Ties up an executor thread while blocked on I/O
});
// GOOD: Truly async
cache.get(userId, (key, executor) ->
CompletableFuture.supplyAsync(
() -> database.loadUser(key),
executor
)
);
// Async caches store CompletableFuture objects
// Configure size based on future overhead
AsyncLoadingCache<String, LargeObject> cache = Caffeine.newBuilder()
.maximumSize(1_000) // Lower than sync cache
.buildAsync(key -> loadLargeObject(key));
// Use a weigher for finer control; for async caches the weight
// is based on the completed value, not the in-flight future
AsyncCache<String, Data> weightedCache = Caffeine.newBuilder()
    .maximumWeight(10_000)
    .weigher((String key, Data value) -> {
        // Weight by result size, e.g. value.sizeInBytes()
        return 1;
    })
    .buildAsync();
Best Practices
Use Provided Executor
Always use the executor provided in the mapping function. This ensures proper thread pool management and prevents resource exhaustion.
Handle Failures Gracefully
Failed futures are automatically evicted. Implement retry logic and fallback values to handle transient failures.
Implement Bulk Operations
Always implement asyncLoadAll for AsyncLoadingCache. Batching significantly improves performance.
Monitor CompletableFuture States
Track how many futures are pending, completed, or failed. This helps identify bottlenecks and tune configuration.
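AsyncCache.asMap() exposes the entries as a ConcurrentMap<K, CompletableFuture<V>>, so such gauges can be computed over that view. A sketch (FutureStats is an illustrative name; the counting logic is plain Java and works on any map of futures):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class FutureStats {
    // Counts loads still in flight in an AsyncCache.asMap()-style view
    static <K, V> long pendingCount(Map<K, CompletableFuture<V>> view) {
        return view.values().stream()
            .filter(f -> !f.isDone())
            .count();
    }

    // Counts loads that completed exceptionally
    static <K, V> long failedCount(Map<K, CompletableFuture<V>> view) {
        return view.values().stream()
            .filter(CompletableFuture::isCompletedExceptionally)
            .count();
    }
}
```

Usage: `long pending = FutureStats.pendingCount(cache.asMap());`. Since failed futures are evicted automatically, a nonzero failed count in the live view is transient.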
Avoid Blocking Calls
Never call .join() or .get() in async contexts. Use .thenApply(), .thenCompose(), and other non-blocking methods.
Troubleshooting
Cache Loading Too Slow
// Add timeout to prevent hanging
CompletableFuture<User> future = cache.get(userId)
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(throwable -> getDefaultUser());
Too Many Pending Futures
// Limit concurrent loads
Semaphore loadLimiter = new Semaphore(100);
AsyncCache<String, User> cache = Caffeine.newBuilder()
    .buildAsync();
CompletableFuture<User> future = CompletableFuture.supplyAsync(() -> {
    // acquireUninterruptibly avoids a checked InterruptedException in the lambda
    loadLimiter.acquireUninterruptibly();
    try {
        return cache.get(userId, key -> loadUser(key)).join();
    } finally {
        loadLimiter.release();
    }
});
Next Steps
Computing Values
Learn about atomic compute operations
Performance Tuning
Optimize async cache performance