Class: Flydata::Output::TcpForwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/flydata/command/sync.rb

Direct Known Subclasses

SslForwarder

Constant Summary collapse

FORWARD_HEADER =
[0x92].pack('C')
BUFFER_SIZE =

32M

1024 * 1024 * 32
DEFUALT_SEND_TIMEOUT =

1 minute

60
RETRY_INTERVAL =
2
RETRY_LIMIT =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(tag, servers, options = {}) ⇒ TcpForwarder

Returns a new instance of TcpForwarder.



538
539
540
541
542
543
544
545
546
547
# File 'lib/flydata/command/sync.rb', line 538

def initialize(tag, servers, options = {})
  @tag = tag
  unless servers and servers.kind_of?(Array) and not servers.empty?
    raise "Servers must not be empty."
  end
  @servers = servers
  @server_index = 0
  set_options(options)
  reset
end

Instance Attribute Details

#buffer_record_countObject (readonly)

Returns the value of attribute buffer_record_count.



557
558
559
# File 'lib/flydata/command/sync.rb', line 557

def buffer_record_count
  @buffer_record_count
end

#buffer_sizeObject (readonly)

Returns the value of attribute buffer_size.



557
558
559
# File 'lib/flydata/command/sync.rb', line 557

def buffer_size
  @buffer_size
end

Instance Method Details

#closeObject



651
652
653
# File 'lib/flydata/command/sync.rb', line 651

def close
  flush
end

#connect(server) ⇒ Object



628
629
630
631
632
633
634
635
636
637
638
639
# File 'lib/flydata/command/sync.rb', line 628

def connect(server)
  host, port = server.split(':')
  sock = TCPSocket.new(host, port.to_i)

  # Set options
  opt = [1, DEFUALT_SEND_TIMEOUT].pack('I!I!')
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
  opt = [DEFUALT_SEND_TIMEOUT, 0].pack('L!L!')
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

  sock
end

#emit(records, time = Time.now.to_i) ⇒ Object



559
560
561
562
563
564
565
566
567
568
569
570
571
572
# File 'lib/flydata/command/sync.rb', line 559

def emit(records, time = Time.now.to_i)
  records = [records] unless records.kind_of?(Array)
  records.each do |record|
    event_data = [time,record].to_msgpack
    @buffer_records << event_data
    @buffer_record_count += 1
    @buffer_size += event_data.bytesize
  end
  if @buffer_size > @buffer_size_limit
    send
  else
    false
  end
end

#flushObject



647
648
649
# File 'lib/flydata/command/sync.rb', line 647

def flush
  send
end

#pickup_serverObject

TODO: Check server status



619
620
621
622
623
624
625
626
# File 'lib/flydata/command/sync.rb', line 619

def pickup_server
  ret_server = @servers[@server_index]
  @server_index += 1
  if @server_index >= (@servers.count)
    @server_index = 0
  end
  ret_server
end

#resetObject



641
642
643
644
645
# File 'lib/flydata/command/sync.rb', line 641

def reset
  @buffer_records = ''
  @buffer_record_count = 0
  @buffer_size = 0
end

#sendObject

TODO retry logic



575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
# File 'lib/flydata/command/sync.rb', line 575

def send
  if @buffer_size > 0
  else
    return false
  end
  if ENV['FLYDATA_BENCHMARK']
    reset
    return true
  end
  sock = nil
  retry_count = 0
  begin
    sock = connect(pickup_server)

    # Write header
    sock.write FORWARD_HEADER
    # Write tag
    sock.write @tag.to_msgpack
    # Write records
    sock.write [0xdb, @buffer_records.bytesize].pack('CN')
    StringIO.open(@buffer_records) do |i|
      FileUtils.copy_stream(i, sock)
    end
  rescue => e
    retry_count += 1
    if retry_count > RETRY_LIMIT
      puts "! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}"
      raise e
    end
    puts "! Warn: Retring to send data. retry_count:#{retry_count} error=#{e.to_s}"
    wait_time = RETRY_INTERVAL ** retry_count
    puts "  Now waiting for next retry. time=#{wait_time}sec"
    sleep wait_time
    retry
  ensure
    if sock
      sock.close rescue nil
    end
  end
  reset
  true
end

#set_options(options) ⇒ Object



549
550
551
552
553
554
555
# File 'lib/flydata/command/sync.rb', line 549

def set_options(options)
  if options[:buffer_size_limit]
    @buffer_size_limit = options[:buffer_size_limit]
  else
    @buffer_size_limit = BUFFER_SIZE
  end
end