Class: Riak::Multi

Inherits:
Object
Includes:
Util::Translation
Defined in:
lib/riak/multi.rb

Overview

Coordinates a parallel operation for multiple keys.

Direct Known Subclasses

Multiexist, Multiget

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(client, keys) ⇒ Multi

Create a Riak Multi operation.

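Parameters:

  • client (Riak::Client)

    the client used to perform the operation

  • keys (Array<Bucket, String>)

    an Array of Bucket and String key pairs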

Raises:

  • (ArgumentError)

    when client is not a Riak::Client, when keys is not an Array, or when a non-positive-Integer count is given for threads



# File 'lib/riak/multi.rb', line 35

def initialize(client, keys)
  raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client
  raise ArgumentError, t('array_type', :array => keys.inspect) unless keys.is_a? Array

  self.thread_count = client.multi_threads
  validate_keys keys
  @client = client
  @keys = keys.uniq
  self.result_hash = {}
  @finished = false
end
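The argument handling above can be tried without a running Riak node. The following is a minimal stand-alone sketch, not the library's code: `normalize_keys` is a hypothetical helper, and plain strings stand in for the Bucket objects the real class expects.

```ruby
# Hypothetical stand-in for the argument handling in Riak::Multi#initialize.
# Plain strings are used where the real class expects Riak::Bucket objects.
def normalize_keys(keys)
  raise ArgumentError, "expected an Array, got #{keys.inspect}" unless keys.is_a?(Array)

  # Duplicate pairs are collapsed so each pair is processed only once.
  keys.uniq
end

pairs = [['users', 'alice'], ['users', 'bob'], ['users', 'alice']]
unique_pairs = normalize_keys(pairs)
# unique_pairs == [['users', 'alice'], ['users', 'bob']]
```

Because of the `uniq` call, the result hash holds one entry per distinct pair, however many times that pair was requested.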

Instance Attribute Details

#client ⇒ Riak::Client (readonly)

Returns the associated client.

Returns:

  • (Riak::Client)

    the associated client



# File 'lib/riak/multi.rb', line 10

def client
  @client
end

#keys ⇒ Array<Bucket, String> (readonly)

Returns an Array of Bucket and String keys.

Returns:

  • (Array<Bucket, String>)

    an Array of Bucket and String keys



# File 'lib/riak/multi.rb', line 13

def keys
  @keys
end

#result_hash ⇒ Hash<fetch_list_entry, RObject>

Returns a Hash of Bucket and String key pairs to RObject instances.

Returns:

  • (Hash<fetch_list_entry, RObject>)

    a Hash of Bucket and String key pairs to RObject instances



# File 'lib/riak/multi.rb', line 16

def result_hash
  @result_hash
end

#thread_count ⇒ Integer

Returns the number of threads to use.

Returns:

  • (Integer)

    The number of threads to use



# File 'lib/riak/multi.rb', line 19

def thread_count
  @thread_count
end

Class Method Details

.perform(client, keys) ⇒ Hash<key, RObject>

Perform a Riak Multi operation.

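Parameters:

  • client (Riak::Client)

    the client used to perform the operation

  • keys (Array<Bucket, String>)

    an Array of Bucket and String key pairs

Returns:

  • (Hash<key, RObject>)

    a Hash of Bucket and String key pairs to RObject instances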



# File 'lib/riak/multi.rb', line 25

def self.perform(client, keys)
  multi = new client, keys
  multi.perform
  multi.results
end
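The class method is a convenience wrapper: build, start, collect. Note that `#perform` calls a `work` method that is not listed on this page; presumably subclasses such as Multiget and Multiexist supply it. The sketch below illustrates that shape under loud assumptions: `SketchMulti` and `SketchExist` are hypothetical classes, a plain Hash stands in for the Riak client, and one thread per pair replaces the bounded pool for brevity.

```ruby
# Hypothetical miniature of the Multi pattern; these are not the library's classes.
class SketchMulti
  # Same shape as Riak::Multi.perform: build, start, collect.
  def self.perform(store, pairs)
    multi = new(store, pairs)
    multi.perform
    multi.results
  end

  def initialize(store, pairs)
    @store = store
    @pairs = pairs.uniq
    @result_hash = {}
  end

  # Simplification: one thread per pair instead of a bounded pool.
  def perform
    mutex = Mutex.new
    @threads = @pairs.map do |pair|
      Thread.new do
        found = work(*pair)
        mutex.synchronize { @result_hash[pair] = found }
      end
    end
  end

  # Blocks until all threads have ended, then returns the collected results.
  def results
    @threads.each(&:join)
    @result_hash
  end
end

# A subclass supplies #work, the operation applied to each pair.
class SketchExist < SketchMulti
  def work(bucket, key)
    @store.fetch(bucket, {}).key?(key)
  end
end

store = { 'users' => { 'alice' => 1 } }
found = SketchExist.perform(store, [['users', 'alice'], ['users', 'bob']])
# found == { ['users', 'alice'] => true, ['users', 'bob'] => false }
```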

Instance Method Details

#finished? ⇒ Boolean Also known as: finished

Returns:

  • (Boolean)


# File 'lib/riak/multi.rb', line 76

def finished?
  @finished ||= @threads && @threads.none?(&:alive?)
end
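The check above is non-blocking and memoized: it is falsy until threads exist and none of them is alive, and once it has been true the threads are not inspected again. A minimal sketch of that behaviour follows; `ThreadTracker` is a hypothetical class, and a Queue is used only to hold the worker open.

```ruby
require 'thread'

# Hypothetical tracker that reuses the expression from #finished?.
class ThreadTracker
  def initialize(threads = nil)
    @threads = threads
    @finished = false
  end

  # Falsy until threads exist and none is alive; a true answer is cached.
  def finished?
    @finished ||= @threads && @threads.none?(&:alive?)
  end
end

gate = Queue.new
worker = Thread.new { gate.pop } # blocks until something is pushed
tracker = ThreadTracker.new([worker])

before = tracker.finished?  # false: the worker is still waiting on the queue
gate << :go
worker.join
after = tracker.finished?   # true: no thread is alive any more

not_started = ThreadTracker.new.finished? # nil: no threads were ever created
```

The last line shows that, before any threads exist, the expression evaluates to `nil` rather than `false`; both are falsy.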

#perform ⇒ Object

Starts the parallelized operation.



# File 'lib/riak/multi.rb', line 48

def perform
  queue = keys.dup
  queue_mutex = Mutex.new
  result_mutex = Mutex.new

  @threads = 1.upto(thread_count).map do |_node|
    Thread.new do
      loop do
        pair = queue_mutex.synchronize do
          queue.shift
        end

        break if pair.nil?

        found = work(*pair)
        result_mutex.synchronize do
          result_hash[pair] = found
        end
      end
    end
  end
end
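The loop above is a fixed-size worker pool draining a shared array: one mutex guards the queue, a second guards the result hash. The same technique can be sketched as a free-standing method. `drain_in_parallel` is a hypothetical name, the block plays the role of `work`, and, unlike `#perform`, this version joins its threads before returning.

```ruby
# Hypothetical free-standing version of the queue-draining loop in #perform.
# The block is called once per pair and its return value is stored.
def drain_in_parallel(pairs, thread_count, &block)
  queue = pairs.dup
  queue_mutex = Mutex.new
  result_mutex = Mutex.new
  results = {}

  threads = Array.new(thread_count) do
    Thread.new do
      loop do
        # Only one thread at a time may take the next pair.
        pair = queue_mutex.synchronize { queue.shift }
        break if pair.nil? # queue exhausted, this worker is done

        found = block.call(*pair)
        # Writes to the shared hash are serialized separately from dequeuing.
        result_mutex.synchronize { results[pair] = found }
      end
    end
  end

  threads.each(&:join)
  results
end

pairs = [['users', 'alice'], ['users', 'bob'], ['posts', '1']]
paths = drain_in_parallel(pairs, 2) { |bucket, key| "#{bucket}/#{key}" }
# paths[['users', 'alice']] == 'users/alice'
```

Using separate mutexes means a thread storing a result does not hold up another thread that is taking its next pair.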

#results ⇒ Object

Waits for the operation to finish, then returns the result hash.



# File 'lib/riak/multi.rb', line 71

def results
  wait_for_finish
  result_hash
end

#wait_for_finish ⇒ Object

Blocks until every worker thread has ended, then marks the operation as finished.



# File 'lib/riak/multi.rb', line 81

def wait_for_finish
  return if finished?
  @threads.each(&:join)
  @finished = true
end
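Read literally, the listing above reaches `@threads.each` even when `#perform` has not been called. In that case `@threads` is still `nil` and the call would raise NoMethodError. The sketch below shows one possible guard. It is an illustration only, not the library's behaviour, and `SafeWaiter` and `start` are hypothetical names.

```ruby
# Hypothetical variant of the wait logic with an explicit "not started" guard.
class SafeWaiter
  def initialize
    @threads = nil
    @finished = false
  end

  # Stands in for #perform: starts a single worker thread.
  def start(&job)
    @threads = [Thread.new(&job)]
    self
  end

  def finished?
    @finished ||= !@threads.nil? && @threads.none?(&:alive?)
  end

  # Blocks until every thread has ended; a repeated call returns at once.
  def wait_for_finish
    raise 'perform has not been called' if @threads.nil?
    return if finished?

    @threads.each(&:join)
    @finished = true
  end
end

waiter = SafeWaiter.new.start { sleep 0.01 }
waiter.wait_for_finish
waiter.finished? # true once the join has returned
```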