Class: Appsignal::Agent

Inherits:
Object
  • Object
show all
Defined in:
lib/appsignal/agent.rb

Constant Summary collapse

ACTION =
'log_entries'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeAgent

Returns a new instance of Agent.



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# File 'lib/appsignal/agent.rb', line 8

def initialize
  return unless Appsignal.active?
  if Appsignal.config.env == 'development'
    @sleep_time = 10.0
  else
    @sleep_time = 60.0
  end
  @master_pid = Process.pid
  @pid = @master_pid
  @aggregator = Aggregator.new
  @transmitter = Transmitter.new(ACTION)
  subscribe
  start_thread
  Appsignal.logger.info('Started Appsignal agent')
end

Instance Attribute Details

#activeObject

Returns the value of attribute active.



5
6
7
# File 'lib/appsignal/agent.rb', line 5

def active
  @active
end

#aggregatorObject

Returns the value of attribute aggregator.



5
6
7
# File 'lib/appsignal/agent.rb', line 5

def aggregator
  @aggregator
end

#master_pidObject

Returns the value of attribute master_pid.



5
6
7
# File 'lib/appsignal/agent.rb', line 5

def master_pid
  @master_pid
end

#pausedObject

Returns the value of attribute paused.



5
6
7
# File 'lib/appsignal/agent.rb', line 5

def paused
  @paused
end

#pidObject

Returns the value of attribute pid.



5
6
7
# File 'lib/appsignal/agent.rb', line 5

def pid
  @pid
end

#sleep_timeObject

Returns the value of attribute sleep_time.



5
6
7
# File 'lib/appsignal/agent.rb', line 5

def sleep_time
  @sleep_time
end

#subscriberObject

Returns the value of attribute subscriber.



5
6
7
# File 'lib/appsignal/agent.rb', line 5

def subscriber
  @subscriber
end

#threadObject

Returns the value of attribute thread.



5
6
7
# File 'lib/appsignal/agent.rb', line 5

def thread
  @thread
end

#transmitterObject

Returns the value of attribute transmitter.



5
6
7
# File 'lib/appsignal/agent.rb', line 5

def transmitter
  @transmitter
end

Instance Method Details

#clear_queueObject



90
91
92
93
94
95
96
97
# File 'lib/appsignal/agent.rb', line 90

def clear_queue
  Appsignal.logger.debug('Clearing queue')
  # Replace aggregator while making sure no thread
  # is adding to it's queue
  Thread.exclusive do
    @aggregator = Aggregator.new
  end
end

#enqueue(transaction) ⇒ Object



64
65
66
67
68
# File 'lib/appsignal/agent.rb', line 64

def enqueue(transaction)
  forked! if @pid != Process.pid
  Appsignal.logger.debug('Enqueueing transaction')
  aggregator.add(transaction)
end

#forked!Object



99
100
101
102
103
104
105
106
# File 'lib/appsignal/agent.rb', line 99

def forked!
  Appsignal.logger.debug('Forked worker process')
  @pid = Process.pid
  Thread.exclusive do
    @aggregator = Aggregator.new
  end
  restart_thread
end

#restart_threadObject



36
37
38
39
# File 'lib/appsignal/agent.rb', line 36

def restart_thread
  stop_thread
  start_thread
end

#send_queueObject



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/appsignal/agent.rb', line 70

def send_queue
  Appsignal.logger.debug('Sending queue')
  # Replace aggregator while making sure no thread
  # is adding to it's queue
  aggregator_to_be_sent = nil
  Thread.exclusive do
    aggregator_to_be_sent = aggregator
    @aggregator = Aggregator.new
  end

  begin
    handle_result(
      transmitter.transmit(aggregator_to_be_sent.post_processed_queue!)
    )
  rescue Exception => ex
    Appsignal.logger.error "#{ex.class} while sending queue: #{ex.message}"
    Appsignal.logger.error ex.backtrace.join('\n')
  end
end

#shutdown(send_current_queue = false) ⇒ Object



108
109
110
111
112
113
# File 'lib/appsignal/agent.rb', line 108

def shutdown(send_current_queue=false)
  Appsignal.logger.info('Shutting down agent')
  ActiveSupport::Notifications.unsubscribe(subscriber)
  Thread.kill(thread) if thread
  send_queue if send_current_queue && @aggregator.has_transactions?
end

#start_threadObject



24
25
26
27
28
29
30
31
32
33
34
# File 'lib/appsignal/agent.rb', line 24

def start_thread
  Appsignal.logger.debug('Starting agent thread')
  @thread = Thread.new do
    loop do
      Appsignal.logger.debug aggregator.queue.inspect
      send_queue if aggregator.has_transactions?
      Appsignal.logger.debug("Sleeping #{sleep_time}")
      sleep(sleep_time)
    end
  end
end

#stop_threadObject



41
42
43
44
45
46
# File 'lib/appsignal/agent.rb', line 41

def stop_thread
  if @thread && @thread.alive?
    Appsignal.logger.debug 'Killing agent thread'
    Thread.kill(@thread)
  end
end

#subscribeObject



48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/appsignal/agent.rb', line 48

def subscribe
  Appsignal.logger.debug('Subscribing to notifications')
  # Subscribe to notifications that don't start with a !
  @subscriber = ActiveSupport::Notifications.subscribe(/^[^!]/) do |*args|
    if Appsignal::Transaction.current
      event = ActiveSupport::Notifications::Event.new(*args)
      if event.name.start_with?('process_action')
        Appsignal::Transaction.current.set_process_action_event(event)
      elsif event.name.start_with?('perform_job')
        Appsignal::Transaction.current.set_perform_job_event(event)
      end
      Appsignal::Transaction.current.add_event(event) unless paused
    end
  end
end