Class: Bosh::Monitor::AgentManager

Inherits:
Object
  • Object
show all
Defined in:
lib/bosh/monitor/agent_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(event_processor) ⇒ AgentManager

Returns a new instance of AgentManager.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/bosh/monitor/agent_manager.rb', line 10

# Builds an agent manager with no agents, no deployments and zeroed counters.
#
# @param event_processor object that events are handed to; kept in @processor
def initialize(event_processor)
  @processor = event_processor
  @logger = Bhm.logger

  # agent id => agent structure (filled in by add_agent())
  @agents = {}

  # deployment name => set of agent ids
  @deployments = {}

  # Event counters exposed through the readers of the same name
  @heartbeats_received = @alerts_received = @alerts_processed = 0
end

Instance Attribute Details

#alerts_processed ⇒ Object (readonly)

Returns the value of attribute alerts_processed.



6
7
8
# File 'lib/bosh/monitor/agent_manager.rb', line 6

# Reader for @alerts_processed, the counter on_alert increments after an
# alert has been handed to the processor.
def alerts_processed
  @alerts_processed
end

#alerts_received ⇒ Object (readonly)

Returns the value of attribute alerts_received.



5
6
7
# File 'lib/bosh/monitor/agent_manager.rb', line 5

# Reader for the @alerts_received counter (set to 0 by the constructor).
def alerts_received
  @alerts_received
end

#heartbeats_received ⇒ Object (readonly)

Returns the value of attribute heartbeats_received.



4
5
6
# File 'lib/bosh/monitor/agent_manager.rb', line 4

# Reader for @heartbeats_received, the counter on_heartbeat increments after
# a heartbeat has been handed to the processor.
def heartbeats_received
  @heartbeats_received
end

#processor ⇒ Object

Returns the value of attribute processor.



8
9
10
# File 'lib/bosh/monitor/agent_manager.rb', line 8

# Reader for @processor, the event processor given to the constructor.
def processor
  @processor
end

Instance Method Details

#add_agent(deployment_name, vm_data) ⇒ Object

Processes VM data from the BOSH Director, extracts the relevant agent data, wraps it in an Agent object and adds it to the list of managed agents.



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/bosh/monitor/agent_manager.rb', line 121

# Registers (or refreshes) the agent described by +vm_data+ under the given
# deployment.
#
# @param deployment_name name of the deployment the VM was reported for
# @param vm_data Hash read for the keys 'agent_id', 'job', 'index', 'cid' and 'id'
# @return [Boolean] false when +vm_data+ is not a Hash or carries no
#   'agent_id'; true once the agent is recorded
def add_agent(deployment_name, vm_data)
  unless vm_data.is_a?(Hash)
    @logger.error("Invalid format for VM data: expected Hash, got #{vm_data.class}: #{vm_data}")
    return false
  end

  agent_id = vm_data['agent_id']
  job = vm_data['job']

  @logger.info("Adding agent #{agent_id} (#{job}/#{vm_data['id']}) to #{deployment_name}...")

  if agent_id.nil?
    @logger.warn("No agent id for VM: #{vm_data}")
    return false
  end

  # A VM without a job is idle; it is only noted in the log and still tracked.
  @logger.debug("VM with no job found: #{agent_id}") if job.nil?

  agent = @agents[agent_id]
  if agent.nil?
    # First sighting of this agent id: create and index a fresh Agent.
    @logger.debug("Discovered agent #{agent_id}")
    agent = Agent.new(agent_id)
    @agents[agent_id] = agent
  end

  # Overwrite the agent's attributes with what was reported for this VM.
  agent.deployment = deployment_name
  agent.job = job
  agent.index = vm_data['index']
  agent.cid = vm_data['cid']
  agent.instance_id = vm_data['id']

  (@deployments[deployment_name] ||= Set.new) << agent_id
  true
end

#agents_count ⇒ Object



64
65
66
# File 'lib/bosh/monitor/agent_manager.rb', line 64

# Number of entries currently held in the agent index.
def agents_count
  @agents.length
