Class: RedisClient::Cluster::PubSub
- Inherits:
-
Object
- Object
- RedisClient::Cluster::PubSub
- Defined in:
- lib/redis_client/cluster/pub_sub.rb
Defined Under Namespace
Classes: State
Instance Method Summary collapse
- #call(*args, **kwargs) ⇒ Object
- #call_v(command) ⇒ Object
- #close ⇒ Object
-
#initialize(router, command_builder) ⇒ PubSub
constructor
A new instance of PubSub.
-
#next_event(timeout = nil) ⇒ Object
rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity.
Constructor Details
#initialize(router, command_builder) ⇒ PubSub
Returns a new instance of PubSub.
55 56 57 58 59 60 61 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 55
#
# Sets up a new PubSub with empty bookkeeping.
#
# @param router [Object] kept in @router; not used inside this method
# @param command_builder [#generate] kept in @command_builder; #call and
#   #call_v ask it to build the command they send
def initialize(router, command_builder)
  @router = router
  @command_builder = command_builder
  @queue = SizedQueue.new(BUF_SIZE) # bounded queue that #next_event pops from
  @state_dict = {}                  # values must respond to #close and #ensure_worker
  @commands = []                    # commands appended by #call / #call_v
end
Instance Method Details
#call(*args, **kwargs) ⇒ Object
63 64 65 66 67 68 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 63
#
# Builds a command from loose arguments, hands it to #_call, and records it.
#
# @param args [Array] positional parts forwarded to the command builder
# @param kwargs [Hash] keyword parts forwarded to the command builder
# @return [nil]
def call(*args, **kwargs)
  command = @command_builder.generate(args, kwargs)
  _call(command)
  @commands << command # only reached when _call did not raise
  nil
end
#call_v(command) ⇒ Object
70 71 72 73 74 75 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 70
#
# Same as #call, but takes the command as a single pre-assembled value.
#
# @param command [Object] passed as-is to the command builder's #generate
# @return [nil]
def call_v(command)
  command = @command_builder.generate(command)
  _call(command)
  @commands << command # only reached when _call did not raise
  nil
end
#close ⇒ Object
77 78 79 80 81 82 83 84 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 77
#
# Closes every tracked state object, forgets recorded commands, and drains
# and closes the event queue.
#
# @return [nil]
def close
  @state_dict.each_value(&:close)
  @state_dict.clear
  @commands.clear
  @queue.clear # drop any events still buffered before closing the queue
  @queue.close
  nil
end
#next_event(timeout = nil) ⇒ Object
rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 86
#
# Polls the internal queue for the next event.
#
# Before polling, calls #ensure_worker on every tracked state. Then pops the
# queue without blocking, sleeping briefly whenever it is empty, until one of:
# * an Array is dequeued: it is returned;
# * a ::RedisClient::CommandError whose message starts with MOVED or
#   CLUSTERDOWN, or a ::RedisClient::ConnectionError, is dequeued: the result
#   of #start_over is returned;
# * any other StandardError is dequeued: it is raised;
# * the duration computed by #calc_max_duration is positive and has elapsed:
#   nil is returned.
#
# @param timeout [Object, nil] passed to #calc_max_duration to get the time limit
# @return [Array, Object, nil] the event, the value of #start_over, or nil once
#   the time limit has elapsed
# @raise [StandardError] any dequeued error that is not handled as described above
def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
  @state_dict.each_value(&:ensure_worker)
  max_duration = calc_max_duration(timeout)
  starting = obtain_current_time

  loop do
    # A non-positive max_duration disables the time limit entirely.
    break if max_duration > 0 && obtain_current_time - starting > max_duration

    case event = @queue.pop(true) # non-blocking pop; raises ThreadError when empty
    when ::RedisClient::CommandError
      # Only MOVED / CLUSTERDOWN command errors are recoverable here.
      raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN')

      break start_over
    when ::RedisClient::ConnectionError then break start_over
    when StandardError then raise event
    when Array then break event
    end
  rescue ThreadError
    # Queue was empty: back off briefly, then poll again.
    sleep 0.005
  end
end