Hadoop streaming with Python

Streaming is extremely powerful and offers the path of least resistance. If you are comfortable with Python and don't want to get into the nitty-gritty of a full-blown Java MapReduce application, Hadoop streaming is for you. I found it very productive: time to market is short, and you can achieve quite a lot in very little time.

The basics,

  1. Streaming works with any language that can read from stdin and write to stdout, as long as the Hadoop data nodes can interpret that language (meaning they are able to execute the program)
  2. The streaming app receives one line at a time on stdin, applies its logic, and writes results to stdout
  3. The input location, output location, compression parameters, mapper code, reducer code, and other job parameters are all passed on the streaming command line
  4. Streaming command jars come with Hadoop
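The one-line-in, one-line-out contract above can be sketched as a minimal word-count mapper. It emits tab-separated key/value pairs, which is the default delimiter the streaming framework uses to group keys for the reducer (the file name wordcount_mapper.py is just an example):

```python
#!/usr/bin/env python
"""wordcount_mapper.py - a minimal sketch of the stdin/stdout contract.

Reads lines from stdin and emits one "word<TAB>1" record per word.
"""
import sys


def map_line(line):
    # One output record per word; key and value are separated by a tab,
    # the streaming framework's default key/value delimiter.
    return ["%s\t1" % word for word in line.strip().split()]


if __name__ == "__main__":
    for line in sys.stdin:
        for record in map_line(line):
            print(record)
```
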

Getting hands dirty,

mapper.py

#!/usr/bin/env python
"""mapper.py"""

import sys

# Each input line arrives with its trailing newline; strip it so
# print() doesn't add blank lines to the output.
for line in sys.stdin:
    print(line.rstrip("\n"))

 

Streaming command,

$ hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -input /tmp/input -output /tmp/output  -file ./mapper.py -mapper mapper.py

And that’s it!

Yes, that was your first working streaming job. It does nothing except read the input directory and copy it to the output directory, but the same pattern can be extended to process complex input (JSON, XML, custom line formats) and even emit multiple output lines per input line with additional print calls.
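As an example of such an extension, here is a sketch of a mapper for JSON-lines input. The field names "user" and "bytes" are hypothetical placeholders for whatever your schema actually contains:

```python
#!/usr/bin/env python
"""json_mapper.py - sketch of a mapper for JSON-lines input.

Assumes each input line is a JSON object. The field names below
("user", "bytes") are made up for illustration.
"""
import json
import sys


def map_line(line):
    line = line.strip()
    if not line:
        return None
    try:
        record = json.loads(line)
    except ValueError:
        # Skip malformed lines rather than failing the whole task.
        return None
    return "%s\t%s" % (record.get("user", "unknown"), record.get("bytes", 0))


if __name__ == "__main__":
    for line in sys.stdin:
        out = map_line(line)
        if out is not None:
            print(out)
```

Swallowing malformed lines (instead of raising) is a deliberate choice here: a single bad record would otherwise fail the map task and, after retries, the whole job.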

 

Other sample commands,

# Ship a code directory - every file in it gets pushed to the data nodes for execution, so it can contain all dependencies/dependent files
hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -input /tmp/input -output /tmp/output  -file code-dir/* -mapper mapper.py

# Enable output compression (generic -D options must come before the streaming options, or the command fails)
hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -D mapreduce.output.fileoutputformat.compress=true -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec -input /tmp/input -output /tmp/output -file code-dir/* -mapper mapper.py

# Enable output compression and increase map memory
hadoop jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -D mapreduce.output.fileoutputformat.compress=true -D mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec -D mapreduce.map.memory.mb=2048 -input /tmp/input -output /tmp/output -file code-dir/* -mapper mapper.py

# Specify queue name with below argument (the older mapred.job.queue.name is deprecated)
# -D mapreduce.job.queuename=my_queue

# Specify job name with
# -D mapreduce.job.name='My Job'

 

You can also pass reducer code to the streaming job with the -reducer argument. In my case it was not required, though you can find more details in the Hadoop streaming documentation.
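If you do need one, a reducer follows the same stdin/stdout contract; streaming sorts the mapper output by key before the reducer sees it, so all lines for a given key arrive consecutively. A sketch that sums counts per key (pairing with the word-count style mapper, shipped via -file ./reducer.py -reducer reducer.py) could look like this:

```python
#!/usr/bin/env python
"""reducer.py - sketch of a summing reducer.

Streaming delivers mapper output sorted by key, so all records
for one key arrive as a consecutive run of lines.
"""
import sys


def reduce_lines(lines):
    results = []
    current_key, current_sum = None, 0
    for line in lines:
        # Mapper output is "key<TAB>value"; split on the first tab only.
        key, _, value = line.rstrip("\n").partition("\t")
        if key != current_key:
            if current_key is not None:
                results.append("%s\t%d" % (current_key, current_sum))
            current_key, current_sum = key, 0
        current_sum += int(value or 0)
    if current_key is not None:
        results.append("%s\t%d" % (current_key, current_sum))
    return results


if __name__ == "__main__":
    for out in reduce_lines(sys.stdin):
        print(out)
```
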

HTH

 

 
