Class: Fluent::SecureForwardOutput::Node
- Inherits:
-
Object
- Object
- Fluent::SecureForwardOutput::Node
- Defined in:
- lib/fluent/plugin/output_node.rb
Instance Attribute Summary collapse
-
#authentication ⇒ Object
Returns the value of attribute authentication.
-
#detach ⇒ Object
Returns the value of attribute detach.
-
#expire ⇒ Object
readonly
Returns the value of attribute expire.
-
#first_session ⇒ Object
Returns the value of attribute first_session.
-
#host ⇒ Object
Returns the value of attribute host.
-
#hostlabel ⇒ Object
Returns the value of attribute hostlabel.
-
#keepalive ⇒ Object
Returns the value of attribute keepalive.
-
#password ⇒ Object
Returns the value of attribute password.
-
#port ⇒ Object
Returns the value of attribute port.
-
#shared_key ⇒ Object
Returns the value of attribute shared_key.
-
#shared_key_salt ⇒ Object
Returns the value of attribute shared_key_salt.
-
#socket ⇒ Object
Returns the value of attribute socket.
-
#sslsession ⇒ Object
Returns the value of attribute sslsession.
-
#standby ⇒ Object
Returns the value of attribute standby.
-
#state ⇒ Object
Returns the value of attribute state.
-
#unpacker ⇒ Object
Returns the value of attribute unpacker.
-
#username ⇒ Object
Returns the value of attribute username.
Instance Method Summary collapse
- #check_helo(message) ⇒ Object
- #check_pong(message) ⇒ Object
- #connect ⇒ Object
- #detach! ⇒ Object
- #detached? ⇒ Boolean
- #dup ⇒ Object
- #established? ⇒ Boolean
- #expired? ⇒ Boolean
- #generate_ping ⇒ Object
- #generate_salt ⇒ Object
-
#initialize(sender, conf) ⇒ Node
constructor
A new instance of Node.
- #join ⇒ Object
- #log ⇒ Object
- #on_read(data) ⇒ Object
- #release! ⇒ Object
- #send_data(data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
- #tain! ⇒ Object
- #tained? ⇒ Boolean
Constructor Details
#initialize(sender, conf) ⇒ Node
Returns a new instance of Node.
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/fluent/plugin/output_node.rb', line 21
# Builds a node for one destination server.
#
# @param sender [Object] owning output; read here for shared_key (fallback)
#   and keepalive
# @param conf [Object] per-node settings; must respond to shared_key, host,
#   port, hostlabel, username, password, standby and proxy_uri
def initialize(sender, conf)
  @sender = sender

  # Static settings: a node-level shared_key wins over the sender-level one,
  # and hostlabel falls back to host.
  @shared_key = conf.shared_key || sender.shared_key
  @host = conf.host
  @port = conf.port
  @hostlabel = conf.hostlabel || conf.host
  @username = conf.username
  @password = conf.password
  @standby = conf.standby
  @proxy_uri = conf.proxy_uri

  @keepalive = sender.keepalive

  # Per-connection state, filled in while the session is being set up.
  @authentication = nil

  @writing = false

  @expire = nil
  @first_session = false
  @detach = false

  @socket = nil
  @sslsession = nil
  @unpacker = MessagePack::Unpacker.new

  @shared_key_salt = generate_salt
  @state = :helo # first message expected from the peer is HELO
  @thread = nil
end
Instance Attribute Details
#authentication ⇒ Object
Returns the value of attribute authentication.
14 15 16 |
# File 'lib/fluent/plugin/output_node.rb', line 14
# Returns the value of attribute authentication.
#
# @return [Object] current value of @authentication
def authentication
  @authentication
end
#detach ⇒ Object
Returns the value of attribute detach.
17 18 19 |
# File 'lib/fluent/plugin/output_node.rb', line 17
# Returns the value of attribute detach.
#
# @return [Object] current value of @detach
def detach
  @detach
end
#expire ⇒ Object (readonly)
Returns the value of attribute expire.
19 20 21 |
# File 'lib/fluent/plugin/output_node.rb', line 19
# Returns the value of attribute expire (read-only attribute).
#
# @return [Object] current value of @expire
def expire
  @expire
end
#first_session ⇒ Object
Returns the value of attribute first_session.
17 18 19 |
# File 'lib/fluent/plugin/output_node.rb', line 17
# Returns the value of attribute first_session.
#
# @return [Object] current value of @first_session
def first_session
  @first_session
