Class: Flydata::Output::TcpForwarder

Inherits:
Object
  • Object
show all
Includes:
CommandLoggable
Defined in:
lib/flydata/output/forwarder.rb

Direct Known Subclasses

SslForwarder

Constant Summary collapse

FORWARD_HEADER =
[0x92].pack('C')
BUFFER_SIZE =

32M

1024 * 1024 * 32
DEFUALT_SEND_TIMEOUT =

1 minute

60
RETRY_INTERVAL =
2
RETRY_LIMIT =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from CommandLoggable

#before_logging, #log_error_stderr, #log_info_stdout, #log_warn_stderr

Constructor Details

#initialize(tag, servers, options = {}) ⇒ TcpForwarder

Returns a new instance of TcpForwarder.


34
35
36
37
38
39
40
41
42
43
# File 'lib/flydata/output/forwarder.rb', line 34

def initialize(tag, servers, options = {})
  @tag = tag
  unless servers and servers.kind_of?(Array) and not servers.empty?
    raise "Servers must not be empty."
  end
  @servers = servers
  @server_index = 0
  set_options(options)
  reset
end

Instance Attribute Details

#buffer_record_countObject (readonly)

Returns the value of attribute buffer_record_count.


53
54
55
# File 'lib/flydata/output/forwarder.rb', line 53

def buffer_record_count
  @buffer_record_count
end

#buffer_sizeObject (readonly)

Returns the value of attribute buffer_size.


53
54
55
# File 'lib/flydata/output/forwarder.rb', line 53

def buffer_size
  @buffer_size
end

Instance Method Details

#closeObject


149
150
151
# File 'lib/flydata/output/forwarder.rb', line 149

def close
  flush
end

#connect(server) ⇒ Object


126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/flydata/output/forwarder.rb', line 126

def connect(server)
  host, port = server.split(':')
  sock = TCPSocket.new(host, port.to_i)

  # Set options
  opt = [1, DEFUALT_SEND_TIMEOUT].pack('I!I!')
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
  opt = [DEFUALT_SEND_TIMEOUT, 0].pack('L!L!')
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

  sock
end

#emit(records, time = Time.now.to_i) ⇒ Object


55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/flydata/output/forwarder.rb', line 55

def emit(records, time = Time.now.to_i)
  records = [records] unless records.kind_of?(Array)
  records.each do |record|
    event_data = [time,record].to_msgpack
    @buffer_records << event_data
    @buffer_record_count += 1
    @buffer_size += event_data.bytesize
  end
  if @buffer_size > @buffer_size_limit
    send
  else
    false
  end
end

#flushObject


145
146
147
# File 'lib/flydata/output/forwarder.rb', line 145

def flush
  send
end

#pickup_serverObject

TODO: Check server status


117
118
119
120
121
122
123
124
# File 'lib/flydata/output/forwarder.rb', line 117

def pickup_server
  ret_server = @servers[@server_index]
  @server_index += 1
  if @server_index >= (@servers.count)
    @server_index = 0
  end
  ret_server
end

#resetObject


139
140
141
142
143
# File 'lib/flydata/output/forwarder.rb', line 139

def reset
  @buffer_records = ''
  @buffer_record_count = 0
  @buffer_size = 0
end

#sendObject

TODO retry logic


71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/flydata/output/forwarder.rb', line 71

def send
  if @buffer_size > 0
  else
    return false
  end
  if ENV['FLYDATA_BENCHMARK']
    reset
    return true
  end
  sock = nil
  retry_count = 0
  byte_size = @buffer_size
  record_count = @buffer_record_count
  begin
    sock = connect(pickup_server)

    # Write header
    sock.write FORWARD_HEADER
    # Write tag
    sock.write @tag.to_msgpack
    # Write records
    sock.write [0xdb, @buffer_records.bytesize].pack('CN')
    StringIO.open(@buffer_records) do |i|
      FileUtils.copy_stream(i, sock)
    end
  rescue => e
    retry_count += 1
    if retry_count > RETRY_LIMIT
      log_error_stderr("! Error: Failed to send data. Exceeded the retry limit. retry_count:#{retry_count}")
      raise e
    end
    log_warn_stderr("! Warn: Retrying to send data. retry_count:#{retry_count} error=#{e.to_s}")
    wait_time = RETRY_INTERVAL ** retry_count
    log_warn_stderr("  Now waiting for next retry. time=#{wait_time}sec")
    sleep wait_time
    retry
  ensure
    if sock
      sock.close rescue nil
    end
  end
  reset
  { byte_size: byte_size, record_count: record_count }
end

#set_options(options) ⇒ Object


45
46
47
48
49
50
51
# File 'lib/flydata/output/forwarder.rb', line 45

def set_options(options)
  if options[:buffer_size_limit]
    @buffer_size_limit = options[:buffer_size_limit]
  else
    @buffer_size_limit = BUFFER_SIZE
  end
end