Class: Flydata::Output::TcpForwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/flydata/command/sync.rb

Direct Known Subclasses

SslForwarder

Constant Summary collapse

FORWARD_HEADER =
[0x92].pack('C')
BUFFER_SIZE =

32M

1024 * 1024 * 32
DEFUALT_SEND_TIMEOUT =

1 minute

60
RETRY_INTERVAL =
2
RETRY_LIMIT =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(tag, servers, options = {}) ⇒ TcpForwarder

Returns a new instance of TcpForwarder.



299
300
301
302
303
304
305
306
307
308
# File 'lib/flydata/command/sync.rb', line 299

# Builds a forwarder that buffers records for +tag+ and delivers them to
# one of +servers+.
#
# tag     - value stored in @tag; it is msgpack-encoded when a batch is sent.
# servers - non-empty Array of server entries (each is later handed to
#           #connect, which splits it on ':' into host and port).
# options - Hash passed to #set_options (recognises :buffer_size_limit).
#
# Raises RuntimeError when servers is nil/false, not an Array, or empty.
def initialize(tag, servers, options = {})
  @tag = tag
  if !servers.kind_of?(Array) || servers.empty?
    raise "Servers must not be empty."
  end
  @servers = servers
  # Position of the server that the next #pickup_server call will return.
  @server_index = 0
  set_options(options)
  reset
end

Instance Attribute Details

#buffer_record_count ⇒ Object (readonly)

Returns the value of attribute buffer_record_count.



318
319
320
# File 'lib/flydata/command/sync.rb', line 318

# Reader for @buffer_record_count: the number of records appended to the
# buffer since the last #reset.
def buffer_record_count
  @buffer_record_count
end

#buffer_size ⇒ Object (readonly)

Returns the value of attribute buffer_size.



318
319
320
# File 'lib/flydata/command/sync.rb', line 318

# Reader for @buffer_size: the total byte size of the encoded records
# appended to the buffer since the last #reset.
def buffer_size
  @buffer_size
end

Instance Method Details

#close ⇒ Object



413
414
415
# File 'lib/flydata/command/sync.rb', line 413

# Finishes use of the forwarder by flushing whatever is still buffered.
# No socket is held between sends, so there is nothing else to release.
#
# Returns whatever #flush returns.
def close
  flush()
end

#connect(server) ⇒ Object



390
391
392
393
394
395
396
397
398
399
400
401
# File 'lib/flydata/command/sync.rb', line 390

# Opens a TCP connection to +server+ and configures its socket options.
#
# server - String that is split on ':'; the first part is used as the host
#          and the second part, converted with to_i, as the port.
#
# Returns the connected TCPSocket.
def connect(server)
  host, port = server.split(':')
  TCPSocket.new(host, port.to_i).tap do |sock|
    # SO_LINGER payload: two native ints (on/off flag, linger seconds).
    linger = [1, DEFUALT_SEND_TIMEOUT].pack('I!I!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)

    # SO_SNDTIMEO payload: two native longs (seconds, microseconds).
    send_timeout = [DEFUALT_SEND_TIMEOUT, 0].pack('L!L!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, send_timeout)
  end
end

#emit(records, time = Time.now.to_i) ⇒ Object



320
321
322
323
324
325
326
327
328
329
330
331
332
333
# File 'lib/flydata/command/sync.rb', line 320

# Appends one or more records to the in-memory buffer and sends the buffer
# once it has grown past @buffer_size_limit.
#
# records - a single record or an Array of records. Anything that is not
#           an Array is treated as one record.
# time    - value paired with every record (defaults to Time.now.to_i).
#
# Each record is encoded as [time, record].to_msgpack before buffering.
#
# Returns the result of #send when the size limit was exceeded, otherwise
# false.
def emit(records, time = Time.now.to_i)
  batch = records.kind_of?(Array) ? records : [records]
  batch.each do |record|
    packed = [time, record].to_msgpack
    @buffer_records << packed
    @buffer_record_count += 1
    @buffer_size += packed.bytesize
  end
  return false unless @buffer_size > @buffer_size_limit

  send
end

#flush ⇒ Object



409
410
411
# File 'lib/flydata/command/sync.rb', line 409

# Sends whatever is currently buffered, regardless of the size limit.
#
# Returns whatever #send returns.
def flush
  send()
end

#pickup_server ⇒ Object

TODO: Check server status



381
382
383
384
385
386
387
388
# File 'lib/flydata/command/sync.rb', line 381

# Picks the next server in rotation.
#
# Returns the entry of @servers at @server_index, then advances the index,
# wrapping back to 0 once it reaches the end of the list.
#
# TODO: Check server status
def pickup_server
  chosen = @servers[@server_index]
  following = @server_index + 1
  @server_index = following < @servers.count ? following : 0
  chosen
end

#reset ⇒ Object



403
404
405
406
407
# File 'lib/flydata/command/sync.rb', line 403

# Empties the buffer: replaces the buffered payload with a new empty string
# and zeroes both the record counter and the byte counter.
#
# Returns 0 (the value of the final assignment).
def reset
  @buffer_records = ''
  @buffer_record_count = @buffer_size = 0
end

#send ⇒ Object

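Sends the buffered records to the next server. A failed attempt is retried up to RETRY_LIMIT times, waiting RETRY_INTERVAL ** retry_count seconds before each retry.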



336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
# File 'lib/flydata/command/sync.rb', line 336

# Sends every buffered record to the next server in rotation and then
# clears the buffer.
#
# Bytes written per attempt: FORWARD_HEADER, the msgpack-encoded tag, a
# 5-byte header (0xdb followed by the payload byte length as a 32-bit
# big-endian integer), then the buffered payload itself.
#
# When the FLYDATA_BENCHMARK environment variable is set, nothing is sent;
# the buffer is just cleared.
#
# A failed attempt (connect or write raising a StandardError) is retried
# against the next server after sleeping RETRY_INTERVAL ** retry_count
# seconds. After RETRY_LIMIT retries the last error is re-raised and the
# buffer is left untouched.
#
# NOTE: inside this class the method takes the place of Object#send; use
# __send__ for dynamic dispatch on instances.
#
# Returns false when the buffer is empty, true once the data was sent (or
# discarded in benchmark mode).
def send
  return false unless @buffer_size > 0

  puts "  -> Sending #{@buffer_record_count}records #{@buffer_size}byte"
  if ENV['FLYDATA_BENCHMARK']
    reset
    return true
  end

  retry_count = 0
  begin
    sock = connect(pickup_server)
    begin
      # Write header
      sock.write FORWARD_HEADER
      # Write tag
      sock.write @tag.to_msgpack
      # Write records
      sock.write [0xdb, @buffer_records.bytesize].pack('CN')
      StringIO.open(@buffer_records) do |payload|
        FileUtils.copy_stream(payload, sock)
      end
    ensure
      # Close this attempt's socket before anything else happens. `retry`
      # jumps back into the outer begin body without running an ensure
      # attached to it, so closing only there would leak the socket of
      # every attempt that connected but failed while writing.
      sock.close rescue nil
    end
  rescue => e
    retry_count += 1
    if retry_count > RETRY_LIMIT
      puts "! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}"
      raise e
    end
    puts "! Warn: Retring to send data. retry_count:#{retry_count} error=#{e.to_s}"
    wait_time = RETRY_INTERVAL ** retry_count
    puts "  Now waiting for next retry. time=#{wait_time}sec"
    sleep wait_time
    retry
  end
  reset
  true
end

#set_options(options) ⇒ Object



310
311
312
313
314
315
316
# File 'lib/flydata/command/sync.rb', line 310

# Applies configuration from +options+.
#
# options - Hash; :buffer_size_limit sets the byte threshold above which
#           #emit sends the buffer. A missing, nil or false value falls
#           back to BUFFER_SIZE.
#
# Returns the limit that was stored in @buffer_size_limit.
def set_options(options)
  @buffer_size_limit = options[:buffer_size_limit] || BUFFER_SIZE
end