Class: Wakame::MasterManagers::AgentMonitor

Inherits:
Object
Includes:
ThreadImmutable, Wakame::MasterManager
Defined in:
lib/wakame/master_managers/agent_monitor.rb

Instance Attribute Summary

Attributes included from Wakame::MasterManager

#master

Instance Method Summary

Methods included from ThreadImmutable

#bind_thread, included, #target_thread, #target_thread?, #thread_check

Methods included from Wakame::MasterManager

#reload, #start, #stop

Instance Method Details

#agent_pool ⇒ Object



# File 'lib/wakame/master_managers/agent_monitor.rb', lines 145–147

# Returns the process-wide agent pool, i.e. the object handed back by
# Models::AgentPool.instance.
def agent_pool
  pool_class = Models::AgentPool
  pool_class.instance
end

#init ⇒ Object



# File 'lib/wakame/master_managers/agent_monitor.rb', lines 8–139

# Sets up agent monitoring on the master node.
#
# * Starts an EM::PeriodicTimer firing every @gc_period seconds. For each row
#   in the agent pool, it marks agents whose last ping is older than
#   @agent_timeout as STATUS_TIMEOUT. It unregisters agents whose last ping is
#   older than @agent_kill_timeout.
# * Subscribes to the master's 'registry', 'ping' and 'agent_event' channels
#   and applies each decoded message inside StatusDB.pass.
# * Subscribes to Event::AgentUnMonitored and terminates the matching agent.
#
# Messages whose :responded_at is earlier than master.started_at are dropped,
# so responses queued before this master started are not applied.
#
# SECURITY NOTE(review): every incoming message is decoded with Kernel#eval.
# Anyone able to publish on these channels can execute arbitrary Ruby in the
# master process. Consider a data-only format. Left unchanged here because
# the wire format is produced elsewhere (agent side).
def init
  # NOTE(review): 301 s rather than 300 s -- confirm this is intentional.
  @agent_timeout = 301.to_f
  @agent_kill_timeout = @agent_timeout * 2
  @gc_period = 20.to_f

  # GC event trigger for agent timer & status
  @agent_timeout_timer = EM::PeriodicTimer.new(@gc_period) {
    StatusDB.pass {
      self.agent_pool.dataset.all.each { |row|
        agent = Service::Agent.find(row[:agent_id])
        # An exception here would abort the GC pass for every remaining
        # agent, so skip rows whose agent record cannot be found.
        next if agent.nil?

        last_ping = agent.last_ping_at_time
        # NOTE(review): assumes a nil timestamp means "never pinged". Such
        # agents are skipped instead of letting Time#- raise a TypeError.
        next if last_ping.nil?

        diff_time = Time.now - last_ping
        if diff_time > @agent_timeout
          agent.update_monitor_status(Service::Agent::STATUS_TIMEOUT)
        end

        if diff_time > @agent_kill_timeout
          agent_pool.unregister(agent)
        end
      }
    }
  }

  master.add_subscriber('registry') { |data|
    data = eval(data) # see SECURITY NOTE above
    next if Time.parse(data[:responded_at]) < master.started_at

    StatusDB.pass {
      agent_id = data[:agent_id]

      agent = agent_pool.agent_find_or_create(agent_id)

      case data[:class_type]
      when 'Wakame::Packets::Register'
        agent.update_status(Service::Agent::STATUS_REGISTERRING)
        agent_pool.register_as_observed(agent)

        agent.root_path = data[:root_path]

        agent.save
        master.action_manager.trigger_action(Actions::RegisterAgent.new(agent))
      when 'Wakame::Packets::UnRegister'
        agent_pool.unregister(agent)
      end
    }
  }

  master.add_subscriber('ping') { |data|
    ping = eval(data) # see SECURITY NOTE above
    # Skip the old ping responses before starting master node.
    next if Time.parse(ping[:responded_at]) < master.started_at

    # Variable update function for the common members
    set_report_values = proc { |agent|
      agent.last_ping_at = ping[:responded_at]

      agent.renew_reported_services(ping[:services])
      agent.save

      agent.update_monitor_status(Service::Agent::STATUS_ONLINE)
    }

    StatusDB.pass {
      agent = Service::Agent.find(ping[:agent_id])
      if agent.nil?
        # First ping from an unknown agent: create the record, then observe it.
        agent = Service::Agent.new
        agent.id = ping[:agent_id]

        set_report_values.call(agent)

        agent_pool.register_as_observed(agent)
      else
        set_report_values.call(agent)
      end

      EventDispatcher.fire_event(Event::AgentPong.new(agent))
    }
  }

  master.add_subscriber('agent_event') { |data|
    response = eval(data) # see SECURITY NOTE above
    next if Time.parse(response[:responded_at]) < master.started_at

    case response[:class_type]
    when 'Wakame::Packets::StatusCheckResult'
      StatusDB.pass {
        svc_inst = Service::ServiceInstance.find(response[:svc_id])
        if svc_inst
          svc_inst.monitor_status = response[:status]
          svc_inst.save
        else
          Wakame.log.error("#{self.class}: Unknown service ID: #{response[:svc_id]}")
          agent = Service::Agent.find(response[:agent_id])
          # NOTE(review): correct_svc_monitor_mismatch is not listed among this
          # class's methods or its mixins' methods -- confirm where it is defined.
          correct_svc_monitor_mismatch(agent) if agent
        end
      }
    when 'Wakame::Packets::ServiceStatusChanged'
      StatusDB.pass {
        svc_inst = Service::ServiceInstance.find(response[:svc_id])
        if svc_inst
          response_time = Time.parse(response[:responded_at])
          svc_inst.update_monitor_status(response[:new_status], response_time, response[:fail_message])
        end
      }
    when 'Wakame::Packets::ActorResponse'
      case response[:status]
      when Actor::STATUS_RUNNING
        EventDispatcher.fire_event(Event::ActorProgress.new(response[:agent_id], response[:token], 0))
      when Actor::STATUS_FAILED
        EventDispatcher.fire_event(Event::ActorComplete.new(response[:agent_id], response[:token], response[:status], nil))
      else
        # :opts may be absent from the packet; treat that as "no return value".
        opts = response[:opts] || {}
        EventDispatcher.fire_event(Event::ActorComplete.new(response[:agent_id], response[:token], response[:status], opts[:return_value]))
      end
    else
      Wakame.log.warn("#{self.class}: Unhandled agent response: #{response[:class_type]}")
    end
  }

  EventDispatcher.subscribe(Event::AgentUnMonitored) { |event|
    StatusDB.pass {
      agent = Service::Agent.find(event.agent.id)
      # The agent record may already be gone; nothing to terminate then.
      agent.terminate if agent
    }
  }
end

#terminate ⇒ Object



# File 'lib/wakame/master_managers/agent_monitor.rb', lines 141–143

# Stops the periodic agent GC timer started by #init.
#
# It is a no-op (returning nil) when no timer exists, e.g. #init never ran.
# The timer reference is cleared before cancelling, so repeated calls never
# cancel the same timer twice. Otherwise it returns the result of the
# timer's #cancel.
#
# NOTE(review): the channel subscriptions and the AgentUnMonitored subscriber
# registered in #init are not removed here.
def terminate
  timer = @agent_timeout_timer
  return nil if timer.nil?

  @agent_timeout_timer = nil
  timer.cancel
end