Class: Fluent::ForwardInput
Defined Under Namespace
Classes: Handler, HeartbeatRequestHandler
Constant Summary collapse
- LISTEN_PORT =
24224
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ ForwardInput
constructor
A new instance of ForwardInput.
- #listen(client) ⇒ Object
-
#run ⇒ Object
config_param :path, :string, :default => DEFAULT_SOCKET_PATH def listen if File.exist?(@path) File.unlink(@path) end FileUtils.mkdir_p File.dirname(@path) log.debug "listening fluent socket on #{@path}" Coolio::UNIXServer.new(@path, Handler, method(:on_message)) end.
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ ForwardInput
Returns a new instance of ForwardInput.
30 31 32 33 |
# File 'lib/fluent/plugin/in_forward.rb', line 30
# Builds the plugin instance: runs the superclass initializer first and then
# loads the socket utility library.
#
# @return [ForwardInput] a new instance of ForwardInput
def initialize
  super
  # Required here, inside the constructor, rather than at file load time.
  require('fluent/plugin/socket_util')
end
Instance Method Details
#configure(conf) ⇒ Object
55 56 57 |
# File 'lib/fluent/plugin/in_forward.rb', line 55
# Configuration hook. This method adds nothing of its own: the bare +super+
# forwards +conf+ unchanged to the superclass implementation.
#
# @param conf [Object] configuration passed through to the superclass
# @return [Object] whatever the superclass implementation returns
def configure(conf)
  super
end
#listen(client) ⇒ Object
100 101 102 103 104 105 106 |
# File 'lib/fluent/plugin/in_forward.rb', line 100
# Opens the TCP listener on @bind:@port and wraps it in a Coolio server.
#
# @param client [Object] object that provides +listen_tcp(bind, port)+
# @return [Coolio::TCPServer] the server built around the listening socket
def listen(client)
  log.info "listening fluent socket on #{@bind}:#{@port}"
  tcp_socket = client.listen_tcp(@bind, @port)
  Coolio::TCPServer.new(tcp_socket, nil, Handler, @linger_timeout, log, method(:on_message)).tap do |server|
    # A backlog is applied only when one was configured; nil leaves the
    # server's own setting untouched.
    server.listen(@backlog) unless @backlog.nil?
  end
end
#run ⇒ Object
config_param :path, :string, :default => DEFAULT_SOCKET_PATH
def listen
if File.exist?(@path)
File.unlink(@path)
end
FileUtils.mkdir_p File.dirname(@path)
log.debug "listening fluent socket on #{@path}"
Coolio::UNIXServer.new(@path, Handler, method(:on_message))
end
118 119 120 121 122 123 |
# File 'lib/fluent/plugin/in_forward.rb', line 118
# Runs the event loop with @blocking_timeout. Any StandardError raised by the
# loop is logged together with its backtrace instead of propagating.
#
# @return [Object] the loop's return value, or the result of the final
#   logging call when an error was rescued
def run
  begin
    @loop.run(@blocking_timeout)
  rescue StandardError => err
    log.error("unexpected error", error: err)
    log.error_backtrace
  end
end
#shutdown ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/fluent/plugin/in_forward.rb', line 81
# Tears the input down: detaches the loop's watchers, stops the loop, closes
# the UDP socket, joins the worker thread, closes the TCP listener and finally
# hands over to the superclass.
def shutdown
  # Workaround: in test cases it was occasionally observed that detaching one
  # watcher also detached another one. While iterating, the loop would then
  # try to detach a watcher that was already detached, and a RuntimeError was
  # raised saying it is not attached to a loop.
  # This was seen only when testing responses sent to ForwardOutput. Sending a
  # response writes to a socket that is otherwise only read from, so the
  # handler holds two watchers (one for reading, one for writing). Those two
  # watchers are possibly treated as related, so that detaching one detaches
  # the other as well.
  # For that reason attachment is checked right before each detach rather than
  # filtering the watcher list up front.
  @loop.watchers.each do |watcher|
    next unless watcher.attached?
    watcher.detach
  end

  @loop.stop
  @usock.close
  @thread.join
  @lsock.close

  super
end
#start ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/fluent/plugin/in_forward.rb', line 59
# Brings the input up: creates the event loop, obtains the TCP listener and
# the UDP heartbeat socket through the ServerEngine socket manager client,
# attaches both to the loop and starts a thread that executes #run.
#
# @return [Thread] the thread running the event loop
def start
  super

  @loop = Coolio::Loop.new

  # The socket manager location comes from the environment. On Windows the
  # value is converted to an integer (presumably a port number there --
  # confirm against ServerEngine).
  manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  manager_path = manager_path.to_i if Fluent.windows?
  client = ServerEngine::SocketManager::Client.new(manager_path)

  # TCP listener.
  @lsock = listen(client)
  @loop.attach(@lsock)

  # UDP socket on the same bind address and port, switched to non-blocking
  # mode and served by the heartbeat request handler.
  @usock = client.listen_udp(@bind, @port)
  @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
  @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
  @loop.attach(@hbr)

  @thread = Thread.new(&method(:run))
end