Sunday, 17 September 2017

Adapting Hive partitions to different data sizes

Summary

Hive's table partitioning gives the user better query performance, as it avoids costly scans over data that is not relevant to the query. Partitioning is based on the directory structure found in HDFS: each partition maps to a directory.

One quite common usage for partitions is time series, which can be modeled like this:

[Figure: our TimeSeries entity]

Note: The timestamp has been decomposed into several columns that can be used as partition columns. Month and day are TINYINT in order to save space (we might end up having millions of records).
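
As a minimal sketch of that decomposition in Scala (the field names and types mirror the Hive table created later in the post; the TimeSeriesRow class and toRow helper are illustrative only, not code from the original example):

import java.time.LocalDateTime

// Illustrative only: year, month and day are derived from the original timestamp,
// while the intra-day part is kept in the "series" string, as in the examples below
case class TimeSeriesRow(year: Int, month: Byte, day: Byte, series: String, value: Int)

def toRow(ts: LocalDateTime, value: Int): TimeSeriesRow =
  TimeSeriesRow(ts.getYear, ts.getMonthValue.toByte, ts.getDayOfMonth.toByte,
    f"${ts.getHour}%02d:${ts.getMinute}%02d", value)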

This can easily be partitioned by using a matching folder structure in HDFS. If we take a look, we will see these directories (data shown for our table once it is partitioned by year and month):

hdfs dfs -du -h /user/temp/year=2017
396.9 K 396.9 K /user/temp/year=2017/month=1
396.8 K 396.8 K /user/temp/year=2017/month=10
396.7 K 396.7 K /user/temp/year=2017/month=11
396.6 K 396.6 K /user/temp/year=2017/month=12
397.2 K 397.2 K /user/temp/year=2017/month=2
396.7 K 396.7 K /user/temp/year=2017/month=3
396.8 K 396.8 K /user/temp/year=2017/month=4
397.0 K 397.0 K /user/temp/year=2017/month=5
397.0 K 397.0 K /user/temp/year=2017/month=6
396.9 K 396.9 K /user/temp/year=2017/month=7
396.7 K 396.7 K /user/temp/year=2017/month=8
396.7 K 396.7 K /user/temp/year=2017/month=9

As per the Hive documentation, the optimal partition size is around 2 GB. What happens if you have defined a partitioning scheme that ends up in production with partitions that are too big or too small? Here are some examples of how you can adjust your partitions using Spark.
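
Before adjusting anything, it helps to know how big your partitions actually are. Apart from hdfs dfs -du (shown above and below), a minimal sketch from the spark-shell could look like this, assuming the example paths used later in the post:

import org.apache.hadoop.fs.{FileSystem, Path}

// List each partition directory under the table location and print its total size
val fs = FileSystem.get(sc.hadoopConfiguration)
fs.listStatus(new Path("/user/temp/year=2017"))
  .filter(_.isDirectory)
  .foreach(s => println(s"${s.getPath.getName}: ${fs.getContentSummary(s.getPath).getLength} bytes"))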

Where can I test? Get a Cloudera Quickstart!

To begin with, you will need a Hadoop distribution running somewhere to test the code shown in this post. If you don't have a proper cluster at hand, you can always create a test cluster by using the Cloudera QuickStart Docker image. It is quite handy, as you just have to pull it and fire it up:

# Pulls the image (4Gb!)
docker pull cloudera/quickstart
# Ports to be opened (most commonly used)
# - 8888 expose hue interface
# - 7180 expose cloudera manager
# - 80 expose cloudera examples
# - 8983 expose solr search web ui
# - 50070 expose name node web ui interface
# - 50090 expose secondary name node
# - 50075 expose data node
# - 50030 expose job tracker
# - 50060 expose task trackers
# - 60010 expose hbase master status
# - 60030 expose hbase region server
# - 9095 expose hbase thrift server
# - 8020 expose hdfs port
# - 8088 expose yarn resource manager web ui
# - 4040 expose spark application web ui
# - 18088 expose history server web interface
# ...
ports="-p 8888:8888 -p 7180:7180 -p 80:80 -p 4040:4040 -p 4041:4041 -p 4042:4042 -p 4043:4043 -p 9092:9092 -p 2181:2181 -p 8020:8020 -p 18088:18088 -p 10000:10000 -p 21050:21050 -p 50070:50070  -p 50075:50075  -p 50060:50060  -p 50030:50030 -p 50010:50010"
# Volume used to exchange stuff with the running container
localVolumeDir=/home/me/cloudera_exchange_dir
# Runs it, exposing ports for Hue, Cloudera Manager and a tutorial
containerId=`docker run --hostname=quickstart.cloudera -d \
-v $localVolumeDir:/volume \
--privileged=true -t -i $ports cloudera/quickstart /usr/bin/docker-quickstart`
# Now you can start Cloudera Manager (from within the container)
# sudo su
# cd /home/cloudera/
# ./cloudera-manager
echo "Started Cloudera Quickstart (containerId=$containerId)"
# Kafka does not come with Cloudera Quickstart
# See https://kafka.apache.org/quickstart
# Download the binaries and execute this command:
# > bin/kafka-server-start.sh config/server.properties
# If you want to access Kafka from outside the container, you need to change this line in server.properties
# zookeeper.connect=localhost:2181
# And replace it with the Docker host public IP address

Of course, it will not be very powerful or fast, but it will allow you to perform some tests using the different components in the stack (HDFS, Spark, Hive, etc.).

Create a test table and fill it with some data

Let's start by creating the database and table in Hive:

-- Create the database and the table
create database if not exists parquet_test;
create external table parquet_test.parquet_table (value INT, series STRING, month TINYINT, day TINYINT)
PARTITIONED BY (year INT)
STORED AS PARQUET
LOCATION '/user/victor';
-- The resulting HDFS Parquet files will be stored in /user/victor

Now, we will generate some dummy time-series to have some data to play with:

// Define a case class to hold the columns
case class MyRow(day: Int, month: Int, year: Int, series: String, value: Int)
// Create about 500K records (yes, we assume 31-day months all year round :) )
val random = new java.util.Random()
val list = for (day <- 1 to 31;
month <- 1 to 12;
hour <- 0 to 23;
minute <- 0 to 59;
value = random.nextInt(1000))
yield MyRow(day, month, 2017, s"$hour:$minute", value)
// Create a DataFrame and write it to HDFS, one sub-folder per year
val df = sc.parallelize(list).toDF
df.write.partitionBy("year").parquet("/user/victor")
// Add the new table partition programmatically
sqlContext.sql("alter table parquet_test.parquet_table" +
" add partition(year=2017) location '/user/victor/year=2017'")

How does this data look partition-wise? Let's ask Hive about it:

show table extended like 'parquet_table' partition(year=2017)
/* Some rows returned...
tableName:parquet_table
location:hdfs://quickstart.cloudera:8020/user/victor/year=2017
totalNumberFiles:7
totalFileSize:723451
*/

OK, we are dealing with a really small amount of data in this example, but I am sure you get the picture. All the data is in the same folder, and you need to scan the table completely (the whole year of data) each time you want to get the data of a particular month.
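
To make the cost concrete, here is an illustrative query against the table defined above; with year as the only partition column, the month filter cannot prune anything and every file under /user/victor/year=2017 has to be read:

// The month predicate is evaluated row by row, after reading all of 2017
sqlContext.sql(
  "select series, value from parquet_test.parquet_table " +
  "where year = 2017 and month = 2").show()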

Increase the partition granularity

With the previous setup, all 2017 data is stored in the same folder. If the user just wants to load the data from February, the whole table must be scanned. That's fine as long as the data remains small but, with larger datasets, performance will suffer heavily.

The solution? Add an additional partition level (month). The catch, apart from the modification of the table DDL, is that we have to reorganize the Parquet files into sub-folders in HDFS.

// Read the original data (the year partition column is discovered automatically)
val df = sqlContext.read.parquet("/user/victor")
// Rearrange the data by year and month
// and write it back to a new location in HDFS
df.repartition($"year", $"month")
  .write.partitionBy("year", "month")
  .parquet("/user/temp")
// Recreate the table in its new form
sqlContext.sql("create external table parquet_test.parquet_table_2 (value INT, series STRING, day TINYINT)" +
  " PARTITIONED BY (year INT, month TINYINT) " +
  "STORED AS PARQUET LOCATION '/user/temp'")
// Instruct Hive to discover the new partitions
sqlContext.sql("MSCK REPAIR TABLE parquet_test.parquet_table_2")

Note: The "msck repair" command makes it really easy to add all the new missing partitions. Otherwise you would have to add them manually, one by one, using "alter table ... add partition ...", as sketched below.
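
A minimal sketch of that manual alternative, showing only two months (in practice you would loop over every directory):

// Register each partition explicitly instead of running MSCK REPAIR
sqlContext.sql("alter table parquet_test.parquet_table_2 add if not exists " +
  "partition (year=2017, month=1) location '/user/temp/year=2017/month=1'")
sqlContext.sql("alter table parquet_test.parquet_table_2 add if not exists " +
  "partition (year=2017, month=2) location '/user/temp/year=2017/month=2'")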

The resulting HDFS files and directories are these:

hdfs dfs -du -h /user/temp/year=2017
396.9 K 396.9 K /user/temp/year=2017/month=1
396.8 K 396.8 K /user/temp/year=2017/month=10
396.7 K 396.7 K /user/temp/year=2017/month=11
396.6 K 396.6 K /user/temp/year=2017/month=12
397.2 K 397.2 K /user/temp/year=2017/month=2
396.7 K 396.7 K /user/temp/year=2017/month=3
396.8 K 396.8 K /user/temp/year=2017/month=4
397.0 K 397.0 K /user/temp/year=2017/month=5
397.0 K 397.0 K /user/temp/year=2017/month=6
396.9 K 396.9 K /user/temp/year=2017/month=7
396.7 K 396.7 K /user/temp/year=2017/month=8
396.7 K 396.7 K /user/temp/year=2017/month=9
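
Now a month-level filter only has to touch a single directory. An illustrative way to double-check from the spark-shell:

// List the partitions Hive now knows about, then run a query that prunes down to month=2
sqlContext.sql("show partitions parquet_test.parquet_table_2").show(24)
sqlContext.sql("select count(*) from parquet_test.parquet_table_2 " +
  "where year = 2017 and month = 2").show()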

Reduce the partition granularity

Of course, if the partition size is too small, we end up spending as much time walking recursively through partitions as we would scanning the whole table. In those cases, it is much better to reduce the number of partitions and increase their size.

In this case, we can use Spark's coalesce method to reduce the number of files written, as the target number of Spark partitions is smaller than the original one. In our example we will reduce the table partitioning from (year, month) to (year) only.

// Read the data partitioned by year and month again
val df = sqlContext.read.parquet("/user/temp")
// Condense everything and write it to a new location, partitioned by year only
df.coalesce(1).write.partitionBy("year").parquet("/user/temp3")
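
To query the rewritten files from Hive you also need a table that matches the coarser layout. A hypothetical sketch (the table name parquet_table_3 is made up for this example):

// month and day are plain columns again; only year remains a partition column
sqlContext.sql("create external table parquet_test.parquet_table_3 " +
  "(value INT, series STRING, month TINYINT, day TINYINT) " +
  "PARTITIONED BY (year INT) STORED AS PARQUET LOCATION '/user/temp3'")
sqlContext.sql("MSCK REPAIR TABLE parquet_test.parquet_table_3")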

In the examples shown, we have not paid any attention to compression, which is another handy technique to save space and reduce I/O times. In any case, I hope you got a broad picture of how you can adjust your partitions to get the job done.
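
If you want to experiment with compression, the Parquet output written by Spark can be controlled with the spark.sql.parquet.compression.codec setting; a minimal sketch (snappy is just one common choice):

// Ask Spark to write snappy-compressed Parquet files from this point on;
// any of the write examples above would then produce compressed files
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")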

