This is the first in a series of posts on how to develop data-driven micro-services with Spring Cloud Dataflow (SCDF from now on). For now, we will look at the approach proposed by this framework and at how to build the basic components locally: Source, Sink and Processor.
Also, if you are familiar with Spring, we will take a look at the ready-made components that are available for you to use, so you don't have to reinvent the wheel.
Contents
- What is Spring Cloud DataFlow?
- Introduction to the API: A simple Producer (Source), Consumer (Sink) setup.
- Prerequisites: Kafka and Zookeeper.
- Coding a simple Source: HTTP Poller that retrieves live stock prices.
- Coding a simple Sink that stores the results in a file.
- Writing an additional Batch Source.
- Summary & Resources.
What is Spring Cloud Dataflow?
The idea behind SCDF is to go one step further in the abstraction race. With Spring and the Inversion of Control (IoC) paradigm, you let the framework invoke your beans (components), each of which performs a small, specific task.
Now, the idea is to have, instead of components:
- Fully independent micro-services.
- Running in completely independent JVMs (or even hosts).
- Communicating using a messaging middleware (for the moment, either Kafka or Rabbit).
To get started, we will rebuild an example already used on this blog, in a past Spring Integration post. That is:
- A producer that periodically gathers some live information from the Web: Stock prices.
- A consumer that will store them, for instance, in a file.
As said before, the idea now is to achieve even higher decoupling, as both components are independent micro-services and know nothing about each other.
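To picture that decoupling without any infrastructure, here is a plain-Java analogy in which an in-memory queue stands in for the Kafka topic. It is only a sketch: none of these class or method names come from SCDF, and a real topic is of course a separate process, not a shared object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java analogy only: the queue plays the role of the Kafka topic.
final class DecouplingSketch {

    // Producer side: it only knows the "topic", never the consumer.
    static void produce(BlockingQueue<String> topic, List<String> quotes) {
        for (String quote : quotes) {
            topic.add(quote);
        }
    }

    // Consumer side: it drains whatever is on the "topic", never seeing the producer.
    static List<String> consume(BlockingQueue<String> topic) {
        List<String> received = new ArrayList<>();
        topic.drainTo(received);
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> topic = new LinkedBlockingQueue<>();
        produce(topic, Arrays.asList("AAPL,100.0", "MSFT,50.0"));
        System.out.println(consume(topic));
    }
}
```

Neither method references the other, so either side can be replaced or duplicated without touching its counterpart.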
Prerequisites: Kafka and Zookeeper
You need Kafka to deliver the messages to and from our microservices, and Kafka needs Zookeeper to organize itself.
For the sake of simplicity, we are going to get a local Kafka installation that comes with Zookeeper (please note that this is not the usual production setup!).
- First, get a local Kafka installation here. (There are Windows .bat files under /bin/Windows.)
- Run Zookeeper and a Kafka server:
# Launch Zookeeper
$ ./bin/zookeeper-server-start ./config/zookeeper.properties
# Launch Kafka
$ ./bin/kafka-server-start ./config/server.properties
Coding a simple Source: HTTP Poller that retrieves live stock prices
There are a couple of concepts to understand in this API: Binders and Component Types.
Binders: These are the technologies used for transporting the messages, currently either Kafka or RabbitMQ. Therefore, you need to import one of the two binder dependencies in your pom.xml.
Component Types: There are three component types in SCDF:
- Sources: They introduce events into our system (get HTTP requests, read files, etc.). By default, they write to a Spring Integration channel called "output".
- Sinks: They take events from our system and produce some results (write files, databases, etc.), and they always listen to a channel called "input".
- Processors: They are a mixture of both Sources and Sinks.
So, basically, you are going to write Spring Integration applications that read from or write to these channels. That's all.
And the way of working is pretty much always the same: a couple of Java classes with your business logic that import the proper bindings, a properties file and some Spring Integration flows:
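If it helps to picture the three component types, here is a loose analogy using only the JDK's functional interfaces: a Source hands out messages, a Processor turns one message into another, and a Sink swallows them. This is an illustration rather than SCDF code, and the class name and sample values are invented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Analogy only: Supplier ~ Source, Function ~ Processor, Consumer ~ Sink.
final class ComponentTypesSketch {

    // Moves a single message from the source, through the processor, into the sink.
    static <A, B> void runOnce(Supplier<A> source, Function<A, B> processor, Consumer<B> sink) {
        sink.accept(processor.apply(source.get()));
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        Supplier<Double> source = () -> 101.5;                          // emits an event
        Function<Double, String> processor = price -> "AAPL," + price;  // transforms it
        Consumer<String> sink = written::add;                           // produces a result
        runOnce(source, processor, sink);
        System.out.println(written);
    }
}
```

In SCDF the arrows between the three pieces are Kafka (or RabbitMQ) destinations instead of direct method calls, which is what lets each piece run in its own JVM.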
// Here we are defining our component as a producer
@EnableBinding(Source.class)
// A Spring Integration flow that slightly modifies the trigger starter
// provided by SCDF. I needed that because I had to put a splitter after the HTTP
// request before sending the quotes to Kafka
@ImportResource("classpath:META-INF/liveQuotations.xml")
public class StockProducerConfig
{
    // This is my business logic. This method will be called according to the trigger
    // setup given in the properties file
    @InboundChannelAdapter("splitChannel")
    public List<StockQuotation> publishLatestPrices() {
        // Omitted for brevity, check the full code at:
        // https://github.com/victor-ferrer/scdf_stokker/tree/master/producer
    }
}
These are the properties for the Producer. Note the URL to get the stocks from and the Kafka topic to write the messages to:
# This is the Kafka output topic used in this example
spring.cloud.stream.bindings.output.destination=stocktopic_bulk
# URL to retrieve the stock prices from
stocks.url=https://docs.google.com/spreadsheets/d/16CrPHjsp8MBXfK8hEYsSDQHHHp_kFCi8vuidwTv02z0/pub?output=csv
# Tomcat port
server.port=8081
# We want the collection every 15 seconds
spring.integration.poller.fixed-delay=15000
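A fixed delay means that the next collection starts 15 seconds after the previous one has finished. As a rough picture of that behaviour, here is a sketch built on the JDK scheduler instead of the Spring Integration poller; the class name, the method and the short delays are all assumptions chosen to keep the example quick to run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of fixed-delay polling with the JDK scheduler (not the SCDF trigger itself).
final class FixedDelayPollerSketch {

    // Runs the task with delayMillis between the end of one run and the start of the next.
    // Returns true once the task has completed at least 'runs' times within the timeout.
    static boolean pollTimes(Runnable task, long delayMillis, int runs, long timeoutMillis)
            throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            scheduler.scheduleWithFixedDelay(() -> {
                task.run();
                remaining.countDown();
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            scheduler.shutdownNow(); // stop polling once we are done
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 100 ms here instead of the 15000 ms used in the properties file
        pollTimes(() -> System.out.println("collecting quotations..."), 100, 3, 5000);
    }
}
```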
Then, a convenient way to test your Source process is to use a simple Kafka Console Consumer (provided with the Kafka installation in the bin folder):
$ ./kafka-console-consumer --topic stocktopic_bulk --zookeeper 127.0.0.1:2181
If everything is OK, you should see a stream of raw Java Stock Quotations being written to your console.
Coding a simple Sink that stores the results in a file
In this case, we are going to use the SCDF-specific Starters Initializr page in order to create our project.
Just choose the file sink and Kafka Binder dependencies and you will get an almost-working file sink:
@SpringBootApplication
@EnableBinding(Sink.class)
@Import(FileSinkConfiguration.class) // Just import this Spring Configuration class
public class FileWriterApplication {
    public static void main(String[] args) {
        SpringApplication.run(FileWriterApplication.class, args);
    }
}
OK, that was an innocent lie. Actually, I needed a couple more lines, as the starter provided only works with Strings, Files, byte arrays and InputStreams, and I wanted to send my domain object: StockQuotation.
So what I did was modify the class provided by the starter in order to add a Spring Integration object-to-string-transformer before passing the message to the MessageHandler.
<!-- Converts each incoming quotation object to its String representation -->
<int:object-to-string-transformer input-channel="input" output-channel="stringChannel">
</int:object-to-string-transformer>
<!-- Channel that the FileWritingMessageHandler will listen to instead of "input" -->
<int:channel id="stringChannel"></int:channel>
Then in the main class, I just imported the new Spring Integration file:
@SpringBootApplication
@Import(FileSinkConfiguration.class)
@ImportResource("classpath:META-INF/entryConverter.xml")
public class FileWriterApplication {
    // main() method as in the previous listing
}
And these were the properties used:
# This is the Kafka input topic used in this example
spring.cloud.stream.bindings.input.destination=stocktopic_bulk
# File properties
file.directory=C:/test
file.mode=APPEND
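Put together, the transformer and the APPEND mode boil down to "turn the payload into a String and add it as a new line at the end of the file". Here is a sketch of that idea using only java.nio; it is not the code of the file sink starter, and the class and method names are invented for the example.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;

// Sketch of an object-to-string step followed by an appending file write.
final class AppendingFileSinkSketch {

    // Converts any payload to a String and appends it as one line, creating the file if needed.
    static void append(Path file, Object payload) throws IOException {
        String line = String.valueOf(payload); // the "object-to-string" step
        Files.write(file, Collections.singletonList(line), StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("quotations", ".txt");
        append(file, "AAPL,101.5");
        append(file, "MSFT,50.0");
        System.out.println(Files.readAllLines(file, StandardCharsets.UTF_8));
    }
}
```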
Again, you can test your file sink in complete isolation, this time using a simple Kafka console producer:
$ ./kafka-console-producer --topic stocktopic_bulk --broker-list 127.0.0.1:9092
Just start typing text in the console and each line will be stored in the file specified above.
And that's it! Just by adding the starter dependency and the application.properties, you are using a component that connects to Kafka, reads the messages and dumps them to a file.
OK, it is neither very sophisticated nor very useful, but think of all the sinks that you can reuse by just changing one line of code and a properties file:
- File
- SFTP/FTP
- Databases: JDBC, Cassandra
- HDFS
- Messaging frameworks: Gemfire, RabbitMQ, etc.
Writing an additional Batch Source
You may be thinking now: Yeah, right, but what have you accomplished here that was not already in the original Stokker? And the set-up is much more complex as well (Kafka, several JVMs, etc.)!
That's a fair point. The main accomplishment here is decoupling, as we have two processes that do their stuff and know nothing about the rest of the world.
Let's think of an example: Now, I have these three brand new requirements:
- I want to recover "historical stock quotations" apart from the "live ones".
- I want them to end in the same file/database as the live ones.
- I want this new process to be one-off and on-demand.
Now, instead of trashing the already-in-production producer, we are going to code a new module that fulfills these requirements, writes these Stock Quotations to Kafka and lets the Consumer deal with them. That's the benefit of decoupling.
So, what does a Processor look like? Actually, it is just the combination of a Sink and a Source, which means that, by default, it will listen for Kafka messages on input, and whatever message is written to output will be sent to Kafka as well.
As mentioned before, we are going to use a ready-made component, the HTTP Processor, and just add an HTTP Inbound Channel Adapter so we can trigger the process by just curling a URL:
# We want to get Apple (AAPL) quotations since January 1st 2000
$ curl -X GET http://localhost:8082/stock/AAPL/startYear/2000/startMonth/1/startDay/1
Now, a little bit of Spring Integration magic to map those parameters to headers in the message passed to our HTTP Processor (which is listening to the channel input):
<int-http:inbound-channel-adapter channel="input"
    path="/stock/{stock}/startYear/{startYear}/startMonth/{startMonth}/startDay/{startDay}"
    supported-methods="GET" >
    <int-http:header name="ticker" expression="#pathVariables.stock"/>
    <int-http:header name="startYear" expression="#pathVariables.startYear"/>
    <int-http:header name="startMonth" expression="#pathVariables.startMonth"/>
    <int-http:header name="startDay" expression="#pathVariables.startDay"/>
</int-http:inbound-channel-adapter>
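What the adapter does with the path can be pictured in plain Java as well: pull the four variables out of the URL and store them under the header names used above. This is a hand-rolled sketch with a regular expression, not how Spring Integration implements it, and the class and method names are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: extract the path variables and expose them as message headers.
final class PathToHeadersSketch {

    // Same path template as in the adapter, with one capturing group per variable.
    private static final Pattern PATH = Pattern.compile(
            "/stock/([^/]+)/startYear/([^/]+)/startMonth/([^/]+)/startDay/([^/]+)");

    static Map<String, String> toHeaders(String path) {
        Matcher matcher = PATH.matcher(path);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Unexpected path: " + path);
        }
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("ticker", matcher.group(1));
        headers.put("startYear", matcher.group(2));
        headers.put("startMonth", matcher.group(3));
        headers.put("startDay", matcher.group(4));
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(toHeaders("/stock/AAPL/startYear/2000/startMonth/1/startDay/1"));
    }
}
```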
And finally, the customary import and properties file (really, where did ALL the code go?).
@SpringBootApplication
@Import(HttpclientProcessorConfiguration.class)
@ImportResource("classpath:META-INF/httpTrigger.xml")
public class HistoricalProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(HistoricalProducerApplication.class, args);
    }
}
Note that the URL expression uses the headers that we provided before:
# Tomcat server port
server.port=8082
# This is the Kafka output used when running "manually"
spring.cloud.stream.bindings.output.destination=stocktopic_bulk
# The kind of http method to use. (HttpMethod, default: <none>, possible values: GET,HEAD,POST,PUT,PATCH,DELETE,OPTIONS,TRACE)
httpclient.http-method=GET
# A SpEL expression against incoming message to determine the URL to use. (Expression, default: <none>)
httpclient.url-expression="http://ichart.finance.yahoo.com/table.csv?s=" + headers.get('ticker') + "&a=" + headers.get('startMonth') + "&b=" + headers.get('startDay') + "&c=" + headers.get('startYear') + "&g=ds"
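To see what that SpEL expression evaluates to, here is the same string concatenation written as plain Java over a header map. It is a sketch for illustration only: the class and method names are invented, and in the real module the expression is evaluated by the processor against the message headers.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: the url-expression from the properties file, as ordinary Java concatenation.
final class UrlExpressionSketch {

    static String buildUrl(Map<String, String> headers) {
        return "http://ichart.finance.yahoo.com/table.csv?s=" + headers.get("ticker")
                + "&a=" + headers.get("startMonth")
                + "&b=" + headers.get("startDay")
                + "&c=" + headers.get("startYear")
                + "&g=ds";
    }

    public static void main(String[] args) {
        // Headers as they would arrive for the curl request shown earlier
        Map<String, String> headers = new HashMap<>();
        headers.put("ticker", "AAPL");
        headers.put("startYear", "2000");
        headers.put("startMonth", "1");
        headers.put("startDay", "1");
        System.out.println(buildUrl(headers));
    }
}
```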
So, with this totally independent module, we gave the system the ability to import historical stock quotations by just executing this curl, thus fulfilling the new requirements, and without changing any other component (or breaking it).
Summary
We have achieved our aim of decoupling a producer and a consumer and running them in separate JVMs (or even hosts) while relying on Kafka. Also, introducing new producers is much easier than before.
However, there is still a lot of manual work to do (start and stop the processes, define the topics, etc.) and we are not profiting from some of the Kafka benefits (such as partitions).
So, in the next post, we will see how we can benefit from the Spring Cloud Dataflow server implementations, which should make it much easier to deploy and scale our distributed systems on different platforms (locally, in Cloud Foundry, Yarn, etc.).
Finally, I am developing new Sinks that should do more interesting stuff than just writing a text file.
Stay tuned!
Resources
You can find the sample code in this GitHub repository.
Spring Webinar: Data Microservices with Spring Cloud Dataflow
Reference documentation: Stream Applications reference documentation