Class: LogCourier::Client
- Inherits:
-
Object
- Object
- LogCourier::Client
- Defined in:
- lib/log-courier/client.rb
Overview
Implementation of a single client connection
Instance Method Summary
- #buffer_jdat_data(events, nonce) ⇒ Object
- #buffer_jdat_data_event(buffer, event) ⇒ Object
- #generate_nonce ⇒ Object
- #initialize(options = {}) ⇒ Client (constructor): A new instance of Client.
- #process_ackn(message) ⇒ Object
- #process_pong(message) ⇒ Object
- #publish(event) ⇒ Object
- #reset_keepalive ⇒ Object
- #run_io ⇒ Object
- #run_spooler ⇒ Object
- #send_jdat(events) ⇒ Object
- #send_ping ⇒ Object
- #shutdown ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Client
Returns a new instance of Client.
53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
# File 'lib/log-courier/client.rb', line 53

# Builds a client: merges the caller's options over the defaults, creates the
# TLS transport and starts the spooler and IO threads.
#
# @param options [Hash] overrides for the defaults below; the merged hash is
#   also passed to ClientTls.new
# @option options [Object, nil] :logger (nil) receiver of `warn` calls made by
#   the IO thread; nil disables logging
# @option options [Integer] :spool_size (1024) size given to the event queue
#   and the number of events after which the spooler flushes
# @option options [Integer] :idle_timeout (5) added to `Time.now.to_i` by the
#   spooler to compute its next flush time
# @return [Client] a new instance of Client
def initialize(options = {})
  @options = {
    logger: nil,
    spool_size: 1024,
    idle_timeout: 5
  }.merge!(options)

  @logger = @options[:logger]

  # The transport is only loaded once a client is actually constructed
  require 'log-courier/client_tls'
  @client = ClientTls.new(@options)

  # Load the json adapter
  @json_adapter = MultiJson.adapter.instance

  @event_queue = EventQueue.new @options[:spool_size]
  @pending_payloads = {}
  @first_payload = nil
  @last_payload = nil

  # Start the spooler which will collect events into chunks
  @send_ready = false
  @send_mutex = Mutex.new
  @send_cond = ConditionVariable.new
  @spooler_thread = Thread.new do
    run_spooler
  end

  @pending_ping = false

  # Start the IO thread
  @io_control = EventQueue.new 1
  @io_thread = Thread.new do
    run_io
  end
end
Instance Method Details
#buffer_jdat_data(events, nonce) ⇒ Object
293 294 295 296 297 298 299 300 301 302 303 |
# File 'lib/log-courier/client.rb', line 293

# Builds the body of a JDAT message: the nonce followed by the deflated
# stream of encoded events.
#
# @param events [Enumerable] events, each written through
#   #buffer_jdat_data_event
# @param nonce [String] prefix placed, uncompressed, in front of the stream
# @return [String] nonce + compressed event data
def buffer_jdat_data(events, nonce)
  buffer = Zlib::Deflate.new

  begin
    # Write each event in JSON format
    events.each do |event|
      buffer_jdat_data_event(buffer, event)
    end

    # Generate and return the message
    nonce + buffer.flush(Zlib::FINISH)
  ensure
    # Release the zlib stream now (also when an event fails to encode)
    # instead of leaving it to the garbage collector
    buffer.close
  end
end
#buffer_jdat_data_event(buffer, event) ⇒ Object
305 306 307 308 309 310 |
# File 'lib/log-courier/client.rb', line 305

# Appends one event to the buffer as a 32-bit big-endian length followed by
# the event's JSON encoding.
#
# @param buffer [#<<] destination; #buffer_jdat_data passes a Zlib::Deflate
# @param event [Object] value handed to the JSON adapter's `dump`
# @return [Object] whatever the final `<<` on the buffer returns
def buffer_jdat_data_event(buffer, event)
  json_data = @json_adapter.dump(event)

  # Add length and then the data. The prefix must count bytes, not
  # characters, or any multibyte text would be framed too short.
  buffer << [json_data.bytesize].pack('N') << json_data
end
#generate_nonce ⇒ Object
258 259 260 |
# File 'lib/log-courier/client.rb', line 258

# Produces the identifier attached to an outgoing payload.
#
# @return [String] 16 characters, each built from `rand(256).chr`
def generate_nonce
  Array.new(16) { rand(256).chr }.join('')
end
#process_ackn(message) ⇒ Object
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 |
# File 'lib/log-courier/client.rb', line 322