end

#analyze_agent(agent_id) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/bosh/monitor/agent_manager.rb', line 187

# Inspects one agent and emits alerts through the processor when it has
# timed out and/or is rogue.
#
# @param agent_id id to look up in the agent index
# @return [Boolean, nil] false when the id is not in the index; nil when the
#   agent was both timed out and rogue (it is removed and no alert is sent);
#   true otherwise
def analyze_agent(agent_id)
  agent = @agents[agent_id]
  created_at = Time.now.to_i

  if agent.nil?
    @logger.error("Can't analyze agent #{agent_id} as it is missing from agents index, skipping...")
    return false
  end

  # A timed-out agent that never belonged to a deployment is simply dropped.
  if agent.timed_out? && agent.rogue?
    remove_agent(agent.id)
    return
  end

  if agent.timed_out?
    @processor.process(:alert,
                       severity: 2,
                       source: agent.name,
                       title: "#{agent.id} has timed out",
                       created_at: created_at,
                       deployment: agent.deployment,
                       job: agent.job,
                       instance_id: agent.instance_id)
  end

  if agent.rogue?
    @processor.process(:alert,
                       severity: 2,
                       source: agent.name,
                       title: "#{agent.id} is not a part of any deployment",
                       created_at: created_at)
  end

  true
end

#analyze_agents ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# File 'lib/bosh/monitor/agent_manager.rb', line 160

# Runs analyze_agent over every agent listed under a deployment, then over
# every indexed agent that was not reached that way.
#
# @return [Integer] number of analyze_agent calls made
def analyze_agents
  @logger.info "Analyzing agents..."
  started = Time.now

  seen = Set.new
  count = 0

  # Agents from managed deployments
  @deployments.each_value do |agent_ids|
    agent_ids.each do |agent_id|
      analyze_agent(agent_id)
      seen << agent_id
      count += 1
    end
  end

  # Whatever is left in the index belongs to no deployment
  leftovers = @agents.keys.to_set - seen
  leftovers.each do |agent_id|
    @logger.warn("Agent #{agent_id} is not a part of any deployment")
    analyze_agent(agent_id)
    count += 1
  end

  @logger.info(format("Analyzed %s, took %s seconds", pluralize(count, "agent"), Time.now - started))
  count
end

#deployments_count ⇒ Object



68
69
70
# File 'lib/bosh/monitor/agent_manager.rb', line 68

# Number of deployment names currently tracked.
def deployments_count
  @deployments.length
end

#get_agents_for_deployment(deployment_name) ⇒ Object

Get a hash of agent id -> agent object for all agents associated with the deployment



26
27
28
29
# File 'lib/bosh/monitor/agent_manager.rb', line 26

# Returns the agents recorded under a deployment.
#
# @param deployment_name name of the deployment to look up
# @return [Hash] agent id => agent object for every indexed agent whose id is
#   listed under the deployment; an empty Hash when the deployment is unknown
def get_agents_for_deployment(deployment_name)
  agent_ids = @deployments[deployment_name]

  # An untracked deployment has no id set; report "no agents" rather than
  # failing on nil.
  return {} if agent_ids.nil?

  @agents.select { |agent_id, _agent| agent_ids.include?(agent_id) }
end

#lookup_plugin(name, options = {}) ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
# File 'lib/bosh/monitor/agent_manager.rb', line 31

# Instantiates the plugin class matching a snake_case plugin name.
#
# The name is camel-cased ("event_logger" -> "EventLogger") and resolved as a
# constant defined directly in Bosh::Monitor::Plugins.
#
# @param name plugin name; converted with to_s
# @param options passed as the single argument to the plugin's constructor
# @return a new instance of the plugin class
# @raise [PluginError] when no such constant exists in Bosh::Monitor::Plugins
def lookup_plugin(name, options = {})
  plugin_class =
    begin
      class_name = name.to_s.split("_").map(&:capitalize).join
      # inherit = false: without it const_get also searches Object, so a name
      # such as "string" would resolve to ::String instead of being rejected.
      Bosh::Monitor::Plugins.const_get(class_name, false)
    rescue NameError
      raise PluginError, "Cannot find '#{name}' plugin"
    end

  plugin_class.new(options)
