Class: Bosh::Agent::Handler

Inherits:
Object show all
Includes:
Exec
Defined in:
lib/bosh_agent/handler.rb

Direct Known Subclasses

HTTPHandler

Constant Summary collapse

MAX_NATS_RETRIES =
10
NATS_RECONNECT_SLEEP =
0.5
KILL_AGENT_THREAD_TIMEOUT_ON_ERRORS =

Seconds until we kill the agent so it can be restarted, when there’s an unexpected error:

15
KILL_AGENT_THREAD_TIMEOUT_ON_RESTART =

Seconds until we kill the agent so it can be restarted, when we force a restart:

1
NATS_MAX_PAYLOAD_SIZE =
1024 * 1024

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeHandler

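Returns a new instance of Handler, configured from Config. (The note “When we force a restart” belongs to KILL_AGENT_THREAD_TIMEOUT_ON_RESTART, not to this constructor.)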



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/bosh_agent/handler.rb', line 21

# Builds a handler from the agent-wide Config.
#
# Collects the agent identity and message-bus URI, the alert/SMTP settings,
# the credentials used for optional message encryption, and the table of
# message processors.
def initialize
  @agent_id, @logger, @nats_uri, @base_dir =
    Config.agent_id, Config.logger, Config.mbus, Config.base_dir

  # Settings consumed by the alert processor that #start may launch.
  @process_alerts, @smtp_user, @smtp_password, @smtp_port =
    Config.process_alerts, Config.smtp_user, Config.smtp_password, Config.smtp_port

  @hbp = Bosh::Agent::HeartbeatProcessor.new

  # Guards the "is a long-running task already active?" check in #process_in_thread.
  @lock = Mutex.new

  # Long-running task bookkeeping: finished [time, task_id, result] triples
  # plus the task currently executing.
  @results = []
  self.current_long_running_task = {}
  @restarting_agent = false

  @nats_fail_count = 0

  # Encryption state: one handler per session id, and which handler to use
  # when replying on a given reply_to subject.
  @credentials = Config.credentials
  @sessions = {}
  @session_reply_map = {}

  find_message_processors
end

Instance Attribute Details

#current_long_running_taskObject

Returns the value of attribute current_long_running_task.



6
7
8
# File 'lib/bosh_agent/handler.rb', line 6

# The task being executed by #process_long_running: a Hash with :task_id and
# :processor keys, or an empty Hash when no long-running task is active.
def current_long_running_task; @current_long_running_task; end

#natsObject

Returns the value of attribute nats.



7
8
9
# File 'lib/bosh_agent/handler.rb', line 7

# The NATS client connection created in #start (nil before #start runs).
def nats; @nats; end

#processorsObject (readonly)

Returns the value of attribute processors.



8
9
10
# File 'lib/bosh_agent/handler.rb', line 8

# Map of message method name => processor class, built by #find_message_processors.
def processors; @processors; end

Class Method Details

.startObject



10
11
12
# File 'lib/bosh_agent/handler.rb', line 10

# Convenience entry point: builds a handler and runs its #start loop.
def self.start
  handler = new
  handler.start
end

Instance Method Details

#current_long_running_task?(agent_task_id) ⇒ Boolean

Returns:

  • (Boolean)


403
404
405
# File 'lib/bosh_agent/handler.rb', line 403

# True when +agent_task_id+ equals the id of the active long-running task.
# With no active task the stored id is nil, so only a nil argument matches.
#
# @param agent_task_id [String, nil]
# @return [Boolean]
def current_long_running_task?(agent_task_id)
  active_id = current_long_running_task[:task_id]
  active_id == agent_task_id
end

#decrypt(msg) ⇒ Object



357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# File 'lib/bosh_agent/handler.rb', line 357

# Decrypts an incoming encrypted envelope.
#
# The envelope must carry 'session_id' and 'encrypted_data'. The handler chosen
# for the session is remembered under the envelope's 'reply_to', so that the
# reply can later be encrypted with it.
#
# @param msg [Hash] parsed envelope
# @return [Hash, nil] the decrypted message with 'reply_to' copied in, or nil
#   when a key is missing or decryption fails (failures are only logged).
def decrypt(msg)
  absent = %w(session_id encrypted_data).find { |key| !msg.key?(key) }
  if absent
    @logger.info("Missing #{absent} in #{msg}")
    return
  end

  session = msg['session_id']
  reply_subject = msg['reply_to']

  handler = lookup_encryption_handler(session_id: session)
  @session_reply_map[reply_subject] = handler

  # Crypt errors are logged but never answered on the wire.
  begin
    plain = handler.decrypt(msg['encrypted_data'])
  rescue Bosh::Core::EncryptionHandler::CryptError => e
    log_encryption_error(e)
    return
  end

  plain['reply_to'] = reply_subject
  @logger.info("Decrypted Message: #{plain}")
  plain
end

#encrypt(reply_to, payload) ⇒ Object



391
392
393
394
395
396
397
398
399
400
401
# File 'lib/bosh_agent/handler.rb', line 391

# Wraps +payload+ in an encrypted envelope for the reply subject +reply_to+.
# It uses the handler that #decrypt recorded for that subject.
#
# @return [Hash] { 'session_id' => ..., 'encrypted_data' => ... }
def encrypt(reply_to, payload)
  handler = lookup_encryption_handler(reply_to: reply_to)
  { 'session_id' => handler.session_id, 'encrypted_data' => handler.encrypt(payload) }
end

#find_message_processorsObject



50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/bosh_agent/handler.rb', line 50

# Builds @processors from every constant under Bosh::Agent::Message that
# responds to .process. Each one is keyed by its under_scored name, e.g.
# FooBar -> "foo_bar".
def find_message_processors
  namespace = Bosh::Agent::Message
  candidates = namespace.constants
  @processors = {}
  candidates.each do |const_name|
    handler_class = namespace.const_get(const_name)
    next unless handler_class.respond_to?(:process)

    # CamelCase -> under_score -> downcased
    key = const_name.to_s.gsub(/(.)([A-Z])/, '\1_\2').downcase
    @processors[key] = handler_class
  end
  @logger.info("Message processors: #{@processors.inspect}")
end

#generate_agent_task_idObject



301
302
303
# File 'lib/bosh_agent/handler.rb', line 301

# A fresh random UUID string that identifies one long-running task.
def generate_agent_task_id; SecureRandom.uuid; end

#handle_cancel_task(reply_to, agent_task_id) ⇒ Object

rubocop:enable MethodLength



212
213
214
215
216
217
218
219
220
221
222
223
224
# File 'lib/bosh_agent/handler.rb', line 212

# Handles a 'cancel_task' request.
#
# Replies 'canceled' and clears the active task when +agent_task_id+ is the
# active task and its processor supports #cancel. Otherwise it replies with
# an exception hash.
def handle_cancel_task(reply_to, agent_task_id)
  unless current_long_running_task?(agent_task_id)
    return publish(reply_to, { 'exception' => 'unknown agent_task_id' })
  end

  running = current_long_running_task[:processor]
  unless running.respond_to?(:cancel)
    return publish(reply_to, { 'exception' => "could not cancel task #{agent_task_id}" })
  end

  running.cancel
  publish(reply_to, { 'value' => 'canceled' })
  self.current_long_running_task = {}
end

#handle_get_task(reply_to, agent_task_id) ⇒ Object



226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/bosh_agent/handler.rb', line 226

# Handles a 'get_task' request.
#
# A running task is reported as 'running'. A finished task is answered with
# its stored result from @results. Any other id gets an exception hash.
def handle_get_task(reply_to, agent_task_id)
  if current_long_running_task?(agent_task_id)
    return publish(reply_to, { 'value' => { 'state' => 'running', 'agent_task_id' => agent_task_id } })
  end

  finished = @results.find { |_, task_id, _| task_id == agent_task_id }
  reply = finished ? finished.last : { 'exception' => 'unknown agent_task_id' }
  publish(reply_to, reply)
end

#handle_message(json) ⇒ Object

rubocop:disable MethodLength



134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/bosh_agent/handler.rb', line 134

# Entry point for every raw message received on the agent's NATS subject.
#
# Parses the JSON and decrypts it when credentials are configured. It then
# dispatches to one of:
# * a registered processor, run on an EM.defer thread;
# * the built-in cancel_task / get_task / shutdown handlers;
# * an "unknown message" exception reply.
#
# Malformed input is logged and dropped rather than raised, so that it cannot
# escape the subscription callback.
#
# @param json [String] raw message payload
def handle_message(json)
  msg = Yajl::Parser.new.parse(json)

  # Only a JSON object is a valid request. Arrays, scalars, or nil (empty
  # input) would otherwise raise on msg['reply_to'] below.
  unless msg.is_a?(Hash)
    @logger.info("Ignoring non-object message: #{json}")
    return
  end

  unless msg['reply_to']
    @logger.info("Missing reply_to in: #{msg}")
    return
  end

  @logger.info("Message: #{msg.inspect}")

  if @credentials
    msg = decrypt(msg)
    return if msg.nil?
  end

  reply_to = msg['reply_to']
  method = msg['method']
  args = msg['arguments']

  # 'get_state' is an alias for the 'state' processor.
  method = 'state' if method == 'get_state'

  processor = lookup(method)
  if processor
    EM.defer do
      process_in_thread(processor, reply_to, method, args)
    end
  elsif method == 'cancel_task'
    # Array() tolerates missing/scalar arguments instead of raising NoMethodError.
    handle_cancel_task(reply_to, Array(args).first)
  elsif method == 'get_task'
    handle_get_task(reply_to, Array(args).first)
  elsif method == 'shutdown'
    handle_shutdown(reply_to)
  else
    re = RemoteException.new("unknown message #{msg.inspect}")
    publish(reply_to, re.to_hash)
  end
rescue Yajl::ParseError => e
  @logger.info("Failed to parse message: #{json}: #{e.inspect}: #{e.backtrace}")
end

#handle_shutdown(reply_to) ⇒ Object



331
332
333
334
335
336
337
338
339
340
341
342
343
# File 'lib/bosh_agent/handler.rb', line 331

# Handles a 'shutdown' request.
#
# When Config.configure is set, it registers an at_exit hook that runs
# `sv stop agent`, so the agent is not brought back up. It then replies
# { value: 'shutdown' } and calls #shutdown from the publish callback.
def handle_shutdown(reply_to)
  transport = URI.parse(Config.mbus).scheme.upcase
  @logger.info("Shutting down #{transport} connection")

  # We should never come back up again
  at_exit { `sv stop agent` } if Bosh::Agent::Config.configure

  publish(reply_to, { value: 'shutdown' }) { shutdown }
end

#kill_main_thread_in(seconds) ⇒ Object



279
280
281
282
283
284
285
# File 'lib/bosh_agent/handler.rb', line 279

# Marks the agent as restarting and terminates the main thread after
# +seconds+. Returns the background Thread that will do the terminating.
def kill_main_thread_in(seconds)
  @restarting_agent = true
  Thread.new do
    sleep(seconds)
    Thread.main.terminate
  end
end

#log_encryption_error(e) ⇒ Object



387
388
389
# File 'lib/bosh_agent/handler.rb', line 387

# Logs an error raised by the encryption handler, together with its backtrace.
#
# @param e [Exception] the error raised while decrypting
def log_encryption_error(e)
  # Join with a double-quoted "\n" so frames land on separate lines. A
  # single-quoted '\n' would insert a literal backslash-n. Array() tolerates
  # an exception whose backtrace is nil.
  @logger.info("Encryption Error: #{e.inspect} #{Array(e.backtrace).join("\n")}")
end

#lookup(method) ⇒ Object



64
65
66
# File 'lib/bosh_agent/handler.rb', line 64

# Returns the processor class registered for +method+, or nil when none is.
def lookup(method)
  @processors.fetch(method, nil)
end

#lookup_encryption_handler(arg) ⇒ Object



345
346
347
348
349
350
351
352
353
354
355
# File 'lib/bosh_agent/handler.rb', line 345

# Finds the encryption handler for a message.
#
# With :session_id, it returns the handler for that session, creating and
# caching one on first use. Otherwise, with :reply_to, it returns the handler
# that #decrypt recorded for that reply subject. Otherwise it returns nil.
def lookup_encryption_handler(arg)
  session = arg[:session_id]
  if session
    @sessions[session] ||= Bosh::Core::EncryptionHandler.new(@agent_id, @credentials)
  elsif (reply_subject = arg[:reply_to])
    @session_reply_map[reply_subject]
  end
end

#on_connectObject



113
114
115
116
117
# File 'lib/bosh_agent/handler.rb', line 113

# NATS connect callback. It subscribes to "agent.<agent_id>", routes every
# message to #handle_message, and resets the connection-failure counter.
def on_connect
  topic = "agent.#{@agent_id}"
  @nats.subscribe(topic) do |raw_msg|
    handle_message(raw_msg)
  end
  @nats_fail_count = 0
end

#post_prepare_network_changeObject

When there’s a network change on an existing VM, the director sends a prepare_network_change message to the VM’s agent. After the agent replies to the director with a `true` message, the post_prepare_network_change method is called (via an EM callback).

When Config.configure is set, the post_prepare_network_change method deletes the udev persistent network rules and the agent settings. It then restarts the agent so that it picks up the new agent settings (set by director-cpi). For a simple network change (e.g. DNS changes) this is enough, because the restarted agent applies the new network settings. For other network changes (e.g. an IP change), the CPI is responsible for rebooting or recreating the VM if needed.



315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# File 'lib/bosh_agent/handler.rb', line 315

# Runs after the agent has replied to prepare_network_change.
#
# When Config.configure is set, it deletes the udev persistent-net rules (if
# present) and the agent settings file. In every case it then schedules an
# agent restart so the new settings are picked up.
def post_prepare_network_change
  if Bosh::Agent::Config.configure
    udev_file = '/etc/udev/rules.d/70-persistent-net.rules'
    if File.exist?(udev_file)
      @logger.info('deleting 70-persistent-net.rules - again')
      File.delete(udev_file)
    end
    @logger.info('Removing settings.json')
    settings_file = Bosh::Agent::Config.settings_file
    # An already-missing settings file is fine. Previously Errno::ENOENT
    # escaped here and the restart below never happened.
    begin
      File.delete(settings_file)
    rescue Errno::ENOENT
      @logger.info("#{settings_file} already removed")
    end
  end

  @logger.info('Restarting agent to prepare for a network change')
  kill_main_thread_in(KILL_AGENT_THREAD_TIMEOUT_ON_RESTART)
end

#process(processor, args) ⇒ Object



287
288
289
290
291
292
293
294
295
296
297
298
299
# File 'lib/bosh_agent/handler.rb', line 287

# Runs +processor+ with +args+ and wraps the outcome as a reply payload.
#
# @return [Hash] one of:
#   * { value: result } on success;
#   * RemoteException.from(e).to_hash for a Bosh::Agent::Error;
#   * { exception: "..." } for any other exception. In that last case an agent
#     restart is also scheduled after KILL_AGENT_THREAD_TIMEOUT_ON_ERRORS.
def process(processor, args)
  { value: processor.process(args) }
rescue Bosh::Agent::Error => e
  @logger.info("#{e.inspect}: #{e.backtrace}")
  RemoteException.from(e).to_hash
# rubocop:disable RescueException
rescue Exception => e
# rubocop:enable RescueException
  kill_main_thread_in(KILL_AGENT_THREAD_TIMEOUT_ON_ERRORS)
  @logger.error("#{e.inspect}: #{e.backtrace}")
  { exception: "#{e.inspect}: #{e.backtrace}" }
end

#process_in_thread(processor, reply_to, method, args) ⇒ Object

rubocop:disable MethodLength



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/bosh_agent/handler.rb', line 178

# Runs a processor off the reactor thread (called from EM.defer) and publishes
# its reply.
#
# * Short-running processors are processed and their payload is published.
#   For prepare_network_change with Config.configure set, the reply's publish
#   callback triggers #post_prepare_network_change.
# * Long-running processors (those responding to long_running?) are refused
#   while the agent is restarting or another long-running task is active.
#   Otherwise they are handed to #process_long_running.
#
# Errors are logged, because an exception in this thread would otherwise be lost.
#
# NOTE(review): @lock is held for the entire #process_long_running call, which
# executes the task synchronously. A second long-running request therefore
# waits on the lock, and by the time it gets the lock the task slot is empty,
# so the 'already running long running task' branch is effectively
# unreachable. Fixing this means claiming the slot under the lock and running
# the task outside it, which would touch #process_long_running too.
def process_in_thread(processor, reply_to, method, args)
  unless processor.respond_to?(:long_running?)
    payload = process(processor, args)
    network_change = Config.configure && method == 'prepare_network_change'
    return publish(reply_to, payload) unless network_change

    return publish(reply_to, payload) { post_prepare_network_change }
  end

  if @restarting_agent
    return publish(reply_to, RemoteException.new('restarting agent').to_hash)
  end

  @lock.synchronize do
    if current_long_running_task[:task_id]
      publish(reply_to, RemoteException.new('already running long running task').to_hash)
    else
      process_long_running(reply_to, processor, args)
    end
  end
rescue => e
  # since this is running in a thread we're going to be nice and
  # log an error as this would otherwise be lost
  @logger.error("#{processor.to_s}: #{e.message}\n#{e.backtrace.join("\n")}")
end

#process_long_running(reply_to, processor, args) ⇒ Object



266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/bosh_agent/handler.rb', line 266

# Executes a long-running processor synchronously.
#
# First it records the new task as active and immediately replies 'running'
# with its agent_task_id. When the task finishes, it appends
# [time, task_id, result] to @results and clears the active-task slot.
def process_long_running(reply_to, processor, args)
  task_id = generate_agent_task_id
  self.current_long_running_task = { task_id: task_id, processor: processor }

  publish(reply_to, { value: { state: 'running', agent_task_id: task_id } })

  outcome = process(processor, args)
  @results.push([Time.now.to_i, task_id, outcome])
  self.current_long_running_task = {}
end

#publish(reply_to, payload, &blk) ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# File 'lib/bosh_agent/handler.rb', line 242

# Serializes +payload+ and publishes it to +reply_to+ on the next reactor tick.
#
# When @credentials is set, the payload is encrypted first. If the serialized
# reply would reach NATS_MAX_PAYLOAD_SIZE, a RemoteException reply is sent
# instead. That substitute is now encrypted as well, so a peer that expects
# encrypted replies can still read it.
#
# @param reply_to [String] reply subject
# @param payload [Hash] unencrypted reply body
# @param blk [Proc] optional callback passed through to the NATS publish
def publish(reply_to, payload, &blk)
  @logger.info("reply_to: #{reply_to}: payload: #{payload.inspect}")

  # Encrypts (when configured) and JSON-encodes one reply body.
  encode = lambda do |body|
    body = encrypt(reply_to, body) if @credentials
    Yajl::Encoder.encode(body)
  end

  json = encode.call(payload)

  unless json.bytesize < NATS_MAX_PAYLOAD_SIZE
    msg = 'message > NATS_MAX_PAYLOAD, stored in blobstore'
    exception = RemoteException.new(msg, nil, payload)
    @logger.fatal(msg)
    json = encode.call(exception.to_hash)
  end

  EM.next_tick do
    @nats.publish(reply_to, json, &blk)
  end
end

#setup_heartbeatsObject



119
120
121
122
123
124
125
126
127
# File 'lib/bosh_agent/handler.rb', line 119

# Enables heartbeats every Config.heartbeat_interval seconds. A missing or
# non-positive interval (after to_i) leaves heartbeats disabled.
def setup_heartbeats
  seconds = Config.heartbeat_interval.to_i
  return @logger.warn('Heartbeats are disabled') if seconds <= 0

  @hbp.enable(seconds)
  @logger.info("Heartbeats are enabled and will be sent every #{seconds} seconds")
end

#setup_syslog_monitorObject



129
130
131
# File 'lib/bosh_agent/handler.rb', line 129

# Starts the syslog monitor, handing it the NATS connection and this agent's id.
def setup_syslog_monitor
  monitor = Bosh::Agent::SyslogMonitor
  monitor.start(@nats, @agent_id)
end

#shutdownObject

rubocop:enable MethodLength



105
106
107
108
109
110
111
# File 'lib/bosh_agent/handler.rb', line 105

# Logs 'Exit' and stops NATS. The block passed to NATS.stop stops the
# EventMachine reactor and exits the process.
def shutdown
  @logger.info('Exit')
  NATS.stop { EM.stop; exit }
end

#startObject

rubocop:disable MethodLength



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/bosh_agent/handler.rb', line 69

# Main loop. It traps TERM/INT/QUIT to #shutdown and connects to NATS inside
# the EventMachine reactor (#on_connect subscribes on connect). It then
# enables heartbeats and, when alert processing is configured, starts the
# alert processor and syslog monitor.
#
# On NATS::ConnectError it retries up to MAX_NATS_RETRIES times, sleeping
# NATS_RECONNECT_SLEEP between attempts, then logs a fatal error and returns.
def start
  %w(TERM INT QUIT).each { |s| trap(s) { shutdown } }

  EM.run do
    begin
      @nats = NATS.connect(uri: @nats_uri, autostart: false) { on_connect }
      Config.nats = @nats
    rescue Errno::ENETUNREACH, Timeout::Error => e
      # NOTE(review): this retry is unbounded and sleeps on the reactor thread.
      @logger.info("Unable to talk to nats - retry (#{e.inspect})")
      sleep 0.1
      retry
    end

    setup_heartbeats

    if @process_alerts
      if @smtp_port.nil? || @smtp_user.nil? || @smtp_password.nil?
        @logger.error 'Cannot start alert processor without having SMTP port, user and password configured'
        @logger.error 'Agent will be running but alerts will NOT be properly processed'
      else
        # Never write the SMTP password to the log; it is a credential.
        @logger.debug("SMTP: port #{@smtp_port}, user #{@smtp_user}")
        @processor = Bosh::Agent::AlertProcessor.start('127.0.0.1', @smtp_port, @smtp_user, @smtp_password)
        setup_syslog_monitor
      end
    end
  end
rescue NATS::ConnectError => e
  @nats_fail_count += 1
  @logger.error("NATS connection error: #{e.message}")
  sleep NATS_RECONNECT_SLEEP
  # only retry a few times and then exit which lets the agent recover if we change credentials
  retry if @nats_fail_count < MAX_NATS_RETRIES
  @logger.fatal("Unable to reconnect to NATS after #{MAX_NATS_RETRIES} retries, exiting...")
end