Carter Shore is an Intel Software Engineer, part of the Intel Distribution for Apache Hadoop Professional Services. He is an industry veteran, having witnessed the births of Unix, C, C++, the PC, Apple, Relational Databases, the Internet, Cellphones, Java, Social Media, Hadoop, and many, many, many iterations of what is considered to be ‘Big Data’.
One common Hadoop use case involves landing, storing, and processing periodically arriving files from external sources. These files may contain logging information, transaction records, status updates, network messages, web impressions, web click-throughs, etc.
This processing pattern is commonly referred to as ‘Batch File Processing’.
When designing a Hadoop application to handle these records, it’s important to carefully consider the requirements.
Some common issues:
- Data Latency – What is the allowable delay between when the files arrive and when the data within them must become ‘visible’ or available for consumption?
- Data Volume – What is the aggregate size of previously landed data, and what is the estimated volume of new files per process period?
- Landing Pattern – How often do the new files arrive? Is it a push or a pull? Is it synchronous or asynchronous?
- Data Quality – Business and operational rules that describe structure, content, and format of the data, and disposition of non-conforming data.
- Accountability – How to track and account for the handling and disposition of every record in every file.
- Data Retention – How long must previously landed data remain ‘visible’ or available?
- Read Consistency – What is the access or consumption profile, and what is the effect of either ‘missing’ or ‘extra’ records when data is queried or exported?
- Efficiency and Performance – Does the size or volume of the data, or the amount of consumption activity, dictate performance levels that may require special design or methods to achieve?
In a smaller or limited scope homogeneous environment, where the business user, analyst, designer, and developer might even be the same person, the scope of ownership and responsibility allows wide latitude and freedom in the decisions that address and resolve these issues.
But since Hadoop’s sweet spot is dealing with Big Data, it is nowadays likely to be deployed in a large heterogeneous Enterprise production infrastructure, where Hadoop is just another player in the orchestra, and the ‘new kid on the block’.
In this scenario, we are often constrained by policy, by circumstances, and by the organizational culture, where robust, hardened, production ready software is not just expected, but demanded. Those who cannot meet the criteria will simply not be allowed onto the playground.
In many cases, the Enterprise deploying the consuming Hadoop solution is not the owner of the source data, and may have little influence on its file characteristics, record format, encoding, delivery method, or scheduling.
It is important then to understand exactly what is expected, and to be prepared to meet every requirement. Here is where negotiation skills can be important. You must understand what is impossible to do, vs. what is merely difficult or inconvenient to do. Often the pure Hadoop parts are easy; it’s integrating the solution into the Enterprise production infrastructure that takes time and effort.
In addition to the sheer data volume per process interval, it’s also important to consider the number of files and the average file size. This matters because of the way HDFS is designed: it supports tens of millions of files, but every file carries overhead. First, files are partitioned into fixed-size blocks, by default 64 MB. A file smaller than the blocksize occupies only a single block. Second, each file, no matter the size, requires an entry in the HDFS namespace metadata, which the NameNode holds in memory. So adding millions of small files each day, or week, or month will cause that metadata to grow, perhaps eventually exceeding the allocated memory space. Even if it fits, HDFS internal performance may be impacted. This is generally termed ‘The Hadoop Small Files Problem’. Some applications have seen performance gains of hundreds, even thousands of percent for the same volume, by applying aggregation.
It’s possible to specify the blocksize that will be used for a file at create time, but the limitations above still apply. Most Hadoop clusters specify a minimum blocksize, nominally 1 MB, to avoid creating a swarm of small files.
Average File Size = Interval Data Volume / Interval File Count
If the average file size is less than 50% of the default blocksize, then consider a file aggregation strategy. A simple method accumulates the files in a local filesystem landing area, and then periodically concatenates them into larger HDFS target file(s):
cat <landing_path>/<file_pattern> | hadoop fs -put - <hdfs_path>/<target_hdfs_filename>
The aggregation trigger might be the total landing filesize exceeding a threshold, or the end of a defined process interval, or some combination.
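The threshold test above can be sketched in a few lines of shell. This is only an illustration: the function names are hypothetical, the caller is assumed to supply the interval volume and file count in bytes and files, and the 64 MB blocksize is the default quoted earlier rather than a value read from the cluster.

```shell
# Hypothetical helpers, not part of Hadoop.

# average_file_size <interval_volume_bytes> <interval_file_count>
average_file_size() {
    # Integer division is precise enough for a threshold check.
    echo $(( $1 / $2 ))
}

# should_aggregate <interval_volume_bytes> <interval_file_count> <blocksize_bytes>
# Succeeds (exit status 0) when the average file is smaller than half a block.
should_aggregate() {
    [ "$2" -gt 0 ] || return 1          # nothing landed, nothing to aggregate
    avg=$(average_file_size "$1" "$2")
    [ "$avg" -lt $(( $3 / 2 )) ]
}

BLOCKSIZE=$(( 64 * 1024 * 1024 ))       # assumed default blocksize, 64 MB

# Example: 10,000 files totalling 5 GB average roughly 0.5 MB each.
if should_aggregate $(( 5 * 1024 * 1024 * 1024 )) 10000 "$BLOCKSIZE"; then
    echo "aggregate before loading"
else
    echo "load files as-is"
fi
```

A real trigger would probably combine this check with the landing area's total size or the end of the process interval, as described above.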
Requirements for data latency and availability may prevent using this simple strategy. A more complex alternate method would place the files directly into HDFS as they land. Then, at some later point, those smaller files would be aggregated into a large HDFS file, and the smaller HDFS files removed. This solution will result in temporary data inconsistency, since there will be multiple copies of the same data during the time required to aggregate to a large file and to delete the smaller files. If the process schedule and access requirements allow a maintenance window where read operations can be suspended, then this data inconsistency presents no issues. Otherwise, we must seek another solution.
Hive’s support of data partitioning offers a simple and effective solution. It allows us to impose batch level transaction semantics onto our data. With partitioning, we define the Hive tables as usual, except that one or more elements that would normally be defined as columns are instead defined as partition keys. We place the files containing data into folders with defined locations and names that correspond to the partition keys. The files in the partition folders will not become ‘visible’ as part of the table until we execute a Hive statement that explicitly adds the partition to the table. In Hive, table definitions are pure metadata; they are persisted into a metastore database, and have no effect on the actual underlying HDFS files. Hive itself is not a database, and it does not support transactions, i.e. commit/rollback. But the metastore is typically hosted in a database that does support transactional commit/rollback (MySQL, Oracle, etc.). So DDL actions like CREATE/DROP/ALTER are atomic.
This gives us a solution to the read consistency issue. We can create target HDFS folders with appropriate path names, add data files (either as-landed, or aggregated), and then ‘commit’ them to the target Hive table dataset by executing a DDL statement to ADD the partition. Note that the DROP statement does the opposite, which can be useful when performing operational tasks, such as removing data that no longer meets retention requirements.
A simple example
Each day after midnight, a daily file of all transactions for the previous 24 hour process day is pushed from an external source to a landing zone in the local filesystem of one of the Gateway nodes. The transaction date is embedded into the file name, ‘tran.YYYYMMDD.dat’. The average file size is 900 MB. The transaction records are in ASCII CSV format.
The captured transactions in Hadoop are subject to query 7x24, and the SLA states that new transactions must be available on the cluster by 8:00 AM each day. Only the transaction records for the previous 90 days will be stored for queries, older records get moved to archive.
A Hive table, ‘daily_transaction’, contains the records that will be consumed:
CREATE EXTERNAL TABLE daily_transaction (
<column definitions>
)
PARTITIONED BY (transaction_date string)
<additional table specifications> ;
The table partition key ‘transaction_date’ is a typed pseudo column. It is not stored within the record itself, but in the Hive metastore. The value is declared when a new partition is added to the table, and that value is associated with every record in that partition. It can otherwise be used just like a column, in a SELECT list, in joins, in WHERE clauses, etc. Use the partition key name and value to reference the partition in SQL like this: ‘… transaction_date = '20130829' …’. If a partition key value is not specified, then the entire dataset of table ‘daily_transaction’ will be queried.
An Enterprise scheduler spawns a filewatcher job at midnight. Upon detecting the file, a handler script is spawned.
The handler script waits until the file is fully received, extracts the date component of the filename, performs some data quality checks (record count, duplicate file, etc.), and creates the new target partition folder in HDFS, employing the date as the leaf level folder name. It then copies the records to the HDFS target folder, perhaps applying some additional data quality checks (field count of the records, etc.), and transforming the field delimiters from commas to the Hive default, 0x01.
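The date extraction step might look something like the sketch below. The function name is hypothetical, and it assumes the file name follows the ‘tran.YYYYMMDD.dat’ pattern exactly; it checks only the shape of the name, not whether the digits form a valid calendar date.

```shell
# extract_tran_date <filename>
# Prints the YYYYMMDD component of a name like tran.YYYYMMDD.dat,
# or fails if the name does not match that pattern.
extract_tran_date() {
    name=${1##*/}                # strip any leading directory
    case "$name" in
        tran.[0-9][0-9][0-9][0-9][0-9][0-9][0-9][0-9].dat)
            d=${name#tran.}      # drop the prefix
            echo "${d%.dat}"     # drop the suffix
            ;;
        *)
            echo "unexpected file name: $name" >&2
            return 1
            ;;
    esac
}
```

Rejecting anything that does not match the pattern keeps a partially named or temporary file from being loaded under the wrong partition date.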
The handler ksh script would contain statements like this:
hadoop fs -mkdir <hdfs_data_path>/daily_transaction/<YYYYMMDD>
cat <local_landing_path>/<filename> |
<data_quality_filter> | tr ',' '\001' |
hadoop fs -put - <hdfs_data_path>/daily_transaction/<YYYYMMDD>/<filename>
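As an illustration of what the data quality filter stage could do, here is a minimal sketch that checks the field count of each record and sets aside the ones that fail. Both function names are hypothetical. It assumes plain CSV with no quoted or embedded commas, since it splits naively on every comma.

```shell
# quality_filter <expected_field_count> <reject_file>
# Reads CSV records on stdin. Records with the expected number of
# comma-separated fields go to stdout; all others go to the reject file.
quality_filter() {
    awk -F',' -v n="$1" -v rejects="$2" '
        NF == n { print; next }
        { print > rejects }
    '
}

# to_hive_delimiter
# Replaces every comma with the Hive default field delimiter, 0x01.
to_hive_delimiter() {
    tr ',' '\001'
}
```

The two stages can be chained in the pipeline, for example `quality_filter 12 rejects.dat | to_hive_delimiter`, where the field count and reject file name are placeholders. Keeping the rejected records in a file supports the accountability requirement, since every input record ends up either loaded or rejected.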
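If no errors occurred, the data file is now in HDFS, but still not visible from Hive, i.e. we have maintained read consistency. Next, the script executes a Hive ‘ALTER …’ statement to add the new partition and its data to the current ‘daily_transaction’ table. Because the folder created above is named with the bare date rather than Hive’s default ‘transaction_date=YYYYMMDD’ layout, the statement names the folder explicitly with a LOCATION clause:
hive -e "ALTER TABLE daily_transaction ADD PARTITION (transaction_date = '<YYYYMMDD>') LOCATION '<hdfs_data_path>/daily_transaction/<YYYYMMDD>'"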
If the ALTER statement executes without errors, then the new data has been added to the table, and is available for consumption. We have maintained read consistency, since queries will now ‘see’ the existing data set plus the entire new data.
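A handler script might build the ‘commit’ statement with a small helper such as the one sketched here. The function name is hypothetical. The IF NOT EXISTS clause is an addition not used in this article; it is assumed to be desirable so that re-running the handler for the same day does not fail on an already-registered partition.

```shell
# add_partition_ddl <YYYYMMDD> <hdfs_partition_path>
# Prints the DDL that makes the partition folder visible in the table.
add_partition_ddl() {
    printf "ALTER TABLE daily_transaction ADD IF NOT EXISTS PARTITION (transaction_date = '%s') LOCATION '%s'" "$1" "$2"
}

# Usage (the path is a placeholder):
#   hive -e "$(add_partition_ddl 20130829 /data/daily_transaction/20130829)"
```

Generating the statement separately from running it also makes it easy to log the exact DDL that was issued for each file.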
Post-load data quality steps could be performed here, such as record count reconciliation: a Hive query for the record count where transaction_date = 'YYYYMMDD' can be compared against the raw record count of the new file and the error record count.
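A record count reconciliation can be as simple as the sketch below. The function name is hypothetical, and the three counts are assumed to be gathered by the caller, for example the raw count from `wc -l` on the landed file, the loaded count from a Hive COUNT(*) query on the new partition, and the rejected count from whatever error file the quality checks produce.

```shell
# reconcile_counts <raw_count> <loaded_count> <rejected_count>
# Succeeds only when every raw record is accounted for.
reconcile_counts() {
    if [ "$1" -eq $(( $2 + $3 )) ]; then
        echo "OK: $1 raw = $2 loaded + $3 rejected"
    else
        echo "MISMATCH: $1 raw != $2 loaded + $3 rejected" >&2
        return 1
    fi
}
```

A non-zero exit status lets the Enterprise scheduler flag the job for attention.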
Now consider the 90 day data retention requirement. We can derive a list of the partition names which contain dates that are older than 90 days. We then execute DDL to drop all of the partitions in the list from the table in one statement:
ALTER TABLE daily_transaction DROP
PARTITION (transaction_date = '<YYYYMMDD>'),
PARTITION (transaction_date = '<YYYYMMDD>'),
PARTITION (transaction_date = '<YYYYMMDD>');
Because ‘daily_transaction’ is an EXTERNAL table, the data files still exist; we have simply changed the table metadata to remove them from the scope of the table. Read consistency has been maintained. We can now process the files in the dropped partitions at our leisure, perhaps publishing them to an external archive, and eventually deleting them from HDFS to regain the storage.
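Deriving the list of expired partitions and building the single DROP statement could be sketched as follows. The function name is hypothetical. It assumes the partition names arrive on standard input in the ‘transaction_date=YYYYMMDD’ form that Hive’s SHOW PARTITIONS prints, and that the caller supplies the cutoff date.

```shell
# drop_expired_ddl <cutoff_YYYYMMDD>
# Reads partition names on stdin, one per line, e.g. transaction_date=20130829.
# Prints one ALTER TABLE statement dropping every partition dated before
# the cutoff, or prints nothing if no partition has expired.
drop_expired_ddl() {
    awk -F'=' -v cutoff="$1" '
        $1 == "transaction_date" && length($2) == 8 && $2 ~ /^[0-9]+$/ && $2 < cutoff {
            # \047 is a single quote; separate partition specs with commas
            specs = specs (specs == "" ? "" : ", ") "PARTITION (transaction_date = \047" $2 "\047)"
        }
        END {
            if (specs != "") print "ALTER TABLE daily_transaction DROP IF EXISTS " specs ";"
        }
    '
}

# Usage (the cutoff calculation assumes GNU date):
#   cutoff=$(date -d '90 days ago' +%Y%m%d)
#   hive -e 'SHOW PARTITIONS daily_transaction' | drop_expired_ddl "$cutoff"
```

The printed statement can then be passed to `hive -e`, and the list of dropped dates kept for the later archive and delete steps.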
In part 2 of this article, we will examine a more complex example.
We have described the ‘Batch File Processing’ use pattern, discussed some of the important requirements and issues, and provided some examples of Hive solutions that provide read consistency, and decoupling from operational file operations using standard Hive features.