Class: Fluent::SecureForwardInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_secure_forward.rb,
lib/fluent/plugin/in_secure_forward.rb

Defined Under Namespace

Classes: Session

Constant Summary collapse

DEFAULT_SECURE_LISTEN_PORT =
24284
HOSTNAME_PLACEHOLDERS =
[ '__HOSTNAME__', '${hostname}' ]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SecureForwardInput

Returns a new instance of SecureForwardInput.



91
92
93
94
# File 'lib/fluent/plugin/in_secure_forward.rb', line 91

# Builds the plugin instance: runs the superclass initializer, then starts
# with no cached certificate (#certificate fills in @cert on first use).
def initialize
  super
  @cert = nil
end

Instance Attribute Details

#nodes ⇒ Object (readonly)

Returns the value of attribute nodes.



77
78
79
# File 'lib/fluent/plugin/in_secure_forward.rb', line 77

# Returns the client node list assembled by #configure: one hash per
# configured client with the keys :address, :shared_key and :users.
def nodes
  @nodes
end

#read_interval ⇒ Object (readonly)

Returns the value of attribute read_interval.



64
65
66
# File 'lib/fluent/plugin/in_secure_forward.rb', line 64

# Returns the read interval that #configure computes as
# @read_interval_msec / 1000.0 (milliseconds converted to seconds).
def read_interval
  @read_interval
end

#sessions ⇒ Object (readonly)

List of sessions (node/socket/thread), each holding an SSL socket instance that is kept alive to a client.



79
80
81
# File 'lib/fluent/plugin/in_secure_forward.rb', line 79

# Returns the list of Session objects. #start resets it to an empty array and
# #run pushes one Session per accepted connection, pruning closed ones.
def sessions
  @sessions
end

#socket_interval ⇒ Object (readonly)

Returns the value of attribute socket_interval.



64
65
66
# File 'lib/fluent/plugin/in_secure_forward.rb', line 64

# Returns the socket interval that #configure computes as
# @socket_interval_msec / 1000.0 (milliseconds converted to seconds).
def socket_interval
  @socket_interval
end

Instance Method Details

#certificate ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/fluent/plugin/in_secure_forward.rb', line 202

# Returns the server certificate and its private key as a two-element array,
# obtaining them on the first call and reusing @cert / @key afterwards.
#
# Where the pair comes from, in priority order:
# * @cert_path set    - the key is read from @private_key_path and the
#                       certificates from @cert_path; the first certificate
#                       becomes the server certificate and the remaining ones
#                       are kept in @client_ca.
# * @ca_cert_path set - a pair is generated through
#                       CertUtil.generate_server_pair using the CA settings.
# * neither           - a self-signed pair is generated.
#
# @client_ca is reset to nil whenever the pair has to be (re)built.
def certificate
  return @cert, @key if @cert && @key

  @client_ca = nil

  if @cert_path
    @key = OpenSSL::PKey::RSA.new(File.read(@private_key_path), @private_key_passphrase)
    chain = Fluent::SecureForward::CertUtil.certificates_from_file(@cert_path)
    @cert = chain.shift
    @client_ca = chain
    return @cert, @key
  end

  # Options shared by both generation paths.
  subject_opts = {
    private_key_length: @generate_private_key_length,
    country: @generate_cert_country,
    state: @generate_cert_state,
    locality: @generate_cert_locality,
    common_name: @generate_cert_common_name,
  }

  @cert, @key =
    if @ca_cert_path
      ca_opts = {
        ca_cert_path: @ca_cert_path,
        ca_key_path: @ca_private_key_path,
        ca_key_passphrase: @ca_private_key_passphrase,
      }
      Fluent::SecureForward::CertUtil.generate_server_pair(ca_opts.merge(subject_opts))
    else
      Fluent::SecureForward::CertUtil.generate_self_signed_server_pair(subject_opts)
    end

  return @cert, @key
end

#configure(conf) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/fluent/plugin/in_secure_forward.rb', line 113

