Hive's table partitioning gives the user better query performance, as it avoids costly scans over data that is not relevant to the query. Partitioning is based on the directory structure found in HDFS: each partition maps to a directory.
One very common use case for partitions is time series, which can be modeled like this:
[Figure: Our TimeSeries entity]
Note: The timestamp has been decomposed into several columns that will form the partitions. Month and day are TINYINT in order to save space (we might end up having millions of records).
This can easily be partitioned by using a matching folder structure in HDFS. If we take a look, we will see these directories (data shown for a table partitioned by year and month):
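The entity diagram is not reproduced here; as a rough sketch (column names and types inferred from the note above and the code used later in the post, the class name is just an illustration), the entity could be written as a Scala case class:

// Rough sketch of the TimeSeries entity (hypothetical name TimeSeriesPoint);
// in the Hive table, year/month/day become the partition columns
case class TimeSeriesPoint(
  series: String, // identifier of the series
  value: Int,     // measured value
  year: Int,      // decomposed timestamp...
  month: Byte,    // ...month stored as TINYINT
  day: Byte       // ...day stored as TINYINT
)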
hdfs dfs -du -h /user/temp/year=2017
396.9 K  396.9 K  /user/temp/year=2017/month=1
396.8 K  396.8 K  /user/temp/year=2017/month=10
396.7 K  396.7 K  /user/temp/year=2017/month=11
396.6 K  396.6 K  /user/temp/year=2017/month=12
397.2 K  397.2 K  /user/temp/year=2017/month=2
396.7 K  396.7 K  /user/temp/year=2017/month=3
396.8 K  396.8 K  /user/temp/year=2017/month=4
397.0 K  397.0 K  /user/temp/year=2017/month=5
397.0 K  397.0 K  /user/temp/year=2017/month=6
396.9 K  396.9 K  /user/temp/year=2017/month=7
396.7 K  396.7 K  /user/temp/year=2017/month=8
396.7 K  396.7 K  /user/temp/year=2017/month=9
As per the Hive documentation, the optimal partition size is about 2 GB. But what happens if the partitioning scheme you defined ends up in production with partitions that are too big or too small? Here are some examples of how you can adjust your partitions using Spark.
Where can I test? Get a Cloudera Quickstart!
To begin with, you will need a Hadoop distribution running somewhere to test the code shown in this post. If you don't have a proper cluster at hand, you can always create a test cluster using the QuickStart Cloudera Docker image. It is quite handy, as you just have to pull it and fire it up:
# Pulls the image (4Gb!)
docker pull cloudera/quickstart

# Ports to be opened (most commonly used)
# - 8888  expose Hue interface
# - 7180  expose Cloudera Manager
# - 80    expose Cloudera examples
# - 8983  expose web UI of Solr search
# - 50070 expose name node web UI interface
# - 50090 expose secondary name node
# - 50075 expose data node
# - 50030 expose job tracker
# - 50060 expose task trackers
# - 60010 expose HBase master status
# - 60030 expose HBase region server
# - 9095  expose HBase Thrift server
# - 8020  expose HDFS port
# - 8088  expose job tracker port
# - 4040  expose port of Spark
# - 18088 expose history server web interface
# ...
ports="-p 8888:8888 -p 7180:7180 -p 80:80 -p 4040:4040 -p 4041:4041 -p 4042:4042 -p 4043:4043 -p 9092:9092 -p 2181:2181 -p 8020:8020 -p 18088:18088 -p 10000:10000 -p 21050:21050 -p 50070:50070 -p 50075:50075 -p 50060:50060 -p 50030:50030 -p 50010:50010"

# Volume used to exchange stuff with the running container
localVolumeDir=/home/me/cloudera_exchange_dir

# Runs it, exposing ports for Hue, Cloudera Manager and a tutorial
containerId=`docker run --hostname=quickstart.cloudera -d \
  -v $localVolumeDir:/volume \
  --privileged=true -t -i $ports cloudera/quickstart /usr/bin/docker-quickstart`

# Now you can start Cloudera Manager (from within the container)
# sudo su
# cd /home/cloudera/
# ./cloudera-manager
echo "Started Cloudera Quickstart (containerId=$containerId)"

# Kafka does not come with Cloudera Quickstart
# See https://kafka.apache.org/quickstart
# Download the binaries and execute this command:
# > bin/kafka-server-start.sh config/server.properties
# If you want to access Kafka from outside the container, you need to change this line in server.properties
# zookeeper.connect=localhost:2181
# and replace it with the Docker host public IP address
Of course, it will not be very powerful or fast, but it will allow you to perform some tests using the different components in the stack (HDFS, Spark, Hive, etc.).
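As a quick sanity check (not part of the original script, and assuming the containerId variable from above), you can open a shell inside the running container and confirm that HDFS and the Spark shell are available:

# Open an interactive shell inside the QuickStart container
docker exec -it $containerId /bin/bash
# Inside the container: check that HDFS answers...
hdfs dfs -ls /user
# ...and start a Spark shell (used for the examples below)
spark-shell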
Create a test table and fill it with some data
Let's start by creating the database and table in Hive:
-- Create the database and the table
create database if not exists parquet_test;
use parquet_test;
create external table parquet_table (value INT, series STRING, month TINYINT, day TINYINT)
PARTITIONED BY (year INT)
STORED AS PARQUET
LOCATION '/user/victor';
-- The resulting HDFS Parquet files will be stored in /user/victor
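If you want to double-check the result, you can ask Hive to describe the freshly created table (standard HiveQL, output omitted here):

-- Verify the table definition, location and partition columns
describe formatted parquet_table;
-- List the partitions registered so far (none at this point)
show partitions parquet_table;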
Now, we will generate some dummy time-series to have some data to play with:
// Define a case class to hold the columns
case class MyRow(day: Int, month: Int, year: Int, series: String, value: Int)
// Create about 500K records (yes, we assume 31-day months all year round :) )
val random = new java.util.Random()
val list = for (day <- 1 to 31;
                month <- 1 to 12;
                hour <- 0 to 23;
                minute <- 0 to 59)
            yield MyRow(day, month, 2017, s"$hour:$minute", random.nextInt(1000))
// Create a DataFrame and write it to HDFS, partitioned by year
// (append mode, in case Hive already created the table location)
val df = sc.parallelize(list).toDF
df.write.mode("append").partitionBy("year").parquet("/user/victor")
// Add the new table partition programmatically
sqlContext.sql("alter table parquet_test.parquet_table" +
  " add partition(year=2017) location '/user/victor/year=2017'")
How does this data look partition-wise? Let's ask Hive about it:
show table extended like 'parquet_table' partition(year=2017)
/* Some rows returned...
tableName:parquet_table
location:hdfs://quickstart.cloudera:8020/user/victor/year=2017
totalNumberFiles:7
totalFileSize:723451
*/
OK, we are dealing with a really small amount of data in this example, but I am sure you get the picture: all the data sits in the same folder, and the table has to be scanned completely (the whole year of data) every time you want to get the data of a particular month.
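For instance, a query like the following one (a hypothetical example) has to read every Parquet file under year=2017, even though we only care about February:

-- month is a regular column here, so no partition pruning can happen
select avg(value)
from parquet_test.parquet_table
where year = 2017 and month = 2;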
Increase the partition granularity
With the previous setup, all 2017 data is stored in the same folder. If the user just wants to load the data from February, the whole table must be scanned. That is fine as long as the data remains small but, with larger datasets, performance will suffer heavily.
The solution? Add an additional partition level (month). The problem, apart from the modification of the table DDL, is that we have to reorganize the Parquet files into sub-folders in HDFS:
// Read the original data
val df = sqlContext.read.parquet("/user/victor")
// Rearrange the data by year and month
// and write it back to a new location in HDFS
df.repartition($"year", $"month")
  .write.partitionBy("year", "month")
  .parquet("/user/temp")
// Recreate the table in its new form
sqlContext.sql("create external table parquet_test.parquet_table_2 (value INT, series STRING, day TINYINT)" +
  " PARTITIONED BY (year INT, month TINYINT) " +
  "STORED AS PARQUET LOCATION '/user/temp'")
// Instruct Hive to discover the new partitions
sqlContext.sql("MSCK REPAIR TABLE parquet_test.parquet_table_2")
Note: The "msck repair" command makes really easy to add all new missing partitions. Otherwise you will have to add them manually, one by one, using "alter table add partition..."
The resulting in HDFS files and directories are these:
hdfs dfs -du -h /user/temp/year=2017
396.9 K  396.9 K  /user/temp/year=2017/month=1
396.8 K  396.8 K  /user/temp/year=2017/month=10
396.7 K  396.7 K  /user/temp/year=2017/month=11
396.6 K  396.6 K  /user/temp/year=2017/month=12
397.2 K  397.2 K  /user/temp/year=2017/month=2
396.7 K  396.7 K  /user/temp/year=2017/month=3
396.8 K  396.8 K  /user/temp/year=2017/month=4
397.0 K  397.0 K  /user/temp/year=2017/month=5
397.0 K  397.0 K  /user/temp/year=2017/month=6
396.9 K  396.9 K  /user/temp/year=2017/month=7
396.7 K  396.7 K  /user/temp/year=2017/month=8
396.7 K  396.7 K  /user/temp/year=2017/month=9
Reduce the partition granularity
Of course, if the partition size is too small, we end up spending as much time traversing the partition directories as we would scanning the whole table. In those cases, it is much better to reduce the number of partitions and increase their size.
In this case, we can use Spark's coalesce method, since the final number of partitions is smaller than the original one (which lets coalesce avoid a full shuffle). In our example, we will reduce the partitioning from (year, month) to (year) only.
// Read again the data partitioned by year and month
val df = sqlContext.read.parquet("/user/temp")
// Condense everything into a single partition and write it again, partitioned only by year
df.coalesce(1).write.partitionBy("year").parquet("/user/temp3")
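The post does not show it, but to query the coarser layout from Hive you would recreate a table over the new location, following exactly the same pattern as before (a sketch, with the hypothetical table name parquet_table_3):

// Create a table over the new, year-only layout and register its partitions
sqlContext.sql("create external table parquet_test.parquet_table_3" +
  " (value INT, series STRING, month TINYINT, day TINYINT)" +
  " PARTITIONED BY (year INT)" +
  " STORED AS PARQUET LOCATION '/user/temp3'")
sqlContext.sql("MSCK REPAIR TABLE parquet_test.parquet_table_3")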
In all the examples shown, we are not compressing the data, which is a handy technique to save space and reduce I/O time. Still, I hope you got a broad picture of how you can adjust your partitions to the size you need to get the job done.
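If you do want compression, a minimal sketch (assuming the same spark-shell session; the output path /user/temp4 is just an example) is to set the Parquet codec before writing; Snappy is a common choice:

// Ask Spark to write Snappy-compressed Parquet files
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
df.coalesce(1).write.partitionBy("year").parquet("/user/temp4")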