Class: Instana::Agent

Inherits:
Object
  • Object
show all
Includes:
AgentHelpers, AgentHooks, AgentTasks
Defined in:
lib/instana/agent.rb

Constant Summary collapse

LOCALHOST =
'127.0.0.1'.freeze
MIME_JSON =
'application/json'.freeze
DISCOVERY_PATH =
'com.instana.plugin.ruby.discovery'.freeze

Constants included from AgentTasks

AgentTasks::OJ_OPTIONS

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from AgentTasks

#handle_agent_tasks, #process_agent_task

Methods included from AgentHooks

#after_fork, #after_resque_fork, #before_resque_fork

Methods included from AgentHelpers

#forked?, #get_real_pid, #pid_namespace?, #ready?, #report_pid

Constructor Details

#initializeAgent

Returns a new instance of Agent.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/instana/agent.rb', line 33

# Builds a new agent in the :unannounced state.
#
# Only local state is prepared here: timestamps, the timer group, platform
# flags, the default gateway (Linux only) and the initial process info.
# The timers themselves are assigned later in `setup`.
def initialize
  # The agent is always in one of two states: :unannounced or :announced.
  @state = :unannounced

  # When entity data reporting last received a successful response.
  @entity_last_seen = Time.now

  # When the collect timer last ran.
  @last_collect_run = Time.now

  # Timer group holding one timer per state (unannounced & announced).
  @timers = ::Timers::Group.new
  @announce_timer = nil
  @collect_timer = nil

  @thread_spawn_lock = Mutex.new

  # Platform flags, stored as strict booleans.
  @is_linux = !(RbConfig::CONFIG['host_os'] =~ /linux/i).nil?
  @is_osx = !(RUBY_PLATFORM =~ /darwin/i).nil?

  # When running in Docker with bridged networking the host agent may be
  # reachable on the default gateway, so remember it. Stays nil when this
  # is not Linux or /sbin/ip is not present.
  @default_gateway =
    if @is_linux && File.exist?("/sbin/ip")
      `/sbin/ip route | awk '/default/ { print $3 }'`.chomp
    end

  # Re-usable HTTP client for talking to the host agent.
  @httpclient = nil

  # Initial process info. `announce_sensor` collects it again right before
  # announcing because the process may have been renamed or forked.
  @process = ::Instana::Util.collect_process_info

  # UUID handed back by the host agent.
  @agent_uuid = nil

  # Host/port of the discovered host agent.
  @discovered = nil

  # Custom headers the host agent may ask this sensor to capture.
  @extra_headers = nil
end

Instance Attribute Details

#agent_uuidObject

Returns the value of attribute agent_uuid.



23
24
25
# File 'lib/instana/agent.rb', line 23

# Reader for @agent_uuid.
#
# The constructor sets it to nil; `announce_sensor` stores the 'agentUuid'
# value from the host agent's announce response after a 200 reply.
def agent_uuid
  @agent_uuid
end

#collect_threadObject

Returns the value of attribute collect_thread.



25
26
27
# File 'lib/instana/agent.rb', line 25

# Reader for @collect_thread.
#
# `spawn_background_thread` assigns the Thread that runs `start` to this
# variable; the constructor does not set it, so it reads as nil before that.
def collect_thread
  @collect_thread
end

#extra_headersObject

Returns the value of attribute extra_headers.



27
28
29
# File 'lib/instana/agent.rb', line 27

# Reader for @extra_headers.
#
# nil after construction; `announce_sensor` replaces it with the
# 'extraHeaders' entry of the announce response when that key is present.
def extra_headers
  @extra_headers
end

#processObject

Returns the value of attribute process.



24
25
26
# File 'lib/instana/agent.rb', line 24

# Reader for @process.
#
# Holds the result of ::Instana::Util.collect_process_info (collected in the
# constructor and again in `announce_sensor`). The code here reads its :name
# and :arguments keys and writes :report_pid after a successful announce.
def process
  @process
end

#stateObject

Returns the value of attribute state.



22
23
24
# File 'lib/instana/agent.rb', line 22

# Reader for @state.
#
# Starts as :unannounced in the constructor; the other state used by this
# class is :announced. Changes go through `transition_to`, which is not
# shown here (presumably it assigns @state - confirm against its source).
def state
  @state
end

#thread_spawn_lockObject

Returns the value of attribute thread_spawn_lock.



