Class: Fluent::SecureForwardOutput::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/output_node.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sender, conf) ⇒ Node

Returns a new instance of Node.



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/fluent/plugin/output_node.rb', line 21

# Builds a node in its initial, not-yet-connected state.
#
# @param sender [Object] owning output; read here for its fallback
#   +shared_key+ and its +keepalive+ setting
# @param conf [Object] per-node settings; must respond to +shared_key+,
#   +host+, +port+, +hostlabel+, +username+, +password+, +standby+ and
#   +proxy_uri+
def initialize(sender, conf)
  @sender = sender
  # A node-level shared key takes precedence over the sender-wide one.
  @shared_key = conf.shared_key || sender.shared_key

  # Static per-node configuration.
  @host, @port = conf.host, conf.port
  @hostlabel = conf.hostlabel || conf.host
  @username, @password = conf.username, conf.password
  @standby = conf.standby
  @proxy_uri = conf.proxy_uri
  @keepalive = sender.keepalive

  # Nothing is connected, authenticated or running yet.
  @authentication = @expire = @socket = @sslsession = @thread = nil
  @writing = @first_session = @detach = false

  @unpacker = MessagePack::Unpacker.new
  @shared_key_salt = generate_salt

  # The handshake begins by waiting for the peer's HELO (see #on_read).
  @state = :helo
end

Instance Attribute Details

#authentication ⇒ Object

Returns the value of attribute authentication.



14
15
16
# File 'lib/fluent/plugin/output_node.rb', line 14

# Reader for @authentication. It is nil after #initialize; #check_helo
# replaces it with the 'auth' option of the peer's HELO message, and
# #generate_ping feeds it into the password digest.
def authentication
  @authentication
end

#detach ⇒ Object

Returns the value of attribute detach.



17
18
19
# File 'lib/fluent/plugin/output_node.rb', line 17

# Reader for the @detach flag: false after #initialize, set to true by
# #detach!. The loops in #connect stop once it is truthy.
def detach
  @detach
end

#expire ⇒ Object (readonly)

Returns the value of attribute expire.



19
20
21
# File 'lib/fluent/plugin/output_node.rb', line 19

# Reader for @expire. It is nil until #on_read accepts the peer's PONG with a
# positive @keepalive, at which point it becomes Time.now + @keepalive.
def expire
  @expire
end

#first_session ⇒ Object

Returns the value of attribute first_session.



17
18
19
# File 'lib/fluent/plugin/output_node.rb', line 17

# Reader for @first_session (false after #initialize). When it is truthy,
# #on_read logs the "connection established" line at info level.
def first_session
  @first_session
end

#host ⇒ Object

Returns the value of attribute host.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

# Reader for @host, taken from conf.host in #initialize. #connect resolves it
# to the address it opens the socket to.
def host
  @host
end

#hostlabel ⇒ Object

Returns the value of attribute hostlabel.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

# Reader for @hostlabel (conf.hostlabel, falling back to conf.host). #connect
# passes it to post_connection_check when strict verification is enabled.
def hostlabel
  @hostlabel
end

#keepalive ⇒ Object

Returns the value of attribute keepalive.



14
15
16
# File 'lib/fluent/plugin/output_node.rb', line 14

# Reader for @keepalive, copied from sender.keepalive in #initialize.
# nil or 0 means #expired? always answers false.
def keepalive
  @keepalive
end

#password ⇒ Object

Returns the value of attribute password.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

# Reader for @password (conf.password). #generate_ping only ever sends it
# as part of a SHA512 digest, never in the clear.
def password
  @password
end

#port ⇒ Object

Returns the value of attribute port.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

# Reader for @port (conf.port), the port #connect opens the socket to.
def port
  @port
end

#shared_key ⇒ Object

Returns the value of attribute shared_key.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

# Reader for @shared_key: conf.shared_key when set, otherwise the sender's
# shared_key. It is the final input of the PING/PONG digests.
def shared_key
  @shared_key
end

#shared_key_salt ⇒ Object

Returns the value of attribute shared_key_salt.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

# Reader for @shared_key_salt, produced once by #generate_salt in
# #initialize and sent to the peer inside the PING message.
def shared_key_salt
  @shared_key_salt
end

#socket ⇒ Object

Returns the value of attribute socket.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

# Reader for @socket: nil until #connect stores the underlying (direct or
# proxied) socket after the SSL session has been set up.
def socket
  @socket
