Class: Fluent::SecureForwardInput::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/input_session.rb

Overview

require ‘resolv’

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(receiver, socket) ⇒ Session

Returns a new instance of Session.



11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/fluent/plugin/input_session.rb', line 11

def initialize(receiver, socket)
  @receiver = receiver

  @state = :helo

  @socket = socket
  @socket.sync = true

  @ipaddress = nil
  @node = nil
  @unpacker = MessagePack::Unpacker.new
  @thread = Thread.new(&method(:start))
end

Instance Attribute Details

#auth_saltObject

Returns the value of attribute auth_salt.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

def auth_salt
  @auth_salt
end

#nodeObject

Returns the value of attribute node.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

def node
  @node
end

#receiverObject

Returns the value of attribute receiver.



8
9
10
# File 'lib/fluent/plugin/input_session.rb', line 8

def receiver
  @receiver
end

#socketObject

Returns the value of attribute socket.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

def socket
  @socket
end

#stateObject

Returns the value of attribute state.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

def state
  @state
end

#threadObject

Returns the value of attribute thread.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

def thread
  @thread
end

#unpackerObject

Returns the value of attribute unpacker.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

def unpacker
  @unpacker
end

Instance Method Details

#check_node(ipaddress) ⇒ Object



41
42
43
44
45
46
47
48
49
50
# File 'lib/fluent/plugin/input_session.rb', line 41

def check_node(ipaddress)
  node = nil
  @receiver.nodes.each do |n|
    if n[:address].include?(ipaddress)
      node = n
      break
    end
  end
  node
end

#check_ping(message) ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/fluent/plugin/input_session.rb', line 68

def check_ping(message)
  log.debug "checking ping"
  # ['PING', self_hostname, shared_key\_salt, sha512\_hex(shared_key\_salt + self_hostname + nonce + shared_key),
  #  username || '', sha512\_hex(auth\_salt + username + password) || '']
  unless message.size == 6 && message[0] == 'PING'
    return false, 'invalid ping message'
  end
  _ping, hostname, shared_key_salt, shared_key_hexdigest, username, password_digest = message

  shared_key = if @node && @node[:shared_key]
                 @node[:shared_key]
               else
                 @receiver.shared_key
               end
  serverside = Digest::SHA512.new.update(shared_key_salt).update(hostname).update(@shared_key_nonce).update(shared_key).hexdigest
  if shared_key_hexdigest != serverside
    log.warn "Shared key mismatch from '#{hostname}'"
    return false, 'shared_key mismatch'
  end

  if @receiver.authentication
    users = @receiver.select_authenticate_users(@node, username)
    success = false
    users.each do |user|
      passhash = Digest::SHA512.new.update(@auth_key_salt).update(username).update(user[:password]).hexdigest
      success ||= (passhash == password_digest)
    end
    unless success
      log.warn "Authentication failed from client '#{hostname}', username '#{username}'"
      return false, 'username/password mismatch'
    end
  end

  return true, shared_key_salt
end

#closed?Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/fluent/plugin/input_session.rb', line 29

def closed?
  @state == :closed
end

#established?Boolean

Returns:

  • (Boolean)


33
34
35
# File 'lib/fluent/plugin/input_session.rb', line 33

def established?
  @state == :established
end

#generate_heloObject

not implemented yet def check_hostname_reverse_lookup(ipaddress)

rev_name = Resolv.getname(ipaddress)
proto, port, host, ipaddr, family_num, socktype_num, proto_num = Socket.getaddrinfo(rev_name, DUMMY_PORT)
unless ipaddr == ipaddress
  return false
end
true

end



62
63
64
65
66
# File 'lib/fluent/plugin/input_session.rb', line 62

def generate_helo
  log.debug "generating helo"
  # ['HELO', options(hash)]
  [ 'HELO', {'nonce' => @shared_key_nonce, 'auth' => (@receiver.authentication ? @auth_key_salt : ''), 'keepalive' => @receiver.allow_keepalive } ]
