Class: Instana::Agent

Inherits:
Object
  • Object
show all
Defined in:
lib/instana/agent.rb

Constant Summary collapse

LOCALHOST =
'127.0.0.1'.freeze
MIME_JSON =
'application/json'.freeze
DISCOVERY_PATH =
'com.instana.plugin.ruby.discovery'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Agent

Returns a new instance of Agent.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/instana/agent.rb', line 24

# Builds a new agent in the :unannounced state.
#
# Only in-memory state is prepared here: the timer group is created but no
# timers are registered, and no HTTP request is made.  On Linux a shell
# command is run to look up the default gateway.
def initialize
  # The agent lives in one of two states: :unannounced or :announced.
  @state = :unannounced

  # When entity data reporting last got a successful response.
  @entity_last_seen = Time.now

  # When the collect timer last ran.
  @last_collect_run = Time.now

  # One timer per state; both will belong to this single timer group.
  @timers = ::Timers::Group.new
  @announce_timer = nil
  @collect_timer = nil

  @thread_spawn_lock = Mutex.new

  # Platform flags, normalised to strict booleans.
  @is_linux = !!(RUBY_PLATFORM =~ /linux/i)
  @is_osx = !!(RUBY_PLATFORM =~ /darwin/i)

  # When running in Docker with bridged networking the host agent may only
  # be reachable through the default gateway, so remember it on Linux.
  @default_gateway = @is_linux ? `/sbin/ip route | awk '/default/ { print $3 }'`.chomp : nil

  # Re-useable HTTP client for talking to the host agent.
  @httpclient = nil

  # Initial process info.  `announce_sensor` collects it again right before
  # announcing, because the process may have been renamed or forked since.
  @process = ::Instana::Util.collect_process_info

  # UUID handed back by the host agent on a successful announce.
  @agent_uuid = nil

  # Host/port of the discovered host agent (nil until discovery succeeds).
  @discovered = nil
end

Instance Attribute Details

#agent_uuid ⇒ Object

Returns the value of attribute agent_uuid.



15
16
17
# File 'lib/instana/agent.rb', line 15

# Reader for @agent_uuid.
#
# The value is nil after #initialize and is overwritten by #announce_sensor
# with the 'agentUuid' field of the host agent's announce response.
#
# @return [Object, nil] the stored agent UUID
def agent_uuid
  @agent_uuid
end

#collect_thread ⇒ Object

Returns the value of attribute collect_thread.



17
18
19
# File 'lib/instana/agent.rb', line 17

# Reader for @collect_thread.
#
# The variable is assigned by #spawn_background_thread (a Thread running
# #start); #initialize does not set it, so this returns nil until then.
#
# @return [Thread, nil]
def collect_thread
  @collect_thread
end

#process ⇒ Object

Returns the value of attribute process.



16
17
18
# File 'lib/instana/agent.rb', line 16

# Reader for @process.
#
# @process holds the result of ::Instana::Util.collect_process_info (read
# with the keys :name and :arguments); #announce_sensor additionally stores
# :report_pid in it after a successful announce.
#
# @return [Object] the collected process info
def process
  @process
end

#state ⇒ Object

Returns the value of attribute state.



14
15
16
# File 'lib/instana/agent.rb', line 14

# Reader for @state.
#
# The agent starts out as :unannounced; the rest of the class compares the
# value against :unannounced and :announced.
#
# @return [Symbol] the current agent state
def state
  @state
end

#thread_spawn_lock ⇒ Object

Returns the value of attribute thread_spawn_lock.



18
19
20
# File 'lib/instana/agent.rb', line 18

# Reader for @thread_spawn_lock, the Mutex created in #initialize that
# #spawn_background_thread holds while checking for / creating the
# background thread.
#
# @return [Mutex]
def thread_spawn_lock
  @thread_spawn_lock
end

Instance Method Details

#after_fork ⇒ Object

Used post fork to re-initialize state and restart communications with the host agent.



72
73
74
75
76
77
78
79
80
81
82
# File 'lib/instana/agent.rb', line 72

