Carter Shore is an Intel Software Engineer, part of the Intel Distribution for Apache Hadoop Professional Services. He is an industry veteran, having witnessed the births of Unix, C, C++, the PC, Apple, Relational Databases, the Internet, Cellphones, Java, Social Media, Hadoop, and many, many, many iterations of what is considered to be ‘Big Data’.
In part 1 of this article, we discussed the Hadoop ‘Batch File Processing’ pattern, and how Hive can be used to craft a solution that provides read consistency, and decoupling from operational file processes.
To review, most large scale environments will require that solutions address requirements and issues like these:
- Data Latency – What is the allowable delay between when the files arrive, and the data within them must become ‘visible’ or available for consumption.
- Data Volume – What is the aggregate size of previously landed data, and what is the estimated volume of new files per process period.
- Landing Pattern – How often do the new files arrive, is it a push or a pull, is it synchronous or asynchronous.
- Data Quality – Business and operational rules that describe structure, content, and format of the data, and disposition of non-conforming data.
- Accountability – How to track and account for the handling and disposition of every record in every file.
- Data Retention – How long must previously landed data remain ‘visible’ or available.
- Read Consistency – What is the access or consumption profile, and effect of either ‘missing’ or ‘extra’ records when data is queried or exported.
- Efficiency and Performance – Does the size or volume of the data, or the amount of consumption activity dictate performance levels which may require special design or methods to achieve.
In part 1 we provided a simple example and a Hive solution that addressed some of the issues above.
In part 2 we will discuss how the Hadoop ‘Small Files Problem’ is related to Batch File Processing, and describe some features of Hive that can be used as a solution.
We will present a more complex Batch File example that has Small File issues, and a Hive based solution.
Small Files = Big Trouble
Current Hadoop distributions suggest that 10 million files is a practical threshold of concern. This is based on the size of the HDFS filesystem image that the namenode stores in memory, and the size of the entry needed for each file and each block of that file.
If we receive a large number of files each day (100,000 for example), it only takes 50 days to reach that threshold.
An easy solution would be to simply require that the data source aggregate the small files into larger files before sending them.
But recall that in many cases, the Enterprise deploying the consuming Hadoop solution is not the owner of the source data, and may have little influence on its’ file characteristics, record format, encoding, delivery method, or scheduling.
We may also be dealing with a large number of individual sources, or with transaction or event records, where latency requirements prevent accumulating the records for periodic aggregation.
In addition to the sheer data volume per process interval, it’s also important to consider the number of files, and average file size. This is important because of the way HDFS is designed, to supports tens of millions of files. First, files are partitioned into equal sized blocks, by default 64 Mb. If a file is smaller than the blocksize, it will obviously fit entirely into only one block. Second, each file, no matter the size, requires an entry into the HDFS metastore. So adding millions of small files each day, or week, or month will cause the metastore size to grow, perhaps eventually exceeding allocated memory space. Even if it fits, HDFS internal performance may be impacted. Some applications have seen performance gains of hundreds, even thousands of percent for the same volume, by applying aggregation. This is generally termed ‘The Hadoop Small Files Problem’.
It’s possible to specify the blocksize that will be used for a file at create time, but the limitations above still apply. Most hadoop clusters specify a minimum blocksize, nominally 1 Mb, to avoid creating a swarm of small files.
Average File Size = Interval Data Volume / Interval File Count
If the average file size is less than 50% of the default blocksize, then consider a file aggregation strategy. A simple aggregation method simply accumulates the files in a local filesystem landing area, and then periodically concatenates them into larger HDFS target file(s):
cat <landing_path>/<file_pattern> | hadoop fs –put - <hdfs_path>/<target_hdfs_filename>
The aggregation trigger might be the total landing filesize exceeding a threshold, or the end of a defined process interval, or some combination.
Requirements for data latency and availability may prevent using this simple strategy. A more complex alternate method would place the files directly into HDFS as they land. Then, at some later point, those smaller files would be aggregated into a large HDFS file, and the smaller HDFS files removed. This solution can result in temporary data inconsistency, since there will be multiple copies of same data during the time required to aggregate to a large file, and to delete the smaller files. If the process schedule and access requirements allow a maintenance window where read operations can be suspended, then this data inconsistency presents no issues. Otherwise, we must seek another solution.
Hive’s support of data partitioning offers a simple and effective solution to read consistency, but also enables us to meet data latency requirements, while addressing the ‘Small Files Problem’.
With partitioning, we define the Hive tables as usual, except that one or more elements that would normally be defined as columns, are instead defined as partition keys. We place the files containing data into folders with defined locations and names that correspond to the partition keys. The files in the partition folders will not become ‘visible’ as part of the table until we execute a Hive statement that explicitly adds the partition to the table.
In Hive, table definitions are pure metadata, they are persisted into a metastore database, and have no effect on the actual underlying HDFS files.
Hive itself is not a database, and it does not support transactions for dml statements, i.e. no commit/rollback. BUT, the metastore IS typically hosted in a database that does support transactional commit/rollback (MySql, Oracle, etc.). So ddl actions like CREATE/DROP/ALTER are atomic.
In particular, a partition is added to a table with defined partition keys and values. The LOCATION clause associates that partition with an actual physical location where the files containing the records may be found. Once the partition is created, we can execute an ALTER statement to redefine that location, as a ddl transaction.
This gives us a solution to the small file issue. We can create multiple distinct target HDFS folders that hold different ‘versions’ of our recordset, and ‘swap’ between them by executing an ALTER statement. While a static version of the recordset is visible to the consuming processes, we can perform intake and consolidation of new files in another working version, without affecting read consistency. At the latency interval, we halt intake and file processing, and swap locations. The most recently processed records are now visible in Hive, and we can resume intake and file processing in the newly swapped out folder.
A complex example
Logfiles from several sources land throughout the entire day. Logfile names contain a create timestamp ‘YYYYMMDD.hhmmss.nnnnnn’ The logfiles may contain a variable number of records, from 1 up to millions, but average record count for a logfile is 1000. The individual log records are only a couple of hundred bytes. Average daily volume is 10 million records per day, but can peak at 50 million records per day. Overall volume is projected to increase by 20% per year.
Our requirement is to process the log records and make them available in Hive within 5 minutes of landing. Retention period for the records is one year.
In the average case, we will get 10,000 files per day, of 1,000 records, and in worst case, 50 million files of one record each.
So we must implement a scheme to consolidate the smaller files to avoid overwhelming the namenode metastore, while still meeting our 5 minute latency requirement, and maintaining read consistency.
A Hive table, ‘all_logs’, contains the records that will be consumed:
CREATE EXTERNAL TABLE all_logs (
PARTITIONED BY (log_timestamp string)
<additional table specifications> ;
We choose the time granularity of the partition as one day, which yields on average 10 M records, using 2 Gb of storage, and worst case 50M records using 10 Gb.
An enterprise scheduler spawns a handler script every minute. The handler script examines the landing area for new logfiles, and checks to make sure that they are stable, i.e. fully landed. Each new stable file is streamed through data quality and formatting filters as it is copied to an HDFS partition folder.
We employ at least 2 distinctly named HDFS working partition folders. One folder contains all records that have arrived so far in that day, and is associated with the current day’s table partition. The other folder serves to accumulate the new logfiles as they are copied from the landing area. At 5 minute intervals (dictated by our latency requirement), the folder roles are swapped by altering the partition location settings.
For example, assume that process date is 2013/08/29. We create two working folders, ‘working_0’ and ‘working_1’, before the midnight transition.
ALTER TABLE all_logs ADD PARTITION(log_timestamp = ‘20130829’)
After 5 minutes of accumulating the new logfiles into ‘working_1’ we will swap:
ALTER TABLE all_logs PARTITION(log_timestamp = ‘20130829’)
SET LOCATION ‘<full URI>:<hdfs_path>/all_logs/working_1’;
Note that we used <full URI> as part of the SET LOCATION clause rather than just <hdfs_path>, because some Hadoop distributions will not accept a simple HDFS path when altering a partition location.
Now we can consolidate the existing files in folder ‘working_0’ into 1 or more larger files, and also start landing the new files into it.
After 5 minutes, we swap again. The swapping and accumulation continues throughout the process day. At day rollover, the final HDFS folder for the partition is created as ‘20130829’, all the files for the day are consolidated into it, and the final partition location is set to it. Working folders are truncated, redundant files are deleted, and a new daily process cycle begins.
The net result is a far smaller number of consolidated files in each daily partition, while meeting requirements for 5 minute latency and read consistency.
There are alternative ‘Small Files’ solutions using both standard and specialized components:
HAR Files - Hadoop Archive (HAR) files were introduced in 0.18.0. These can reduce the number of file objects that the namenode metastore must deal with by packing many smaller files into a few larger HAR files. HAR files do not currently support compression. No particular performance gain has been reported though.
Sequence Files – Code is written to process multiple small source files into a target Sequence File, using the filename as the key, and the file content as the value. That target is then used as source for subsequent queries and processing. SequenceFiles are splitable, and also support compression, particularly at block level. SequenceFile creation and reading can be somewhat slower, so performance must be factored in.
HBase – Stores data in MapFiles (indexed SequenceFiles). This can be a good choice when the predominant consumption profile is MapReduce style streaming analysis, with only occasional random lookup. Latency for random queries can be an issue.
Filecrush - The Hadoop file crush tool can be used as a map reduce job or standalone program. The file crush tool navigates an entire file tree (or just a single folder) and decides which files are below a threshold and combines those into bigger files. The file crush tool works with sequence or text files. It can work with any type of sequence files regardless of Key or Value type. It is highly configurable, and a downloadable jarfile is available.
Consolidator – A java Hadoop file consolidation tool written by Nathan Marz, can be found in the ‘dfs-datastores’ library. It can be integrated into custom code, or implemented as an add on component. There is little explicit documentation besides the code itself.
S3DistCp – If you are running in the Amazon world, especially EMR, this tool can solve a lot of issues. Apache DistCp is an open source tool to copy large amounts of data in a distributed manner - sharing the copy, error handling, recovery, and reporting tasks across several servers.
S3DistCp is an extension of DistCp that is optimized to work with Amazon Web Services (AWS), particularly Amazon Simple Storage Service (Amazon S3). You use S3DistCp by adding it as a step in a cluster. Using S3DistCp, you can efficiently copy large amounts of data from Amazon S3 into HDFS where it can be processed by subsequent steps in your Amazon Elastic MapReduce (Amazon EMR) cluster. You can also use S3DistCp to copy data between Amazon S3 buckets or from HDFS to Amazon S3.
Use argument ‘--groupBy,PATTERN’ to cause S3DistCp to concatenate input files whose names match the RegEx ‘PATTERN’ into a single target.
Additional arguments such as ‘--targetSize,SIZE’, ‘--outputCodec,CODEC’ enable fine grained control of the results.
We have described the ‘Batch File Processing’ use pattern, and shown how it relates to the ‘Small Files Problem’. We discussed some of the important requirements and issues, and provided examples of solutions that provide read consistency, decoupling from operational issues such as archiving, file count management, and satisfaction of latency requirements, using standard Hive features.