Class: Immunio::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/immunio/channel.rb

Overview

Communication channel with the Immunio webservice.

Constant Summary collapse

DIGEST =
OpenSSL::Digest.new('sha1')

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Channel

Returns a new instance of Channel.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/immunio/channel.rb', line 19

# Builds a channel in its idle state: nothing queued, no worker thread,
# neither started nor ready.
#
# @param config [Object] agent configuration. It is stored as-is; other
#   methods of this class read +max_send_queue_size+ and +ready_timeout+
#   from it.
def initialize(config)
  Immunio.logger.debug { "Creating channel" }
  @config = config

  @agent_uuid = nil

  # Worker thread created by #start. Declared here because #start's trace
  # logging reads @thread before the first assignment; without this the
  # instance variable would be read while still undefined.
  @thread = nil

  # Messages waiting to be sent.
  @message_queue = Queue.new
  # Messages that were sent but failed. Need to be resent.
  @send_buffer = []
  @send_buffer_bytes = 0
  @last_report_time = 0

  # A large message we may have popped from the queue but couldn't fit
  # in the current report
  @next_message = nil

  @send_seq = 0
  @dropped_message_count = 0
  @rejected_message_count = 0
  @success_count = 0
  @error_count = 0
  @quick_connect = true

  @started = false
  @ready = false

  # In the case of a forking web server like Unicorn,
  # we need to remember the process id because it may
  # happen that the master process starts its polling
  # thread first, for example when a request is sent
  # before forking the workers.
  @process_id = Process.pid

  # Blocks registered through #on_message.
  @callbacks = []

  # Anything looking to add to the messages sent to the server
  # (blocks registered through #on_sending):
  @senders = []
end

Instance Attribute Details

#error_countObject (readonly)

Returns the value of attribute error_count.



16
17
18
# File 'lib/immunio/channel.rb', line 16

# Reader for the channel's error counter.
#
# @return [Integer] current value of @error_count, which #initialize sets
#   to 0. NOTE(review): the code that increments it is not shown here;
#   presumably it counts failed reports — confirm.
def error_count
  @error_count
end

#message_queueObject (readonly)

Returns the value of attribute message_queue.



16
17
18
# File 'lib/immunio/channel.rb', line 16

# Reader for the queue of messages waiting to be sent.
#
# @return [Queue] the queue created in #initialize; #send_encoded_message
#   appends to it and #reset clears it.
def message_queue
  @message_queue
end

#rejected_message_countObject (readonly)

Returns the value of attribute rejected_message_count.



17
18
19
# File 'lib/immunio/channel.rb', line 17

# Reader for the rejected-message counter.
#
# @return [Integer] current value of @rejected_message_count, which
#   #initialize sets to 0. NOTE(review): the code that updates it is not
#   shown here — confirm what counts as a rejection.
def rejected_message_count
  @rejected_message_count
end

#success_countObject (readonly)

Returns the value of attribute success_count.



16
17
18
# File 'lib/immunio/channel.rb', line 16

# Reader for the channel's success counter.
#
# @return [Integer] current value of @success_count, which #initialize sets
#   to 0. NOTE(review): the code that increments it is not shown here;
#   presumably it counts successful reports — confirm.
def success_count
  @success_count
end

Instance Method Details

#messages_countObject



71
72
73
# File 'lib/immunio/channel.rb', line 71

# Number of messages currently waiting in the send queue.
#
# @return [Integer] how many entries @message_queue holds right now
def messages_count
  @message_queue.length
end

#needs_reset?Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/immunio/channel.rb', line 107

# Tells whether the current process differs from the one recorded in
# @process_id (set by #initialize and refreshed by #reset), e.g. after a
# fork.
#
# @return [Boolean] true when Process.pid no longer matches @process_id
def needs_reset?
  Process.pid != @process_id
end

#on_message(&block) ⇒ Object



148
149
150
# File 'lib/immunio/channel.rb', line 148

# Registers a block in the channel's @callbacks list.
#
# NOTE(review): the code that invokes these callbacks is not shown here;
# presumably they run for each message received from the server — confirm.
#
# @yield the handler to store
# @return [Array] the updated list of registered callbacks
def on_message(&block)
  @callbacks.push(block)
end

#on_sending(&block) ⇒ Object



152
153
154
# File 'lib/immunio/channel.rb', line 152

# Registers a block in the channel's @senders list, i.e. something that
# wants to add to the messages sent to the server.
#
# @yield the sender hook to store
# @return [Array] the updated list of registered senders
def on_sending(&block)
  @senders.push(block)
end

#ready?Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/immunio/channel.rb', line 59

# Reports the channel's ready flag.
#
# Within the code visible here the flag starts false, is set by #set_ready
# and is cleared again by #stop.
#
# @return [Boolean] current value of @ready
def ready?
  @ready
end

#resetObject



111
112
113
114
115
116
117
118
# File 'lib/immunio/channel.rb', line 111

