Class: Beanpool::Connections

Inherits:
Object
  • Object
show all
Defined in:
lib/beanpool/connections.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ip_array, debug) ⇒ Connections

Returns a new instance of Connections.



8
9
10
11
12
13
14
15
# File 'lib/beanpool/connections.rb', line 8

# Sets up the pool state and immediately tries to connect to every address.
#
# @param ip_array [Array] addresses; each one is passed on its own to
#   Beaneater.new by build_connections
# @param debug [Object] when truthy, notify prints diagnostics to stdout
def initialize(ip_array, debug)
  @debug = debug
  @mutex = Mutex.new
  @ip_array = ip_array
  @connections = {}
  @troubled_ips = {}
  build_connections
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



6
7
8
# File 'lib/beanpool/connections.rb', line 6

# Read-only accessor for @connections, the hash that build_connections and
# check_times fill with one Beaneater client per address (keyed by address).
def connections
  @connections
end

#ip_array ⇒ Object (readonly)

Returns the value of attribute ip_array.



6
7
8
# File 'lib/beanpool/connections.rb', line 6

# Read-only accessor for @ip_array, the address list given to the constructor.
def ip_array
  @ip_array
end

Instance Method Details

#all_stats(tube_name) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/beanpool/connections.rb', line 134

# Merges the stats of one tube across every live connection.
#
# Integer values are summed. The 'name' entry becomes the '|'-joined list of
# connection addresses. Every other value is stringified and '|'-joined.
#
# @param tube_name [String] tube to inspect on each connection
# @return [Hash] stat name => combined value
def all_stats(tube_name)
  @connections.each_with_object({}) do |(ip, connection), merged|
    connection.tubes[tube_name].stats.to_h.each do |name, val|
      previous = merged[name]
      if val.is_a?(Integer)
        merged[name] = previous.nil? ? val.to_i : previous + val.to_i
      else
        # The tube name is the same everywhere, so report the address instead.
        piece = name == 'name' ? ip.to_s : val.to_s
        merged[name] = previous.nil? ? piece : previous + '|' + piece
      end
    end
  end
end

#build_connections ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
# File 'lib/beanpool/connections.rb', line 34

# Opens one Beaneater client per configured address and stores it in
# @connections under that address. An address that fails to connect is
# reported through notify and skipped.
#
# @raise [RuntimeError] if an entry of @ip_array is itself an Array
def build_connections
  @ip_array.each do |address|
    raise 'Only single IP for beaneater' if address.is_a?(Array)

    begin
      client = Beaneater.new(address)
      @connections[address] = client
    rescue => error
      notify(error)
      notify("Failed to add #{address}")
    end
  end
end

#check_times ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/beanpool/connections.rb', line 65

# Tries to revive addresses parked in @troubled_ips.
#
# An address is retried only once its recorded :time is more than 60 seconds
# old. On success the fresh Beaneater client is stored in @connections and
# the address leaves @troubled_ips. On failure the error is reported through
# notify and the timestamp is reset, so the next attempt is a minute away.
#
# Removal goes through delete_if so the hash is never mutated by hand while
# it is being iterated.
#
# @return [Hash] @troubled_ips
def check_times
  @troubled_ips.delete_if do |ip, info|
    notify("Checking troubled #{ip}")
    next false unless info[:time] < Time.now - 60

    begin
      @connections[ip] = Beaneater.new(ip)
      notify("Re-added to live: #{ip}")
      true
    rescue => ex
      notify(ex)
      # Keep retrying every minute
      info[:time] = Time.now
      false
    end
  end
end

#close ⇒ Object



46
47
48
49
50
# File 'lib/beanpool/connections.rb', line 46

# Calls close on every client currently held in @connections.
def close
  @connections.each_value(&:close)
end

#connection_sample ⇒ Object



59
60
61
62
63
# File 'lib/beanpool/connections.rb', line 59

# Gives troubled addresses a chance to recover (check_times), then picks a
# random key of @connections.
#
# @return [Object, nil] an address key, or nil when no connection is live
def connection_sample
  check_times
  @connections.keys.sample
end

#get_job_from_tube(timeout = nil, tube_name = 'default') ⇒ Object



82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/beanpool/connections.rb', line 82

def get_job_from_tube(timeout = nil, tube_name = 'default')
  @mutex.synchronize do
    ip_id = connection_sample
    connection = @connections[ip_id]
    begin
      job = connection.tubes[tube_name].reserve(timeout)
      return job
    rescue Beaneater::TimedOutError
      return nil
    rescue => ex
      notify(ex)
      notify("Exception IP: #{ip_id}")
      put_ip_in_timeout_and_reload(ip_id)
      return nil
    end
  end