end
#host ⇒ Object
Returns the value of attribute host.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12
# Returns the value of attribute host.
#
# @return [Object] current value of @host
def host
  @host
end
#hostlabel ⇒ Object
Returns the value of attribute hostlabel.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12
# Returns the value of attribute hostlabel.
#
# @return [Object] current value of @hostlabel
def hostlabel
  @hostlabel
end
#keepalive ⇒ Object
Returns the value of attribute keepalive.
14 15 16 |
# File 'lib/fluent/plugin/output_node.rb', line 14
# Returns the value of attribute keepalive.
#
# @return [Object] current value of @keepalive
def keepalive
  @keepalive
end
#password ⇒ Object
Returns the value of attribute password.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12
# Returns the value of attribute password.
#
# @return [Object] current value of @password
def password
  @password
end
#port ⇒ Object
Returns the value of attribute port.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12
# Returns the value of attribute port.
#
# @return [Object] current value of @port
def port
  @port
end
#shared_key ⇒ Object
Returns the value of attribute shared_key.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12
# Returns the value of attribute shared_key.
#
# @return [Object] current value of @shared_key
def shared_key
  @shared_key
end
#shared_key_salt ⇒ Object
Returns the value of attribute shared_key_salt.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15
# Returns the value of attribute shared_key_salt.
#
# @return [Object] current value of @shared_key_salt
def shared_key_salt
  @shared_key_salt
end
#socket ⇒ Object
Returns the value of attribute socket.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15
# Returns the value of attribute socket.
#
# @return [Object] current value of @socket
def socket
  @socket
end
#sslsession ⇒ Object
Returns the value of attribute sslsession.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15
# Returns the value of attribute sslsession.
#
# @return [Object] current value of @sslsession
def sslsession
  @sslsession
end
#standby ⇒ Object
Returns the value of attribute standby.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12
# Returns the value of attribute standby.
#
# @return [Object] current value of @standby
def standby
  @standby
end
#state ⇒ Object
Returns the value of attribute state.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15
# Returns the value of attribute state.
#
# @return [Object] current value of @state
def state
  @state
end
#unpacker ⇒ Object
Returns the value of attribute unpacker.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15
# Returns the value of attribute unpacker.
#
# @return [Object] current value of @unpacker
def unpacker
  @unpacker
end
#username ⇒ Object
Returns the value of attribute username.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12
# Returns the value of attribute username.
#
# @return [Object] current value of @username
def username
  @username
end
Instance Method Details
#check_helo(message) ⇒ Object
130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/fluent/plugin/output_node.rb', line 130
# Validates a HELO message and records the options it carries.
#
# Expected shape: ['HELO', options(hash)]
#
# @param message [Array] decoded message received from the peer
# @return [Boolean] false when the message is not a two-element HELO,
#   true once @shared_key_nonce, @authentication and @allow_keepalive are stored
def check_helo(message)
  log.debug "checking helo"
  return false unless message.size == 2 && message[0] == 'HELO'

  opts = message[1]
  # make shared_key_check failed (instead of error) if protocol version mismatch exist
  @shared_key_nonce = opts['nonce'] || ''
  @authentication = opts['auth']
  @allow_keepalive = opts['keepalive']
  true
end
#check_pong(message) ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/fluent/plugin/output_node.rb', line 158
# Validates a PONG message against the shared key and this sender's hostname.
#
# Expected shape:
#   ['PONG', bool(authentication result), 'reason if authentication failed',
#    self_hostname, sha512_hex(salt + self_hostname + nonce + sharedkey)]
#
# @param message [Array] decoded message received from the peer
# @return [Array(Boolean, String), Array(Boolean, nil)] [true, nil] on success,
#   otherwise [false, reason]
def check_pong(message)
  log.debug "checking pong"
  unless message.size == 5 && message[0] == 'PONG'
    return false, 'invalid format for PONG message'
  end

  _pong, auth_result, reason, hostname, shared_key_hexdigest = message

  # Interpolate rather than concatenate so a nil reason cannot raise TypeError.
  return false, "authentication failed: #{reason}" unless auth_result

  if hostname == @sender.self_hostname
    return false, 'same hostname between input and output: invalid configuration'
  end

  # Recompute the digest locally with our salt and the nonce taken from HELO.
  clientside = Digest::SHA512.new
                             .update(@shared_key_salt)
                             .update(hostname)
                             .update(@shared_key_nonce)
                             .update(@shared_key)
                             .hexdigest
  return false, 'shared key mismatch' unless shared_key_hexdigest == clientside

  return true, nil
