MapReduce Weather Dataset

Pulling off the training wheels … A first MapReduce Job

Remember that feeling as a kid before your first bike ride without training wheels? I do. It was a tale of mixed emotions – initially excited and enthralled, with a feeling of independence. Then, slowly, a few flashes of anxiety – a picture of falls and bruises, physical (and mental, as others watched you fall). But then you see someone whiz by – and you get back on and know it’s all worth it!

So I decided to take my Hadoop training wheels off and roll up my sleeves for my first MapReduce application. It’s been a while since my last post on setting up a Hadoop cluster from scratch; as a refresher, here’s the link to that. I will be using that cluster for this job.

The example is straight out of Tom White’s book, “Hadoop: The Definitive Guide”.

The Data Set

The data set is a weather data set from the National Climatic Data Center (NCDC) – weather readings stored in a fixed-width ASCII format. NCDC has data collected from weather sensors going all the way back to 1901. I had some difficulty downloading all the files, and after several attempts I decided to deal with the data from 1901 to 1960. This still amounted to about 36 gigabytes of data – comparable to a reasonably sized database. The goal of the MR job is to find the highest recorded temperature for each year.

The files are gzipped and organized by year and weather station – for each year, there is one file per weather station. Here is an example for 1990:

010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz

Each file has entries that look like this:

0029029070999991905010106004+64333+023450FM-12+000599999V0202301N008219999999N0000001N9-01391+99999102641ADDGF102991999999999999999999

Not very readable, right? You have to know where the field boundaries are, and that definition is available from NCDC.

For the purpose of this example, let’s focus on a few of the fields in the record above. The first (029070) is the USAF weather station identifier. The next (19050101) is the observation date. The next item of interest (-0139) is the air temperature in tenths of a degree Celsius, so a reading of -0139 equates to -13.9 degrees Celsius. The digit immediately following the temperature (1) is the quality code for that reading. Lastly, a value of 99999 at a point where an entry should exist signifies a missing value.

Processing the Data Set

Knowing this much, we can talk about processing the data, which is quite straightforward. We extract two fields: the air temperature and the quality code. We make sure the quality code is good and that the reading is not the missing-value marker. If it passes, we compare it against the highest temperature seen so far, which we keep in a variable, and hold on to the larger of the two. At the end of a year’s processing, the value left in the variable is that year’s highest temperature. We print the year and the value divided by 10, along with the time taken. The output looks like this:

1901 12.2

1902 11.5

1905 13.9

and so on

Time: 1395 seconds

Processing the Old-Fashioned Way

There are several approaches you can take here. I took Tom’s awk script, modified it slightly for my own environment and needs, and ran it against the data set. This is the simplest and probably the most common approach you would apply to such a data set.

If you are bold enough and have the time to try, you could build a relational data model out of this and write your favorite application (or stored procedure) to perform the logic. I don’t know why you would, but you could :-).

Now remember, this is 36 GB of raw data. I ran the script on the beefiest machine in my cluster. The entire crunching with an awk script took a little over 23 minutes.

Processing with Hadoop

Before describing the processing with Hadoop, I want to point out a few things:

  • I did not really tune the cluster itself – obviously it’s a bare-bones cluster of 3 laptops on a shared network, so you can’t do too much with that.
  • It’s important to keep in mind that Hadoop works well with large files. To take advantage of that, again per Tom’s book, I concatenated the per-station files into one large file per year and labeled them “1901.all”, “1902.all” and so on. I therefore now have weather files that are gigabytes in size.

For my Hadoop cluster, the volume that Hadoop recognizes is set to /home. The default settings in Cloudera Manager point to /. This is important to know because in many default Linux installations the root partition is small, around 50 GB, and the rest of the disk sits under a single partition. So I have my namenode directory under /home/dfs/nn, my datanode directory under /dfs/dn1, etc. I also set the log configurations to “ERROR” instead of “INFO” to save space.

I then distributed these files into my Hadoop cluster.

sudo -u hdfs hadoop fs -mkdir -p /examples/input
sudo -u hdfs hadoop fs -mkdir -p /examples/output1   # and output2, output3, etc. for subsequent runs
sudo -u hdfs hadoop fs -put /home/rnair/WeatherFiles/*.all /examples/input

The Java code to run this is also quite bare bones, but it has a Mapper and a Reducer – just what we need!

For the mapper, the input to each map call is a line from the file, with the byte offset as the key and the text of the line as the value. The offset is meaningless for us; we only care about the line itself. However, unlike the awk job, the map tasks now run in parallel across the data nodes. The output of the map tasks is a set of (year, temperature) key-value pairs, as in:

(1905, 11.3), (1905, 13.9), (1905, 12.2)

(1906, 10.3), (1906, 10.9), (1906, 12.5)
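In the pairs above I’m showing the temperatures already divided by 10 for readability; in code it is simpler to carry them around in tenths of a degree and divide only when reading the final result. A mapper along those lines – essentially the classic example from Tom’s book, using the fixed-width offsets described earlier, shown here as a sketch rather than my exact code (class names are illustrative) – looks roughly like this:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch of a mapper in the spirit of the book's MaxTemperature example.
// Pulls the year and air temperature out of each fixed-width NCDC record.
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;  // "missing" marker for the temperature field

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {

    String line = value.toString();
    String year = line.substring(15, 19);       // observation year, e.g. 1905

    // Air temperature: signed, five characters, in tenths of a degree Celsius.
    int airTemperature;
    if (line.charAt(87) == '+') {
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }

    String quality = line.substring(92, 93);    // quality code for the reading
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}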

The shuffle step then passes the following to the reduce task:

(1905, [11.3, 13.9, 12.2]), (1906, [10.3, 10.9, 12.5])

After all the map tasks are complete, the Reducer is called, and in our case all the reducer does is find the max reading for each key (which is the year in our case).
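The reducer is the mirror image – again a sketch in the spirit of the book’s example rather than my exact code. It walks the list of temperatures for a year and keeps the largest one (still in tenths of a degree; the division by 10 happens when the result is read):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch of a reducer: for each year, keep the largest temperature seen
// across all the map outputs for that key.
public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {

    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    // Still in tenths of a degree; divide by 10 when reading the output.
    context.write(key, new IntWritable(maxValue));
  }
}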

That’s it. I didn’t have to put in any code to record timing, because the JobTracker UI has all that information.
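For completeness, a minimal driver along these lines wires the two together and points the job at the HDFS directories created earlier (class names and paths here are illustrative, not necessarily exactly what I ran); you package it all into a jar and launch it with the hadoop jar command.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative driver: configures the job and submits it to the cluster.
public class MaxTemperature {

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    Job job = Job.getInstance();
    job.setJarByClass(MaxTemperature.class);
    job.setJobName("Max temperature");

    FileInputFormat.addInputPath(job, new Path(args[0]));    // e.g. /examples/input
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // e.g. /examples/output1

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}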

And So..?

So how fast was it on a poor man’s cluster of 3 laptops on a regular shared network? It took a little over 6 minutes. 

The result is compelling for at least one reason: it showed a performance boost even for a relatively small “volume” of data. 36 GB is no small measure, but it’s certainly not the terabytes or petabytes that Hadoop is normally associated with. I got at least a 3x performance boost with this data set (roughly 23 minutes down to a little over 6). This suggests that when you get into terabytes, the results would look even better, because processing on a monolithic machine reaches a point of diminishing returns at a certain scale.

I would venture to say that it’s not too far from the kind of processing we do with our own data – many of us deal with a lot of ETL/data integration processes that could benefit from applying Hadoop’s power to them.

I am glad I pulled off those training wheels – I did hit a few bumps, but now I am exhilarated.
