Class: RedisClient::Cluster::PubSub
- Inherits: Object
- Inheritance hierarchy: Object → RedisClient::Cluster::PubSub
- Defined in:
- lib/redis_client/cluster/pub_sub.rb
Defined Under Namespace
Classes: State
Constant Summary
- BUF_SIZE = Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024))
Instance Method Summary
- #call(*args, **kwargs) ⇒ Object
- #call_v(command) ⇒ Object
- #close ⇒ Object
- #initialize(router, command_builder) ⇒ PubSub (constructor): a new instance of PubSub.
- #next_event(timeout = nil) ⇒ Object
Constructor Details
Instance Method Details
#call(*args, **kwargs) ⇒ Object
54 55 56 57 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 54
#
# Builds a command from positional and keyword arguments with the
# configured command builder and hands it to +_call+.
#
# @param args [Array] positional command parts, passed to the builder as one array
# @param kwargs [Hash] keyword arguments, passed to the builder as one hash
# @return [nil] always nil; whatever +_call+ returns is discarded
def call(*args, **kwargs)
  _call(@command_builder.generate(args, kwargs))
  nil
end
#call_v(command) ⇒ Object
59 60 61 62 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 59
#
# Variant of the command entry point that takes the command as a single
# object instead of splatted arguments: the builder output is handed to
# +_call+.
#
# @param command [Object] the command passed as-is to the command builder
#   (presumably an array of command parts -- confirm against callers)
# @return [nil] always nil; whatever +_call+ returns is discarded
def call_v(command)
  _call(@command_builder.generate(command))
  nil
end
#close ⇒ Object
64 65 66 67 68 69 70 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 64
#
# Shuts the pub/sub object down: closes every entry of the state
# dictionary, empties the dictionary, then discards any queued events and
# closes the event queue.
#
# @return [nil]
def close
  @state_dict.each_value(&:close)
  @state_dict.clear
  # Drop pending events before closing so nothing stale is left behind.
  @queue.clear
  @queue.close
  nil
end
#next_event(timeout = nil) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 72
#
# Polls the internal event queue and returns the next event.
#
# Before polling, +ensure_worker+ is invoked on every entry of the state
# dictionary. The queue is then read without blocking; when it is empty the
# method sleeps briefly and tries again until an event arrives, the time
# budget is used up, or the queue has been closed.
#
# @param timeout [Numeric, nil] time budget, converted by
#   +calc_max_duration+; a converted value that is not positive disables
#   the deadline (units are defined by that helper -- confirm there)
# @return [Array, nil] the dequeued event, or nil when the deadline passed
#   or the queue is empty and closed
# @raise [StandardError] any error object found on the queue is re-raised
def next_event(timeout = nil)
  @state_dict.each_value(&:ensure_worker)
  max_duration = calc_max_duration(timeout)
  starting = obtain_current_time

  loop do
    break if max_duration > 0 && obtain_current_time - starting > max_duration

    case event = @queue.pop(true)
    when StandardError then raise event
    when Array then break event
    end
  rescue ThreadError
    # The non-blocking pop found nothing. A closed queue can never receive
    # another event, so stop here instead of polling forever when no
    # deadline was given.
    break if @queue.closed?

    sleep 0.005
  end
end