Class: Mamiya::Master::AgentMonitor

Inherits:
Object
  • Object
show all
Includes:
AgentMonitorHandlers
Defined in:
lib/mamiya/master/agent_monitor.rb

Overview

Monitors the status of agents. It collects the status of every agent; statuses are updated both by events received from agents and by periodically running the serf query `mamiya:status`.

Constant Summary collapse

# Name of the serf query that #refresh sends to collect each agent's status.
STATUS_QUERY =
'mamiya:status'.freeze
# Name of the serf query that #refresh sends to collect each agent's package information.
PACKAGES_QUERY =
'mamiya:packages'.freeze
# Refresh interval used when the master config has no
# [:master][:monitor][:refresh_interval] entry. #start! hands the interval to
# Kernel#sleep, so the unit is seconds.
DEFAULT_INTERVAL =
60
# Keys that #refresh copies from an agent's packages response into that agent's status entry.
PACKAGE_STATUS_KEYS =
%w(packages prereleases releases currents).map(&:freeze).freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AgentMonitorHandlers

#pkg__remove, #prerelease__remove, #release__remove, #task___fetch__finish, #task___prepare__finish, #task___switch__finish, #task__error, #task__finalize, #task__finish, #task__start

Constructor Details

#initialize(master, raise_exception: false) ⇒ AgentMonitor

Returns a new instance of AgentMonitor.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/mamiya/master/agent_monitor.rb', line 24

# Builds a monitor bound to +master+ with empty agent/status tables.
#
# The refresh interval comes from master.config[:master][:monitor][:refresh_interval];
# when any level of that path is missing (or the value itself is falsy),
# DEFAULT_INTERVAL is used instead.
#
# @param master the master object; must expose #config (indexable with symbols)
# @param raise_exception [Boolean] when true, #work_loop re-raises refresh
#   failures instead of only logging them
def initialize(master, raise_exception: false)
  @master = master
  @raise_exception = raise_exception

  # Walk the nested config one level at a time so a missing level yields nil.
  master_section = master.config[:master]
  monitor_section = master_section && master_section[:monitor]
  configured_interval = monitor_section && monitor_section[:refresh_interval]
  @interval = configured_interval || DEFAULT_INTERVAL

  # Guards @statuses against concurrent event commits and refreshes.
  @commit_lock = Mutex.new
  @statuses = {}

  # Published snapshots are frozen; #refresh swaps in new frozen objects.
  @agents = {}.freeze
  @failed_agents = [].freeze
  @last_refresh_at = nil
end

Instance Attribute Details

#agents ⇒ Object (readonly)

Returns the value of attribute agents.



40
41
42
# File 'lib/mamiya/master/agent_monitor.rb', line 40

# Returns the member table captured by the most recent #refresh: a frozen Hash
# mapping each serf member's name to the member record reported by
# @master.serf.members. It is an empty frozen Hash until the first refresh.
def agents
  @agents
end

#failed_agents ⇒ Object (readonly)

Returns the value of attribute failed_agents.



40
41
42
# File 'lib/mamiya/master/agent_monitor.rb', line 40

# Returns the frozen Array of agent names considered failed as of the most
# recent #refresh: members whose serf status was not 'alive', plus agents whose
# status response could not be parsed as JSON. Empty until the first refresh.
def failed_agents
  @failed_agents
end

#last_refresh_at ⇒ Object (readonly)

Returns the value of attribute last_refresh_at.



40
41
42
# File 'lib/mamiya/master/agent_monitor.rb', line 40

# Returns the Time at which the most recent #refresh committed its results,
# or nil when no refresh has completed yet.
def last_refresh_at
  @last_refresh_at
end

Instance Method Details

#application_status(app, labels: nil) ⇒ Object



58
59
60
# File 'lib/mamiya/master/agent_monitor.rb', line 58

# Builds an ApplicationStatus for +app+ backed by this monitor.
#
# +labels+ is forwarded unchanged; presumably it narrows the result to agents
# matching those labels — confirm against ApplicationStatus.
def application_status(app, labels: nil)
  ApplicationStatus.new(self, app, labels: labels)
end

#commit_event(event) ⇒ Object



91
92
93
# File 'lib/mamiya/master/agent_monitor.rb', line 91

# Applies +event+ to the cached agent statuses while holding @commit_lock.
#
# #refresh takes the same lock while it rebuilds the status table, so an event
# commit and a refresh never run at the same time.
def commit_event(event)
  @commit_lock.synchronize { commit_event_without_lock(event) }
end

