Class: Fluent::SecureForwardInput

Inherits:
Input
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/in_secure_forward.rb,
lib/fluent/plugin/in_secure_forward.rb

Defined Under Namespace

Classes: Session

Constant Summary collapse

DEFAULT_SECURE_LISTEN_PORT =
24284

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ SecureForwardInput

Returns a new instance of SecureForwardInput.



66
67
68
69
70
71
72
# File 'lib/fluent/plugin/in_secure_forward.rb', line 66

# Runs the parent constructor, then loads the ipaddr, socket, openssl and
# digest libraries.
def initialize
  super
  # Loaded from inside the constructor, in this order.
  %w[ipaddr socket openssl digest].each { |library| require library }
end

Instance Attribute Details

#nodes ⇒ Object (readonly)

Returns the value of attribute nodes.



62
63
64
# File 'lib/fluent/plugin/in_secure_forward.rb', line 62

# Client node entries built by #configure: one hash per configured client,
# with :address, :shared_key and :users keys.
def nodes
  @nodes
end

#read_interval ⇒ Object (readonly)

Returns the value of attribute read_interval.



49
50
51
# File 'lib/fluent/plugin/in_secure_forward.rb', line 49

# Read interval in seconds; #configure derives it as
# read_interval_msec / 1000.0.
def read_interval
  @read_interval
end

#sessions ⇒ Object (readonly)

List of sessions (node/socket/thread), each holding an SSL socket instance that is kept alive to a client.



64
65
66
# File 'lib/fluent/plugin/in_secure_forward.rb', line 64

# Session objects for accepted connections. #start resets the list to an
# empty array and #run appends one Session per accepted socket.
def sessions
  @sessions
end

#socket_interval ⇒ Object (readonly)

Returns the value of attribute socket_interval.



49
50
51
# File 'lib/fluent/plugin/in_secure_forward.rb', line 49

# Socket interval in seconds; #configure derives it as
# socket_interval_msec / 1000.0.
def socket_interval
  @socket_interval
end

Instance Method Details

#cert_auto_generate ⇒ Object

Note carried over from a commented-out parameter that precedes this one in the source: "meaningless for security…? not implemented yet" — `config_param :dns_reverse_lookup_check, :bool, :default => false`.



33
# File 'lib/fluent/plugin/in_secure_forward.rb', line 33

# When true, #certificate generates an RSA key and a self-signed certificate
# instead of reading them from files. Defaults to false.
config_param :cert_auto_generate, :bool, :default => false

#certificate ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/fluent/plugin/in_secure_forward.rb', line 146

# Returns the X.509 certificate and private key for the SSL listener,
# building them on first use and memoizing them in @cert and @key.
#
# With cert_auto_generate enabled, a new RSA key and a self-signed
# certificate are generated. Otherwise the certificate is read from
# @cert_file_path and the key from @private_key_file (decrypted with
# @private_key_passphrase).
#
# @return [Array(OpenSSL::X509::Certificate, OpenSSL::PKey::RSA)]
#   the certificate followed by the key, on every call and every path
def certificate
  return @cert, @key if @cert && @key

  if @cert_auto_generate
    key = OpenSSL::PKey::RSA.generate(@generate_private_key_length)

    # NOTE(review): SHA1 is a weak signature digest — consider SHA256 once
    # client compatibility is confirmed.
    digest = OpenSSL::Digest::SHA1.new
    # Self-signed: issuer and subject are the very same name object.
    issuer = subject = OpenSSL::X509::Name.new
    subject.add_entry('C', @generate_cert_country)
    subject.add_entry('ST', @generate_cert_state)
    subject.add_entry('L', @generate_cert_locality)
    subject.add_entry('CN', @generate_cert_common_name)

    cer = OpenSSL::X509::Certificate.new
    # NOTE(review): both validity bounds are the Unix epoch, so the validity
    # window is empty — confirm that peers never check certificate dates.
    cer.not_before = Time.at(0)
    cer.not_after = Time.at(0)
    cer.public_key = key
    cer.serial = 1
    cer.issuer = issuer
    cer.subject = subject
    cer.sign(key, digest)

    @cert = cer
    @key = key
  else
    @cert = OpenSSL::X509::Certificate.new(File.read(@cert_file_path))
    @key = OpenSSL::PKey::RSA.new(File.read(@private_key_file), @private_key_passphrase)
  end

  # Return the pair explicitly so the file-based path does not hand back only
  # the key (the value of its last assignment) on the first call.
  return @cert, @key
end

#configure(conf) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/fluent/plugin/in_secure_forward.rb', line 79

