Class: Instana::Agent

Inherits:
Object
  • Object
show all
Includes:
AgentHelpers, AgentHooks, AgentTasks
Defined in:
lib/instana/agent.rb

Constant Summary collapse

LOCALHOST =
'127.0.0.1'.freeze
MIME_JSON =
'application/json'.freeze
DISCOVERY_PATH =
'com.instana.plugin.ruby.discovery'.freeze
METRICS_PATH =
"com.instana.plugin.ruby.%s"
TRACES_PATH =
"com.instana.plugin.ruby/traces.%s"

Constants included from AgentTasks

AgentTasks::OJ_OPTIONS

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AgentTasks

#handle_agent_tasks, #process_agent_task

Methods included from AgentHooks

#after_fork, #after_resque_fork, #before_resque_fork

Methods included from AgentHelpers

#forked?, #get_real_pid, #pid_namespace?, #ready?, #report_pid

Constructor Details

#initializeAgent

Returns a new instance of Agent.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/instana/agent.rb', line 37

# Prepares the agent's in-memory state. The agent starts out :unannounced and
# the timer slots stay empty until `setup` fills them.
def initialize
  # Test mode is switched on by the mere presence of INSTANA_TEST.
  @testmode = ENV.key?('INSTANA_TEST')

  # Lifecycle state; the agent moves between :unannounced, :announced and :ready.
  @state = :unannounced

  # Timestamp of the last successful response from entity data reporting.
  @entity_last_seen = Time.now

  # Timestamp of the last run of the collect timer.
  @last_collect_run = Time.now

  # Timer group plus one slot per timer (announce, pending, collect).
  @timers = ::Timers::Group.new
  @announce_timer = @pending_timer = @collect_timer = nil

  @thread_spawn_lock = Mutex.new

  # Platform flags (strict booleans).
  @is_linux = !(RbConfig::CONFIG['host_os'] =~ /linux/i).nil?
  @is_osx = !(RUBY_PLATFORM =~ /darwin/i).nil?

  # When running in Docker with bridged networking the host agent may only be
  # reachable through the default gateway, so remember it for discovery.
  @default_gateway = nil
  if @is_linux && File.exist?("/sbin/ip")
    @default_gateway = `/sbin/ip route | awk '/default/ { print $3 }'`.chomp
  end

  # Re-usable HTTP client for talking to the host agent.
  @httpclient = nil

  # Initial process info; `announce_sensor` collects it again right before
  # announcing in case the process was renamed or forked in the meantime.
  @process = ::Instana::Util.collect_process_info

  # UUID returned by the host agent on announce.
  @agent_uuid = nil

  # Host/port of the host agent once discovery has found it.
  @discovered = nil

  # Custom headers the host agent may ask this sensor to capture.
  @extra_headers = nil
end

Instance Attribute Details

#agent_uuidObject

Returns the value of attribute agent_uuid.



23
24
25
# File 'lib/instana/agent.rb', line 23

# UUID of the host agent. It is nil after construction and is filled in by
# `announce_sensor` from the 'agentUuid' field of the announce response.
def agent_uuid
  @agent_uuid
end

#collect_threadObject

Returns the value of attribute collect_thread.



25
26
27
# File 'lib/instana/agent.rb', line 25

# The thread created by `spawn_background_thread` to run `start`; nil until
# that method has spawned one.
def collect_thread
  @collect_thread
end

#extra_headersObject

Returns the value of attribute extra_headers.



27
28
29
# File 'lib/instana/agent.rb', line 27

# Custom headers the host agent asked this sensor to capture. It is nil after
# construction and is set by `announce_sensor` when the announce response
# carries an 'extraHeaders' key.
def extra_headers
  @extra_headers
end

#processObject

Returns the value of attribute process.



24
25
26
# File 'lib/instana/agent.rb', line 24

# Process info as returned by ::Instana::Util.collect_process_info. It is
# re-collected by `announce_sensor`, which also stores the pid reported by the
# host agent under :report_pid.
def process
  @process
