Class: Flydata::Output::TcpForwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/flydata/command/sync.rb

Direct Known Subclasses

SslForwarder

Constant Summary collapse

FORWARD_HEADER =
[0x92].pack('C')
BUFFER_SIZE =

32M

1024 * 1024 * 32
DEFUALT_SEND_TIMEOUT =

1 minute

60
RETRY_INTERVAL =
2
RETRY_LIMIT =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(tag, servers, options = {}) ⇒ TcpForwarder

Returns a new instance of TcpForwarder.



499
500
501
502
503
504
505
506
507
508
# File 'lib/flydata/command/sync.rb', line 499

# Builds a forwarder for +tag+ that will deliver to the given +servers+.
#
# @param tag [Object] stored as-is in @tag
# @param servers [Array] non-empty list of server entries; selection starts at index 0
# @param options [Hash] handed unchanged to #set_options
# @raise [RuntimeError] when +servers+ is nil/false, not an Array, or empty
def initialize(tag, servers, options = {})
  @tag = tag
  # nil, false and other non-Array values all fail kind_of?, so one test covers every invalid input.
  usable = servers.kind_of?(Array) && !servers.empty?
  raise "Servers must not be empty." unless usable
  @servers = servers
  @server_index = 0
  set_options(options)
  reset
end

Instance Attribute Details

#buffer_record_count ⇒ Object (readonly)

Returns the value of attribute buffer_record_count.



518
519
520
# File 'lib/flydata/command/sync.rb', line 518

# Read-only accessor for the number of records currently held in the buffer
# (incremented once per record in #emit, zeroed by #reset).
#
# @return [Integer, nil] current value of @buffer_record_count
def buffer_record_count
  @buffer_record_count
end

#buffer_size ⇒ Object (readonly)

Returns the value of attribute buffer_size.



518
519
520
# File 'lib/flydata/command/sync.rb', line 518

# Read-only accessor for the total byte size of the serialized records
# currently buffered (grown by #emit, zeroed by #reset).
#
# @return [Integer, nil] current value of @buffer_size
def buffer_size
  @buffer_size
end

Instance Method Details

#close ⇒ Object



612
613
614
# File 'lib/flydata/command/sync.rb', line 612

# Finishes use of the forwarder by delegating to #flush, so anything still
# buffered is sent.
#
# @return [Object] whatever #flush returns
def close
  flush
end

#connect(server) ⇒ Object



589
590
591
592
593
594
595
596
597
598
599
600
# File 'lib/flydata/command/sync.rb', line 589