# Post-fork hook.
#
# Drops back to the :unannounced state, registers the timers again through
# #setup and spawns a fresh background thread.
#
# @return [Object] whatever #spawn_background_thread returns
def after_fork
  ::Instana.logger.debug("after_fork hook called. Falling back to unannounced state and spawning a new background agent thread.")

  # Reseed the random number generator so this process does not continue
  # the random sequence it had before the fork.
  Kernel.srand

  transition_to(:unannounced)
  setup
  spawn_background_thread
end

#announce_sensor ⇒ Object

Collect process ID, name and arguments to notify the host agent.



185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/instana/agent.rb', line 185

# Announces this Ruby process (pid, name, arguments) to the discovered host
# agent with an HTTP PUT on DISCOVERY_PATH.
#
# On a 200 response the reporting pid and the agent UUID from the response
# body are stored in @process[:report_pid] and @agent_uuid.
#
# @return [Boolean] true when the host agent answered 200; false when
#   discovery has not run, on any other response, or when an error is raised
def announce_sensor
  unless @discovered
    ::Instana.logger.debug("#{__method__} called but discovery hasn't run yet!")
    return false
  end

  # Refresh the process info right before announcing: the process name may
  # have been rewritten in the meantime (looking at you puma!).
  @process = ::Instana::Util.collect_process_info

  announce_payload = {
    :pid  => (pid_namespace? ? get_real_pid : Process.pid),
    :name => @process[:name],
    :args => @process[:arguments]
  }

  if @is_linux && !::Instana.test?
    # Keep a socket to the host agent open while announcing, in case we are
    # running in a container and the real pid needs to be detected.
    socket = TCPSocket.new(@discovered[:agent_host], @discovered[:agent_port])
    socket_fd = socket.fileno
    announce_payload[:fd] = socket_fd
    announce_payload[:inode] = File.readlink("/proc/#{Process.pid}/fd/#{socket_fd}")
  end

  req = Net::HTTP::Put.new(URI.parse("http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/#{DISCOVERY_PATH}"))
  req.body = Oj.dump(announce_payload)

  ::Instana.logger.debug "Announce: http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/#{DISCOVERY_PATH} - payload: #{req.body}"

  response = make_host_agent_request(req)
  return false unless response && (response.code.to_i == 200)

  announce_reply = Oj.load(response.body)
  @process[:report_pid] = announce_reply['pid']
  @agent_uuid = announce_reply['agentUuid']
  true
rescue => e
  Instana.logger.info "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}"
  Instana.logger.debug e.backtrace.join("\r\n")
  false
ensure
  # The socket (if one was opened) is only needed for the announce itself.
  socket.close if socket
end

#handle_agent_tasks(json_string) ⇒ Object

When requests are received from the host agent, they are sent here for handling & processing.

Parameters:

  • json_string (String)

    the requests from the host agent



275
276
277
278
279
280
281
282
283
284
285
# File 'lib/instana/agent.rb', line 275

# Parses a JSON document sent by the host agent and hands every task in it
# to #process_agent_task.
#
# A JSON object is treated as a single task, a JSON array as a list of
# tasks; any other parsed value is ignored.
#
# @param json_string [String] the raw request(s) from the host agent
def handle_agent_tasks(json_string)
  parsed = Oj.load(json_string)

  case parsed
  when Hash
    process_agent_task(parsed)
  when Array
    parsed.each { |single_task| process_agent_task(single_task) }
  end
end

#host_agent_ready? ⇒ Boolean

Check that the host agent is available and can be contacted. This will first check localhost and if not, then attempt on the default gateway for docker in bridged mode.

Returns:

  • (Boolean)


351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# File 'lib/instana/agent.rb', line 351

# Checks whether the host agent can be contacted.
#
# Discovery is run (and its result memoized in @discovered) when no host
# agent has been discovered yet; the discovered address is then probed with
# an HTTP GET on "/".
#
# @return [Boolean] true only when the probe answers 200; false when nothing
#   was discovered, on any other outcome, or when an error is raised
def host_agent_ready?
  @discovered ||= run_discovery
  return false unless @discovered

  probe = Net::HTTP::Get.new(URI.parse("http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/"))
  response = make_host_agent_request(probe)

  (response && response.code.to_i == 200) ? true : false
