Class: Riak::Multiget

Inherits:
Object show all
Includes:
Util::Translation
Defined in:
lib/riak/multiget.rb

Overview

Coordinates a parallel fetch operation for multiple values.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(client, fetch_list) ⇒ Multiget

Create a Riak Multiget operation.

Parameters:

Raises:

  • (ArgumentError)

37
38
39
40
41
42
43
44
45
46
# File 'lib/riak/multiget.rb', line 37

# Builds a multiget operation for the given client and list of entries.
#
# Both arguments are type-checked up front, the list is handed to
# validate_fetch_list (defined elsewhere in this class), and duplicate
# entries are dropped so each one is fetched only once.
#
# @param client [Riak::Client] the client the operation is tied to; also
#   supplies the thread count via +multiget_threads+
# @param fetch_list [Array] the entries to fetch
# @raise [ArgumentError] if +client+ is not a Riak::Client or +fetch_list+
#   is not an Array
def initialize(client, fetch_list)
  unless client.is_a?(Riak::Client)
    raise ArgumentError, t('client_type', :client => client.inspect)
  end
  unless fetch_list.is_a?(Array)
    raise ArgumentError, t('array_type', :array => fetch_list.inspect)
  end

  validate_fetch_list(fetch_list)

  @client = client
  @fetch_list = fetch_list.uniq
  self.result_hash = {}
  @finished = false
  self.thread_count = client.multiget_threads
end

Instance Attribute Details

#client ⇒ Riak::Client (readonly)

Returns the associated client

Returns:


10
11
12
# File 'lib/riak/multiget.rb', line 10

# Reader for the client this multiget operation is associated with.
#
# @return [Riak::Client] the associated client
def client
  @client
end

#fetch_list ⇒ Array<Bucket, String> (readonly)

Returns the fetch list, an Array of Bucket and String key pairs to fetch

Returns:


13
14
15
# File 'lib/riak/multiget.rb', line 13

# Reader for the list of entries this operation fetches.
#
# @return [Array<Bucket, String>] the Bucket and String key entries to fetch
def fetch_list
  @fetch_list
end

#finished ⇒ Boolean (readonly)

Returns whether the fetch operation has completed

Returns:

  • (Boolean)

    finished if the fetch operation has completed


19
20
21
# File 'lib/riak/multiget.rb', line 19

# Reader for the completion flag.
#
# @return [Boolean] whether the fetch operation has completed
def finished
  @finished
end

#result_hash ⇒ Hash<fetch_list_entry, RObject> — a Hash of Bucket and String key pairs to RObject instances

Returns a Hash mapping each fetch-list entry (a Bucket and String key pair) to its RObject instance

Returns:


16
17
18
# File 'lib/riak/multiget.rb', line 16

# Reader for the hash that collects fetched values.
#
# @return [Hash] Bucket and String key pairs mapped to RObject instances
def result_hash
  @result_hash
end

#thread_count ⇒ Integer

Returns The number of threads to use

Returns:

  • (Integer)

    The number of threads to use


22
23
24
# File 'lib/riak/multiget.rb', line 22

# Reader for the configured worker thread count.
#
# @return [Integer] the number of threads to use
def thread_count
  @thread_count
end

Class Method Details

.get_all(client, fetch_list) ⇒ Hash<fetch_list_entry, RObject> — a Hash of Bucket and String key pairs to RObject instances

Perform a Riak Multiget operation.

Parameters:

Returns:


28
29
30
31
32
# File 'lib/riak/multiget.rb', line 28

# Convenience entry point: builds a multiget for the given client and
# fetch list, starts the fetch, and returns whatever #results returns.
#
# @param client [Riak::Client] passed straight to the constructor
# @param fetch_list [Array] passed straight to the constructor
# @return [Hash] Bucket and String key pairs mapped to RObject instances
def self.get_all(client, fetch_list)
  # tap runs #fetch for its side effect and hands the instance on to #results
  new(client, fetch_list).tap(&:fetch).results
end

Instance Method Details

#fetch ⇒ Object

Starts the parallelized fetch operation

Raises:

  • (ArgumentError)

    when the thread count is not a positive Integer


50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/riak/multiget.rb', line 50

# Starts the parallelized fetch and returns without waiting for it.
#
# Spawns +thread_count+ worker threads. Each worker repeatedly takes the
# next entry from a private copy of +fetch_list+, passes it (splatted) to
# attempt_fetch, and stores the outcome in +result_hash+ under that entry.
# The workers are kept in @threads so they can be joined later.
#
# @raise [ArgumentError] unless +thread_count+ is a positive Integer
# @return [Array<Thread>] the worker threads that were started
def fetch
  unless thread_count.is_a?(Integer) && thread_count > 0
    raise ArgumentError, t("invalid_multiget_thread_count")
  end

  # Work on a copy so draining the queue leaves fetch_list untouched.
  pending = fetch_list.dup
  pending_lock = Mutex.new
  results_lock = Mutex.new

  @threads = Array.new(thread_count) do
    Thread.new do
      # shift yields nil once the queue is drained, which ends this worker.
      until (entry = pending_lock.synchronize { pending.shift }).nil?
        fetched = attempt_fetch(*entry)
        results_lock.synchronize { result_hash[entry] = fetched }
      end
    end
  end
end

#finished? ⇒ Boolean

Returns:

  • (Boolean)

82
83
84
85
# File 'lib/riak/multiget.rb', line 82

# Reports whether the fetch has completed.
#
# Calls set_finished_for_thread_liveness first (defined elsewhere in this
# class; presumably it updates @finished from the state of the worker
# threads -- confirm against its definition), then returns the flag.
#
# @return [Boolean] the current value of the finished flag
def finished?
  set_finished_for_thread_liveness
  @finished
end

#results ⇒ Object


77
78
79
80
# File 'lib/riak/multiget.rb', line 77

# Waits for the fetch to complete, then returns the collected results.
#
# @return [Hash] the value of +result_hash+ once wait_for_finish has returned
def results
  # Must run first: it returns only after the fetch is finished.
  wait_for_finish
  result_hash
end

#wait_for_finish ⇒ Object


87
88
89
90
91
# File 'lib/riak/multiget.rb', line 87

# Blocks until every worker thread in @threads has been joined, then marks
# the operation as finished. Does nothing when finished? already reports
# true.
#
# @return [true, nil] true after joining the threads, nil if already finished
def wait_for_finish
  unless finished?
    @threads.each(&:join)
    @finished = true
  end
end