Class: Fluent::SecureForwardInput::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/input_session.rb

Overview

require 'resolv'

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(receiver, socket) ⇒ Session

Returns a new instance of Session.



11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/fluent/plugin/input_session.rb', line 11

# Builds a session around a client socket and immediately hands the
# connection to a dedicated thread running #start.
#
# @param receiver [Object] owner of this session; kept in @receiver
# @param socket [Object] client connection; its +sync+ flag is set to true.
#   Presumably an OpenSSL::SSL::SSLSocket, since #start rescues
#   OpenSSL::SSL::SSLError around +accept+ -- confirm against the caller.
def initialize(receiver, socket)
  @receiver, @socket = receiver, socket
  @socket.sync = true

  # The handshake begins in the :helo state; no peer has been identified yet.
  @state = :helo
  @ipaddress = @node = nil

  @unpacker = MessagePack::Unpacker.new

  # Spawned last so that every instance variable above is already assigned
  # by the time #start begins executing.
  @thread = Thread.new(&method(:start))
end

Instance Attribute Details

#auth_salt ⇒ Object

Returns the value of attribute auth_salt.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

# Reader for the +auth_salt+ attribute.
#
# NOTE(review): the session code visible in this file stores its generated
# salt in @auth_key_salt and never assigns @auth_salt, so this reader
# presumably returns nil unless a caller sets the attribute -- confirm.
#
# @return [Object, nil] the current value of @auth_salt
def auth_salt
  @auth_salt
end

#node ⇒ Object

Returns the value of attribute node.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

# Reader for the +node+ attribute.
#
# @node is nil after construction; #start replaces it with the result of
# #check_node for the peer address.
#
# @return [Object, nil] the current value of @node
def node
  @node
end

#receiver ⇒ Object

Returns the value of attribute receiver.



8
9
10
# File 'lib/fluent/plugin/input_session.rb', line 8

# Reader for the +receiver+ attribute.
#
# @return [Object] the receiver object passed to the constructor
def receiver
  @receiver
end

#socket ⇒ Object

Returns the value of attribute socket.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

# Reader for the +socket+ attribute.
#
# @return [Object] the socket passed to the constructor
def socket
  @socket
end

#state ⇒ Object

Returns the value of attribute state.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

# Reader for the +state+ attribute.
#
# The code in this file assigns :helo (constructor), :pingpong (#start),
# :established (#on_read) and :closed (#shutdown).
#
# @return [Symbol] the current value of @state
def state
  @state
end

#thread ⇒ Object

Returns the value of attribute thread.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

# Reader for the +thread+ attribute.
#
# @return [Thread, nil] the thread created by the constructor to run #start
def thread
  @thread
end

#unpacker ⇒ Object

Returns the value of attribute unpacker.



9
10
11
# File 'lib/fluent/plugin/input_session.rb', line 9

# Reader for the +unpacker+ attribute.
#
# @return [Object] the MessagePack::Unpacker created by the constructor
def unpacker
  @unpacker
end

Instance Method Details

#check_node(ipaddress) ⇒ Object



37
38
39
40
41
42
43
44
45
46
# File 'lib/fluent/plugin/input_session.rb', line 37

# Finds the configured node entry that matches a peer address.
#
# @param ipaddress [String] peer address to look for
# @return [Object, nil] the first entry of @receiver.nodes whose :address
#   value answers true to include?(ipaddress), or nil when none does
def check_node(ipaddress)
  @receiver.nodes.find { |candidate| candidate[:address].include?(ipaddress) }
end

#check_ping(message) ⇒ Object



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/fluent/plugin/input_session.rb', line 64

