Class: Fluent::SecureForwardInput

Inherits:
Input
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/in_secure_forward.rb,
lib/fluent/plugin/in_secure_forward.rb

Defined Under Namespace

Classes: Session

Constant Summary collapse

DEFAULT_SECURE_LISTEN_PORT =
24284

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SecureForwardInput

Returns a new instance of SecureForwardInput.



75
76
77
78
79
80
81
82
# File 'lib/fluent/plugin/in_secure_forward.rb', line 75

# Builds the plugin instance, then loads the standard libraries this plugin
# relies on. They are required here, at instantiation time, rather than when
# the file itself is loaded.
def initialize
  super
  %w[ipaddr socket openssl digest securerandom].each { |library| require library }
end

Instance Attribute Details

#nodes ⇒ Object (readonly)

Returns the value of attribute nodes.



71
72
73
# File 'lib/fluent/plugin/in_secure_forward.rb', line 71

# Returns the client node list held in @nodes.
# #configure builds it with one hash per configured client, containing
# :address (an IPAddr), :shared_key and :users (array of names, or nil).
def nodes
  @nodes
end

#read_interval ⇒ Object (readonly)

Returns the value of attribute read_interval.



58
59
60
# File 'lib/fluent/plugin/in_secure_forward.rb', line 58

# Returns @read_interval, which #configure computes as
# @read_interval_msec / 1000.0 (the millisecond setting as a Float).
def read_interval
  @read_interval
end

#sessions ⇒ Object (readonly)

List of sessions (node/socket/thread), each holding an SSL socket instance that is kept alive to a client.



73
74
75
# File 'lib/fluent/plugin/in_secure_forward.rb', line 73

# Returns the session list held in @sessions.
# #start resets it to an empty array; #run appends a Session for every
# accepted connection and removes the ones that report closed?.
def sessions
  @sessions
end

#socket_interval ⇒ Object (readonly)

Returns the value of attribute socket_interval.



58
59
60
# File 'lib/fluent/plugin/in_secure_forward.rb', line 58

# Returns @socket_interval, which #configure computes as
# @socket_interval_msec / 1000.0 (the millisecond setting as a Float).
def socket_interval
  @socket_interval
end

Instance Method Details

#certificate ⇒ Object



180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/fluent/plugin/in_secure_forward.rb', line 180

# Returns the server certificate and its private key as [cert, key].
#
# The pair is created on first use and cached in @cert/@key; later calls
# return the cached values. Where the pair comes from:
# * @cert_path set    - the key is read from @private_key_path (RSA, using
#                       @private_key_passphrase); the first certificate in
#                       the file is the server certificate and the remaining
#                       ones are stored in @client_ca.
# * @ca_cert_path set - CertUtil.generate_server_pair with the CA settings
#                       plus the generate_* settings.
# * neither           - CertUtil.generate_self_signed_server_pair with the
#                       generate_* settings.
# @client_ca is reset to nil whenever the pair has to be (re)built.
def certificate
  unless @cert && @key
    @client_ca = nil

    # Settings shared by both certificate-generating branches.
    generation_settings = {
      private_key_length: @generate_private_key_length,
      country: @generate_cert_country,
      state: @generate_cert_state,
      locality: @generate_cert_locality,
      common_name: @generate_cert_common_name,
    }

    if @cert_path
      @key = OpenSSL::PKey::RSA.new(File.read(@private_key_path), @private_key_passphrase)
      chain = Fluent::SecureForward::CertUtil.certificates_from_file(@cert_path)
      @cert = chain.shift
      @client_ca = chain
    elsif @ca_cert_path
      ca_settings = {
        ca_cert_path: @ca_cert_path,
        ca_key_path: @ca_private_key_path,
        ca_key_passphrase: @ca_private_key_passphrase,
      }
      @cert, @key = Fluent::SecureForward::CertUtil.generate_server_pair(ca_settings.merge(generation_settings))
    else
      @cert, @key = Fluent::SecureForward::CertUtil.generate_self_signed_server_pair(generation_settings)
    end
  end

  [@cert, @key]
end

#configure(conf) ⇒ Object



94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/fluent/plugin/in_secure_forward.rb', line 94

