Class: Fluent::ForwardInput::Handler

Inherits:
Coolio::Socket
  • Object
show all
Defined in:
lib/fluent/plugin/in_forward.rb

Constant Summary collapse

# Placeholder used in place of the socket's peeraddr when looking it up raises.
# It has four elements, and the third one carries the failure message.
# Frozen because the same array is handed to every handler whose lookup failed.
PEERADDR_FAILED = ["?", "?", "name resolution failed", "?"].freeze

Instance Method Summary collapse

Constructor Details

#initialize(io, linger_timeout, log, on_message) ⇒ Handler

Returns a new instance of Handler.



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# File 'lib/fluent/plugin/in_forward.rb', line 239

# Wraps an accepted connection.
#
# For TCP sockets the peer address is recorded (falling back to
# PEERADDR_FAILED when the lookup raises) and SO_LINGER is enabled with the
# given timeout. Finally a trace line describing the remote end is logged.
#
# @param io [IO] the accepted socket, passed on to the superclass
# @param linger_timeout [Integer] value packed into l_linger for SO_LINGER
# @param log [#trace] logger; the trace message is built inside a block
# @param on_message [#call] callback stored for use by the read handlers
def initialize(io, linger_timeout, log, on_message)
  super(io)

  @peeraddr = nil
  if io.is_a?(TCPSocket) # for unix domain socket support in the future
    @peeraddr =
      begin
        io.peeraddr
      rescue
        PEERADDR_FAILED
      end
    # struct linger { int l_onoff; int l_linger; }
    linger = [1, linger_timeout].pack('I!I!')
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, linger)
  end

  @chunk_counter = 0
  @on_message = on_message
  @log = log
  @log.trace do
    # @_io is not assigned in this method; presumably set by the superclass.
    remote_port, remote_addr =
      begin
        Socket.unpack_sockaddr_in(@_io.getpeername)
      rescue
        [nil, nil]
      end
    "accepted fluent socket from '#{remote_addr}:#{remote_port}': object_id=#{self.object_id}"
  end
end

Instance Method Details

#on_close ⇒ Object



319
320
321
# File 'lib/fluent/plugin/in_forward.rb', line 319

# Close callback: emits a trace-level log entry and does nothing else.
#
# @return [Object] whatever @log.trace returns
def on_close
  @log.trace do
    "closed socket"
  end
end

#on_connect ⇒ Object



263
264
# File 'lib/fluent/plugin/in_forward.rb', line 263

# Connect callback. Intentionally a no-op.
#
# @return [nil]
def on_connect
  nil
end

#on_read(data) ⇒ Object



266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
# File 'lib/fluent/plugin/in_forward.rb', line 266

# Handles the first data received and picks the wire format for this connection.
#
# When the first character is '{' or '[' the stream is treated as JSON: a
# Yajl parser is created whose completion callback forwards each parsed
# object to @on_message, sends a response if one is returned, and resets the
# byte counter. Any other first character (or empty data, where data[0] is
# nil) selects msgpack and creates an unpacker.
#
# The chosen handler then replaces #on_read on this instance only, so later
# reads skip the detection and go straight to it.
#
# @param data [String] data read from the socket
# @return [Object] the return value of the selected handler
def on_read(data)
  first = data[0]
  if first == '{' || first == '['
    handler = method(:on_read_json)
    @serializer = :to_json.to_proc
    @y = Yajl::Parser.new
    @y.on_parse_complete = lambda { |obj|
      option = @on_message.call(obj, @chunk_counter, @peeraddr)
      respond option if option
      @chunk_counter = 0
    }
  else
    handler = method(:on_read_msgpack)
    @serializer = :to_msgpack.to_proc
    @u = Fluent::Engine.msgpack_factory.unpacker
  end

  # Per-instance override: only this connection is bound to the detected format.
  define_singleton_method(:on_read, handler)
  handler.call(data)
end

#on_read_json(data) ⇒ Object



289
290
291
292
293
294
295
296
# File 'lib/fluent/plugin/in_forward.rb', line 289

# Feeds received JSON text into the streaming parser held in @y.
#
# The byte size of the data is added to @chunk_counter first. Any
# StandardError raised along the way is logged and the connection is closed.
#
# @param data [String] data read from the socket
def on_read_json(data)
  begin
    @chunk_counter += data.bytesize
    @y << data
  rescue => err
    @log.error "forward error", error: err, error_class: err.class
    @log.error_backtrace
    close
  end
end

#on_read_msgpack(data) ⇒ Object



298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/fluent/plugin/in_forward.rb', line 298

# Feeds received bytes into the msgpack unpacker held in @u.
#
# The byte size of the data is added to @chunk_counter. Each object the
# unpacker yields is passed to @on_message together with the current counter
# and the peer address; a truthy return value is handed to #respond, and the
# counter is then reset. Any StandardError is logged and the connection closed.
#
# @param data [String] data read from the socket
def on_read_msgpack(data)
  begin
    @chunk_counter += data.bytesize
    @u.feed_each(data) do |record|
      reply = @on_message.call(record, @chunk_counter, @peeraddr)
      respond(reply) if reply
      @chunk_counter = 0
    end
  rescue => err
    @log.error "forward error", error: err, error_class: err.class
    @log.error_backtrace
    close
  end
end

#respond(option) ⇒ Object



311
312
313
314
315
316
317
# File 'lib/fluent/plugin/in_forward.rb', line 311

# Sends an acknowledgement when the given option carries a 'chunk' value.
#
# The reply is { 'ack' => option['chunk'] }, serialized with @serializer and
# written to the connection. Nothing is sent when option is nil/false or has
# no truthy 'chunk' entry.
#
# @param option [Hash, nil] value returned by the message callback
# @return [Object, nil] the result of @log.trace, or nil when nothing was sent
def respond(option)
  return unless option && option['chunk']

  ack = { 'ack' => option['chunk'] }
  write(@serializer.call(ack))
  @log.trace { "sent response to fluent socket" }
end