Class: Appsignal::Agent

Inherits:
Object
  • Object
show all
Defined in:
lib/appsignal/agent.rb

Constant Summary collapse

ACTION =
'log_entries'.freeze
AGGREGATOR_LIMIT =

Three minutes with a sleep time of 60 seconds

3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeAgent

Returns a new instance of Agent.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/appsignal/agent.rb', line 10

def initialize
  return unless Appsignal.config.active?
  if Appsignal.config.env == 'development'
    @sleep_time = 10.0
  else
    @sleep_time = 60.0
  end
  @master_pid              = Process.pid
  @pid                     = @master_pid
  @aggregator              = Aggregator.new
  @transmitter             = Transmitter.new(ACTION)
  @aggregator_queue        = []
  @transmission_successful = true

  subscribe
  start_thread
  @active = true
  Appsignal.logger.info('Started Appsignal agent')
end

Instance Attribute Details

#activeObject

Returns the value of attribute active.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def active
  @active
end

#aggregatorObject

Returns the value of attribute aggregator.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def aggregator
  @aggregator
end

#aggregator_queueObject

Returns the value of attribute aggregator_queue.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def aggregator_queue
  @aggregator_queue
end

#master_pidObject

Returns the value of attribute master_pid.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def master_pid
  @master_pid
end

#pausedObject

Returns the value of attribute paused.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def paused
  @paused
end

#pidObject

Returns the value of attribute pid.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def pid
  @pid
end

#revisionObject

Returns the value of attribute revision.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def revision
  @revision
end

#sleep_timeObject

Returns the value of attribute sleep_time.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def sleep_time
  @sleep_time
end

#subscriberObject

Returns the value of attribute subscriber.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def subscriber
  @subscriber
end

#threadObject

Returns the value of attribute thread.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def thread
  @thread
end

#transmission_successfulObject

Returns the value of attribute transmission_successful.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def transmission_successful
  @transmission_successful
end

#transmitterObject

Returns the value of attribute transmitter.



6
7
8
# File 'lib/appsignal/agent.rb', line 6

def transmitter
  @transmitter
end

Instance Method Details

#active?Boolean

Returns:

  • (Boolean)


30
31
32
# File 'lib/appsignal/agent.rb', line 30

def active?
  !! @active
end

#add_to_aggregator_queue(aggregator) ⇒ Object



128
129
130
# File 'lib/appsignal/agent.rb', line 128

def add_to_aggregator_queue(aggregator)
  @aggregator_queue.unshift(aggregator)
end

#enqueue(transaction) ⇒ Object



97
98
99
100
101
102
103
104
# File 'lib/appsignal/agent.rb', line 97

def enqueue(transaction)
  forked! if @pid != Process.pid
  if Appsignal.is_ignored_action?(transaction.action)
    Appsignal.logger.debug("Ignoring transaction: #{transaction.request_id} (#{transaction.action})")
    return
  end
  aggregator.add(transaction)
end

#forked!Object



154
155
156
157
158
159
160
161
162
163
# File 'lib/appsignal/agent.rb', line 154

def forked!
  Appsignal.logger.info('Forked worker process')
  @active = true
  @pid = Process.pid
  Thread.exclusive do
    @aggregator = Aggregator.new
  end
  resubscribe
  restart_thread
end

#restart_threadObject



54
55
56
57
58
# File 'lib/appsignal/agent.rb', line 54

def restart_thread
  Appsignal.logger.debug 'Restarting agent thread'
  stop_thread
  start_thread
end

#resubscribeObject



85
86
87
88
89
# File 'lib/appsignal/agent.rb', line 85

def resubscribe
  Appsignal.logger.debug('Resubscribing to notifications')
  unsubscribe
  subscribe
end

#send_aggregatorsObject



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/appsignal/agent.rb', line 132

