As we saw in the previous post, the whole idea of reactive programming revolves around two concepts: data arrives in asynchronous streams, and those streams are immutable, so any modification results in a new stream being created.
These facts make the code not very intuitive, and it gets worse when it comes to testing (even more so if we are interacting with external resources, such as a web service).
To help with this, Spring and Reactor come with some handy tools that simplify both development and testing.
First, let's take a look at the scenario being tested:
Figure: Sequence diagram of the "production" code
Basically, we are requesting the StockPublisher to get some quotes and, in this case, they are retrieved from a Web Service hosted by Yahoo. The component tasked with getting the quotes uses a RestTemplate that finally makes the REST GET call.
REST testing with Spring's MockRestServiceServer
As you can see in the diagram above, we are invoking a web service that might return different results (or might not even be available) at test time. To eliminate that variability, we are going to use a handy class provided by the Spring Framework: MockRestServiceServer.
Figure: Sequence diagram of the "test" code
This is the test code:
    // Even though this Spring bean is not referenced directly by the
    // test, once you link it with the MockRestServiceServer, all
    // calls made by it will be intercepted and checked.
    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private StockPublisher publisher;

    @Test
    public void testPublisher() throws IOException {
        String stock = "T";
        // Link the RestTemplate bean with our MockRestServiceServer
        MockRestServiceServer mockedServer = MockRestServiceServer.createServer(restTemplate);
        // Define which request we should expect...
        mockedServer.expect(requestTo(String.format(YahooFeeder.YAHOO_URL, stock)))
            // ... and the response that the mock will provide (read from a text file)
            .andRespond(withSuccess(Files.readAllBytes(sampleFilePath), MediaType.TEXT_HTML));
        // Get the target flux and create a logging flux from it
        Flux<StockQuotation> flux = publisher.getQuotes(Arrays.asList(stock)).log();
        // Reactor class that helps run assertions against the tested flux
        TestSubscriber<StockQuotation> subscriber = new TestSubscriber<>();
        subscriber.bindTo(flux)
            // Block until onComplete()
            .await()
            // The sample period contains 36 entries
            .assertValueCount(36)
            .assertComplete();
        // Verify that all the REST operations were performed according to our specs
        mockedServer.verify();
    }
I don't want to go too deep into MockRestServiceServer, but this is what we do with it:
- Specify which invocations we expect to be performed (HTTP method, URL, etc.).
- Specify what we should expect in return (headers, HTTP statuses, etc.).
- We can even provide the response ourselves, so the target URL is never actually invoked!
Reactive JUnit tests with TestSubscriber
Now it is time to talk about how to test our Flux/Mono streams using the TestSubscriber API.
As you can see in the code above, we can subscribe to the Flux being tested and run a large number of assertions against it:
- Whether it has finished with an error or a complete signal.
- How many values the stream has emitted, and which ones exactly.
- Also, the TestSubscriber is itself a Subscriber, so you can feed it events directly with the usual onNext(..), onError(..), etc. methods.
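To make the idea of an asserting subscriber more concrete, here is a minimal sketch built on the JDK's own java.util.concurrent.Flow API rather than on Reactor, so it runs without any extra dependency. RecordingSubscriber and its record(..) helper are hypothetical names invented for this illustration. They are not part of Reactor and do not reproduce TestSubscriber; they only show the general shape: request unbounded demand, record every signal, and block until a terminal signal arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; this is NOT Reactor's TestSubscriber.
final class RecordingSubscriber<T> implements Flow.Subscriber<T> {
    private final List<T> values = new CopyOnWriteArrayList<>();
    private final CountDownLatch terminated = new CountDownLatch(1);
    private volatile Throwable error;
    private volatile boolean completed;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Unbounded demand: ask for everything the publisher has
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
        values.add(item);
    }

    @Override
    public void onError(Throwable throwable) {
        error = throwable;
        terminated.countDown();
    }

    @Override
    public void onComplete() {
        completed = true;
        terminated.countDown();
    }

    // Blocks until onComplete() or onError(..) has been received
    RecordingSubscriber<T> await() {
        try {
            if (!terminated.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("No terminal signal within 5 seconds");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", ex);
        }
        return this;
    }

    List<T> values() { return new ArrayList<>(values); }
    boolean isComplete() { return completed; }
    Throwable error() { return error; }

    // Publishes the items asynchronously, completes, and returns the recorded signals
    static <T> RecordingSubscriber<T> record(List<T> items) {
        RecordingSubscriber<T> subscriber = new RecordingSubscriber<>();
        try (SubmissionPublisher<T> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() sends onComplete to the subscriber
        return subscriber.await();
    }
}
```

With this in place, RecordingSubscriber.record(List.of("a", "b", "c")) returns a subscriber whose values() are [a, b, c] and whose isComplete() is true. Because it is itself a Subscriber, it can also be driven by hand with onNext(..) and onError(..).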
Let's see an example of a test with two events correctly processed and an error raised while processing the third element:
    // The flux will feed from these values
    Mono<Integer> testMono = Flux.fromArray(new String[]{"1", "2", "foo", "4", "5"})
        // All events will be logged
        .log()
        // Each incoming string will be parsed to an int
        .map(numberStr -> Integer.parseInt(numberStr))
        // Then, all will be summed
        .reduce((a, b) -> a + b);
    TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    // This will fail as the flux ends with an error
    subscriber.bindTo(testMono).await().assertComplete().assertValues(12);
Note: To ease debugging and testing, the Reactor API comes with a log() method that creates a new Flux (remember, they are immutable) that logs every event it receives. This is the log trace:
    INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4bbfb90a)
    INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.MonoAggregate$AggregateSubscriber@5e853265)
    INFO reactor.core.publisher.FluxLog - request(unbounded)
    INFO reactor.core.publisher.FluxLog - request(unbounded)
    INFO reactor.core.publisher.FluxLog - onNext(1)
    INFO reactor.core.publisher.FluxLog - onNext(2)
    INFO reactor.core.publisher.FluxLog - onNext(foo)
    INFO reactor.core.publisher.FluxLog - cancel()
    ERROR reactor.core.publisher.FluxLog - onError(java.lang.NumberFormatException: For input string: "foo")
Now, let's provide a fallback value of 0 in case an error is thrown when parsing the String:
    // The flux will feed from these values
    Mono<Integer> testMono = Flux.fromArray(new String[]{"1", "2", "foo", "4", "5"})
        // All events will be logged
        .log()
        // Each incoming string will be parsed to an int
        .map(numberStr -> Integer.parseInt(numberStr))
        // If the previous operation fails, 0 will be emitted instead
        .onErrorReturn(0)
        // Then, all will be summed
        .reduce((a, b) -> a + b);
    TestSubscriber<Integer> subscriber = new TestSubscriber<>();
    // Will this work now?
    subscriber.bindTo(testMono).await().assertComplete().assertValues(12);
And this is the resulting log trace:
    INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@5d76b067)
    INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.MonoAggregate$AggregateSubscriber@614ddd49)
    INFO reactor.core.publisher.FluxLog - request(unbounded)
    INFO reactor.core.publisher.FluxLog - request(unbounded)
    INFO reactor.core.publisher.FluxLog - onNext(1)
    INFO reactor.core.publisher.FluxLog - onNext(2)
    INFO reactor.core.publisher.FluxLog - onNext(foo)
    // After handling the error, the subscriptions are cancelled
    INFO reactor.core.publisher.FluxLog - cancel()
    INFO reactor.core.publisher.FluxLog - cancel()
    // This is the reduce phase
    INFO reactor.core.publisher.FluxLog - onNext(3)
    INFO reactor.core.publisher.FluxLog - onComplete()
Why is the test still failing? Why is it yielding 3 instead of 12? Let's see why.
Error handling
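The API policy on errors is that they are terminal events. That is, a stream is terminated when a "Complete" or an "Error" signal is received. In the example above, onErrorReturn(0) swaps the NumberFormatException for a 0 and then completes the stream, so "4" and "5" are never processed and the reduction yields 1 + 2 + 0 = 3.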
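The arithmetic behind that 3 can be traced with a small plain-Java simulation of the signal sequence. This is only a sketch of the observable behavior seen in the log trace, not Reactor code or Reactor's implementation; the class and method names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation for illustration; NOT Reactor code.
final class TerminalErrorSimulation {

    // Mimics "parse each value, fall back on error": the first failure
    // emits the fallback and then ends the sequence.
    static List<Integer> parseUntilFirstError(String[] inputs, int fallback) {
        List<Integer> emitted = new ArrayList<>();
        for (String input : inputs) {
            try {
                emitted.add(Integer.parseInt(input));
            } catch (NumberFormatException ex) {
                emitted.add(fallback); // the error is replaced by the fallback value...
                break;                 // ...but the sequence still terminates here
            }
        }
        return emitted;
    }

    // Mimics the reduce step: adds up whatever was emitted
    static int sum(List<Integer> values) {
        int total = 0;
        for (int value : values) {
            total += value;
        }
        return total;
    }

    public static void main(String[] args) {
        String[] inputs = {"1", "2", "foo", "4", "5"};
        List<Integer> emitted = parseUntilFirstError(inputs, 0);
        System.out.println(emitted + " sums to " + sum(emitted)); // prints [1, 2, 0] sums to 3
    }
}
```

The values "4" and "5" never reach the sum, which matches the onNext(3) seen in the reduce phase of the trace.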
Therefore, the only methods I have seen so far to handle errors are:
Figures: two diagrams, credits to the Reactor Flux API docs.
However, I am missing a method that allows handling individual errors within the Flux without terminating it. Let's take a look at the method tested at the beginning of the post (and shown in the initial diagram):
    public Flux<StockQuotation> getQuotes(List<String> tickers) {
        return Flux.fromIterable(tickers)
            // Fetch the quotes for each ticker, deferred until subscription
            .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
            // Convert each list of raw quote strings into a new Flux<String>
            .flatMap(list -> Flux.fromIterable(list))
            // Convert the strings to POJOs
            .flatMap(x -> {
                // flatMap merges the results of all transformations
                // (mappings) into the output flux. So, in order to
                // prevent the IllegalArgumentException from breaking the
                // processing, we catch it and return an empty Flux
                try {
                    return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));
                } catch (IllegalArgumentException ex) {
                    return Flux.empty();
                }
            });
    }
As you can see, we need a rather ugly piece of code that:
- Catches the exception.
- Returns an empty Flux that will be merged with the rest of the mapping results.
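One way to keep that try/catch out of the pipeline is to hide it in a small reusable wrapper. The sketch below shows the same catch-and-return-empty pattern using java.util.stream instead of Reactor, so that it runs on the JDK alone; the helper name skippingFailures is hypothetical. The assumption is that the same shape carries over to the Flux version by returning Flux.just(..) and Flux.empty() from the wrapped function, as the method above does inline.

```java
import java.util.function.Function;
import java.util.stream.Stream;

// Illustration with java.util.stream, NOT Reactor; the helper name is hypothetical.
final class SkipFailures {

    // Wraps a mapper so that an ignored exception yields an empty stream
    // (which flatMap simply merges away) instead of aborting the pipeline.
    static <T, R> Function<T, Stream<R>> skippingFailures(
            Function<T, R> mapper, Class<? extends RuntimeException> ignored) {
        return value -> {
            try {
                return Stream.of(mapper.apply(value));
            } catch (RuntimeException ex) {
                if (ignored.isInstance(ex)) {
                    return Stream.empty();
                }
                throw ex; // anything unexpected still propagates
            }
        };
    }

    // Parses every input, silently dropping the ones that are not numbers
    static int sumOfValidNumbers(String... inputs) {
        Function<String, Stream<Integer>> parse =
                skippingFailures(Integer::parseInt, NumberFormatException.class);
        return Stream.of(inputs).flatMap(parse).mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfValidNumbers("1", "2", "foo", "4", "5")); // prints 12
    }
}
```

Here the bad element is skipped and processing continues, so the sum is the 12 that the earlier test was hoping for. Only the exception type passed in is swallowed; any other failure still surfaces.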
Summary
I'm still exploring the API to see if there is a better way to handle errors (I even opened a ticket in the Reactor project issue tracker), and I have some questions for you:
- Do you have any feedback on using Reactor? Would you accomplish this in another way?
- Do you feel that it is mature enough?
- Do you use any of the other modules (I/O, the EventBus, etc.)?