Class: RedisClient::Cluster::Pipeline

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_client/cluster.rb

Constant Summary collapse

ReplySizeError =
Class.new(::RedisClient::Error)

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ Pipeline

Returns a new instance of Pipeline.



15
16
17
18
19
# File 'lib/redis_client/cluster.rb', line 15

# Sets up an empty pipeline bound to the given client.
#
# Queued commands are kept in a Hash keyed by node key. The Hash default is a
# single frozen empty Array, so a lookup for an unseen key yields an immutable
# placeholder and writers must assign a new Array rather than append in place.
#
# @param client [Object] the client later used to resolve node keys and nodes
def initialize(client)
  @size = 0
  @grouped = Hash.new([].freeze)
  @client = client
end

Instance Method Details

#blocking_call(timeout, *command, **kwargs) ⇒ Object



33
34
35
36
37
# File 'lib/redis_client/cluster.rb', line 33

# Queues a blocking command, together with its timeout, for pipelined execution.
#
# The node key is resolved through the client's +find_node_key+ (invoked via
# +send+ with +primary_only: true+). The command is stored under that key along
# with its position in the overall pipeline.
#
# @param timeout [Object] timeout value stored with the command and passed on at execution
# @param command [Array] command name followed by its arguments
# @param kwargs [Hash] keyword options stored alongside the command
# @return [Integer] number of commands queued so far
def blocking_call(timeout, *command, **kwargs)
  entry = [@size, :blocking_call, timeout, command, kwargs]
  key = @client.send(:find_node_key, *command, primary_only: true)
  # Build a fresh Array instead of appending: the Hash default may be frozen.
  @grouped[key] = [*@grouped[key], entry]
  @size += 1
end

#call(*command, **kwargs) ⇒ Object



21
22
23
24
25
# File 'lib/redis_client/cluster.rb', line 21

# Queues a regular command for pipelined execution.
#
# The node key is resolved through the client's +find_node_key+ (invoked via
# +send+ with +primary_only: true+). The command is stored under that key along
# with its position in the overall pipeline.
#
# @param command [Array] command name followed by its arguments
# @param kwargs [Hash] keyword options stored alongside the command
# @return [Integer] number of commands queued so far
def call(*command, **kwargs)
  entry = [@size, :call, command, kwargs]
  key = @client.send(:find_node_key, *command, primary_only: true)
  # Build a fresh Array instead of appending: the Hash default may be frozen.
  @grouped[key] = [*@grouped[key], entry]
  @size += 1
end

#call_once(*command, **kwargs) ⇒ Object



27
28
29
30
31
# File 'lib/redis_client/cluster.rb', line 27

# Queues a command to be issued through the node pipeline's +call_once+.
#
# The node key is resolved through the client's +find_node_key+ (invoked via
# +send+ with +primary_only: true+). The command is stored under that key along
# with its position in the overall pipeline.
#
# @param command [Array] command name followed by its arguments
# @param kwargs [Hash] keyword options stored alongside the command
# @return [Integer] number of commands queued so far
def call_once(*command, **kwargs)
  entry = [@size, :call_once, command, kwargs]
  key = @client.send(:find_node_key, *command, primary_only: true)
  # Build a fresh Array instead of appending: the Hash default may be frozen.
  @grouped[key] = [*@grouped[key], entry]
  @size += 1
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


39
40
41
# File 'lib/redis_client/cluster.rb', line 39

# Reports whether no commands have been queued yet.
#
# @return [Boolean] true when the queued-command counter is zero
def empty?
  @size == 0
end

#execute ⇒ Object

rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/redis_client/cluster.rb', line 43

# Sends every queued command to its node and gathers the replies.
#
# One thread is started per node key in +@grouped+. Each thread looks the node
# up through the client's +find_node+ (invoked via +send+), replays its rows
# inside that node's +pipelined+ block, and then stores each reply in the
# result array at the index recorded when the command was queued.
#
# @return [Array] replies positioned by each command's queue index
# @raise [ReplySizeError] if a node returns a different number of replies than
#   commands sent to it (raised inside the thread, surfaced by Thread#join)
# @raise [NotImplementedError] if a row carries an unknown command kind
def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
  results = Array.new(@size)

  workers = @grouped.map do |key, queued|
    Thread.new(@client, key, queued) do |cli, node_key, rows|
      Thread.pass
      node = cli.send(:find_node, node_key)

      replies = node.pipelined do |pipe|
        rows.each do |row|
          # Row layout: [index, kind, *args]; args differ per kind.
          _index, kind, *args = row
          case kind
          when :call
            command, options = args
            pipe.call(*command, **options)
          when :call_once
            command, options = args
            pipe.call_once(*command, **options)
          when :blocking_call
            timeout, command, options = args
            pipe.blocking_call(timeout, *command, **options)
          else
            raise NotImplementedError, kind
          end
        end
      end

      unless rows.size == replies.size
        raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}"
      end

      # Sizes match here, so pairing rows with replies is one-to-one.
      rows.zip(replies) { |row, reply| results[row.first] = reply }
    end
  end

  workers.each(&:join)
  results
end