Class: OneApm::Collector::AgentCommandRouter

Inherits:
Object
  • Object
show all
Defined in:
lib/one_apm/collector/containers/agent_command_router.rb

Defined Under Namespace

Classes: AgentCommandError

Constant Summary collapse

OA_SUCCESS_RESULT =
{}.freeze
OA_ERROR_KEY =
"error"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(event_listener = nil) ⇒ AgentCommandRouter

Returns a new instance of AgentCommandRouter.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 18

# Builds the router: creates the backtrace service and the two profiling
# sessions that share it, then registers one handler per known command name.
#
# @param event_listener [Object, nil] optional listener; it is handed to the
#   backtrace service and the X-Ray session collection, and when truthy this
#   router subscribes its #on_before_shutdown to the :before_shutdown event.
def initialize(event_listener=nil)
  @backtrace_service = OneApm::Agent::Threading::BacktraceService.new(event_listener)
  @thread_profiler_session = OneApm::Collector::Commands::ThreadProfilerSession.new(@backtrace_service)
  @xray_session_collection = OneApm::Collector::Commands::XraySessionCollection.new(@backtrace_service, event_listener)

  # Lookups for unknown command names yield a handler that only logs the
  # command; the fallback is not stored in the table.
  @handlers = Hash.new { |_table, _name| proc { |cmd| unrecognized_agent_command(cmd) } }

  # Handlers stay plain procs (not lambdas): every handler is invoked with
  # the command as its single argument, and 'restart' declares no parameters.
  @handlers.update(
    'restart'              => proc { OneApm::Collector::Commands::RestartAgent.new },
    'start_profiler'       => proc { |cmd| thread_profiler_session.handle_start_command(cmd) },
    'stop_profiler'        => proc { |cmd| thread_profiler_session.handle_stop_command(cmd) },
    'active_xray_sessions' => proc { |cmd| xray_session_collection.handle_active_xray_sessions(cmd) }
  )

  event_listener.subscribe(:before_shutdown, &method(:on_before_shutdown)) if event_listener
end

Instance Attribute Details

#backtrace_service ⇒ Object

Returns the value of attribute backtrace_service.



14
15
16
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 14

# Reader for the backtrace service created in the constructor.
#
# @return [Object] the current value of @backtrace_service
def backtrace_service
  @backtrace_service
end

#handlers ⇒ Object (readonly)

Returns the value of attribute handlers.



12
13
14
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 12

# Reader for the command-name => handler table built in the constructor.
#
# @return [Hash] the current value of @handlers
def handlers
  @handlers
end

#thread_profiler_session ⇒ Object

Returns the value of attribute thread_profiler_session.



14
15
16
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 14

# Reader for the thread profiler session created in the constructor.
#
# @return [Object] the current value of @thread_profiler_session
def thread_profiler_session
  @thread_profiler_session
end

#xray_session_collection ⇒ Object

Returns the value of attribute xray_session_collection.



14
15
16
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 14

# Reader for the X-Ray session collection created in the constructor.
#
# @return [Object] the current value of @xray_session_collection
def xray_session_collection
  @xray_session_collection
end

Instance Method Details

#active_xray_command?(commands) ⇒ Boolean

Returns:

  • (Boolean)


53
54
55
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 53

# Tells whether the batch contains an 'active_xray_sessions' command.
#
# @param commands [Enumerable] objects responding to #name
# @return [Boolean] true when at least one command has that name
def active_xray_command?(commands)
  commands.any? do |candidate|
    candidate.name == 'active_xray_sessions'
  end
end

#call_handler_for(agent_command) ⇒ Object



140
141
142
143
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 140

# Looks up the handler for the command and invokes it with the command.
#
# @param agent_command [Object] command passed both to #select_handler and
#   to the selected handler
# @return [Object] whatever the handler returns
def call_handler_for(agent_command)
  select_handler(agent_command).call(agent_command)
end

#check_for_and_handle_agent_commands ⇒ Object



40
41
42
43
44
45
46
47
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 40

# Fetches pending commands, runs them, and reports the results.
#
# All X-Ray sessions are stopped first when the batch carries no
# 'active_xray_sessions' command. Results are only sent to the service when
# there is at least one.
#
# @return [Object, nil] the service's response, or nil when nothing was sent
def check_for_and_handle_agent_commands
  pending = get_agent_commands
  stop_xray_sessions unless active_xray_command?(pending)

  outcomes = invoke_commands(pending)
  return if outcomes.empty?

  one_apm_service.agent_command_results(outcomes)
end

#error(err) ⇒ Object



136
137
138
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 136

# Builds the result hash reported for a failed command.
#
# @param err [#message] the failure being reported
# @return [Hash] a single-entry hash mapping OA_ERROR_KEY to err.message
def error(err)
  result = {}
  result[OA_ERROR_KEY] = err.message
  result
end

#get_agent_commands ⇒ Object



100
101
102
103
104
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 100

# Asks the service for pending commands and wraps each one in an
# AgentCommand.
#
# A nil/false response from the service is treated as an empty list. The raw
# payload is logged at info level when `any?` reports something in it.
#
# @return [Array] one AgentCommand per raw collector command
def get_agent_commands
  raw_commands = one_apm_service.get_agent_commands
  raw_commands = [] unless raw_commands

  if raw_commands.any?
    OneApm::Manager.logger.info "Received get_agent_commands = #{raw_commands.inspect}"
  end

  raw_commands.map do |collector_command|
    OneApm::Collector::Commands::AgentCommand.new(collector_command)
  end
end

#harvest! ⇒ Object



63
64
65
66
67
68
69
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 63