26
27
28
# File 'lib/instana/agent.rb', line 26

# Reader for @thread_spawn_lock.
#
# This is the Mutex created in the constructor; `spawn_background_thread`
# holds it while checking for and creating the collect thread.
def thread_spawn_lock
  @thread_spawn_lock
end

Instance Method Details

#announce_sensorObject

Collect process ID, name and arguments to notify the host agent.



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# File 'lib/instana/agent.rb', line 186

# Announces this process (pid, name, arguments) to the discovered host agent.
#
# On a 200 response the reported pid, the agent UUID and any extra headers
# from the reply are stored on the agent.
#
# @return [Boolean] true when the host agent answered with 200; false when
#   discovery has not run, the agent answered otherwise, or an error occurred
def announce_sensor
  unless @discovered
    ::Instana.logger.debug("#{__method__} called but discovery hasn't run yet!")
    return false
  end

  # Process info is refreshed on every announce because the process name may
  # have been re-written since it was last collected (looking at you puma!)
  @process = ::Instana::Util.collect_process_info

  announce_payload = {
    pid: pid_namespace? ? get_real_pid : Process.pid,
    name: @process[:name],
    args: @process[:arguments]
  }

  if @is_linux && !ENV.key?('INSTANA_TEST')
    # Keep a socket to the host agent open for the duration of the announce
    # so that, when running in a container, the real pid can be detected
    # from the fd/inode pair sent along.
    agent_socket = TCPSocket.new(@discovered[:agent_host], @discovered[:agent_port])
    announce_payload[:fd] = agent_socket.fileno
    announce_payload[:inode] = File.readlink("/proc/#{Process.pid}/fd/#{agent_socket.fileno}")
  end

  announce_url = "http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/#{DISCOVERY_PATH}"
  req = Net::HTTP::Put.new(URI.parse(announce_url))
  req.body = Oj.dump(announce_payload, OJ_OPTIONS)

  ::Instana.logger.debug { "Announce: #{announce_url} - payload: #{req.body}" }

  reply = make_host_agent_request(req)
  return false unless reply && reply.code.to_i == 200

  answer = Oj.load(reply.body, OJ_OPTIONS)
  @process[:report_pid] = answer['pid']
  @agent_uuid = answer['agentUuid']
  @extra_headers = answer['extraHeaders'] if answer.key?('extraHeaders')
  true
rescue => e
  Instana.logger.info { "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}" }
  Instana.logger.debug { e.backtrace.join("\r\n") }
  false
ensure
  # The socket is only needed while the announce request is in flight.
  agent_socket.close if agent_socket && !agent_socket.closed?
end

#host_agent_available?Boolean

Check that the host agent is available and can be contacted. This will first check localhost and if not, then attempt on the default gateway for docker in bridged mode.

Returns:

  • (Boolean)


314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/instana/agent.rb', line 314

# Checks whether the host agent can be contacted.
#
# Runs discovery first if no agent location is known yet, then issues a GET
# against the root of the discovered host/port.
#
# @return [Boolean] true only when the host agent answers with 200; false
#   when nothing was discovered, on any other answer, or on an error
def host_agent_available?
  @discovered ||= run_discovery
  return false unless @discovered

  # Default location or the manually configured one (if so)
  agent_root = URI.parse("http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/")
  reply = make_host_agent_request(Net::HTTP::Get.new(agent_root))
  return false unless reply

  reply.code.to_i == 200
rescue => e
  Instana.logger.debug { "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}" }
  Instana.logger.debug { e.backtrace.join("\r\n") } unless ENV.key?('INSTANA_TEST')
  false
end

#report_metrics(payload) ⇒ Boolean

Method to report metrics data to the host agent.

Parameters:

  • payload (Hash)

    The collection of metrics to report.

Returns:

  • (Boolean)

    true on success, false otherwise



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/instana/agent.rb', line 243

# Posts a metrics payload to the host agent.
#
# Any non-trivial response body is handed to `handle_agent_tasks`, and a 200
# answer refreshes @entity_last_seen.
#
# @param payload [Hash] the collection of metrics to report
# @return [Boolean] true on a 200 answer; false when discovery has not run,
#   on any other answer, or when an error was raised while reporting
def report_metrics(payload)
  unless @discovered
    ::Instana.logger.debug("#{__method__} called but discovery hasn't run yet!")
    return false
  end

  path = "com.instana.plugin.ruby.#{@process[:report_pid]}"
  uri = URI.parse("http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/#{path}")
  req = Net::HTTP::Post.new(uri)

  req.body = Oj.dump(payload, OJ_OPTIONS)
  response = make_host_agent_request(req)
  return false unless response

  if response.body && response.body.length > 2
    # The host agent returned something indicating that it has a request for
    # us that we need to process.
    handle_agent_tasks(response.body)
  end

  return false unless response.code.to_i == 200

  @entity_last_seen = Time.now
  true