end

#stateObject

Returns the value of attribute state.



22
23
24
# File 'lib/instana/agent.rb', line 22

# Current lifecycle state of the agent: :unannounced (the initial value),
# :announced or :ready.
def state
  @state
end

#testmodeObject

Returns the value of attribute testmode.



29
30
31
# File 'lib/instana/agent.rb', line 29

# True when the INSTANA_TEST environment variable was present at the time the
# agent was constructed.
def testmode
  @testmode
end

#thread_spawn_lockObject

Returns the value of attribute thread_spawn_lock.



26
27
28
# File 'lib/instana/agent.rb', line 26

# Mutex that `spawn_background_thread` holds while it checks for and creates
# the collect thread.
def thread_spawn_lock
  @thread_spawn_lock
end

Instance Method Details

#announce_sensorObject

Collect process ID, name and arguments to notify the host agent.



223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
# File 'lib/instana/agent.rb', line 223

# Announces this Ruby process to the host agent located by discovery.
#
# A PUT carrying the pid, process name and arguments is sent to DISCOVERY_PATH.
# On a 200 reply the reported pid, the agent UUID and (when present) the extra
# headers from the response are stored on the agent.
#
# @return [Boolean] true when the host agent answered 200; false when
#   discovery has not run yet, on any other answer, or when an error was raised
def announce_sensor
  unless @discovered
    ::Instana.logger.debug("#{__method__} called but discovery hasn't run yet!")
    return false
  end

  # Collect process info again right before announcing: the process name may
  # have been rewritten since boot (puma does this).
  @process = ::Instana::Util.collect_process_info

  announce_payload = {
    pid: pid_namespace? ? get_real_pid : Process.pid,
    name: @process[:name],
    args: @process[:arguments]
  }

  agent_host = @discovered[:agent_host]
  agent_port = @discovered[:agent_port]

  if @is_linux && !@testmode
    # Keep a socket to the host agent open for the duration of the announce so
    # that, inside a container, the agent can work out the real pid from the
    # file descriptor and inode we report.
    agent_socket = TCPSocket.new(agent_host, agent_port)
    announce_payload[:fd] = agent_socket.fileno
    announce_payload[:inode] = File.readlink("/proc/#{Process.pid}/fd/#{agent_socket.fileno}")
  end

  announce_url = "http://#{agent_host}:#{agent_port}/#{DISCOVERY_PATH}"
  req = Net::HTTP::Put.new(URI.parse(announce_url))
  req.body = Oj.dump(announce_payload, OJ_OPTIONS)

  ::Instana.logger.debug { "Announce: #{announce_url} - payload: #{req.body}" }

  response = make_host_agent_request(req)
  return false unless response && response.code.to_i == 200

  data = Oj.load(response.body, OJ_OPTIONS)
  @process[:report_pid] = data['pid']
  @agent_uuid = data['agentUuid']
  @extra_headers = data['extraHeaders'] if data.key?('extraHeaders')
  true
rescue => e
  Instana.logger.info { "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}" }
  Instana.logger.debug { e.backtrace.join("\r\n") }
  false
ensure
  # The socket only has to live until the announce request has completed.
  agent_socket.close if agent_socket && !agent_socket.closed?
end

#host_agent_available?Boolean

Check that the host agent is available and can be contacted. This will first check localhost and if not, then attempt on the default gateway for docker in bridged mode.

Returns:

  • (Boolean)


351
352
353
354
355
356
357
358
359
360
361
362
# File 'lib/instana/agent.rb', line 351

# Tells whether a host agent has been located. Discovery is run on demand and
# its result is remembered in @discovered, so later calls do not probe again
# once an agent has been found.
#
# @return [Boolean] true when discovery has produced a result; false when it
#   has not or when discovery raised
def host_agent_available?
  @discovered ||= run_discovery
  @discovered ? true : false
