Class: Flydata::Output::TcpForwarder

Inherits:
Object
  • Object
show all
Includes:
CommandLoggable
Defined in:
lib/flydata/output/forwarder.rb

Direct Known Subclasses

SslForwarder

Constant Summary collapse

FORWARD_HEADER =
[0x92].pack('C')
BUFFER_SIZE =

32M

1024 * 1024 * 32
DEFUALT_SEND_TIMEOUT =

1 minute

60
RETRY_INTERVAL =
2
RETRY_LIMIT =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from CommandLoggable

#before_logging, #log_error_stderr, #log_info_stdout, #log_warn_stderr

Constructor Details

#initialize(tag, servers, options = {}) ⇒ TcpForwarder

Returns a new instance of TcpForwarder.


34
35
36
37
38
39
40
41
42
43
# File 'lib/flydata/output/forwarder.rb', line 34

# Creates a forwarder that buffers records for +tag+ and rotates over +servers+.
#
# @param tag [Object] stored as-is; #send later serialises it with to_msgpack
# @param servers [Array] non-empty list of servers; entries are handed to #connect
# @param options [Hash] forwarded to #set_options
# @raise [RuntimeError] when +servers+ is not a non-empty Array
def initialize(tag, servers, options = {})
  @tag = tag

  usable_servers = servers.kind_of?(Array) && !servers.empty?
  raise "Servers must not be empty." unless usable_servers

  @servers = servers
  # Rotation cursor used by #pickup_server.
  @server_index = 0
  set_options(options)
  reset
end

Instance Attribute Details

#buffer_record_count ⇒ Object (readonly)

Returns the value of attribute buffer_record_count


53
54
55
# File 'lib/flydata/output/forwarder.rb', line 53

# Number of records currently buffered: incremented once per record by #emit
# and set back to 0 by #reset.
def buffer_record_count
  @buffer_record_count
end

#buffer_size ⇒ Object (readonly)

Returns the value of attribute buffer_size


53
54
55
# File 'lib/flydata/output/forwarder.rb', line 53

# Total byte size of the buffered, serialised records: grown by #emit with
# each record's bytesize and set back to 0 by #reset.
def buffer_size
  @buffer_size
end

Instance Method Details

#buffer_full? ⇒ Boolean

Returns:

  • (Boolean)

114
115
116
# File 'lib/flydata/output/forwarder.rb', line 114

# Tells whether the buffered byte count has gone past the configured limit.
# The comparison is strict: a buffer exactly at the limit is not yet "full".
#
# @return [Boolean]
def buffer_full?
  @buffer_size_limit < @buffer_size
end

#close ⇒ Object


151
152
153
# File 'lib/flydata/output/forwarder.rb', line 151

# Closes the forwarder by flushing whatever is still buffered.
# Returns the result of #flush.
def close
  flush
end

#connect(server) ⇒ Object


128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/flydata/output/forwarder.rb', line 128

# Opens a TCP connection to +server+ and configures its linger and send
# timeout options.
#
# @param server [String] address in "host:port" form
# @return [TCPSocket] the connected, configured socket
# @raise [StandardError] whatever TCPSocket.new or setsockopt raises; if
#   configuring the socket fails, the socket is closed before re-raising
def connect(server)
  host, port = server.split(':')
  sock = TCPSocket.new(host, port.to_i)

  begin
    # Set options
    # SO_LINGER: enabled (1) with DEFUALT_SEND_TIMEOUT as the linger value.
    opt = [1, DEFUALT_SEND_TIMEOUT].pack('I!I!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
    # SO_SNDTIMEO: DEFUALT_SEND_TIMEOUT seconds and 0 microseconds, packed as
    # two native longs (assumes the platform's timeval layout matches).
    opt = [DEFUALT_SEND_TIMEOUT, 0].pack('L!L!')
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)
  rescue => e
    # Do not leak the descriptor when the socket cannot be configured.
    sock.close rescue nil
    raise e
  end

  sock
end

#emit(records, time = Time.now.to_i) ⇒ Object


55
56
57
58
59
60
61
62
63
64
# File 'lib/flydata/output/forwarder.rb', line 55

# Serialises each record together with +time+ and appends it to the buffer.
#
# @param records [Object, Array] one record or an Array of records; anything
#   that is not an Array is treated as a single record
# @param time [Integer] timestamp paired with every record (defaults to now)
# @return [Boolean] the result of #buffer_full? after appending
def emit(records, time = Time.now.to_i)
  batch = records.kind_of?(Array) ? records : [records]
  batch.each do |entry|
    packed = [time, entry].to_msgpack
    @buffer_records << packed
    @buffer_size += packed.bytesize
    @buffer_record_count += 1
  end
  buffer_full?
end

#flush ⇒ Object


147
148
149
# File 'lib/flydata/output/forwarder.rb', line 147

# Flushes the buffer by delegating to #send and returns its result.
def flush
  send
end

#pickup_server ⇒ Object

TODO: Check server status


119
120
121
122
123
124
125
126
# File 'lib/flydata/output/forwarder.rb', line 119

# Returns the server at the current rotation cursor and advances the cursor,
# wrapping back to the first server after the last one.
#
# @return [Object] the selected entry of @servers
def pickup_server
  chosen = @servers[@server_index]
  following = @server_index + 1
  @server_index = following >= @servers.count ? 0 : following
  chosen
end

#reset ⇒ Object


141
142
143
144
145
# File 'lib/flydata/output/forwarder.rb', line 141

# Empties the buffer: replaces the record buffer with a fresh empty string
# and zeroes both the record counter and the byte counter.
def reset
  @buffer_records = ''
  @buffer_record_count = @buffer_size = 0
end

#send ⇒ Object

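TODO: review retry logic (the method body already retries up to RETRY_LIMIT times, waiting RETRY_INTERVAL ** retry_count seconds between attempts)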


67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/flydata/output/forwarder.rb', line 67

# Sends the buffered records to the next server in the rotation, retrying
# with exponential back-off on failure.
#
# NOTE: this method shadows Object#send for instances of this class; use
# __send__ or public_send for dynamic dispatch on a forwarder.
#
# @return [false] when the buffer is empty (nothing is sent)
# @return [true] when ENV['FLYDATA_BENCHMARK'] is set (buffer is dropped
#   without any network I/O)
# @return [Hash] { byte_size:, record_count: } of the batch that was sent
# @raise [StandardError] the last error once more than RETRY_LIMIT attempts
#   have failed; the buffer is left untouched in that case
def send
  return false unless @buffer_size > 0

  if ENV['FLYDATA_BENCHMARK']
    reset
    return true
  end

  # Capture the totals now; #reset clears them after a successful send.
  byte_size = @buffer_size
  record_count = @buffer_record_count
  retry_count = 0
  sock = nil
  begin
    sock = connect(pickup_server)

    # Write header
    sock.write FORWARD_HEADER
    # Write tag
    sock.write @tag.to_msgpack
    # Write records: a 0xdb marker byte and a 32-bit big-endian byte length
    # (presumably the MessagePack 32-bit raw/str header), then the payload.
    sock.write [0xdb, @buffer_records.bytesize].pack('CN')
    sock.write @buffer_records
  rescue => e
    # `retry` restarts the begin body without running `ensure`, so close the
    # socket of the failed attempt here instead of holding it open during the
    # back-off sleep and orphaning it on the next connect.
    if sock
      sock.close rescue nil
      sock = nil
    end
    retry_count += 1
    if retry_count > RETRY_LIMIT
      log_error_stderr("! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}")
      raise e
    end
    log_warn_stderr("! Warn: Retrying to send data. retry_count:#{retry_count} error=#{e.to_s}")
    # Exponential back-off: RETRY_INTERVAL raised to the attempt number.
    wait_time = RETRY_INTERVAL ** retry_count
    log_warn_stderr("  Now waiting for next retry. time=#{wait_time}sec")
    sleep wait_time
    retry
  ensure
    if sock
      sock.close rescue nil
    end
  end
  reset
  { byte_size: byte_size, record_count: record_count }
end

#set_options(options) ⇒ Object


45
46
47
48
49
50
51
# File 'lib/flydata/output/forwarder.rb', line 45

# Applies forwarder options.
#
# @param options [Hash] recognised key: :buffer_size_limit — byte threshold
#   used by #buffer_full?; falls back to BUFFER_SIZE when missing, nil or false
# @return [Object] the buffer size limit now in effect
def set_options(options)
  @buffer_size_limit = options[:buffer_size_limit] || BUFFER_SIZE
end