Sunday, 31 January 2016

Calculating Moving Averages with Spark MLLib

Update on Friday, 13th of January, 2017

This post is about the implementation using the Java API. If you want to see the same example of calculating Moving Averages with Spark MLLib but using Scala, please check out this post.

Introduction

Since I began learning Spark, one of the things that has given me the most pain is the Java API that wraps the Scala API used in Spark. So, I would like to share this problem I had using the Spark Machine Learning library, in case it is useful for anybody learning Spark. Also, it's nice to take a look at the possibilities that sliding windows give us when working with time series data.

The problem

As I wrote in my previous post, I created a new independent module to run studies on Stock data using Spark. One of the most interesting (and basic) studies one can run on Stock data is to calculate simple moving averages.

What is a simple moving average (SMA)?

If we take a time window of length N, the SMA of the last N prices p is:

SMA = (p_M + p_(M-1) + ... + p_(M-N+1)) / N

Source Wikipedia: https://en.wikipedia.org/wiki/Moving_average

Wait, what? In plain English: for every element, we take the N preceding elements and calculate the average. The aim is to smooth the data in order to detect the underlying patterns more easily.
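To make the definition concrete, here is a minimal plain-Java sketch of it (the class and method names are mine, purely for illustration):

import java.util.Arrays;

public class SimpleMovingAverage {

    // SMA over a window of size n: for each position, average the n values ending there
    public static double[] sma(double[] prices, int n) {
        double[] result = new double[prices.length - n + 1];
        for (int i = n - 1; i < prices.length; i++) {
            double sum = 0;
            for (int j = i - n + 1; j <= i; j++) {
                sum += prices[j];
            }
            result[i - n + 1] = sum / n;
        }
        return result;
    }

    public static void main(String[] args) {
        // SMA(2) of [2, 4, 6, 8] -> [3.0, 5.0, 7.0]
        System.out.println(Arrays.toString(sma(new double[] {2, 4, 6, 8}, 2)));
    }
}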

One possible solution: Spark MLLib

Searching around on the internet, I found that the Spark Machine Learning Library has a sliding window operation over an RDD. Using it, calculating the SMA becomes very easy, as described in this StackOverflow answer:

import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)                                      // windows of 3 consecutive elements
  .map(curSlice => (curSlice.sum / curSlice.size)) // average of each window
  .collect()

Note: The RDD must be sorted before applying the sliding function!

The caveat is that the previous code is written in Scala. What would it look like in Java?


// Read a file containing the Stock Quotations
// You can also parallelize a collection of objects to create an RDD
JavaRDD<String> linesRDD = sc.textFile("some sample file containing stock prices");
// Convert the lines into our business objects
JavaRDD<StockQuotation> quotationsRDD = linesRDD.flatMap(new ConvertLineToStockQuotation());
// We need these two objects in order to use the MLLib RDDFunctions object
ClassTag<StockQuotation> classTag = scala.reflect.ClassManifestFactory.fromClass(StockQuotation.class);
RDD<StockQuotation> rdd = JavaRDD.toRDD(quotationsRDD);
// Instantiate an RDDFunctions object to work with
RDDFunctions<StockQuotation> rddFs = RDDFunctions.fromRDD(rdd, classTag);
// This applies the sliding function and returns the (Date, SMA) tuples
JavaPairRDD<Date, Double> smaPerDate = rddFs.sliding(slidingWindow).toJavaRDD().mapToPair(new MovingAvgByDateFunction());
List<Tuple2<Date, Double>> smaPerDateList = smaPerDate.collect();
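And remember the note above: the RDD must be sorted by date before sliding. In case your input file is not already ordered, a possible sketch of that step using the standard JavaRDD.sortBy is shown below (it reuses the quotationsRDD variable from the snippet above; the number of partitions is just an example value):

// Requires org.apache.spark.api.java.function.Function
JavaRDD<StockQuotation> sortedRDD = quotationsRDD.sortBy(
        new Function<StockQuotation, Date>() {
            @Override
            public Date call(StockQuotation quotation) throws Exception {
                return quotation.getTimestamp();
            }
        },
        true, // ascending, so every window starts at the oldest quotation
        4);   // example number of partitions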

That was painful! Moreover, you need to encapsulate the function that calculates the averages in a separate class (apparently there is a serialization problem when working with anonymous function classes):

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.function.ToDoubleFunction;
import java.util.stream.Collectors;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// Problem here is that the generic type I managed to get from the Scala
// RDD is Object (ouch!!)
public class MovingAvgByDateFunction implements PairFunction<Object, Date, Double> {

    // Need to add the serialVersionUID to avoid serialization problems
    private static final long serialVersionUID = 9220435667459839141L;

    @Override
    public Tuple2<Date, Double> call(Object t) throws Exception {
        // Casting!!!
        StockQuotation[] stocks = (StockQuotation[]) t;
        List<StockQuotation> stockList = Arrays.asList(stocks);
        // Sum the quotation values of the window...
        Double result = stockList.stream().collect(Collectors.summingDouble(new ToDoubleFunction<StockQuotation>() {
            @Override
            public double applyAsDouble(StockQuotation value) {
                return value.getValue();
            }
        }));
        // ...and divide by the window size to get the average
        result = result / stockList.size();
        // Key the average by the first date of the window
        return new Tuple2<Date, Double>(stockList.get(0).getTimestamp(), result);
    }
}
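As a side note, with Java 8 the body of call() could be written more compactly; this is just a sketch of the same computation (the cast forced by the Object generic type stays):

@Override
public Tuple2<Date, Double> call(Object t) throws Exception {
    StockQuotation[] stocks = (StockQuotation[]) t;
    // average() returns an OptionalDouble; the window is never empty here
    double result = Arrays.stream(stocks)
            .mapToDouble(StockQuotation::getValue)
            .average()
            .orElse(Double.NaN);
    return new Tuple2<>(stocks[0].getTimestamp(), result);
}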

So, what does the result of the calculation look like? I have taken 3 time series:

  • The original data for Mapfre (a Spanish insurance company, ticker: MAP.MC) from 2014 until today.
  • The result of the calculation for SMA(30) days.
  • The result of the calculation for SMA(200) days (see the sketch below for how both SMA series are produced).
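Producing the two SMA series is just a matter of calling the pipeline above with different window sizes; a sketch reusing the rdd, classTag and MovingAvgByDateFunction names from the Java snippet:

JavaPairRDD<Date, Double> sma30 = RDDFunctions.fromRDD(rdd, classTag)
        .sliding(30)
        .toJavaRDD()
        .mapToPair(new MovingAvgByDateFunction());
JavaPairRDD<Date, Double> sma200 = RDDFunctions.fromRDD(rdd, classTag)
        .sliding(200)
        .toJavaRDD()
        .mapToPair(new MovingAvgByDateFunction());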


The interpretation of the chart is pretty clear:

  • The price line (red) is rather chaotic.
  • The SMA(30) (blue) gives a better view of the price, without many spikes.
  • The SMA(200) (green) gives the background tendency, which was pretty much constant until the summer of 2014 (when the blue line crossed over it).


Summary

Although this way of doing things is a bit cumbersome, I think this function gives us a great deal of potential for the kind of calculations we are interested in. For instance:

  • Calculating fast (usually SMA(9)) and slow (usually SMA(30)) moving averages:
    • This is done to see where and how they intersect, which indicates whether the market is changing its trend direction.
  • Calculating whether the current prices are beating the yearly maximum values (this would be MAX(365)) or breaking the yearly minimum values (MIN(365)); see the sketch after this list:
    • As Stock values have relatively strong momentum, once they start doing this, they are very likely to continue in that direction.
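As an illustration of that last idea, a hypothetical windowed MAX function following the same pattern as MovingAvgByDateFunction could look like this (a sketch, not code from the repository):

import java.util.Date;

import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// Hypothetical analogue of MovingAvgByDateFunction: returns the maximum value of each window
public class MovingMaxByDateFunction implements PairFunction<Object, Date, Double> {

    private static final long serialVersionUID = 1L;

    @Override
    public Tuple2<Date, Double> call(Object t) throws Exception {
        // Same casting problem as before: the Scala RDD hands us an Object
        StockQuotation[] stocks = (StockQuotation[]) t;
        double max = Double.NEGATIVE_INFINITY;
        for (StockQuotation quotation : stocks) {
            max = Math.max(max, quotation.getValue());
        }
        return new Tuple2<Date, Double>(stocks[0].getTimestamp(), max);
    }
}

Plugging it into rddFs.sliding(365) instead of MovingAvgByDateFunction would give the rolling yearly maximum per date.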
Source code
You can find the calculation shown in the example above in this GitHub repository:
However, it is very likely that it will differ from what is written here, as I intend to get the data from Stokker (the micro-service gathering the Stock quotations from the internet) instead of from a text file.

Sadly, I still cannot get everything working, as I am having some class-loading issues when using Spark and Netflix Eureka together. Stay tuned for the solution!

Update on February 3rd, 2016

Finally, I managed to solve the class-loading problem by excluding one dependency from Spark related to Commons Codec, which is already used by Feign to encode/decode the requests and responses. Having two of them with different versions caused the problem:
<!-- Spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.3.1</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Also, I took the opportunity to increase other version numbers:

  • Spring-Boot to 1.3.2
  • Spring Framework Cloud to Brixton Milestone 4

3 comments:

  1. This comment has been removed by the author.

  2. Can you show slidingWindow value/initialization?

     Reply:
     Hello,
     The only initialization needed for the window is the value passed to rddFs.sliding(). In the examples above, 30 and 200 days have been used.

     Cheers