# Re-homes the channel in the current process: stops it, records the
# current process id (so #needs_reset? turns false again) and throws away
# any messages still waiting in the queue.
#
# @return [Queue] the now-empty message queue
def reset
  Immunio.logger.debug { 'Resetting channel' }

  stop()

  @process_id = Process.pid
  @message_queue.clear
end

#send_encoded_message(message) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/immunio/channel.rb', line 124

# Enqueues an already-encoded message for the channel to send.
#
# The queue is capped at +@config.max_send_queue_size+ entries: once it
# holds that many, further messages are discarded and tallied in
# +@dropped_message_count+ instead of being queued.
#
# @param message [String] encoded payload; the debug log decodes it with
#   +MessagePack.unpack+, so it is presumably MessagePack data
def send_encoded_message(message)
  max_size = @config.max_send_queue_size

  Immunio.logger.debug do
    "Queueing message: (queue size: #{@message_queue.size}, max: #{max_size})"
  end

  # Use >= rather than >: with > the queue was allowed to grow to
  # max_send_queue_size + 1 entries before anything was dropped.
  if @message_queue.size >= max_size
    Immunio.logger.warn { "Dropping message for agent manager due to queue overflow (#{@message_queue.size} >= #{max_size})" }
    # No room for this message on the queue. Discard.
    @dropped_message_count += 1
    Immunio.logger.debug { "Dropped message: (#{message}, dropped count: #{@dropped_message_count})" }
    return
  end

  # The block form keeps the MessagePack decode lazy: the block only needs
  # to be evaluated when debug logging is actually emitted.
  Immunio.logger.debug do
    "Queueing message: message.size: #{message.size}, #{MessagePack.unpack(message)}"
  end

  @message_queue << message

  Immunio.logger.debug do
    "Queueing message: (queue size now: #{@message_queue.size}, max: #{max_size})"
  end
end

#send_message(message) ⇒ Object



120
121
122
# File 'lib/immunio/channel.rb', line 120

# Encodes a message with #to_msgpack and hands the result to
# #send_encoded_message for queueing.
#
# @param message [#to_msgpack] the message to encode and queue
# @return whatever #send_encoded_message returns
def send_message(message)
  encoded = message.to_msgpack
  send_encoded_message(encoded)
end

#set_readyObject



63
64
65
# File 'lib/immunio/channel.rb', line 63

# Marks the channel as ready, so that #ready? returns true and
# #wait_until_ready! stops waiting.
#
# @return [true]
def set_ready
  @ready = true
end

#startObject



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/immunio/channel.rb', line 75

# Starts the channel's background thread, which executes #run.
#
# Does nothing when the channel is already started. Thread-related state is
# written to the trace log before and after the thread is created.
#
# @return [nil] when the channel was already started; otherwise the result
#   of the final trace-log call
def start
  return if @started

  log = Immunio.logger
  log.debug { "Starting channel" }

  # Snapshot of the threading state before the worker exists.
  log.trace { "Thread count is: #{Thread.list.size}" }
  log.trace { "Threads: #{Thread.list.map(&:object_id)}" }
  log.trace { "@thread in thread list?: #{Thread.list.include? @thread}" }
  log.trace { "@process_id is: #{@process_id}" }
  log.trace { "@thread is: #{@thread.inspect}" }

  # Flag the channel as started before spawning the worker.
  @started = true
  @thread = Thread.new { run }

  log.trace { "Thread count is now: #{Thread.list.size}" }
end

#started?Boolean

Returns:

  • (Boolean)


67
68
69
# File 'lib/immunio/channel.rb', line 67

# Reports whether the channel is currently started.
#
# The flag starts false, is set by #start and cleared by #stop.
#
# @return [Boolean] current value of @started
def started?
  @started
end

#stopObject

Stop and wait for the last messages to be sent.



93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/immunio/channel.rb', line 93

# Stops the channel: clears the started and ready flags, then kills the
# worker thread (if one exists) and blocks until it has terminated.
#
# Does nothing when the channel is not started.
#
# @return [Thread, nil] the joined worker thread, or nil when the channel
#   was not started or had no thread
def stop
  return unless @started

  Immunio.logger.debug { "Stopping channel" }

  @started = false
  @ready = false

  worker = @thread
  return unless worker

  worker.kill
  worker.join
end

#wait_until_ready!Object

Wait until we receive a message from the agentmanager. This is used primarily for internal testing to wait until all the hooks are loaded.



159
160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/immunio/channel.rb', line 159

# Blocks until the channel reports ready, polling #ready? every 0.1 seconds
# for at most +@config.ready_timeout+ seconds.
#
# Returns immediately when the channel is already ready or when the
# configured timeout, converted with #to_i, is zero or negative.
#
# @raise [Timeout::Error] when the channel does not become ready in time
def wait_until_ready!
  return if @ready

  timeout_seconds = @config.ready_timeout.to_i
  return unless timeout_seconds > 0

  Immunio.logger.debug { "Channel waiting #{timeout_seconds} seconds until ready..." }
  Timeout.timeout(timeout_seconds) do
    # Wait until we get a response from the agentmanager
    sleep 0.1 until ready?
    Immunio.logger.debug { "Channel ready!" }
  end
end