What is Reactive Programming?

Reactive Programming is a programming paradigm focused on asynchronous data streams and non-blocking operations. The Blackjack API uses Spring WebFlux with Project Reactor to achieve high scalability and efficient resource usage.

Why Reactive?

Non-Blocking I/O

Threads aren’t blocked waiting for database or network operations, enabling high concurrency.

Backpressure

Consumers can signal producers to slow down, so downstream components are not overwhelmed.

Resource Efficiency

Handle thousands of concurrent requests with a small thread pool.

Composability

Chain and transform asynchronous operations declaratively.
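
Backpressure, described above, can be observed directly with Reactor's BaseSubscriber. The following is a minimal sketch, assuming only reactor-core on the classpath; the class name BackpressureSketch is hypothetical and not part of the Blackjack API. The subscriber asks for one element at a time, and doOnRequest records each demand signal that reaches the producer.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical demo class, not part of the Blackjack API.
class BackpressureSketch {

    // Consumes a Flux one element at a time and records every demand
    // signal that travels upstream to the producer.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Flux.range(1, 3)
                .doOnRequest(n -> log.add("request(" + n + ")"))
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(1); // ask for the first element only
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        log.add("received " + value);
                        request(1); // ask for one more once this one is handled
                    }
                });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Every demand entry in the log is request(1), so the producer never runs ahead of the consumer. A plain subscribe() with a lambda would instead request an unbounded amount (Long.MAX_VALUE) up front.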

Reactive Stack

Technology | Purpose
Spring WebFlux | Reactive web framework
Project Reactor | Reactive streams implementation (Mono, Flux)
Spring Data R2DBC | Reactive relational database access (MySQL)
Spring Data MongoDB Reactive | Reactive MongoDB access

Mono and Flux

Project Reactor provides two main types:

Mono<T>

Represents a stream of 0 or 1 element.
Mono<Game> gameStream = gameRepo.findById(gameId);
// Emits: either one Game or empty (not found)
Use cases:
  • Single database record lookup
  • HTTP response with one result
  • Creating/updating a single entity

Flux<T>

Represents a stream of 0 to N elements.
Flux<Player> playerStream = playerRepo.findRanking();
// Emits: multiple Player objects, one by one
Use cases:
  • Querying multiple records
  • Streaming large datasets
  • Server-Sent Events (SSE)
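
To make the difference between the two types concrete, here is a small sketch that builds a Mono and a Flux from in-memory values instead of a database. The class and method names are hypothetical, and block() appears only because the demo runs from a plain main method outside any reactive pipeline.

```java
import java.util.List;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the Blackjack API.
class MonoFluxSketch {

    // A Mono that emits exactly one value, like a successful lookup.
    static Mono<String> found() {
        return Mono.just("game-1");
    }

    // A Mono that completes without a value, like a lookup that found nothing.
    static Mono<String> notFound() {
        return Mono.empty();
    }

    // A Flux that emits three values in order, then completes.
    static Flux<String> ranking() {
        return Flux.just("Alice", "Bob", "Carol");
    }

    public static void main(String[] args) {
        // blockOptional() maps "one value or empty" onto Optional.
        Optional<String> hit = found().blockOptional();
        Optional<String> miss = notFound().blockOptional();
        // collectList() turns a Flux<T> into a Mono<List<T>>.
        List<String> names = ranking().collectList().block();

        System.out.println(hit.isPresent());
        System.out.println(miss.isPresent());
        System.out.println(names);
    }
}
```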

Reactive Repositories

Repositories in the Blackjack API return reactive types.

GameRepositoryPort (Interface)

Location: domain/port/GameRepositoryPort.java:7-11
public interface GameRepositoryPort {
    Mono<Game> save(Game game);
    Mono<Game> findById(GameId id);
    Mono<Void> deleteById(GameId id);
}
Mono<Void> represents an operation that never emits a value: it only signals completion or an error (like a DELETE operation).
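
For tests or local experiments, a port with this shape can be backed by a plain map. The sketch below is an assumption for illustration only: Game and GameId are simplified stand-in records, and InMemoryGameRepository does not exist in the Blackjack API. It shows one way to produce each of the three return shapes (a value, empty, and completion only).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Mono;

// Hypothetical, simplified stand-ins for the real domain types.
record GameId(String value) {}
record Game(GameId id, String status) {}

// Same method signatures as the port shown above.
interface GameRepositoryPort {
    Mono<Game> save(Game game);
    Mono<Game> findById(GameId id);
    Mono<Void> deleteById(GameId id);
}

// Hypothetical in-memory adapter, useful as a test double.
class InMemoryGameRepository implements GameRepositoryPort {
    private final Map<GameId, Game> store = new ConcurrentHashMap<>();

    @Override
    public Mono<Game> save(Game game) {
        // fromCallable defers the write until someone subscribes.
        return Mono.fromCallable(() -> {
            store.put(game.id(), game);
            return game;
        });
    }

    @Override
    public Mono<Game> findById(GameId id) {
        // Emits the game if present, otherwise completes empty.
        return Mono.defer(() -> Mono.justOrEmpty(store.get(id)));
    }

    @Override
    public Mono<Void> deleteById(GameId id) {
        // fromRunnable completes without emitting a value.
        return Mono.fromRunnable(() -> store.remove(id));
    }

    public static void main(String[] args) {
        GameRepositoryPort repo = new InMemoryGameRepository();
        GameId id = new GameId("g-1");
        // block() is used only to drive the demo from a plain main method.
        repo.save(new Game(id, "IN_PROGRESS")).block();
        System.out.println(repo.findById(id).blockOptional().isPresent());
        repo.deleteById(id).block();
        System.out.println(repo.findById(id).blockOptional().isPresent());
    }
}
```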

PlayerRepositoryPort (Interface)

Location: domain/port/PlayerRepositoryPort.java:10-15
public interface PlayerRepositoryPort {
    Mono<Player> save(Player player);
    Mono<Player> findById(PlayerId id);
    Mono<Player> findOrCreateByName(PlayerName name);
    Flux<Player> findRanking();  // Multiple players!
}
findRanking() returns Flux<Player> because it retrieves multiple players ordered by score.

Reactive MongoDB Adapter

Location: infrastructure/persistence/mongo/MongoGameRepositoryAdapter.java:18-28
@Override
public Mono<Game> save(Game game) {
    MongoGameDocument doc = MongoGameMapper.toDocument(game);
    return repo.save(doc).thenReturn(game);
}

@Override
public Mono<Game> findById(GameId id) {
    return repo.findById(id.value())
            .map(MongoGameMapper::toDomain);
}

Spring Data MongoDB Reactive

Location: infrastructure/persistence/mongo/SpringDataMongoGameRepository.java:5-6
public interface SpringDataMongoGameRepository 
        extends ReactiveMongoRepository<MongoGameDocument, String> {
}
ReactiveMongoRepository automatically provides reactive methods like:
  • Mono<T> save(T entity)
  • Mono<T> findById(ID id)
  • Flux<T> findAll()
  • Mono<Void> deleteById(ID id)

Reactive MySQL Adapter

Location: infrastructure/persistence/mysql/MySqlPlayerRepositoryAdapter.java:20-36
@Override
public Mono<Player> save(Player player) {
    return repo.findByExternalId(player.id().value())
            .defaultIfEmpty(new PlayerRow(
                    null,
                    player.id().value(),
                    player.name().value(),
                    player.wins(),
                    player.losses()
            ))
            .flatMap(existing -> {
                existing.setName(player.name().value());
                existing.setWins(player.wins());
                existing.setLosses(player.losses());
                return repo.save(existing).map(this::toDomain);
            });
}

Reactive Operators Explained

defaultIfEmpty(): If the stream is empty (player not found), emit a default value (new PlayerRow).
repo.findByExternalId(id)
    .defaultIfEmpty(newPlayerRow)  // Use this if not found
flatMap(): Transform each emitted item into a new Mono/Flux and flatten the result.
repo.findById(id)
    .flatMap(player -> repo.save(updatedPlayer))  // Chain async operations
map(): Synchronously transform each emitted item.
repo.findById(id)
    .map(MongoGameMapper::toDomain)  // Convert document to domain model
switchIfEmpty(): Switch to an alternative stream if the original is empty.
repo.findByName(name)
    .switchIfEmpty(Mono.error(new NotFoundException()))  // Signal an error if not found
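
The four operators can be tried side by side without a database. This is a sketch under stated assumptions: OperatorSketch and its WINS map are hypothetical stand-ins for a reactive repository, and block() is called only from main to print the results.

```java
import java.util.Locale;
import java.util.Map;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the Blackjack API.
class OperatorSketch {

    // In-memory data standing in for a players table.
    static final Map<String, Integer> WINS = Map.of("alice", 3);

    // Emits the win count if the player exists, otherwise completes empty.
    static Mono<Integer> findWins(String name) {
        return Mono.justOrEmpty(WINS.get(name));
    }

    // defaultIfEmpty: fall back to a plain value when nothing is found.
    static Mono<Integer> winsOrZero(String name) {
        return findWins(name).defaultIfEmpty(0);
    }

    // map: synchronous transformation of the emitted value.
    static Mono<String> describe(String name) {
        return findWins(name).map(wins -> name + " has " + wins + " wins");
    }

    // flatMap: the lambda returns another Mono, which is flattened into the chain.
    static Mono<Integer> winsOfNormalized(String rawName) {
        return Mono.just(rawName)
                .map(raw -> raw.trim().toLowerCase(Locale.ROOT))
                .flatMap(OperatorSketch::findWins);
    }

    // switchIfEmpty: replace an empty result with another publisher, here an error.
    static Mono<Integer> winsOrError(String name) {
        return findWins(name)
                .switchIfEmpty(Mono.error(new IllegalStateException("unknown player: " + name)));
    }

    public static void main(String[] args) {
        System.out.println(winsOrZero("bob").block());
        System.out.println(describe("alice").block());
        System.out.println(winsOfNormalized("  ALICE ").block());
        try {
            winsOrError("bob").block();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```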

Spring Data R2DBC

Location: infrastructure/persistence/mysql/SpringDataR2dbcPlayerRepository.java:8-18
public interface SpringDataR2dbcPlayerRepository 
        extends ReactiveCrudRepository<PlayerRow, Long> {
    
    Mono<PlayerRow> findByName(String name);
    Mono<PlayerRow> findByExternalId(String externalId);

    @Query("""
    SELECT *
    FROM players
    ORDER BY (wins - losses) DESC, wins DESC, losses ASC, name ASC
    """)
    Flux<PlayerRow> findRanking();
}
R2DBC (Reactive Relational Database Connectivity) provides reactive, non-blocking access to relational databases like MySQL and PostgreSQL.

Reactive Use Cases

CreateNewGameUseCase

Location: application/usecase/CreateNewGameUseCase.java:22-40
public Mono<CreateGameResult> execute(CreateGameCommand cmd) {
    final PlayerName name;
    try {
        name = new PlayerName(cmd.playerName());
    } catch (IllegalArgumentException e) {
        return Mono.error(new InvalidPlayerNameException(e.getMessage()));
    }

    return playerRepo.findOrCreateByName(name)
            .flatMap(player -> {
                Game game = Game.newGame(player.id());
                return gameRepo.save(game)
                        .map(saved -> new CreateGameResult(
                                saved.id().value(),
                                player.id().value(),
                                saved.status().name()
                        ));
            });
}

Reactive Flow Breakdown

  1. Validate Input: Create PlayerName value object (synchronous validation)
  2. Find or Create Player: playerRepo.findOrCreateByName(name) returns Mono<Player>
  3. Create Game (flatMap): Once player is emitted, create Game domain object (synchronous)
  4. Save Game: gameRepo.save(game) returns Mono<Game>
  5. Map to Result: Transform Game to CreateGameResult DTO
  6. Return Mono: Return Mono<CreateGameResult> to controller

Common Mistake: Using map() when you need flatMap().
// Wrong: map returns Mono<Mono<Game>>
playerRepo.findById(id)
    .map(player -> gameRepo.save(game));  // ❌ Nested Mono!

// Correct: flatMap flattens Mono<Mono<Game>> to Mono<Game>
playerRepo.findById(id)
    .flatMap(player -> gameRepo.save(game));  // ✅ Flat Mono

PlayMoveUseCase

Location: application/usecase/PlayMoveUseCase.java:24-59
public Mono<GameStateResult> execute(PlayMoveCommand cmd) {
    MoveAction action = MoveAction.valueOf(cmd.action().toUpperCase());

    return gameRepo.findById(new GameId(cmd.gameId()))
            .switchIfEmpty(Mono.error(new GameNotFoundException(cmd.gameId())))
            .map(game -> applyMove(game, action))
            .flatMap(this::persistAndUpdateStats)
            .map(mapper::toResultForPlayer);
}

private Game applyMove(Game game, MoveAction action) {
    return switch (action) {
        case HIT -> game.hit();
        case STAND -> game.stand();
    };
}

private Mono<Void> updatePlayerStatsIfFinished(Game game) {
    if (game.status() == GameStatus.IN_PROGRESS) return Mono.empty();

    return playerRepo.findById(game.playerId())
            .flatMap(player -> {
                Player updated = switch (game.status()) {
                    case PLAYER_WINS -> player.registerWin();
                    case DEALER_WINS -> player.registerLoss();
                    case PUSH -> player;
                    default -> player;
                };
                return playerRepo.save(updated).then();
            });
}

private Mono<Game> persistAndUpdateStats(Game updated) {
    return gameRepo.save(updated)
            .flatMap(saved -> updatePlayerStatsIfFinished(saved).thenReturn(saved));
}

Advanced Reactive Patterns

then(): Ignore the emitted value and complete.
playerRepo.save(updated).then()  // Returns Mono<Void>
thenReturn(value): Ignore emitted value and return a specific value.
updateStats(game).thenReturn(game)  // Returns Mono<Game>
Mono.empty(): Returns a Mono that completes without emitting any value.
if (game.status() == GameStatus.IN_PROGRESS) 
    return Mono.empty();  // No stats to update
Mono.error(): Returns a Mono that immediately signals an error.
.switchIfEmpty(Mono.error(new GameNotFoundException(id)))
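
The completion-oriented operators above are easiest to compare when the upstream step has a visible side effect. In this sketch, CompletionSketch and its save method are hypothetical; the stored list records what the upstream step did, so it is clear that then() and thenReturn() still run the upstream step and only change what is emitted afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the Blackjack API.
class CompletionSketch {

    // Records what the hypothetical save step stored.
    static final List<String> stored = new ArrayList<>();

    // Hypothetical save: stores the value on subscription and emits it.
    static Mono<String> save(String value) {
        return Mono.fromCallable(() -> {
            stored.add(value);
            return value;
        });
    }

    // then(): the save still runs, but only the completion signal is passed on.
    static Mono<Void> saveAndDiscard(String value) {
        return save(value).then();
    }

    // thenReturn(): the save runs first, then the given value is emitted instead.
    static Mono<String> saveThenReturn(String value, String result) {
        return save(value).thenReturn(result);
    }

    // Mono.empty(): a no-op branch that completes without emitting anything.
    static Mono<Void> skipWhenInProgress(boolean inProgress, String value) {
        if (inProgress) {
            return Mono.empty();
        }
        return saveAndDiscard(value);
    }

    public static void main(String[] args) {
        // block() is used only to drive the demo from a plain main method.
        saveAndDiscard("a").block();
        System.out.println(saveThenReturn("b", "done").block());
        skipWhenInProgress(true, "c").block();
        System.out.println(stored);
    }
}
```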

Reactive Controllers

GameController

Location: infrastructure/web/controller/GameController.java:43-47
@PostMapping("/new")
@ResponseStatus(HttpStatus.CREATED)
public Mono<CreateGameResult> create(@Valid @RequestBody CreateGameRequest request) {
    return createNewGame.execute(new CreateGameCommand(request.playerName()));
}
Spring WebFlux automatically:
  1. Subscribes to the Mono<CreateGameResult>
  2. Waits asynchronously for the result (non-blocking)
  3. Serializes the result to JSON
  4. Sends HTTP 201 response

Handling Multiple Results

Location: infrastructure/web/controller/RankingController.java
@GetMapping
public Flux<RankingEntryResult> getRanking() {
    return viewRanking.execute();
}
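Returning Flux<T> from a controller:
  • With a streaming media type such as text/event-stream (Server-Sent Events) or application/x-ndjson, Spring writes each element to the client as it becomes available
  • With plain application/json, the elements are serialized as one JSON array rather than as separate events
  • In both cases no request thread is blocked while results are produced, which helps with large result sets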

Error Handling

Reactive Error Propagation

Errors in reactive streams propagate downstream automatically.
return gameRepo.findById(id)
    .switchIfEmpty(Mono.error(new GameNotFoundException(id)))  // Error signal
    .map(game -> game.hit())  // Not executed if error
    .flatMap(gameRepo::save);  // Not executed if error
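
The claim that downstream steps are skipped can be checked with a counter. The sketch below uses a hypothetical findGame lookup and a plain IllegalArgumentException in place of GameNotFoundException; onErrorResume is used only in main to turn the error signal into a printable value.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the Blackjack API.
class ErrorPropagationSketch {

    // Counts how many times the map step actually ran.
    static final AtomicInteger mapCalls = new AtomicInteger();

    // Hypothetical lookup: only "game-1" exists.
    static Mono<String> findGame(String id) {
        return "game-1".equals(id) ? Mono.just(id) : Mono.empty();
    }

    static Mono<String> hit(String id) {
        return findGame(id)
                .switchIfEmpty(Mono.error(new IllegalArgumentException("game not found: " + id)))
                .map(game -> {
                    mapCalls.incrementAndGet(); // skipped when an error is flowing
                    return game + ":HIT";
                });
    }

    public static void main(String[] args) {
        // block() is used only to drive the demo from a plain main method.
        String ok = hit("game-1").block();
        String failed = hit("missing")
                .onErrorResume(IllegalArgumentException.class,
                        e -> Mono.just("error: " + e.getMessage()))
                .block();
        System.out.println(ok);
        System.out.println(failed);
        System.out.println("map ran " + mapCalls.get() + " time(s)");
    }
}
```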

Global Exception Handler

Location: infrastructure/web/error/GlobalExceptionHandler.java
@ExceptionHandler(GameNotFoundException.class)
@ResponseStatus(HttpStatus.NOT_FOUND)
public Mono<ErrorResponse> handleGameNotFound(GameNotFoundException ex) {
    return Mono.just(new ErrorResponse(
        "GAME_NOT_FOUND",
        ex.getMessage()
    ));
}

@ExceptionHandler(InvalidMoveException.class)
@ResponseStatus(HttpStatus.CONFLICT)
public Mono<ErrorResponse> handleInvalidMove(InvalidMoveException ex) {
    return Mono.just(new ErrorResponse(
        "INVALID_MOVE",
        ex.getMessage()
    ));
}
Exceptions thrown synchronously and error signals emitted by a returned Mono or Flux are both routed to the matching @ExceptionHandler method, which returns Mono<ErrorResponse> so that error responses stay consistent.

Reactive Composition Patterns

Sequential Operations

Use flatMap() to chain dependent operations:
playerRepo.findById(playerId)
    .flatMap(player -> gameRepo.save(Game.newGame(player.id())))
    .flatMap(game -> notifyPlayer(game))  // Wait for each step

Parallel Operations

Use Mono.zip() to run independent operations concurrently:
Mono<Player> playerMono = playerRepo.findById(playerId);
Mono<Game> gameMono = gameRepo.findById(gameId);

Mono.zip(playerMono, gameMono)
    .map(tuple -> {
        Player player = tuple.getT1();
        Game game = tuple.getT2();
        // Both loaded in parallel!
        return buildResult(player, game);
    });
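
One way to see that zipped sources overlap is to give each an artificial delay and time the combined result. This is a sketch with hypothetical names (ZipSketch, slow); Mono.delay stands in for a slow repository call, and the timing printed by main is only indicative.

```java
import java.time.Duration;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the Blackjack API.
class ZipSketch {

    // Hypothetical slow lookup: emits the value after the given delay.
    static Mono<String> slow(String value, Duration delay) {
        return Mono.delay(delay).thenReturn(value);
    }

    // Both sources are subscribed when the zip is subscribed,
    // so their delays overlap instead of adding up.
    static Mono<String> loadBoth(Duration delay) {
        return Mono.zip(slow("player", delay), slow("game", delay))
                .map(tuple -> tuple.getT1() + "+" + tuple.getT2());
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // block() is used only to drive the demo from a plain main method.
        String result = loadBoth(Duration.ofMillis(500)).block();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(result + " after about " + elapsedMs + " ms");
    }
}
```

Note that Mono.zip() completes empty as soon as any source completes empty, so a missing player or game yields no tuple at all. Add switchIfEmpty() to each source if a missing record should be reported as an error.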

Conditional Operations

if (game.status() == GameStatus.IN_PROGRESS) {
    return Mono.empty();  // Nothing to do
} else {
    return updatePlayerStats(game);  // Update stats
}

Subscription and Execution

Critical Concept: Reactive streams are lazy. Nothing happens until you subscribe!
// This does NOT execute the query!
Mono<Game> gameMono = gameRepo.findById(id);

// Subscription triggers execution:
gameMono.subscribe(
    game -> System.out.println("Found: " + game),
    error -> System.err.println("Error: " + error),
    () -> System.out.println("Complete")
);
Spring WebFlux automatically subscribes when you return Mono<T> or Flux<T> from a controller. You don’t need to call .subscribe() manually.
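
Laziness can be verified with a counter that is incremented inside the deferred work. In this sketch, LazinessSketch and findGame are hypothetical; Mono.fromCallable stands in for a repository query.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not part of the Blackjack API.
class LazinessSketch {

    // Counts how many times the simulated query really executed.
    static final AtomicInteger queries = new AtomicInteger();

    // Hypothetical query: the callable body runs once per subscription.
    static Mono<String> findGame() {
        return Mono.fromCallable(() -> {
            queries.incrementAndGet();
            return "game-1";
        });
    }

    public static void main(String[] args) {
        Mono<String> gameMono = findGame();
        System.out.println("after assembly: " + queries.get());

        gameMono.subscribe(game -> System.out.println("Found: " + game));
        System.out.println("after first subscribe: " + queries.get());

        gameMono.subscribe(game -> System.out.println("Found again: " + game));
        System.out.println("after second subscribe: " + queries.get());
    }
}
```

The counter stays at zero until the first subscribe() and increases again on the second one: each subscription re-runs the work, which is worth remembering before subscribing to the same repository Mono twice.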

Testing Reactive Code

Use StepVerifier from reactor-test:
@Test
void shouldCreateNewGame() {
    CreateGameCommand cmd = new CreateGameCommand("Alice");
    
    StepVerifier.create(useCase.execute(cmd))
        .assertNext(result -> {
            assertNotNull(result.gameId());
            assertNotNull(result.playerId());
            assertEquals("IN_PROGRESS", result.status());
        })
        .verifyComplete();
}

@Test
void shouldThrowErrorForInvalidPlayer() {
    CreateGameCommand cmd = new CreateGameCommand("");  // Invalid name
    
    StepVerifier.create(useCase.execute(cmd))
        .expectError(InvalidPlayerNameException.class)
        .verify();
}

Performance Benefits

Traditional Blocking Approach

// Each operation blocks a thread
Player player = playerRepo.findById(id);        // Blocks thread 1
Game game = gameRepo.save(Game.newGame(id));    // Blocks thread 1
notifyPlayer(game);                              // Blocks thread 1

// With 1000 concurrent requests, you need 1000 threads!

Reactive Non-Blocking Approach

// Operations are non-blocking
return playerRepo.findById(id)                  // Thread released
    .flatMap(p -> gameRepo.save(Game.newGame(p.id())))  // Thread released
    .flatMap(this::notifyPlayer);               // Thread released

// With 1000 concurrent requests, you might need only 10-20 threads!

Thread Pool Size

Blocking: 200+ threads
Reactive: 10-20 threads

Memory Usage

Blocking: ~200 MB (thread stacks)
Reactive: ~20 MB

Concurrent Requests

Blocking: Limited by threads
Reactive: Thousands easily

Latency

Blocking: Higher under load
Reactive: Consistent

Reactive Operators Summary

Operator | Purpose | Example
map() | Synchronous transformation | .map(game -> toDTO(game))
flatMap() | Async transformation (returns Mono/Flux) | .flatMap(id -> repo.findById(id))
filter() | Conditionally emit items | .filter(game -> game.status() == IN_PROGRESS)
switchIfEmpty() | Fallback if empty | .switchIfEmpty(Mono.error(new NotFoundException()))
defaultIfEmpty() | Default value if empty | .defaultIfEmpty(newPlayer)
then() | Ignore value, signal completion | .then()
thenReturn() | Ignore value, return new value | .thenReturn(savedGame)
Mono.zip() | Combine multiple Monos | Mono.zip(mono1, mono2)
Mono.error() | Signal error | Mono.error(new Exception())
Mono.empty() | Complete without value | Mono.empty()

Best Practices

When the transformation returns Mono<T> or Flux<T>, use flatMap(), not map().
// Right
.flatMap(id -> repo.findById(id))

// Wrong
.map(id -> repo.findById(id))  // Returns Mono<Mono<Game>>!
Always consider what happens if a stream is empty.
repo.findById(id)
    .switchIfEmpty(Mono.error(new NotFoundException()))  // Explicit handling
Never call .block() in reactive pipelines!
// Never do this!
Game game = gameRepo.findById(id).block();  // ❌ Defeats reactive purpose
Use StepVerifier to test reactive streams thoroughly.

Summary

The Blackjack API achieves high scalability through:
  1. Reactive repositories returning Mono<T> and Flux<T>
  2. Non-blocking database drivers (R2DBC for MySQL, Reactive MongoDB)
  3. Reactive use cases composing operations with flatMap(), map(), etc.
  4. Reactive controllers returning reactive types to Spring WebFlux
  5. Efficient resource usage with small thread pools handling high concurrency

Next Steps

Architecture Overview

See how reactive programming fits into the overall architecture

API Reference

Explore reactive API endpoints
