Writing a SQL Query to run in Hadoop Pt2: Map

In my previous post I wrote a simple SQL query to convert to a MapReduce job to run under Hadoop and also described how to generate and load the data to test against, so if you have not done that already then start with Pt1. Now we need to write the job itself. To do this we write a map function and a reduce function, both accepting key-value pairs for input and output, with the map function producing an intermediate output that the reducer accepts as input. There are options to use Hadoop Streaming or Hadoop Pipes for languages other than Java; however, in this case we will use Java to look at the default choice. If you don't have strong Java skills then, for the sake of this example, that is not a problem: most administrators who already work with SQL and relational databases will not have Java as a core skill, and adapting to different languages is part of the evaluation process when adopting new technologies.
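
Concretely, in Hadoop's Java API (the org.apache.hadoop.mapreduce package) this structure takes the following shape. This is a minimal sketch only: the class names and the reducer's output types are illustrative rather than taken from the final code, and the DoubleArrayWritable value type is a small helper class defined later in this post.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map: consumes (byte offset, line of text), emits (text key, array of doubles).
public static class TPCHQ1Map
        extends Mapper<LongWritable, Text, Text, DoubleArrayWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // parse one lineitem row and emit the intermediate key-value pair
    }
}

// Reduce: consumes the grouped intermediate output; covered in Pt3.
public static class TPCHQ1Reduce
        extends Reducer<Text, DoubleArrayWritable, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<DoubleArrayWritable> values, Context context)
            throws IOException, InterruptedException {
        // aggregate the per-line values for each key
    }
}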

The aim of this example is functionality rather than achieving the highest level of optimisation possible. I should note that there is an existing example of TPC-H Query 1 written for Hadoop of which I am aware here, although it does not model the whole query, so I will approach it slightly differently.

The map method is implemented in a class that extends the Mapper class. As we have noted, it needs to accept key-value pairs as input and write key-value pairs as intermediate output. From Pt1 we have already seen the format of the unstructured data, the first line of which is repeated here:

1|15518935|768951|1|17|33203.72|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the|

We need to define the key and values and extract these from each line of the data. Identifying the key is straightforward: the SQL statement groups by l_returnflag and l_linestatus, and using the create table definition in Pt1 for reference we can identify these as fields 9 and 10, which in the line above are "N|O". These can be concatenated in text format to form the key.

Next we need to identify the values, or the data that will be manipulated to form the values. Working from the SQL statement these are l_quantity, l_extendedprice, l_discount and l_tax, which in the line above are fields 5 to 8, or "17|33203.72|0.04|0.02". We also want to restrict the data from the map phase to "where l_shipdate <= date '1998-12-01' - interval '112' day (3)". As we saw in Pt1, this equates to a date of less than or equal to 11-AUG-98, so we also need to examine l_shipdate, which in the line above is field 11, or "1996-03-13", and only output from the map stage data that qualifies.

After the map stage this date value is no longer required; however, if the date matches the where clause we can also calculate the discount price as (l_extendedprice * (1 - l_discount)) and the charge as (l_extendedprice * (1 - l_discount) * (1 + l_tax)) for that particular line and include both in the map output. We could of course do these calculations earlier for every single line, but doing them only when the where clause matches saves a small amount of unnecessary work. Finally we need to maintain a count of lines, which, as we run the mapper on each line, means a count of 1 is recorded as an output value. We can store all of the output values in an array that corresponds to the text key.
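
Taking the sample line above as a worked example: its l_shipdate of 1996-03-13 falls before 11-AUG-98, so the line qualifies, giving a discount price of 33203.72 * (1 - 0.04) = 31875.5712 and a charge of 31875.5712 * (1 + 0.02) = 32513.082624, which are emitted alongside l_quantity (17), l_extendedprice, l_discount and a count of 1.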

The following extract implements this approach in Java. We parse each line and create the key by concatenating fields 9 and 10; the values are stored in a DoubleArrayWritable (a small helper class sketched after the extract). Fields 5, 6 and 7 are stored directly in the array, field 8 (l_tax) is held in a local variable, and we examine the date in field 11; if it meets the criteria we calculate the discount price and charge and store them, along with the row count, alongside the fields previously stored.


// Body of the map() method; wc is a Calendar holding the where clause
// cut-off date of 11-AUG-98 and is initialised once per task (see below).
Text tpchq1key = new Text();
DoubleWritable[] outArray = new DoubleWritable[6];
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line, "|");
int i = 1;
double l_tax = 0.0;
while (tokenizer.hasMoreTokens()) {
    String token = tokenizer.nextToken();
    if (i == 5) {                          // l_quantity
        outArray[0] = new DoubleWritable(Double.parseDouble(token));
    } else if (i == 6) {                   // l_extendedprice
        outArray[1] = new DoubleWritable(Double.parseDouble(token));
    } else if (i == 7) {                   // l_discount
        outArray[2] = new DoubleWritable(Double.parseDouble(token));
    } else if (i == 8) {                   // l_tax
        l_tax = Double.parseDouble(token);
    } else if (i == 9) {                   // l_returnflag: first half of the key
        tpchq1key.set(token);
    } else if (i == 10) {                  // l_linestatus: second half of the key
        tpchq1key.set(tpchq1key.toString() + " " + token);
    } else if (i == 11) {                  // l_shipdate: apply the where clause
        StringTokenizer wheredate = new StringTokenizer(token, "-");
        Calendar win = Calendar.getInstance();
        win.clear();
        win.set(Integer.parseInt(wheredate.nextToken()),
                Integer.parseInt(wheredate.nextToken()) - 1,   // Calendar months are 0-based
                Integer.parseInt(wheredate.nextToken()));
        if (win.getTimeInMillis() <= wc.getTimeInMillis()) {
            double l_extendedprice = outArray[1].get();
            double l_discount = outArray[2].get();
            double sum_disc_price = l_extendedprice * (1 - l_discount);
            double sum_charge = l_extendedprice * (1 - l_discount) * (1 + l_tax);
            outArray[3] = new DoubleWritable(1.0);             // count of 1 per row
            outArray[4] = new DoubleWritable(sum_disc_price);
            outArray[5] = new DoubleWritable(sum_charge);
            DoubleArrayWritable da = new DoubleArrayWritable();
            da.set(outArray);
            context.write(tpchq1key, da);                      // emit only qualifying lines
        }
    }
    i++;
}
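
The extract relies on two pieces not shown above. DoubleArrayWritable is not a stock Hadoop class, so a small subclass of ArrayWritable along the following lines is assumed, and wc must be populated with the cut-off date before any lines are mapped; a minimal sketch of both, assuming the Mapper's setup() method is used for the latter:

// Custom writable: ArrayWritable needs a no-argument subclass that names
// the element type so the values can be deserialised in the reduce phase.
public static class DoubleArrayWritable extends ArrayWritable {
    public DoubleArrayWritable() {
        super(DoubleWritable.class);
    }
}

// Assumed initialisation of the where clause cut-off, run once per map task.
private Calendar wc;

@Override
protected void setup(Context context) {
    wc = Calendar.getInstance();
    wc.clear();
    wc.set(1998, Calendar.AUGUST, 11);   // 1998-12-01 minus 112 days
}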


We have now written the map phase and output the required intermediate key-value pairs; however, so far we have worked on each line of input individually and have not yet implemented any of the aggregate functions. We will look at these in the reduce phase in Pt 3.