rescue => e
  Instana.logger.debug { "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}" }
  Instana.logger.debug { e.backtrace.join("\r\n") } unless @testmode
  false
end

#report_metrics(payload) ⇒ Boolean

Method to report metrics data to the host agent.

Parameters:

  • payload (Hash)

    The collection of metrics to report.

Returns:

  • (Boolean)

    true on success, false otherwise



280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/instana/agent.rb', line 280

# Reports metrics data to the host agent.
#
# The payload is POSTed to METRICS_PATH for the pid the host agent assigned on
# announce. If the response carries a body of more than two characters it is
# passed to `handle_agent_tasks`, whatever the status code.
#
# @param payload [Hash] the collection of metrics to report
# @return [Boolean] true when the host agent answered 200; false when the
#   agent is not ready, on any other answer, or when an error was raised
def report_metrics(payload)
  if @state != :ready && !@testmode
    ::Instana.logger.debug "report_metrics called but agent not in ready state."
    return false
  end

  path = sprintf(METRICS_PATH, @process[:report_pid])
  uri = URI.parse("http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/#{path}")
  req = Net::HTTP::Post.new(uri)
  req.body = Oj.dump(payload, OJ_OPTIONS)

  response = make_host_agent_request(req)
  return false unless response

  # The host agent returned something indicating that it has a request for us
  # that we need to process.
  handle_agent_tasks(response.body) if response.body && response.body.length > 2

  return false unless response.code.to_i == 200

  @entity_last_seen = Time.now
  true
rescue => e
  Instana.logger.debug { "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}" }
  Instana.logger.debug { e.backtrace.join("\r\n") }
  # Return false explicitly: without it the method would hand back whatever
  # the logger call returned, which callers could mistake for success.
  false
end

#report_spans(spans) ⇒ Boolean

Accept and report spans to the host agent.

Parameters:

  • spans (Array)

    An array of [Span]

Returns:

  • (Boolean)


317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# File 'lib/instana/agent.rb', line 317

# Reports spans to the host agent.
#
# The spans are serialized with nil values omitted and POSTed to TRACES_PATH
# for the pid the host agent assigned on announce.
#
# @param spans [Array] an array of [Span]
# @return [Boolean] true when the host agent answered 200 or 204; false when
#   the agent is not ready, on any other answer, or when an error was raised
def report_spans(spans)
  if @state != :ready && !@testmode
    ::Instana.logger.debug "report_spans called but agent not in ready state."
    return false
  end

  ::Instana.logger.debug "Reporting #{spans.length} spans"

  path = sprintf(TRACES_PATH, @process[:report_pid])
  uri = URI.parse("http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/#{path}")
  req = Net::HTTP::Post.new(uri)
  req.body = Oj.dump(spans, OJ_OPTIONS.merge(omit_nil: true))

  response = make_host_agent_request(req)
  return false unless response

  [200, 204].include?(response.code.to_i)
rescue => e
  Instana.logger.debug { "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}" }
  Instana.logger.debug { e.backtrace.join("\r\n") }
  # Return false explicitly: without it the method would hand back whatever
  # the logger call returned, which callers could mistake for success.
  false
end

#run_discoveryHash

Runs a discovery process to determine where we can contact the host agent. This is usually just localhost but in docker can be found on the default gateway. Another option is the INSTANA_AGENT_HOST environment variable. This also allows for manual configuration via ::Instana.config.

Returns:

  • (Hash)

    a hash with :agent_host and :agent_port values, or nil when no host agent could be reached



370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
# File 'lib/instana/agent.rb', line 370

