Class: Fluent::SecureForwardOutput::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/output_node.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sender, shared_key, conf) ⇒ Node

Returns a new instance of Node.



19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/fluent/plugin/output_node.rb', line 19

# Builds a node describing one destination server. No connection is opened
# here: the node starts in the :helo state with no socket, SSL session or
# thread.
#
# sender     - object that owns this node; stored as-is in @sender.
# shared_key - shared key used later when building and verifying digests.
# conf       - hash-like settings with string keys 'host', 'port',
#              'hostlabel', 'username', 'password' and 'standby'.
def initialize(sender, shared_key, conf)
  @sender = sender
  @shared_key = shared_key

  # Destination and credentials, with fallbacks for omitted settings.
  @host = conf['host']
  port_setting = conf['port'] || Fluent::SecureForwardOutput::DEFAULT_SECURE_CONNECT_PORT
  @port = port_setting.to_i
  @hostlabel = conf['hostlabel'] || conf['host']
  @username = conf['username'] || ''
  @password = conf['password'] || ''
  # Standby only when the key is present and its value is not parsed as false.
  @standby = if conf.has_key?('standby')
               Fluent::Config.bool_value(conf['standby']) != false
             else
               false
             end

  # Filled in from the server's HELO message.
  @authentication = nil

  # Session lifetime bookkeeping.
  @keepalive = nil
  @expire = nil
  @first_session = false
  @detach = false

  # Connection handles; assigned once #connect succeeds.
  @socket = nil
  @sslsession = nil
  @unpacker = MessagePack::Unpacker.new

  # Handshake state.
  @shared_key_salt = generate_salt
  @state = :helo
  @thread = nil
end

Instance Attribute Details

#authentication ⇒ Object

Returns the value of attribute authentication.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

# Returns the 'auth' option taken from the server's HELO message by
# #check_helo (nil until a HELO has been accepted). #generate_ping feeds it
# into the password digest.
def authentication
  @authentication
end

#detach ⇒ Object

Returns the value of attribute detach.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

# Returns the detach flag (false after construction). The read loop in
# #connect exits and shuts the node down once this is truthy.
def detach
  @detach
end

#expire ⇒ Object (readonly)

Returns the value of attribute expire.



17
18
19
# File 'lib/fluent/plugin/output_node.rb', line 17

# Returns the Time that #expired? compares against Time.now. #on_read sets it
# to Time.now + keepalive when the session is established with a positive
# keepalive; it is nil otherwise.
def expire
  @expire
end

#first_session ⇒ Object

Returns the value of attribute first_session.



15
16
17
# File 'lib/fluent/plugin/output_node.rb', line 15

# Returns the first-session flag (false after construction). When truthy,
# #on_read logs the established connection at info level.
def first_session
  @first_session
end

#host ⇒ Object

Returns the value of attribute host.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

# Returns the destination host, taken from conf['host'] at construction.
def host
  @host
end

#hostlabel ⇒ Object

Returns the value of attribute hostlabel.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

# Returns the name the peer certificate is checked against in #connect:
# conf['hostlabel'], falling back to conf['host'].
def hostlabel
  @hostlabel
end

#keepalive ⇒ Object

Returns the value of attribute keepalive.



12
13
14
# File 'lib/fluent/plugin/output_node.rb', line 12

# Returns the keepalive value (nil after construction). #on_read adds it to
# Time.now to compute the expiry time; nil or 0 disables expiry in #expired?.
def keepalive
  @keepalive
end

#password ⇒ Object

Returns the value of attribute password.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

# Returns the password from conf['password'], or '' when none was configured.
def password
  @password
end

#port ⇒ Object

Returns the value of attribute port.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

# Returns the destination port as an Integer: conf['port'] or the plugin's
# DEFAULT_SECURE_CONNECT_PORT, converted with to_i at construction.
def port
  @port
end

#shared_key ⇒ Object

Returns the value of attribute shared_key.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

# Returns the shared key passed to the constructor; it is mixed into the
# digests built by #generate_ping and verified by #check_pong.
def shared_key
  @shared_key
end

#shared_key_salt ⇒ Object

Returns the value of attribute shared_key_salt.



13
14
15
# File 'lib/fluent/plugin/output_node.rb', line 13

# Returns the per-node salt produced by #generate_salt at construction and
# used in the shared-key digests.
def shared_key_salt
  @shared_key_salt
end

#socket ⇒ Object

Returns the value of attribute socket.



13
14
15
# File 'lib/fluent/plugin/output_node.rb', line 13

# Returns the TCPSocket opened by #connect, or nil before #connect has stored
# it.
def socket
  @socket
end

#sslsession ⇒ Object