end
#connect ⇒ Object
218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 |
# File 'lib/fluent/plugin/output_node.rb', line 218
# Opens the TCP (optionally proxied) connection, wraps it in an SSL session,
# then reads from the session until detached or disconnected.
#
# Steps, in order:
# 1. resolve @host and open a socket (directly or through @proxy_uri);
# 2. set SO_LINGER / SO_SNDTIMEO from the sender's send_timeout;
# 3. build the SSL context (ciphers, peer verification, CA settings);
# 4. perform the SSL handshake and, under strict verification, check the
#    peer certificate against @hostlabel;
# 5. feed everything read from the session to #on_read;
# 6. wait for an in-flight write (@writing) to finish, then call #shutdown.
#
# On a TCP or SSL connect failure @state becomes :failed and the method returns.
#
# @return [void]
def connect
  Thread.current.abort_on_exception = true
  log.debug "starting client"

  addr = @sender.hostname_resolver.getaddress(@host)
  log.debug "create tcp socket to node", host: @host, address: addr, port: @port
  begin
    if @proxy_uri.nil?
      sock = TCPSocket.new(addr, @port)
    else
      proxy = Proxifier::Proxy(@proxy_uri)
      sock = proxy.open(addr, @port)
    end
  rescue => e
    log.warn "failed to connect for secure-forward", error_class: e.class, error: e, host: @host, address: addr, port: @port
    @state = :failed
    return
  end

  log.trace "changing socket options"
  opt = [1, @sender.send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; }
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)

  opt = [@sender.send_timeout.to_i, 0].pack('L!L!') # struct timeval
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt)

  log.trace "initializing SSL contexts"

  context = OpenSSL::SSL::SSLContext.new(@sender.ssl_version)

  log.trace "setting SSL verification options"

  if @sender.secure
    # inject OpenSSL::SSL::SSLContext::DEFAULT_PARAMS
    # https://bugs.ruby-lang.org/issues/9424
    context.set_params({})

    if @sender.ssl_ciphers
      context.ciphers = @sender.ssl_ciphers
    else
      ### follow httpclient configuration by nahi
      # OpenSSL 0.9.8 default: "ALL:!ADH:!LOW:!EXP:!MD5:+SSLv2:@STRENGTH"
      context.ciphers = "ALL:!aNULL:!eNULL:!SSLv2" # OpenSSL >1.0.0 default
    end

    log.trace "set verify_mode VERIFY_PEER"
    context.verify_mode = OpenSSL::SSL::VERIFY_PEER
    if @sender.enable_strict_verification
      context.cert_store = OpenSSL::X509::Store.new
      begin
        context.cert_store.set_default_paths
      rescue OpenSSL::X509::StoreError => e
        log.warn "faild to load system default certificates", error: e
      end
    end
    if @sender.ca_cert_path
      log.trace "set to use private CA", path: @sender.ca_cert_path
      context.ca_file = @sender.ca_cert_path
    end
  end

  log.debug "trying to connect ssl session", host: @host, address: addr, port: @port
  begin
    sslsession = OpenSSL::SSL::SSLSocket.new(sock, context)
    log.trace "connecting...", host: @host, address: addr, port: @port
    sslsession.connect
  rescue => e
    log.warn "failed to establish SSL connection", error_class: e.class, error: e, host: @host, address: addr, port: @port
    @state = :failed
    # @socket has not been assigned yet, so #shutdown would never see this
    # socket; close it here instead of leaving it to the garbage collector.
    sock.close unless sock.closed?
    return
  end

  log.debug "ssl session connected", host: @host, port: @port

  begin
    if @sender.enable_strict_verification
      log.debug "checking peer's certificate", subject: sslsession.peer_cert.subject
      sslsession.post_connection_check(@hostlabel)
      verify = sslsession.verify_result
      if verify != OpenSSL::X509::V_OK
        err_name = Fluent::SecureForwardOutput::OpenSSLUtil.verify_result_name(verify)
        log.warn "BUG: failed to verify certification while connecting host #{@host} as #{@hostlabel} (but not raised, why?)"
        log.warn "BUG: verify_result: #{err_name}"
        raise RuntimeError, "BUG: failed to verify certification and to handle it correctly while connecting host #{@host} as #{@hostlabel}"
      end
    end
  rescue OpenSSL::SSL::SSLError => e
    log.warn "failed to verify certification while connecting ssl session", host: @host, hostlabel: @hostlabel
    shutdown
    raise
  end

  log.debug "ssl session connected", host: @host, port: @port
  @socket = sock
  @sslsession = sslsession

  buf = ''
  read_length = @sender.read_length
  read_interval = @sender.read_interval
  socket_interval = @sender.socket_interval

  loop do
    break if @detach

    begin
      while @sslsession.read_nonblock(read_length, buf)
        if buf == ''
          sleep read_interval
          next
        end
        @unpacker.feed_each(buf, &method(:on_read))
        buf = ''
      end
    rescue OpenSSL::SSL::SSLError
      # to wait i/o restart
      sleep socket_interval
    rescue SystemCallError => e
      log.warn "disconnected by Error", error_class: e.class, error: e, host: @host, port: @port
      break
    rescue EOFError
      log.warn "disconnected", host: @host, port: @port
      break
    end
  end

  # Let a write that is still in progress finish, unless we were detached.
  while @writing
    break if @detach

    sleep read_interval
  end
  shutdown