# Runs discovery to determine where the host agent can be contacted.
#
# The configured location (::Instana.config[:agent_host] / [:agent_port]) is
# tried first. On Linux, when that fails and a default gateway was detected,
# the gateway is tried on the same port (Docker in bridged networking mode).
# A location counts as found when a GET on "/" is answered with 200.
#
# @return [Hash, nil] a hash with :agent_host and :agent_port, or nil when no
#   host agent answered
def run_discovery
  ::Instana.logger.debug { "#{__method__}: Running agent discovery..." }

  # Try default location or manually configured (if so)
  uri = URI.parse("http://#{::Instana.config[:agent_host]}:#{::Instana.config[:agent_port]}/")
  req = Net::HTTP::Get.new(uri)

  ::Instana.logger.debug { "#{__method__}: Trying #{::Instana.config[:agent_host]}:#{::Instana.config[:agent_port]}" }

  response = make_host_agent_request(req)

  if response && (response.code.to_i == 200)
    discovered = {
      agent_host: ::Instana.config[:agent_host],
      agent_port: ::Instana.config[:agent_port]
    }
    ::Instana.logger.debug { "#{__method__}: Found #{discovered[:agent_host]}:#{discovered[:agent_port]}" }
    return discovered
  end

  return nil unless @is_linux

  # Without a gateway address the probe URL would have no host component
  # ("http://:port/"), so there is nothing meaningful to contact.
  if @default_gateway.to_s.empty?
    ::Instana.logger.debug { "#{__method__}: No default gateway known; skipping gateway probe" }
    return nil
  end

  # We are potentially running on Docker in bridged networking mode.
  # Attempt to contact default gateway
  uri = URI.parse("http://#{@default_gateway}:#{::Instana.config[:agent_port]}/")
  req = Net::HTTP::Get.new(uri)

  ::Instana.logger.debug { "#{__method__}: Trying default gateway #{@default_gateway}:#{::Instana.config[:agent_port]}" }

  response = make_host_agent_request(req)

  if response && (response.code.to_i == 200)
    discovered = {
      agent_host: @default_gateway,
      agent_port: ::Instana.config[:agent_port]
    }
    ::Instana.logger.debug { "#{__method__}: Found #{discovered[:agent_host]}:#{discovered[:agent_port]}" }
    return discovered
  end

  nil
end

#setupObject

Sets up periodic timers and starts the agent in a background thread.

There are three possible states for the agent:

- :unannounced
- :announced
- :ready


121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/instana/agent.rb', line 121

# Creates the three periodic timers that drive the agent. Nothing loops here;
# `start` later pauses and resumes the timers according to @state and runs the
# timer loop.
#
# - announce timer (every 30s): while :unannounced, tries discovery + announce
#   and moves to :announced on success.
# - pending timer (every 1s): probes METRICS_PATH until the host agent answers
#   200, then moves to :ready.
# - collect timer (every ::Instana.config[:collector][:interval] seconds):
#   while :ready, collects and reports metrics and flushes queued spans.
#
# NOTE: the announce and pending timers are created with `now_and_every`, so
# their blocks are invoked once from within this method as well.
def setup
  if ENV.key?('INSTANA_DISABLE')
    ::Instana.logger.info "Instana gem disabled via environment variable (INSTANA_DISABLE).  Going to sit in a corner..."
  end

  # The announce timer
  # We attempt to announce this ruby sensor to the host agent.
  # In case of failure, we try again in 30 seconds.
  @announce_timer = @timers.now_and_every(30) do
    if @state == :unannounced && !ENV.key?('INSTANA_DISABLE')
      if host_agent_available? && announce_sensor
        transition_to(:announced)
        ::Instana.logger.debug "Announce successful.  Waiting on ready. (#{@state} pid:#{Process.pid} #{@process[:name]})"
      end
    end
  end

  # The pending timer
  # Handles the time between successful announce and when the host agent METRICS_PATH and TRACES_PATH
  # no longer return 404s and are truly ready to accept data.
  @pending_timer = @timers.now_and_every(1) do
    # Probe the host agent where discovery actually found it (which may be the
    # default gateway rather than the configured host). Before discovery has
    # run, fall back to the configured location.
    target = @discovered || ::Instana.config

    path = sprintf(METRICS_PATH, @process[:report_pid])
    uri = URI.parse("http://#{target[:agent_host]}:#{target[:agent_port]}/#{path}")
    req = Net::HTTP::Post.new(uri)
    req.body = Oj.dump({}, OJ_OPTIONS)

    response = make_host_agent_request(req)
    if response && (response.code.to_i == 200)
      transition_to(:ready)
      ::Instana.logger.info "Host agent available. We're in business. (#{@state} pid:#{Process.pid} #{@process[:name]})"
    end
  end

  # The collect timer
  # If we are in ready state, send metric data (only delta reporting)
  # every ::Instana.config[:collector][:interval] seconds.
  @collect_timer = @timers.every(::Instana.config[:collector][:interval]) do
    # Make sure that this block doesn't get called more often than the interval.  This can
    # happen on high CPU load and a back up of timer runs.  If we are called before `interval`
    # then we just skip.
    unless (Time.now - @last_collect_run) < ::Instana.config[:collector][:interval]
      @last_collect_run = Time.now
      if @state == :ready
        if !::Instana.collector.collect_and_report
          # If report has been failing for more than 1 minute,
          # fall back to unannounced state
          if (Time.now - @entity_last_seen) > 60
            ::Instana.logger.warn "Host agent offline for >1 min.  Going to sit in a corner..."
            transition_to(:unannounced)
          end
        end
        ::Instana.processor.send
      end
    end
  end
