Passing arguments to Python Hadoop Streaming

NOVEMBER 23, 2013

It is not well documented, but you can pass command-line arguments to your mapper and reducer in Hadoop Streaming. Using this technique, you can even consolidate your mapper and reducer files into one.


When you write MapReduce jobs in Python, you may use a framework such as mrjob. However, if you cannot install third-party libraries on your Hadoop cluster, you typically use Hadoop Streaming directly and write something like this:

count_mapper.py

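#!/usr/bin/env python
# The shebang lets Hadoop Streaming execute this file directly.
import sys

for line in sys.stdin:
  line = line.strip()
  # Each input line looks like "item,count".
  item, count = line.split(',')
  # Emit a tab-separated key/value pair for the shuffle phase.
  print('%s\t%s' % (item, count))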


count_reducer.py

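#!/usr/bin/env python
import sys

current_item = None
count_sum = 0

def output():
  # Emit the running total for the current item, if there is one.
  if current_item is not None:
    print('%s,%d' % (current_item, count_sum))

# Hadoop sorts mapper output by key, so equal items arrive on adjacent lines.
for line in sys.stdin:
  line = line.strip()
  item, count = line.split('\t', 1)
  if current_item != item:
    # A new key starts: flush the previous total and reset.
    output()
    current_item = item
    count_sum = 0
  count_sum += int(count)

# Print the count sum for the last item.
output()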
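
To see why the reducer only needs to remember the current item, it can help to simulate the pipeline locally. The following is a minimal sketch under assumptions, not part of the original scripts: the function names (map_line, reduce_sorted, simulate) and the sample data are made up, sorted() stands in for Hadoop's shuffle-and-sort step, and itertools.groupby replaces the manual current_item bookkeeping.

```python
from itertools import groupby


def map_line(line):
    """Turn one 'item,count' input line into an (item, count) pair."""
    item, count = line.strip().split(',')
    return item, count


def reduce_sorted(pairs):
    """Sum counts per item; assumes the pairs arrive sorted by item."""
    for item, group in groupby(pairs, key=lambda pair: pair[0]):
        yield item, sum(int(count) for _, count in group)


def simulate(lines):
    """Map every line, sort by key (stand-in for the shuffle), then reduce."""
    mapped = sorted(map_line(line) for line in lines)
    return list(reduce_sorted(mapped))


if __name__ == '__main__':
    sample = ['apple,1', 'banana,2', 'apple,3']  # made-up sample data
    for item, total in simulate(sample):
        print('%s,%d' % (item, total))
```

If the sort step is removed, the two apple lines are no longer adjacent and the reducer emits apple twice, which is why the streaming reducer depends on sorted input.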


Then you pass both the mapper and reducer files to the hadoop streaming command:

> hadoop jar hadoop-streaming.jar \
> -mapper count_mapper.py -file count_mapper.py \
> -reducer count_reducer.py -file count_reducer.py \
> ...


Passing arguments

If you want to pass arguments to the mapper and reducer, quote each command together with its arguments, as shown here. This works because Hadoop Streaming accepts any executable command as a mapper or reducer.

> hadoop jar hadoop-streaming.jar \
> -mapper 'count_mapper.py arg1 arg2' -file count_mapper.py \
> -reducer 'count_reducer.py arg3' -file count_reducer.py \
> ...


Then you can read the arguments in the mapper and reducer files, just as in any other Python script:

count_mapper.py with arguments

#!/usr/bin/env python
import sys

# sys.argv[0] is the script name; the values passed via -mapper follow it.
arg1 = sys.argv[1]
arg2 = sys.argv[2]

for line in sys.stdin:
  line = line.strip()
  item, count = line.split(',')
  print('%s\t%s' % (item, count))
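
The listing above reads the arguments but does not use them yet. As a hedged illustration of what they could control, the sketch below treats the first argument as the field delimiter and the second as a minimum count to keep. Both meanings, and the map_lines helper, are assumptions made for this example rather than anything Hadoop Streaming prescribes.

```python
import sys


def map_lines(lines, delimiter, min_count):
    """Yield 'item<TAB>count' for rows whose count is at least min_count."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        item, count = line.split(delimiter)
        if int(count) >= min_count:
            yield '%s\t%s' % (item, count)


# Run as a mapper only when both (hypothetical) arguments are supplied:
# sys.argv[1] is the delimiter, sys.argv[2] is the minimum count.
if __name__ == '__main__' and len(sys.argv) == 3:
    for record in map_lines(sys.stdin, sys.argv[1], int(sys.argv[2])):
        print(record)
```

If the script were started with `,` as arg1 and `5` as arg2, a line such as apple,3 would be dropped and banana,7 would be emitted as banana, a tab, and 7. Keeping the logic in a function that takes the arguments as parameters also makes it callable without touching sys.argv.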


Consolidating mapper and reducer files into one

Using this argument technique, you can combine the mapper and reducer files into one. In real code, you might want to create a template superclass to extract common logic and use yield instead of print, but the code below just demonstrates consolidation with minimal effort.

count_mapreduce.py

#!/usr/bin/env python
import sys

class CountMapReduce(object):

  def __init__(self):
    self.current_item = None
    self.count_sum = 0

  def __output(self):
    if self.current_item is not None:
      print('%s,%d' % (self.current_item, self.count_sum))

  def map(self, line):
    item, count = line.split(',')
    print('%s\t%s' % (item, count))

  def reduce(self, item, count):
    if self.current_item != item:
      self.__output()
      self.current_item = item
      self.count_sum = 0
    self.count_sum += int(count)

  def reduce_end(self):
    # Print the count sum for the last item.
    self.__output()

if __name__ == '__main__':
  # The first argument selects the phase: 'map' or 'reduce'.
  mr_flag = sys.argv[1]
  mapreduce = CountMapReduce()

  if mr_flag == 'map':
    for line in sys.stdin:
      line = line.strip()
      mapreduce.map(line)
  elif mr_flag == 'reduce':
    for line in sys.stdin:
      line = line.strip()
      item, count = line.split('\t', 1)
      mapreduce.reduce(item, count)
    mapreduce.reduce_end()


The command should then be updated like this:

> hadoop jar hadoop-streaming.jar \
> -mapper 'count_mapreduce.py map' \
> -reducer 'count_mapreduce.py reduce' -file count_mapreduce.py \
> ...


You can include a combiner in the same way. If you use yield, it's also easier to write unit tests.
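
As a rough sketch of both ideas, the version below uses generators instead of print and adds a third mode for a combiner. The mode name 'combine', the STEPS table, and the function names are assumptions made for illustration. It also swaps the manual current_item tracking for itertools.groupby, and it reuses the reducer logic as the combiner, which is only safe because summing counts is associative and commutative.

```python
import sys
from itertools import groupby


def map_records(lines):
    """Yield (item, count) for each 'item,count' input line."""
    for line in lines:
        item, count = line.strip().split(',')
        yield item, int(count)


def reduce_records(lines):
    """Yield (item, total) from key-sorted 'item<TAB>count' lines."""
    pairs = (line.strip().split('\t', 1) for line in lines)
    for item, group in groupby(pairs, key=lambda pair: pair[0]):
        yield item, sum(int(count) for _, count in group)


# Hypothetical mode table: each mode maps to a step and an output format.
# The combiner keeps the tab-separated format so the reducer can read it.
STEPS = {
    'map': (map_records, '%s\t%d'),
    'combine': (reduce_records, '%s\t%d'),
    'reduce': (reduce_records, '%s,%d'),
}


def run(mode, lines):
    """Yield formatted output lines for the given mode."""
    step, fmt = STEPS[mode]
    for record in step(lines):
        yield fmt % record


# Run as a streaming step only when a mode argument is supplied.
if __name__ == '__main__' and len(sys.argv) > 1:
    for out in run(sys.argv[1], sys.stdin):
        print(out)
```

Assuming the file is still shipped as count_mapreduce.py, the combiner could then be wired in with -combiner 'count_mapreduce.py combine' next to the existing -mapper and -reducer options. Because map_records and reduce_records are plain generators, a unit test can feed them a list of strings and compare the result without a Hadoop cluster.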