end
#detach! ⇒ Object
69 70 71 |
# File 'lib/fluent/plugin/output_node.rb', line 69
# Flags the node as detached by setting @detach to true.
#
# @return [true]
def detach!
  @detach = true
end
#detached? ⇒ Boolean
73 74 75 |
# File 'lib/fluent/plugin/output_node.rb', line 73
# Reports whether the node has been detached.
#
# @return [Boolean] the current value of @detach
def detached?
  @detach
end
#dup ⇒ Object
57 58 59 60 61 62 63 |
# File 'lib/fluent/plugin/output_node.rb', line 57
# Creates a fresh node of the same class with the same sender and the same
# connection settings. It is built through .new rather than copied.
#
# @return [Node] a new, unstarted instance
def dup
  settings = {
    host: @host,
    port: @port,
    hostlabel: @hostlabel,
    username: @username,
    password: @password,
    shared_key: @shared_key,
    standby: @standby,
    proxy_uri: @proxy_uri
  }
  self.class.new(@sender, Fluent::Config::Section.new(settings))
end
#established? ⇒ Boolean
114 115 116 |
# File 'lib/fluent/plugin/output_node.rb', line 114
# Reports whether the handshake has completed.
#
# @return [Boolean] true when @state is :established
def established?
  @state == :established
end
#expired? ⇒ Boolean
118 119 120 121 122 123 124 |
# File 'lib/fluent/plugin/output_node.rb', line 118
# Reports whether the keepalive lifetime of this connection has passed.
#
# @return [Boolean, nil] false when keepalive is nil or 0; otherwise truthy
#   only when @expire is set and lies before Time.now (nil when @expire is unset)
def expired?
  # A nil or zero keepalive means the connection never expires.
  return false if @keepalive.nil? || @keepalive == 0

  @expire && @expire < Time.now
end
#generate_ping ⇒ Object
143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/fluent/plugin/output_node.rb', line 143
# Builds the PING message sent in reply to a valid HELO.
#
# Message shape:
#   ['PING', self_hostname, sharedkey_salt,
#    sha512_hex(sharedkey_salt + self_hostname + nonce + shared_key),
#    username || '', sha512_hex(auth_salt + username + password) || '']
#
# @return [Array<String>] the six-element PING message
def generate_ping
  log.debug "generating ping"
  shared_key_hexdigest = Digest::SHA512.new
                                       .update(@shared_key_salt)
                                       .update(@sender.self_hostname)
                                       .update(@shared_key_nonce)
                                       .update(@shared_key)
                                       .hexdigest
  ping = ['PING', @sender.self_hostname, @shared_key_salt, shared_key_hexdigest]

  # A missing (nil) auth salt is treated like the empty one: no user
  # authentication was requested, so send blank credentials instead of
  # raising inside Digest#update.
  if @authentication.nil? || @authentication == ''
    ping.push('', '')
  else
    password_hexdigest = Digest::SHA512.new
                                       .update(@authentication)
                                       .update(@username)
                                       .update(@password)
                                       .hexdigest
    ping.push(@username, password_hexdigest)
  end
  ping
