Class: Riak::MapReduce

Inherits:
Object
Includes:
Util::Escape, Util::Translation
Defined in:
lib/riak/map_reduce.rb,
lib/riak/map_reduce/phase.rb,
lib/riak/map_reduce/filter_builder.rb

Overview

Class for invoking map-reduce jobs using the HTTP interface.

Defined Under Namespace

Classes: FilterBuilder, Phase

Methods included from Util::Escape

#escape, #maybe_escape, #maybe_unescape, #unescape

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(client) {|self| ... } ⇒ MapReduce

Creates a new map-reduce job.

Parameters:

  • client (Client)

    the Riak::Client interface

Yields:

  • (self)

    helpful for initializing the job



# File 'lib/riak/map_reduce.rb', line 34

def initialize(client)
  @client, @inputs, @query = client, [], []
  yield self if block_given?
end
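The constructor yields the new job to an optional block, a common Ruby initializer idiom. The sketch below shows that pattern in isolation; `JobSketch` is a hypothetical stand-in that mirrors the instance variables set above and does not talk to Riak.

```ruby
# Hypothetical stand-in illustrating the "yield self" initializer idiom
# used by Riak::MapReduce#initialize; it does not contact any server.
class JobSketch
  attr_reader :client, :inputs, :query

  def initialize(client)
    @client, @inputs, @query = client, [], []
    yield self if block_given?   # let the caller configure the new object
  end
end

job = JobSketch.new(:fake_client) do |j|
  j.inputs << ["artists", "Beatles"]
end
p job.inputs   # => [["artists", "Beatles"]]
```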

Instance Attribute Details

#inputs ⇒ Array<[bucket,key]>, ...

Returns the bucket/keys for input to the job, the bucket (all keys), or a hash containing the bucket and key-filters.

Returns:

  • (Array<[bucket,key]>, String, Hash<:bucket,:key_filters>)

    The bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.

# File 'lib/riak/map_reduce.rb', line 23

def inputs
  @inputs
end
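To make the three input shapes concrete, here they are as plain Ruby values serialized with the standard `json` library. The bucket, keys, and filter are made-up examples; the `:key_filters` key follows the hash built in #add.

```ruby
require "json"

# Hypothetical example values for the three input shapes described above.
key_pairs    = [["artists", "Beatles"], ["artists", "Stones"]]              # explicit bucket/key pairs
whole_bucket = "artists"                                                     # every key in the bucket
filtered     = { :bucket => "artists", :key_filters => [["starts_with", "B"]] }  # bucket plus key-filters

puts key_pairs.to_json     # [["artists","Beatles"],["artists","Stones"]]
puts whole_bucket.to_json  # "artists"
puts filtered.to_json      # {"bucket":"artists","key_filters":[["starts_with","B"]]}
```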

#query ⇒ Array<Phase>

Returns the map, reduce, and link phases that will be executed.

Returns:

  • (Array<Phase>)

    The map, reduce, and link phases that will be executed.

# File 'lib/riak/map_reduce.rb', line 29

def query
  @query
end

Instance Method Details

#add(bucket) ⇒ MapReduce #add(bucket, key) ⇒ MapReduce #add(object) ⇒ MapReduce #add(bucket, key, keydata) ⇒ MapReduce #add(bucket, filters) ⇒ MapReduce Also known as: <<, include

Add or replace inputs for the job.

Overloads:

  • #add(bucket) ⇒ MapReduce

    Run the job across all keys in the bucket. This will replace any other inputs previously added.

    Parameters:

    • bucket (String, Bucket)

      the bucket whose keys will all be used as inputs

  • #add(bucket, key) ⇒ MapReduce

    Add a bucket/key pair to the job.

    Parameters:

    • bucket (String, Bucket)

      the bucket of the object

    • key (String)

      the key of the object

  • #add(object) ⇒ MapReduce

    Add an object to the job (by its bucket/key).

    Parameters:

    • object (RObject)

      the object to add to the inputs

  • #add(bucket, key, keydata) ⇒ MapReduce

    Add a bucket/key pair to the job, along with extra key data.

    Parameters:

    • bucket (String, Bucket)

      the bucket of the object

    • key (String)

      the key of the object

    • keydata (String)

      extra data to pass along with the object to the job

  • #add(bucket, filters) ⇒ MapReduce

    Run the job across all keys in the bucket, with the given key-filters. This will replace any other inputs previously added. (Requires Riak 0.14)

    Parameters:

    • bucket (String, Bucket)

      the bucket to filter keys from

    • filters (Array<Array>)

      a list of key-filters to apply to the key list

Returns:

  • (MapReduce)

    self



# File 'lib/riak/map_reduce.rb', line 62

def add(*params)
  params = params.dup
  params = params.first if Array === params.first
  case params.size
  when 1
    p = params.first
    case p
    when Bucket
      warn(t('full_bucket_mapred', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings
      @inputs = maybe_escape(p.name)
    when RObject
      @inputs << [maybe_escape(p.bucket.name), maybe_escape(p.key)]
    when String
      warn(t('full_bucket_mapred', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings
      @inputs = maybe_escape(p)
    end
  when 2..3
    bucket = params.shift
    bucket = bucket.name if Bucket === bucket
    if Array === params.first
      warn(t('full_bucket_mapred', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings
      @inputs = {:bucket => maybe_escape(bucket), :key_filters => params.first }
    else
      key = params.shift
      @inputs << params.unshift(maybe_escape(key)).unshift(maybe_escape(bucket))
    end
  end
  self
end
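The dispatch in #add depends only on the number and types of its arguments. The simplified sketch below reproduces that logic with plain strings; `InputsSketch` is hypothetical and, unlike the real method, skips Bucket/RObject handling, escaping, and the list-keys warning.

```ruby
# Simplified, hypothetical re-creation of the dispatch in #add, using plain
# strings instead of Bucket/RObject and skipping escaping and warnings.
class InputsSketch
  attr_reader :inputs

  def initialize
    @inputs = []
  end

  def add(*params)
    params = params.first if params.first.is_a?(Array)  # add([bucket, key]) form
    case params.size
    when 1
      @inputs = params.first                             # whole bucket: replaces inputs
    when 2..3
      bucket, rest = params.first, params.drop(1)
      if rest.first.is_a?(Array)
        @inputs = { :bucket => bucket, :key_filters => rest.first }  # filters: replaces inputs
      else
        @inputs << [bucket, *rest]                       # [bucket, key] or [bucket, key, keydata]
      end
    end
    self
  end
end

job = InputsSketch.new
job.add("artists", "Beatles").add("artists", "Stones", "extra")
p job.inputs  # => [["artists", "Beatles"], ["artists", "Stones", "extra"]]
job.add("artists", [["starts_with", "B"]])
p job.inputs.is_a?(Hash)  # => true
```

Note that the whole-bucket and key-filter forms overwrite any pairs added earlier, which is why the overloads above say they "replace" previous inputs.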

#filter(bucket) { ... } ⇒ MapReduce

Adds a bucket and key-filters built by the given block. Equivalent to #add with a list of filters.

Parameters:

  • bucket (String)

    the bucket to apply key-filters to

Yields:

  • builder block - instance_eval’ed into a FilterBuilder

Returns:

  • (MapReduce)

    self

# File 'lib/riak/map_reduce.rb', line 100

def filter(bucket, &block)
  add(bucket, FilterBuilder.new(&block).to_a)
end
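#filter relies on `instance_eval`, so calls inside the block are sent to the builder rather than to the caller. A minimal sketch of that technique follows; `TinyFilterBuilder` is hypothetical and only knows a few Riak key-filter names, while the real FilterBuilder's method set is not shown here.

```ruby
# Hypothetical, minimal builder showing how an instance_eval'ed block can
# collect key-filter steps as [name, *args] arrays.
class TinyFilterBuilder
  # A few Riak key-filter names, chosen for illustration only.
  FILTERS = %w[tokenize to_lower eq starts_with ends_with].freeze

  FILTERS.each do |name|
    define_method(name) do |*args|
      @filters << [name, *args]   # record the step as [name, *args]
      self
    end
  end

  def initialize(&block)
    @filters = []
    instance_eval(&block) if block   # evaluate the block with self as the builder
  end

  def to_a
    @filters
  end
end

filters = TinyFilterBuilder.new { tokenize "-", 1; eq "2011" }.to_a
p filters  # => [["tokenize", "-", 1], ["eq", "2011"]]
```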

#index(bucket, index, query) ⇒ MapReduce

(Secondary Indexes) Use a secondary index query to start a map/reduce job.

Parameters:

  • bucket (String, Bucket)

    the bucket whose index to query

  • index (String)

    the index to query

  • query (String, Integer, Range)

    the value of the index, or a range of values (of Strings or Integers)

Returns:

  • (MapReduce)

    self



# File 'lib/riak/map_reduce.rb', line 121

def index(bucket, index, query)
  bucket = bucket.name if bucket.respond_to?(:name)
  case query
  when String, Integer
    @inputs = {:bucket => maybe_escape(bucket), :index => index, :key => query}
  when Range
    raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.begin
    @inputs = {:bucket => maybe_escape(bucket), :index => index, :start => query.begin, :end => query.end}
  else
    raise ArgumentError, t('invalid_index_query', :value => query.inspect)
  end
  self
end
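#index sets the inputs to a hash whose keys depend on the query type: an exact value is sent as `:key`, and a Range as `:start`/`:end`. The hypothetical `index_inputs` helper below mirrors that branching without escaping or translation; the bucket and index names are made-up examples.

```ruby
# Hypothetical helper mirroring how #index shapes its inputs hash:
# an exact match uses :key, a Range of Strings or Integers uses :start and :end.
def index_inputs(bucket, index, query)
  case query
  when String, Integer
    { :bucket => bucket, :index => index, :key => query }
  when Range
    unless query.begin.is_a?(String) || query.begin.is_a?(Integer)
      raise ArgumentError, "invalid index query: #{query.inspect}"
    end
    { :bucket => bucket, :index => index, :start => query.begin, :end => query.end }
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

p index_inputs("people", "age_int", 30)[:key]        # => 30
p index_inputs("people", "age_int", 18..30)[:start]  # => 18
```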

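#link(walk_spec, options = {}) ⇒ MapReduce #link(bucket, tag, keep, options = {}) ⇒ MapReduce #link(options) ⇒ MapReduce

Add a link phase to the job. Link phases follow links attached to objects automatically (a special case of map).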

Overloads:

  • #link(walk_spec, options = {}) ⇒ MapReduce

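    Parameters:

    • walk_spec (WalkSpec)

      the walk spec describing which links to follow

    • options (Hash) (defaults to: {})

      extra options for the phase (see Riak::MapReduce::Phase#initialize)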

  • #link(bucket, tag, keep, options = {}) ⇒ MapReduce

    Parameters:

    • bucket (String, nil)

      the bucket to limit links to

    • tag (String, nil)

      the tag to limit links to

    • keep (Boolean)

      whether to keep results of this phase (overrides the phase options)

    • options (Hash) (defaults to: {})

      extra options for the phase (see Riak::MapReduce::Phase#initialize)

  • #link(options) ⇒ MapReduce

    Parameters:

    • options (Hash)

      options for both the walk spec and link phase

Returns:

  • (MapReduce)

    self

# File 'lib/riak/map_reduce.rb', line 177

def link(*params)
  options = params.extract_options!
  walk_spec_options = options.slice!(:type, :function, :language, :arg) unless params.first
  walk_spec = WalkSpec.normalize(params.shift || walk_spec_options).first
  @query << Phase.new({:type => :link, :function => walk_spec}.merge(options))
  self
end
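In Riak's HTTP map/reduce API, a link phase is a `"link"` object carrying optional `bucket` and `tag` criteria plus a `keep` flag. The hypothetical `link_phase` helper below builds that shape directly; it is not the Phase class, and the bucket and tag values are examples.

```ruby
require "json"

# Hypothetical helper building a link phase in the shape Riak's HTTP
# map/reduce API uses: {"link": {"bucket": ..., "tag": ..., "keep": ...}}.
# nil criteria are left out of the spec; keep: false is retained.
def link_phase(bucket: nil, tag: nil, keep: false)
  spec = { "bucket" => bucket, "tag" => tag, "keep" => keep }.compact
  { "link" => spec }
end

puts JSON.generate(link_phase(bucket: "people", tag: "friend"))
# {"link":{"bucket":"people","tag":"friend","keep":false}}
```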

#map(function) ⇒ MapReduce #map(function?, options) ⇒ MapReduce

Add a map phase to the job.

Overloads:

  • #map(function) ⇒ MapReduce

    Parameters:

    • function (String, Array)

      a JavaScript function that represents the phase, or an Erlang [module,function] pair

  • #map(function?, options) ⇒ MapReduce

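    Parameters:

    • function (String, Array) (optional)

      a JavaScript function or an Erlang [module,function] pair; may instead be given as the :function option

    • options (Hash)

      extra options for the phase (see Riak::MapReduce::Phase#initialize)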

Returns:

  • (MapReduce)

    self

# File 'lib/riak/map_reduce.rb', line 143

def map(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
  self
end
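#map accepts either JavaScript source or an Erlang [module, function] pair. The hypothetical `map_phase` helper below shows how those two forms could be rendered in the phase format of Riak's HTTP map/reduce API; it is a sketch, not the Phase serializer, and the JavaScript body is an arbitrary example.

```ruby
require "json"

# Hypothetical helper showing the two function forms #map accepts, rendered in
# the phase format of Riak's HTTP map/reduce API. Extra options are merged in.
def map_phase(function, options = {})
  spec =
    case function
    when String then { "language" => "javascript", "source" => function }
    when Array  then { "language" => "erlang", "module" => function[0], "function" => function[1] }
    else raise ArgumentError, "unsupported function: #{function.inspect}"
    end
  { "map" => spec.merge(options) }
end

js  = map_phase("function(v) { return [JSON.parse(v.values[0].data)]; }")
erl = map_phase(["riak_kv_mapreduce", "map_object_value"], "keep" => true)
puts JSON.generate(erl)
# {"map":{"language":"erlang","module":"riak_kv_mapreduce","function":"map_object_value","keep":true}}
```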

#reduce(function) ⇒ MapReduce #reduce(function?, options) ⇒ MapReduce

Add a reduce phase to the job.

Overloads:

  • #reduce(function) ⇒ MapReduce

    Parameters:

    • function (String, Array)

      a JavaScript function that represents the phase, or an Erlang [module,function] pair

  • #reduce(function?, options) ⇒ MapReduce

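    Parameters:

    • function (String, Array) (optional)

      a JavaScript function or an Erlang [module,function] pair; may instead be given as the :function option

    • options (Hash)

      extra options for the phase (see Riak::MapReduce::Phase#initialize)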

Returns:

  • (MapReduce)

    self

# File 'lib/riak/map_reduce.rb', line 157

def reduce(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
  self
end
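Riak may apply a reduce function more than once, feeding earlier reduce outputs back in alongside new values (re-reduce), so a reduce function should return a list it can consume again. The sketch below uses a Ruby lambda as a stand-in for a JavaScript reduce function to show that property; `sum_reduce` is hypothetical.

```ruby
# Hypothetical Ruby stand-in for a reduce function: it takes a list and
# returns a one-element list, so its own output can be fed back in.
sum_reduce = ->(values) { [values.sum] }

batch_a = sum_reduce.call([1, 2, 3])  # partial result: [6]
batch_b = sum_reduce.call([4, 5])     # partial result: [9]

# Re-reduce: combining partial results gives the same answer as one pass.
total = sum_reduce.call(batch_a + batch_b)
p total                                      # => [15]
p total == sum_reduce.call([1, 2, 3, 4, 5])  # => true
```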

#run ⇒ Array<Array> #run {|phase, data| ... } ⇒ nil

Executes this map-reduce job.

Overloads:

  • #run ⇒ Array<Array>

    Return the entire collection of results.

    Returns:

    • (Array<Array>)

      similar to link-walking, each element is an array of results from a phase where “keep” is true. If there is only one “keep” phase, only the results from that phase will be returned.

  • #run {|phase, data| ... } ⇒ nil

    Stream the results through the given block without accumulating.

    Yields:

    • (phase, data)

      A block to stream results through

    Yield Parameters:

    • phase (Fixnum)

      the phase from which the results were generated

    • data (Array)

      a list of results from the phase

    Returns:

    • (nil)

      nothing



# File 'lib/riak/map_reduce.rb', line 215

def run(&block)
  @client.mapred(self, &block)
rescue FailedRequest => fr
  if fr.server_error? && fr.is_json?
    raise MapReduceError.new(fr.body)
  else
    raise fr
  end
end
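The two overloads of #run differ in whether results are accumulated. The hypothetical `collect_results` helper below sketches the accumulated form from streamed `[phase, data]` chunks, assuming every chunk comes from a "keep" phase; the chunk values are invented for illustration.

```ruby
# Hypothetical sketch of how streamed [phase, data] chunks relate to the
# accumulated return value described for #run. Assumes every chunk comes
# from a phase whose "keep" flag is true.
def collect_results(chunks)
  by_phase = Hash.new { |h, k| h[k] = [] }
  chunks.each { |phase, data| by_phase[phase].concat(data) }
  results = by_phase.keys.sort.map { |phase| by_phase[phase] }
  results.size == 1 ? results.first : results  # one keep phase: return its list directly
end

# Streaming form: each chunk is handled as it arrives instead of being accumulated.
chunks = [[0, ["a"]], [1, [10]], [0, ["b"]]]
chunks.each { |phase, data| puts "phase #{phase}: #{data.inspect}" }

p collect_results(chunks)           # => [["a", "b"], [10]]
p collect_results([[1, [10, 20]]])  # => [10, 20]
```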

#search(bucket, query) ⇒ MapReduce

(Riak Search) Use a search query to start a map/reduce job.

Parameters:

  • bucket (String, Bucket)

    the bucket/index to search

  • query (String)

    the query to run

Returns:

  • (MapReduce)

    self



# File 'lib/riak/map_reduce.rb', line 108

def search(bucket, query)
  bucket = bucket.name if bucket.respond_to?(:name)
  @inputs = {:module => "riak_search", :function => "mapred_search", :arg => [bucket, query]}
  self
end
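#search does not list keys itself; it sets the inputs to a call to Riak Search's `riak_search:mapred_search` with `[bucket, query]` as the argument. The hypothetical `search_inputs` helper below reproduces that hash and its JSON form; the bucket and query are examples.

```ruby
require "json"

# Hypothetical helper mirroring the inputs hash #search builds: the job's
# inputs come from riak_search:mapred_search called with [bucket, query].
def search_inputs(bucket, query)
  { :module => "riak_search", :function => "mapred_search", :arg => [bucket, query] }
end

puts JSON.generate(search_inputs("people", "name:Alice"))
# {"module":"riak_search","function":"mapred_search","arg":["people","name:Alice"]}
```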

#timeout(value) ⇒ MapReduce Also known as: timeout=

Sets the timeout for the map-reduce job.

Parameters:

  • value (Fixnum)

    the job timeout, in milliseconds



# File 'lib/riak/map_reduce.rb', line 187

def timeout(value)
  @timeout = value
  self
end
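Because #timeout returns self and is aliased as `timeout=`, it can be used either in a call chain or as an assignment. One Ruby subtlety: assignment syntax always evaluates to the right-hand side, so only the explicit call form chains. `TimeoutSketch` below is a hypothetical stand-in demonstrating this.

```ruby
# Hypothetical stand-in showing a chainable setter plus a `timeout=` alias,
# as used by #timeout. Assignment syntax always evaluates to the right-hand
# side, so only the explicit call form returns the object for chaining.
class TimeoutSketch
  attr_reader :timeout_ms

  def timeout(value)
    @timeout_ms = value
    self
  end
  alias_method :timeout=, :timeout
end

t = TimeoutSketch.new
p t.timeout(30_000).equal?(t)  # => true
p(t.timeout = 5_000)           # => 5000
p t.timeout_ms                 # => 5000
```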

#to_json(*a) ⇒ String

Convert the job to JSON for submission over the HTTP interface.

Returns:

  • (String)

    the JSON representation



# File 'lib/riak/map_reduce.rb', line 195

def to_json(*a)
  hash = {"inputs" => inputs, "query" => query.map(&:as_json)}
  hash['timeout'] = @timeout.to_i if @timeout
  hash.to_json(*a)
end
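The submitted body is a single JSON object with "inputs" and "query" keys, plus "timeout" only when one was set. The hypothetical `job_payload` helper below reproduces that assembly with the standard `json` library, using a plain hash in place of Phase#as_json.

```ruby
require "json"

# Hypothetical sketch of the payload #to_json assembles: "inputs" and "query"
# are always present, "timeout" only when one has been set.
def job_payload(inputs, query, timeout = nil)
  hash = { "inputs" => inputs, "query" => query }
  hash["timeout"] = timeout.to_i if timeout
  hash.to_json
end

query = [{ "map" => { "language" => "erlang", "module" => "riak_kv_mapreduce",
                      "function" => "map_object_value" } }]
puts job_payload("people", query)
puts job_payload("people", query, 60_000)
```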