# Handles an ACKN message: a 4-byte big-endian sequence followed by the
# 16-byte nonce of the payload being acknowledged.
#
# @param message [String] the 20-byte ACKN body
# @raise [ProtocolError] when the body is not exactly 20 long
# @return [Object] unspecified
def process_ackn(message)
  # Sanity
  if message.length != 20
    raise ProtocolError, "ACKN message size invalid (#{message.length})"
  end

  # Grab nonce
  sequence = message[0...4].unpack('N').first
  nonce = message[4..-1]

  # Find the payload - skip if we couldn't as it will just be a duplicated ACK
  return unless @pending_payloads.key?(nonce)

  payload = @pending_payloads[nonce]

  # Full ACK?
  # TODO: protocol error if sequence too large?
  if sequence >= payload.events.length
    @client.resume_send if @client.send_paused?

    @pending_payloads.delete nonce

    # Advance the head of the retry chain so a reconnect does not start from
    # a payload that has already been acknowledged
    @first_payload = payload.next if @first_payload.equal?(payload)

    # The head of the chain has no predecessor to relink
    payload.previous.next = payload.next unless payload.previous.nil?
  elsif sequence > payload.ack_events
    # Partial ACK - only process if something was actually processed
    payload.ack_events = sequence
    # NOTE(review): this keeps the first `sequence` events, which look like
    # the acknowledged ones; confirm whether the unacknowledged remainder was
    # intended before changing it, since the retry path re-sends `events`.
    payload.events = payload.events[0...sequence]
    payload.data = nil
  end
end
#process_pong(message) ⇒ Object
312 313 314 315 316 317 318 319 320 |
# File 'lib/log-courier/client.rb', line 312

# Handles a PONG message, which must carry no body.
#
# @param message [String] the PONG body
# @raise [ProtocolError] when the body is not empty
# @return [false]
def process_pong(message)
  # Sanity
  unless message.empty?
    raise ProtocolError, "Unexpected data attached to pong message (#{message.length})"
  end

  # No longer pending a PONG. The flag is @pending_ping everywhere else
  # (#initialize sets it, #run_io tests it), so clear that one.
  @pending_ping = false
end
#publish(event) ⇒ Object
90 91 92 93 |
# File 'lib/log-courier/client.rb', line 90

# Queues an event for delivery by appending it to the event queue that the
# spooler thread reads from.
#
# @param event [Object] the event to send
# @return [Object] the result of `@event_queue << event`
def publish(event)
  # Hand the event over to the spooler
  @event_queue << event
end
#reset_keepalive ⇒ Object
254 255 256 |
# File 'lib/log-courier/client.rb', line 254

# Moves the keepalive deadline to `@keepalive_timeout` past the current time.
#
# @return [Integer] the new value of @keepalive_next
def reset_keepalive
  current_time = Time.now.to_i
  @keepalive_next = current_time + @keepalive_timeout
end
#run_io ⇒ Object
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 |
# File 'lib/log-courier/client.rb', line 132