Returns the value of attribute sslsession.



13
14
15
# File 'lib/fluent/plugin/output_node.rb', line 13

# Returns the OpenSSL::SSL::SSLSocket created by #connect, or nil before
# #connect has stored it.
def sslsession
  @sslsession
end

#standby ⇒ Object

Returns the value of attribute standby.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

# Returns the standby flag: true only when conf had a 'standby' key whose
# value Fluent::Config.bool_value did not parse as false.
def standby
  @standby
end

#state ⇒ Object

Returns the value of attribute state.



13
14
15
# File 'lib/fluent/plugin/output_node.rb', line 13

# Returns the handshake state symbol: :helo after construction, :pingpong
# once the PING has been sent, :established after a valid PONG, and :closed
# after #shutdown.
def state
  @state
end

#unpacker ⇒ Object

Returns the value of attribute unpacker.



13
14
15
# File 'lib/fluent/plugin/output_node.rb', line 13

# Returns the MessagePack::Unpacker that #connect feeds with bytes read from
# the SSL session.
def unpacker
  @unpacker
end

#username ⇒ Object

Returns the value of attribute username.



10
11
12
# File 'lib/fluent/plugin/output_node.rb', line 10

# Returns the username from conf['username'], or '' when none was configured.
def username
  @username
end

Instance Method Details

#check_helo(message) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
# File 'lib/fluent/plugin/output_node.rb', line 100

# Validates the HELO message received from the server and records its
# options.
#
# message - decoded msgpack object; the expected shape is
#           ['HELO', options(hash)].
#
# On success stores options['auth'] in @authentication and
# options['keepalive'] in @allow_keepalive.
#
# Returns true for a well-formed HELO, false for anything else.
def check_helo(message)
  $log.debug "checking helo"
  # The message comes straight off the wire, so verify its shape before
  # indexing into it rather than letting a malformed frame raise.
  return false unless message.is_a?(Array) && message.size == 2 && message[0] == 'HELO'

  opts = message[1]
  return false unless opts.is_a?(Hash)

  @authentication = opts['auth']
  @allow_keepalive = opts['keepalive']
  true
end

#check_pong(message) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/fluent/plugin/output_node.rb', line 127

# Validates the PONG message the server sends in reply to our PING.
#
# message - decoded msgpack object; the expected shape is
#           ['PONG', bool(authentication result), 'reason if authentication failed',
#            self_hostname, sha512_hex(salt + self_hostname + sharedkey)]
#
# Returns a two-element result [success, reason]: [true, nil] when the server
# accepted us and its shared-key digest matches ours, otherwise
# [false, String] describing the failure.
def check_pong(message)
  $log.debug "checking pong"
  # Wire data is untrusted: reject anything that is not a 5-element PONG array.
  unless message.is_a?(Array) && message.size == 5 && message[0] == 'PONG'
    return false, 'invalid format for PONG message'
  end
  _pong, auth_result, reason, hostname, shared_key_hexdigest = message

  unless auth_result
    # Interpolate so a missing (nil) reason cannot raise TypeError.
    return false, "authentication failed: #{reason}"
  end

  # Digest#update only accepts strings; a non-string hostname is a bad frame.
  unless hostname.is_a?(String)
    return false, 'invalid format for PONG message'
  end

  clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key).hexdigest
  unless shared_key_hexdigest == clientside
    return false, 'shared key mismatch'
  end

  return true, nil
end

#connect ⇒ Object



183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/fluent/plugin/output_node.rb', line 183

