MapReduce
マルレクサブセミナーのメモ。
- MapReduceの入出力単位はKey:Valueの組(ペア)のリスト
- 処理単位はMapとReduceに加えて、隠された処理であるSort
- 大まかな処理の流れは
- Map : 処理に適したペアに組みなおす(入出力は一対一)
- Sort : 出力をReduceの入力として渡した際に処理を分散できるような形(同じキーごと)に並び替え
- Reduce : 同じキーの組をまとめて処理して結果として一対のペアを生成(入出力は多対一)
- MapとReduceはいくらでも分散可能
- Sortが一番重い
- MapにはCombineと言う処理も含まれる?
- ReduceにSortが含まれる?
- Shuffle = Sort + Partitioning?(三つの使い分けはよく分かりませんでした)
まだいろいろあったけど、とりあえずこの辺まででなんとなく感じたアルゴリズムをものっそ適当に、分散する代わりにThread使って表現してみる。オレオレメモなのでいろいろ嘘。コメントもなし。あとでまたdRb使ってもう少し真面目に書いて考える。
文章に含まれるアルファベットの登場回数を数える。
require 'thread' def key(ary) ary.nil? or ary.empty? ? 0 : ary[0] end module MapReducePpoi class Mapper def exec(input) combine map(input) end def combine(input) input.sort{|i1, i2| key(i1) <=> key(i2)} end def map(input) input.map{|i| [i, 1]} end end class Shuffler def initialize(num_of_reducers) @reducer_inputs = Array.new(num_of_reducers){Array.new} end def exec(input) sort partition(input) end def partition(input) input.each do |i| @reducer_inputs[key(i).hash % @reducer_inputs.size] << i end @reducer_inputs end def sort(input) # do nothing in this case input end end class Reducer def exec(input) reduce sort(input) end def sort(input) input.sort{|i1, i2| key(i1) <=> key(i2)} end def reduce(input) cur = input.shift input.inject([cur]) do |sum, e| if key(sum.last) == key(e) then sum.last[1] += 1 else sum << e end sum end end end NUM_MAPPERS = 4 NUM_REDUCERS = 3 def self.start(sentence) # initialize sentence.gsub! /\s/, '' mappers = Array.new(NUM_MAPPERS){Mapper.new} reducers = Array.new(NUM_REDUCERS){Reducer.new} shufflers = Array.new(mappers.size){Shuffler.new(reducers.size)} # map map_inputs = sentence.split(/(#{'.' * mappers.size})/).reject{|e| e.empty?}.map{|e| e.split ''} map_results = [] m_mapper = Mutex.new mappers.each do |mapper| Thread.start do result = mapper.exec map_inputs.pop m_mapper.synchronize {map_results << result} end end # shuffle shuffle_inputs = map_results tmp_shuffle_results = [] shufflers.each do |shuffler| tmp_shuffle_results << shuffler.exec(shuffle_inputs.pop) end shuffle_results = Array.new(NUM_REDUCERS){[]} tmp_shuffle_results.each do |s| s.each_with_index do |ss, i| shuffle_results[i] += ss end end # reduce reduce_inputs = shuffle_results reduce_results = [] m_reducer = Mutex.new reducers.each do |reducer| Thread.start do result = reducer.exec reduce_inputs.pop m_reducer.synchronize {reduce_results << result} end end # show result puts reduce_results.inspect end end if $0 == __FILE__ MapReducePpoi.start 'to be or not to be, that is the question.' end
結果
$ ruby mapreduce_ppoi.rb
[ [ [".", 1], ["e", 2], ["h", 1], ["n", 1], ["q", 1], ["t", 3] ], [ ["a", 1], ["s", 2] ], [ ["i", 2], ["o", 1], ["u", 1] ] ]