Class: Appsignal::Agent
- Inherits:
-
Object
- Object
- Appsignal::Agent
- Defined in:
- lib/appsignal/agent.rb
Constant Summary collapse
- ACTION =
'log_entries'.freeze
- AGGREGATOR_LIMIT =
Three minutes with a sleep time of 60 seconds
3
Instance Attribute Summary collapse
-
#active ⇒ Object
Returns the value of attribute active.
-
#aggregator ⇒ Object
Returns the value of attribute aggregator.
-
#aggregator_queue ⇒ Object
Returns the value of attribute aggregator_queue.
-
#master_pid ⇒ Object
Returns the value of attribute master_pid.
-
#paused ⇒ Object
Returns the value of attribute paused.
-
#pid ⇒ Object
Returns the value of attribute pid.
-
#revision ⇒ Object
Returns the value of attribute revision.
-
#sleep_time ⇒ Object
Returns the value of attribute sleep_time.
-
#subscriber ⇒ Object
Returns the value of attribute subscriber.
-
#thread ⇒ Object
Returns the value of attribute thread.
-
#transmission_successful ⇒ Object
Returns the value of attribute transmission_successful.
-
#transmitter ⇒ Object
Returns the value of attribute transmitter.
Instance Method Summary collapse
- #active? ⇒ Boolean
- #add_to_aggregator_queue(aggregator) ⇒ Object
- #enqueue(transaction) ⇒ Object
- #forked! ⇒ Object
-
#initialize ⇒ Agent
constructor
A new instance of Agent.
- #restart_thread ⇒ Object
- #resubscribe ⇒ Object
- #send_aggregators ⇒ Object
- #send_queue ⇒ Object
- #shutdown(send_current_queue = false, reason = nil) ⇒ Object
- #start_thread ⇒ Object
- #stop_thread ⇒ Object
- #subscribe ⇒ Object
- #truncate_aggregator_queue(limit = AGGREGATOR_LIMIT) ⇒ Object
- #unsubscribe ⇒ Object
Constructor Details
#initialize ⇒ Agent
Returns a new instance of Agent.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/appsignal/agent.rb', line 10 def initialize return unless Appsignal.config.active? if Appsignal.config.env == 'development' @sleep_time = 10.0 else @sleep_time = 60.0 end @master_pid = Process.pid @pid = @master_pid @aggregator = Aggregator.new @transmitter = Transmitter.new(ACTION) @aggregator_queue = [] @transmission_successful = true subscribe start_thread @active = true Appsignal.logger.info('Started Appsignal agent') end |
Instance Attribute Details
#active ⇒ Object
Returns the value of attribute active.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def active @active end |
#aggregator ⇒ Object
Returns the value of attribute aggregator.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def aggregator @aggregator end |
#aggregator_queue ⇒ Object
Returns the value of attribute aggregator_queue.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def aggregator_queue @aggregator_queue end |
#master_pid ⇒ Object
Returns the value of attribute master_pid.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def master_pid @master_pid end |
#paused ⇒ Object
Returns the value of attribute paused.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def paused @paused end |
#pid ⇒ Object
Returns the value of attribute pid.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def pid @pid end |
#revision ⇒ Object
Returns the value of attribute revision.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def revision @revision end |
#sleep_time ⇒ Object
Returns the value of attribute sleep_time.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def sleep_time @sleep_time end |
#subscriber ⇒ Object
Returns the value of attribute subscriber.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def subscriber @subscriber end |
#thread ⇒ Object
Returns the value of attribute thread.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def thread @thread end |
#transmission_successful ⇒ Object
Returns the value of attribute transmission_successful.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def transmission_successful @transmission_successful end |
#transmitter ⇒ Object
Returns the value of attribute transmitter.
6 7 8 |
# File 'lib/appsignal/agent.rb', line 6 def transmitter @transmitter end |
Instance Method Details
#active? ⇒ Boolean
30 31 32 |
# File 'lib/appsignal/agent.rb', line 30 def active? !! @active end |
#add_to_aggregator_queue(aggregator) ⇒ Object
123 124 125 |
# File 'lib/appsignal/agent.rb', line 123 def add_to_aggregator_queue(aggregator) @aggregator_queue.unshift(aggregator) end |
#enqueue(transaction) ⇒ Object
95 96 97 98 99 100 101 102 |
# File 'lib/appsignal/agent.rb', line 95 def enqueue(transaction) forked! if @pid != Process.pid if Appsignal.is_ignored_action?(transaction.action) Appsignal.logger.debug("Ignoring transaction: #{transaction.request_id} (#{transaction.action})") return end aggregator.add(transaction) end |
#forked! ⇒ Object
149 150 151 152 153 154 155 156 157 158 |
# File 'lib/appsignal/agent.rb', line 149 def forked! Appsignal.logger.info('Forked worker process') @active = true @pid = Process.pid Thread.exclusive do @aggregator = Aggregator.new end resubscribe restart_thread end |
#restart_thread ⇒ Object
54 55 56 57 58 |
# File 'lib/appsignal/agent.rb', line 54 def restart_thread Appsignal.logger.debug 'Restarting agent thread' stop_thread start_thread end |
#resubscribe ⇒ Object
83 84 85 86 87 |
# File 'lib/appsignal/agent.rb', line 83 def resubscribe Appsignal.logger.debug('Resubscribing to notifications') unsubscribe subscribe end |
#send_aggregators ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/appsignal/agent.rb', line 127 def send_aggregators @aggregator_queue.map! do |payload| begin if handle_result(transmitter.transmit(payload)) nil else payload end rescue *Transmitter::HTTP_ERRORS => ex Appsignal.logger.error "#{ex} while sending aggregators" payload end end.compact! @transmission_successful = @aggregator_queue.empty? end |
#send_queue ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
# File 'lib/appsignal/agent.rb', line 104 def send_queue Appsignal.logger.debug('Sending queue') # Replace aggregator while making sure no thread # is adding to it's queue aggregator_to_be_sent = nil Thread.exclusive do aggregator_to_be_sent = aggregator @aggregator = Aggregator.new end begin payload = Appsignal::ZippedPayload.new(aggregator_to_be_sent.post_processed_queue!) add_to_aggregator_queue(payload) send_aggregators rescue Exception => ex Appsignal.logger.error "#{ex.class} while sending queue: #{ex.}\n#{ex.backtrace.join("\n")}" end end |
#shutdown(send_current_queue = false, reason = nil) ⇒ Object
160 161 162 163 164 165 166 167 168 169 170 |
# File 'lib/appsignal/agent.rb', line 160 def shutdown(send_current_queue=false, reason=nil) Appsignal.logger.info("Shutting down agent (#{reason})") @active = false unsubscribe stop_thread # Only attempt to send the queue on shutdown when there are no API issues if send_current_queue && @transmission_successful send_queue end end |
#start_thread ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/appsignal/agent.rb', line 34 def start_thread Appsignal.logger.debug('Starting agent thread') @revision = ENV['APP_REVISION'] @thread = Thread.new do begin sleep(rand(sleep_time)) loop do if aggregator.has_transactions? || aggregator_queue.any? send_queue end truncate_aggregator_queue Appsignal.logger.debug("Sleeping #{sleep_time}") sleep(sleep_time) end rescue Exception=>ex Appsignal.logger.error "#{ex.class} in agent thread: '#{ex.}'\n#{ex.backtrace.join("\n")}" end end end |
#stop_thread ⇒ Object
60 61 62 63 64 65 |
# File 'lib/appsignal/agent.rb', line 60 def stop_thread if @thread && @thread.alive? Appsignal.logger.debug 'Stopping agent thread' Thread.kill(@thread) end end |
#subscribe ⇒ Object
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/appsignal/agent.rb', line 67 def subscribe Appsignal.logger.debug('Subscribing to notifications') # Subscribe to notifications that don't start with a ! @subscriber = ActiveSupport::Notifications.subscribe(/^[^!]/) do |*args| if Appsignal::Transaction.current event = Appsignal::Event.event_for_instrumentation(*args) if event.name.start_with?('process_action') Appsignal::Transaction.current.set_process_action_event(event) elsif event.name.start_with?('perform_job') Appsignal::Transaction.current.set_perform_job_event(event) end Appsignal::Transaction.current.add_event(event) end end end |
#truncate_aggregator_queue(limit = AGGREGATOR_LIMIT) ⇒ Object
143 144 145 146 147 |
# File 'lib/appsignal/agent.rb', line 143 def truncate_aggregator_queue(limit = AGGREGATOR_LIMIT) return unless @aggregator_queue.length > limit Appsignal.logger.error "Aggregator queue to large, removing items" @aggregator_queue = @aggregator_queue.first(limit) end |
#unsubscribe ⇒ Object
89 90 91 92 93 |
# File 'lib/appsignal/agent.rb', line 89 def unsubscribe Appsignal.logger.debug('Unsubscribing from notifications') ActiveSupport::Notifications.unsubscribe(@subscriber) @subscriber = nil end |