rescue => e
  Instana.logger.debug "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}"
  Instana.logger.debug e.backtrace.join("\r\n") unless ::Instana.test?
  false
end

#process_agent_task(task) ⇒ Object

Process a task sent from the host agent.

Parameters:

  • task (Hash)

    the parsed request JSON from the host agent



291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
# File 'lib/instana/agent.rb', line 291

# Processes a single task sent by the host agent and posts the result back.
#
# The only action handled is "ruby.source", which answers with
# ::Instana::Util.get_rb_source for the requested file.  Any other action,
# or a task without an action, is answered with an :error payload.
#
# Errors raised while building or sending the response are logged at debug
# level and swallowed.
#
# @param task [Hash] a parsed task; read keys are "action", "args" (with
#   "file") and "messageId"
# @return [Object] the result of #make_host_agent_request, or the logger's
#   return value when an error was rescued
def process_agent_task(task)
  payload =
    if !task.key?("action")
      { :error => "Instana Ruby: No action specified in request." }
    elsif task["action"] == "ruby.source"
      ::Instana::Util.get_rb_source(task["args"]["file"])
    else
      { :error => "Unrecognized action: #{task["action"]}. A newer Instana gem may be required for this. Current version: #{::Instana::VERSION}" }
    end

  # URI.encode is obsolete (removed in Ruby 3.0) and never escaped reserved
  # characters such as "&" or "=".  The message id is a query *value*, so it
  # is encoded as a form component instead.
  message_id = URI.encode_www_form_component(task['messageId'])
  path = "com.instana.plugin.ruby/response.#{@process[:report_pid]}?messageId=#{message_id}"
  uri = URI.parse("http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/#{path}")
  req = Net::HTTP::Post.new(uri)
  req.body = Oj.dump(payload)
  ::Instana.logger.debug "Responding to agent request: #{req.inspect}"
  make_host_agent_request(req)
rescue StandardError => e
  Instana.logger.debug "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}"
  Instana.logger.debug e.backtrace.join("\r\n")
end

#ready? ⇒ Boolean

Indicates if the agent is ready to send metrics and/or data.

Returns:

  • (Boolean)


427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
# File 'lib/instana/agent.rb', line 427

# Tells whether the agent is ready to send metrics and/or data.
#
# Always true when the INSTANA_GEM_TEST environment variable is set.
# Otherwise a detected fork triggers #after_fork first, and the answer is
# whether the agent is currently in the :announced state.
#
# @return [Boolean] false as well when an error is raised along the way
def ready?
  # In test, we're always ready :-)
  return true if ENV.key?('INSTANA_GEM_TEST')

  if forked?
    ::Instana.logger.debug("Instana: detected fork.  Calling after_fork")
    after_fork
  end

  @state == :announced
rescue StandardError => e
  Instana.logger.debug("#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}")
  Instana.logger.debug(e.backtrace.join("\r\n")) unless ::Instana.test?
  false
end

#report_metrics(payload) ⇒ Boolean

Method to report metrics data to the host agent.

Parameters:

  • payload (Hash)

    The collection of metrics to report.

Returns:

  • (Boolean)

    true on success, false otherwise



238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
# File 'lib/instana/agent.rb', line 238

# Reports a collection of metrics to the host agent with an HTTP POST.
#
# A response body longer than two characters is treated as a request from
# the host agent and passed to #handle_agent_tasks (presumably anything
# beyond an empty "{}" / "[]" — confirm against the host agent protocol).
# A 200 response refreshes @entity_last_seen.
#
# @param payload [Hash] the collection of metrics to report
# @return [Boolean] true when the host agent answered 200; false when
#   discovery has not run, on any other outcome, or when an error is raised
def report_metrics(payload)
  unless @discovered
    ::Instana.logger.debug("#{__method__} called but discovery hasn't run yet!")
    return false
  end

  path = "com.instana.plugin.ruby.#{@process[:report_pid]}"
  uri = URI.parse("http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/#{path}")
  req = Net::HTTP::Post.new(uri)

  req.body = Oj.dump(payload)
  response = make_host_agent_request(req)
  return false unless response

  if response.body && response.body.length > 2
    # The host agent returned something indicating that it has a request
    # for us that we need to process.
    handle_agent_tasks(response.body)
  end

  return false unless response.code.to_i == 200

  @entity_last_seen = Time.now
  true
