Class: Flydata::Output::TcpForwarder

Inherits:
Object
  • Object
show all
Defined in:
lib/flydata/command/sync.rb

Direct Known Subclasses

SslForwarder

Constant Summary collapse

FORWARD_HEADER =
[0x92].pack('C')
BUFFER_SIZE =

32 MiB — default buffer size limit, in bytes

1024 * 1024 * 32
DEFUALT_SEND_TIMEOUT =

1 minute, expressed in seconds (used for the socket linger and send timeouts)

60
RETRY_INTERVAL =
2
RETRY_LIMIT =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(tag, servers, options = {}) ⇒ TcpForwarder

Returns a new instance of TcpForwarder.



529
530
531
532
533
534
535
536
537
538
# File 'lib/flydata/command/sync.rb', line 529

# Builds a forwarder that buffers events for +tag+ and ships them to one of
# the given servers.
#
# @param tag [Object] tag written (msgpack-encoded) ahead of every batch
# @param servers [Array<String>] non-empty list of "host:port" entries
# @param options [Hash] passed to set_options (e.g. :buffer_size_limit)
# @raise [RuntimeError] when servers is not a non-empty Array
def initialize(tag, servers, options = {})
  @tag = tag

  # Anything that is not an Array (including nil/false) is rejected here.
  usable = servers.kind_of?(Array) && !servers.empty?
  raise "Servers must not be empty." unless usable

  @servers = servers
  @server_index = 0 # round-robin cursor consumed by pickup_server
  set_options(options)
  reset
end

Instance Attribute Details

#buffer_record_count ⇒ Object (readonly)

Returns the value of attribute buffer_record_count.



548
549
550
# File 'lib/flydata/command/sync.rb', line 548

# Number of records appended by emit since the buffer was last reset.
def buffer_record_count
  @buffer_record_count
end

#buffer_size ⇒ Object (readonly)

Returns the value of attribute buffer_size.



548
549
550
# File 'lib/flydata/command/sync.rb', line 548

# Total byte size of the serialized records currently held in the buffer
# (emit adds each record's bytesize; reset sets it back to 0).
def buffer_size
  @buffer_size
end

Instance Method Details

#close ⇒ Object



642
643
644
# File 'lib/flydata/command/sync.rb', line 642

# Finishes using the forwarder by flushing whatever is still buffered.
# Returns the result of flush.
def close
  flush
end

#connect(server) ⇒ Object



619
620
621
622
623
624
625
626
627
628
629
630
# File 'lib/flydata/command/sync.rb', line 619

