Class: Fluent::SecureForwardOutput::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/output_node.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sender, conf) ⇒ Node

Returns a new instance of Node.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/fluent/plugin/output_node.rb', line 21

def initialize(sender, conf)
  @sender = sender
  @shared_key = conf.shared_key || sender.shared_key

  @host = conf.host
  @port = conf.port
  @hostlabel = conf.hostlabel || conf.host
  @username = conf.username
  @password = conf.password
  @standby = conf.standby

  @proxy_uri = conf.proxy_uri

  @keepalive = sender.keepalive
  @connection_hard_timeout = sender.connection_hard_timeout

  @authentication = nil

  @writing = false

  @expire = nil
  @first_session = false
  @detach = false

  @socket = nil
  @sslsession = nil
  @unpacker = MessagePack::Unpacker.new

  @shared_key_salt = generate_salt
  @state = :helo
  @mtime = Time.now
  @thread = nil
end

Instance Attribute Details

#authenticationObject

Returns the value of attribute authentication.



14
15
16
# File 'lib/fluent/plugin/output_node.rb', line 14

def authentication
  @authentication
end

#detachObject

Returns the value of attribute detach.



17
18
19
# File 'lib/fluent/plugin/output_node.rb', line 17

def detach
  @detach
end

#expireObject (readonly)

Returns the value of attribute expire.



19
20
21
# File 'lib/fluent/plugin/output_node.rb', line 19

def expire
  @expire
end

#first_sessionObject

Returns the value of attribute first_session.



17
18
19
# File 'lib/fluent/plugin/output_node.rb', line 17

def first_session
  @first_session
end

#hostObject

Returns the value of attribute host.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

def host
  @host
end

#hostlabelObject

Returns the value of attribute hostlabel.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

def hostlabel
  @hostlabel
end

#keepaliveObject

Returns the value of attribute keepalive.



14
15
16
# File 'lib/fluent/plugin/output_node.rb', line 14

def keepalive
  @keepalive
end

#passwordObject

Returns the value of attribute password.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

def password
  @password
end

#portObject

Returns the value of attribute port.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

def port
  @port
end

#shared_keyObject

Returns the value of attribute shared_key.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

def shared_key
  @shared_key
end

#shared_key_saltObject

Returns the value of attribute shared_key_salt.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

def shared_key_salt
  @shared_key_salt
end

#socketObject

Returns the value of attribute socket.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

def socket
  @socket
end

#sslsessionObject

Returns the value of attribute sslsession.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

def sslsession
  @sslsession
end

#standbyObject

Returns the value of attribute standby.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

def standby
  @standby
end

#stateObject

Returns the value of attribute state.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

def state
  @state
end

#unpackerObject

Returns the value of attribute unpacker.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

def unpacker
  @unpacker
end

#usernameObject

Returns the value of attribute username.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

def username
  @username
end

Instance Method Details

#check_helo(message) ⇒ Object



132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/fluent/plugin/output_node.rb', line 132

def check_helo(message)
  log.debug "checking helo"
  # ['HELO', options(hash)]
  unless message.size == 2 && message[0] == 'HELO'
    return false
  end
  opts = message[1]
  @shared_key_nonce = opts['nonce'] || '' # make shared_key_check failed (instead of error) if protocol version mismatch exist
  @authentication = opts['auth']
  @allow_keepalive = opts['keepalive']
  @mtime = Time.now
  true
end

#check_pong(message) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# File 'lib/fluent/plugin/output_node.rb', line 162

def check_pong(message)
  log.debug "checking pong"
  # ['PONG', bool(authentication result), 'reason if authentication failed',
  #  self_hostname, sha512\_hex(salt + self_hostname + nonce + sharedkey)]
  unless message.size == 5 && message[0] == 'PONG'
    return false, 'invalid format for PONG message'
  end
  _pong, auth_result, reason, hostname, shared_key_hexdigest = message

  unless auth_result
    return false, 'authentication failed: ' + reason
  end

  if hostname == @sender.self_hostname
    return false, 'same hostname between input and output: invalid configuration'
  end

  clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key_nonce).update(@shared_key).hexdigest
  unless shared_key_hexdigest == clientside
    return false, 'shared key mismatch'
  end

  @mtime = Time.now
  return true, nil
end

#connectObject



226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
# File 'lib/fluent/plugin/output_node.rb', line 226

