Class: Fluent::ForwardInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_forward.rb

Defined Under Namespace

Classes: Handler, HeartbeatRequestHandler

Constant Summary collapse

LISTEN_PORT =
24224

Instance Method Summary collapse

Constructor Details

#initialize ⇒ ForwardInput

Returns a new instance of ForwardInput.



30
31
32
33
# File 'lib/fluent/plugin/in_forward.rb', line 30

# Builds a new ForwardInput, delegating all setup to the parent initializer.
def initialize
  super
  # Loaded here, when an instance is created, rather than at file load time.
  require 'fluent/plugin/socket_util'
end

Instance Method Details

#configure(conf) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/fluent/plugin/in_forward.rb', line 88

# Applies the configuration and, when a security section is present,
# validates it and builds @nodes: one entry per configured client holding
# its address (IPAddr), effective shared key and user list.
#
# Nothing beyond the parent's configure runs when @security is not set.
#
# @param conf the configuration object, forwarded to the parent class
# @raise [Fluent::ConfigError] if user_auth is enabled without any users,
#   if anonymous sources are disallowed without any clients, if a client
#   specifies both or neither of 'host' and 'network', if a host cannot be
#   resolved, or if the resulting address cannot be parsed
def configure(conf)
  super

  return unless @security

  if @security.user_auth && @security.users.empty?
    raise Fluent::ConfigError, "<user> sections required if user_auth enabled"
  end

  anonymous_allowed = @security.allow_anonymous_source
  if !anonymous_allowed && @security.clients.empty?
    raise Fluent::ConfigError, "<client> sections required if allow_anonymous_source disabled"
  end

  @nodes = []

  @security.clients.each do |client|
    host = client.host
    network = client.network

    # Exactly one of 'host' / 'network' must identify the client.
    raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client" if host && network
    raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client" unless host || network

    # A host name is resolved to an address string first; a network is used as given.
    resolved =
      if host
        begin
          IPSocket.getaddress(host)
        rescue SocketError
          raise Fluent::ConfigError, "host '#{host}' cannot be resolved"
        end
      end

    address =
      begin
        IPAddr.new(resolved || network)
      rescue ArgumentError
        raise Fluent::ConfigError, "network '#{network}' address format is invalid"
      end

    # The client-specific shared key wins over the section-wide one.
    @nodes << {
      address: address,
      shared_key: (client.shared_key || @security.shared_key),
      users: client.users
    }
  end
end

#listen(client) ⇒ Object



171
172
173
174
175
176
177
# File 'lib/fluent/plugin/in_forward.rb', line 171

# Obtains a TCP listening socket for @bind/@port from +client+ and wraps it
# in a Coolio::TCPServer whose connections are passed to #handle_connection.
#
# @param client an object responding to +listen_tcp(bind, port)+
# @return [Coolio::TCPServer] the server; the caller attaches it to a loop
def listen(client)
  log.info "listening fluent socket on #{@bind}:#{@port}"
  tcp_socket = client.listen_tcp(@bind, @port)
  on_connect = method(:handle_connection)

  Coolio::TCPServer.new(tcp_socket, nil, Handler, @linger_timeout, log, on_connect).tap do |server|
    # A backlog is applied only when one has been set.
    server.listen(@backlog) unless @backlog.nil?
  end
end

#run ⇒ Object

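Note: the text below appears to be a legacy code fragment (a UNIX-socket variant of listen) that was captured as this method's description; it does not describe #run.

config_param :path, :string, :default => DEFAULT_SOCKET_PATH

def listen
  if File.exist?(@path)
    File.unlink(@path)
  end
  FileUtils.mkdir_p File.dirname(@path)
  log.debug "listening fluent socket on #{@path}"
  Coolio::UNIXServer.new(@path, Handler, method(:on_message))
end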



189
190
191
192
193
194
# File 'lib/fluent/plugin/in_forward.rb', line 189

# Runs the event loop held in @loop, passing @blocking_timeout to it.
# Any StandardError escaping the loop is logged together with its backtrace
# instead of being propagated to the caller.
def run
  begin
    @loop.run(@blocking_timeout)
  rescue StandardError => error
    log.error "unexpected error", error: error
    log.error_backtrace
  end
end

#shutdown ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/fluent/plugin/in_forward.rb', line 152

# Stops the event loop and releases the sockets and thread created by #start,
# then hands over to the parent class's shutdown.
def shutdown
  # Workaround: in tests, detaching one watcher was occasionally observed to
  # detach another one as well. Iterating over the watchers then tried to
  # detach a watcher that was already detached, and therefore a RuntimeError
  # was raised saying that it is not attached to a loop.
  # This was seen only when testing responses sent to ForwardOutput: sending a
  # response writes to a socket that was previously only read, so a handler
  # has two watchers (one to read, one to write). Possibly those watchers are
  # treated as related, so detaching one also detaches the other.
  # Hence each watcher is checked at the moment it is visited, not beforehand.
  @loop.watchers.each do |watcher|
    watcher.detach if watcher.attached?
  end

  @loop.stop
  @usock.close
  @thread.join
  @lsock.close

  super
end

#start ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# File 'lib/fluent/plugin/in_forward.rb', line 130

# Starts the plugin: creates the event loop, obtains the TCP and UDP sockets
# through a ServerEngine socket-manager client, attaches the TCP server and
# the heartbeat handler to the loop, and runs the loop on a new thread.
#
# Sets @loop, @lsock, @usock, @hbr and @thread, all of which #shutdown uses
# except @hbr.
def start
  super

  @loop = Coolio::Loop.new

  # The socket manager location comes from the environment; on Windows the
  # value is converted to an integer before being handed to the client.
  manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
  manager_path = manager_path.to_i if Fluent.windows?
  socket_client = ServerEngine::SocketManager::Client.new(manager_path)

  # TCP side: listening server.
  @lsock = listen(socket_client)
  @loop.attach(@lsock)

  # UDP side: non-blocking socket handled by HeartbeatRequestHandler.
  @usock = socket_client.listen_udp(@bind, @port)
  @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
  @hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
  @loop.attach(@hbr)

  @thread = Thread.new(&method(:run))
end