Class: Appsignal::Agent
- Inherits:
- Object
  - Object
    - Appsignal::Agent
- Defined in:
- lib/appsignal/agent.rb
Constant Summary
- ACTION = 'log_entries'.freeze
Instance Attribute Summary
- #active ⇒ Object
  Returns the value of attribute active.
- #aggregator ⇒ Object
  Returns the value of attribute aggregator.
- #master_pid ⇒ Object
  Returns the value of attribute master_pid.
- #paused ⇒ Object
  Returns the value of attribute paused.
- #pid ⇒ Object
  Returns the value of attribute pid.
- #sleep_time ⇒ Object
  Returns the value of attribute sleep_time.
- #subscriber ⇒ Object
  Returns the value of attribute subscriber.
- #thread ⇒ Object
  Returns the value of attribute thread.
- #transmitter ⇒ Object
  Returns the value of attribute transmitter.
Instance Method Summary
- #clear_queue ⇒ Object
- #enqueue(transaction) ⇒ Object
- #forked! ⇒ Object
- #initialize ⇒ Agent
  constructor
  A new instance of Agent.
- #restart_thread ⇒ Object
- #send_queue ⇒ Object
- #shutdown(send_current_queue = false) ⇒ Object
- #start_thread ⇒ Object
- #stop_thread ⇒ Object
- #subscribe ⇒ Object
Constructor Details
#initialize ⇒ Agent
Returns a new instance of Agent.
8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/appsignal/agent.rb', line 8
#
# Builds the agent. Does nothing at all unless Appsignal.active? is true.
#
# When active it:
# * sets @sleep_time to 10.0 when the configured env is 'development',
#   60.0 otherwise
# * records the current process id as both @master_pid and @pid
# * creates a fresh Aggregator and a Transmitter for ACTION
# * calls subscribe and start_thread, then logs that the agent started
def initialize
  return unless Appsignal.active?

  @sleep_time = Appsignal.config.env == 'development' ? 10.0 : 60.0
  @pid = @master_pid = Process.pid
  @aggregator = Aggregator.new
  @transmitter = Transmitter.new(ACTION)

  subscribe
  start_thread
  Appsignal.logger.info('Started Appsignal agent')
end
Instance Attribute Details
#active ⇒ Object
Returns the value of attribute active.
5 6 7 |
# File 'lib/appsignal/agent.rb', line 5
#
# Returns the value of attribute active (the @active instance variable).
def active
  @active
end
#aggregator ⇒ Object
Returns the value of attribute aggregator.
5 6 7 |
# File 'lib/appsignal/agent.rb', line 5
#
# Returns the value of attribute aggregator (the @aggregator instance variable).
def aggregator
  @aggregator
end
#master_pid ⇒ Object
Returns the value of attribute master_pid.
5 6 7 |
# File 'lib/appsignal/agent.rb', line 5
#
# Returns the value of attribute master_pid (the @master_pid instance variable).
def master_pid
  @master_pid
end
#paused ⇒ Object
Returns the value of attribute paused.
5 6 7 |
# File 'lib/appsignal/agent.rb', line 5
#
# Returns the value of attribute paused (the @paused instance variable).
def paused
  @paused
end
#pid ⇒ Object
Returns the value of attribute pid.
5 6 7 |
# File 'lib/appsignal/agent.rb', line 5
#
# Returns the value of attribute pid (the @pid instance variable).
def pid
  @pid
end
#sleep_time ⇒ Object
Returns the value of attribute sleep_time.
5 6 7 |
# File 'lib/appsignal/agent.rb', line 5
#
# Returns the value of attribute sleep_time (the @sleep_time instance variable).
def sleep_time
  @sleep_time
end
#subscriber ⇒ Object
Returns the value of attribute subscriber.
5 6 7 |
# File 'lib/appsignal/agent.rb', line 5
#
# Returns the value of attribute subscriber (the @subscriber instance variable).
def subscriber
  @subscriber
end
#thread ⇒ Object
Returns the value of attribute thread.
5 6 7 |
# File 'lib/appsignal/agent.rb', line 5
#
# Returns the value of attribute thread (the @thread instance variable).
def thread
  @thread
end
#transmitter ⇒ Object
Returns the value of attribute transmitter.
5 6 7 |
# File 'lib/appsignal/agent.rb', line 5
#
# Returns the value of attribute transmitter (the @transmitter instance variable).
def transmitter
  @transmitter
end
Instance Method Details
#clear_queue ⇒ Object
90 91 92 93 94 95 96 97 |
# File 'lib/appsignal/agent.rb', line 90
#
# Discards whatever is queued by replacing @aggregator with a brand new
# Aggregator. Nothing is transmitted.
def clear_queue
  Appsignal.logger.debug('Clearing queue')
  # The swap runs inside Thread.exclusive so that no other thread
  # is adding to the old queue while it is being replaced.
  Thread.exclusive { @aggregator = Aggregator.new }