end

#generate_pong(auth_result, reason_or_salt) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/fluent/plugin/input_session.rb', line 104

def generate_pong(auth_result, reason_or_salt)
  log.debug "generating pong"
  # ['PONG', bool(authentication result), 'reason if authentication failed',
  #  self_hostname, sha512\_hex(salt + self_hostname + nonce + sharedkey)]
  if not auth_result
    return ['PONG', false, reason_or_salt, '', '']
  end

  shared_key = if @node && @node[:shared_key]
                 @node[:shared_key]
               else
                 @receiver.shared_key
               end
  shared_key_hex = Digest::SHA512.new.update(reason_or_salt).update(@receiver.self_hostname).update(@shared_key_nonce).update(shared_key).hexdigest
  [ 'PONG', true, '', @receiver.self_hostname, shared_key_hex ]
end

#generate_saltObject



37
38
39
# File 'lib/fluent/plugin/input_session.rb', line 37

def generate_salt
  OpenSSL::Random.random_bytes(16)
end

#logObject



25
26
27
# File 'lib/fluent/plugin/input_session.rb', line 25

def log
  @receiver.log
end

#on_read(data) ⇒ Object



121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/fluent/plugin/input_session.rb', line 121

def on_read(data)
  log.debug "on_read"
  if self.established?
    @receiver.on_message(data)
  end

  case @state
  when :pingpong
    success, reason_or_salt = self.check_ping(data)
    if not success
      send_data generate_pong(false, reason_or_salt)
      self.shutdown
      return
    end
    send_data generate_pong(true, reason_or_salt)

    log.debug "connection established"
    @state = :established
  end
end

#send_data(data) ⇒ Object



142
143
144
145
# File 'lib/fluent/plugin/input_session.rb', line 142

def send_data(data)
  # not nonblock because write data (response) needs sequence
  @socket.write data.to_msgpack
end

#shutdownObject



205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/fluent/plugin/input_session.rb', line 205

def shutdown
  @state = :closed
  log.debug "Shutdown called"
  @socket.close
  if @thread == Thread.current
    @thread.kill
  else
    if @thread
      @thread.kill
      @thread.join
    end
  end
rescue => e
  log.debug "#{e.class}:#{e.message}"
end

#startObject



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/fluent/plugin/input_session.rb', line 147

def start
  log.debug "starting server"

  log.trace "accepting ssl session"
  begin
    @socket.accept
  rescue OpenSSL::SSL::SSLError => e
    log.debug "failed to establish ssl session", error_class: e.class, error: e
    self.shutdown
    return
  end

  _proto, port, host, ipaddr = @socket.io.peeraddr
  @node = check_node(ipaddr)
  if @node.nil? && (! @receiver.allow_anonymous_source)
    log.warn "Connection required from unknown host '#{host}' (#{ipaddr}), disconnecting..."
    self.shutdown
    return
  end

  @shared_key_nonce = generate_salt
  @auth_key_salt = generate_salt

  buf = ''
  read_length = @receiver.read_length
  read_interval = @receiver.read_interval
  socket_interval = @receiver.socket_interval

  send_data generate_helo()
  @state = :pingpong

  loop do
    begin
      while @socket.read_nonblock(read_length, buf)
        if buf == ''
          sleep read_interval
          next
        end
        @unpacker.feed_each(buf, &method(:on_read))
        buf = ''
      end
    rescue OpenSSL::SSL::SSLError => e
      # to wait i/o restart
      sleep socket_interval
    rescue EOFError => e
      log.debug "Connection closed from '#{host}'(#{ipaddr})"
      break
    end
  end
rescue Errno::ECONNRESET => e
  # disconnected from client
rescue => e
  log.warn "unexpected error in in_secure_forward from #{host}:#{port}", error_class: e.class, error: e
ensure
  log.debug "Shutting down #{host}:#{port}"
  self.shutdown
end