Method: Flydata::Output::TcpForwarder#send

Defined in:
lib/flydata/output/forwarder.rb

#sendObject

TODO retry logic



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/flydata/output/forwarder.rb', line 71

def send
  if @buffer_size > 0
  else
    return false
  end
  if ENV['FLYDATA_BENCHMARK']
    reset
    return true
  end
  sock = nil
  retry_count = 0
  begin
    sock = connect(pickup_server)

    # Write header
    sock.write FORWARD_HEADER
    # Write tag
    sock.write @tag.to_msgpack
    # Write records
    sock.write [0xdb, @buffer_records.bytesize].pack('CN')
    StringIO.open(@buffer_records) do |i|
      FileUtils.copy_stream(i, sock)
    end
  rescue => e
    retry_count += 1
    if retry_count > RETRY_LIMIT
      log_error_stderr("! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}")
      raise e
    end
    log_warn_stderr("! Warn: Retrying to send data. retry_count:#{retry_count} error=#{e.to_s}")
    wait_time = RETRY_INTERVAL ** retry_count
    log_warn_stderr("  Now waiting for next retry. time=#{wait_time}sec")
    sleep wait_time
    retry
  ensure
    if sock
      sock.close rescue nil
    end
  end
  reset
  true
end