end

#on_alert(agent, message) ⇒ Object



268
269
270
271
272
273
274
275
# File 'lib/bosh/monitor/agent_manager.rb', line 268

# Handles an alert event coming from an agent.
#
# Counts the alert as received, fills in a missing "source" from the agent's
# name when the message is a Hash, passes the message to the processor and
# then counts it as processed.
#
# @param agent object responding to +name+
# @param message alert payload; only a Hash is modified
# @return [Integer] the updated processed-alert count
def on_alert(agent, message)
  # Counted before processing so that an alert the processor rejects (by
  # raising) still shows up as received.
  @alerts_received += 1

  if message.is_a?(Hash) && !message.key?("source")
    message["source"] = agent.name
  end

  @processor.process(:alert, message)
  @alerts_processed += 1
end

#on_heartbeat(agent, message) ⇒ Object



277
278
279
280
281
282
283
284
285
286
287
288
# File 'lib/bosh/monitor/agent_manager.rb', line 277

# Handles a heartbeat event coming from an agent.
#
# Stamps the agent's updated_at with the current time, and for Hash messages
# fills in a missing "timestamp" and sets "agent_id" and "deployment" from the
# agent. The message then goes to the processor and the heartbeat counter is
# bumped.
#
# @param agent object responding to +id+, +deployment+ and +updated_at=+
# @param message heartbeat payload; only a Hash is modified
# @return [Integer] the updated heartbeat count
def on_heartbeat(agent, message)
  agent.updated_at = Time.now

  if message.kind_of?(Hash)
    if message["timestamp"].nil?
      message["timestamp"] = Time.now.to_i
    end

    message.update("agent_id" => agent.id, "deployment" => agent.deployment)
  end

  @processor.process(:heartbeat, message)
  @heartbeats_received += 1
end

#on_shutdown(agent, message) ⇒ Object



290
291
292
293
# File 'lib/bosh/monitor/agent_manager.rb', line 290

# Handles a shutdown event: logs it and removes the agent.
#
# @param agent object responding to +id+
# @param message shutdown payload; not used here
# @return whatever remove_agent returns
def on_shutdown(agent, message)
  agent_id = agent.id
  @logger.info("Agent '#{agent_id}' shutting down...")
  remove_agent(agent_id)
end

#process_event(kind, subject, payload = {}) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/bosh/monitor/agent_manager.rb', line 226

# Routes an incoming agent event to on_alert, on_heartbeat or on_shutdown.
#
# The agent id is the last of at most four dot-separated parts of +subject+.
# An event from an id that is not in the index registers a new Agent for it,
# except for "shutdown" events, which are dropped. Yajl::ParseError and
# Bhm::InvalidEvent raised along the way are logged, not propagated.
#
# @param kind event kind; compared as a String
# @param subject dot-separated subject the event arrived on
# @param payload a String is parsed with Yajl, a Hash is used as is, anything
#   else yields a nil message
def process_event(kind, subject, payload = {})
  kind = kind.to_s
  agent_id = subject.split('.', 4).last
  agent = @agents[agent_id]

  if agent.nil?
    # Shutdown may be announced more than once; it only matters while the
    # agent is still managed.
    return if kind == "shutdown"

    @logger.warn("Received #{kind} from unmanaged agent: #{agent_id}")
    agent = Agent.new(agent_id)
    @agents[agent_id] = agent
  else
    @logger.debug("Received #{kind} from #{agent_id}: #{payload}")
  end

  message =
    case payload
    when String then Yajl::Parser.parse(payload)
    when Hash then payload
    end

  case kind
  when "alert" then on_alert(agent, message)
  when "heartbeat" then on_heartbeat(agent, message)
  when "shutdown" then on_shutdown(agent, message)
  else @logger.warn("No handler found for '#{kind}' event")
  end