rescue => e
  Instana.logger.debug "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}"
  Instana.logger.debug e.backtrace.join("\r\n")
  # Without an explicit value the method would return whatever the logger
  # call returns, which can be truthy and read as "reported successfully".
  false
end

#report_pid ⇒ Object

Returns the PID that we are reporting to



420
421
422
# File 'lib/instana/agent.rb', line 420

# Returns the pid used in the reporting URLs, i.e. @process[:report_pid].
#
# That entry is written by #announce_sensor from the 'pid' field of the
# host agent's announce response.
#
# @return [Object, nil] the reporting pid
def report_pid
  @process[:report_pid]
end

#report_spans(spans) ⇒ Boolean

Accept and report spans to the host agent.

Parameters:

  • spans (Array)

    An array of [Span]

Returns:

  • (Boolean)


319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/instana/agent.rb', line 319

# Reports spans to the host agent with an HTTP POST.
#
# Nothing is sent unless the agent is in the :announced state and a host
# agent has been discovered.
#
# @param spans [Array] the spans to report
# @return [Boolean] true when the host agent answered 200 or 204; false in
#   every other case, including when an error is raised
def report_spans(spans)
  # Always answer with a real boolean, as documented.
  return false unless @state == :announced

  unless @discovered
    ::Instana.logger.debug("#{__method__} called but discovery hasn't run yet!")
    return false
  end

  path = "com.instana.plugin.ruby/traces.#{@process[:report_pid]}"
  uri = URI.parse("http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/#{path}")
  req = Net::HTTP::Post.new(uri)

  req.body = Oj.dump(spans)
  response = make_host_agent_request(req)
  return false unless response

  [200, 204].include?(response.code.to_i)
rescue => e
  Instana.logger.debug "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}"
  Instana.logger.debug e.backtrace.join("\r\n")
  # Without an explicit value the method would return whatever the logger
  # call returns, which can be truthy and read as "reported successfully".
  false
end

#run_discovery ⇒ Hash

Runs a discovery process to determine where we can contact the host agent. This is usually just localhost but in docker can be found on the default gateway. This also allows for manual configuration via ::Instana.config.

Returns:

  • (Hash)

    a hash with :agent_host and :agent_port values, or nil if no host agent could be reached



378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
# File 'lib/instana/agent.rb', line 378

# Works out where the host agent can be contacted.
#
# The configured location (::Instana.config[:agent_host] / [:agent_port]) is
# probed first with an HTTP GET on "/".  On Linux the default gateway is
# probed next on the same port, since under Docker in bridged networking
# mode the host agent can be found there.  The first candidate answering
# 200 wins.
#
# @return [Hash, nil] a hash with :agent_host and :agent_port, or nil when
#   no candidate answered 200
def run_discovery
  ::Instana.logger.debug "#{__method__}: Running agent discovery..."

  agent_port = ::Instana.config[:agent_port]

  # Candidates in priority order, each with the label used in the log line.
  candidates = [[::Instana.config[:agent_host], '']]

  # The gateway lookup yields an empty string when there is no default route
  # (or the lookup command is unavailable).  Probing "http://:port/" would
  # raise instead of simply reporting "not found", so skip it in that case.
  if @is_linux && !@default_gateway.to_s.strip.empty?
    candidates << [@default_gateway, 'default gateway ']
  end

  candidates.each do |host, label|
    ::Instana.logger.debug "#{__method__}: Trying #{label}#{host}:#{agent_port}"

    req = Net::HTTP::Get.new(URI.parse("http://#{host}:#{agent_port}/"))
    response = make_host_agent_request(req)
    next unless response && (response.code.to_i == 200)

    discovered = { :agent_host => host, :agent_port => agent_port }
    ::Instana.logger.debug "#{__method__}: Found #{discovered[:agent_host]}:#{discovered[:agent_port]}"
    return discovered
  end

  nil
