Class: Immunio::Channel
- Inherits:
-
Object
- Object
- Immunio::Channel
- Defined in:
- lib/immunio/channel.rb
Overview
Communication channel with the Immunio webservice.
Constant Summary collapse
- DIGEST =
OpenSSL::Digest.new('sha1')
Instance Attribute Summary collapse
-
#error_count ⇒ Object
readonly
Returns the value of attribute error_count.
-
#message_queue ⇒ Object
readonly
Returns the value of attribute message_queue.
-
#rejected_message_count ⇒ Object
readonly
Returns the value of attribute rejected_message_count.
-
#success_count ⇒ Object
readonly
Returns the value of attribute success_count.
Instance Method Summary collapse
-
#initialize(config) ⇒ Channel
constructor
A new instance of Channel.
- #messages_count ⇒ Object
- #needs_reset? ⇒ Boolean
- #on_message(&block) ⇒ Object
- #on_sending(&block) ⇒ Object
- #ready? ⇒ Boolean
- #reset ⇒ Object
- #send_encoded_message(message) ⇒ Object
- #send_message(message) ⇒ Object
- #set_ready ⇒ Object
- #start ⇒ Object
- #started? ⇒ Boolean
-
#stop ⇒ Object
Stop and wait for the last messages to be sent.
-
#wait_until_ready! ⇒ Object
Wait until we receive a message from the agentmanager.
Constructor Details
#initialize(config) ⇒ Channel
Returns a new instance of Channel.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/immunio/channel.rb', line 19 def initialize(config) Immunio.logger.debug { "Creating channel" } @config = config @agent_uuid = nil # Messages waiting to be sent. @message_queue = Queue.new # Messages that were sent but failed. Need to be resent. @send_buffer = [] @send_buffer_bytes = 0 @last_report_time = 0 # A large message we may have popped from the queue but couldn't fit # in the current report @next_message = nil @send_seq = 0 @dropped_message_count = 0 @rejected_message_count = 0 @success_count = 0 @error_count = 0 @quick_connect = true @started = false @ready = false # In the case of a forking web server like Unicorn, # we need to remember the process id because it may # happen that the master process starts its polling # thread first, for example when a request is sent # before forking the workers. @process_id = Process.pid @callbacks = [] # Anything looking to add to the messages sent to the server: @senders = [] end |
Instance Attribute Details
#error_count ⇒ Object (readonly)
Returns the value of attribute error_count.
16 17 18 |
# File 'lib/immunio/channel.rb', line 16 def error_count @error_count end |
#message_queue ⇒ Object (readonly)
Returns the value of attribute message_queue.
16 17 18 |
# File 'lib/immunio/channel.rb', line 16 def @message_queue end |
#rejected_message_count ⇒ Object (readonly)
Returns the value of attribute rejected_message_count.
17 18 19 |
# File 'lib/immunio/channel.rb', line 17 def @rejected_message_count end |
#success_count ⇒ Object (readonly)
Returns the value of attribute success_count.
16 17 18 |
# File 'lib/immunio/channel.rb', line 16 def success_count @success_count end |
Instance Method Details
#messages_count ⇒ Object
71 72 73 |
# File 'lib/immunio/channel.rb', line 71 def @message_queue.size end |
#needs_reset? ⇒ Boolean
107 108 109 |
# File 'lib/immunio/channel.rb', line 107 def needs_reset? @process_id != Process.pid end |
#on_message(&block) ⇒ Object
148 149 150 |
# File 'lib/immunio/channel.rb', line 148 def (&block) @callbacks << block end |
#on_sending(&block) ⇒ Object
152 153 154 |
# File 'lib/immunio/channel.rb', line 152 def on_sending(&block) @senders << block end |
#ready? ⇒ Boolean
59 60 61 |
# File 'lib/immunio/channel.rb', line 59 def ready? @ready end |
#reset ⇒ Object
111 112 113 114 115 116 117 118 |
# File 'lib/immunio/channel.rb', line 111 def reset Immunio.logger.debug { "Resetting channel" } stop @process_id = Process.pid @message_queue.clear end |
#send_encoded_message(message) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/immunio/channel.rb', line 124 def () Immunio.logger.debug do "Queueing message: (queue size: #{@message_queue.size}, max: #{@config.max_send_queue_size})" end if @message_queue.size > @config.max_send_queue_size Immunio.logger.warn { "Dropping message for agent manager due to queue overflow (#{@message_queue.size} > #{@config.max_send_queue_size})" } # No room for this message on the queue. Discard. @dropped_message_count += 1 Immunio.logger.debug { "Dropped message: (#{}, dropped count: #{@dropped_message_count})" } return end Immunio.logger.debug do "Queueing message: message.size: #{.size}, #{MessagePack.unpack()}" end @message_queue << Immunio.logger.debug do "Queueing message: (queue size now: #{@message_queue.size}, max: #{@config.max_send_queue_size})" end end |
#send_message(message) ⇒ Object
120 121 122 |
# File 'lib/immunio/channel.rb', line 120 def () .to_msgpack end |
#set_ready ⇒ Object
63 64 65 |
# File 'lib/immunio/channel.rb', line 63 def set_ready @ready = true end |
#start ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/immunio/channel.rb', line 75 def start return if @started Immunio.logger.debug { "Starting channel" } Immunio.logger.trace { "Thread count is: #{Thread.list.size}" } Immunio.logger.trace { "Threads: #{Thread.list.map(&:object_id)}" } Immunio.logger.trace { "@thread in thread list?: #{Thread.list.include? @thread}" } Immunio.logger.trace { "@process_id is: #{@process_id}" } Immunio.logger.trace { "@thread is: #{@thread.inspect}" } @started = true @thread = Thread.new { run } Immunio.logger.trace { "Thread count is now: #{Thread.list.size}" } end |
#started? ⇒ Boolean
67 68 69 |
# File 'lib/immunio/channel.rb', line 67 def started? @started end |
#stop ⇒ Object
Stop and wait for the last messages to be sent.
93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/immunio/channel.rb', line 93 def stop return unless @started Immunio.logger.debug { "Stopping channel" } @started = false @ready = false if @thread @thread.kill @thread.join end end |
#wait_until_ready! ⇒ Object
Wait until we receive a message from the agentmanager. This is used primarily for internal testing to wait until all the hooks are loaded.
159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
# File 'lib/immunio/channel.rb', line 159 def wait_until_ready! return if @ready if @config.ready_timeout.to_i <= 0 return end Immunio.logger.debug { "Channel waiting #{@config.ready_timeout.to_i} seconds until ready..." } Timeout.timeout @config.ready_timeout.to_i do # Wait until we get a response from the agentmanager sleep 0.1 until ready? Immunio.logger.debug { "Channel ready!" } end end |