Class: Riak::MapReduce

Inherits:
Object
Includes:
Util::Escape, Util::Translation
Defined in:
lib/riak/map_reduce.rb,
lib/riak/map_reduce/phase.rb,
lib/riak/map_reduce/results.rb,
lib/riak/map_reduce/filter_builder.rb

Overview

Class for invoking map-reduce jobs using the HTTP interface.

Defined Under Namespace

Classes: FilterBuilder, Phase, Results

Methods included from Util::Escape

#escape, #maybe_escape, #maybe_unescape, #unescape

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(client) {|self| ... } ⇒ MapReduce

Creates a new map-reduce job.

Yields:

  • (self)

    helpful for initializing the job


# File 'lib/riak/map_reduce.rb', line 36

def initialize(client)
  @client, @inputs, @query = client, [], []
  yield self if block_given?
end
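Because the constructor yields the new job before returning, inputs and phases can be configured in a single block. The sketch below reproduces only that "yield self" pattern, using a hypothetical `SketchJob` class and a placeholder client symbol instead of a real `Riak::Client`; with the actual class you would call methods such as `add` and `map` inside the block.

```ruby
# Hypothetical SketchJob: reproduces only the "yield self" constructor
# pattern of MapReduce#initialize; it is not part of the Riak client.
class SketchJob
  attr_reader :client, :inputs, :query

  def initialize(client)
    @client, @inputs, @query = client, [], []
    # Hand the new object to the caller so it can be configured inline.
    yield self if block_given?
  end
end

# The block receives the job itself; here it just fills the arrays.
job = SketchJob.new(:fake_client) do |j|
  j.inputs << ["bucket", "key"]
  j.query << :placeholder_phase
end
```

Without a block, the job is simply returned with empty `inputs` and `query` arrays.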

Instance Attribute Details

#inputs ⇒ Array<[bucket,key]>, ...

Returns the bucket/key pairs used as input to the job, the bucket name (all keys), or a hash containing the bucket and key-filters.

# File 'lib/riak/map_reduce.rb', line 25

def inputs
  @inputs
end

#query ⇒ Array<Phase>

Returns the map and reduce phases that will be executed.

# File 'lib/riak/map_reduce.rb', line 31

def query
  @query
end

Instance Method Details

#add(bucket) ⇒ MapReduce #add(bucket, key) ⇒ MapReduce #add(object) ⇒ MapReduce #add(bucket, key, keydata) ⇒ MapReduce #add(bucket, filters) ⇒ MapReduce Also known as: <<, include

Add or replace inputs for the job.

Overloads:

  • #add(bucket) ⇒ MapReduce

    Run the job across all keys in the bucket. This will replace any other inputs previously added.

  • #add(bucket, key) ⇒ MapReduce

    Add a bucket/key pair to the job.

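  • #add(object) ⇒ MapReduce

    Add an object to the job (by its bucket/key).

  • #add(bucket, key, keydata) ⇒ MapReduce

    Add a bucket/key pair to the job, along with key data that is passed to the map phase.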

  • #add(bucket, filters) ⇒ MapReduce

    Run the job across all keys in the bucket, with the given key-filters. This will replace any other inputs previously added. (Requires Riak 0.14 or later.)


# File 'lib/riak/map_reduce.rb', line 64

def add(*params)
  params = params.dup
  params = params.first if Array === params.first
  case params.size
  when 1
    p = params.first
    case p
    when Bucket
      @inputs = bucket_input(p)
    when RObject
      @inputs << robject_input(p)
    when String
      warn(t('full_bucket_mapred', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings
      @inputs = maybe_escape(p)
    end
  when 2..3
    bucket = params.shift

    if Array === params.first
      if bucket.is_a? Bucket
        bucket = bucket_input(bucket)
      else
        bucket = maybe_escape(bucket)
      end

      warn(t('full_bucket_mapred', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings
      @inputs = {:bucket => bucket, :key_filters => params.first }
    else
      key = params.shift
      key_data = params.shift || ''
      @inputs << key_input(key, bucket, key_data)
    end
  end
  self
end
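The dispatch in `add` depends only on how many arguments arrive and whether the second one is an array. The sketch below mirrors that logic in a hypothetical `InputSketch` class so the resulting input shapes are easy to see. It is an assumption-laden simplification: escaping, `Bucket`/`RObject` handling, and the list-keys warning are omitted, and the `[bucket, key, keydata]` triple stands in for whatever the library's private `key_input` helper actually returns.

```ruby
# Hypothetical InputSketch: mirrors how MapReduce#add dispatches on its
# arguments. Escaping, Bucket/RObject handling and the list-keys warning
# are omitted, and the [bucket, key, keydata] triple is an assumed shape.
class InputSketch
  attr_reader :inputs

  def initialize
    @inputs = []
  end

  def add(*params)
    params = params.first if params.first.is_a?(Array)
    case params.size
    when 1
      # Bucket name only: every key in the bucket, replacing prior inputs.
      @inputs = params.first
    when 2..3
      bucket, second, key_data = params
      if second.is_a?(Array)
        # Bucket plus key-filters: also replaces prior inputs.
        @inputs = { :bucket => bucket, :key_filters => second }
      else
        # Bucket/key pair, optionally with key data, is appended.
        # (Resetting a non-array input is a choice of this sketch only.)
        @inputs = [] unless @inputs.is_a?(Array)
        @inputs << [bucket, second, key_data || ""]
      end
    end
    self
  end
end

pairs = InputSketch.new.add("photos", "k1").add("photos", "k2", "meta").inputs
# pairs == [["photos", "k1", ""], ["photos", "k2", "meta"]]
```

Because every branch returns `self`, pair inputs can be chained, while the bucket-only and key-filter forms replace whatever was added before.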

#filter(bucket) { ... } ⇒ MapReduce

Adds a bucket and key-filters built by the given block. Equivalent to #add with a list of filters.

Yields:

  • builder block - instance_eval'ed into a FilterBuilder

# File 'lib/riak/map_reduce.rb', line 108

def filter(bucket, &block)
  add(bucket, FilterBuilder.new(&block).to_a)
end
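`filter` relies on `instance_eval`, so bare method calls inside the block are sent to the builder, and each call records one key-filter. The hypothetical `MiniFilterBuilder` below sketches that mechanism with a handful of Riak key-filter names; it is not the real `FilterBuilder`, whose full method set and logical operators are not reproduced here.

```ruby
# Hypothetical, stripped-down stand-in for Riak::MapReduce::FilterBuilder.
# Each method call inside the block appends one key-filter as an array.
class MiniFilterBuilder
  FILTERS = %w[tokenize string_to_int eq starts_with ends_with].freeze

  def initialize(&block)
    @filters = []
    # Evaluate the block with the builder as self.
    instance_eval(&block) if block
  end

  FILTERS.each do |name|
    define_method(name) do |*args|
      @filters << [name, *args]
      self
    end
  end

  def to_a
    @filters
  end
end

builder = MiniFilterBuilder.new do
  tokenize "-", 1
  eq "2011"
end
filters = builder.to_a
# filters == [["tokenize", "-", 1], ["eq", "2011"]]
```

An array like `filters` is what `filter` forwards to `add(bucket, filters)`.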

#index(bucket, index, query) ⇒ MapReduce

(Secondary Indexes) Use a secondary index query to start a map/reduce job.


# File 'lib/riak/map_reduce.rb', line 130

def index(bucket, index, query)
  if bucket.is_a? Bucket
    bucket = bucket.needs_type? ? [maybe_escape(bucket.type.name), maybe_escape(bucket.name)] : maybe_escape(bucket.name)
  else
    bucket = maybe_escape(bucket)
  end

  case query
  when String, Integer # Integer replaces Fixnum, which Ruby 3.2 removed
    @inputs = {:bucket => bucket, :index => index, :key => query}
  when Range
    raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.begin
    @inputs = {:bucket => bucket, :index => index, :start => query.begin, :end => query.end}
  else
    raise ArgumentError, t('invalid_index_query', :value => query.inspect)
  end
  self
end
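The `query` argument of `index` selects between an exact-match input and a range input. The hypothetical `index_input` helper below isolates that branching so the two input shapes can be compared; escaping and bucket-type handling from the original method are left out.

```ruby
# Hypothetical index_input helper: mirrors the query branching in
# MapReduce#index. Escaping and bucket types are omitted.
def index_input(bucket, index, query)
  case query
  when String, Integer
    # Exact match on a single index value.
    { :bucket => bucket, :index => index, :key => query }
  when Range
    unless query.begin.is_a?(String) || query.begin.is_a?(Integer)
      raise ArgumentError, "invalid index query: #{query.inspect}"
    end
    # Range query: the range endpoints are sent as start and end.
    { :bucket => bucket, :index => index, :start => query.begin, :end => query.end }
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

range_input = index_input("people", "age_int", 18..30)
# range_input == { :bucket => "people", :index => "age_int", :start => 18, :end => 30 }
```

Any other query type, such as a Float, raises `ArgumentError`, just as the original method does.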

#link(*params) ⇒ MapReduce

Add a link phase to the job. Link phases follow links attached to objects automatically (a special case of map).

# File 'lib/riak/map_reduce.rb', line 191

def link(*params)
  options = params.extract_options!
  walk_spec_options = options.slice!(:type, :function, :language, :arg) unless params.first
  walk_spec = WalkSpec.normalize(params.shift || walk_spec_options).first
  @query << Phase.new({:type => :link, :function => walk_spec}.merge(options))
  self
end
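In Riak's JSON job format, a link phase is a small hash naming the bucket and tag to follow plus a `keep` flag. The hypothetical `link_phase` helper below builds that hash directly so its shape is visible; it does not use `WalkSpec` or `Phase`, and the `"_"` default assumes Riak's link-walking wildcard convention.

```ruby
require "json"

# Hypothetical link_phase helper: builds the hash for a link phase that
# follows links matching bucket and tag ("_" assumed to match anything).
def link_phase(bucket: "_", tag: "_", keep: false)
  { "link" => { "bucket" => bucket, "tag" => tag, "keep" => keep } }
end

puts JSON.generate(link_phase(tag: "friend"))
# prints {"link":{"bucket":"_","tag":"friend","keep":false}}
```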

#map(function) ⇒ MapReduce #map(function?, options) ⇒ MapReduce

Add a map phase to the job.


# File 'lib/riak/map_reduce.rb', line 157

def map(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
  self
end
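Each map phase ends up as one entry in the job's `query` list. The hypothetical `map_phase` helper below shows the hash a JavaScript map phase with inline source takes in Riak's HTTP map-reduce JSON. It is a sketch of the wire shape only, not of how `Riak::MapReduce::Phase` is implemented.

```ruby
require "json"

# Hypothetical map_phase helper: builds the hash a JavaScript map phase
# takes in Riak's HTTP map-reduce JSON. It is not Riak::MapReduce::Phase.
def map_phase(source, keep: false, language: "javascript")
  { "map" => { "language" => language, "source" => source, "keep" => keep } }
end

phase = map_phase("function(v) { return [v.key]; }")
puts JSON.generate(phase)
# prints {"map":{"language":"javascript","source":"function(v) { return [v.key]; }","keep":false}}
```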

#reduce(function) ⇒ MapReduce #reduce(function?, options) ⇒ MapReduce

Add a reduce phase to the job.


# File 'lib/riak/map_reduce.rb', line 171

def reduce(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
  self
end
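Since `map` and `reduce` both append a phase and return `self`, a whole pipeline can be written as one chained expression. The hypothetical `QuerySketch` class below mirrors that fluent style with plain hashes, referring to Riak's built-in JavaScript functions `Riak.mapValuesJson` and `Riak.reduceSum` by name.

```ruby
# Hypothetical QuerySketch: mirrors the fluent style of map and reduce,
# where each call appends one phase and returns self.
class QuerySketch
  attr_reader :query

  def initialize
    @query = []
  end

  def map(name, keep: false)
    add_phase("map", name, keep)
  end

  def reduce(name, keep: false)
    add_phase("reduce", name, keep)
  end

  private

  # Append a named JavaScript phase and return self for chaining.
  def add_phase(type, name, keep)
    @query << { type => { "language" => "javascript", "name" => name, "keep" => keep } }
    self
  end
end

job = QuerySketch.new.map("Riak.mapValuesJson").reduce("Riak.reduceSum", keep: true)
phase_types = job.query.map { |phase| phase.keys.first }
# phase_types == ["map", "reduce"]
```

Phases run in the order they were added, so the reduce phase here receives the output of the map phase.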

#run ⇒ Array<Array> #run {|phase, data| ... } ⇒ nil

Executes this map-reduce job.

Overloads:

  • #run ⇒ Array<Array>

    Return the entire collection of results.

  • #run {|phase, data| ... } ⇒ nil

    Stream the results through the given block without accumulating.

    Yields:

    • (phase, data)

      A block to stream results through

    Yield Parameters:

    • phase (Integer)

      the phase from which the results were generated

    • data (Array)

      a list of results from the phase


# File 'lib/riak/map_reduce.rb', line 229

def run(&block)
  @client.mapred(self, &block)
rescue FailedRequest => fr
  if fr.server_error? && fr.is_json?
    raise MapReduceError.new(fr.body)
  else
    raise fr
  end
end
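With the block form of `run`, each chunk arrives as a `(phase, data)` pair, so results can be processed incrementally instead of being collected into one array. The sketch below substitutes a hypothetical `fake_stream` method with made-up chunks for the real client call, then counts results per phase as they arrive.

```ruby
# Hypothetical fake_stream: replays [phase, data] chunks the way the
# block form of #run yields them. The chunk values are made up.
def fake_stream
  [[0, [1, 2]], [1, [10]], [0, [3]]].each { |phase, data| yield phase, data }
  nil
end

# Consume the stream incrementally: count results per phase instead of
# accumulating every data array.
counts = Hash.new(0)
result = fake_stream { |phase, data| counts[phase] += data.size }
# counts == { 0 => 3, 1 => 1 }; result is nil, like run's block form
```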

#search(index, query) ⇒ MapReduce

(Riak Search) Use a search query to start a map/reduce job.


# File 'lib/riak/map_reduce.rb', line 117

def search(index, query)
  index = index.name if index.respond_to?(:name)
  @inputs = {:module => "yokozuna", :function => "mapred_search", :arg => [index, query]}
  self
end
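`search` does not list keys itself; it replaces the job's inputs with a module/function/argument triple that tells Riak to feed the search results into the first phase. The hypothetical `search_input` helper below reproduces that hash so its JSON form can be inspected; any object responding to `name` is reduced to that name, as in the original method.

```ruby
require "json"

# Hypothetical search_input helper: reproduces the inputs hash built by
# MapReduce#search. An index object is reduced to its name first.
def search_input(index, query)
  index = index.name if index.respond_to?(:name)
  { :module => "yokozuna", :function => "mapred_search", :arg => [index, query] }
end

puts JSON.generate(search_input("famous", "name_s:Lion*"))
# prints {"module":"yokozuna","function":"mapred_search","arg":["famous","name_s:Lion*"]}
```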

#timeout(value) ⇒ Object Also known as: timeout=

Sets the timeout for the map-reduce job, in milliseconds. The value is converted with to_i when the job is serialized.


# File 'lib/riak/map_reduce.rb', line 201

def timeout(value)
  @timeout = value
  return self
end

#to_json(*a) ⇒ String

Convert the job to JSON for submission over the HTTP interface.


# File 'lib/riak/map_reduce.rb', line 209

def to_json(*a)
  hash = {"inputs" => inputs, "query" => query.map(&:as_json)}
  hash['timeout'] = @timeout.to_i if @timeout
  hash.to_json(*a)
end
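The serialized job is a single JSON object with `inputs`, `query`, and, only when a timeout was set, an integer `timeout`. The hypothetical `JobSketch` class below follows the same `to_json(*a)` pattern with plain hashes for phases, which makes the optional key and the `to_i` truncation easy to check.

```ruby
require "json"

# Hypothetical JobSketch: follows the to_json pattern of MapReduce, with
# plain hashes standing in for Phase objects.
class JobSketch
  def initialize(inputs, query)
    @inputs, @query = inputs, query
  end

  # Store the timeout and return self so calls can be chained.
  def timeout(value)
    @timeout = value
    self
  end

  def to_json(*a)
    hash = { "inputs" => @inputs, "query" => @query }
    # The timeout key is only emitted when a timeout was set.
    hash["timeout"] = @timeout.to_i if @timeout
    hash.to_json(*a)
  end
end

phases = [{ "map" => { "language" => "javascript", "name" => "Riak.mapValuesJson" } }]
job = JobSketch.new([["b", "k"]], phases).timeout(30_000.7)
doc = JSON.parse(job.to_json)
# doc["timeout"] == 30000
```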