Class: Fluent::SecureForwardOutput::Node
- Inherits:
-
Object
- Object
- Fluent::SecureForwardOutput::Node
- Defined in:
- lib/fluent/plugin/output_node.rb
Instance Attribute Summary collapse
-
#authentication ⇒ Object
Returns the value of attribute authentication.
-
#detach ⇒ Object
Returns the value of attribute detach.
-
#expire ⇒ Object
readonly
Returns the value of attribute expire.
-
#first_session ⇒ Object
Returns the value of attribute first_session.
-
#host ⇒ Object
Returns the value of attribute host.
-
#hostlabel ⇒ Object
Returns the value of attribute hostlabel.
-
#keepalive ⇒ Object
Returns the value of attribute keepalive.
-
#password ⇒ Object
Returns the value of attribute password.
-
#port ⇒ Object
Returns the value of attribute port.
-
#shared_key ⇒ Object
Returns the value of attribute shared_key.
-
#shared_key_salt ⇒ Object
Returns the value of attribute shared_key_salt.
-
#socket ⇒ Object
Returns the value of attribute socket.
-
#sslsession ⇒ Object
Returns the value of attribute sslsession.
-
#standby ⇒ Object
Returns the value of attribute standby.
-
#state ⇒ Object
Returns the value of attribute state.
-
#unpacker ⇒ Object
Returns the value of attribute unpacker.
-
#username ⇒ Object
Returns the value of attribute username.
Instance Method Summary collapse
- #check_helo(message) ⇒ Object
- #check_pong(message) ⇒ Object
- #connect ⇒ Object
- #dup ⇒ Object
- #established? ⇒ Boolean
- #expired? ⇒ Boolean
- #generate_ping ⇒ Object
- #generate_salt ⇒ Object
-
#initialize(sender, shared_key, conf) ⇒ Node
constructor
A new instance of Node.
- #join ⇒ Object
- #on_read(data) ⇒ Object
- #send_data(data) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(sender, shared_key, conf) ⇒ Node
Returns a new instance of Node.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/fluent/plugin/output_node.rb', line 19 def initialize(sender, shared_key, conf) @sender = sender @shared_key = shared_key @host = conf['host'] @port = (conf['port'] || Fluent::SecureForwardOutput::DEFAULT_SECURE_CONNECT_PORT).to_i @hostlabel = conf['hostlabel'] || conf['host'] @username = conf['username'] || '' @password = conf['password'] || '' @standby = conf.has_key?('standby') && Fluent::Config.bool_value(conf['standby']) != false @authentication = nil @keepalive = nil @expire = nil @first_session = false @detach = false @socket = nil @sslsession = nil @unpacker = MessagePack::Unpacker.new @shared_key_salt = generate_salt @state = :helo @thread = nil end |
Instance Attribute Details
#authentication ⇒ Object
Returns the value of attribute authentication.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12 def authentication @authentication end |
#detach ⇒ Object
Returns the value of attribute detach.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15 def detach @detach end |
#expire ⇒ Object (readonly)
Returns the value of attribute expire.
17 18 19 |
# File 'lib/fluent/plugin/output_node.rb', line 17 def expire @expire end |
#first_session ⇒ Object
Returns the value of attribute first_session.
15 16 17 |
# File 'lib/fluent/plugin/output_node.rb', line 15 def first_session @first_session end |
#host ⇒ Object
Returns the value of attribute host.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10 def host @host end |
#hostlabel ⇒ Object
Returns the value of attribute hostlabel.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10 def hostlabel @hostlabel end |
#keepalive ⇒ Object
Returns the value of attribute keepalive.
12 13 14 |
# File 'lib/fluent/plugin/output_node.rb', line 12 def keepalive @keepalive end |
#password ⇒ Object
Returns the value of attribute password.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10 def password @password end |
#port ⇒ Object
Returns the value of attribute port.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10 def port @port end |
#shared_key ⇒ Object
Returns the value of attribute shared_key.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10 def shared_key @shared_key end |
#shared_key_salt ⇒ Object
Returns the value of attribute shared_key_salt.
13 14 15 |
# File 'lib/fluent/plugin/output_node.rb', line 13 def shared_key_salt @shared_key_salt end |
#socket ⇒ Object
Returns the value of attribute socket.
13 14 15 |
# File 'lib/fluent/plugin/output_node.rb', line 13 def socket @socket end |
#sslsession ⇒ Object
Returns the value of attribute sslsession.
13 14 15 |
# File 'lib/fluent/plugin/output_node.rb', line 13 def sslsession @sslsession end |
#standby ⇒ Object
Returns the value of attribute standby.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10 def standby @standby end |
#state ⇒ Object
Returns the value of attribute state.
13 14 15 |
# File 'lib/fluent/plugin/output_node.rb', line 13 def state @state end |
#unpacker ⇒ Object
Returns the value of attribute unpacker.
13 14 15 |
# File 'lib/fluent/plugin/output_node.rb', line 13 def unpacker @unpacker end |
#username ⇒ Object
Returns the value of attribute username.
10 11 12 |
# File 'lib/fluent/plugin/output_node.rb', line 10 def username @username end |
Instance Method Details
#check_helo(message) ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/output_node.rb', line 100 def check_helo() $log.debug "checking helo" # ['HELO', options(hash)] unless .size == 2 && [0] == 'HELO' return false end opts = [1] @authentication = opts['auth'] @allow_keepalive = opts['keepalive'] true end |
#check_pong(message) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/fluent/plugin/output_node.rb', line 127 def check_pong() $log.debug "checking pong" # ['PONG', bool(authentication result), 'reason if authentication failed', # self_hostname, sha512\_hex(salt + self_hostname + sharedkey)] unless .size == 5 && [0] == 'PONG' return false, 'invalid format for PONG message' end pong, auth_result, reason, hostname, shared_key_hexdigest = unless auth_result return false, 'authentication failed: ' + reason end clientside = Digest::SHA512.new.update(@shared_key_salt).update(hostname).update(@shared_key).hexdigest unless shared_key_hexdigest == clientside return false, 'shared key mismatch' end return true, nil end |
#connect ⇒ Object
183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
# File 'lib/fluent/plugin/output_node.rb', line 183 def connect $log.debug "starting client" addr = @sender.hostname_resolver.getaddress(@host) $log.debug "create tcp socket to node", :host => @host, :address => addr, :port => @port sock = TCPSocket.new(addr, @port) $log.trace "changing socket options" opt = [1, @sender.send_timeout.to_i].pack('I!I!') # { int l_onoff; int l_linger; } sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt) opt = [@sender.send_timeout.to_i, 0].pack('L!L!') # struct timeval sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, opt) # TODO: SSLContext constructer parameter (SSL/TLS protocol version) $log.trace "initializing SSL contexts" context = OpenSSL::SSL::SSLContext.new # TODO: context.ca_file = (ca_file_path) # TODO: context.ciphers = (SSL Shared key chiper protocols) $log.debug "trying to connect ssl session", :host => @host, :ipaddr => addr, :port => @port sslsession = OpenSSL::SSL::SSLSocket.new(sock, context) # TODO: check connection failure sslsession.connect $log.debug "ssl session connected", :host => @host, :port => @port begin unless @sender.allow_self_signed_certificate $log.debug "checking peer's certificate", :subject => sslsession.peer_cert.subject sslsession.post_connection_check(@hostlabel) verify = sslsession.verify_result if verify != OpenSSL::X509::V_OK err_name = Fluent::SecureForwardOutput::OpenSSLUtil.verify_result_name(verify) $log.warn "failed to verify certification while connecting host #{@host} as #{@hostlabel} (but not raised, why?)" $log.warn "verify_result: #{err_name}" raise RuntimeError, "failed to verify certification while connecting host #{@host} as #{@hostlabel}" end end rescue OpenSSL::SSL::SSLError => e $log.warn "failed to verify certification while connecting ssl session", :host => @host, :hostlabel => @hostlabel self.shutdown raise end $log.debug "ssl sessison connected", :host => @host, :port => @port @socket = sock @sslsession = sslsession buf = '' read_length = @sender.read_length read_interval = @sender.read_interval socket_interval = @sender.socket_interval loop do break if @detach begin while @sslsession.read_nonblock(read_length, buf) if buf == '' sleep read_interval next end @unpacker.feed_each(buf, &method(:on_read)) buf = '' end rescue OpenSSL::SSL::SSLError # to wait i/o restart sleep socket_interval rescue EOFError $log.warn "disconnected from #{@host}" break end end self.shutdown end |
#dup ⇒ Object
46 47 48 49 50 51 52 53 54 |
# File 'lib/fluent/plugin/output_node.rb', line 46 def dup renewed = self.class.new( @sender, @shared_key, {'host' => @host, 'port' => @port, 'hostlabel' => @hostlabel, 'username' => @username, 'password' => @password} ) renewed.keepalive = @keepalive if @keepalive renewed end |
#established? ⇒ Boolean
84 85 86 |
# File 'lib/fluent/plugin/output_node.rb', line 84 def established? @state == :established end |
#expired? ⇒ Boolean
88 89 90 91 92 93 94 |
# File 'lib/fluent/plugin/output_node.rb', line 88 def expired? if @keepalive.nil? || @keepalive == 0 false else @expire && @expire < Time.now end end |
#generate_ping ⇒ Object
112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/fluent/plugin/output_node.rb', line 112 def generate_ping $log.debug "generating ping" # ['PING', self_hostname, sharedkey\_salt, sha512\_hex(sharedkey\_salt + self_hostname + shared_key), # username || '', sha512\_hex(auth\_salt + username + password) || ''] shared_key_hexdigest = Digest::SHA512.new.update(@shared_key_salt).update(@sender.self_hostname).update(@shared_key).hexdigest ping = ['PING', @sender.self_hostname, @shared_key_salt, shared_key_hexdigest] if @authentication != '' password_hexdigest = Digest::SHA512.new.update(@authentication).update(@username).update(@password).hexdigest ping.push(@username, password_hexdigest) else ping.push('','') end ping end |
#generate_salt ⇒ Object
96 97 98 |
# File 'lib/fluent/plugin/output_node.rb', line 96 def generate_salt OpenSSL::Random.random_bytes(16) end |
#join ⇒ Object
80 81 82 |
# File 'lib/fluent/plugin/output_node.rb', line 80 def join @thread && @thread.join end |
#on_read(data) ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/fluent/plugin/output_node.rb', line 152 def on_read(data) $log.debug "on_read" if self.established? #TODO: ACK $log.warn "unknown packets arrived..." return end case @state when :helo unless check_helo(data) $log.warn "received invalid helo message from #{@host}" self.shutdown return end send_data generate_ping() @state = :pingpong when :pingpong success, reason = check_pong(data) unless success $log.warn "connection refused to #{@host}:" + reason self.shutdown return end $log.info "connection established to #{@host}" if @first_session @state = :established @expire = Time.now + @keepalive if @keepalive && @keepalive > 0 $log.debug "connection established", :host => @host, :port => @port, :expire => @expire end end |
#send_data(data) ⇒ Object
148 149 150 |
# File 'lib/fluent/plugin/output_node.rb', line 148 def send_data(data) @sslsession.write data.to_msgpack end |
#shutdown ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/fluent/plugin/output_node.rb', line 60 def shutdown $log.debug "shutting down node #{@host}" @state = :closed if @thread == Thread.current @sslsession.close if @sslsession @socket.close if @socket @thread.kill else if @thread @thread.kill @thread.join end @sslsession.close if @sslsession @socket.close if @socket end rescue => e $log.debug "error on node shutdown #{e.class}:#{e.}" end |
#start ⇒ Object
56 57 58 |
# File 'lib/fluent/plugin/output_node.rb', line 56 def start @thread = Thread.new(&method(:connect)) end |