Class: LogCourier::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/log-courier/client.rb

Overview

Implementation of a single client connection

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Client

Returns a new instance of Client.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/log-courier/client.rb', line 53

def initialize(options = {})
  @options = {
    logger:       nil,
    spool_size:   1024,
    idle_timeout: 5
  }.merge!(options)

  @logger = @options[:logger]

  require 'log-courier/client_tls'
  @client = ClientTls.new(@options)

  # Load the json adapter
  @json_adapter = MultiJson.adapter.instance

  @event_queue = EventQueue.new @options[:spool_size]
  @pending_payloads = {}
  @first_payload = nil
  @last_payload = nil

  # Start the spooler which will collect events into chunks
  @send_ready = false
  @send_mutex = Mutex.new
  @send_cond = ConditionVariable.new
  @spooler_thread = Thread.new do
    run_spooler
  end

  @pending_ping = false

  # Start the IO thread
  @io_control = EventQueue.new 1
  @io_thread = Thread.new do
    run_io
  end
end

Instance Method Details

#buffer_jdat_data(events, nonce) ⇒ Object



293
294
295
296
297
298
299
300
301
302
303
# File 'lib/log-courier/client.rb', line 293

def buffer_jdat_data(events, nonce)
  buffer = Zlib::Deflate.new

  # Write each event in JSON format
  events.each do |event|
    buffer_jdat_data_event(buffer, event)
  end

  # Generate and return the message
  nonce + buffer.flush(Zlib::FINISH)
end

#buffer_jdat_data_event(buffer, event) ⇒ Object



305
306
307
308
309
310
# File 'lib/log-courier/client.rb', line 305

def buffer_jdat_data_event(buffer, event)
  json_data = @json_adapter.dump(event)

  # Add length and then the data
  buffer << [json_data.length].pack('N') << json_data
end

#generate_nonceObject



258
259
260
# File 'lib/log-courier/client.rb', line 258

def generate_nonce
  (0...16).map { rand(256).chr }.join("")
end

#process_ackn(message) ⇒ Object



322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
# File 'lib/log-courier/client.rb', line 322

def process_ackn(message)
  # Sanity
  if message.length != 20
    raise ProtocolError, "ACKN message size invalid (#{message.length})"
  end

  # Grab nonce
  sequence, nonce = message[0...4].unpack('N').first, message[4..-1]

  # Find the payload - skip if we couldn't as it will just a duplicated ACK
  return unless @pending_payloads.key?(nonce)

  payload = @pending_payloads[nonce]

  # Full ACK?
  # TODO: protocol error if sequence too large?
  if sequence >= payload.events.length
    @client.resume_send if @client.send_paused?

    @pending_payloads.delete nonce
    payload.previous.next = payload.next
  else
    # Partial ACK - only process if something was actually processed
    if sequence > payload.ack_events
      payload.ack_events = sequence
      payload.events = payload.events[0...sequence]
      payload.data = nil
    end
  end
end

#process_pong(message) ⇒ Object



312
313
314
315
316
317
318
319
320
# File 'lib/log-courier/client.rb', line 312

def process_pong(message)
  # Sanity
  if message.length != 0
    raise ProtocolError, "Unexpected data attached to pong message (#{message.length})"
  end

  # No longer pending a PONG
  @ping_pending = false
end

#publish(event) ⇒ Object



90
91
92
93
# File 'lib/log-courier/client.rb', line 90

def publish(event)
  # Pass the event into the spooler
  @event_queue << event
end

#reset_keepaliveObject



254
255
256
# File 'lib/log-courier/client.rb', line 254

def reset_keepalive
  @keepalive_next = Time.now.to_i + @keepalive_timeout
end

#run_ioObject



132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
# File 'lib/log-courier/client.rb', line 132

