Class: RedisClient::Cluster::PubSub

Inherits:
Object
  • Object
show all
Defined in:
lib/redis_client/cluster/pub_sub.rb

Defined Under Namespace

Classes: State

Instance Method Summary collapse

Constructor Details

#initialize(router, command_builder) ⇒ PubSub

Returns a new instance of PubSub.



52
53
54
55
56
57
58
# File 'lib/redis_client/cluster/pub_sub.rb', line 52

# Sets up an empty pub/sub session.
#
# @param router [Object] stored as-is in @router
# @param command_builder [Object] stored as-is; the call methods use its #generate
def initialize(router, command_builder)
  # Collaborators handed in by the caller.
  @router, @command_builder = router, command_builder

  # Bounded buffer that next_event pops events from.
  @queue = SizedQueue.new(BUF_SIZE)

  # Per-session bookkeeping, both empty at start.
  @state_dict = {}
  @commands = []
end

Instance Method Details

#call(*args, **kwargs) ⇒ Object



60
61
62
63
64
65
# File 'lib/redis_client/cluster/pub_sub.rb', line 60

# Builds a command from positional and keyword arguments, dispatches it via
# _call, and records it in @commands.
#
# The command is recorded only after _call returns; if _call raises,
# nothing is appended.
#
# @param args [Array] positional command parts, passed to the command builder
# @param kwargs [Hash] keyword command parts, passed to the command builder
# @return [nil]
def call(*args, **kwargs)
  built = @command_builder.generate(args, kwargs)
  _call(built)
  @commands.push(built)
  nil
end

#call_v(command) ⇒ Object



67
68
69
70
71
72
# File 'lib/redis_client/cluster/pub_sub.rb', line 67

# Array-form counterpart of #call: builds a command from an already assembled
# command, dispatches it via _call, and records it in @commands.
#
# The command is recorded only after _call returns; if _call raises,
# nothing is appended.
#
# @param command [Object] raw command passed to the command builder
# @return [nil]
def call_v(command)
  built = @command_builder.generate(command)
  _call(built)
  @commands.push(built)
  nil
end

#close ⇒ Object



74
75
76
77
78
79
80
81
# File 'lib/redis_client/cluster/pub_sub.rb', line 74

# Shuts the session down: closes every tracked state, empties all internal
# containers, then closes the event queue.
#
# @return [nil]
def close
  @state_dict.each_value(&:close)

  # Emptied in this order: states, recorded commands, pending events.
  [@state_dict, @commands, @queue].each(&:clear)

  @queue.close
  nil
end

#next_event(timeout = nil) ⇒ Object

rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/redis_client/cluster/pub_sub.rb', line 83

# Polls the internal queue until an event can be returned or the time budget
# derived from +timeout+ is exceeded.
#
# @param timeout [Object, nil] passed to calc_max_duration to derive the time
#   budget (helper not visible here; presumably seconds -- confirm)
# @return [Array, Object, nil] the popped Array event, the result of start_over
#   after a recoverable error, or nil when the time budget is exceeded
# @raise [StandardError] a queued error that is not handled via start_over
def next_event(timeout = nil) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity
  @state_dict.each_value(&:ensure_worker)
  max_duration = calc_max_duration(timeout)
  starting = obtain_current_time

  loop do
    # The deadline is only enforced when max_duration is positive; a bare
    # break makes the loop (and so the method) evaluate to nil.
    break if max_duration > 0 && obtain_current_time - starting > max_duration

    # Non-blocking pop: raises ThreadError when nothing is queued (rescued below).
    case event = @queue.pop(true)
    when ::RedisClient::CommandError
      # Only MOVED / unserved-slot errors go through start_over; any other
      # command error is re-raised to the caller.
      raise event unless event.message.start_with?('MOVED', 'CLUSTERDOWN Hash slot not served')

      break start_over
    when ::RedisClient::ConnectionError
      break start_over
    when StandardError then raise event
    when Array then break event
    end
    # A value matching none of the branches is dropped and polling continues.
  rescue ThreadError
    # Typically the queue was empty; pause briefly before polling again.
    # Note this rescue covers the whole loop body, not just the pop.
    sleep 0.005
  end
end