Class: Riak::MapReduce

Inherits:
Object
Includes:
Util::Escape, Util::Translation
Defined in:
lib/riak/map_reduce.rb,
lib/riak/map_reduce/phase.rb,
lib/riak/map_reduce/results.rb,
lib/riak/map_reduce/filter_builder.rb

Overview

Class for invoking map-reduce jobs using the HTTP interface.

Defined Under Namespace

Classes: FilterBuilder, Phase, Results

Methods included from Util::Escape

#escape, #maybe_escape, #maybe_unescape, #unescape

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(client) {|self| ... } ⇒ MapReduce

Creates a new map-reduce job.

Parameters:

  • client (Client)

    the Riak::Client interface

Yields:

  • (self)

    helpful for initializing the job



# File 'lib/riak/map_reduce.rb', line 36

def initialize(client)
  @client, @inputs, @query = client, [], []
  yield self if block_given?
end
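The block form lets a job be configured in a single expression. The sketch below isolates the `yield self` pattern with a stand-in class; `SketchJob` and its two-argument `add` are hypothetical and only loosely modelled on the constructor shown above, so treat it as an illustration rather than the library's behaviour.

```ruby
# Stand-in for illustration only. SketchJob is a hypothetical class that
# copies the constructor shown above; it is not Riak::MapReduce.
class SketchJob
  attr_reader :inputs, :query

  def initialize(client)
    @client, @inputs, @query = client, [], []
    yield self if block_given?
  end

  # Chainable helper, loosely modelled on the bucket/key form of #add.
  def add(bucket, key)
    @inputs << [bucket, key]
    self
  end
end

# The block receives the new job, so setup and construction happen together.
job = SketchJob.new(:fake_client) do |j|
  j.add("artists", "miles").add("artists", "coltrane")
end

p job.inputs
```

Because every builder method returns the job itself, the same calls can also be chained after `new` without a block.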

Instance Attribute Details

#inputs ⇒ Array<[bucket,key]>, ...

Returns the bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.

Returns:

  • (Array<[bucket,key]>, String, Hash<:bucket,:key_filters>)

    The bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.

# File 'lib/riak/map_reduce.rb', line 25

def inputs
  @inputs
end

#query ⇒ Array<Phase>

Returns the map and reduce phases that will be executed.

Returns:

  • (Array<Phase>)

    The map and reduce phases that will be executed

# File 'lib/riak/map_reduce.rb', line 31

def query
  @query
end

Instance Method Details

#add(bucket) ⇒ MapReduce #add(bucket, key) ⇒ MapReduce #add(object) ⇒ MapReduce #add(bucket, key, keydata) ⇒ MapReduce #add(bucket, filters) ⇒ MapReduce Also known as: <<, include

Add or replace inputs for the job.

Overloads:

  • #add(bucket) ⇒ MapReduce

    Run the job across all keys in the bucket. This will replace any other inputs previously added.

    Parameters:

    • bucket (String, Bucket)

      the bucket to run the job on

  • #add(bucket, key) ⇒ MapReduce

    Add a bucket/key pair to the job.

    Parameters:

    • bucket (String, Bucket)

      the bucket of the object

    • key (String)

      the key of the object

  • #add(object) ⇒ MapReduce

    Add an object to the job (by its bucket/key)

    Parameters:

    • object (RObject)

      the object to add to the inputs

  • #add(bucket, key, keydata) ⇒ MapReduce

    Parameters:

    • bucket (String, Bucket)

      the bucket of the object

    • key (String)

      the key of the object

    • keydata (String)

      extra data to pass along with the object to the job

  • #add(bucket, filters) ⇒ MapReduce

    Run the job across all keys in the bucket, with the given key-filters. This will replace any other inputs previously added. (Requires Riak 0.14)

    Parameters:

    • bucket (String, Bucket)

      the bucket to filter keys from

    • filters (Array<Array>)

      a list of key-filters to apply to the key list

Returns:

  • (MapReduce)

    self

# File 'lib/riak/map_reduce.rb', line 64

def add(*params)
  params = params.dup
  params = params.first if Array === params.first
  case params.size
  when 1
    p = params.first
    case p
    when Bucket
      @inputs = bucket_input(p)
    when RObject
      @inputs << robject_input(p)
    when String
      warn(t('full_bucket_mapred', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings
      @inputs = maybe_escape(p)
    end
  when 2..3
    bucket = params.shift

    if Array === params.first
      if bucket.is_a? Bucket
        bucket = bucket_input(bucket)
      else
        bucket = maybe_escape(bucket)
      end

      warn(t('full_bucket_mapred', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings
      @inputs = {:bucket => bucket, :key_filters => params.first }
    else
      key = params.shift
      key_data = params.shift || ''
      @inputs << key_input(key, bucket, key_data)
    end
  end
  self
end
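The case analysis above leaves the job's inputs in one of three shapes. The function below is a simplified stand-in (`sketch_inputs` is a hypothetical name): it handles String buckets only and skips escaping, Bucket and RObject handling, and the list-keys warning. It also assumes a keyed input is stored as a `[bucket, key, keydata]` triple, which this excerpt does not show.

```ruby
# Simplified stand-in for the dispatch in #add. Assumptions: String buckets
# only, no escaping, and keyed inputs stored as [bucket, key, keydata].
def sketch_inputs(current, *params)
  case params.size
  when 1
    params.first # whole bucket: replaces the inputs
  when 2..3
    bucket, second, keydata = params
    if second.is_a?(Array)
      # bucket plus key filters: replaces the inputs
      { :bucket => bucket, :key_filters => second }
    else
      # bucket/key pair with optional keydata: appends to the list
      current + [[bucket, second, keydata || ""]]
    end
  end
end

inputs = []
inputs = sketch_inputs(inputs, "artists", "miles")
inputs = sketch_inputs(inputs, "artists", "coltrane", "extra")
p inputs

# Both of these discard the pairs collected so far.
p sketch_inputs(inputs, "artists")
p sketch_inputs(inputs, "artists", [["starts_with", "m"]])
```

The practical consequence is that bucket/key pairs accumulate, while a whole-bucket or key-filter input overwrites whatever was added before it.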

#filter(bucket) { ... } ⇒ MapReduce

Adds a bucket and key-filters built by the given block. Equivalent to #add with a list of filters.

Parameters:

  • bucket (String)

    the bucket to apply key-filters to

Yields:

  • builder block - instance_eval'ed into a FilterBuilder

Returns:

  • (MapReduce)

    self

# File 'lib/riak/map_reduce.rb', line 108

def filter(bucket, &block)
  add(bucket, FilterBuilder.new(&block).to_a)
end
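A block that is instance_eval'ed into a builder can call the builder's methods without an explicit receiver. The miniature below shows that mechanism under stated assumptions: `TinyFilterBuilder` is hypothetical, supports only three filter names, and does no validation; the real FilterBuilder is not shown in this excerpt and may differ.

```ruby
# Hypothetical miniature of a block-driven filter builder. It is not the
# library's FilterBuilder; names and behaviour are assumptions.
class TinyFilterBuilder
  def initialize(&block)
    @filters = []
    instance_eval(&block) if block
  end

  # Each supported filter records itself as [name, *args].
  %w[tokenize starts_with eq].each do |name|
    define_method(name) do |*args|
      @filters << [name, *args]
      self
    end
  end

  def to_a
    @filters
  end
end

builder = TinyFilterBuilder.new do
  tokenize "-", 1
  starts_with "2010"
end

# The resulting list has the Array<Array> shape that #add(bucket, filters) takes.
inputs = { :bucket => "invoices", :key_filters => builder.to_a }
p inputs
```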

#index(bucket, index, query) ⇒ MapReduce

(Secondary Indexes) Use a secondary index query to start a map/reduce job.

Parameters:

  • bucket (String, Bucket)

    the bucket whose index to query

  • index (String)

    the index to query

  • query (String, Integer, Range)

    the value of the index, or a range of values (of Strings or Integers)

Returns:

  • (MapReduce)

    self

# File 'lib/riak/map_reduce.rb', line 130

def index(bucket, index, query)
  if bucket.is_a? Bucket
    bucket = bucket.needs_type? ? [maybe_escape(bucket.type.name), maybe_escape(bucket.name)] : maybe_escape(bucket.name)
  else
    bucket = maybe_escape(bucket)
  end

  case query
  when String, Integer
    @inputs = {:bucket => bucket, :index => index, :key => query}
  when Range
    raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.begin
    @inputs = {:bucket => bucket, :index => index, :start => query.begin, :end => query.end}
  else
    raise ArgumentError, t('invalid_index_query', :value => query.inspect)
  end
  self
end
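The query argument decides which keys appear in the inputs hash: a single value becomes `:key`, a range becomes `:start` and `:end`. The stand-in below (`sketch_index_inputs`, a hypothetical name) mirrors that case statement for String buckets only, leaving out escaping and bucket types. It matches on Integer because Fixnum is no longer defined in recent Ruby versions, and the index names are made up for the example.

```ruby
# Stand-in mirroring the case statement above, for String buckets only.
def sketch_index_inputs(bucket, index, query)
  case query
  when String, Integer
    { :bucket => bucket, :index => index, :key => query }
  when Range
    unless query.begin.is_a?(String) || query.begin.is_a?(Integer)
      raise ArgumentError, "invalid index query: #{query.inspect}"
    end
    { :bucket => bucket, :index => index, :start => query.begin, :end => query.end }
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

exact = sketch_index_inputs("users", "email_bin", "a@example.com")
range = sketch_index_inputs("users", "age_int", 18..30)
p exact
p range

# Ranges of other types, such as Floats, are rejected.
rejected = begin
  sketch_index_inputs("users", "age_int", 1.5..2.5)
  false
rescue ArgumentError
  true
end
p rejected
```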

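#link(walk_spec, options = {}) ⇒ MapReduce #link(bucket, tag, keep, options = {}) ⇒ MapReduce #link(options) ⇒ MapReduce

Add a link phase to the job. Link phases follow links attached to objects automatically (a special case of map).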

Overloads:

  • #link(walk_spec, options = {}) ⇒ MapReduce

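    Parameters:

    • walk_spec (WalkSpec)

      a WalkSpec that represents the link-walk

    • options (Hash) (defaults to: {})

      extra options for the phase (see Riak::MapReduce::Phase#initialize)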

  • #link(bucket, tag, keep, options = {}) ⇒ MapReduce

    Parameters:

    • bucket (String, nil)

      the bucket to limit links to

    • tag (String, nil)

      the tag to limit links to

    • keep (Boolean)

      whether to keep results of this phase (overrides the phase options)

    • options (Hash) (defaults to: {})

      extra options for the phase (see Riak::MapReduce::Phase#initialize)

  • #link(options) ⇒ MapReduce

    Parameters:

    • options (Hash)

      options for both the walk spec and link phase

Returns:

  • (MapReduce)

    self

# File 'lib/riak/map_reduce.rb', line 191

def link(*params)
  options = params.extract_options!
  walk_spec_options = options.slice!(:type, :function, :language, :arg) unless params.first
  walk_spec = WalkSpec.normalize(params.shift || walk_spec_options).first
  @query << Phase.new({:type => :link, :function => walk_spec}.merge(options))
  self
end
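In the options-only form, a single hash carries both phase options and walk-spec options, and the method separates them by key. The sketch below reproduces that split with plain Ruby, assuming `slice!` here follows ActiveSupport's `Hash#slice!` semantics (keep the listed keys in the receiver, return the removed pairs). `split_link_options` and `PHASE_KEYS` are hypothetical names.

```ruby
# Keys that stay with the phase; everything else describes the walk spec.
PHASE_KEYS = [:type, :function, :language, :arg].freeze

# Returns [phase_options, walk_spec_options] without mutating the argument.
def split_link_options(options)
  phase_pairs, walk_pairs = options.partition { |key, _| PHASE_KEYS.include?(key) }
  [phase_pairs.to_h, walk_pairs.to_h]
end

phase_opts, walk_opts = split_link_options(
  { :bucket => "albums", :tag => "performer", :keep => true, :language => "erlang" }
)

p phase_opts
p walk_opts
```

Under that assumption, `:bucket`, `:tag` and `:keep` end up in the walk spec, which fits the note that `keep` overrides the phase options.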

#map(function) ⇒ MapReduce #map(function?, options) ⇒ MapReduce

Add a map phase to the job.

Overloads:

  • #map(function) ⇒ MapReduce

    Parameters:

    • function (String, Array)

      a JavaScript function that represents the phase, or an Erlang [module,function] pair

  • #map(function?, options) ⇒ MapReduce

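    Parameters:

    • function (String, Array)

      a JavaScript function that represents the phase, or an Erlang [module,function] pair

    • options (Hash)

      extra options for the phase (see Riak::MapReduce::Phase#initialize)

Returns:

  • (MapReduce)

    self
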
# File 'lib/riak/map_reduce.rb', line 157

def map(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
  self
end

#reduce(function) ⇒ MapReduce #reduce(function?, options) ⇒ MapReduce

Add a reduce phase to the job.

Overloads:

  • #reduce(function) ⇒ MapReduce

    Parameters:

    • function (String, Array)

      a JavaScript function that represents the phase, or an Erlang [module,function] pair

  • #reduce(function?, options) ⇒ MapReduce

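    Parameters:

    • function (String, Array)

      a JavaScript function that represents the phase, or an Erlang [module,function] pair

    • options (Hash)

      extra options for the phase (see Riak::MapReduce::Phase#initialize)

Returns:

  • (MapReduce)

    self
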
# File 'lib/riak/map_reduce.rb', line 171

def reduce(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
  self
end

#run ⇒ Array<Array> #run {|phase, data| ... } ⇒ nil

Executes this map-reduce job.

Overloads:

  • #run ⇒ Array<Array>

    Return the entire collection of results.

    Returns:

    • (Array<Array>)

      similar to link-walking, each element is an array of results from a phase where “keep” is true. If there is only one “keep” phase, only the results from that phase will be returned.

  • #run {|phase, data| ... } ⇒ nil

    Stream the results through the given block without accumulating.

    Yields:

    • (phase, data)

      A block to stream results through

    Yield Parameters:

    • phase (Fixnum)

      the phase from which the results were generated

    • data (Array)

      a list of results from the phase

    Returns:

    • (nil)

      nothing



# File 'lib/riak/map_reduce.rb', line 229

def run(&block)
  @client.mapred(self, &block)
rescue FailedRequest => fr
  if fr.server_error? && fr.is_json?
    raise MapReduceError.new(fr.body)
  else
    raise fr
  end
end
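The two calling styles differ in where results end up: without a block they are collected and returned, with a block they are handed over chunk by chunk and nothing is returned. The sketch below imitates that contract with a stand-in; `FakeClient`, its canned chunks, and the way it groups results by phase are all assumptions made for illustration, not the client's actual implementation.

```ruby
# Hypothetical stand-in for the client's mapred call.
class FakeClient
  # Canned (phase, data) chunks, as a streaming response might deliver them.
  CHUNKS = [[0, ["a"]], [1, [3]], [1, [4]]].freeze

  def mapred(_job)
    if block_given?
      # Streaming: pass each chunk to the block and accumulate nothing.
      CHUNKS.each { |phase, data| yield phase, data }
      nil
    else
      # Collecting: one array of results per phase.
      CHUNKS.group_by(&:first).values.map { |chunks| chunks.flat_map(&:last) }
    end
  end
end

client = FakeClient.new

collected = client.mapred(:job)

counts = Hash.new(0)
streamed = client.mapred(:job) { |phase, data| counts[phase] += data.size }

p collected
p counts
p streamed
```

Streaming suits large result sets, since the caller decides what to retain from each chunk.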

#search(index, query) ⇒ MapReduce

(Riak Search) Use a search query to start a map/reduce job.

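Parameters:

  • index (String, #name)

    the search index to query, or an object that responds to #name (its name is used)

  • query (String)

    the search query

Returns:

  • (MapReduce)

    self
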
# File 'lib/riak/map_reduce.rb', line 117

def search(index, query)
  index = index.name if index.respond_to?(:name)
  @inputs = {:module => "yokozuna", :function => "mapred_search", :arg => [index, query]}
  self
end

#timeout(value) ⇒ Object Also known as: timeout=

Sets the timeout for the map-reduce job.

Parameters:

  • value (Fixnum)

    the job timeout, in milliseconds



# File 'lib/riak/map_reduce.rb', line 201

def timeout(value)
  @timeout = value
  return self
end

#to_json(*a) ⇒ String

Convert the job to JSON for submission over the HTTP interface.

Returns:

  • (String)

    the JSON representation



# File 'lib/riak/map_reduce.rb', line 209

def to_json(*a)
  hash = {"inputs" => inputs, "query" => query.map(&:as_json)}
  hash['timeout'] = @timeout.to_i if @timeout
  hash.to_json(*a)
end
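The serialized job is a JSON object with "inputs", "query" and, when set, an integer "timeout" in milliseconds. The sketch below assembles such a payload with the standard library only. The phase hashes are an assumption: this excerpt does not show Phase#as_json, so they follow the job format that Riak's HTTP MapReduce interface accepts, and the bucket, keys and function source are invented for the example.

```ruby
require "json"

# Assumed phase hashes; Phase#as_json is not shown in this excerpt.
inputs = [["artists", "miles"], ["artists", "coltrane"]]
query = [
  { "map" => { "language" => "javascript",
               "source" => "function(v) { return [1]; }",
               "keep" => false } },
  { "reduce" => { "language" => "javascript",
                  "name" => "Riak.reduceSum",
                  "keep" => true } }
]
timeout = 5000.0 # milliseconds; coerced to an Integer below, as in #to_json

hash = { "inputs" => inputs, "query" => query }
hash["timeout"] = timeout.to_i if timeout
payload = hash.to_json

puts payload
```

Leaving the timeout unset omits the key entirely, so the server-side default applies.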