end
#enqueue(transaction) ⇒ Object
64 65 66 67 68 |
# File 'lib/appsignal/agent.rb', line 64
#
# Adds a transaction to the current aggregator.
#
# If the recorded @pid differs from the running process id, forked! is
# called first so the agent state is reset for this process.
#
# @param transaction the transaction handed to aggregator.add
def enqueue(transaction)
  forked! unless @pid == Process.pid
  Appsignal.logger.debug('Enqueueing transaction')
  aggregator.add(transaction)
end
#forked! ⇒ Object
99 100 101 102 103 104 105 106 |
# File 'lib/appsignal/agent.rb', line 99
#
# Resets the agent for the current process: stores the current process id
# in @pid, replaces @aggregator with a new Aggregator (inside
# Thread.exclusive) and restarts the agent thread via restart_thread.
def forked!
  Appsignal.logger.debug('Forked worker process')
  @pid = Process.pid
  Thread.exclusive { @aggregator = Aggregator.new }
  restart_thread
end
#restart_thread ⇒ Object
36 37 38 39 |
# File 'lib/appsignal/agent.rb', line 36
#
# Stops the agent thread (see stop_thread) and then starts a new one
# (see start_thread). Returns whatever start_thread returns.
def restart_thread
  stop_thread
  start_thread
end
#send_queue ⇒ Object
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/appsignal/agent.rb', line 70
#
# Transmits everything queued so far and starts a fresh queue.
#
# The current aggregator is swapped for a new Aggregator inside
# Thread.exclusive. The detached aggregator's post_processed_queue! is then
# handed to transmitter.transmit, and the transmit result is passed to
# handle_result.
#
# Errors raised while transmitting or handling the result are logged (class,
# message, then the backtrace one frame per line) and not re-raised. Only
# StandardError is rescued, so SystemExit, signals and other non-standard
# exceptions are no longer swallowed here.
def send_queue
  Appsignal.logger.debug('Sending queue')
  # Replace aggregator while making sure no thread
  # is adding to its queue
  aggregator_to_be_sent = nil
  Thread.exclusive do
    aggregator_to_be_sent = aggregator
    @aggregator = Aggregator.new
  end

  begin
    handle_result(
      transmitter.transmit(aggregator_to_be_sent.post_processed_queue!)
    )
  rescue StandardError => ex
    Appsignal.logger.error "#{ex.class} while sending queue: #{ex.message}"
    # Double quotes matter: '\n' would join the frames with a literal
    # backslash followed by "n" instead of a newline.
    Appsignal.logger.error ex.backtrace.join("\n")
  end
end
#shutdown(send_current_queue = false) ⇒ Object
108 109 110 111 112 113 |
# File 'lib/appsignal/agent.rb', line 108
#
# Shuts the agent down: unsubscribes the stored subscriber from
# ActiveSupport::Notifications and kills the agent thread when there is one.
#
# @param send_current_queue when truthy and the aggregator still has
#   transactions, send_queue is called once more before returning
def shutdown(send_current_queue = false)
  Appsignal.logger.info('Shutting down agent')
  ActiveSupport::Notifications.unsubscribe(subscriber)

  agent_thread = thread
  Thread.kill(agent_thread) if agent_thread

  return unless send_current_queue && @aggregator.has_transactions?

  send_queue
end
#start_thread ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/appsignal/agent.rb', line 24
#
# Starts the agent thread and stores it in @thread.
#
# The thread loops forever: it logs the inspected aggregator queue, calls
# send_queue when the aggregator has transactions, then sleeps for
# sleep_time seconds.
def start_thread
  Appsignal.logger.debug('Starting agent thread')
  @thread = Thread.new do
    loop do
      Appsignal.logger.debug aggregator.queue.inspect
      send_queue if aggregator.has_transactions?

      pause = sleep_time
      Appsignal.logger.debug("Sleeping #{pause}")
      sleep(pause)
    end
  end
end
#stop_thread ⇒ Object
41 42 43 44 45 46 |
# File 'lib/appsignal/agent.rb', line 41
#
# Kills the agent thread if one exists and is still alive; otherwise does
# nothing and returns nil.
def stop_thread
  return unless @thread && @thread.alive?

  Appsignal.logger.debug 'Killing agent thread'
  Thread.kill(@thread)
end
#subscribe ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/appsignal/agent.rb', line 48
#
# Subscribes to ActiveSupport::Notifications and stores the subscription in
# @subscriber.
#
# Notifications are ignored when there is no current Appsignal::Transaction.
# Otherwise the notification is wrapped in an Event; 'process_action*' and
# 'perform_job*' events are additionally registered on the transaction via
# set_process_action_event / set_perform_job_event, and every event is
# added to the transaction unless the agent is paused.
def subscribe
  Appsignal.logger.debug('Subscribing to notifications')
  # Only notifications whose name does not start with a ! are matched
  @subscriber = ActiveSupport::Notifications.subscribe(/^[^!]/) do |*args|
    transaction = Appsignal::Transaction.current
    next unless transaction

    event = ActiveSupport::Notifications::Event.new(*args)
    name = event.name
    if name.start_with?('process_action')
      transaction.set_process_action_event(event)
    elsif name.start_with?('perform_job')
      transaction.set_perform_job_event(event)
    end
    transaction.add_event(event) unless paused
  end
end