end

#sslsession ⇒ Object

Returns the value of attribute sslsession.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

# Reader for @sslsession: nil until #connect stores the connected
# OpenSSL::SSL::SSLSocket. #send_data writes through it.
def sslsession
  @sslsession
end

#standby ⇒ Object

Returns the value of attribute standby.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

# Reader for @standby (conf.standby). This class only stores and copies the
# value (see #dup); presumably the owning output uses it to pick nodes --
# TODO confirm against the caller.
def standby
  @standby
end

#state ⇒ Object

Returns the value of attribute state.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

# Reader for @state. Values assigned in this class: :helo (initial),
# :pingpong, :established, :failed (connect error) and :closed (#shutdown).
def state
  @state
end

#unpacker ⇒ Object

Returns the value of attribute unpacker.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

# Reader for @unpacker, the MessagePack::Unpacker that #connect feeds with
# the bytes read from the SSL session.
def unpacker
  @unpacker
end

#username ⇒ Object

Returns the value of attribute username.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

# Reader for @username (conf.username), sent in the PING message when the
# peer requested authentication.
def username
  @username
end

Instance Method Details

#check_helo(message) ⇒ Object



130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/fluent/plugin/output_node.rb', line 130

# Validates the peer's HELO message and records the handshake options.
#
# Expected shape: ['HELO', options(hash)]. On success the 'nonce', 'auth'
# and 'keepalive' options are stored in @shared_key_nonce, @authentication
# and @allow_keepalive.
#
# @param message [Object] decoded object received from the peer
# @return [Boolean] true for a well-formed HELO, false otherwise
def check_helo(message)
  log.debug "checking helo"
  # ['HELO', options(hash)]
  # The payload comes straight from the network: anything that is not the
  # expected 2-element array must be rejected, not allowed to raise.
  unless message.is_a?(Array) && message.size == 2 && message[0] == 'HELO'
    return false
  end

  opts = message[1]
  # Options that are not a Hash cannot be queried by key below.
  return false unless opts.is_a?(Hash)

  @shared_key_nonce = opts['nonce'] || '' # make shared_key_check failed (instead of error) if protocol version mismatch exist
  @authentication = opts['auth']
  @allow_keepalive = opts['keepalive']
  true
end

#check_pong(message) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/fluent/plugin/output_node.rb', line 158

# Validates the peer's PONG message, the final step of the handshake.
#
# Expected shape:
#   ['PONG', bool(authentication result), 'reason if authentication failed',
#    self_hostname, sha512_hex(salt + self_hostname + nonce + sharedkey)]
#
# @param message [Object] decoded object received from the peer
# @return [Array] [true, nil] on success, otherwise [false, reason(String)]
def check_pong(message)
  log.debug "checking pong"
  # The payload comes straight from the network, so check the container type
  # before indexing into it.
  unless message.is_a?(Array) && message.size == 5 && message[0] == 'PONG'
    return false, 'invalid format for PONG message'
  end
  _pong, auth_result, reason, hostname, shared_key_hexdigest = message

  unless auth_result
    # Interpolation keeps a missing (nil) reason from raising TypeError.
    return false, "authentication failed: #{reason}"
  end

  # The digest below only accepts String input.
  unless hostname.is_a?(String)
    return false, 'invalid format for PONG message'
  end

  if hostname == @sender.self_hostname
    return false, 'same hostname between input and output: invalid configuration'
  end

  clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key_nonce).update(@shared_key).hexdigest
  unless shared_key_hexdigest == clientside
    return false, 'shared key mismatch'
  end

  return true, nil
end

#connect ⇒ Object



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
# File 'lib/fluent/plugin/output_node.rb', line 218

