Class: Fluent::SecureForwardInput

Inherits:
Input
  • Object
show all
Includes:
Mixin::ConfigPlaceholders
Defined in:
lib/fluent/plugin/in_secure_forward.rb,
lib/fluent/plugin/in_secure_forward.rb

Defined Under Namespace

Classes: Session

Constant Summary collapse

DEFAULT_SECURE_LISTEN_PORT =
24284

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeSecureForwardInput

Returns a new instance of SecureForwardInput.



65
66
67
68
69
70
# File 'lib/fluent/plugin/in_secure_forward.rb', line 65

def initialize
  super
  require 'socket'
  require 'openssl'
  require 'digest'
end

Instance Attribute Details

#nodesObject (readonly)

<user>

username ....
password ....

</user>



56
57
58
# File 'lib/fluent/plugin/in_secure_forward.rb', line 56

def nodes
  @nodes
end

#read_intervalObject (readonly)

Returns the value of attribute read_interval.



49
50
51
# File 'lib/fluent/plugin/in_secure_forward.rb', line 49

def read_interval
  @read_interval
end

#sessionsObject (readonly)

<client>

host ipaddr/hostname
shared_key .... # optional shared key
users username,list,of,allowed

</client>



63
64
65
# File 'lib/fluent/plugin/in_secure_forward.rb', line 63

def sessions
  @sessions
end

#socket_intervalObject (readonly)

Returns the value of attribute socket_interval.



49
50
51
# File 'lib/fluent/plugin/in_secure_forward.rb', line 49

def socket_interval
  @socket_interval
end

#usersObject (readonly)

list of (username, password) by <user> tag



51
52
53
# File 'lib/fluent/plugin/in_secure_forward.rb', line 51

def users
  @users
end

Instance Method Details

#cert_auto_generateObject

meaningless for security…? not implemented yet config_param :dns_reverse_lookup_check, :bool, :default => false



33
# File 'lib/fluent/plugin/in_secure_forward.rb', line 33

config_param :cert_auto_generate, :bool, :default => false

#certificateObject



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/fluent/plugin/in_secure_forward.rb', line 141

def certificate
  return @cert, @key if @cert && @key

  if @cert_auto_generate
    key = OpenSSL::PKey::RSA.generate(@generate_private_key_length)

    digest = OpenSSL::Digest::SHA1.new
    issuer = subject = OpenSSL::X509::Name.new
    subject.add_entry('C', @generate_cert_country)
    subject.add_entry('ST', @generate_cert_state)
    subject.add_entry('L', @generate_cert_locality)
    subject.add_entry('CN', @generate_cert_common_name)

    cer = OpenSSL::X509::Certificate.new
    cer.not_before = Time.at(0)
    cer.not_after = Time.at(0)
    cer.public_key = key
    cer.serial = 1
    cer.issuer = issuer
    cer.subject  = subject
    cer.sign(key, digest)

    @cert = cer
    @key = key
    return @cert, @key
  end

  @cert = OpenSSL::X509::Certificate.new(File.read(@cert_file_path))
  @key = OpenSSL::PKey::RSA.new(File.read(@private_key_file), @private_key_passphrase)
end

#configure(conf) ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/fluent/plugin/in_secure_forward.rb', line 77

def configure(conf)
  super

  unless @cert_auto_generate || @cert_file_path
    raise Fluent::ConfigError, "One of 'cert_auto_generate' or 'cert_file_path' must be specified"
  end

  @read_interval = @read_interval_msec / 1000.0
  @socket_interval = @socket_interval_msec / 1000.0

  @users = []
  @nodes = []
  conf.elements.each do |element|
    case element.name
    when 'user'
      unless element['username'] && element['password']
        raise Fluent::ConfigError, "username/password pair missing in <user>"
      end
      @users.push({
          username: element['username'],
          password: element['password']
        })
    when 'client'
      unless element['host']
        raise Fluent::ConfigError, "host missing in <client>"
      end
      @nodes.push({
          host: element['host'],
          shared_key: (element['shared_key'] || @shared_key),
          users: (element['users'] ? element['users'].split(',') : nil),
        })
    else
      raise Fluent::ConfigError, "unknown config tag name"
    end
  end

  @generate_cert_common_name ||= @self_hostname
  self.certificate
  true
end

#on_message(msg) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/fluent/plugin/in_secure_forward.rb', line 197

def on_message(msg)
  # NOTE: copy&paste from Fluent::ForwardInput#on_message(msg)

  # TODO: format error
  tag = msg[0].to_s
  entries = msg[1]

  if entries.class == String
    # PackedForward
    es = MessagePackEventStream.new(entries, @cached_unpacker)
    Fluent::Engine.emit_stream(tag, es)

  elsif entries.class == Array
    # Forward
    es = Fluent::MultiEventStream.new
    entries.each {|e|
      time = e[0].to_i
      time = (now ||= Fluent::Engine.now) if time == 0
      record = e[1]
      es.add(time, record)
    }
    Fluent::Engine.emit_stream(tag, es)

  else
    # Message
    time = msg[1]
    time = Fluent::Engine.now if time == 0
    record = msg[2]
    Fluent::Engine.emit(tag, time, record)
  end
end

#runObject

sslsocket server thread



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/fluent/plugin/in_secure_forward.rb', line 172

def run # sslsocket server thread
  log.trace "setup for ssl sessions"
  cert, key = self.certificate
  ctx = OpenSSL::SSL::SSLContext.new
  ctx.cert = cert
  ctx.key = key

  log.trace "start to listen", :bind => @bind, :port => @port
  server = TCPServer.new(@bind, @port)
  log.trace "starting SSL server", :bind => @bind, :port => @port
  @sock = OpenSSL::SSL::SSLServer.new(server, ctx)
  @sock.start_immediately = false
  begin
    log.trace "accepting sessions"
    loop do
      while socket = @sock.accept
        log.trace "accept tcp connection (ssl session not established yet)"
        @sessions.push Session.new(self, socket)
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    raise unless e.message.start_with?('SSL_accept SYSCALL') # signal trap on accept
  end
end

#select_authenticate_users(node, username) ⇒ Object



133
134
135
136
137
138
139
# File 'lib/fluent/plugin/in_secure_forward.rb', line 133

def select_authenticate_users(node, username)
  if node.nil? || node[:users].nil?
    @users.select{|u| u[:username] == username}
  else
    @users.select{|u| node[:users].include?(u[:username]) && u[:username] == username}
  end
end

#shutdownObject



126
127
128
129
130
131
# File 'lib/fluent/plugin/in_secure_forward.rb', line 126

def shutdown
  @listener.kill
  @listener.join
  @sessions.each{ |s| s.shutdown }
  @sock.close
end

#startObject



118
119
120
121
122
123
124
# File 'lib/fluent/plugin/in_secure_forward.rb', line 118

def start
  super
  OpenSSL::Random.seed(File.read("/dev/urandom", 16))
  @sessions = []
  @sock = nil
  @listener = Thread.new(&method(:run))
end