# Validates the plugin configuration and derives runtime state from it.
#
# Steps, in order:
# 1. Expand hostname placeholders in +conf+ (using its 'hostname' entry when
#    present, otherwise Socket.gethostname) before handing it to +super+.
# 2. In secure mode require either cert_path (plus private key path and
#    passphrase, and at least one certificate in the file) or ca_cert_path
#    (plus CA private key path and passphrase). Insecure mode only warns.
# 3. Convert the millisecond intervals into seconds.
# 4. Build @nodes from @clients: each client must specify exactly one of
#    'host' (resolved through IPSocket.getaddress) or 'network'.
# 5. Default the generated certificate common name to @self_hostname and
#    load/generate the certificate so failures surface at startup.
#
# @param conf [#has_key?, #[]] configuration element for this plugin
# @return [true]
# @raise [Fluent::ConfigError] on missing/contradictory settings, an
#   unresolvable client host, or an unparsable client address
def configure(conf)
  hostname = conf.has_key?('hostname') ? conf['hostname'].to_s : Socket.gethostname
  replace_hostname_placeholder(conf, hostname)

  super

  if @secure
    unless @cert_path || @ca_cert_path
      raise Fluent::ConfigError, "cert_path or ca_cert_path required for secure communication"
    end
    if @cert_path
      raise Fluent::ConfigError, "private_key_path required" unless @private_key_path
      raise Fluent::ConfigError, "private_key_passphrase required" unless @private_key_passphrase
      if Fluent::SecureForward::CertUtil.certificates_from_file(@cert_path).empty?
        raise Fluent::ConfigError, "no valid certificates in cert_path: #{@cert_path}"
      end
    else # @ca_cert_path
      raise Fluent::ConfigError, "ca_private_key_path required" unless @ca_private_key_path
      raise Fluent::ConfigError, "ca_private_key_passphrase required" unless @ca_private_key_passphrase
    end
  else
    log.warn "'insecure' mode has vulnerability for man-in-the-middle attacks for clients (output plugins)."
  end

  @read_interval = @read_interval_msec / 1000.0
  @socket_interval = @socket_interval_msec / 1000.0

  @nodes = @clients.map do |client|
    if client.host && client.network
      raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client"
    end
    if !client.host && !client.network
      raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client"
    end

    resolved = nil
    if client.host
      begin
        resolved = IPSocket.getaddress(client.host)
      rescue SocketError
        raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
      end
    end

    address =
      begin
        IPAddr.new(resolved || client.network)
      rescue ArgumentError
        # Report the value that actually failed to parse: for host-based
        # clients client.network is nil, so naming it would hide the cause.
        if resolved
          raise Fluent::ConfigError, "host '#{client.host}' resolved to '#{resolved}', which is not a valid address"
        end
        raise Fluent::ConfigError, "network '#{client.network}' address format is invalid"
      end

    {
      address: address,
      # a per-client shared_key takes precedence over the plugin-wide one
      shared_key: (client.shared_key || @shared_key),
      # comma-separated user names; nil means "no per-client restriction"
      users: (client.users ? client.users.split(',') : nil)
    }
  end

  @generate_cert_common_name ||= @self_hostname

  # To check whether certificates are successfully generated/loaded at startup time
  self.certificate

  true
end

#on_message(msg) ⇒ Object



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
# File 'lib/fluent/plugin/in_secure_forward.rb', line 283

# Dispatches one decoded forward-protocol message to the router.
#
# +msg+ is an array whose first element is the tag; the shape of msg[1]
# selects the mode:
# * String - "PackedForward": a packed stream, wrapped in a
#            MessagePackEventStream and emitted as a stream.
# * Array  - "Forward": a list of [time, record] pairs, emitted as a stream.
# * other  - "Message": msg is [tag, time, record], emitted as a single event.
#
# A time of 0 is replaced by Fluent::Engine.now. In Forward mode the current
# time is looked up at most once per message, so every zero-time entry of one
# batch receives the same timestamp.
def on_message(msg)
  # NOTE: copy&paste from Fluent::ForwardInput#on_message(msg)

  # TODO: format error
  tag = msg[0].to_s
  entries = msg[1]

  case entries
  when String
    # PackedForward
    es = MessagePackEventStream.new(entries, @cached_unpacker)
    router.emit_stream(tag, es)

  when Array
    # Forward
    es = Fluent::MultiEventStream.new
    # Declared outside the block on purpose: a variable first assigned inside
    # the block would be reset on every iteration and `||=` would never cache.
    now = nil
    entries.each do |e|
      time = e[0].to_i
      time = (now ||= Fluent::Engine.now) if time == 0
      es.add(time, e[1])
    end
    router.emit_stream(tag, es)

  else
    # Message
    time = msg[1]
    time = Fluent::Engine.now if time == 0
    router.emit(tag, time, msg[2])
  end
end

#replace_hostname_placeholder(conf, hostname) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/fluent/plugin/in_secure_forward.rb', line 98

# Rewrites, in place, every string-like value in +conf+ and in all of its
# nested elements so that each marker listed in HOSTNAME_PLACEHOLDERS is
# replaced by +hostname+.
#
# Values that are nil/false, or that do not respond to both #include? and
# #gsub, are left untouched, as are values containing no placeholder.
def replace_hostname_placeholder(conf, hostname)
  substitute = lambda do |element|
    element.keys.each do |name|
      value = element[name]
      next unless value && value.respond_to?(:include?) && value.respond_to?(:gsub)
      next unless HOSTNAME_PLACEHOLDERS.any? { |marker| value.include?(marker) }

      expanded = value
      HOSTNAME_PLACEHOLDERS.each { |marker| expanded = expanded.gsub(marker, hostname) }
      element[name] = expanded
    end

    # descend into child sections
    element.elements.each { |child| substitute.call(child) }
  end

  substitute.call(conf)
