Class: Mamiya::Master::AgentMonitor
Overview
Class to monitor agent’s status. This collects all agents’ status. Statuses are updated by event from agent, and running serf query ‘mamiya:status` periodically.
Constant Summary
collapse
- STATUS_QUERY =
'mamiya:status'.freeze
- PACKAGES_QUERY =
'mamiya:packages'.freeze
- DEFAULT_INTERVAL =
60
- PACKAGE_STATUS_KEYS =
%w(packages prereleases releases currents).map(&:freeze).freeze
Instance Attribute Summary collapse
Instance Method Summary
collapse
#pkg__remove, #prerelease__remove, #release__remove, #task___fetch__finish, #task___prepare__finish, #task___switch__finish, #task__error, #task__finalize, #task__finish, #task__start
Constructor Details
#initialize(master, raise_exception: false) ⇒ AgentMonitor
Returns a new instance of AgentMonitor.
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
# File 'lib/mamiya/master/agent_monitor.rb', line 24
def initialize(master, raise_exception: false)
@master = master
@interval = (master.config[:master] &&
master.config[:master][:monitor] &&
master.config[:master][:monitor][:refresh_interval]) ||
DEFAULT_INTERVAL
@raise_exception = raise_exception
@agents = {}.freeze
@failed_agents = [].freeze
@statuses = {}
@commit_lock = Mutex.new
@last_refresh_at = nil
end
|
Instance Attribute Details
#agents ⇒ Object
Returns the value of attribute agents.
40
41
42
|
# File 'lib/mamiya/master/agent_monitor.rb', line 40
def agents
@agents
end
|
#failed_agents ⇒ Object
Returns the value of attribute failed_agents.
40
41
42
|
# File 'lib/mamiya/master/agent_monitor.rb', line 40
def failed_agents
@failed_agents
end
|
#last_refresh_at ⇒ Object
Returns the value of attribute last_refresh_at.
40
41
42
|
# File 'lib/mamiya/master/agent_monitor.rb', line 40
def last_refresh_at
@last_refresh_at
end
|
Instance Method Details
#application_status(app, labels: nil) ⇒ Object
58
59
60
|
# File 'lib/mamiya/master/agent_monitor.rb', line 58
def application_status(app, labels: nil)
ApplicationStatus.new(self, app, labels: labels)
end
|
#commit_event(event) ⇒ Object
91
92
93
|
# File 'lib/mamiya/master/agent_monitor.rb', line 91
def commit_event(event)
@commit_lock.synchronize { commit_event_without_lock(event) }
end
|
#commit_event_without_lock(event) ⇒ Object
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
# File 'lib/mamiya/master/agent_monitor.rb', line 95
def commit_event_without_lock(event)
return unless /\Amamiya:/ === event.user_event
method_name = event.user_event[7..-1].gsub(/:/, '__').gsub(/-/,'_')
return unless self.respond_to?(method_name, true)
payload = JSON.parse(event.payload)
agent = @statuses[payload["name"]]
return unless agent
logger.debug "Commiting #{event.user_event}"
logger.debug "- #{agent.inspect}"
__send__ method_name, agent, payload, event
logger.debug "+ #{agent.inspect}"
rescue JSON::ParserError => e
logger.warn "Failed to parse payload in event #{event.user_event}: #{e.message}"
end
|
#package_status(app, pkg, labels: nil) ⇒ Object
54
55
56
|
# File 'lib/mamiya/master/agent_monitor.rb', line 54
def package_status(app, pkg, labels: nil)
PackageStatus.new(self, app, pkg, labels: labels)
end
|
#refresh(**kwargs) ⇒ Object
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
|
# File 'lib/mamiya/master/agent_monitor.rb', line 114
def refresh(**kwargs)
logger.debug "Refreshing..."
new_agents = {}
new_failed_agents = Set.new
new_statuses = {}
@master.serf.members.each do |member|
new_agents[member["name"]] = member
new_failed_agents.add(member["name"]) unless member["status"] == 'alive'
end
@commit_lock.synchronize {
if kwargs[:node]
new_statuses = statuses.reject do |name, status|
kwargs[:node].include?(name)
end
end
status_query_th = Thread.new { @master.serf.query(STATUS_QUERY, '', **kwargs) }
packages_query_th = Thread.new { @master.serf.query(PACKAGES_QUERY, '', **kwargs) }
status_response = status_query_th.value
packages_response = packages_query_th.value
status_response["Responses"].each do |name, json|
begin
new_statuses[name] = JSON.parse(json)
rescue JSON::ParserError => e
logger.warn "Failed to parse status from #{name}: #{e.message}"
new_failed_agents << name
next
end
end
packages_response["Responses"].each do |name, json|
next unless new_statuses[name]
begin
resp = JSON.parse(json)
PACKAGE_STATUS_KEYS.each do |k|
new_statuses[name][k] = resp[k]
end
rescue JSON::ParserError => e
logger.warn "Failed to parse packages from #{name}: #{e.message}"
next
end
end
(new_statuses.keys - packages_response["Responses"].keys).each do |name|
PACKAGE_STATUS_KEYS.each do |k|
if @statuses[name] && @statuses[name][k]
new_statuses[name][k] = @statuses[name][k]
end
end
end
new_failed_agents = new_failed_agents.to_a
(new_agents.keys - @agents.keys).join(", ").tap do |agents|
logger.info "Added agents: #{agents}" unless agents.empty?
end
(@agents.keys - new_agents.keys).join(", ").tap do |agents|
logger.info "Removed agents: #{agents}" unless agents.empty?
end
(failed_agents - new_failed_agents).join(", ").tap do |agents|
logger.info "Recovered agents: #{agents}" unless agents.empty?
end
(new_failed_agents - failed_agents).join(", ").tap do |agents|
logger.info "Newly failed agents: #{agents}" unless agents.empty?
end
@agents = new_agents.freeze
@failed_agents = new_failed_agents.freeze
@statuses = new_statuses
@last_refresh_at = Time.now
}
self
end
|
#running? ⇒ Boolean
76
77
78
|
# File 'lib/mamiya/master/agent_monitor.rb', line 76
def running?
@thread && @thread.alive?
end
|
#start! ⇒ Object
62
63
64
65
66
67
68
69
|
# File 'lib/mamiya/master/agent_monitor.rb', line 62
def start!
@thread ||= Thread.new do
loop do
self.work_loop
sleep @interval
end
end
end
|
#statuses(labels: nil) ⇒ Object
42
43
44
45
46
47
48
49
50
51
52
|
# File 'lib/mamiya/master/agent_monitor.rb', line 42
def statuses(labels: nil)
if labels
@statuses.select { |name, status|
status['labels'] &&
Mamiya::Util::LabelMatcher::Simple.new(status['labels']).
match?(labels)
}
else
@statuses
end
end
|
#stop! ⇒ Object
71
72
73
74
|
# File 'lib/mamiya/master/agent_monitor.rb', line 71
def stop!
@thread.kill if running?
@thread = nil
end
|
#work_loop ⇒ Object
80
81
82
83
84
85
86
87
88
89
|
# File 'lib/mamiya/master/agent_monitor.rb', line 80
def work_loop
self.refresh
rescue Exception => e
raise e if @raise_exception
logger.fatal "Periodical refreshing failed: #{e.class}: #{e.message}"
e.backtrace.each do |line|
logger.fatal "\t#{line}"
end
end
|