Class: IOPromise::Memcached::MemcacheExecutorPool

Inherits:
ExecutorPool::Batch show all
Defined in:
lib/iopromise/memcached/executor_pool.rb

Instance Method Summary collapse

Methods inherited from ExecutorPool::Batch

#initialize

Methods inherited from ExecutorPool::Base

#begin_executing, #complete, for, #initialize, #register, #sync

Constructor Details

This class inherits a constructor from IOPromise::ExecutorPool::Batch

Instance Method Details

#execute_continue(ready_readers, ready_writers, ready_exceptions) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/iopromise/memcached/executor_pool.rb', line 26

# Advances the multi-get for the current batch and settles every promise
# whose key has a result.
#
# The three arguments are accepted as part of the method's signature but are
# not read by this implementation.
#
# @param ready_readers [Object] unused here
# @param ready_writers [Object] unused here
# @param ready_exceptions [Object] unused here
# @return [Array] +[readers, writers, [], nil]+ as reported by the client's
#   +continue_get_multi+, or +[[], [], [], nil]+ when no batch is available
def execute_continue(ready_readers, ready_writers, ready_exceptions)
  # Pull in a fresh batch only when the previous one has fully drained.
  next_batch if @current_batch.empty?
  return [[], [], [], nil] if @current_batch.empty?

  results, readers, writers = memcache_client.continue_get_multi

  # Nothing left to wait on: any key that never came back is recorded as nil
  # so that its promises are still settled below.
  if readers.empty? && writers.empty?
    @keys_to_promises.each_key do |key|
      results[key] = nil unless results.key?(key)
    end
  end

  results.each do |key, value|
    waiting = @keys_to_promises[key]
    next unless waiting

    waiting.each do |promise|
      # Results may be reported on more than one pass; settle each promise once.
      unless promise.fulfilled?
        promise.fulfill(value)
        complete(promise)
        @current_batch.delete(promise)
      end
    end
  end

  [readers, writers, [], nil]
end

#memcache_client ⇒ Object



56
57
58
# File 'lib/iopromise/memcached/executor_pool.rb', line 56

# Returns the object stored in +@connection_pool+, giving it a
# memcache-specific name for callers.
#
# NOTE(review): +@connection_pool+ is not assigned in this method;
# presumably it is set by an inherited constructor — confirm.
#
# @return [Object] the value of +@connection_pool+
def memcache_client
  @connection_pool
end

#next_batch ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/iopromise/memcached/executor_pool.rb', line 6

# Runs the inherited batch selection, then starts a multi-get for the
# resulting batch.
#
# Promises are grouped by +key+ into +@keys_to_promises+, each one is passed
# to +begin_executing+, and the distinct keys are handed to the client's
# +begin_get_multi+. If that call raises a StandardError, every promise in
# the batch is rejected with the error, completed, and removed from
# +@current_batch+, and +@keys_to_promises+ is reset to nil.
#
# @return [Object, nil] whatever +begin_get_multi+ returns; nil when the
#   batch is empty or the request could not be started
def next_batch
  super
  return if @current_batch.empty?

  @keys_to_promises = @current_batch.group_by(&:key)
  @current_batch.each { |promise| begin_executing(promise) }

  begin
    memcache_client.begin_get_multi(@keys_to_promises.keys)
  rescue => error
    # The request never started, so fail the whole batch with the same error.
    failed = @keys_to_promises.values.flatten
    failed.each do |promise|
      promise.reject(error)
      complete(promise)
      @current_batch.delete(promise)
    end

    @keys_to_promises = nil
  end
end