Class: Fluent::StreamInput::Handler

Inherits:
Coolio::Socket
  • Object
show all
Defined in:
lib/fluent/plugin/in_unix.rb

Instance Method Summary collapse

Constructor Details

#initialize(io, log, on_message) ⇒ Handler

Returns a new instance of Handler.



121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/fluent/plugin/in_unix.rb', line 121

def initialize(io, log, on_message)
  super(io)
  if io.is_a?(TCPSocket)
    opt = [1, @timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
  end
  @on_message = on_message
  @log = log
  @log.trace {
    remote_port, remote_addr = *Socket.unpack_sockaddr_in(@_io.getpeername) rescue nil
    "accepted fluent socket from '#{remote_addr}:#{remote_port}': object_id=#{self.object_id}"
  }
end

Instance Method Details

#on_closeObject



171
172
173
# File 'lib/fluent/plugin/in_unix.rb', line 171

def on_close
  @log.trace { "closed fluent socket object_id=#{self.object_id}" }
end

#on_connectObject



135
136
# File 'lib/fluent/plugin/in_unix.rb', line 135

def on_connect
end

#on_read(data) ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/fluent/plugin/in_unix.rb', line 138

def on_read(data)
  first = data[0]
  if first == '{' || first == '['
    m = method(:on_read_json)
    @y = Yajl::Parser.new
    @y.on_parse_complete = @on_message
  else
    m = method(:on_read_msgpack)
    @u = Fluent::Engine.msgpack_factory.unpacker
  end

  (class << self; self; end).module_eval do
    define_method(:on_read, m)
  end
  m.call(data)
end

#on_read_json(data) ⇒ Object



155
156
157
158
159
160
161
# File 'lib/fluent/plugin/in_unix.rb', line 155

def on_read_json(data)
  @y << data
rescue
  @log.error "unexpected error", error: $!.to_s
  @log.error_backtrace
  close
end

#on_read_msgpack(data) ⇒ Object



163
164
165
166
167
168
169
# File 'lib/fluent/plugin/in_unix.rb', line 163

def on_read_msgpack(data)
  @u.feed_each(data, &@on_message)
rescue
  @log.error "unexpected error", error: $!.to_s
  @log.error_backtrace
  close
end