def run_io
  # TODO: Make keepalive configurable?
  @keepalive_timeout = 1800

  # TODO: Make pending payload max configurable?
  max_pending_payloads = 100

  retry_payload = nil

  can_send = true

  loop do
    # Reconnect loop
    @client.connect @io_control

    reset_keepalive

    # Capture send exceptions
    begin
      # IO loop
      loop do
        catch :keepalive do
          begin
            action = @io_control.pop @keepalive_next - Time.now.to_i

            # Process the action
            case action[0]
            when 'S'
              # If we're flushing through the pending, pick from there
              unless retry_payload.nil?
                # Regenerate data if we need to
                retry_payload.data = buffer_jdat_data(retry_payload.events, retry_payload.nonce) if retry_payload.data == nil

                # Send and move onto next
                @client.send 'JDAT', retry_payload.data

                retry_payload = retry_payload.next
                throw :keepalive
              end

              # Ready to send, allow spooler to pass us something
              @send_mutex.synchronize do
                @send_ready = true
                @send_cond.signal
              end

              can_send = true
            when 'E'
              # If we have too many pending payloads, pause the IO
              if @pending_payloads.length + 1 >= max_pending_payloads
                @client.pause_send
              end

              # Received some events - send them
              send_jdat action[1]

              # The send action will trigger another "S" if we have more send buffer
              can_send = false
            when 'R'
              # Received a message
              signature, message = action[1..2]
              case signature
              when 'PONG'
                process_pong message
              when 'ACKN'
                process_ackn message
              else
                # Unknown message - only listener is allowed to respond with a "????" message
                # TODO: What should we do? Just ignore for now and let timeouts conquer
              end
            when 'F'
              # Reconnect, an error occurred
              break
            end
          rescue TimeoutError
            # Keepalive timeout hit, send a PING unless we were awaiting a PONG
            if @pending_ping
              # Timed out, break into reconnect
              raise TimeoutError
            end

            # Is send full? can_send will be false if so
            # We should've started receiving ACK by now so time out
            raise TimeoutError unless can_send

            # Send PING
            send_ping

            # We may have filled send buffer
            can_send = false
          end
        end

        # Reset keepalive timeout
        reset_keepalive
      end
    rescue ProtocolError => e
      # Reconnect required due to a protocol error
      @logger.warn("[LogCourierClient] Protocol error: #{e}") unless @logger.nil?
    rescue TimeoutError
      # Reconnect due to timeout
      @logger.warn('[LogCourierClient] Timeout occurred') unless @logger.nil?
    rescue ShutdownSignal
      # Shutdown, break out
      break
    rescue => e
      # Unknown error occurred
      @logger.warn("[LogCourierClient] Unknown error: #{e}") unless @logger.nil?
      @logger.warn("[LogCourierClient] #{e.backtrace}: #{e.message} (#{e.class})") unless @logger.nil?
    end

    # Disconnect and retry payloads
    @client.disconnect
    retry_payload = @first_payload

    # TODO: Make reconnect time configurable?
    sleep 5
  end

  @client.disconnect
end

#run_spoolerObject



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# File 'lib/log-courier/client.rb', line 103

def run_spooler
  loop do
    spooled = []
    next_flush = Time.now.to_i + @options[:idle_timeout]

    # The spooler loop
    begin
      loop do
        event = @event_queue.pop next_flush - Time.now.to_i
        spooled.push(event)
        break if spooled.length >= @options[:spool_size]
      end
    rescue TimeoutError
      # Hit timeout but no events, keep waiting
      next if spooled.length == 0
    end

    # Pass through to io_control but only if we're ready to send
    @send_mutex.synchronize do
      @send_cond.wait(@send_mutex) unless @send_ready
      @send_ready = false
      @io_control << ['E', spooled]
    end
  end
rescue ShutdownSignal
  # Just shutdown
  0
end

#send_jdat(events) ⇒ Object



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/log-courier/client.rb', line 267

def send_jdat(events)
  # Generate the JSON payload and compress it
  nonce = generate_nonce
  data = buffer_jdat_data(events, nonce)

  # Save the pending payload
  payload = PendingPayload.new(
    :events => events,
    :nonce  => nonce,
    :data   => data
  )

  @pending_payloads[nonce] = payload

  if @first_payload.nil?
    @first_payload = payload
    @last_payload = payload
  else
    @last_payload.next = payload
    @last_payload = payload
  end

  # Send it
  @client.send 'JDAT', payload.data
end

#send_pingObject



262
263
264
265
# File 'lib/log-courier/client.rb', line 262

def send_ping
  # Send it
  @client.send 'PING', ''
end

#shutdownObject



95
96
97
98
99
100
101
# File 'lib/log-courier/client.rb', line 95

def shutdown
  # Raise a shutdown signal in the spooler and wait for it
  @spooler_thread.raise ShutdownSignal
  @io_thread.raise ShutdownSignal
  @spooler_thread.join
  @io_thread.join
end