# Validates the client's PING message during the handshake.
#
# Expected wire format:
#   ['PING', self_hostname, shared_key_salt,
#    sha512_hex(shared_key_salt + self_hostname + shared_key),
#    username || '', sha512_hex(auth_salt + username + password) || '']
#
# @param message [Object] decoded object received from the client
# @return [Array] [true, shared_key_salt] on success; otherwise
#   [false, reason] when the message is malformed, the shared key digest
#   differs, or (with receiver authentication enabled) no candidate user's
#   password digest matches
def check_ping(message)
  log.debug "checking ping"
  # The payload comes straight off the wire. Reject anything that is not a
  # 6-element array up front so nil, scalars or hashes produce a clean
  # failure instead of raising.
  unless message.is_a?(Array) && message.size == 6 && message[0] == 'PING'
    return false, 'invalid ping message'
  end
  _ping, hostname, shared_key_salt, shared_key_hexdigest, username, password_digest = message

  # Both values are fed to the digest below, which only accepts strings.
  unless hostname.is_a?(String) && shared_key_salt.is_a?(String)
    return false, 'invalid ping message'
  end

  # A node-specific shared key takes precedence over the receiver-wide one.
  shared_key = (@node && @node[:shared_key]) || @receiver.shared_key
  serverside = Digest::SHA512.new.update(shared_key_salt).update(hostname).update(shared_key).hexdigest
  # NOTE(review): this is an ordinary String comparison, not a constant-time
  # one -- confirm whether timing side channels matter for this deployment.
  if shared_key_hexdigest != serverside
    log.warn "Shared key mismatch from '#{hostname}'"
    return false, 'shared_key mismatch'
  end

  if @receiver.authentication
    users = @receiver.select_authenticate_users(@node, username)
    # Succeeds as soon as one candidate user's salted password digest matches.
    authenticated = users.any? do |user|
      expected = Digest::SHA512.new.update(@auth_key_salt).update(username).update(user[:password]).hexdigest
      expected == password_digest
    end
    unless authenticated
      log.warn "Authentication failed from client '#{hostname}', username '#{username}'"
      return false, 'username/password mismatch'
    end
  end

  return true, shared_key_salt
end

#established? ⇒ Boolean

Returns:

  • (Boolean)


29
30
31
# File 'lib/fluent/plugin/input_session.rb', line 29

# Tells whether the handshake has completed.
#
# @return [Boolean] true when @state is :established
def established?
  @state == :established
end

#generate_helo ⇒ Object

Not implemented yet (left commented out in the source):

    def check_hostname_reverse_lookup(ipaddress)
      rev_name = Resolv.getname(ipaddress)
      proto, port, host, ipaddr, family_num, socktype_num, proto_num = Socket.getaddrinfo(rev_name, DUMMY_PORT)
      unless ipaddr == ipaddress
        return false
      end
      true
    end



58
59
60
61
62
# File 'lib/fluent/plugin/input_session.rb', line 58

# Builds the HELO message that opens the handshake.
#
# Wire format: ['HELO', options(hash)]
#
# @return [Array] 'HELO' followed by an options hash with the keys 'auth'
#   (@auth_key_salt when the receiver enables authentication, otherwise '')
#   and 'keepalive' (the receiver's allow_keepalive value)
def generate_helo
  log.debug "generating helo"
  advertised_salt = @receiver.authentication ? @auth_key_salt : ''
  options = {
    'auth' => advertised_salt,
    'keepalive' => @receiver.allow_keepalive
  }
  ['HELO', options]
end

#generate_pong(auth_result, reason_or_salt) ⇒ Object



100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/fluent/plugin/input_session.rb', line 100

# Builds the PONG reply to a client's PING.
#
# Wire format:
#   ['PONG', bool(authentication result), 'reason if authentication failed',
#    self_hostname, sha512_hex(salt + self_hostname + sharedkey)]
#
# @param auth_result [Boolean] outcome of the ping check
# @param reason_or_salt [String] failure reason when +auth_result+ is falsy,
#   otherwise the salt to mix into the digest
# @return [Array] the PONG message
def generate_pong(auth_result, reason_or_salt)
  log.debug "generating pong"
  return ['PONG', false, reason_or_salt, '', ''] unless auth_result

  # A node-specific shared key takes precedence over the receiver-wide one.
  node_key = @node && @node[:shared_key]
  shared_key = node_key || @receiver.shared_key

  hostname = @receiver.self_hostname
  digest = Digest::SHA512.new
  digest.update(reason_or_salt)
  digest.update(hostname)
  digest.update(shared_key)

  ['PONG', true, '', hostname, digest.hexdigest]
end

#generate_salt ⇒ Object



33
34
35
# File 'lib/fluent/plugin/input_session.rb', line 33

# Produces a fresh random salt.
#
# @return [String] 16 bytes obtained from OpenSSL::Random.random_bytes
def generate_salt
  OpenSSL::Random.random_bytes(16)
end

#log ⇒ Object



25
26
27
# File 'lib/fluent/plugin/input_session.rb', line 25

# Logger used by this session; delegates to the receiver.
#
# @return [Object] whatever @receiver.log returns
def log
  @receiver.log