#commit_event_without_lock(event) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/mamiya/master/agent_monitor.rb', line 95

# Applies a single serf user event to the cached status of the agent it names.
#
# This method does not take @commit_lock itself; #commit_event is the locking
# wrapper around it.
#
# Only events named "mamiya:..." are considered. The part after the prefix is
# turned into a handler method name (':' becomes '__', '-' becomes '_', so
# "mamiya:task:start" dispatches to #task__start). The handler is called with
# (agent_status, payload, event), where agent_status is the entry of @statuses
# for payload["name"].
#
# The event is silently ignored when it has no handler or names an unknown
# agent. It is ignored with a warning when the payload is not valid JSON or is
# JSON that is not an object (array, scalar, null), so one malformed event
# cannot raise out of the event path.
#
# @param event an object responding to #user_event and #payload
def commit_event_without_lock(event)
  user_event = event.user_event
  return unless /\Amamiya:/ === user_event

  # Strip the 7-character "mamiya:" prefix, then map to a Ruby method name.
  method_name = user_event[7..-1].gsub(/:/, '__').gsub(/-/, '_')

  # NOTE(review): respond_to?(name, true) also matches private and inherited
  # methods, so the event name can select any method reachable on this object,
  # not only the event handlers. Consider restricting dispatch to the handler
  # module's methods.
  return unless respond_to?(method_name, true)

  # to_s turns a nil payload into "", which fails parsing and is reported below.
  payload = JSON.parse(event.payload.to_s)
  unless payload.is_a?(Hash)
    logger.warn "Ignoring event #{user_event}: payload is not a JSON object"
    return
  end

  agent = @statuses[payload["name"]]
  return unless agent

  logger.debug "Commiting #{user_event}"
  logger.debug "- #{agent.inspect}"
  __send__ method_name, agent, payload, event
  logger.debug "+ #{agent.inspect}"

rescue JSON::ParserError => e
  logger.warn "Failed to parse payload in event #{event.user_event}: #{e.message}"
end

#package_status(app, pkg, labels: nil) ⇒ Object



54
55
56
# File 'lib/mamiya/master/agent_monitor.rb', line 54

# Builds a PackageStatus for package +pkg+ of application +app+, backed by this monitor.
#
# +labels+ is forwarded unchanged; presumably it narrows the result to agents
# matching those labels — confirm against PackageStatus.
def package_status(app, pkg, labels: nil)
  PackageStatus.new(self, app, pkg, labels: labels)
end

#refresh(**kwargs) ⇒ Object



114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/mamiya/master/agent_monitor.rb', line 114

# Rebuilds the agent, failed-agent and status tables from serf.
#
# Steps:
# 1. Read the serf member list; members whose status is not 'alive' are failed.
# 2. Under @commit_lock, run the status and packages serf queries in parallel
#    threads (+kwargs+ are forwarded to both queries).
# 3. Parse each status response; an unparsable one marks that agent as failed.
# 4. Merge PACKAGE_STATUS_KEYS from each parsable packages response into the
#    agent's status. Agents without a usable packages response (none received,
#    or it failed to parse) keep the package data from the previous refresh.
# 5. Log added/removed/recovered/newly failed agents and publish the new tables.
#
# When +kwargs[:node]+ is given, statuses of agents not listed there are kept
# as they are and only the listed agents are replaced by fresh responses.
#
# @param kwargs options forwarded to @master.serf.query; :node is also read here
# @return [self]
def refresh(**kwargs)
  logger.debug "Refreshing..."

  new_agents = {}
  new_failed_agents = Set.new
  new_statuses = {}

  @master.serf.members.each do |member|
    new_agents[member["name"]] = member
    new_failed_agents.add(member["name"]) unless member["status"] == 'alive'
  end

  @commit_lock.synchronize do
    # Partial refresh: start from the current statuses of the untargeted agents.
    if kwargs[:node]
      new_statuses = statuses.reject do |name, _status|
        kwargs[:node].include?(name)
      end
    end

    # Both queries are issued concurrently; #value joins and re-raises failures.
    status_query_th = Thread.new { @master.serf.query(STATUS_QUERY, '', **kwargs) }
    packages_query_th = Thread.new { @master.serf.query(PACKAGES_QUERY, '', **kwargs) }
    status_response = status_query_th.value
    packages_response = packages_query_th.value

    status_response["Responses"].each do |name, json|
      begin
        new_statuses[name] = JSON.parse(json)
      rescue JSON::ParserError => e
        logger.warn "Failed to parse status from #{name}: #{e.message}"
        new_failed_agents << name
        next
      end
    end

    # Names whose package data was refreshed from a parsable response.
    packages_refreshed = []

    packages_response["Responses"].each do |name, json|
      next unless new_statuses[name]

      begin
        resp = JSON.parse(json)
      rescue JSON::ParserError => e
        logger.warn "Failed to parse packages from #{name}: #{e.message}"
        next
      end

      PACKAGE_STATUS_KEYS.each do |k|
        new_statuses[name][k] = resp[k]
      end
      packages_refreshed << name
    end

    # Everyone else keeps the last known package data. This includes agents
    # whose packages response was unparsable, so a single bad response does
    # not wipe what was already known about them.
    (new_statuses.keys - packages_refreshed).each do |name|
      previous = @statuses[name]
      next unless previous

      PACKAGE_STATUS_KEYS.each do |k|
        new_statuses[name][k] = previous[k] if previous[k]
      end
    end

    new_failed_agents = new_failed_agents.to_a

    [
      ["Added agents", new_agents.keys - @agents.keys],
      ["Removed agents", @agents.keys - new_agents.keys],
      ["Recovered agents", failed_agents - new_failed_agents],
      ["Newly failed agents", new_failed_agents - failed_agents],
    ].each do |label, names|
      joined = names.join(", ")
      logger.info "#{label}: #{joined}" unless joined.empty?
    end

    @agents = new_agents.freeze
    @failed_agents = new_failed_agents.freeze
    @statuses = new_statuses
    @last_refresh_at = Time.now
  end

  self
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/mamiya/master/agent_monitor.rb', line 76