# Validates the configuration and derives the runtime settings from it.
#
# Steps, in order:
# 1. In secure mode, checks that the certificate-related parameters are
#    present (and that cert_path yields at least one certificate); in
#    insecure mode only logs a warning.
# 2. Converts the *_interval_msec settings into @read_interval and
#    @socket_interval by dividing by 1000.0.
# 3. Builds @nodes: one hash per <client>, with :address (IPAddr of the
#    resolved host or of the network), :shared_key (client value, falling
#    back to @shared_key) and :users (array of names, or nil).
# 4. Defaults @generate_cert_common_name to @self_hostname and calls
#    #certificate once so certificate problems surface here.
#
# conf is passed to the superclass implementation via super.
# Returns true.
# Raises Fluent::ConfigError for missing/contradictory parameters, an
# unresolvable host or an unparsable address.
def configure(conf)
  super

  if @secure
    unless @cert_path || @ca_cert_path
      raise Fluent::ConfigError, "cert_path or ca_cert_path required for secure communication"
    end
    if @cert_path
      raise Fluent::ConfigError, "private_key_path required" unless @private_key_path
      raise Fluent::ConfigError, "private_key_passphrase required" unless @private_key_passphrase
      if Fluent::SecureForward::CertUtil.certificates_from_file(@cert_path).empty?
        raise Fluent::ConfigError, "no valid certificates in cert_path: #{@cert_path}"
      end
    else # @ca_cert_path
      raise Fluent::ConfigError, "ca_private_key_path required" unless @ca_private_key_path
      raise Fluent::ConfigError, "ca_private_key_passphrase required" unless @ca_private_key_passphrase
    end
  else
    log.warn "'insecure' mode has vulnerability for man-in-the-middle attacks for clients (output plugins)."
  end

  @read_interval = @read_interval_msec / 1000.0
  @socket_interval = @socket_interval_msec / 1000.0

  @nodes = @clients.map do |client|
    if client.host && client.network
      raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client"
    end
    if !client.host && !client.network
      raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client"
    end

    source = nil
    if client.host
      begin
        source = IPSocket.getaddress(client.host)
      rescue SocketError
        raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
      end
    end

    address = source || client.network
    source_addr = begin
                    IPAddr.new(address)
                  rescue ArgumentError
                    # Report the string that was actually parsed; client.network
                    # is nil when the client was configured by host.
                    raise Fluent::ConfigError, "network '#{address}' address format is invalid"
                  end

    {
      address: source_addr,
      shared_key: (client.shared_key || @shared_key),
      # Strip whitespace around each name so "alice, bob" matches the same
      # usernames as "alice,bob" during authentication.
      users: (client.users ? client.users.split(',').map(&:strip) : nil)
    }
  end

  @generate_cert_common_name ||= @self_hostname

  # To check whether certificates are successfully generated/loaded at startup time
  self.certificate

  true
end

#on_message(msg) ⇒ Object



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/fluent/plugin/in_secure_forward.rb', line 261

# Emits the events carried by one decoded forward-protocol message through
# the router.
#
# msg is an array whose first element is the tag (converted with to_s). The
# second element selects the mode:
# * String - PackedForward: the string is wrapped in a
#            MessagePackEventStream and emitted as a stream.
# * Array  - Forward: each entry is [time, record]; entries are collected
#            into a MultiEventStream and emitted as a stream.
# * other  - Message: msg is [tag, time, record] and a single event is
#            emitted.
# A time of 0 is replaced by Fluent::Engine.now. In Forward mode the clock is
# read at most once per message, so all zero-time entries of one message
# share the same timestamp.
def on_message(msg)
  # NOTE: copy&paste from Fluent::ForwardInput#on_message(msg)

  # TODO: format error
  tag = msg[0].to_s
  entries = msg[1]

  case entries
  when String
    # PackedForward
    es = Fluent::MessagePackEventStream.new(entries, @cached_unpacker)
    router.emit_stream(tag, es)

  when Array
    # Forward
    es = Fluent::MultiEventStream.new
    # Declared outside the block: a variable first assigned inside the block
    # would be block-local and reset on every iteration, defeating the cache.
    now = nil
    entries.each do |e|
      time = e[0].to_i
      time = (now ||= Fluent::Engine.now) if time == 0
      record = e[1]
      es.add(time, record)
    end
    router.emit_stream(tag, es)

  else
    # Message
    time = msg[1]
    time = Fluent::Engine.now if time == 0
    record = msg[2]
    router.emit(tag, time, record)
  end
end

#run ⇒ Object

sslsocket server thread



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
# File 'lib/fluent/plugin/in_secure_forward.rb', line 214

