Class: RedisClient::Cluster::PubSub
- Inherits:
-
Object
- Object
- RedisClient::Cluster::PubSub
- Defined in:
- lib/redis_client/cluster/pub_sub.rb
Defined Under Namespace
Classes: State
Constant Summary collapse
- BUF_SIZE =
Integer(ENV.fetch('REDIS_CLIENT_PUBSUB_BUF_SIZE', 1024))
Instance Method Summary collapse
- #call(*args, **kwargs) ⇒ Object
- #call_v(command) ⇒ Object
- #close ⇒ Object
-
#initialize(router, command_builder) ⇒ PubSub
constructor
A new instance of PubSub.
- #next_event(timeout = nil) ⇒ Object
Constructor Details
Instance Method Details
#call(*args, **kwargs) ⇒ Object
53 54 55 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 53 def call(*args, **kwargs) _call(@command_builder.generate(args, kwargs)) end |
#call_v(command) ⇒ Object
57 58 59 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 57 def call_v(command) _call(@command_builder.generate(command)) end |
#close ⇒ Object
61 62 63 64 65 66 67 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 61 def close @state_dict.each_value(&:close) @state_dict.clear @queue.clear @queue.close nil end |
#next_event(timeout = nil) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/redis_client/cluster/pub_sub.rb', line 69 def next_event(timeout = nil) @state_dict.each_value(&:ensure_worker) max_duration = calc_max_duration(timeout) starting = obtain_current_time loop do break if max_duration > 0 && obtain_current_time - starting > max_duration case event = @queue.pop(true) when StandardError then raise event when Array then break event end rescue ThreadError sleep 0.005 end end |