# Tells whether the polling thread created by #start! exists and is alive.
#
# Always returns a real boolean: false (not nil) when no thread has been
# started or after #stop! cleared it.
#
# @return [Boolean]
def running?
  !@thread.nil? && @thread.alive?
end

#start! ⇒ Object



62
63
64
65
66
67
68
69
# File 'lib/mamiya/master/agent_monitor.rb', line 62

# Starts the background polling thread unless one is already running.
#
# The thread calls #work_loop and then sleeps @interval, forever. While it is
# alive, further calls are no-ops that return the existing thread. If the
# previous thread has died (for example because #work_loop re-raised), a new
# one is spawned rather than leaving the monitor permanently stopped.
#
# @return [Thread] the polling thread
def start!
  return @thread if running?

  @thread = Thread.new do
    loop do
      work_loop
      sleep @interval
    end
  end
end

#statuses(labels: nil) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
# File 'lib/mamiya/master/agent_monitor.rb', line 42

# Returns the cached agent statuses, optionally filtered by labels.
#
# Without +labels+ the internal status Hash itself is returned (not a copy).
# With +labels+, a new Hash is returned containing only the agents that have a
# 'labels' entry for which Mamiya::Util::LabelMatcher::Simple#match?(labels)
# is truthy.
#
# @param labels label expression handed to the matcher, or nil for no filtering
# @return [Hash] agent name => status
def statuses(labels: nil)
  return @statuses unless labels

  @statuses.select do |_name, status|
    agent_labels = status['labels']
    agent_labels &&
      Mamiya::Util::LabelMatcher::Simple.new(agent_labels).match?(labels)
  end
end

#stop! ⇒ Object



71
72
73
74
# File 'lib/mamiya/master/agent_monitor.rb', line 71

# Stops the background polling thread, if there is one.
#
# The thread is killed only when #running? reports it alive; it is not joined.
# The stored thread reference is cleared either way. Returns nil.
def stop!
  if running?
    @thread.kill
  end
  @thread = nil
end

#work_loop ⇒ Object



80
81
82
83
84
85
86
87
88
89
# File 'lib/mamiya/master/agent_monitor.rb', line 80

# Runs one refresh cycle, shielding the caller from ordinary failures.
#
# Calls #refresh. A StandardError raised by it is re-raised when the monitor
# was built with raise_exception: true; otherwise it is logged at fatal level
# (class, message, then each backtrace line) and swallowed so the polling loop
# in #start! keeps going.
#
# Exceptions outside StandardError (Interrupt, SystemExit, NoMemoryError, ...)
# are deliberately not rescued: they signal that the process should stop, not
# that one refresh failed.
def work_loop
  refresh
rescue StandardError => e
  # Bare raise re-raises the exception currently being handled.
  raise if @raise_exception

  logger.fatal "Periodical refreshing failed: #{e.class}: #{e.message}"
  # Array() guards against an exception that carries no backtrace.
  Array(e.backtrace).each do |line|
    logger.fatal "\t#{line}"
  end
end