# Opens a TCP connection to +server+ and applies linger and send-timeout
# socket options before handing the socket back.
#
# @param server [String] "host:port"; the port is converted with to_i
# @return [TCPSocket] the connected socket; the caller owns it and must close it
# @raise [StandardError] any error from connecting or from setsockopt; when
#   setsockopt fails the freshly opened socket is closed before the error propagates
def connect(server)
  host, port = server.split(':')
  sock = TCPSocket.new(host, port.to_i)

  begin
    # SO_LINGER payload: two native ints (on/off flag = 1, linger seconds).
    opt = [1, DEFUALT_SEND_TIMEOUT].pack('I!I!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
    # SO_SNDTIMEO payload: two native longs (seconds, microseconds).
    opt = [DEFUALT_SEND_TIMEOUT, 0].pack('L!L!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
  rescue StandardError => e
    # Nobody else holds a reference to the socket yet, so release it here
    # instead of leaking the descriptor; the close itself is best-effort.
    begin
      sock.close
    rescue StandardError
      nil
    end
    raise e
  end

  sock
end

#emit(records, time = Time.now.to_i) ⇒ Object



520
521
522
523
524
525
526
527
528
529
530
531
532
533
# File 'lib/flydata/command/sync.rb', line 520

# Serializes each record together with +time+, appends it to the buffer and
# triggers a send once the buffered byte size exceeds the configured limit.
#
# @param records [Object, Array] one record or an Array of records; any
#   non-Array value is treated as a single record
# @param time [Integer] timestamp paired with every record (defaults to the
#   current epoch seconds)
# @return [Object, false] the result of #send when @buffer_size is greater than
#   @buffer_size_limit, otherwise false
def emit(records, time = Time.now.to_i)
  batch = records.kind_of?(Array) ? records : [records]
  batch.each do |entry|
    packed = [time, entry].to_msgpack
    @buffer_records << packed
    @buffer_record_count += 1
    @buffer_size += packed.bytesize
  end
  return false unless @buffer_size > @buffer_size_limit
  send
end

#flush ⇒ Object



608
609
610
# File 'lib/flydata/command/sync.rb', line 608

# Sends whatever is currently buffered. The bare `send` here is this class's
# own zero-argument #send, not the dynamic-dispatch Object#send.
#
# @return [Object] whatever #send returns
def flush
  send
end

#pickup_server ⇒ Object

TODO: Check server status



580
581
582
583
584
585
586
587
# File 'lib/flydata/command/sync.rb', line 580

# Returns the server at the current position and advances the position,
# wrapping back to the first server after the last one.
#
# @return [Object, nil] the entry of @servers at the index held before the call
def pickup_server
  current = @servers[@server_index]
  following = @server_index + 1
  @server_index = following >= @servers.count ? 0 : following
  current
end

#reset ⇒ Object



602
603
604
605
606
# File 'lib/flydata/command/sync.rb', line 602

# Empties the buffer: a new empty string for the serialized records and
# zeroed record/byte counters.
#
# @return [Integer] 0 (the value of the final assignment)
def reset
  @buffer_records = ''
  @buffer_record_count = @buffer_size = 0
end

#send ⇒ Object

TODO retry logic



536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
# File 'lib/flydata/command/sync.rb', line 536

# Sends the buffered records to the next server, retrying on failure with an
# exponentially growing wait (RETRY_INTERVAL ** retry_count seconds).
#
# NOTE: this zero-argument method shadows Object#send on instances of this
# class; use __send__ or public_send for dynamic dispatch on a forwarder.
#
# Bytes written per attempt, in order: FORWARD_HEADER, the msgpack-encoded tag,
# a 5-byte prefix (0xdb followed by the big-endian 32-bit byte length of the
# buffer), then the buffered bytes themselves.
#
# @return [Boolean] false when nothing is buffered; true once the buffer was
#   sent (or dropped because ENV['FLYDATA_BENCHMARK'] is set) and reset
# @raise [StandardError] the most recent error once more than RETRY_LIMIT
#   attempts have failed; the buffer is left untouched in that case
def send
  return false unless @buffer_size > 0

  # Benchmark mode: discard the buffer without touching the network.
  if ENV['FLYDATA_BENCHMARK']
    reset
    return true
  end

  sock = nil
  retry_count = 0
  begin
    sock = connect(pickup_server)

    # Write header
    sock.write FORWARD_HEADER
    # Write tag
    sock.write @tag.to_msgpack
    # Write records
    sock.write [0xdb, @buffer_records.bytesize].pack('CN')
    StringIO.open(@buffer_records) do |i|
      FileUtils.copy_stream(i, sock)
    end
  rescue => e
    # `retry` restarts the begin body without running `ensure`, so the socket
    # of the failed attempt must be closed here; otherwise the next `connect`
    # overwrites the only reference to it and the descriptor leaks.
    if sock
      begin
        sock.close
      rescue StandardError
        nil
      end
      sock = nil
    end
    retry_count += 1
    if retry_count > RETRY_LIMIT
      puts "! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}"
      raise e
    end
    puts "! Warn: Retring to send data. retry_count:#{retry_count} error=#{e.to_s}"
    wait_time = RETRY_INTERVAL ** retry_count
    puts "  Now waiting for next retry. time=#{wait_time}sec"
    sleep wait_time
    retry
  ensure
    # Close the socket of the final attempt; closing is best-effort.
    if sock
      begin
        sock.close
      rescue StandardError
        nil
      end
    end
  end
  reset
  true
end

#set_options(options) ⇒ Object



510
511
512
513
514
515
516
# File 'lib/flydata/command/sync.rb', line 510

# Applies the supported options; only the buffer size threshold is read.
#
# @param options [Hash] may carry :buffer_size_limit; when that entry is
#   absent, nil or false the BUFFER_SIZE constant is used instead
# @return [Object] the value stored in @buffer_size_limit
def set_options(options)
  @buffer_size_limit = options[:buffer_size_limit] || BUFFER_SIZE
end