Class: Fluent::SecureForwardInput

Inherits:
Input
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/in_secure_forward.rb,
lib/fluent/plugin/in_secure_forward.rb

Defined Under Namespace

Classes: Session

Constant Summary collapse

DEFAULT_SECURE_LISTEN_PORT =
24284

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SecureForwardInput

Returns a new instance of SecureForwardInput.



66
67
68
69
70
71
72
73
# File 'lib/fluent/plugin/in_secure_forward.rb', line 66

# Builds the plugin instance and lazily loads the standard libraries it uses
# (address parsing, sockets, TLS, digests and secure random bytes).
def initialize
  super
  %w[ipaddr socket openssl digest securerandom].each { |library| require library }
end

Instance Attribute Details

#nodes ⇒ Object (readonly)

Returns the value of attribute nodes.



62
63
64
# File 'lib/fluent/plugin/in_secure_forward.rb', line 62

# Returns the list of permitted client nodes assembled by #configure.
#
# Each element is a Hash with the keys :address (an IPAddr), :shared_key and
# :users (an Array of user names, or nil when the client entry lists none).
#
# @return [Array<Hash>, nil] nil if #configure has not assigned it yet
def nodes
  @nodes
end

#read_interval ⇒ Object (readonly)

Returns the value of attribute read_interval.



49
50
51
# File 'lib/fluent/plugin/in_secure_forward.rb', line 49

# Returns the read interval computed by #configure as
# `@read_interval_msec / 1000.0` (presumably seconds, given the "_msec" source).
#
# @return [Float, nil] nil if #configure has not assigned it yet
def read_interval
  @read_interval
end

#sessions ⇒ Object (readonly)

List of sessions (node/socket/thread), each holding an SSL socket that is kept alive to a client.



64
65
66
# File 'lib/fluent/plugin/in_secure_forward.rb', line 64

# Returns the list of Session objects tracked by the listener.
#
# #start resets it to an empty Array; #run appends a Session for every accepted
# connection and removes sessions that report closed?.
#
# @return [Array<Session>, nil] nil if #start has not assigned it yet
def sessions
  @sessions
end

#socket_interval ⇒ Object (readonly)

Returns the value of attribute socket_interval.



49
50
51
# File 'lib/fluent/plugin/in_secure_forward.rb', line 49

# Returns the socket interval computed by #configure as
# `@socket_interval_msec / 1000.0` (presumably seconds, given the "_msec" source).
#
# @return [Float, nil] nil if #configure has not assigned it yet
def socket_interval
  @socket_interval
end

Instance Method Details

#cert_auto_generate ⇒ Object

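Boolean option (default: false). When enabled, #certificate generates a self-signed certificate and key instead of reading them from files. The source comment captured with this declaration appears to concern a different option that is not implemented yet: "meaningless for security…?" — `config_param :dns_reverse_lookup_check, :bool, :default => false`.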



33
# File 'lib/fluent/plugin/in_secure_forward.rb', line 33

# When true, #certificate generates a self-signed certificate and key in memory
# instead of loading them from the configured files.
config_param :cert_auto_generate, :bool, default: false

#certificate ⇒ Object



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/fluent/plugin/in_secure_forward.rb', line 147

# Returns the server certificate and private key, creating or loading them on
# first use and memoizing the pair in @cert / @key.
#
# With cert_auto_generate enabled a self-signed certificate is generated in
# memory from the generate_* settings; otherwise the certificate is read from
# @cert_file_path and the RSA key from @private_key_file (decrypted with
# @private_key_passphrase).
#
# Every path returns the same shape, so callers can always destructure
# `cert, key = certificate`.
#
# @return [Array(OpenSSL::X509::Certificate, OpenSSL::PKey::RSA)] certificate and key, in that order
def certificate
  return @cert, @key if @cert && @key

  if @cert_auto_generate
    key = OpenSSL::PKey::RSA.generate(@generate_private_key_length)

    # SHA-256 instead of SHA-1: SHA-1 certificate signatures are deprecated and
    # signing with it fails outright under some system crypto policies.
    digest = OpenSSL::Digest::SHA256.new
    # Self-signed: issuer and subject are the very same name object.
    issuer = subject = OpenSSL::X509::Name.new
    subject.add_entry('C', @generate_cert_country)
    subject.add_entry('ST', @generate_cert_state)
    subject.add_entry('L', @generate_cert_locality)
    subject.add_entry('CN', @generate_cert_common_name)

    cer = OpenSSL::X509::Certificate.new
    # NOTE(review): both validity bounds are the Unix epoch, so this certificate
    # is never inside its validity period and any peer that checks dates will
    # reject it. Left unchanged because the intended lifetime is a policy
    # decision -- confirm whether this is deliberate.
    cer.not_before = Time.at(0)
    cer.not_after = Time.at(0)
    cer.public_key = key
    cer.serial = 1
    cer.issuer = issuer
    cer.subject = subject
    cer.sign(key, digest)

    @cert = cer
    @key = key
    return @cert, @key
  end

  @cert = OpenSSL::X509::Certificate.new(File.read(@cert_file_path))
  @key = OpenSSL::PKey::RSA.new(File.read(@private_key_file), @private_key_passphrase)
  # Return the pair explicitly; falling off the end would yield only the key.
  return @cert, @key
end

#configure(conf) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/fluent/plugin/in_secure_forward.rb', line 80

# Validates the configuration and derives the runtime settings from it.
#
# Converts the millisecond intervals to @read_interval / @socket_interval,
# builds @nodes from the configured clients (each client must name exactly one
# of 'host' or 'network'), defaults the generated certificate's common name to
# @self_hostname, and prepares the certificate/key pair via #certificate.
#
# @param conf the configuration object, forwarded unchanged to the parent class
# @return [true]
# @raise [Fluent::ConfigError] when neither certificate source is configured,
#   when a client names both or neither of host/network, when a host cannot be
#   resolved, or when an address cannot be parsed
def configure(conf)
  super

  if !@cert_auto_generate && !@cert_file_path
    raise Fluent::ConfigError, "One of 'cert_auto_generate' or 'cert_file_path' must be specified"
  end

  @read_interval = @read_interval_msec / 1000.0
  @socket_interval = @socket_interval_msec / 1000.0

  @nodes = []

  @clients.each do |client|
    if client.host && client.network
      raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client"
    elsif !client.host && !client.network
      raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client"
    end

    # A host name is resolved to an address string first; a network is parsed as given.
    resolved = nil
    if client.host
      begin
        resolved = IPSocket.getaddress(client.host)
      rescue SocketError
        raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
      end
    end

    address =
      begin
        IPAddr.new(resolved || client.network)
      rescue ArgumentError
        raise Fluent::ConfigError, "network '#{client.network}' address format is invalid"
      end

    @nodes << {
      address: address,
      # A per-client shared key takes precedence over the plugin-wide one.
      shared_key: (client.shared_key || @shared_key),
      users: (client.users ? client.users.split(',') : nil)
    }
  end

  @generate_cert_common_name ||= @self_hostname
  # Build or load the certificate now so that problems surface at configure time.
  certificate
  true
end

#on_message(msg) ⇒ Object



207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/fluent/plugin/in_secure_forward.rb', line 207

# Dispatches one decoded message to the Fluentd engine.
# (Logic mirrors Fluent::ForwardInput#on_message.)
#
# The second element of +msg+ selects the wire format:
# * a String  -> PackedForward: [tag, packed_entries]
# * an Array  -> Forward:       [tag, [[time, record], ...]]
# * otherwise -> Message:       [tag, time, record]
#
# A time of 0 is replaced by the engine's current time.
#
# @param msg [Array] the decoded message
# @return whatever the engine's emit / emit_stream call returns
def on_message(msg)
  # TODO: format error
  tag = msg[0].to_s
  payload = msg[1]

  if payload.instance_of?(String)
    # PackedForward
    stream = MessagePackEventStream.new(payload, @cached_unpacker)
    Fluent::Engine.emit_stream(tag, stream)
  elsif payload.instance_of?(Array)
    # Forward
    stream = Fluent::MultiEventStream.new
    payload.each do |event|
      timestamp = event[0].to_i
      timestamp = Fluent::Engine.now if timestamp.zero?
      stream.add(timestamp, event[1])
    end
    Fluent::Engine.emit_stream(tag, stream)
  else
    # Message
    timestamp = payload
    timestamp = Fluent::Engine.now if timestamp == 0
    Fluent::Engine.emit(tag, timestamp, msg[2])
  end
end

#run ⇒ Object

sslsocket server thread



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/fluent/plugin/in_secure_forward.rb', line 178

# Body of the SSL server thread.
#
# Builds an SSL context from the certificate/key pair, listens on @bind:@port,
# and wraps every accepted connection in a Session stored in @sessions.
# Closed sessions are pruned after each accept. An SSLError whose message
# starts with 'SSL_accept SYSCALL' ends the loop quietly; any other SSLError
# is re-raised.
def run
  log.trace "setup for ssl sessions"
  server_cert, server_key = certificate
  ssl_context = OpenSSL::SSL::SSLContext.new
  ssl_context.cert = server_cert
  ssl_context.key = server_key

  log.trace "start to listen", bind: @bind, port: @port
  tcp_server = TCPServer.new(@bind, @port)
  log.trace "starting SSL server", bind: @bind, port: @port
  @sock = OpenSSL::SSL::SSLServer.new(tcp_server, ssl_context)
  # Accept the raw TCP connection only; the SSL handshake is not started here.
  @sock.start_immediately = false

  begin
    log.trace "accepting sessions"
    loop do
      while (connection = @sock.accept)
        log.trace "accept tcp connection (ssl session not established yet)"
        @sessions.push(Session.new(self, connection))

        # cleanup closed session instance
        @sessions.delete_if(&:closed?)
        log.trace "session instances:", all: @sessions.size, closed: @sessions.count(&:closed?)
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    # signal trap on accept
    raise unless e.message.start_with?('SSL_accept SYSCALL')
  end
end

#select_authenticate_users(node, username) ⇒ Object



139
140
141
142
143
144
145
# File 'lib/fluent/plugin/in_secure_forward.rb', line 139

# Selects the configured users that +username+ may authenticate as.
#
# When +node+ is nil or carries no :users list, every configured user with a
# matching name qualifies; otherwise the name must also appear in the node's
# :users list.
#
# @param node [Hash, nil] node entry as built by #configure
# @param username [String] the user name presented by the client
# @return [Array] the matching entries of @users (empty when none match)
def select_authenticate_users(node, username)
  permitted = node.nil? ? nil : node[:users]
  @users.select do |user|
    (permitted.nil? || permitted.include?(user.username)) && user.username == username
  end
end

#shutdown ⇒ Object



132
133
134
135
136
137
# File 'lib/fluent/plugin/in_secure_forward.rb', line 132

# Stops the plugin: kills and joins the listener thread, shuts down every
# tracked session, then closes the listening SSL socket if one was created.
def shutdown
  @listener.kill
  @listener.join
  @sessions.each { |session| session.shutdown }
  # #start leaves @sock nil and only the listener thread assigns it, so it is
  # still nil when shutdown arrives before the SSL server has been created.
  @sock.close if @sock
end

#start ⇒ Object



124
125
126
127
128
129
130
# File 'lib/fluent/plugin/in_secure_forward.rb', line 124

# Starts the plugin: seeds OpenSSL's random generator with 16 secure random
# bytes, resets the session list and listening socket, and launches #run in a
# new listener thread.
def start
  super
  seed = SecureRandom.random_bytes(16)
  OpenSSL::Random.seed(seed)
  @sock = nil
  @sessions = []
  @listener = Thread.new { run }
end