Class: Riak::MapReduce
- Includes: Util::Escape, Util::Translation
- Defined in: lib/riak/map_reduce.rb, lib/riak/map_reduce/phase.rb, lib/riak/map_reduce/results.rb, lib/riak/map_reduce/filter_builder.rb
Overview
Class for invoking map-reduce jobs using the HTTP interface.
Defined Under Namespace
Classes: FilterBuilder, Phase, Results
Instance Attribute Summary
- #inputs ⇒ Array<[bucket,key]>, ...
  The bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.
- #query ⇒ Array<Phase>
  The map and reduce phases that will be executed.
Instance Method Summary
- #add(*params) ⇒ MapReduce (also: #<<, #include)
  Add or replace inputs for the job.
- #filter(bucket) { ... } ⇒ MapReduce
  Adds a bucket and key-filters built by the given block.
- #index(bucket, index, query) ⇒ MapReduce
  (Secondary Indexes) Use a secondary index query to start a map/reduce job.
- #initialize(client) {|self| ... } ⇒ MapReduce (constructor)
  Creates a new map-reduce job.
- #link(*params) ⇒ MapReduce
  Add a link phase to the job.
- #map(*params) ⇒ MapReduce
  Add a map phase to the job.
- #reduce(*params) ⇒ MapReduce
  Add a reduce phase to the job.
- #run(&block) ⇒ Object
  Executes this map-reduce job.
- #search(index, query) ⇒ MapReduce
  (Riak Search) Use a search query to start a map/reduce job.
- #timeout(value) ⇒ Object (also: #timeout=)
  Sets the timeout for the map-reduce job.
- #to_json(*a) ⇒ String
  Convert the job to JSON for submission over the HTTP interface.
Methods included from Util::Escape
#escape, #maybe_escape, #maybe_unescape, #unescape
Methods included from Util::Translation
Constructor Details
#initialize(client) {|self| ... } ⇒ MapReduce
Creates a new map-reduce job.
# File 'lib/riak/map_reduce.rb', line 51
def initialize(client)
  @client, @inputs, @query = client, [], []
  yield self if block_given?
end
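The constructor yields the new job to an optional block, so a job can be configured inline at creation time. The following is a minimal stand-alone sketch of that pattern; `BuilderSketch` and the `:fake_client` placeholder are hypothetical names used only for illustration and are not part of the riak-client API.

```ruby
# Hypothetical stand-in that copies only the constructor shown above.
class BuilderSketch
  attr_reader :client, :inputs, :query

  def initialize(client)
    @client, @inputs, @query = client, [], []
    # Hand the half-built object to the caller's block, if one was given.
    yield self if block_given?
  end
end

# Configure the job inside the block; the constructor still returns the job.
job = BuilderSketch.new(:fake_client) do |mr|
  mr.inputs << ['users', 'alice']
end
```

Without a block, the object is returned with empty `inputs` and `query` arrays.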
Instance Attribute Details
#inputs ⇒ Array<[bucket,key]>, ...
Returns the bucket/keys for input to the job, or the bucket (all keys), or a hash containing the bucket and key-filters.
# File 'lib/riak/map_reduce.rb', line 40
def inputs
  @inputs
end
Instance Method Details
#add(bucket) ⇒ MapReduce
#add(bucket, key) ⇒ MapReduce
#add(object) ⇒ MapReduce
#add(bucket, key, keydata) ⇒ MapReduce
#add(bucket, filters) ⇒ MapReduce
Also known as: <<, include
Add or replace inputs for the job.
# File 'lib/riak/map_reduce.rb', line 79
def add(*params)
  params = params.dup
  params = params.first if Array === params.first
  case params.size
  when 1
    p = params.first
    case p
    when Bucket
      @inputs = bucket_input(p)
    when RObject
      @inputs << robject_input(p)
    when String
      maybe_raise_list_exception(caller)
      @inputs = maybe_escape(p)
    end
  when 2..3
    bucket = params.shift
    if Array === params.first
      if bucket.is_a? Bucket
        bucket = bucket_input(bucket)
      else
        bucket = maybe_escape(bucket)
      end
      maybe_raise_list_exception(caller)
      @inputs = {:bucket => bucket, :key_filters => params.first }
    else
      key = params.shift
      key_data = params.shift || ''
      @inputs << key_input(key, bucket, key_data)
    end
  end
  self
end
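The overloads differ in whether they replace the inputs or append to them: a lone bucket name or a bucket with key filters replaces `@inputs`, while bucket/key pairs accumulate. The stand-alone sketch below mimics that dispatch with plain strings. `InputSketch` is a hypothetical class, escaping and the list-keys warning are omitted, and the `[bucket, key, key_data]` layout is an assumption about what `key_input` returns.

```ruby
# Hypothetical stand-in for the input handling of #add (strings only).
class InputSketch
  attr_reader :inputs

  def initialize
    @inputs = []
  end

  def add(*params)
    case params.size
    when 1
      # A lone bucket name REPLACES the inputs: the job covers every key.
      @inputs = params.first
    when 2..3
      bucket, second, key_data = params
      if second.is_a?(Array)
        # Bucket plus key filters also REPLACES the inputs, with a hash.
        @inputs = { bucket: bucket, key_filters: second }
      else
        # A bucket/key pair is APPENDED (triple layout is an assumption).
        @inputs << [bucket, second, key_data || '']
      end
    end
    self
  end
end

pairs = InputSketch.new.add('users', 'alice').add('users', 'bob', 'meta')
whole = InputSketch.new.add('users')
filtered = InputSketch.new.add('users', [['starts_with', 'a']])
```

Because every call returns the job itself, pair-style calls can be chained to build up a list of inputs.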
#filter(bucket) { ... } ⇒ MapReduce
Adds a bucket and key-filters built by the given block. Equivalent to #add with a list of filters.
# File 'lib/riak/map_reduce.rb', line 121
def filter(bucket, &block)
  add(bucket, FilterBuilder.new(&block).to_a)
end
#index(bucket, index, query) ⇒ MapReduce
(Secondary Indexes) Use a secondary index query to start a map/reduce job.
# File 'lib/riak/map_reduce.rb', line 143
def index(bucket, index, query)
  if bucket.is_a? Bucket
    bucket = bucket.needs_type? ? [maybe_escape(bucket.type.name), maybe_escape(bucket.name)] : maybe_escape(bucket.name)
  else
    bucket = maybe_escape(bucket)
  end

  case query
  when String, Fixnum
    @inputs = {:bucket => bucket, :index => index, :key => query}
  when Range
    raise ArgumentError, t('invalid_index_query', :value => query.inspect) unless String === query.begin || Integer === query.begin
    @inputs = {:bucket => bucket, :index => index, :start => query.begin, :end => query.end}
  else
    raise ArgumentError, t('invalid_index_query', :value => query.inspect)
  end
  self
end
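The query argument selects between an exact-match lookup (a single string or integer) and a range lookup (a Range of strings or integers). The sketch below reproduces only that dispatch as a stand-alone function. `index_inputs`, the plain-English error message, and the example index names are hypothetical; the sketch matches on `Integer` because `Fixnum` was unified into `Integer` in Ruby 2.4.

```ruby
# Hypothetical stand-in for the query dispatch in #index.
def index_inputs(bucket, index, query)
  case query
  when String, Integer
    # Exact match: a single key value for the index.
    { bucket: bucket, index: index, key: query }
  when Range
    # Range match: both ends are passed through as start/end.
    unless query.begin.is_a?(String) || query.begin.is_a?(Integer)
      raise ArgumentError, "invalid index query: #{query.inspect}"
    end
    { bucket: bucket, index: index, start: query.begin, end: query.end }
  else
    raise ArgumentError, "invalid index query: #{query.inspect}"
  end
end

exact = index_inputs('users', 'email_bin', 'alice@example.com')
range = index_inputs('users', 'age_int', 18..30)
```

Any other query type, such as a Float, raises `ArgumentError` instead of silently producing an empty input set.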
#link(walk_spec, options = {}) ⇒ MapReduce
#link(bucket, tag, keep, options = {}) ⇒ MapReduce
#link(options) ⇒ MapReduce
Add a link phase to the job. Link phases follow links attached to objects automatically (a special case of map).
# File 'lib/riak/map_reduce.rb', line 204
def link(*params)
  options = params.extract_options!
  walk_spec_options = options.slice!(:type, :function, :language, :arg) unless params.first
  walk_spec = WalkSpec.normalize(params.shift || walk_spec_options).first
  @query << Phase.new({:type => :link, :function => walk_spec}.merge(options))
  self
end
#map(function) ⇒ MapReduce
#map(function?, options) ⇒ MapReduce
Add a map phase to the job.
# File 'lib/riak/map_reduce.rb', line 170
def map(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :map, :function => params.shift}.merge(options))
  self
end
#reduce(function) ⇒ MapReduce
#reduce(function?, options) ⇒ MapReduce
Add a reduce phase to the job.
# File 'lib/riak/map_reduce.rb', line 184
def reduce(*params)
  options = params.extract_options!
  @query << Phase.new({:type => :reduce, :function => params.shift}.merge(options))
  self
end
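Both #map and #reduce append a phase to the query and return the job, which is what makes fluent chaining possible. The sketch below shows that pattern in isolation. `PhaseSketch` and `JobSketch` are hypothetical stand-ins for Phase and MapReduce, and the function names and the `keep` option are illustrative values only.

```ruby
# Hypothetical stand-ins for Phase and for the phase-building methods.
PhaseSketch = Struct.new(:type, :function, :options)

class JobSketch
  attr_reader :query

  def initialize
    @query = []
  end

  def map(function, options = {})
    @query << PhaseSketch.new(:map, function, options)
    self # returning self is what allows chaining
  end

  def reduce(function, options = {})
    @query << PhaseSketch.new(:reduce, function, options)
    self
  end
end

# Phases run in the order they were added.
job = JobSketch.new
job.map('Riak.mapValuesJson').reduce('Riak.reduceSum', keep: true)
```

The order of the calls is the order of the phases in `query`, so a reduce added after a map receives that map's output.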
#run ⇒ Array<Array>
#run {|phase, data| ... } ⇒ nil
Executes this map-reduce job.
# File 'lib/riak/map_reduce.rb', line 242
def run(&block)
  @client.mapred(self, &block)
rescue FailedRequest => fr
  if fr.server_error? && fr.is_json?
    raise MapReduceError.new(fr.body)
  else
    raise fr
  end
end
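The rescue clause translates one specific failure (a server error carrying a JSON body) into a map-reduce error and re-raises everything else unchanged. The sketch below isolates that translation. All class and method names ending in `Sketch` are hypothetical, and the way `server_error?` and `is_json?` are computed here (status in the 500 range, content type containing `application/json`) is an assumption rather than the library's implementation.

```ruby
# Hypothetical stand-in for a failed HTTP request.
class FailedRequestSketch < StandardError
  attr_reader :code, :body, :content_type

  def initialize(code, body, content_type)
    super("request failed with status #{code}")
    @code, @body, @content_type = code, body, content_type
  end

  # Assumption: "server error" means a 5xx status.
  def server_error?
    (500..599).cover?(code)
  end

  # Assumption: JSON is detected from the content type.
  def is_json?
    content_type.to_s.include?('application/json')
  end
end

class MapReduceErrorSketch < StandardError; end

# Runs the block and translates only JSON server errors.
def run_sketch
  yield
rescue FailedRequestSketch => fr
  raise MapReduceErrorSketch, fr.body if fr.server_error? && fr.is_json?
  raise # anything else propagates unchanged
end
```

Callers can therefore rescue the map-reduce error to inspect the server's JSON error body, while connection-level or client-side failures keep their original type.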
#search(index, query) ⇒ MapReduce
(Riak Search) Use a search query to start a map/reduce job.
# File 'lib/riak/map_reduce.rb', line 130
def search(index, query)
  index = index.name if index.respond_to?(:name)
  @inputs = {:module => "yokozuna", :function => "mapred_search", :arg => [index, query]}
  self
end
#timeout(value) ⇒ Object
Also known as: timeout=
Sets the timeout for the map-reduce job.
# File 'lib/riak/map_reduce.rb', line 214
def timeout(value)
  @timeout = value
  return self
end
#to_json(*a) ⇒ String
Convert the job to JSON for submission over the HTTP interface.
# File 'lib/riak/map_reduce.rb', line 222
def to_json(*a)
  hash = {"inputs" => inputs, "query" => query.map(&:as_json)}
  hash['timeout'] = @timeout.to_i if @timeout
  hash.to_json(*a)
end
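The serialized job is a JSON object with `inputs` and `query` keys, plus a `timeout` key only when a timeout has been set. The sketch below builds the same shape with Ruby's standard `json` library. `job_payload` is a hypothetical helper, and the phase hash passed in the example is an assumed illustration of what `Phase#as_json` might produce, not output taken from the library.

```ruby
require 'json'

# Hypothetical helper mirroring the hash assembled in #to_json.
def job_payload(inputs, phases, timeout = nil)
  hash = { 'inputs' => inputs, 'query' => phases }
  # The timeout is coerced to an integer and omitted entirely when unset.
  hash['timeout'] = timeout.to_i if timeout
  hash.to_json
end

# Assumed phase shape, for illustration only.
phase = { 'map' => { 'language' => 'javascript', 'name' => 'Riak.mapValuesJson' } }

with_timeout = JSON.parse(job_payload('users', [phase], '5000'))
without_timeout = JSON.parse(job_payload('users', [phase]))
```

Because of the `to_i` coercion, a timeout supplied as a string such as `'5000'` still serializes as a JSON number.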