Class: Fluent::ForwardInput
Defined Under Namespace
Classes: Handler, HeartbeatRequestHandler
Constant Summary collapse
- LISTEN_PORT =
24224
Instance Method Summary collapse
- #configure(conf) ⇒ Object
-
#initialize ⇒ ForwardInput
constructor
A new instance of ForwardInput.
- #listen(client) ⇒ Object
-
#run ⇒ Object
config_param :path, :string, :default => DEFAULT_SOCKET_PATH def listen if File.exist?(@path) File.unlink(@path) end FileUtils.mkdir_p File.dirname(@path) log.debug “listening fluent socket on #@path” Coolio::UNIXServer.new(@path, Handler, method(:on_message)) end.
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize ⇒ ForwardInput
Returns a new instance of ForwardInput.
30 31 32 33 |
# File 'lib/fluent/plugin/in_forward.rb', line 30 def initialize super require 'fluent/plugin/socket_util' end |
Instance Method Details
#configure(conf) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/fluent/plugin/in_forward.rb', line 88 def configure(conf) super if @security if @security.user_auth && @security.users.empty? raise Fluent::ConfigError, "<user> sections required if user_auth enabled" end if !@security.allow_anonymous_source && @security.clients.empty? raise Fluent::ConfigError, "<client> sections required if allow_anonymous_source disabled" end @nodes = [] @security.clients.each do |client| if client.host && client.network raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client" end if !client.host && !client.network raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client" end source = nil if client.host begin source = IPSocket.getaddress(client.host) rescue SocketError => e raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved" end end source_addr = begin IPAddr.new(source || client.network) rescue ArgumentError => e raise Fluent::ConfigError, "network '#{client.network}' address format is invalid" end @nodes.push({ address: source_addr, shared_key: (client.shared_key || @security.shared_key), users: client.users }) end end end |
#listen(client) ⇒ Object
171 172 173 174 175 176 177 |
# File 'lib/fluent/plugin/in_forward.rb', line 171 def listen(client) log.info "listening fluent socket on #{@bind}:#{@port}" sock = client.listen_tcp(@bind, @port) s = Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:handle_connection)) s.listen(@backlog) unless @backlog.nil? s end |
#run ⇒ Object
config_param :path, :string, :default => DEFAULT_SOCKET_PATH def listen
if File.exist?(@path)
File.unlink(@path)
end
FileUtils.mkdir_p File.dirname(@path)
log.debug "listening fluent socket on #{@path}"
Coolio::UNIXServer.new(@path, Handler, method(:on_message))
end
189 190 191 192 193 194 |
# File 'lib/fluent/plugin/in_forward.rb', line 189 def run @loop.run(@blocking_timeout) rescue => e log.error "unexpected error", error: e log.error_backtrace end |
#shutdown ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
# File 'lib/fluent/plugin/in_forward.rb', line 152 def shutdown # In test cases it occasionally appeared that when detaching a watcher, another watcher is also detached. # In the case in the iteration of watchers, a watcher that has been already detached is intended to be detached # and therfore RuntimeError occurs saying that it is not attached to a loop. # It occurs only when testing for sending responses to ForwardOutput. # Sending responses needs to write the socket that is previously used only to read # and a handler has 2 watchers that is used to read and to write. # This problem occurs possibly because those watchers are thought to be related to each other # and when detaching one of them the other is also detached for some reasons. # As a workaround, check if watchers are attached before detaching them. @loop.watchers.each {|w| w.detach if w.attached? } @loop.stop @usock.close @thread.join @lsock.close super end |
#start ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
# File 'lib/fluent/plugin/in_forward.rb', line 130 def start super @loop = Coolio::Loop.new socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] if Fluent.windows? socket_manager_path = socket_manager_path.to_i end client = ServerEngine::SocketManager::Client.new(socket_manager_path) @lsock = listen(client) @loop.attach(@lsock) @usock = client.listen_udp(@bind, @port) @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request)) @loop.attach(@hbr) @thread = Thread.new(&method(:run)) end |