# Body of the listener thread created by #start.
#
# Builds an SSL context from the pair returned by #certificate, opens a TCP
# server on @bind/@port, wraps it in an SSL server stored in @sock, and then
# accepts connections in an endless loop. Every accepted socket is handed to
# a new Session, which is appended to @sessions; sessions reporting closed?
# are removed after each accept.
#
# The loop only ends through an exception (or when the thread is killed, as
# #shutdown does). An OpenSSL::SSL::SSLError whose message starts with
# 'SSL_accept SYSCALL' is swallowed; any other SSLError is re-raised.
def run # sslsocket server thread
  log.trace "setup for ssl sessions"
  cert, key = self.certificate

  ctx = OpenSSL::SSL::SSLContext.new(@ssl_version)
  if @secure
    # inject OpenSSL::SSL::SSLContext::DEFAULT_PARAMS
    # https://bugs.ruby-lang.org/issues/9424
    ctx.set_params({})

    # An explicit ssl_ciphers setting wins over the built-in cipher list.
    if @ssl_ciphers
      ctx.ciphers = @ssl_ciphers
    else
      ### follow httpclient configuration by nahi
      # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH"
      ctx.ciphers = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default
    end
  end

  ctx.cert = cert
  ctx.key = key
  # @client_ca is only non-nil when #certificate loaded certificates from
  # cert_path; it then holds the certificates that followed the first one.
  if @client_ca
    ctx.extra_chain_cert = @client_ca
  end

  log.trace "start to listen", bind: @bind, port: @port
  server = TCPServer.new(@bind, @port)
  log.trace "starting SSL server", bind: @bind, port: @port
  @sock = OpenSSL::SSL::SSLServer.new(server, ctx)
  # Do not start the SSL session inside accept (see the trace message below:
  # the session is not established yet when accept returns).
  @sock.start_immediately = false
  begin
    log.trace "accepting sessions"
    loop do
      while socket = @sock.accept
        log.trace "accept tcp connection (ssl session not established yet)"
        @sessions.push Session.new(self, socket)

        # cleanup closed session instance
        @sessions.delete_if(&:closed?)
        # NOTE(review): closed sessions were just removed above, so the
        # "closed" count only reflects sessions closed in between.
        log.trace "session instances:", all: @sessions.size, closed: @sessions.select(&:closed?).size
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    raise unless e.message.start_with?('SSL_accept SYSCALL') # signal trap on accept
  end
end

#select_authenticate_users(node, username) ⇒ Object



172
173
174
175
176
177
178
# File 'lib/fluent/plugin/in_secure_forward.rb', line 172

# Returns the entries of @users whose username equals the given username and
# which the node is allowed to authenticate as.
#
# node     - a node hash as built by #configure, or nil.
# username - the name to look up.
#
# When node is nil or has no :users list, every matching user qualifies;
# otherwise the username must also appear in node[:users].
def select_authenticate_users(node, username)
  permitted = node.nil? ? nil : node[:users]
  @users.select do |user|
    (permitted.nil? || permitted.include?(user.username)) && user.username == username
  end
end

#shutdown ⇒ Object



165
166
167
168
169
170
# File 'lib/fluent/plugin/in_secure_forward.rb', line 165

# Stops the plugin: kills the listener thread and waits for it, shuts down
# every tracked session, then closes the listening SSL server socket.
#
# @sock is nil until the listener thread has created the server (#start sets
# it to nil and #run assigns it later, or never if binding fails), so the
# close is skipped when no socket exists.
def shutdown
  @listener.kill
  @listener.join
  @sessions.each(&:shutdown)
  @sock.close if @sock
end

#start ⇒ Object



156
157
158
159
160
161
162
163
# File 'lib/fluent/plugin/in_secure_forward.rb', line 156

# Starts the plugin: seeds OpenSSL's random number generator with 16 random
# bytes, resets the session list and server socket, and launches the
# listener thread running #run.
#
# The listener thread is flagged abort_on_exception so that an unhandled
# error in #run (for example a failure to bind the port) is raised in the
# main thread instead of leaving the plugin silently without a listener.
def start
  super
  OpenSSL::Random.seed(SecureRandom.random_bytes(16))
  @sessions = []
  @sock = nil
  @listener = Thread.new(&method(:run))
  # The original bare `@listener.abort_on_exception` only read the flag;
  # assign it so the setting actually takes effect.
  @listener.abort_on_exception = true
end