def connect
  Thread.current.abort_on_exception = true
  log.debug "starting client"

  begin
    addr = @sender.hostname_resolver.getaddress(@host)
    log.debug "create tcp socket to node", host: @host, address: addr, port: @port
  rescue => e
    log.warn "failed to resolve the hostname", error_class: e.class, error: e, host: @host
    @state = :failed
    return
  end

  begin
    if @proxy_uri.nil? then
      sock = TCPSocket.new(addr, @port)
    else
      proxy = Proxifier::Proxy(@proxy_uri)
      sock = proxy.open(addr, @port)
    end
  rescue => e
    log.warn "failed to connect for secure-forward", error_class: e.class, error: e, host: @host, address: addr, port: @port
    @state = :failed
    return
  end

  log.trace "changing socket options"
  opt = [1, @sender.send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

  opt = [@sender.send_timeout.to_i, 0].pack('L!L!')  # struct timeval
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

  log.trace "initializing SSL contexts"

  context = OpenSSL::SSL::SSLContext.new(@sender.ssl_version)

  log.trace "setting SSL verification options"

  if @sender.secure
    # inject OpenSSL::SSL::SSLContext::DEFAULT_PARAMS
    # https://bugs.ruby-lang.org/issues/9424
    context.set_params({})

    if @sender.ssl_ciphers
      context.ciphers = @sender.ssl_ciphers
    else
      ### follow httpclient configuration by nahi
      # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH"
      context.ciphers = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default
    end

    log.trace "set verify_mode VERIFY_PEER"
    context.verify_mode = OpenSSL::SSL::VERIFY_PEER
    if @sender.enable_strict_verification
      context.cert_store = OpenSSL::X509::Store.new
      begin
        context.cert_store.set_default_paths
      rescue OpenSSL::X509::StoreError => e
        log.warn "faild to load system default certificates", error: e
      end
    end
    if @sender.ca_cert_path
      log.trace "set to use private CA", path: @sender.ca_cert_path
      context.ca_file = @sender.ca_cert_path
    end
  end

  log.debug "trying to connect ssl session", host: @host, address: addr, port: @port
  begin
    sslsession = OpenSSL::SSL::SSLSocket.new(sock, context)
    log.trace "connecting...", host: @host, address: addr, port: @port
    sslsession.connect
    @mtime = Time.now
  rescue => e
    log.warn "failed to establish SSL connection", error_class: e.class, error: e, host: @host, address: addr, port: @port
    @state = :failed
    return
  end

  log.debug "ssl session connected", host: @host, port: @port

  begin
    if @sender.enable_strict_verification
      log.debug "checking peer's certificate", subject: sslsession.peer_cert.subject
      sslsession.post_connection_check(@hostlabel)
      verify = sslsession.verify_result
      if verify != OpenSSL::X509::V_OK
        err_name = Fluent::SecureForwardOutput::OpenSSLUtil.verify_result_name(verify)
        log.warn "BUG: failed to verify certification while connecting host #{@host} as #{@hostlabel} (but not raised, why?)"
        log.warn "BUG: verify_result: #{err_name}"
        raise RuntimeError, "BUG: failed to verify certification and to handle it correctly while connecting host #{@host} as #{@hostlabel}"
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    log.warn "failed to verify certification while connecting ssl session", host: @host, hostlabel: @hostlabel
    self.shutdown
    raise
  end

  log.debug "ssl session connected", host: @host, port: @port
  @socket = sock
  @sslsession = sslsession

  buf = ''
  read_length = @sender.read_length
  read_interval = @sender.read_interval
  socket_interval = @sender.socket_interval

  @mtime = Time.now

  loop do
    break if @detach
    break if @connection_hard_timeout && Time.now > @mtime + @connection_hard_timeout

    begin
      while @sslsession.read_nonblock(read_length, buf)
        if buf == ''
          sleep read_interval
          next
        end
        @unpacker.feed_each(buf, &method(:on_read))
        @mtime = Time.now
        buf = ''
      end
    rescue OpenSSL::SSL::SSLError => e
      # to wait i/o restart
      log.trace "SSLError", error_class: e.class, error: e, mtime: @mtime, host: @host, port: @port
      if @connection_hard_timeout && Time.now > @mtime + @connection_hard_timeout
        log.warn "connection hard timeout", mtime: @mtime, timeout: @connection_hard_timeout, host: @host, port: @port
        log.warn "aborting connection", host: @host, port: @port
        self.release!
        self.detach!
        break
      else
        sleep socket_interval
      end
    rescue SystemCallError => e
      log.warn "disconnected by Error", error_class: e.class, error: e, host: @host, port: @port
      self.release!
      self.detach!
      break
    rescue EOFError
      log.warn "disconnected", host: @host, port: @port
      self.release!
      self.detach!
      break
    end
  end
  while @writing
    break if @detach

    sleep read_interval
  end
  self.shutdown
end

#detach!Object



71
72
73
# File 'lib/fluent/plugin/output_node.rb', line 71

def detach!
  @detach = true
end

#detached?Boolean

Returns:

  • (Boolean)


75
76
77
# File 'lib/fluent/plugin/output_node.rb', line 75

def detached?
  @detach
end

#dupObject



59
60
61
62
63
64
65
# File 'lib/fluent/plugin/output_node.rb', line 59

def dup
  renewed = self.class.new(
    @sender,
    Fluent::Config::Section.new({host: @host, port: @port, hostlabel: @hostlabel, username: @username, password: @password, shared_key: @shared_key, standby: @standby, proxy_uri: @proxy_uri})
  )
  renewed
end

#established?Boolean

Returns:

  • (Boolean)


116
117
118
# File 'lib/fluent/plugin/output_node.rb', line 116

def established?
  @state == :established
end

#expired?Boolean

Returns:

  • (Boolean)


120
121
122
123
124
125
126
# File 'lib/fluent/plugin/output_node.rb', line 120

def expired?
  if @keepalive.nil? || @keepalive == 0
    false
  else
    @expire && @expire < Time.now
  end
end

#generate_pingObject



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# File 'lib/fluent/plugin/output_node.rb', line 146

def generate_ping
  log.debug "generating ping"
  # ['PING', self_hostname, sharedkey\_salt, sha512\_hex(sharedkey\_salt + self_hostname + nonce + shared_key),
  #  username || '', sha512\_hex(auth\_salt + username + password) || '']
  shared_key_hexdigest = Digest::SHA512.new.update(@shared_key_salt).update(@sender.self_hostname).update(@shared_key_nonce).update(@shared_key).hexdigest
  ping = ['PING', @sender.self_hostname, @shared_key_salt, shared_key_hexdigest]
  if @authentication != ''
    password_hexdigest = Digest::SHA512.new.update(@authentication).update(@username).update(@password).hexdigest
    ping.push(@username, password_hexdigest)
  else
    ping.push('','')
  end
  @mtime = Time.now
  ping
end

#generate_saltObject



128
129
130
# File 'lib/fluent/plugin/output_node.rb', line 128

def generate_salt
  OpenSSL::Random.random_bytes(16)
end

#joinObject



112
113
114
# File 'lib/fluent/plugin/output_node.rb', line 112

def join
  @thread && @thread.join
end

#logObject



55
56
57
# File 'lib/fluent/plugin/output_node.rb', line 55

def log
  @sender.log
end

#on_read(data) ⇒ Object



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/fluent/plugin/output_node.rb', line 193

def on_read(data)
  log.debug "on_read"
  if self.established?
    #TODO: ACK
    log.warn "unknown packets arrived..."
    return
  end

  case @state
  when :helo
    unless check_helo(data)
      log.warn "received invalid helo message from #{@host}"
      self.shutdown
      return
    end
    send_data generate_ping()
    @mtime = Time.now
    @state = :pingpong
  when :pingpong
    success, reason = check_pong(data)
    unless success
      log.warn "connection refused to #{@host}:" + reason
      self.shutdown
      return
    end
    log.info "connection established to #{@host}" if @first_session
    @state = :established
    @expire = Time.now + @keepalive if @keepalive && @keepalive > 0
    @mtime = Time.now
    log.debug "connection established", host: @host, port: @port, expire: @expire
  end
end

#release!Object



88
89
90
# File 'lib/fluent/plugin/output_node.rb', line 88

def release!
  @writing = false
end

#send_data(data) ⇒ Object



188
189
190
191
# File 'lib/fluent/plugin/output_node.rb', line 188

def send_data(data)
  @mtime = Time.now
  @sslsession.write data.to_msgpack
end

#shutdownObject



92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/output_node.rb', line 92

def shutdown
  log.debug "shutting down node #{@host}"
  @state = :closed

  if @thread == Thread.current
    @sslsession.close if @sslsession
    @socket.close if @socket
    @thread.kill
  else
    if @thread
      @thread.kill
      @thread.join
    end
    @sslsession.close if @sslsession
    @socket.close if @socket
  end
rescue => e
  log.debug "error on node shutdown #{e.class}:#{e.message}"
end

#startObject



67
68
69
# File 'lib/fluent/plugin/output_node.rb', line 67

def start
  @thread = Thread.new(&method(:connect))
end

#tain!Object

Raises:

  • (RuntimeError)


79
80
81
82
# File 'lib/fluent/plugin/output_node.rb', line 79

def tain!
  raise RuntimeError, "BUG: taining detached node" if @detach
  @writing = true
end

#tained?Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/fluent/plugin/output_node.rb', line 84

def tained?
  @writing
end