# Body of the IO thread. Connects the transport, then processes actions
# popped from @io_control until a reconnect is needed; after a reconnect it
# re-sends payloads starting at @first_payload. Returns only when a
# ShutdownSignal is raised in the thread.
#
# Actions are arrays whose first element selects the handling:
#   'S' - ready to send: send the next retry payload, or wake the spooler
#   'E' - events from the spooler (second element): send them as JDAT
#   'R' - received message: signature and body in elements 1 and 2
#   'F' - failure: disconnect and reconnect
#
# @return [Object] the result of the final `@client.disconnect`
def run_io
  # TODO: Make keepalive configurable?
  @keepalive_timeout = 1800

  # TODO: Make pending payload max configurable?
  max_pending_payloads = 100

  retry_payload = nil

  can_send = true

  loop do
    # Reconnect loop
    @client.connect @io_control

    # A fresh connection has no PING outstanding
    @pending_ping = false

    reset_keepalive

    # Capture send exceptions
    begin
      # IO loop. A `break` inside the `catch :keepalive` block would only
      # leave that block, so a reconnect request unwinds to :reconnect.
      catch :reconnect do
        loop do
          catch :keepalive do
            begin
              action = @io_control.pop @keepalive_next - Time.now.to_i

              # Process the action
              case action[0]
              when 'S'
                # If we're flushing through the pending, pick from there
                unless retry_payload.nil?
                  # Regenerate data if we need to
                  if retry_payload.data.nil?
                    retry_payload.data = buffer_jdat_data(retry_payload.events, retry_payload.nonce)
                  end

                  # Send and move onto next
                  @client.send 'JDAT', retry_payload.data

                  retry_payload = retry_payload.next
                  throw :keepalive
                end

                # Ready to send, allow spooler to pass us something
                @send_mutex.synchronize do
                  @send_ready = true
                  @send_cond.signal
                end

                can_send = true
              when 'E'
                # If we have too many pending payloads, pause the IO
                if @pending_payloads.length + 1 >= max_pending_payloads
                  @client.pause_send
                end

                # Received some events - send them
                send_jdat action[1]

                # The send action will trigger another "S" if we have more send buffer
                can_send = false
              when 'R'
                # Received a message
                signature, message = action[1..2]
                case signature
                when 'PONG'
                  process_pong message

                  # Pair the flag set after send_ping below with its reset
                  @pending_ping = false
                when 'ACKN'
                  process_ackn message
                else
                  # Unknown message - only listener is allowed to respond with a "????" message
                  # TODO: What should we do? Just ignore for now and let timeouts conquer
                end
              when 'F'
                # Reconnect, an error occurred
                throw :reconnect
              end
            rescue TimeoutError
              # Keepalive timeout hit, send a PING unless we were awaiting a PONG
              if @pending_ping
                # Timed out, break into reconnect
                raise TimeoutError
              end

              # Is send full? can_send will be false if so
              # We should've started receiving ACK by now so time out
              raise TimeoutError unless can_send

              # Send PING and remember that a PONG is now owed, so the next
              # keepalive expiry without one forces a reconnect
              send_ping
              @pending_ping = true

              # We may have filled send buffer
              can_send = false
            end
          end

          # Reset keepalive timeout
          reset_keepalive
        end
      end
    rescue ProtocolError => e
      # Reconnect required due to a protocol error
      @logger.warn("[LogCourierClient] Protocol error: #{e}") unless @logger.nil?
    rescue TimeoutError
      # Reconnect due to timeout
      @logger.warn('[LogCourierClient] Timeout occurred') unless @logger.nil?
    rescue ShutdownSignal
      # Shutdown, break out
      break
    rescue => e
      # Unknown error occurred
      @logger.warn("[LogCourierClient] Unknown error: #{e}") unless @logger.nil?
      @logger.warn("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil?
    end

    # Disconnect and retry payloads
    @client.disconnect
    retry_payload = @first_payload

    # TODO: Make reconnect time configurable?
    sleep 5
  end

  @client.disconnect
end
#run_spooler ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
# File 'lib/log-courier/client.rb', line 103

# Body of the spooler thread. Collects events from @event_queue into a batch
# until the batch reaches `@options[:spool_size]` or the idle timeout
# expires, then hands the batch to the IO thread as an 'E' action once the IO
# thread has signalled that it is ready to send.
#
# @return [Integer] 0, once a ShutdownSignal is raised in the thread
def run_spooler
  loop do
    spooled = []
    next_flush = Time.now.to_i + @options[:idle_timeout]

    # The spooler loop
    begin
      loop do
        event = @event_queue.pop next_flush - Time.now.to_i
        spooled.push(event)
        break if spooled.length >= @options[:spool_size]
      end
    rescue TimeoutError
      # Hit timeout but no events, keep waiting
      next if spooled.empty?
    end

    # Pass through to io_control but only if we're ready to send
    @send_mutex.synchronize do
      # Re-check after every wakeup: a condition variable wait may return
      # without a matching signal
      @send_cond.wait(@send_mutex) until @send_ready
      @send_ready = false
      @io_control << ['E', spooled]
    end
  end
rescue ShutdownSignal
  # Just shutdown
  0
end
#send_jdat(events) ⇒ Object
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 |
# File 'lib/log-courier/client.rb', line 267

# Wraps a batch of events in a new pending payload, records it for later
# acknowledgement or retry, and sends it as a JDAT message.
#
# @param events [Array] the events to send
# @return [Object] the result of `@client.send`
def send_jdat(events)
  # Build the compressed JSON body under a fresh nonce
  nonce = generate_nonce
  payload = PendingPayload.new(
    events: events,
    nonce: nonce,
    data: buffer_jdat_data(events, nonce)
  )

  # Track the payload by nonce and append it to the chain of pending payloads
  @pending_payloads[nonce] = payload

  if @first_payload.nil?
    @first_payload = payload
  else
    @last_payload.next = payload
  end
  @last_payload = payload

  # Send it
  @client.send('JDAT', payload.data)
end
#send_ping ⇒ Object
262 263 264 265 |
# File 'lib/log-courier/client.rb', line 262

# Sends a PING message with an empty body through the transport.
#
# @return [Object] the result of `@client.send`
def send_ping
  @client.send('PING', '')
end
#shutdown ⇒ Object
95 96 97 98 99 100 101 |
# File 'lib/log-courier/client.rb', line 95

# Stops the client: raises ShutdownSignal in the spooler thread and then the
# IO thread, and waits for both to finish.
#
# @return [Thread] the result of joining the IO thread
def shutdown
  workers = [@spooler_thread, @io_thread]

  # Signal both threads before waiting on either of them
  workers.each { |worker| worker.raise ShutdownSignal }
  workers.map(&:join).last
end