end

#spawn_background_threadObject

Spawns the background thread and calls start. This method is separated out for those who wish to control which thread the background agent will run in.

This method can be overridden with the following:

module Instana

class Agent
  def spawn_background_thread
    # start thread
    start
  end
end

end



103
104
105
106
107
108
109
110
111
112
113
# File 'lib/instana/agent.rb', line 103

# Starts the background thread that runs `start`, unless one is already alive.
# The check and the spawn happen while holding @thread_spawn_lock, so
# concurrent callers cannot both create a thread.
def spawn_background_thread
  @thread_spawn_lock.synchronize do
    already_running = @collect_thread && @collect_thread.alive?

    if already_running
      ::Instana.logger.info "[instana] Collect thread already started & alive.  Not spawning another."
    else
      @collect_thread = Thread.new { start }
    end
  end
end

#startObject

Starts the timer loop for the timers that were initialized in the setup method. This is blocking and should only be called from an already initialized background thread.



182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# File 'lib/instana/agent.rb', line 182

# Runs the timer loop for the timers created in `setup`. This call blocks, so
# it is meant to be run from a background thread.
#
# On every pass the timer matching the current @state is resumed and the other
# two are paused, then the timer group is waited on. When the loop is left
# while :ready, the timers are paused and queued spans are flushed once more.
def start
  # Evaluated left to right with short-circuiting: the availability check is
  # skipped when the gem is disabled, and the notice is skipped when quiet.
  suppress_notice = ENV.key?('INSTANA_DISABLE') || host_agent_available? || ENV.key?("INSTANA_QUIET")
  unless suppress_notice
    ::Instana.logger.info "Instana host agent not available.  Will retry periodically. (Set env INSTANA_QUIET=1 to shut these messages off)"
  end

  while true
    case @state
    when :unannounced
      @announce_timer.resume
      @pending_timer.pause
      @collect_timer.pause
    when :announced
      @announce_timer.pause
      @pending_timer.resume
      @collect_timer.pause
    when :ready
      @announce_timer.pause
      @pending_timer.pause
      @collect_timer.resume
    end
    @timers.wait
  end
rescue Exception => e
  # Deliberately broad: whatever ends the loop is logged instead of raised.
  ::Instana.logger.warn { "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}" }
  ::Instana.logger.debug { e.backtrace.join("\r\n") }
ensure
  if @state == :ready
    # Pause the timers so they don't fire while we are reporting traces
    [@announce_timer, @pending_timer, @collect_timer].each(&:pause)

    ::Instana.logger.debug { "#{Thread.current}: Agent exiting. Reporting final spans (if any)." }
    ::Instana.processor.send
  end
end