Class: OneApm::Collector::AgentCommandRouter
- Inherits:
-
Object
- Object
- OneApm::Collector::AgentCommandRouter
show all
- Defined in:
- lib/one_apm/collector/containers/agent_command_router.rb
Defined Under Namespace
Classes: AgentCommandError
Constant Summary
collapse
- OA_SUCCESS_RESULT =
{}.freeze
- OA_ERROR_KEY =
"error"
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 18
def initialize(event_listener=nil)
@handlers = Hash.new { |*| Proc.new { |cmd| self.unrecognized_agent_command(cmd) } }
@backtrace_service = OneApm::Agent::Threading::BacktraceService.new(event_listener)
@thread_profiler_session = OneApm::Collector::Commands::ThreadProfilerSession.new(@backtrace_service)
@xray_session_collection = OneApm::Collector::Commands::XraySessionCollection.new(@backtrace_service, event_listener)
@handlers['restart'] = Proc.new { OneApm::Collector::Commands::RestartAgent.new }
@handlers['start_profiler'] = Proc.new { |cmd| thread_profiler_session.handle_start_command(cmd) }
@handlers['stop_profiler'] = Proc.new { |cmd| thread_profiler_session.handle_stop_command(cmd) }
@handlers['active_xray_sessions'] = Proc.new { |cmd| xray_session_collection.handle_active_xray_sessions(cmd) }
if event_listener
event_listener.subscribe(:before_shutdown, &method(:on_before_shutdown))
end
end
|
Instance Attribute Details
#backtrace_service ⇒ Object
Returns the value of attribute backtrace_service.
14
15
16
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 14
def backtrace_service
@backtrace_service
end
|
#handlers ⇒ Object
Returns the value of attribute handlers.
12
13
14
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 12
def handlers
@handlers
end
|
#thread_profiler_session ⇒ Object
Returns the value of attribute thread_profiler_session.
14
15
16
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 14
def thread_profiler_session
@thread_profiler_session
end
|
#xray_session_collection ⇒ Object
Returns the value of attribute xray_session_collection.
14
15
16
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 14
def xray_session_collection
@xray_session_collection
end
|
Instance Method Details
#active_xray_command?(commands) ⇒ Boolean
53
54
55
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 53
def active_xray_command?(commands)
commands.any? {|command| command.name == 'active_xray_sessions'}
end
|
#call_handler_for(agent_command) ⇒ Object
140
141
142
143
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 140
def call_handler_for(agent_command)
handler = select_handler(agent_command)
handler.call(agent_command)
end
|
#check_for_and_handle_agent_commands ⇒ Object
40
41
42
43
44
45
46
47
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 40
def check_for_and_handle_agent_commands
commands = get_agent_commands
stop_xray_sessions unless active_xray_command?(commands)
results = invoke_commands(commands)
one_apm_service.agent_command_results(results) unless results.empty?
end
|
#error(err) ⇒ Object
136
137
138
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 136
def error(err)
{ OA_ERROR_KEY => err.message }
end
|
#get_agent_commands ⇒ Object
100
101
102
103
104
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 100
def get_agent_commands
commands = one_apm_service.get_agent_commands || []
OneApm::Manager.logger.info "Received get_agent_commands = #{commands.inspect}" if commands.any?
commands.map {|collector_command| OneApm::Collector::Commands::AgentCommand.new(collector_command)}
end
|
#harvest! ⇒ Object
63
64
65
66
67
68
69
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 63
def harvest!
profiles = []
profiles += harvest_from_xray_session_collection
profiles += harvest_from_thread_profiler_session
log_profiles(profiles)
profiles
end
|
#harvest_from_thread_profiler_session ⇒ Object
82
83
84
85
86
87
88
89
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 82
def harvest_from_thread_profiler_session
if self.thread_profiler_session.ready_to_harvest?
self.thread_profiler_session.stop(true)
[self.thread_profiler_session.harvest]
else
[]
end
end
|
#harvest_from_xray_session_collection ⇒ Object
78
79
80
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 78
def harvest_from_xray_session_collection
self.xray_session_collection.harvest_thread_profiles
end
|
#invoke_command(agent_command) ⇒ Object
119
120
121
122
123
124
125
126
127
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 119
def invoke_command(agent_command)
begin
call_handler_for(agent_command)
return success
rescue AgentCommandError => e
OneApm::Manager.logger.debug(e)
error(e)
end
end
|
#invoke_commands(agent_commands) ⇒ Object
106
107
108
109
110
111
112
113
114
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 106
def invoke_commands(agent_commands)
results = {}
agent_commands.each do |agent_command|
results[agent_command.id.to_s] = invoke_command(agent_command)
end
results
end
|
#log_profiles(profiles) ⇒ Object
91
92
93
94
95
96
97
98
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 91
def log_profiles(profiles)
if profiles.empty?
OneApm::Manager.logger.debug "No thread profiles with data found to send."
else
profile_descriptions = profiles.map { |p| p.to_log_description }
OneApm::Manager.logger.debug "Sending thread profiles [#{profile_descriptions.join(", ")}]"
end
end
|
#merge!(*args) ⇒ Object
We don’t currently support merging thread profiles that failed to send back into the AgentCommandRouter, so we just no-op this method. Same with reset! - we don’t support asynchronous cancellation of a running thread profile or X-Ray session currently.
75
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 75
def merge!(*args); end
|
#on_before_shutdown(*args) ⇒ Object
57
58
59
60
61
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 57
def on_before_shutdown(*args)
if self.thread_profiler_session.running?
self.thread_profiler_session.stop(true)
end
end
|
#one_apm_service ⇒ Object
36
37
38
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 36
def one_apm_service
OneApm::Manager.agent.service
end
|
#reset! ⇒ Object
76
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 76
def reset!; end
|
#select_handler(agent_command) ⇒ Object
145
146
147
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 145
def select_handler(agent_command)
@handlers[agent_command.name]
end
|
#stop_xray_sessions ⇒ Object
49
50
51
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 49
def stop_xray_sessions
self.xray_session_collection.stop_all_sessions
end
|
#success ⇒ Object
132
133
134
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 132
def success
OA_SUCCESS_RESULT
end
|
#unrecognized_agent_command(agent_command) ⇒ Object
149
150
151
|
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 149
def unrecognized_agent_command(agent_command)
OneApm::Manager.logger.debug("Unrecognized agent command #{agent_command.inspect}")
end
|