# Opens the TCP + SSL connection to the node and then runs the read loop
# until the node is detached or the peer disconnects.
#
# Steps:
#   1. resolve @host through the sender's hostname resolver and open a TCP
#      socket to @port;
#   2. set SO_LINGER and SO_SNDTIMEO from the sender's send_timeout;
#   3. perform the SSL handshake and, unless the sender allows self-signed
#      certificates, verify the peer certificate against @hostlabel;
#   4. publish the handles in @socket / @sslsession and feed every chunk read
#      from the SSL session to the msgpack unpacker, dispatching decoded
#      objects to #on_read.
#
# If anything fails before the handles are published, both the SSL session
# and the TCP socket are closed here, because #shutdown only knows about
# @socket / @sslsession and would otherwise leave them open.
#
# Raises whatever the socket / SSL layer raises during setup, and
# RuntimeError when certificate verification reports a non-OK result.
def connect
  $log.debug "starting client"

  addr = @sender.hostname_resolver.getaddress(@host)
  $log.debug "create tcp socket to node", :host => @host, :address => addr, :port => @port
  sock = TCPSocket.new(addr, @port)
  sslsession = nil
  handed_over = false

  begin
    $log.trace "changing socket options"
    opt = [1, @sender.send_timeout.to_i].pack('I!I!')  # { int l_onoff; int l_linger; }
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

    opt = [@sender.send_timeout.to_i, 0].pack('L!L!')  # struct timeval
    sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

    # TODO: SSLContext constructer parameter (SSL/TLS protocol version)
    $log.trace "initializing SSL contexts"
    context = OpenSSL::SSL::SSLContext.new
    # TODO: context.ca_file = (ca_file_path)
    # TODO: context.ciphers = (SSL Shared key chiper protocols)

    $log.debug "trying to connect ssl session", :host => @host, :ipaddr => addr, :port => @port
    sslsession = OpenSSL::SSL::SSLSocket.new(sock, context)
    sslsession.connect
    $log.debug "ssl session connected", :host => @host, :port => @port

    begin
      unless @sender.allow_self_signed_certificate
        $log.debug "checking peer's certificate", :subject => sslsession.peer_cert.subject
        sslsession.post_connection_check(@hostlabel)
        verify = sslsession.verify_result
        if verify != OpenSSL::X509::V_OK
          err_name = Fluent::SecureForwardOutput::OpenSSLUtil.verify_result_name(verify)
          $log.warn "failed to verify certification while connecting host #{@host} as #{@hostlabel} (but not raised, why?)"
          $log.warn "verify_result: #{err_name}"
          raise RuntimeError, "failed to verify certification while connecting host #{@host} as #{@hostlabel}"
        end
      end
    rescue OpenSSL::SSL::SSLError
      $log.warn "failed to verify certification while connecting ssl session", :host => @host, :hostlabel => @hostlabel
      # When running inside @thread, #shutdown kills the current thread; the
      # ensure clause below still runs and releases the handles.
      self.shutdown
      raise
    end

    $log.debug "ssl sessison connected", :host => @host, :port => @port
    @socket = sock
    @sslsession = sslsession
    handed_over = true
  ensure
    # Setup failed before the handles were published: close them ourselves.
    unless handed_over
      [sslsession, sock].each do |io|
        next if io.nil?
        begin
          io.close
        rescue => e
          $log.debug "error on closing connection #{e.class}:#{e.message}"
        end
      end
    end
  end

  buf = ''
  read_length = @sender.read_length
  read_interval = @sender.read_interval
  socket_interval = @sender.socket_interval

  loop do
    break if @detach

    begin
      while @sslsession.read_nonblock(read_length, buf)
        if buf == ''
          sleep read_interval
          next
        end
        @unpacker.feed_each(buf, &method(:on_read))
        buf = ''
      end
    rescue OpenSSL::SSL::SSLError
      # to wait i/o restart
      sleep socket_interval
    rescue EOFError
      $log.warn "disconnected from #{@host}"
      break
    end
  end
  self.shutdown
end

#dup ⇒ Object



46
47
48
49
50
51
52
53
54
# File 'lib/fluent/plugin/output_node.rb', line 46

# Builds a fresh, unconnected node with the same destination settings as this
# one (used to replace a node instead of reusing its connection state).
#
# The copy shares @sender and @shared_key, gets the same host, port,
# hostlabel, username and password, keeps the standby flag, and inherits
# @keepalive when it is set. Everything else starts from the constructor
# defaults.
#
# Returns the new node.
def dup
  conf = {'host' => @host, 'port' => @port, 'hostlabel' => @hostlabel, 'username' => @username, 'password' => @password}
  # #initialize derives the standby flag from conf['standby']; without this
  # key the copy of a standby node would silently become an active node.
  # The key is only added when set, so non-standby nodes are built exactly
  # as before.
  conf['standby'] = 'true' if @standby

  renewed = self.class.new(@sender, @shared_key, conf)
  renewed.keepalive = @keepalive if @keepalive
  renewed
end

#established? ⇒ Boolean

Returns:

  • (Boolean)


84
85
86
# File 'lib/fluent/plugin/output_node.rb', line 84

# Returns true once the handshake has completed, i.e. #on_read accepted the
# server's PONG and moved @state to :established.
def established?
  @state == :established
end

#expired? ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
91
92
93
94
# File 'lib/fluent/plugin/output_node.rb', line 88

# Tells whether this node's session has outlived its keepalive window.
#
# Returns false when keepalive is unset or 0 (expiry disabled). Otherwise
# returns whether @expire lies in the past; the result is nil when no expiry
# time has been recorded yet.
def expired?
  return false if @keepalive.nil? || @keepalive == 0

  @expire && @expire < Time.now
end

#generate_ping ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/fluent/plugin/output_node.rb', line 112