rescue => e
  Instana.logger.debug { "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}" }
  Instana.logger.debug { e.backtrace.join("\r\n") }
  # Explicit failure value: without it the method would return whatever the
  # logger call returns, which callers could mistake for success.
  false
end

#report_spans(spans) ⇒ Boolean

Accept and report spans to the host agent.

Parameters:

  • spans (Array)

    An array of [Span]

Returns:

  • (Boolean)


280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
# File 'lib/instana/agent.rb', line 280

# Serialises the given spans and posts them to the host agent.
#
# Nothing is sent unless the agent is in the :announced state and discovery
# has produced an agent location.
#
# @param spans [Array] the spans to report (serialised with Oj, omitting
#   nil values)
# @return [Boolean] true when the host agent answers 200 or 204; false when
#   not announced, not discovered, on any other answer, or on an error
def report_spans(spans)
  return false unless @state == :announced

  unless @discovered
    ::Instana.logger.debug("#{__method__} called but discovery hasn't run yet!")
    return false
  end

  path = "com.instana.plugin.ruby/traces.#{@process[:report_pid]}"
  uri = URI.parse("http://#{@discovered[:agent_host]}:#{@discovered[:agent_port]}/#{path}")
  req = Net::HTTP::Post.new(uri)

  req.body = Oj.dump(spans, OJ_OPTIONS.merge(omit_nil: true))
  response = make_host_agent_request(req)
  return false unless response

  [200, 204].include?(response.code.to_i)
rescue => e
  Instana.logger.debug { "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}" }
  Instana.logger.debug { e.backtrace.join("\r\n") }
  # Explicit failure value: without it the method would return whatever the
  # logger call returns, which callers could mistake for success.
  false
end

#run_discoveryHash

Runs a discovery process to determine where we can contact the host agent. This is usually just localhost but in docker can be found on the default gateway. Another option is the INSTANA_AGENT_HOST environment variable. This also allows for manual configuration via ::Instana.config.

Returns:

  • (Hash)

    a hash with :agent_host, :agent_port values, or nil if no host agent could be reached



341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
# File 'lib/instana/agent.rb', line 341

# Probes for a reachable host agent.
#
# The configured location (::Instana.config[:agent_host] / [:agent_port]) is
# tried first. On Linux, when a default gateway is known, that gateway is
# tried next on the configured port (Docker bridged networking).
#
# @return [Hash, nil] a hash with :agent_host and :agent_port for the first
#   location that answered 200, or nil when none did
def run_discovery
  discovered = {}

  ::Instana.logger.debug { "#{__method__}: Running agent discovery..." }

  # Try default location or manually configured (if so)
  uri = URI.parse("http://#{::Instana.config[:agent_host]}:#{::Instana.config[:agent_port]}/")
  req = Net::HTTP::Get.new(uri)

  ::Instana.logger.debug { "#{__method__}: Trying #{::Instana.config[:agent_host]}:#{::Instana.config[:agent_port]}" }

  response = make_host_agent_request(req)

  if response && (response.code.to_i == 200)
    discovered[:agent_host] = ::Instana.config[:agent_host]
    discovered[:agent_port] = ::Instana.config[:agent_port]
    ::Instana.logger.debug { "#{__method__}: Found #{discovered[:agent_host]}:#{discovered[:agent_port]}" }
    return discovered
  end

  return nil unless @is_linux

  # The gateway is nil when /sbin/ip is missing and empty when there is no
  # default route. Either way there is no host to probe, and building a
  # request for "http://:port/" would only fail, so stop here.
  return nil if @default_gateway.to_s.empty?

  # We are potentially running on Docker in bridged networking mode.
  # Attempt to contact default gateway
  uri = URI.parse("http://#{@default_gateway}:#{::Instana.config[:agent_port]}/")
  req = Net::HTTP::Get.new(uri)

  ::Instana.logger.debug { "#{__method__}: Trying default gateway #{@default_gateway}:#{::Instana.config[:agent_port]}" }

  response = make_host_agent_request(req)

  if response && (response.code.to_i == 200)
    discovered[:agent_host] = @default_gateway
    discovered[:agent_port] = ::Instana.config[:agent_port]
    ::Instana.logger.debug { "#{__method__}: Found #{discovered[:agent_host]}:#{discovered[:agent_port]}" }
    return discovered
  end

  nil
