Class: RedisClient::Cluster::PubSub
- Inherits: Object
  - Object
    - RedisClient::Cluster::PubSub
- Defined in:
- lib/redis_client/cluster/pub_sub.rb
Defined Under Namespace
Classes: State
Instance Method Summary collapse
- #call(*args, **kwargs) ⇒ Object
- #call_v(command) ⇒ Object
- #close ⇒ Object
- #initialize(router, command_builder) ⇒ PubSub (constructor): A new instance of PubSub.
- #next_event(timeout = nil) ⇒ Object: rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity.
Constructor Details
#initialize(router, command_builder) ⇒ PubSub
Returns a new instance of PubSub.
52 53 54 55 56 57 58 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 52

# Sets up a pub/sub handle with no recorded commands and no per-node state.
#
# @param router [Object] kept as-is in @router
# @param command_builder [Object] kept in @command_builder; #call and #call_v
#   invoke #generate on it
def initialize(router, command_builder)
  @router, @command_builder = router, command_builder
  @state_dict = {}
  @commands = []
  # Bounded queue (capacity BUF_SIZE) that #next_event pops events from.
  @queue = SizedQueue.new(BUF_SIZE)
end
Instance Method Details
#call(*args, **kwargs) ⇒ Object
60 61 62 63 64 65 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 60

# Builds a command from positional and keyword arguments, hands it to _call,
# and records it in @commands.
#
# @param args [Array] positional command parts, passed to the command builder
# @param kwargs [Hash] keyword parts, passed to the command builder
# @return [nil]
def call(*args, **kwargs)
  @command_builder.generate(args, kwargs).tap do |built|
    _call(built)
    # Recorded only after _call returns, so a command whose dispatch raised
    # is not kept.
    @commands.push(built)
  end
  nil
end
#call_v(command) ⇒ Object
67 68 69 70 71 72 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 67

# Variant of #call that takes the command as one value instead of splatted
# arguments: the value is run through the command builder, handed to _call,
# and recorded in @commands.
#
# @param command [Object] raw command, passed to @command_builder.generate
#   (presumably an Array of command parts; confirm against callers)
# @return [nil]
def call_v(command)
  generated = @command_builder.generate(command)
  _call(generated)
  # Recorded only after _call returns without raising.
  @commands.push(generated)
  nil
end
#close ⇒ Object
74 75 76 77 78 79 80 81 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 74

# Closes every state object held in @state_dict, empties the state table, the
# recorded commands and the event queue, then closes the queue.
#
# @return [nil]
def close
  @state_dict.each_value { |state| state.close }
  # Same clearing order as before: states, then commands, then queued events.
  [@state_dict, @commands, @queue].each(&:clear)
  @queue.close
  nil
end
#next_event(timeout = nil) ⇒ Object
rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 83

# Polls the internal queue for the next pub/sub event.
#
# Calls ensure_worker on every value in @state_dict first, then pops from
# @queue without blocking, sleeping 5 ms whenever the queue is empty.
#
# @param timeout [Numeric, nil] passed to calc_max_duration; the resulting
#   max_duration limits the wait only when it is greater than zero
# @return [Array, Object, nil] the popped Array event; the result of
#   start_over after a MOVED / "CLUSTERDOWN Hash slot not served" command
#   error or a connection error; nil once max_duration has elapsed
# @raise [StandardError] any other error object popped from the queue
def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
  @state_dict.each_value(&:ensure_worker)
  max_duration = calc_max_duration(timeout)
  starting = obtain_current_time

  loop do
    # A bare break makes the loop, and so the method, return nil on timeout.
    break if max_duration > 0 && obtain_current_time - starting > max_duration

    case event = @queue.pop(true)
    when ::RedisClient::CommandError
      # Only these two error-message prefixes trigger start_over; every other
      # command error is re-raised to the caller.
      raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')

      break start_over
    when ::RedisClient::ConnectionError
      break start_over
    when StandardError then raise event
    when Array then break event
    end
  rescue ThreadError
    # Non-blocking pop raises ThreadError on an empty queue: wait, then poll again.
    sleep 0.005
  end
end