Class: MonitorAgent

Inherits:
Object
  • Object
show all
Includes:
Dataflow
Defined in:
lib/monitor_agent.rb

Constant Summary collapse

QUEUE_NAME =
'cloud-monitors'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(mq) ⇒ MonitorAgent

Returns a new instance of MonitorAgent.



17
18
19
20
21
22
23
24
25
26
# File 'lib/monitor_agent.rb', line 17

def initialize(mq)
  @mq = mq
  mq.queue(QUEUE_NAME).subscribe(:ack => true) do |h,m|
    if AMQP.closing?
      Utils::LOG.info "(ignoring message, will be redelivered later)"
    else
      process_message(m,h)
    end
  end
end

Instance Attribute Details

#mqObject (readonly)

Returns the value of attribute mq.



15
16
17
# File 'lib/monitor_agent.rb', line 15

def mq
  @mq
end

Instance Method Details

#check_server(srv) ⇒ Object



51
52
53
54
55
56
# File 'lib/monitor_agent.rb', line 51

def check_server(srv)
  need_later do
    Utils::LOG.info "checking server: #{srv.inspect}"
    MonitorFactory.build(srv['role'], srv['host'], srv['token']).check
  end
end

#process_message(msg, header) ⇒ Object



28
29
30
31
32
33
34
# File 'lib/monitor_agent.rb', line 28

def process_message(msg, header)
  msg = JSON.parse(msg)
  if MonitorGroup === msg
    Utils::LOG.info "got MonitorGroup: #{msg}"
    process_monitor(msg, header)
  end
end

#process_monitor(monitor_group, header) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/monitor_agent.rb', line 36

def process_monitor(monitor_group, header)
  EM.defer(lambda {
      monitor_group.servers.map do |srv|
        check_server(srv)
      end.map{|m| m.to_hash }
    },
    
    lambda {|monitors|
      alert = MonitorResult.new(monitor_group.env_id, monitor_group.api, monitors)
      Utils::LOG.info alert.inspect
      mq.queue(AlertAgent::QUEUE_NAME).publish(alert.to_json)
      header.ack
    })
end