rescue Yajl::ParseError => e
  @logger.error("Cannot parse incoming event: #{e}")
rescue Bhm::InvalidEvent => e
  @logger.error("Invalid event: #{e}")
end

#remove_agent(agent_id) ⇒ Object



110
111
112
113
114
115
116
# File 'lib/bosh/monitor/agent_manager.rb', line 110

# Drops an agent from the agent index and from every deployment's id set.
#
# @param agent_id id of the agent to forget
# @return [Hash] the deployments hash
def remove_agent(agent_id)
  @logger.info("Removing agent #{agent_id} from all deployments...")
  @agents.delete(agent_id)
  @deployments.each_value { |agent_ids| agent_ids.delete(agent_id) }
end

#remove_deployment(name) ⇒ Object



100
101
102
103
104
105
106
107
108
# File 'lib/bosh/monitor/agent_manager.rb', line 100

# Forgets a deployment together with every agent listed under it.
#
# @param name deployment name
# @return the id set that was stored for the deployment, or nil when the
#   deployment was not tracked
def remove_deployment(name)
  # nil.to_a is [], so an untracked name simply removes nothing.
  @deployments[name].to_a.each { |agent_id| @agents.delete(agent_id) }
  @deployments.delete(name)
end

#setup_events ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/bosh/monitor/agent_manager.rb', line 43

# Wires up event handling: enables pruning on the processor, registers every
# plugin listed in Bhm.plugins, and schedules NATS subscriptions for the
# agent heartbeat, alert and shutdown subjects, each forwarding to
# process_event.
def setup_events
  @processor.enable_pruning(Bhm.intervals.prune_events)

  Bhm.plugins.each do |plugin|
    instance = lookup_plugin(plugin["name"], plugin["options"])
    @processor.add_plugin(instance, plugin["events"])
  end

  EM.schedule do
    # Each block keeps three parameters even though the reply is unused.
    Bhm.nats.subscribe("hm.agent.heartbeat.*") do |message, _reply, subject|
      process_event(:heartbeat, subject, message)
    end

    Bhm.nats.subscribe("hm.agent.alert.*") do |message, _reply, subject|
      process_event(:alert, subject, message)
    end

    Bhm.nats.subscribe("hm.agent.shutdown.*") do |message, _reply, subject|
      process_event(:shutdown, subject, message)
    end
  end
end

#sync_agents(deployment, vms) ⇒ Object



85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/bosh/monitor/agent_manager.rb', line 85

# Reconciles the agents tracked for a deployment with a fresh list of VMs.
#
# Every VM is passed to add_agent; ids that were tracked for the deployment
# but did not come back as successfully added are removed.
#
# @param deployment deployment name
# @param vms collection of VM data entries
# @return [Set] the agent ids that were removed
def sync_agents(deployment, vms)
  # Looked up before any add_agent call, as in the original ordering.
  managed_agent_ids = @deployments[deployment] || Set.new

  active_agent_ids = vms.each_with_object(Set.new) do |vm, ids|
    ids << vm["agent_id"] if add_agent(deployment, vm)
  end

  stale_agent_ids = managed_agent_ids - active_agent_ids
  stale_agent_ids.each { |agent_id| remove_agent(agent_id) }
end

#sync_deployments(deployments) ⇒ Object

Syncs deployments list received from director with HM deployments.

Parameters:

  • deployments

    Array list of deployments returned by director



75
76
77
78
79
80
81
82
83
# File 'lib/bosh/monitor/agent_manager.rb', line 75

# Removes every tracked deployment that is absent from the given list.
#
# @param deployments collection of entries read for their "name" key
# @return [Set] names of the deployments that were removed
def sync_deployments(deployments)
  reported_names = deployments.map { |deployment| deployment["name"] }.to_set
  stale_names = @deployments.keys.to_set - reported_names

  stale_names.each do |stale_deployment|
    @logger.warn("Found stale deployment #{stale_deployment}, removing...")
    remove_deployment(stale_deployment)
  end
end