def send_aggregators
  @aggregator_queue.map! do |payload|
    begin
      if handle_result(transmitter.transmit(payload))
        nil
      else
        payload
      end
    rescue *Transmitter::HTTP_ERRORS => ex
      Appsignal.logger.error "#{ex} while sending aggregators"
      payload
    end
  end.compact!
  @transmission_successful = @aggregator_queue.empty?
end

#send_queueObject



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/appsignal/agent.rb', line 106

def send_queue
  Appsignal.logger.debug('Sending queue')
  unless aggregator.has_transactions? || aggregator_queue.any?
    return
  end
  # Replace aggregator while making sure no thread
  # is adding to it's queue
  aggregator_to_be_sent = nil
  Thread.exclusive do
    aggregator_to_be_sent = aggregator
    @aggregator = Aggregator.new
  end

  begin
    payload = Appsignal::ZippedPayload.new(aggregator_to_be_sent.post_processed_queue!)
    add_to_aggregator_queue(payload)
    send_aggregators
  rescue Exception => ex
    Appsignal.logger.error "#{ex.class} while sending queue: #{ex.message}\n#{ex.backtrace.join("\n")}"
  end
end

#shutdown(send_current_queue = false, reason = nil) ⇒ Object



165
166
167
168
169
170
171
172
173
174
175
# File 'lib/appsignal/agent.rb', line 165

def shutdown(send_current_queue=false, reason=nil)
  Appsignal.logger.info("Shutting down agent (#{reason})")
  @active = false
  unsubscribe
  stop_thread

  # Only attempt to send the queue on shutdown when there are no API issues
  if send_current_queue && @transmission_successful
    send_queue
  end
end

#start_threadObject



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/appsignal/agent.rb', line 34

def start_thread
  Appsignal.logger.debug('Starting agent thread')
  @revision = ENV['APP_REVISION']
  @thread   = Thread.new do
    begin
      sleep(rand(sleep_time))
      loop do
        if aggregator.has_transactions? || aggregator_queue.any?
          send_queue
        end
        truncate_aggregator_queue
        Appsignal.logger.debug("Sleeping #{sleep_time}")
        sleep(sleep_time)
      end
    rescue Exception=>ex
      Appsignal.logger.error "#{ex.class} in agent thread: '#{ex.message}'\n#{ex.backtrace.join("\n")}"
    end
  end
end

#stop_threadObject



60
61
62
63
64
65
# File 'lib/appsignal/agent.rb', line 60

def stop_thread
  if @thread && @thread.alive?
    Appsignal.logger.debug 'Stopping agent thread'
    Thread.kill(@thread)
  end
end

#subscribeObject



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/appsignal/agent.rb', line 67

def subscribe
  Appsignal.logger.debug('Subscribing to notifications')
  # Subscribe to notifications that don't start with a !
  @subscriber = ActiveSupport::Notifications.subscribe(/^[^!]/) do |*args|
    # Some people abuse the notification system and send their own data over it
    # (looking at you, active_admin), make sure we only process valid events.
    if Appsignal::Transaction.current && args.length == 5
      event = Appsignal::Event.event_for_instrumentation(*args)
      if event.name.start_with?('process_action')
        Appsignal::Transaction.current.set_process_action_event(event)
      elsif event.name.start_with?('perform_job')
        Appsignal::Transaction.current.set_perform_job_event(event)
      end
      Appsignal::Transaction.current.add_event(event)
    end
  end
end

#truncate_aggregator_queue(limit = AGGREGATOR_LIMIT) ⇒ Object



148
149
150
151
152
# File 'lib/appsignal/agent.rb', line 148

def truncate_aggregator_queue(limit = AGGREGATOR_LIMIT)
  return unless @aggregator_queue.length > limit
  Appsignal.logger.error "Aggregator queue to large, removing items"
  @aggregator_queue = @aggregator_queue.first(limit)
end

#unsubscribeObject



91
92
93
94
95
# File 'lib/appsignal/agent.rb', line 91

def unsubscribe
  Appsignal.logger.debug('Unsubscribing from notifications')
  ActiveSupport::Notifications.unsubscribe(@subscriber)
  @subscriber = nil
end