end
#generate_salt ⇒ Object
126 127 128 |
# File 'lib/fluent/plugin/output_node.rb', line 126
# Produces a fresh random salt for the shared-key digest.
#
# @return [String] 16 random bytes from OpenSSL::Random
def generate_salt
  OpenSSL::Random.random_bytes(16)
end
#join ⇒ Object
110 111 112 |
# File 'lib/fluent/plugin/output_node.rb', line 110
# Waits for the connection thread to finish, if one exists.
#
# @return [Thread, nil] the joined thread, or nil when @thread is unset
def join
  @thread && @thread.join
end
#log ⇒ Object
53 54 55 |
# File 'lib/fluent/plugin/output_node.rb', line 53
# Logger used by this node; delegates to the owning sender.
#
# @return [Object] whatever @sender.log returns
def log
  @sender.log
end
#on_read(data) ⇒ Object
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/fluent/plugin/output_node.rb', line 187
# Handles one decoded message from the peer and advances the handshake.
#
# State transitions driven here:
#   :helo     -> :pingpong     after a valid HELO (a PING is sent back)
#   :pingpong -> :established  after a valid PONG (@expire is set when
#                              keepalive is positive)
# An invalid HELO or PONG logs a warning and shuts the node down. Messages
# that arrive once established are only logged.
#
# @param data [Object] decoded message
# @return [void]
def on_read(data)
  log.debug "on_read"
  if established?
    #TODO: ACK
    log.warn "unknown packets arrived..."
    return
  end

  case @state
  when :helo
    unless check_helo(data)
      log.warn "received invalid helo message from #{@host}"
      shutdown
      return
    end
    send_data generate_ping
    @state = :pingpong
  when :pingpong
    success, reason = check_pong(data)
    unless success
      log.warn "connection refused to #{@host}:" + reason
      shutdown
      return
    end
    log.info "connection established to #{@host}" if @first_session
    @state = :established
    @expire = Time.now + @keepalive if @keepalive && @keepalive > 0
    log.debug "connection established", host: @host, port: @port, expire: @expire
  end
end
#release! ⇒ Object
86 87 88 |
# File 'lib/fluent/plugin/output_node.rb', line 86
# Clears the write-in-progress flag set by #tain!.
#
# @return [false]
def release!
  @writing = false
end
#send_data(data) ⇒ Object
183 184 185 |
# File 'lib/fluent/plugin/output_node.rb', line 183
# Serializes data with #to_msgpack and writes it to the SSL session.
#
# @param data [#to_msgpack] message to send
# @return [Object] whatever @sslsession.write returns
def send_data(data)
  @sslsession.write data.to_msgpack
end
#shutdown ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 |
# File 'lib/fluent/plugin/output_node.rb', line 90
# Marks the node closed, then stops the connection thread and closes the
# SSL session and socket.
#
# When called from the connection thread itself, the sockets are closed
# first and the thread is killed last. When called from another thread, the
# connection thread is killed and joined before the sockets are closed.
# Any StandardError raised along the way is logged at debug level and
# swallowed.
#
# @return [void]
def shutdown
  log.debug "shutting down node #{@host}"
  @state = :closed

  if @thread == Thread.current
    @sslsession.close if @sslsession
    @socket.close if @socket
    @thread.kill
  else
    if @thread
      @thread.kill
      @thread.join
    end
    @sslsession.close if @sslsession
    @socket.close if @socket
  end
rescue => e
  log.debug "error on node shutdown #{e.class}:#{e.message}"
end
#start ⇒ Object
65 66 67 |
# File 'lib/fluent/plugin/output_node.rb', line 65
# Starts the connection by running #connect in a new thread.
#
# @return [Thread] the thread stored in @thread
def start
  @thread = Thread.new(&method(:connect))
end
#tain! ⇒ Object
77 78 79 80 |
# File 'lib/fluent/plugin/output_node.rb', line 77
# Marks the node as having a write in progress by setting @writing to true.
#
# @raise [RuntimeError] when the node is already detached
# @return [true]
def tain!
  raise RuntimeError, "BUG: taining detached node" if @detach

  @writing = true
end
#tained? ⇒ Boolean
82 83 84 |
# File 'lib/fluent/plugin/output_node.rb', line 82
# Reports whether a write is currently in progress.
#
# @return [Boolean] the current value of @writing
def tained?
  @writing
end