# Opens a TCP connection to +server+ and applies linger/send-timeout options.
#
# @param server [String] address in "host:port" form
# @return [TCPSocket] the connected, configured socket (caller must close it)
# @raise [StandardError] whatever TCPSocket.new or setsockopt raises; if
#   configuring the socket fails, the socket is closed before re-raising
def connect(server)
  host, port = server.split(':')
  sock = TCPSocket.new(host, port.to_i)

  begin
    # SO_LINGER: enabled (1), waiting up to DEFUALT_SEND_TIMEOUT seconds.
    opt = [1, DEFUALT_SEND_TIMEOUT].pack('I!I!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
    # SO_SNDTIMEO: DEFUALT_SEND_TIMEOUT seconds, 0 microseconds.
    opt = [DEFUALT_SEND_TIMEOUT, 0].pack('L!L!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
  rescue
    # The caller never receives this socket, so it has to be released here
    # or it would stay open until garbage collection.
    sock.close rescue nil
    raise
  end

  sock
end

#emit(records, time = Time.now.to_i) ⇒ Object



550
551
552
553
554
555
556
557
558
559
560
561
562
563
# File 'lib/flydata/command/sync.rb', line 550

# Serializes the given record(s) into the buffer and sends the buffer once it
# grows past the configured size limit.
#
# @param records [Object, Array] a single record, or an Array of records
#   (a non-Array value is treated as one record)
# @param time [Integer] timestamp paired with every record; defaults to now
# @return [Boolean] false when nothing was sent, otherwise the result of send
def emit(records, time = Time.now.to_i)
  batch = records.kind_of?(Array) ? records : [records]

  batch.each do |entry|
    packed = [time, entry].to_msgpack
    @buffer_records << packed
    @buffer_record_count += 1
    @buffer_size += packed.bytesize
  end

  # Stay buffered until the accumulated bytes exceed the limit.
  return false unless @buffer_size > @buffer_size_limit

  send
end

#flush ⇒ Object



638
639
640
# File 'lib/flydata/command/sync.rb', line 638

# Sends the buffered records now, without waiting for the size limit.
# Returns the result of send (false when the buffer is empty).
def flush
  send
end

#pickup_server ⇒ Object

TODO: Check server status



610
611
612
613
614
615
616
617
# File 'lib/flydata/command/sync.rb', line 610

# Returns the next server in round-robin order and advances the cursor,
# wrapping back to the first entry after the last one.
# TODO: Check server status
def pickup_server
  chosen = @servers[@server_index]
  following = @server_index + 1
  @server_index = following >= @servers.count ? 0 : following
  chosen
end

#reset ⇒ Object



632
633
634
635
636
# File 'lib/flydata/command/sync.rb', line 632

# Empties the buffer: drops the serialized payload and zeroes both counters.
# Returns 0.
def reset
  @buffer_records = ''
  @buffer_record_count = @buffer_size = 0
end

#send ⇒ Object

TODO retry logic



566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
# File 'lib/flydata/command/sync.rb', line 566

# Sends everything in the buffer to a server and then clears the buffer.
#
# A new connection is opened for each attempt (servers are taken in
# round-robin order via pickup_server). On any StandardError the attempt is
# retried after RETRY_INTERVAL ** retry_count seconds, up to RETRY_LIMIT
# retries; after that the last error is re-raised and the buffer is kept.
#
# When the FLYDATA_BENCHMARK environment variable is set, the buffer is
# discarded without any network activity.
#
# @return [Boolean] false if the buffer was empty, true once it was sent
#   (or discarded in benchmark mode)
# @raise [StandardError] the last error once the retry limit is exceeded
def send
  return false unless @buffer_size > 0

  if ENV['FLYDATA_BENCHMARK']
    reset
    return true
  end

  sock = nil
  retry_count = 0
  begin
    sock = connect(pickup_server)

    # Write header
    sock.write FORWARD_HEADER
    # Write tag
    sock.write @tag.to_msgpack
    # Write records: a 0xdb byte plus the payload length as a 32-bit
    # big-endian integer, followed by the raw buffered bytes.
    sock.write [0xdb, @buffer_records.bytesize].pack('CN')
    StringIO.open(@buffer_records) do |i|
      FileUtils.copy_stream(i, sock)
    end
  rescue => e
    retry_count += 1
    if retry_count > RETRY_LIMIT
      puts "! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}"
      raise e
    end
    puts "! Warn: Retring to send data. retry_count:#{retry_count} error=#{e.to_s}"
    # Release the failed connection now; otherwise it would stay open for
    # the whole backoff sleep because ensure only runs when retry fires.
    if sock
      sock.close rescue nil
      sock = nil
    end
    wait_time = RETRY_INTERVAL ** retry_count
    puts "  Now waiting for next retry. time=#{wait_time}sec"
    sleep wait_time
    retry
  ensure
    if sock
      sock.close rescue nil
    end
  end
  reset
  true
end

#set_options(options) ⇒ Object



540
541
542
543
544
545
546
# File 'lib/flydata/command/sync.rb', line 540

# Applies constructor options. Only :buffer_size_limit is read; when it is
# missing (or nil/false) the limit falls back to BUFFER_SIZE.
#
# @param options [Hash] option hash, looked up by the :buffer_size_limit key
# @return [Object] the buffer size limit now in effect
def set_options(options)
  @buffer_size_limit = options[:buffer_size_limit] || BUFFER_SIZE
end