# Builds the PING message sent to the server after a valid HELO.
#
# Message layout:
#   ['PING', self_hostname, sharedkey_salt,
#    sha512_hex(sharedkey_salt + self_hostname + shared_key),
#    username || '', sha512_hex(auth_salt + username + password) || '']
#
# The auth salt is @authentication, as recorded by #check_helo. When it is
# nil or '' the last two fields are sent as empty strings.
#
# Returns the message as an Array.
def generate_ping
  $log.debug "generating ping"
  shared_key_hexdigest = Digest::SHA512.new.update(@shared_key_salt).update(@sender.self_hostname).update(@shared_key).hexdigest
  ping = ['PING', @sender.self_hostname, @shared_key_salt, shared_key_hexdigest]
  # A HELO without an 'auth' option leaves @authentication nil; treat that
  # like '' instead of passing nil to Digest#update, which would raise.
  if @authentication.nil? || @authentication == ''
    ping.push('', '')
  else
    password_hexdigest = Digest::SHA512.new.update(@authentication).update(@username).update(@password).hexdigest
    ping.push(@username, password_hexdigest)
  end
  ping
end

#generate_salt ⇒ Object



96
97
98
# File 'lib/fluent/plugin/output_node.rb', line 96

# Returns 16 random bytes from OpenSSL::Random, used as the shared-key salt.
def generate_salt
  OpenSSL::Random.random_bytes(16)
end

#join ⇒ Object



80
81
82
# File 'lib/fluent/plugin/output_node.rb', line 80

# Waits for the connection thread created by #start to finish. Does nothing
# (and returns the falsy @thread) when no thread has been started.
def join
  @thread && @thread.join
end

#on_read(data) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/fluent/plugin/output_node.rb', line 152

# Handles one decoded msgpack object received from the server and advances
# the handshake state machine.
#
# data - the decoded message.
#
# State transitions:
#   :helo      -> :pingpong     a valid HELO arrived; a PING is sent back
#   :pingpong  -> :established  a valid PONG arrived; @expire is set when a
#                               positive keepalive is configured
# An invalid HELO or PONG shuts the node down. Data arriving on an already
# established session is only logged.
def on_read(data)
  $log.debug "on_read"
  if established?
    #TODO: ACK
    $log.warn "unknown packets arrived..."
    return
  end

  if @state == :helo
    unless check_helo(data)
      $log.warn "received invalid helo message from #{@host}"
      shutdown
      return
    end
    send_data(generate_ping)
    @state = :pingpong
  elsif @state == :pingpong
    accepted, why = check_pong(data)
    unless accepted
      $log.warn "connection refused to #{@host}:" + why
      shutdown
      return
    end
    $log.info "connection established to #{@host}" if @first_session
    @state = :established
    keepalive_enabled = @keepalive && @keepalive > 0
    @expire = Time.now + @keepalive if keepalive_enabled
    $log.debug "connection established", :host => @host, :port => @port, :expire => @expire
  end
end

#send_data(data) ⇒ Object



148
149
150
# File 'lib/fluent/plugin/output_node.rb', line 148

# Serializes data with to_msgpack and writes the result to the SSL session.
#
# data - any object responding to to_msgpack.
def send_data(data)
  @sslsession.write data.to_msgpack
end

#shutdown ⇒ Object



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/fluent/plugin/output_node.rb', line 60

# Closes the node: marks it :closed, stops the connection thread and closes
# the SSL session and TCP socket.
#
# When called from the connection thread itself, the handles are closed first
# and the thread is killed last (nothing after the kill runs). When called
# from another thread, the connection thread is killed and joined before the
# handles are closed.
#
# Errors are logged at debug level and never propagate to the caller.
def shutdown
  $log.debug "shutting down node #{@host}"
  @state = :closed

  # Close each handle on its own so a failure while closing the SSL session
  # cannot leave the underlying TCP socket open.
  close_handles = lambda do
    [@sslsession, @socket].each do |io|
      next unless io
      begin
        io.close
      rescue => e
        $log.debug "error on node shutdown #{e.class}:#{e.message}"
      end
    end
  end

  if @thread == Thread.current
    close_handles.call
    @thread.kill
  else
    if @thread
      @thread.kill
      @thread.join
    end
    close_handles.call
  end
rescue => e
  $log.debug "error on node shutdown #{e.class}:#{e.message}"
end

#start ⇒ Object



56
57
58
# File 'lib/fluent/plugin/output_node.rb', line 56

# Runs #connect in a new thread and remembers that thread in @thread so that
# #join and #shutdown can act on it. Returns the Thread.
def start
  @thread = Thread.new(&method(:connect))
end