end

#setupObject

Sets up periodic timers and starts the agent in a background thread.



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/instana/agent.rb', line 110

# Registers the two periodic timers on the timer group: one that tries to
# announce this sensor and one that collects and reports data.
#
# Nothing runs here; the timers fire once `start` waits on the group.
def setup
  # Announce timer: every 30 seconds, while unannounced, try to reach the
  # host agent and announce this ruby sensor. A failed attempt is simply
  # retried on the next tick.
  @announce_timer = @timers.every(30) do
    next unless @state == :unannounced
    next unless host_agent_available? && announce_sensor

    transition_to(:announced)
    ::Instana.logger.info "Host agent available. We're in business. (#{@state} pid:#{Process.pid} #{@process[:name]})"
  end

  # Collect timer: every ::Instana.config[:collector][:interval] seconds,
  # while announced, send metric data (only delta reporting) and spans.
  @collect_timer = @timers.every(::Instana.config[:collector][:interval]) do
    # Under high CPU load timer runs can back up and fire in quick
    # succession. Skip any run that arrives sooner than `interval` after
    # the previous one.
    next if (Time.now - @last_collect_run) < ::Instana.config[:collector][:interval]

    @last_collect_run = Time.now
    next unless @state == :announced

    reported = ::Instana.collector.collect_and_report

    # Once reporting has been failing for more than 1 minute, fall back to
    # the unannounced state.
    if !reported && (Time.now - @entity_last_seen) > 60
      ::Instana.logger.warn "Host agent offline for >1 min.  Going to sit in a corner..."
      transition_to(:unannounced)
    end

    ::Instana.processor.send
  end
end

#spawn_background_threadObject

Spawns the background thread and calls start. This method is separated out for those who wish to control which thread the background agent will run in.

This method can be overridden with the following:

module Instana

class Agent
  def spawn_background_thread
    # start thread
    start
  end
end

end



96
97
98
99
100
101
102
103
104
105
106
# File 'lib/instana/agent.rb', line 96

# Starts the background thread that runs `start`, unless one is already
# alive. The check and the spawn happen while holding @thread_spawn_lock so
# that concurrent callers cannot both create a thread.
def spawn_background_thread
  @thread_spawn_lock.synchronize do
    running = @collect_thread
    if running && running.alive?
      ::Instana.logger.info "[instana] Collect thread already started & alive.  Not spawning another."
    else
      @collect_thread = Thread.new { start }
    end
  end
end

#startObject

Starts the timer loop for the timers that were initialized in the setup method. This is blocking and should only be called from an already initialized background thread.



151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/instana/agent.rb', line 151

# Runs the timer loop for the timers created in `setup`. This call blocks,
# so it is meant to run inside the background thread.
#
# Each pass pauses the timer that does not belong to the current state,
# resumes the one that does, then waits on the timer group. Any exception
# ends the loop and is logged; if the agent is announced at that point the
# timers are paused and pending spans are flushed one last time.
def start
  if !host_agent_available? && !ENV.key?("INSTANA_QUIET")
    ::Instana.logger.warn "Host agent not available.  Will retry periodically. (Set env INSTANA_QUIET=1 to shut these messages off)"
  end

  while true
    idle, active =
      if @state == :unannounced
        [@collect_timer, @announce_timer]
      else
        [@announce_timer, @collect_timer]
      end
    idle.pause
    active.resume
    @timers.wait
  end
rescue Exception => e
  ::Instana.logger.warn { "#{__method__}:#{File.basename(__FILE__)}:#{__LINE__}: #{e.message}" }
  ::Instana.logger.debug { e.backtrace.join("\r\n") }
ensure
  if @state == :announced
    # Keep the timers from firing while the final spans are reported.
    [@collect_timer, @announce_timer].each(&:pause)

    ::Instana.logger.debug { "#{Thread.current}: Agent exiting. Reporting final spans (if any)." }
    ::Instana.processor.send
  end
end