Class: RedisClient::Cluster::Pipeline
- Inherits:
-
Object
- Object
- RedisClient::Cluster::Pipeline
- Defined in:
- lib/redis_client/cluster.rb
Constant Summary collapse
- ReplySizeError =
Class.new(::RedisClient::Error)
Instance Method Summary collapse
- #blocking_call(timeout, *command, **kwargs) ⇒ Object
- #call(*command, **kwargs) ⇒ Object
- #call_once(*command, **kwargs) ⇒ Object
- #empty? ⇒ Boolean
-
#execute ⇒ Object
rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength.
-
#initialize(client) ⇒ Pipeline
constructor
A new instance of Pipeline.
Constructor Details
#initialize(client) ⇒ Pipeline
Returns a new instance of Pipeline.
15 16 17 18 19 |
# File 'lib/redis_client/cluster.rb', line 15 def initialize(client) @client = client @grouped = Hash.new([].freeze) @size = 0 end |
Instance Method Details
#blocking_call(timeout, *command, **kwargs) ⇒ Object
33 34 35 36 37 |
# File 'lib/redis_client/cluster.rb', line 33 def blocking_call(timeout, *command, **kwargs) node_key = @client.send(:find_node_key, *command, primary_only: true) @grouped[node_key] += [[@size, :blocking_call, timeout, command, kwargs]] @size += 1 end |
#call(*command, **kwargs) ⇒ Object
21 22 23 24 25 |
# File 'lib/redis_client/cluster.rb', line 21 def call(*command, **kwargs) node_key = @client.send(:find_node_key, *command, primary_only: true) @grouped[node_key] += [[@size, :call, command, kwargs]] @size += 1 end |
#call_once(*command, **kwargs) ⇒ Object
27 28 29 30 31 |
# File 'lib/redis_client/cluster.rb', line 27 def call_once(*command, **kwargs) node_key = @client.send(:find_node_key, *command, primary_only: true) @grouped[node_key] += [[@size, :call_once, command, kwargs]] @size += 1 end |
#empty? ⇒ Boolean
39 40 41 |
# File 'lib/redis_client/cluster.rb', line 39 def empty? @size.zero? end |
#execute ⇒ Object
rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/redis_client/cluster.rb', line 43 def execute # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/MethodLength all_replies = Array.new(@size) threads = @grouped.map do |k, v| Thread.new(@client, k, v) do |client, node_key, rows| Thread.pass replies = client.send(:find_node, node_key).pipelined do |pipeline| rows.each do |row| case row[1] when :call then pipeline.call(*row[2], **row[3]) when :call_once then pipeline.call_once(*row[2], **row[3]) when :blocking_call then pipeline.blocking_call(row[2], *row[3], **row[4]) else raise NotImplementedError, row[1] end end end raise ReplySizeError, "commands: #{rows.size}, replies: #{replies.size}" if rows.size != replies.size rows.each_with_index { |row, idx| all_replies[row.first] = replies[idx] } end end threads.each(&:join) all_replies end |