What did I know last week about reactive programming? Nothing. That's a fact. However, I started reading about it and I find the topic really interesting.
Unfortunately, there is not much documentation around (or at least I had a hard time finding it), so I would like to share some resources with you, along with an example that I have adapted to behave like our Stokker module (the module that used to fetch stock quotations from Yahoo Finance for us using Spring Integration).
What is Reactive Programming?
The best definition I could find is in this amazing GitHub gist written by @andrestaltz, which boils all the theoretical definitions down to this line:
Reactive programming is programming with asynchronous data streams.
I really recommend reading it, but we can summarize it as follows:
- You will work with immutable event streams to which you will have to subscribe.
- Some operations will be provided to you, so you can transform these streams into new ones by filtering them, merging them or creating new events from the previous ones.
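To make these two points concrete, here is a minimal sketch in plain Java (no Rx library involved). The EventStream class and its subscribe, emit, map, filter and mergeWith methods are hypothetical names invented for this illustration, not the API of Reactor or RxJava. Each operator leaves the stream it is called on untouched and returns a new stream that is fed by subscribing to the previous one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

class EventStreamSketch {

    /** Hypothetical push-based stream: every subscriber is called for each emitted event. */
    static class EventStream<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(T event) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(event);
            }
        }

        /** Returns a NEW stream carrying fn(event) for every event of this one. */
        <R> EventStream<R> map(Function<T, R> fn) {
            EventStream<R> derived = new EventStream<>();
            subscribe(event -> derived.emit(fn.apply(event)));
            return derived;
        }

        /** Returns a NEW stream carrying only the events accepted by the predicate. */
        EventStream<T> filter(Predicate<T> keep) {
            EventStream<T> derived = new EventStream<>();
            subscribe(event -> {
                if (keep.test(event)) {
                    derived.emit(event);
                }
            });
            return derived;
        }

        /** Returns a NEW stream carrying the events of both this stream and the other one. */
        EventStream<T> mergeWith(EventStream<T> other) {
            EventStream<T> derived = new EventStream<>();
            subscribe(derived::emit);
            other.subscribe(derived::emit);
            return derived;
        }
    }

    static List<Integer> demo() {
        EventStream<Integer> numbers = new EventStream<>();
        EventStream<Integer> extras = new EventStream<>();
        List<Integer> received = new ArrayList<>();

        // Keep the even numbers, merge in a second stream, multiply everything by 10
        numbers.filter(n -> n % 2 == 0)
               .mergeWith(extras)
               .map(n -> n * 10)
               .subscribe(received::add);

        numbers.emit(1); // dropped by the filter
        numbers.emit(2);
        extras.emit(5);
        numbers.emit(4);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [20, 50, 40]
    }
}
```

A real reactive library adds completion and error signals, threading and backpressure on top of this idea, so take the sketch only as a mental model.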
The best way to understand this is to represent these operations graphically. For that purpose, there is a really cool web application implemented in JavaScript with Rx Observables: RxMarbles (a must-see!). For instance:
Two event streams: one with numbers, the other with each element multiplied by 10
The first line represents the initial event stream. In the middle you can see the mapping function that is applied to every event in the stream and, finally, the second stream with the results.
Note: Pay attention to the immutability here: Any transformation you apply to a stream creates a new one!
Let's check out another transformation:
Two event streams: one with numbers, the other with the average of the elements received
Look who is here! Our old friend, the moving average. Whenever a new event arrives in the original stream, a new value for the average will be calculated in the resulting stream.
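The calculation behind that diagram can be sketched in a few lines of plain Java. This is only an illustration: RunningAverageSketch and runningAverages are hypothetical names, and the events are taken from a list in arrival order instead of from a real stream. For every incoming value it emits the average of all the values received so far.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RunningAverageSketch {

    /** For each incoming value, returns the average of all values received so far. */
    static List<Double> runningAverages(List<Integer> events) {
        List<Double> averages = new ArrayList<>();
        long sum = 0;
        int count = 0;
        for (int value : events) {
            sum += value;
            count++;
            // One new average per incoming event
            averages.add((double) sum / count);
        }
        return averages;
    }

    public static void main(String[] args) {
        System.out.println(runningAverages(Arrays.asList(1, 2, 3, 4))); // prints [1.0, 1.5, 2.0, 2.5]
    }
}
```

Note that the input list is never modified: the averages end up in a new list, just as the transformation in the diagram produces a new stream.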
An example with Spring + Reactor
What is Reactor? Well, the official definition on the project web page is:
Reactor is a second-generation Reactive library for building non-blocking applications on the JVM based on the Reactive Streams Specification.
This picture shows the project ecosystem:
As you can see here, one of the main advertising points is the ease of integration with Spring
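As a side note on the Reactive Streams Specification mentioned in that definition: since Java 9 the JDK ships a copy of its interfaces (Publisher, Subscriber, Subscription, Processor) inside java.util.concurrent.Flow, so the contract can be tried without any library. The sketch below is not Reactor code; FlowSketch, CollectingSubscriber and publishAndCollect are hypothetical names, and the subscriber simply asks for one item at a time through request(1).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowSketch {

    /** Subscriber that requests one item at a time and records what it receives. */
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // nothing is delivered until we ask for it
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ready for the next item
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<String> publishAndCollect(List<String> tickers) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        // close() (called by try-with-resources) signals onComplete after the pending items
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            tickers.forEach(publisher::submit);
        }
        try {
            // Items are delivered asynchronously, so wait for the completion signal
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(Arrays.asList("T", "AAPL", "OHI")));
    }
}
```

Reactor's Flux and Mono play the Publisher role of this contract and add the operators (map, flatMap, ...) on top of it.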
Here is the code of the example, which tries to implement part of the Stokker functionality using a reactive approach. This is the main program class:
    // This is a regular Spring Boot application
    @SpringBootApplication
    public class RxStokkerApplication implements CommandLineRunner {

        @Bean
        EventBus createEventBus() {
            return EventBus.create();
        }

        @Autowired
        private StockPublisher publisher;

        @Autowired
        private StockConsumer consumer;

        @Override
        public void run(String... args) throws Exception {
            // Request some stock quotes
            publisher.publishQuotes(Arrays.asList("T", "REE.MC", "AAPL", "OHI", "MAP.MC", "SAN.MC"));

            // Shut down and clean up async resources
            consumer.getSink().onComplete();
        }
    }
The publisher (written and injected as a Spring Bean):
    @Service
    public class StockPublisher {

        // This will be used to distribute the events produced here
        @Autowired
        EventBus eventBus;

        // This bean encapsulates the slow I/O requests (HTTP)
        @Autowired
        private YahooFeeder feeder;

        // Convenience bean to transform the strings into POJOs
        @Autowired
        private CSVStockQuotationConverter converter;

        public void publishQuotes(List<String> tickers) {
            Flux.fromIterable(tickers)
                // Fetch the quotes inside a Mono, which defers the slow I/O call until subscription
                .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
                // Flatten each list of raw quote strings into a new Flux<String>
                .flatMap(list -> Flux.fromIterable(list))
                // Convert each string to a POJO and publish it on the event bus
                .map(stock -> converter.convertHistoricalCSVToStockQuotation(stock))
                .consume(quotation -> eventBus.notify("quotes", Event.wrap(quotation)));
        }
    }
And finally, the consumer. We will take the POJOs that have been retrieved from the net and store them in a JPA repository for later use.
    // We will subscribe to certain events of this bus
    @Autowired
    private EventBus eventBus;

    // A JPA repository to store the POJOs
    @Autowired
    private StockQuotationRepository stockRepo;

    private WorkQueueProcessor<Event<StockQuotation>> sink;

    @PostConstruct
    public void init() {
        // Create the round-robin processor and register it for the "quotes" events
        sink = WorkQueueProcessor.create();
        eventBus.on($("quotes"), sink);

        // Extract the POJO from the event
        sink.map(Event::getData)
            // Wrap each save to the DB in a Mono, which defers the call until subscription
            .flatMap(s -> Mono.fromCallable(() -> stockRepo.save(s)))
            // Log summary results
            .count()
            .consume(i -> System.out.println(new Date() + " Processed " + i + " quotes"));
    }
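Note: Please pay attention to the fact that both slow operations here (requesting the stocks with a REST request and storing the POJO in a JPA repository) are wrapped in Mono.fromCallable(...). The idea is to execute them in a separate thread so that they do not block the flow, which greatly increases the processing throughput. Keep in mind, though, that fromCallable by itself only defers the call until subscription; to really move it to another thread, the Mono also needs a scheduler (subscribeOn(...) in current Reactor versions).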
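The goal described in the note, keeping slow calls off the calling thread, can be sketched in plain Java with CompletableFuture and an explicit thread pool. This is an illustration under assumptions, not the Reactor way of doing it: OffThreadSketch, slowFetch and fetchAll are hypothetical names, and slowFetch only simulates a slow call such as feeder.getCSVQuotes(...) by sleeping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OffThreadSketch {

    /** Hypothetical stand-in for a slow I/O call; it only sleeps and reports its thread. */
    static String slowFetch(String ticker) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ticker + " fetched on " + Thread.currentThread().getName();
    }

    static List<String> fetchAll(List<String> tickers) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // Start every slow call on the pool; the calling thread is not blocked here
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (String ticker : tickers) {
                pending.add(CompletableFuture.supplyAsync(() -> slowFetch(ticker), pool));
            }
            // Only now wait for the results, which were computed concurrently
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : pending) {
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        fetchAll(Arrays.asList("T", "AAPL", "OHI")).forEach(System.out::println);
    }
}
```

The thread names in the output show that none of the fetches ran on the main thread, and since the calls overlap, the total time stays close to that of a single call instead of the sum of all of them.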
That's it! That was my first try with Reactor and Spring. The basic functionality is achieved (streaming the requests made by the producer to the consumer without them blocking each other); however:
- The IO library really deserves a couple of days of trial, as it seems very interesting (non-blocking I/O, HTTP, WebSockets, UDP, ...)
- I'd like to understand the concepts of Promises and Backpressure.
You can find the code in this GitHub repository. Feel free to contribute or comment on this learning process!