end

#on_read(data) ⇒ Object



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/fluent/plugin/input_session.rb', line 117

# Handles one decoded object received from the client.
#
# While the session is established the object is forwarded to the receiver.
# While the session is in the :pingpong state the object is treated as the
# client's PING: a PONG is sent back, and the session either becomes
# established or is shut down. In any other state nothing further happens.
#
# @param data [Object] decoded message
# @return [Symbol, nil] :established right after a successful handshake,
#   otherwise nil
def on_read(data)
  log.debug "on_read"
  @receiver.on_message(data) if established?

  return unless @state == :pingpong

  ok, detail = check_ping(data)
  unless ok
    # Tell the client why it was rejected, then drop the connection.
    send_data generate_pong(false, detail)
    shutdown
    return
  end

  send_data generate_pong(true, detail)
  log.debug "connection established"
  @state = :established
end

#send_data(data) ⇒ Object



138
139
140
141
# File 'lib/fluent/plugin/input_session.rb', line 138

# Serializes +data+ with +to_msgpack+ and writes it to the session socket.
#
# @param data [Object] any object responding to +to_msgpack+
# @return [Object] the result of @socket.write
def send_data(data)
  # Deliberately a plain (not non-blocking) write: responses must go out in sequence.
  @socket.write data.to_msgpack
end

#shutdown ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
# File 'lib/fluent/plugin/input_session.rb', line 199

# Marks the session closed, then closes the socket and stops the session
# thread. Any StandardError raised along the way is logged at debug level
# and not re-raised.
def shutdown
  @state = :closed
  if @thread == Thread.current
    # Called from the session thread itself: close the socket first, because
    # killing the current thread is the last thing this branch does.
    @socket.close
    @thread.kill
  else
    # Called from a different thread, or while @thread is still nil: stop
    # and wait for the session thread (if any) before closing the socket.
    if @thread
      @thread.kill
      @thread.join
    end
    @socket.close
  end
rescue => e
  log.debug "#{e.class}:#{e.message}"
end

#start ⇒ Object



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/fluent/plugin/input_session.rb', line 143

# Body of the session thread: accepts the SSL session, checks the peer
# against the receiver's nodes, sends HELO, then reads from the socket and
# dispatches every decoded object to #on_read until the connection ends.
#
# The ensure clause calls #shutdown on every exit path, including the early
# returns below, so #shutdown may be invoked more than once per session.
def start
  log.debug "starting server"

  log.trace "accepting ssl session"
  begin
    @socket.accept
  rescue OpenSSL::SSL::SSLError => e
    log.debug "failed to establish ssl session"
    self.shutdown
    return
  end

  # Identify the peer; only host and ipaddr are used below.
  proto, port, host, ipaddr = @socket.io.peeraddr
  @node = check_node(ipaddr)
  # Peers that match no configured node are rejected unless the receiver
  # allows anonymous sources.
  if @node.nil? && (! @receiver.allow_anonymous_source)
    log.warn "Connection required from unknown host '#{host}' (#{ipaddr}), disconnecting..."
    self.shutdown
    return
  end

  # Salt read by #generate_helo and #check_ping.
  @auth_key_salt = generate_salt

  buf = ''
  read_length = @receiver.read_length
  read_interval = @receiver.read_interval
  socket_interval = @receiver.socket_interval

  # Open the handshake; the next object from the client is handled as a PING.
  send_data generate_helo()
  @state = :pingpong

  loop do
    begin
      while @socket.read_nonblock(read_length, buf)
        # Nothing was read: pause for read_interval, then try again.
        if buf == ''
          sleep read_interval
          next
        end
        # Feed the bytes to the unpacker; each complete object goes to #on_read.
        @unpacker.feed_each(buf, &method(:on_read))
        buf = ''
      end
    rescue OpenSSL::SSL::SSLError => e
      # Wait for socket_interval before retrying the read.
      # NOTE(review): every SSLError raised while reading is treated as
      # retryable here -- confirm a permanent SSL failure cannot spin forever.
      sleep socket_interval
    rescue EOFError => e
      log.debug "Connection closed from '#{host}'(#{ipaddr})"
      break
    end
  end
rescue Errno::ECONNRESET => e
  # The client disconnected; nothing to report.
rescue => e
  log.warn "unexpected error in in_secure_forward", :error_class => e.class, :error => e
ensure
  self.shutdown
end