Class: Riakpb::MapReduce

Inherits:
Object
  • Object
Includes:
Util::Translation
Defined in:
lib/riakpb/map_reduce.rb

Overview

Class for invoking map-reduce jobs using the HTTP interface.

Defined Under Namespace

Classes: Phase

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(client) {|self| ... } ⇒ MapReduce

Creates a new map-reduce job.

Yields:

  • (self)

    helpful for initializing the job



# File 'lib/riakpb/map_reduce.rb', line 20

def initialize(client)
  @client, @inputs, @query = client, [], []
  yield self if block_given?
end
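
The block form lets a job be configured at construction time. The sketch below mirrors the constructor above with a hypothetical stand-in class (`SketchJob`) so that it runs without a Riak node; the placeholder client and the `people`/`alice` input are illustrative assumptions, not part of riakpb.

```ruby
# Hypothetical stand-in that copies the constructor shown above.
# It is not the riakpb class and never talks to a Riak node.
class SketchJob
  attr_reader :client, :inputs, :query

  def initialize(client)
    @client, @inputs, @query = client, [], []
    yield self if block_given?   # hand the new job to the caller's block
  end
end

client = Object.new   # placeholder; the real class expects a Riakpb client

# The block receives the job itself, so setup can happen inline.
job = SketchJob.new(client) do |mr|
  mr.inputs << ["people", "alice"]   # illustrative bucket/key pair
end

p job.inputs
p job.query
```

Without a block the job simply starts with empty inputs and an empty query.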

Instance Attribute Details

#inputs ⇒ Array<[bucket,key]>, String

Returns the bucket/key pairs used as input to the job, or a bucket name (all keys in that bucket).

# File 'lib/riakpb/map_reduce.rb', line 9

def inputs
  @inputs
end

#query ⇒ Array<Phase>

Returns the map, reduce, and link phases that will be executed, in the order they were added.

# File 'lib/riakpb/map_reduce.rb', line 15

def query
  @query
end

Instance Method Details

#add(bucket) ⇒ MapReduce
#add(bucket, key) ⇒ MapReduce
#add(object) ⇒ MapReduce
#add(bucket, key, keydata) ⇒ MapReduce

Also known as: <<, include

Add or replace inputs for the job.

Overloads:

  • #add(bucket) ⇒ MapReduce

    Run the job across all keys in the bucket. This will replace any other inputs previously added.

  • #add(bucket, key) ⇒ MapReduce

    Add a bucket/key pair to the job.

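  • #add(object) ⇒ MapReduce

    Add an object to the job (by its bucket/key).

  • #add(bucket, key, keydata) ⇒ MapReduce

    Add a bucket/key pair to the job, together with key data for that input.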



# File 'lib/riakpb/map_reduce.rb', line 41

def add(*params)
  params = params.dup.flatten
  case params.size
  when 1
    p = params.first
    case p
    when Riakpb::Bucket
      @inputs = p.name
    when Riakpb::Key
      @inputs << p.to_input
    when String
      @inputs = p
    end
  when 2..3
    bucket = params.shift
    bucket = bucket.name if Riakpb::Bucket === bucket
    @inputs << params.unshift(bucket)
  end
  self
end
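
How the overloads shape `inputs` can be sketched with a stand-in that follows the same branching as the source above. `SketchBucket` and `SketchInputs` are hypothetical names used only for illustration; the `Riakpb::Key` branch is left out because its `to_input` method is not shown here.

```ruby
# Hypothetical stand-ins; they mirror the branching of #add shown above
# but are not the riakpb classes.
SketchBucket = Struct.new(:name)

class SketchInputs
  attr_reader :inputs

  def initialize
    @inputs = []
  end

  def add(*params)
    params = params.flatten
    case params.size
    when 1
      arg = params.first
      # A lone bucket (object or name) replaces every earlier input.
      @inputs = arg.is_a?(SketchBucket) ? arg.name : arg
    when 2..3
      bucket = params.shift
      bucket = bucket.name if bucket.is_a?(SketchBucket)
      # Appends [bucket, key] or [bucket, key, keydata].
      @inputs << params.unshift(bucket)
    end
    self
  end
  alias_method :<<, :add
end

job = SketchInputs.new
job.add("people", "alice")
job.add(SketchBucket.new("people"), "bob", "extra")
job << ["people", "carol"]   # the array is flattened into bucket and key
p job.inputs

job.add("people")            # whole-bucket input replaces the pairs
p job.inputs
```

Because a whole-bucket input turns `inputs` into a String, this sketch suggests adding bucket/key pairs before, not after, a single-bucket call.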

#link(params = {}) ⇒ MapReduce

Add a link phase to the job. A link phase automatically follows the links attached to objects; it is a special case of a map phase.



# File 'lib/riakpb/map_reduce.rb', line 95

def link(params={})
  bucket          ||= params[:bucket]
  tag             ||= params[:tag]
  keep              = params[:keep] || false
  function          = {}

  function[:bucket] = bucket  unless bucket.nil?
  function[:tag]    = tag     unless tag.nil?

  @query << Phase.new({:type => :link, :function => function, :keep => keep})

  return(self)
end
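
The arguments that `link` hands to `Phase.new` can be sketched as a plain hash. The helper name `link_phase_args` and the `friend` tag are assumptions made for illustration; what `Phase` does with the hash is not shown in this page.

```ruby
# Hypothetical helper that builds the same hash #link passes to Phase.new.
def link_phase_args(params = {})
  function = {}
  # Only the filters that were actually given end up in the function hash.
  function[:bucket] = params[:bucket] unless params[:bucket].nil?
  function[:tag]    = params[:tag]    unless params[:tag].nil?

  { :type => :link, :function => function, :keep => params[:keep] || false }
end

p link_phase_args(:tag => "friend")
p link_phase_args(:bucket => "people", :tag => "friend", :keep => true)
p link_phase_args
```

With no arguments the function hash stays empty and `:keep` defaults to `false`.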

#map(function) ⇒ MapReduce
#map(function?, options) ⇒ MapReduce

Add a map phase to the job.



# File 'lib/riakpb/map_reduce.rb', line 72

def map(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
  self
end
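
The two call forms differ only in whether a trailing options hash is present. The sketch below reproduces that argument handling without ActiveSupport, emulating `extract_options!` by popping a trailing Hash; `map_phase_args` is a hypothetical helper, and the option keys shown are illustrative rather than a list of what `Phase` accepts.

```ruby
# Hypothetical helper that mimics the argument handling of #map.
def map_phase_args(*params)
  # Stand-in for ActiveSupport's extract_options!: take a trailing Hash, if any.
  options = params.last.is_a?(Hash) ? params.pop : {}
  { :type => :map, :function => params.shift }.merge(options)
end

# Form 1: function only.
p map_phase_args("function(v) { return [v.key]; }")

# Form 2: function plus options.
p map_phase_args("function(v) { return [v.key]; }", :keep => true)

# Form 2 without a positional function: the options supply :function instead.
p map_phase_args(:function => "Riak.mapValuesJson", :keep => true)
```

In the last call the positional function is `nil`, and the merge lets the `:function` option take its place.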

#reduce(function) ⇒ MapReduce
#reduce(function?, options) ⇒ MapReduce

Add a reduce phase to the job.



# File 'lib/riakpb/map_reduce.rb', line 86

def reduce(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
  self
end
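
Since `map` and `reduce` both return the job, phases can be chained and are kept in call order. A minimal sketch, assuming a hypothetical `SketchPipeline` class that stores plain hashes where the real class stores `Phase` objects:

```ruby
# Hypothetical stand-in: phases are plain hashes instead of Phase objects.
class SketchPipeline
  attr_reader :query

  def initialize
    @query = []
  end

  def map(function, options = {})
    @query << { :type => :map, :function => function }.merge(options)
    self   # returning self is what makes chaining possible
  end

  def reduce(function, options = {})
    @query << { :type => :reduce, :function => function }.merge(options)
    self
  end
end

job = SketchPipeline.new
job.map("function(v) { return [1]; }")
   .reduce("Riak.reduceSum", :keep => true)

# Phases are stored in the order they were added.
p job.query.map { |phase| phase[:type] }
```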

#run ⇒ Array<Array>

Executes this map-reduce job.



# File 'lib/riakpb/map_reduce.rb', line 126

def run
  response = @client.map_reduce_request(to_json, "application/json")
#      ActiveSupport::JSON.decode(response[:body])
end
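
As written above, `run` passes the serialized job and a content type to the client and returns whatever the client returns, since the decoding step is commented out. The sketch below shows that hand-off with a recording fake; `RecordingClient`, `SketchRunner`, and the string response are all assumptions, and the real client's return value is not documented on this page.

```ruby
require "json"

# Hypothetical fake client that records what it is asked to send.
class RecordingClient
  attr_reader :requests

  def initialize
    @requests = []
  end

  def map_reduce_request(body, content_type)
    @requests << [body, content_type]
    "raw response"   # placeholder; the real response shape is unknown here
  end
end

# Hypothetical stand-in for the job, with a fixed JSON payload.
class SketchRunner
  def initialize(client)
    @client = client
  end

  def to_json(*)
    JSON.generate({ "inputs" => "people", "query" => [] })
  end

  def run
    @client.map_reduce_request(to_json, "application/json")
  end
end

client = RecordingClient.new
result = SketchRunner.new(client).run

p client.requests.first   # the JSON body and the content type
p result                  # passed through from the client unchanged
```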

#timeout(value) ⇒ Object

Sets the timeout for the map-reduce job.



# File 'lib/riakpb/map_reduce.rb', line 112

def timeout(value)
  @timeout = value
end

#to_json(options = {}) ⇒ String

Convert the job to JSON for submission over the HTTP interface.



# File 'lib/riakpb/map_reduce.rb', line 118

def to_json(options={})
  hash = {"inputs" => inputs, "query" => query.map(&:as_json)}
  hash['timeout'] = @timeout.to_i if @timeout
  ActiveSupport::JSON.encode(hash, options)
end
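
The resulting document has an `inputs` key, a `query` key, and a `timeout` key only when a timeout was set. The sketch below builds the same structure with Ruby's standard `json` library instead of ActiveSupport. The helper `job_json` is hypothetical, and the phase hashes are an assumption modeled on Riak's HTTP MapReduce format, since `Phase#as_json` is not shown on this page.

```ruby
require "json"

# Hypothetical helper that mirrors the hash built in #to_json.
def job_json(inputs, phases, timeout = nil)
  hash = { "inputs" => inputs, "query" => phases }
  hash["timeout"] = timeout.to_i if timeout   # omitted unless a timeout is set
  JSON.generate(hash)
end

# Assumed phase layout; the real Phase#as_json output may differ.
phases = [
  { "map"    => { "language" => "javascript",
                  "source"   => "function(v) { return [1]; }",
                  "keep"     => false } },
  { "reduce" => { "language" => "javascript",
                  "name"     => "Riak.reduceSum",
                  "keep"     => true } }
]

with_timeout    = JSON.parse(job_json([["people", "alice"]], phases, "5000"))
without_timeout = JSON.parse(job_json("people", phases))

p with_timeout["timeout"]              # the value is coerced with to_i
p without_timeout.key?("timeout")
p without_timeout["inputs"]
```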