end

#run ⇒ Object

sslsocket server thread



236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# File 'lib/fluent/plugin/in_secure_forward.rb', line 236

# Body of the listener thread (started from #start): builds the SSL context,
# binds a TCP server on @bind/@port, wraps it in an SSL server stored in
# @sock, and then accepts connections forever, creating one Session per
# accepted socket.
#
# An OpenSSL::SSL::SSLError whose message starts with 'SSL_accept SYSCALL' is
# swallowed and ends the method; any other SSLError is re-raised.
def run # sslsocket server thread
  log.trace "setup for ssl sessions"
  cert, key = self.certificate

  ctx = OpenSSL::SSL::SSLContext.new(@ssl_version)
  if @secure
    # inject OpenSSL::SSL::SSLContext::DEFAULT_PARAMS
    # https://bugs.ruby-lang.org/issues/9424
    ctx.set_params({})

    # An explicitly configured cipher list wins over the built-in default.
    if @ssl_ciphers
      ctx.ciphers = @ssl_ciphers
    else
      ### follow httpclient configuration by nahi
      # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH"
      ctx.ciphers = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default
    end
  end

  ctx.cert = cert
  ctx.key = key
  # @client_ca is set by #certificate to the certificates that followed the
  # server certificate in cert_path (nil when the pair was generated).
  if @client_ca
    ctx.extra_chain_cert = @client_ca
  end

  log.trace "start to listen", bind: @bind, port: @port
  server = TCPServer.new(@bind, @port)
  log.trace "starting SSL server", bind: @bind, port: @port
  @sock = OpenSSL::SSL::SSLServer.new(server, ctx)
  # NOTE(review): with start_immediately disabled the SSL handshake is
  # presumably performed later by Session (see the log message below) - confirm.
  @sock.start_immediately = false
  begin
    log.trace "accepting sessions"
    loop do
      while socket = @sock.accept
        log.trace "accept tcp connection (ssl session not established yet)"
        @sessions.push Session.new(self, socket)

        # cleanup closed session instance
        @sessions.delete_if(&:closed?)
        # The "closed" figure is counted after the cleanup above, so it only
        # reflects sessions that closed in between the two calls.
        log.trace "session instances:", all: @sessions.size, closed: @sessions.select(&:closed?).size
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    raise unless e.message.start_with?('SSL_accept SYSCALL') # signal trap on accept
  end
end

#select_authenticate_users(node, username) ⇒ Object



194
195
196
197
198
199
200
# File 'lib/fluent/plugin/in_secure_forward.rb', line 194

# Returns the entries of @users whose username equals +username+.
#
# When +node+ carries a :users whitelist, a user is only returned if its
# username is also on that list; a nil +node+ or a nil whitelist means no
# per-node restriction.
def select_authenticate_users(node, username)
  whitelist = node.nil? ? nil : node[:users]

  @users.select do |user|
    (whitelist.nil? || whitelist.include?(user.username)) && user.username == username
  end
end

#shutdown ⇒ Object



187
188
189
190
191
192
# File 'lib/fluent/plugin/in_secure_forward.rb', line 187

# Stops the plugin: kills and joins the listener thread, shuts down every
# known session, and finally closes the listening SSL server socket.
#
# Each step is skipped when its target was never created. #start sets @sock
# to nil and only the listener thread assigns it, so a shutdown that happens
# before (or without) a successful bind must not call #close on nil.
def shutdown
  if @listener
    @listener.kill
    @listener.join
  end
  @sessions.each { |session| session.shutdown } if @sessions
  @sock.close if @sock
end

#start ⇒ Object



178
179
180
181
182
183
184
185
# File 'lib/fluent/plugin/in_secure_forward.rb', line 178

# Starts the plugin: seeds OpenSSL's random number generator, resets the
# session list and server socket, and launches #run in a listener thread.
#
# The listener is marked abort_on_exception so that a failure inside #run
# (for example being unable to bind the port) is raised in the main thread
# instead of leaving the plugin running without a listener. The previous bare
# `@listener.abort_on_exception` call only read the flag and had no effect.
def start
  super
  OpenSSL::Random.seed(SecureRandom.random_bytes(16))
  @sessions = []
  @sock = nil
  @listener = Thread.new(&method(:run))
  @listener.abort_on_exception = true
end