MapReduce

マルレクサブセミナーのメモ。

  • MapReduceの入出力単位はKey:Valueの組(ペア)のリスト
  • 処理単位はMapとReduceに加えて、隠された処理であるSort
  • 大まかな処理の流れは
    1. Map : 処理に適したペアに組みなおす(入出力は一対一)
    2. Sort : 出力をReduceの入力として渡した際に処理を分散できるような形(同じキーごと)に並び替え
    3. Reduce : 同じキーの組をまとめて処理して結果として一対のペアを生成(入出力は多対一)
  • MapとReduceはいくらでも分散可能
  • Sortが一番重い
  • MapにはCombineと言う処理も含まれる?
  • ReduceにSortが含まれる?
  • Shuffle = Sort + Partitioning?(三つの使い分けはよく分かりませんでした)


まだいろいろあったけど、とりあえずこの辺まででなんとなく感じたアルゴリズムをものっそ適当に、分散する代わりにThread使って表現してみる。オレオレメモなのでいろいろ嘘。コメントもなし。あとでまたdRb使ってもう少し真面目に書いて考える。


文章に含まれるアルファベットの登場回数を数える。

require 'thread'

def key(ary)
  ary.nil? or ary.empty? ? 0 : ary[0]
end

module MapReducePpoi
  class Mapper
    def exec(input)
      combine map(input)
    end

    def combine(input)
      input.sort{|i1, i2| key(i1) <=> key(i2)}
    end

    def map(input)
      input.map{|i| [i, 1]}
    end
  end

  class Shuffler
    def initialize(num_of_reducers)
      @reducer_inputs = Array.new(num_of_reducers){Array.new}
    end

    def exec(input)
      sort partition(input)
    end

    def partition(input)
      input.each do |i|
        @reducer_inputs[key(i).hash % @reducer_inputs.size] << i
      end
     @reducer_inputs
    end

    def sort(input)
      # do nothing in this case
      input
    end
  end

  class Reducer
    def exec(input)
      reduce sort(input)
    end

    def sort(input)
      input.sort{|i1, i2| key(i1) <=> key(i2)}
    end

    def reduce(input)
      cur = input.shift
      input.inject([cur]) do |sum, e|
        if key(sum.last) == key(e) then sum.last[1] += 1 else sum << e end
        sum
      end
    end
  end

  NUM_MAPPERS = 4
  NUM_REDUCERS = 3
  def self.start(sentence)
    # initialize
    sentence.gsub! /\s/, ''
    mappers = Array.new(NUM_MAPPERS){Mapper.new}
    reducers = Array.new(NUM_REDUCERS){Reducer.new}
    shufflers = Array.new(mappers.size){Shuffler.new(reducers.size)}

    # map
    map_inputs = sentence.split(/(#{'.' * mappers.size})/).reject{|e| e.empty?}.map{|e| e.split ''}
    map_results = []
    m_mapper = Mutex.new
    mappers.each do |mapper|
      Thread.start do
        result = mapper.exec map_inputs.pop
        m_mapper.synchronize {map_results << result}
      end
    end

    # shuffle
    shuffle_inputs = map_results
    tmp_shuffle_results = []
    shufflers.each do |shuffler|
      tmp_shuffle_results << shuffler.exec(shuffle_inputs.pop)
    end
    shuffle_results = Array.new(NUM_REDUCERS){[]}
    tmp_shuffle_results.each do |s|
      s.each_with_index do |ss, i|
        shuffle_results[i] += ss
      end
    end

    # reduce
    reduce_inputs = shuffle_results
    reduce_results = []
    m_reducer = Mutex.new
    reducers.each do |reducer|
      Thread.start do
        result = reducer.exec reduce_inputs.pop
        m_reducer.synchronize {reduce_results << result}
      end
    end

    # show result
    puts reduce_results.inspect
  end
end

if $0 == __FILE__
  MapReducePpoi.start 'to be or not to be, that is the question.'
end

結果

$ ruby mapreduce_ppoi.rb
[ [ [".", 1], ["e", 2], ["h", 1], ["n", 1], ["q", 1], ["t", 3] ], [ ["a", 1], ["s", 2] ], [ ["i", 2], ["o", 1], ["u", 1] ] ]