Back-Pressure
Back-pressure is a mechanism in reactive systems that allows the consumer of data to signal to the producer that it is unable to handle the current rate of data production. It is a way to prevent the consumer from being overwhelmed and to ensure that the system remains responsive and stable.
Reactor provides several strategies for handling back-pressure:
- Buffer: The
buffer
operator allows you to buffer a specified number of elements from the upstream publisher. If the consumer is slower than the producer, the buffer will fill up and apply back-pressure to the producer.
Flux.range(1, 1000)
.buffer(10)
.subscribe(System.out::println);
- Drop: The
onBackpressureDrop
operator drops the incoming elements if the consumer is not ready to receive them. This can be useful when you want to discard old data in favor of newer data.
Flux.range(1, 1000)
.onBackpressureDrop()
.subscribe(System.out::println);
- Latest: The
onBackpressureLatest
operator keeps only the latest element from the upstream publisher when the consumer is not ready. This ensures that the consumer always receives the most recent data.
Flux.range(1, 1000)
.onBackpressureLatest()
.subscribe(System.out::println);
- Error: The
onBackpressureError
operator terminates the sequence with an error signal if the consumer is not able to keep up with the producer. This can be used to fail fast and notify the consumer about the backpressure issue.
Flux.range(1, 1000)
.onBackpressureError()
.subscribe(
System.out::println,
error -> System.err.println("Back-pressure error: " + error)
);
Error Handling
Error handling is an important aspect of reactive programming. In Reactor, errors are represented as error signals that can be propagated through the reactive stream. Reactor provides several operators for handling errors:
- onError: The
onError
operator allows you to handle errors that occur in the reactive stream. You can provide a fallback value, perform error recovery, or rethrow the error.
Flux.range(1, 10)
.map(i -> {
if (i == 5) {
throw new RuntimeException("Error at element 5");
}
return i;
})
.onError(e -> System.err.println("Error occurred: " + e))
.subscribe(System.out::println);
- onErrorResume: The
onErrorResume
operator allows you to switch to an alternate sequence when an error occurs. This can be used for error recovery or providing fallback values.
Flux.range(1, 10)
.map(i -> {
if (i == 5) {
throw new RuntimeException("Error at element 5");
}
return i;
})
.onErrorResume(e -> Flux.just(-1))
.subscribe(System.out::println);
- onErrorMap: The
onErrorMap
operator allows you to transform an error signal into a different error or value signal. This can be used for error classification or providing more meaningful error messages.
Flux.range(1, 10)
.map(i -> {
if (i == 5) {
throw new RuntimeException("Error at element 5");
}
return i;
})
.onErrorMap(e -> new IllegalStateException("Mapped error: " + e))
.subscribe(
System.out::println,
error -> System.err.println("Error occurred: " + error)
);
- retry: The
retry
operator allows you to retry the sequence a specified number of times if an error occurs. This can be useful for handling transient errors or network failures.
Flux.range(1, 10)
.map(i -> {
if (i == 5) {
throw new RuntimeException("Error at element 5");
}
return i;
})
.retry(2)
.subscribe(
System.out::println,
error -> System.err.println("Error occurred: " + error)
);