# Collects the profiles that are ready to be sent: X-Ray session profiles
# first, followed by the thread profiler session's profile (if any). The
# combined list is passed to #log_profiles before being returned.
#
# @return [Array] the harvested profiles, possibly empty
def harvest!
  ([] + harvest_from_xray_session_collection + harvest_from_thread_profiler_session).tap do |profiles|
    log_profiles(profiles)
  end
end

#harvest_from_thread_profiler_session ⇒ Object



82
83
84
85
86
87
88
89
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 82

# Harvests the thread profiler session when it reports being ready.
#
# When ready, the session is stopped (with `true` as the argument) and its
# harvest is returned wrapped in an array; otherwise nothing is touched.
#
# @return [Array] a one-element array with the harvest, or an empty array
def harvest_from_thread_profiler_session
  session = thread_profiler_session
  return [] unless session.ready_to_harvest?

  session.stop(true)
  [session.harvest]
end

#harvest_from_xray_session_collection ⇒ Object



78
79
80
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 78

# Delegates to the X-Ray session collection to harvest its thread profiles.
#
# @return [Object] whatever the collection's #harvest_thread_profiles returns
#   (#harvest! concatenates it onto an Array)
def harvest_from_xray_session_collection
  collection = xray_session_collection
  collection.harvest_thread_profiles
end

#invoke_command(agent_command) ⇒ Object



119
120
121
122
123
124
125
126
127
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 119

# Runs the handler for a single command and converts the outcome to a result
# hash.
#
# Only AgentCommandError is translated into an error result (after being
# logged at debug level); any other exception propagates to the caller.
#
# @param agent_command [Object] the command to dispatch
# @return [Hash] the #success result, or the #error result for the failure
def invoke_command(agent_command)
  call_handler_for(agent_command)
  success
rescue AgentCommandError => failure
  OneApm::Manager.logger.debug(failure)
  error(failure)
end

#invoke_commands(agent_commands) ⇒ Object



106
107
108
109
110
111
112
113
114
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 106

# Runs every command and gathers the per-command results.
#
# @param agent_commands [Enumerable] commands responding to #id
# @return [Hash] results keyed by each command's id converted to a String
def invoke_commands(agent_commands)
  agent_commands.each_with_object({}) do |agent_command, outcomes|
    outcomes[agent_command.id.to_s] = invoke_command(agent_command)
  end
end

#log_profiles(profiles) ⇒ Object



91
92
93
94
95
96
97
98
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 91

# Writes a debug line describing the profiles about to be sent, or noting
# that there are none.
#
# @param profiles [Array] profiles responding to #to_log_description
# @return [Object] whatever the logger's #debug returns
def log_profiles(profiles)
  message =
    if profiles.empty?
      "No thread profiles with data found to send."
    else
      "Sending thread profiles [#{profiles.map(&:to_log_description).join(", ")}]"
    end

  OneApm::Manager.logger.debug message
end

#merge!(*args) ⇒ Object

We don’t currently support merging thread profiles that failed to send back into the AgentCommandRouter, so we just no-op this method. Same with reset! - we don’t support asynchronous cancellation of a running thread profile or X-Ray session currently.



75
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 75

# Intentional no-op: merging thread profiles that failed to send back into
# the router is not currently supported. Accepts and ignores any arguments.
#
# @return [nil]
def merge!(*args); end

#on_before_shutdown(*args) ⇒ Object



57
58
59
60
61
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 57

# Callback subscribed to the :before_shutdown event: stops the thread
# profiler session if it is still running. Any arguments are ignored.
#
# @return [Object, nil] the result of stopping the session, or nil when it
#   was not running
def on_before_shutdown(*args)
  session = thread_profiler_session
  session.stop(true) if session.running?
end

#one_apm_service ⇒ Object



36
37
38
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 36

# Fetches the service object from the current agent on each call (nothing is
# cached here).
#
# @return [Object] the value of OneApm::Manager.agent.service
def one_apm_service
  agent = OneApm::Manager.agent
  agent.service
end

#reset! ⇒ Object



76
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 76

# Intentional no-op: asynchronous cancellation of a running thread profile
# or X-Ray session is not currently supported.
#
# @return [nil]
def reset!; end

#select_handler(agent_command) ⇒ Object



145
146
147
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 145

# Looks up the handler registered under the command's name in @handlers.
# What an unregistered name yields is decided by the table's default.
#
# @param agent_command [#name] the command being dispatched
# @return [Object] the handler found for that name
def select_handler(agent_command)
  command_name = agent_command.name
  @handlers[command_name]
end

#stop_xray_sessions ⇒ Object



49
50
51
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 49

# Asks the X-Ray session collection to stop all of its sessions.
#
# @return [Object] whatever the collection's #stop_all_sessions returns
def stop_xray_sessions
  collection = xray_session_collection
  collection.stop_all_sessions
end

#success ⇒ Object



132
133
134
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 132

# Result reported for a command whose handler ran without raising
# AgentCommandError.
#
# @return [Hash] the shared frozen OA_SUCCESS_RESULT constant (an empty hash)
def success
  OA_SUCCESS_RESULT
end

#unrecognized_agent_command(agent_command) ⇒ Object



149
150
151
# File 'lib/one_apm/collector/containers/agent_command_router.rb', line 149

# Fallback handler body for command names with no registered handler: it
# only logs the command at debug level and raises nothing.
#
# @param agent_command [Object] the command that could not be routed
# @return [Object] whatever the logger's #debug returns
def unrecognized_agent_command(agent_command)
  description = agent_command.inspect
  OneApm::Manager.logger.debug("Unrecognized agent command #{description}")
end