end

#keystring_hash(hash) ⇒ Object



126
127
128
129
130
131
132
# File 'lib/beanpool/connections.rb', line 126

# Returns a copy of +hash+ whose keys have been converted with to_s.
# The argument itself is left untouched.
#
# @param hash [Hash]
# @return [Hash]
def keystring_hash(hash)
  hash.keys.each_with_object({}) do |key, stringified|
    stringified[key.to_s] = hash[key]
  end
end

#notify(object) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/beanpool/connections.rb', line 17

# Prints a diagnostic to stdout when @debug is truthy; otherwise does nothing.
#
# Exceptions are printed as an "EXCEPTION: <message>" line followed by the
# message and the backtrace, with lines matching /\.rvm/ dropped. Hashes,
# Arrays and Strings go straight to puts. Any other object is ignored.
#
# @param object [Exception, Hash, Array, String, Object]
# @return [nil]
def notify(object)
  return unless @debug

  case object
  when Exception
    # backtrace is nil for an exception that was never raised, and the array
    # it returns belongs to the exception, so filter into a new array instead
    # of editing it in place.
    trace = (object.backtrace || []).reject { |line| line =~ /\.rvm/ }
    puts "EXCEPTION: #{object.message}"
    puts [object.message.to_s, *trace].join("\n")
  when Hash, Array, String
    puts object
  end
end

#put_ip_in_timeout_and_reload(ip) ⇒ Object



52
53
54
55
56
57
# File 'lib/beanpool/connections.rb', line 52

# Parks a misbehaving address: records it in @troubled_ips with the current
# time and drops it from @connections. Does nothing when it is the only
# connection left, so the pool never empties itself.
#
# @param ip [Object] key of the connection to park
def put_ip_in_timeout_and_reload(ip)
  if @connections.size > 1
    @troubled_ips[ip] = { time: Time.now }
    @connections.delete(ip) if @connections.size >= 2
    notify("Added #{ip} to troubled")
  end
end

#put_job_to_tube(body, options) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/beanpool/connections.rb', line 100

# Puts a job on a randomly chosen connection while holding @mutex.
#
# Recognised option keys (symbol or string): pri (default 32000),
# ttr (default 60), delay (converted with to_i), tube_name (default
# 'default') and reset_use_tube. If the first put raises, the error is
# reported, the address is handed to put_ip_in_timeout_and_reload, and the
# put is retried once on a freshly sampled connection. An error from that
# retry propagates to the caller.
#
# @param body [Object] job payload passed to the tube's put
# @param options [Hash]
# @return [Object, nil] result of tubes.use when reset_use_tube is given,
#   otherwise nil
def put_job_to_tube(body, options)
  @mutex.synchronize do
    opts = keystring_hash(options)
    pri = opts['pri'] || 32000
    ttr = opts['ttr'] || 60
    delay = opts['delay'].to_i
    tube_name = opts['tube_name'] || 'default'
    reset_use_tube = opts['reset_use_tube']

    enqueue = lambda do |client|
      client.tubes[tube_name].put(body, pri: pri, delay: delay, ttr: ttr)
    end

    ip_id = connection_sample
    connection = @connections[ip_id]
    begin
      notify("BEANPOOL: Putting to #{tube_name}")
      enqueue.call(connection)
    rescue => error
      notify(error)
      put_ip_in_timeout_and_reload(ip_id)
      ip_id = connection_sample
      connection = @connections[ip_id]
      enqueue.call(connection)
    end
    # Force default tube reset if requested.
    connection.tubes.use(reset_use_tube) if connection && reset_use_tube
  end
end

#stats(tube_name, stat_name) ⇒ Object



151
152
153
154
155
156
157
158
159
160
161
# File 'lib/beanpool/connections.rb', line 151

# Sums a single stat of one tube across every live connection.
#
# Connections whose tube lookup or stats call yields nil/false contribute
# nothing; each found value is converted with to_i before being added.
#
# @param tube_name [String] tube to inspect on each connection
# @param stat_name [String, Symbol] stat to read (looked up as a symbol)
# @return [Integer]
def stats(tube_name, stat_name)
  @connections.each_value.inject(0) do |total, connection|
    tube = connection.tubes[tube_name]
    tube_stats = tube && tube.stats
    tube_stats ? total + tube_stats[stat_name.to_sym].to_i : total
  end
end