end

#setup ⇒ Object

Sets up the periodic announce and collect timers. The timers only run once the timer loop has been started (see #start).



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/instana/agent.rb', line 113

# Registers the two periodic timers on the timer group.
#
# * The announce timer fires immediately and then every 30 seconds.  While
#   unannounced it tries to reach the host agent and announce this sensor,
#   moving to :announced on success.
# * The collect timer fires every ::Instana.config[:collector][:interval]
#   seconds.  While announced it collects and reports metrics, falls back to
#   :unannounced once reporting has been failing for more than a minute,
#   and sends the processor's queued data.
#
# @return [Object] the collect timer
def setup
  @announce_timer = @timers.now_and_every(30) do
    next unless @state == :unannounced
    next unless host_agent_ready? && announce_sensor

    transition_to(:announced)
    ::Instana.logger.warn "Host agent available. We're in business. (#{@state} pid:#{Process.pid} #{@process[:name]})"
  end

  @collect_timer = @timers.every(::Instana.config[:collector][:interval]) do
    # Under high CPU load timer runs can back up and fire in quick
    # succession; skip any run that comes sooner than one interval after
    # the previous one.
    since_last_run = Time.now - @last_collect_run
    next if since_last_run < ::Instana.config[:collector][:interval]

    @last_collect_run = Time.now
    next unless @state == :announced

    reported = ::Instana.collector.collect_and_report

    # Only give up on the host agent after more than a minute without a
    # successful report.
    if !reported && (Time.now - @entity_last_seen) > 60
      ::Instana.logger.warn "Host agent offline for >1 min.  Going to sit in a corner..."
      transition_to(:unannounced)
    end

    ::Instana.processor.send
  end
end

#spawn_background_thread ⇒ Object

Spawns the background thread and calls start. This method is separated out for those who wish to control which thread the background agent will run in.

This method can be overridden with the following:

module Instana

class Agent
  def spawn_background_thread
    # start thread
    start
  end
end

end



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/instana/agent.rb', line 99

# Spawns the background thread that runs #start, unless one is already
# alive.  The check and the spawn happen while holding @thread_spawn_lock.
#
# Kept as its own method so that it can be overridden by those who wish to
# control which thread the background agent runs in.
#
# @return [Object] the new Thread, or the logger's return value when a live
#   thread already exists
def spawn_background_thread
  @thread_spawn_lock.synchronize do
    already_running = @collect_thread && @collect_thread.alive?

    if already_running
      ::Instana.logger.info "[instana] Collect thread already started & alive.  Not spawning another."
    else
      @collect_thread = Thread.new { start }
    end
  end
end

#start ⇒ Object

Starts the timer loop for the timers that were initialized in the setup method. This is blocking and should only be called from an already initialized background thread.



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/instana/agent.rb', line 154

# Runs the timer loop for the timers registered by #setup.
#
# This call blocks and is meant to run inside the background thread.  On
# each pass the timer belonging to the other state is paused, the one for
# the current state is resumed, and the timer group is waited on.
#
# Anything raised inside the loop (Exception, not just StandardError) is
# logged and ends the loop.  On the way out, an announced agent pauses both
# timers and sends the processor's remaining data one last time.
def start
  unless host_agent_ready?
    ::Instana.logger.warn "Host agent not available.  Will retry periodically."
  end

  loop do
    to_pause, to_resume =
      if @state == :unannounced
        [@collect_timer, @announce_timer]
      else
        [@announce_timer, @collect_timer]
      end

    to_pause.pause
    to_resume.resume
    @timers.wait
  end
rescue Exception => e
  ::Instana.logger.warn "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}"
  ::Instana.logger.debug e.backtrace.join("\r\n")
ensure
  if @state == :announced
    # Keep the timers from firing while the final traces are reported.
    [@collect_timer, @announce_timer].each(&:pause)

    ::Instana.logger.debug "#{Thread.current}: Agent exiting. Reporting final #{::Instana.processor.queue_count} trace(s)."
    ::Instana.processor.send
  end
end