# Validates the plugin configuration and derives runtime state from it.
#
# Converts the millisecond intervals into seconds, turns every configured
# client into an entry of @nodes (address, shared key, permitted users),
# defaults the generated certificate's common name to @self_hostname, and
# builds the certificate up front.
#
# @param conf the configuration object, passed on to the parent via super
# @return [true]
# @raise [Fluent::ConfigError] if neither cert_auto_generate nor
#   cert_file_path is set, if a client sets both or neither of host and
#   network, if a host cannot be resolved, or if an address is malformed
def configure(conf)
  super

  if !@cert_auto_generate && !@cert_file_path
    raise Fluent::ConfigError, "One of 'cert_auto_generate' or 'cert_file_path' must be specified"
  end

  @read_interval = @read_interval_msec / 1000.0
  @socket_interval = @socket_interval_msec / 1000.0

  @nodes = []

  @clients.each do |client|
    # A client is identified by exactly one of host or network.
    raise Fluent::ConfigError, "both of 'host' and 'network' are specified for client" if client.host && client.network
    raise Fluent::ConfigError, "Either of 'host' and 'network' must be specified for client" unless client.host || client.network

    # Host names are resolved to an address here; stays nil for network clients.
    resolved =
      if client.host
        begin
          IPSocket.getaddress(client.host)
        rescue SocketError
          raise Fluent::ConfigError, "host '#{client.host}' cannot be resolved"
        end
      end

    begin
      address = IPAddr.new(resolved || client.network)
    rescue ArgumentError
      raise Fluent::ConfigError, "network '#{client.network}' address format is invalid"
    end

    # The per-client shared key wins over the plugin-wide one.
    node_key = client.shared_key || @shared_key
    permitted = client.users ? client.users.split(',') : nil
    @nodes << { address: address, shared_key: node_key, users: permitted }
  end

  @generate_cert_common_name ||= @self_hostname
  certificate
  true
end

#on_message(msg) ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/fluent/plugin/in_secure_forward.rb', line 202

# Emits one received forward-protocol message into the Fluent engine.
#
# The shape of msg[1] selects the handling:
# * String - packed entries, wrapped in a MessagePackEventStream
# * Array  - a list of [time, record] pairs
# * other  - msg is a single event of the form [tag, time, record]
#
# A time of 0 is replaced with the engine's current time.
#
# @param msg [Array] decoded message; msg[0] is the tag
def on_message(msg)
  # NOTE: copy&paste from Fluent::ForwardInput#on_message(msg)

  # TODO: format error
  tag = msg[0].to_s
  entries = msg[1]

  case entries
  when String
    # PackedForward
    es = MessagePackEventStream.new(entries, @cached_unpacker)
    Fluent::Engine.emit_stream(tag, es)

  when Array
    # Forward
    es = Fluent::MultiEventStream.new
    # Declared outside the block so the `||=` below really memoizes: all
    # zero-time entries of one message share a single timestamp.
    now = nil
    entries.each do |entry|
      time = entry[0].to_i
      time = (now ||= Fluent::Engine.now) if time == 0
      es.add(time, entry[1])
    end
    Fluent::Engine.emit_stream(tag, es)

  else
    # Message
    time = msg[1]
    time = Fluent::Engine.now if time == 0
    Fluent::Engine.emit(tag, time, msg[2])
  end
end

#run ⇒ Object

Body of the SSL socket server thread.



177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/fluent/plugin/in_secure_forward.rb', line 177

# Body of the sslsocket server thread.
#
# Builds an SSL context from #certificate, listens on @bind/@port, and wraps
# every accepted connection in a Session appended to @sessions. Runs until
# an exception ends the accept loop.
def run # sslsocket server thread
  log.trace "setup for ssl sessions"
  cert, key = certificate
  ssl_context = OpenSSL::SSL::SSLContext.new
  ssl_context.cert = cert
  ssl_context.key = key

  log.trace "start to listen", :bind => @bind, :port => @port
  tcp_server = TCPServer.new(@bind, @port)
  log.trace "starting SSL server", :bind => @bind, :port => @port
  @sock = OpenSSL::SSL::SSLServer.new(tcp_server, ssl_context)
  # Per the trace message below, the SSL session is not yet established
  # when accept returns.
  @sock.start_immediately = false
  begin
    log.trace "accepting sessions"
    loop do
      accepted = @sock.accept
      next unless accepted

      log.trace "accept tcp connection (ssl session not established yet)"
      @sessions.push Session.new(self, accepted)
    end
  rescue OpenSSL::SSL::SSLError => error
    # signal trap on accept
    interrupted = error.message.start_with?('SSL_accept SYSCALL')
    raise unless interrupted
  end
end

#select_authenticate_users(node, username) ⇒ Object



138
139
140
141
142
143
144
# File 'lib/fluent/plugin/in_secure_forward.rb', line 138

# Selects the configured users that may authenticate as +username+.
#
# @param node [Hash, nil] client node entry; its :users list, when present,
#   restricts which usernames are acceptable for that node
# @param username [String] the username presented by the client
# @return [Array] entries of @users whose username equals +username+ and,
#   if the node has a :users list, also appears in that list
def select_authenticate_users(node, username)
  permitted = node.nil? ? nil : node[:users]

  # No per-node restriction: match on the username alone.
  return @users.select { |user| user.username == username } if permitted.nil?

  @users.select { |user| permitted.include?(user.username) && user.username == username }
end

#shutdown ⇒ Object



131
132
133
134
135
136
# File 'lib/fluent/plugin/in_secure_forward.rb', line 131

# Stops the listener thread, shuts down every session, and closes the SSL
# server socket if one was created.
def shutdown
  @listener.kill
  @listener.join
  @sessions.each(&:shutdown)
  # @sock is nil from #start until the listener thread has built the SSL
  # server, so there may be nothing to close.
  @sock.close if @sock
end

#start ⇒ Object



123
124
125
126
127
128
129
# File 'lib/fluent/plugin/in_secure_forward.rb', line 123

# Starts the plugin: seeds OpenSSL's random number generator, resets the
# session list and server socket, and launches the listener thread that
# executes #run.
def start
  super
  # 16 bytes read from the system random device.
  seed_bytes = File.read("/dev/urandom", 16)
  OpenSSL::Random.seed(seed_bytes)
  @sessions = []
  @sock = nil
  @listener = Thread.new { run }
end