Class: Riak::Multiget

Inherits:
Object show all
Includes:
Util::Translation
Defined in:
lib/riak/multiget.rb

Overview

Coordinates a parallel fetch operation for multiple values.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(client, fetch_list) ⇒ Multiget

Create a Riak Multiget operation.

Raises:

  • (ArgumentError)


37
38
39
40
41
42
43
44
45
46
# File 'lib/riak/multiget.rb', line 37

def initialize(client, fetch_list)
  raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client
  raise ArgumentError, t('array_type', :array => fetch_list.inspect) unless fetch_list.is_a? Array

  validate_fetch_list fetch_list
  @client, @fetch_list = client, fetch_list.uniq
  self.result_hash = Hash.new
  @finished = false
  self.thread_count = client.multiget_threads
end

Instance Attribute Details

#clientRiak::Client (readonly)



10
11
12
# File 'lib/riak/multiget.rb', line 10

def client
  @client
end

#fetch_listArray<Bucket, String> (readonly)



13
14
15
# File 'lib/riak/multiget.rb', line 13

def fetch_list
  @fetch_list
end

#finishedBoolean (readonly)



19
20
21
# File 'lib/riak/multiget.rb', line 19

def finished
  @finished
end

#result_hashHash<fetch_list_entry, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances



16
17
18
# File 'lib/riak/multiget.rb', line 16

def result_hash
  @result_hash
end

#thread_countInteger



22
23
24
# File 'lib/riak/multiget.rb', line 22

def thread_count
  @thread_count
end

Class Method Details

.get_all(client, fetch_list) ⇒ Hash<fetch_list_entry, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances

Perform a Riak Multiget operation.



28
29
30
31
32
# File 'lib/riak/multiget.rb', line 28

def self.get_all(client, fetch_list)
  multi = new client, fetch_list
  multi.fetch
  multi.results
end

Instance Method Details

#fetchObject

Starts the parallelized fetch operation

Raises:

  • (ArgumentError)

    when a non-positive-Integer count is given



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/riak/multiget.rb', line 50

def fetch
  queue = fetch_list.dup
  queue_mutex = Mutex.new
  result_mutex = Mutex.new

  unless thread_count.is_a?(Integer) && thread_count > 0
    raise ArgumentError, t("invalid_multiget_thread_count")
  end

  @threads = 1.upto(thread_count).map do |_node|
    Thread.new do
      loop do
        pair = queue_mutex.synchronize do
          queue.shift
        end

        break if pair.nil?

        found = attempt_fetch(*pair)
        result_mutex.synchronize do
          result_hash[pair] = found
        end
      end
    end
  end
end

#finished?Boolean



82
83
84
85
# File 'lib/riak/multiget.rb', line 82

def finished?
  set_finished_for_thread_liveness
  finished
end

#resultsObject



77
78
79
80
# File 'lib/riak/multiget.rb', line 77

def results
  wait_for_finish
  result_hash
end

#wait_for_finishObject



87
88
89
90
91
# File 'lib/riak/multiget.rb', line 87

def wait_for_finish
  return if finished?
  @threads.each {|t| t.join }
  @finished = true
end