Passing arguments to Python Hadoop Streaming
NOVEMBER 23, 2013
This is not well documented, but you can pass arguments to your mapper and reducer in Hadoop Streaming. Using this technique, you can even consolidate your map and reduce files into one.
When you write MapReduce jobs in Python, you may use a framework such as mrjob. However, if you cannot install third-party libraries on your Hadoop cluster, you typically use Hadoop Streaming directly and write something like this:
count_mapper.py

import sys

for line in sys.stdin:
    line = line.strip()
    # Emit a tab-separated key-value pair for the shuffle phase.
    item, count = line.split(',')
    print '%s\t%s' % (item, count)
count_reducer.py

import sys

current_item = None
count_sum = 0

def output():
    if current_item:
        print '%s,%d' % (current_item, count_sum)

for line in sys.stdin:
    line = line.strip()
    item, count = line.split('\t', 1)
    # Input arrives sorted by key, so a key change means the
    # previous item's counts are complete.
    if current_item != item:
        output()
        current_item = item
        count_sum = 0
    count_sum += int(count)

# Print the last item's count sum.
output()
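Because Streaming scripts just read standard input and write standard output, you can sanity-check them locally before submitting a job. For example, assuming a hypothetical input.csv of item,count lines, with sort standing in for Hadoop's shuffle:

> cat input.csv | python count_mapper.py | sort | python count_reducer.py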
Then you pass both the mapper and reducer files to the Hadoop Streaming command:
> hadoop jar hadoop-streaming.jar \
>     -mapper count_mapper.py -file count_mapper.py \
>     -reducer count_reducer.py -file count_reducer.py \
>     ...
If you want to pass arguments to the mapper and reducer, you can do this instead. After all, you can pass any executable as the mapper or reducer.
> hadoop jar hadoop-streaming.jar \
>     -mapper 'count_mapper.py arg1 arg2' -file count_mapper.py \
>     -reducer 'count_reducer.py arg3' -file count_reducer.py \
>     ...
Then you can read the arguments in the mapper and reducer files, just like in any other Python script:
count_mapper.py with arguments
import sys

arg1 = sys.argv[1]
arg2 = sys.argv[2]

for line in sys.stdin:
    line = line.strip()
    item, count = line.split(',')
    print '%s\t%s' % (item, count)
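The reducer reads its argument the same way. The original post does not show this file, but a minimal sketch would be the earlier reducer with one extra line:

count_reducer.py with arguments

import sys

arg3 = sys.argv[1]  # 'arg3' from the -reducer option above

current_item = None
count_sum = 0

def output():
    if current_item:
        print '%s,%d' % (current_item, count_sum)

for line in sys.stdin:
    line = line.strip()
    item, count = line.split('\t', 1)
    if current_item != item:
        output()
        current_item = item
        count_sum = 0
    count_sum += int(count)

output()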
Consolidating mapper and reducer files into one
Using this argument technique, you can combine the mapper and reducer files into one. In your real code, you might want to create a template superclass to extract the common logic and use yield instead of print, but the basic consolidated version looks like this:
count_mapreduce.py

import sys

class CountMapReduce(object):
    def __init__(self):
        self.current_item = None
        self.count_sum = 0

    def __output(self):
        if self.current_item:
            print '%s,%d' % (self.current_item, self.count_sum)

    def map(self, line):
        item, count = line.split(',')
        print '%s\t%s' % (item, count)

    def reduce(self, item, count):
        if self.current_item != item:
            self.__output()
            self.current_item = item
            self.count_sum = 0
        self.count_sum += int(count)

    def reduce_end(self):
        # Print the last item's count sum.
        self.__output()

if __name__ == '__main__':
    # The first argument selects which phase this process runs.
    mr_flag = sys.argv[1]
    mapreduce = CountMapReduce()
    if mr_flag == 'map':
        for line in sys.stdin:
            line = line.strip()
            mapreduce.map(line)
    elif mr_flag == 'reduce':
        for line in sys.stdin:
            line = line.strip()
            item, count = line.split('\t', 1)
            mapreduce.reduce(item, count)
        mapreduce.reduce_end()
Then the command should be updated like this:
> hadoop jar hadoop-streaming.jar \
>     -mapper 'count_mapreduce.py map' \
>     -reducer 'count_mapreduce.py reduce' -file count_mapreduce.py \
>     ...
You can do the same to include a combiner as well, as sketched below.
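One caveat not covered above: a Streaming combiner's output must look like mapper output (tab-separated), so the reduce mode cannot be reused as-is, since it prints comma-separated lines. Assuming you add a hypothetical combine mode to count_mapreduce.py that sums counts but keeps the tab-separated format, the command would gain one option:

> hadoop jar hadoop-streaming.jar \
>     -mapper 'count_mapreduce.py map' \
>     -combiner 'count_mapreduce.py combine' \
>     -reducer 'count_mapreduce.py reduce' -file count_mapreduce.py \
>     ...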
If you use yield, it's easier to write unit tests, too.
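For example, here is a minimal sketch of what a yield-based version might look like; the class below is illustrative, not code from this post. Because map and reduce return values instead of printing them, a unit test is just a comparison of lists:

class CountMapReduce(object):
    def map(self, lines):
        # Yield key-value pairs instead of printing them.
        for line in lines:
            item, count = line.strip().split(',')
            yield item, count

    def reduce(self, pairs):
        # Pairs arrive grouped by key, as Hadoop's sort guarantees.
        current_item = None
        count_sum = 0
        for item, count in pairs:
            if current_item != item:
                if current_item is not None:
                    yield current_item, count_sum
                current_item = item
                count_sum = 0
            count_sum += int(count)
        if current_item is not None:
            yield current_item, count_sum

# A unit test is then a plain assertion:
mr = CountMapReduce()
assert list(mr.map(['apple,1'])) == [('apple', '1')]
assert list(mr.reduce([('a', '1'), ('a', '2'), ('b', '3')])) == [('a', 3), ('b', 3)]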