Class: Blodsband::Riak::Mr

Inherits:
Object
Defined in:
lib/blodsband/riak/mr.rb

Constructor Details

#initialize(url) ⇒ Mr

Create a new map reduce reference.

Parameters:

  • url (::URI)

    a URI pointing to the HTTP port of a Riak node.



13
14
15
16
17
# File 'lib/blodsband/riak/mr.rb', line 13

def initialize(url)
  @url = url
  @inputs = nil
  @phases = []
end

Instance Method Details

#arun ⇒ Blodsband::Future<Blodsband::Riak::Response>

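Execute this map reduce asynchronously.

Returns:

  • (Blodsband::Future<Blodsband::Riak::Response>)

    a future for the parsed response, or for an Error if the request failed.
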

# File 'lib/blodsband/riak/mr.rb', line 81

def arun
  m = Multi.new
  m.add(:mr, 
        EM::HttpRequest.new(URI.join(@url.to_s, "mapred")).
        apost(:head => {
                "Content-Type" => "application/json"
              },
              :body => Yajl::Encoder.encode(body)))
  return(Future.new do
           m.really_perform
           if err = m.responses[:errback][:mr]
             Error.new(err)
           else
             Yajl::Parser.parse(m.responses[:callback][:mr].response)
           end
         end)
end
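
Because #arun builds its endpoint with `URI.join(@url.to_s, "mapred")`, the trailing slash of the URL passed to the constructor matters. The sketch below repeats only that call using the standard `uri` library; `mapred_endpoint` is a hypothetical helper, and the host, port, and `/riak` path are assumed example values rather than anything this page prescribes.

```ruby
require "uri"

# Hypothetical helper: repeats the URI.join call made inside #arun.
def mapred_endpoint(url)
  URI.join(url.to_s, "mapred").to_s
end

# Base URL without a path: "mapred" is placed directly under the root.
bare = mapred_endpoint(URI.parse("http://127.0.0.1:8098"))

# Path without a trailing slash: the last segment ("riak") is replaced.
no_slash = mapred_endpoint(URI.parse("http://127.0.0.1:8098/riak"))

# Path with a trailing slash: "mapred" is appended below it.
with_slash = mapred_endpoint(URI.parse("http://127.0.0.1:8098/riak/"))

puts bare        # http://127.0.0.1:8098/mapred
puts no_slash    # http://127.0.0.1:8098/mapred
puts with_slash  # http://127.0.0.1:8098/riak/mapred
```

This is ordinary relative-reference resolution, so a URL that points at the node root is the simplest way to end up at `/mapred`.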

#body ⇒ Hash<String, Object>

The body of the HTTP request that executes this map reduce. Mostly useful for debugging.

Returns:

  • (Hash<String, Object>)

    a Hash to json-encode and send as body when executing the map reduce.



# File 'lib/blodsband/riak/mr.rb', line 60

def body
  {
    "inputs" => @inputs,
    "query" => @phases
  }
end
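
To see what #body yields after a chain of builder calls, here is a minimal sketch that does not load Blodsband. `MrSketch` is a hypothetical stand-in that copies only the `inputs`, `map`, `reduce`, and `body` methods listed on this page, and it prints with the standard `json` library instead of Yajl. The bucket name, keys, and phase specifications are assumed example values in the style of Riak's HTTP MapReduce format.

```ruby
require "json"

# Hypothetical stand-in that mirrors the builder methods documented on this page.
class MrSketch
  def initialize
    @inputs = nil
    @phases = []
  end

  # Replaces the inputs and returns self for chaining.
  def inputs(spec)
    @inputs = spec
    self
  end

  # Appends a map phase and returns self for chaining.
  def map(spec)
    @phases << { "map" => spec }
    self
  end

  # Appends a reduce phase and returns self for chaining.
  def reduce(spec)
    @phases << { "reduce" => spec }
    self
  end

  # The structure that would be JSON-encoded as the request body.
  def body
    { "inputs" => @inputs, "query" => @phases }
  end
end

# Assumed example values: two bucket/key pairs and two named JavaScript phases.
mr_body = MrSketch.new
  .inputs([["example_bucket", "key1"], ["example_bucket", "key2"]])
  .map("language" => "javascript", "name" => "Riak.mapValuesJson")
  .reduce("language" => "javascript", "name" => "Riak.reduceSum")
  .body

puts JSON.pretty_generate(mr_body)
```

Phases appear under "query" in the order they were added, which is why the builder appends to an array rather than keying phases by type.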

#inputs(hash) ⇒ Blodsband::Riak::Mr

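Set the inputs of the map reduce, replacing any inputs set earlier.

Parameters:

  • hash

    the inputs to send under the "inputs" key of the request body.

Returns:

  • (Blodsband::Riak::Mr)

    self, so that calls can be chained.
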


# File 'lib/blodsband/riak/mr.rb', line 26

def inputs(hash)
  @inputs = hash
  self
end

#map(hash) ⇒ Blodsband::Riak::Mr

Add a map phase to the map reduce.

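Parameters:

  • hash

    the phase specification, appended to the query as {"map" => hash}.

Returns:

  • (Blodsband::Riak::Mr)

    self, so that calls can be chained.
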


# File 'lib/blodsband/riak/mr.rb', line 38

def map(hash)
  @phases << {"map" => hash}
  self
end

#reduce(hash) ⇒ Blodsband::Riak::Mr

Add a reduce phase to the map reduce.

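Parameters:

  • hash

    the phase specification, appended to the query as {"reduce" => hash}.

Returns:

  • (Blodsband::Riak::Mr)

    self, so that calls can be chained.
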


# File 'lib/blodsband/riak/mr.rb', line 50

def reduce(hash)
  @phases << {"reduce" => hash}
  self
end

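#run ⇒ Blodsband::Riak::Response

Execute this map reduce and wait for the result. Equivalent to calling #get on the future returned by #arun.

Returns:

  • (Blodsband::Riak::Response)
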


# File 'lib/blodsband/riak/mr.rb', line 72

def run
  arun.get
end
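
The relationship between #run and #arun can be sketched without the library. In the sketch below, `FutureSketch` and `RunnerSketch` are hypothetical stand-ins: the future simply stores a block and evaluates it when `get` is called, which is an assumption about how Blodsband::Future behaves, and the returned hash is a placeholder rather than a real Riak response.

```ruby
# Hypothetical stand-in for a future: keeps a block and runs it on #get.
class FutureSketch
  def initialize(&block)
    @block = block
  end

  def get
    @block.call
  end
end

# Hypothetical stand-in that mirrors the arun/run pairing shown on this page.
class RunnerSketch
  attr_reader :log

  def initialize
    @log = []
  end

  # Queues the work and hands back a future immediately.
  def arun
    @log << :request_queued
    FutureSketch.new do
      @log << :result_collected
      { "result" => [42] } # placeholder value, not a real response
    end
  end

  # Synchronous variant: start the work and wait for its value.
  def run
    arun.get
  end
end

runner = RunnerSketch.new
future = runner.arun
log_before_get = runner.log.dup # only the queueing step has happened so far
result = future.get
log_after_get = runner.log.dup

puts log_before_get.inspect
puts log_after_get.inspect
puts result.inspect
```

Under these assumptions, the asynchronous form is useful when several requests should be started before any result is collected, while #run fits the case where the result is needed right away.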