# Body of the node thread (see #start).
#
# Opens a TCP connection to the node (through @proxy_uri when set), wraps it
# in an SSL session configured from the sender's settings, optionally
# verifies the peer certificate against @hostlabel, then reads from the
# session and dispatches every decoded message to #on_read until the node is
# detached or the peer disconnects. Finally waits for a pending writer
# (@writing) and shuts the node down.
#
# A failure to open the TCP or SSL connection is logged, leaves
# @state == :failed and returns early.
#
# @raise [OpenSSL::SSL::SSLError] re-raised when certificate verification
#   fails (after #shutdown has been called)
# @raise [RuntimeError] when verify_result is not V_OK although
#   post_connection_check did not raise
def connect
  Thread.current.abort_on_exception = true
  log.debug "starting client"

  addr = @sender.hostname_resolver.getaddress(@host)
  log.debug "create tcp socket to node", host: @host, address: addr, port: @port

  begin
    if @proxy_uri.nil? then
      sock = TCPSocket.new(addr, @port)
    else
      proxy = Proxifier::Proxy(@proxy_uri)
      sock = proxy.open(addr, @port)
    end
  rescue => e
    log.warn "failed to connect for secure-forward", error_class: e.class, error: e, host: @host, address: addr, port: @port
    @state = :failed
    return
  end

  log.trace "changing socket options"
  opt = [1, @sender.send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

  opt = [@sender.send_timeout.to_i, 0].pack('L!L!')  # struct timeval
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

  log.trace "initializing SSL contexts"

  context = OpenSSL::SSL::SSLContext.new(@sender.ssl_version)

  log.trace "setting SSL verification options"

  if @sender.secure
    # inject OpenSSL::SSL::SSLContext::DEFAULT_PARAMS
    # https://bugs.ruby-lang.org/issues/9424
    context.set_params({})

    if @sender.ssl_ciphers
      context.ciphers = @sender.ssl_ciphers
    else
      ### follow httpclient configuration by nahi
      # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH"
      context.ciphers = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default
    end

    log.trace "set verify_mode VERIFY_PEER"
    context.verify_mode = OpenSSL::SSL::VERIFY_PEER
    if @sender.enable_strict_verification
      context.cert_store = OpenSSL::X509::Store.new
      begin
        context.cert_store.set_default_paths
      rescue OpenSSL::X509::StoreError => e
        log.warn "faild to load system default certificates", error: e
      end
    end
    if @sender.ca_cert_path
      log.trace "set to use private CA", path: @sender.ca_cert_path
      context.ca_file = @sender.ca_cert_path
    end
  end

  log.debug "trying to connect ssl session", host: @host, address: addr, port: @port
  begin
    sslsession = OpenSSL::SSL::SSLSocket.new(sock, context)
    log.trace "connecting...", host: @host, address: addr, port: @port
    sslsession.connect
  rescue => e
    log.warn "failed to establish SSL connection", error_class: e.class, error: e, host: @host, address: addr, port: @port
    # The TCP socket is already open at this point; close it so a failed
    # handshake does not leak a descriptor.
    begin
      sock.close
    rescue => close_error
      log.debug "error on closing socket #{close_error.class}:#{close_error.message}"
    end
    @state = :failed
    return
  end

  log.debug "ssl session connected", host: @host, port: @port

  begin
    if @sender.enable_strict_verification
      log.debug "checking peer's certificate", subject: sslsession.peer_cert.subject
      sslsession.post_connection_check(@hostlabel)
      verify = sslsession.verify_result
      if verify != OpenSSL::X509::V_OK
        err_name = Fluent::SecureForwardOutput::OpenSSLUtil.verify_result_name(verify)
        log.warn "BUG: failed to verify certification while connecting host #{@host} as #{@hostlabel} (but not raised, why?)"
        log.warn "BUG: verify_result: #{err_name}"
        raise RuntimeError, "BUG: failed to verify certification and to handle it correctly while connecting host #{@host} as #{@hostlabel}"
      end
    end
  rescue OpenSSL::SSL::SSLError
    log.warn "failed to verify certification while connecting ssl session", host: @host, hostlabel: @hostlabel
    # #shutdown only closes @sslsession/@socket, which are not assigned yet on
    # this path; hand the open sockets over so they get closed, not leaked.
    @socket = sock
    @sslsession = sslsession
    self.shutdown
    raise
  end

  log.debug "ssl session connected", host: @host, port: @port
  @socket = sock
  @sslsession = sslsession

  buf = ''
  read_length = @sender.read_length
  read_interval = @sender.read_interval
  socket_interval = @sender.socket_interval

  # Read loop: runs until the node is detached or the peer goes away.
  loop do
    break if @detach

    begin
      while @sslsession.read_nonblock(read_length, buf)
        if buf == ''
          sleep read_interval
          next
        end
        @unpacker.feed_each(buf, &method(:on_read))
        buf = ''
      end
    rescue OpenSSL::SSL::SSLError
      # to wait i/o restart
      sleep socket_interval
    rescue SystemCallError => e
      log.warn "disconnected by Error", error_class: e.class, error: e, host: @host, port: @port
      break
    rescue EOFError
      log.warn "disconnected", host: @host, port: @port
      break
    end
  end
  # Give a writer that marked this node with #tain! the chance to finish
  # (it calls #release!) before the sockets are closed.
  while @writing
    break if @detach

    sleep read_interval
  end
  self.shutdown
end

#detach! ⇒ Object



69
70
71
# File 'lib/fluent/plugin/output_node.rb', line 69

# Marks the node as detached. The read loop and the wait-for-writer loop in
# #connect both stop once this flag is set, and #tain! refuses to run.
def detach!
  @detach = true
end

#detached? ⇒ Boolean

Returns:

  • (Boolean)


73
74
75
# File 'lib/fluent/plugin/output_node.rb', line 73

# Whether #detach! has been called on this node (returns the @detach flag).
def detached?
  @detach
end

#dup ⇒ Object



57
58
59
60
61
62
63
# File 'lib/fluent/plugin/output_node.rb', line 57

# Creates a fresh, unconnected node for the same destination.
#
# Only configuration is carried over (host, port, hostlabel, credentials,
# shared key, standby flag, proxy); connection state is not, because the new
# object goes through #initialize again.
#
# @return [Node] a new instance of the receiver's class
def dup
  settings = {
    host: @host,
    port: @port,
    hostlabel: @hostlabel,
    username: @username,
    password: @password,
    shared_key: @shared_key,
    standby: @standby,
    proxy_uri: @proxy_uri
  }
  self.class.new(@sender, Fluent::Config::Section.new(settings))
end

#established? ⇒ Boolean

Returns:

  • (Boolean)


114
115
116
# File 'lib/fluent/plugin/output_node.rb', line 114

# True once the handshake has completed, i.e. #on_read accepted the peer's
# PONG and moved @state to :established.
def established?
  @state == :established
end

#expired? ⇒ Boolean

Returns:

  • (Boolean)


118
119
120
121
122
123
124
# File 'lib/fluent/plugin/output_node.rb', line 118

# Tells whether the keepalive period of this connection has elapsed.
#
# @return [Boolean, nil] false when keepalive is disabled (nil or 0); nil when
#   no expiry time has been set yet; otherwise whether @expire is in the past
def expired?
  # Keepalive disabled: the connection never expires.
  return false if @keepalive.nil? || @keepalive == 0

  @expire && @expire < Time.now
end

#generate_ping ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/fluent/plugin/output_node.rb', line 143

# Builds the PING message answering the peer's HELO.
#
# Layout:
#   ['PING', self_hostname, sharedkey_salt,
#    sha512_hex(sharedkey_salt + self_hostname + nonce + shared_key),
#    username || '', sha512_hex(auth_salt + username + password) || '']
#
# The last two fields are empty strings when the peer did not request user
# authentication, i.e. the 'auth' option stored by #check_helo is '' or
# missing.
#
# @return [Array] the message, ready for #send_data
def generate_ping
  log.debug "generating ping"
  shared_key_hexdigest = Digest::SHA512.new.update(@shared_key_salt).update(@sender.self_hostname).update(@shared_key_nonce).update(@shared_key).hexdigest
  ping = ['PING', @sender.self_hostname, @shared_key_salt, shared_key_hexdigest]
  # A HELO without an 'auth' option leaves @authentication nil; treat that
  # like '' (no authentication) instead of raising while hashing nil. This
  # mirrors the nonce fallback in #check_helo.
  if @authentication.nil? || @authentication == ''
    ping.push('', '')
  else
    # Missing credentials are sent as empty strings, so the peer can answer
    # with an authentication failure rather than this side raising TypeError.
    username = @username || ''
    password = @password || ''
    password_hexdigest = Digest::SHA512.new.update(@authentication).update(username).update(password).hexdigest
    ping.push(username, password_hexdigest)
  end
  ping
end

#generate_salt ⇒ Object



126
127
128
# File 'lib/fluent/plugin/output_node.rb', line 126

# Produces the random salt that #initialize stores as @shared_key_salt.
#
# @return [String] 16 random bytes from OpenSSL's random generator
def generate_salt
  salt_bytes = 16
  OpenSSL::Random.random_bytes(salt_bytes)
end

#join ⇒ Object



110
111
112
# File 'lib/fluent/plugin/output_node.rb', line 110

# Waits for the node thread created by #start to finish.
# Does nothing (and returns nil) when the node was never started.
def join
  @thread && @thread.join
end

#log ⇒ Object



53
54
55
# File 'lib/fluent/plugin/output_node.rb', line 53

# Logger used throughout this class; simply the owning sender's logger.
def log
  @sender.log
end

#on_read(data) ⇒ Object



187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/fluent/plugin/output_node.rb', line 187

# Handles one decoded message from the peer, driving the handshake:
#
#   :helo     -> expects HELO, answers with PING, moves to :pingpong
#   :pingpong -> expects PONG, moves to :established and arms @expire
#
# An invalid HELO or a rejected PONG shuts the node down. Messages that
# arrive after the session is established are only logged; in any other
# state the message is ignored.
#
# @param data [Object] message yielded by the unpacker in #connect
def on_read(data)
  log.debug "on_read"

  if established?
    #TODO: ACK
    log.warn "unknown packets arrived..."
    return
  end

  if @state == :helo
    unless check_helo(data)
      log.warn "received invalid helo message from #{@host}"
      shutdown
      return
    end
    send_data(generate_ping)
    @state = :pingpong
  elsif @state == :pingpong
    accepted, refusal = check_pong(data)
    unless accepted
      log.warn "connection refused to #{@host}:" + refusal
      shutdown
      return
    end
    log.info "connection established to #{@host}" if @first_session
    @state = :established
    # The expiry clock only runs when a positive keepalive is configured.
    @expire = Time.now + @keepalive if @keepalive && @keepalive > 0
    log.debug "connection established", host: @host, port: @port, expire: @expire
  end
end

#release! ⇒ Object



86
87
88
# File 'lib/fluent/plugin/output_node.rb', line 86

# Clears the "in use by a writer" mark set by #tain!. #connect waits for
# this flag to drop before it shuts the node down.
def release!
  @writing = false
end

#send_data(data) ⇒ Object



183
184
185
# File 'lib/fluent/plugin/output_node.rb', line 183

# Serializes +data+ with to_msgpack and writes it to the SSL session.
# Requires @sslsession to be set, i.e. #connect must have got that far.
def send_data(data)
  @sslsession.write data.to_msgpack
end

#shutdown ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/fluent/plugin/output_node.rb', line 90

# Stops the node: marks it :closed, stops the node thread and closes the SSL
# session and the underlying socket.
#
# When called from the node thread itself the sockets are closed first and
# the thread kills itself last; otherwise the thread is killed and joined
# before the sockets are closed. Errors raised along the way are logged at
# debug level and never propagate to the caller.
def shutdown
  log.debug "shutting down node #{@host}"
  @state = :closed

  own_thread = (@thread == Thread.current)

  if @thread && !own_thread
    @thread.kill
    @thread.join
  end

  # Close each endpoint on its own, so that an error while closing the SSL
  # session cannot leave the underlying socket open.
  [@sslsession, @socket].each do |io|
    begin
      io.close if io
    rescue => e
      log.debug "error on node shutdown #{e.class}:#{e.message}"
    end
  end

  # Must come last: nothing after this line runs in the node thread.
  @thread.kill if own_thread
rescue => e
  log.debug "error on node shutdown #{e.class}:#{e.message}"
end

#start ⇒ Object



65
66
67
# File 'lib/fluent/plugin/output_node.rb', line 65

# Starts the node: runs #connect in a new thread and remembers that thread
# in @thread (used by #join and #shutdown).
def start
  @thread = Thread.new(&method(:connect))
end

#tain! ⇒ Object

Raises:

  • (RuntimeError)


77
78
79
80
# File 'lib/fluent/plugin/output_node.rb', line 77

# Marks the node as in use by a writer; #release! clears the mark again.
#
# @raise [RuntimeError] when the node has already been detached
# @return [true]
def tain!
  if @detach
    raise RuntimeError, "BUG: taining detached node"
  end

  @writing = true
end

#tained? ⇒ Boolean

Returns:

  • (Boolean)


82
83
84
# File 'lib/fluent/plugin/output_node.rb', line 82

# Whether a writer currently holds this node, i.e. #tain! was called and
# #release! has not been called since (returns the @writing flag).
def tained?
  @writing
end