Class: Bosh::Agent::Handler

Inherits:
Object show all
Includes:
Exec
Defined in:
lib/bosh_agent/handler.rb

Direct Known Subclasses

HTTPHandler

Constant Summary collapse

MAX_NATS_RETRIES =
10
NATS_RECONNECT_SLEEP =
0.5
KILL_AGENT_THREAD_TIMEOUT_ON_ERRORS =

Seconds until we kill the agent so it can be restarted, when there’s an unexpected error:

15
KILL_AGENT_THREAD_TIMEOUT_ON_RESTART =

Seconds until we kill the agent so it can be restarted, when we force a restart:

1
NATS_MAX_PAYLOAD_SIZE =
1024 * 1024

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

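#initialize ⇒ Handler

Creates a new handler from the agent Config. (The note “When we force a restart”, misplaced here by the documentation generator, describes KILL_AGENT_THREAD_TIMEOUT_ON_RESTART.)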



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/bosh_agent/handler.rb', line 22

# Builds a handler from the agent-wide Config. It records the NATS and
# identity settings, the alert (SMTP) settings, the heartbeat processor,
# the long-running-task bookkeeping, and the per-session encryption state.
# It finishes by indexing the available message processors.
def initialize
  # Identity and message-bus settings.
  @agent_id, @logger, @nats_uri, @base_dir =
    Config.agent_id, Config.logger, Config.mbus, Config.base_dir

  # Alert processing; the SMTP values are only used when #start launches
  # the alert processor.
  @process_alerts, @smtp_user, @smtp_password, @smtp_port =
    Config.process_alerts, Config.smtp_user, Config.smtp_password, Config.smtp_port

  @hbp = Bosh::Agent::HeartbeatProcessor.new

  # Guards the "only one long-running task at a time" check.
  @lock = Mutex.new

  # Finished task results, and the id of the task in flight (empty when idle).
  @results, @long_running_agent_task = [], []
  @restarting_agent = false

  # Consecutive NATS connect failures seen by #start.
  @nats_fail_count = 0

  # Encryption is enabled only when credentials are present. Handlers are
  # cached per session id and remembered per reply_to subject.
  @credentials = Config.credentials
  @sessions, @session_reply_map = {}, {}

  find_message_processors
end

Instance Attribute Details

#nats ⇒ Object

Returns the value of attribute nats.



8
9
10
# File 'lib/bosh_agent/handler.rb', line 8

# Reader for the NATS client; nil until #start assigns it.
def nats; @nats; end

#processors ⇒ Object (readonly)

Returns the value of attribute processors.



9
10
11
# File 'lib/bosh_agent/handler.rb', line 9

# Reader for the method-name => processor-class map built by
# #find_message_processors.
def processors; @processors; end

Class Method Details

.start ⇒ Object



11
12
13
# File 'lib/bosh_agent/handler.rb', line 11

# Convenience entry point: builds a handler and runs its #start,
# returning whatever #start returns.
def self.start
  handler = new
  handler.start
end

Instance Method Details

#decrypt(msg) ⇒ Object



332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# File 'lib/bosh_agent/handler.rb', line 332

# Unwraps an encrypted NATS envelope.
#
# Returns nil after logging if the envelope lacks "session_id" or
# "encrypted_data", or if decryption raises CryptError. Nothing is sent
# back in either case. Otherwise it returns the decrypted message with the
# envelope's "reply_to" copied onto it. The session's handler is recorded
# under reply_to so #encrypt can reuse it for the reply.
def decrypt(msg)
  absent = %w[session_id encrypted_data].find { |field| !msg.key?(field) }
  if absent
    @logger.info("Missing #{absent} in #{msg}")
    return
  end

  reply_to = msg["reply_to"]
  handler = lookup_encryption_handler(:session_id => msg["session_id"])

  # Remember which handler owns this reply subject for the outgoing reply.
  @session_reply_map[reply_to] = handler

  begin
    plain = handler.decrypt(msg["encrypted_data"])
  rescue Bosh::Core::EncryptionHandler::CryptError => e
    # Log locally, but stay quiet on the wire.
    log_encryption_error(e)
    return
  end

  plain["reply_to"] = reply_to
  @logger.info("Decrypted Message: #{plain}")
  plain
end

#encrypt(reply_to, payload) ⇒ Object



366
367
368
369
370
371
372
373
374
375
376
# File 'lib/bosh_agent/handler.rb', line 366

# Wraps +payload+ in an encrypted envelope. It uses the encryption handler
# looked up for +reply_to+ and returns a Hash with "session_id" and
# "encrypted_data".
def encrypt(reply_to, payload)
  handler = lookup_encryption_handler(:reply_to => reply_to)
  {
    "session_id" => handler.session_id,
    "encrypted_data" => handler.encrypt(payload)
  }
end

#find_message_processors ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/bosh_agent/handler.rb', line 51

# Builds @processors by scanning Bosh::Agent::Message for constants that
# respond to +process+. Each one is keyed by its name converted from
# CamelCase to snake_case (e.g. FooBar -> "foo_bar").
def find_message_processors
  namespace = Bosh::Agent::Message
  names = namespace.constants
  @processors = {}
  names.each_with_object(@processors) do |name, registry|
    candidate = namespace.const_get(name)
    next unless candidate.respond_to?(:process)
    registry[name.to_s.gsub(/(.)([A-Z])/,'\1_\2').downcase] = candidate
  end
  @logger.info("Message processors: #{@processors.inspect}")
end

#generate_agent_task_id ⇒ Object



277
278
279
# File 'lib/bosh_agent/handler.rb', line 277

# Returns a new random UUID string. Long-running tasks are identified by
# it in "running" replies and "get_task" polls.
def generate_agent_task_id
  SecureRandom.uuid
end

#handle_get_task(reply_to, agent_task_id) ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/bosh_agent/handler.rb', line 202

# Answers a "get_task" poll for +agent_task_id+ on +reply_to+. It replies
# with a "running" state while that id is the task in flight, with the
# stored result once the task has finished, and with an
# "unknown agent_task_id" exception otherwise.
def handle_get_task(reply_to, agent_task_id)
  if @long_running_agent_task == [agent_task_id]
    status = {"state" => "running", "agent_task_id" => agent_task_id}
    return publish(reply_to, {"value" => status})
  end

  # Entries are [recorded_at, agent_task_id, result] triples.
  finished = @results.find { |_recorded_at, task_id, _outcome| task_id == agent_task_id }
  response = finished ? finished[2] : {"exception" => "unknown agent_task_id"}
  publish(reply_to, response)
end

#handle_message(json) ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/bosh_agent/handler.rb', line 129

# Entry point for every raw NATS message addressed to this agent.
#
# It parses +json+, decrypts it when credentials are configured, and then
# dispatches on "method":
# - registered processors run in a background thread via EM.defer;
# - "get_task" and "shutdown" are answered inline;
# - unknown methods get a RemoteException reply.
# Messages that cannot be parsed, are not a JSON object, or lack "reply_to"
# are logged and dropped, since there is nothing to reply to.
def handle_message(json)
  msg = Yajl::Parser.new.parse(json)

  # A JSON array, string, number or null would make the Hash lookups below
  # raise inside the NATS subscription callback; drop such messages instead.
  unless msg.is_a?(Hash)
    @logger.info("Ignoring non-object message: #{json}")
    return
  end

  unless msg["reply_to"]
    @logger.info("Missing reply_to in: #{msg}")
    return
  end

  @logger.info("Message: #{msg.inspect}")

  if @credentials
    msg = decrypt(msg)
    return if msg.nil?
  end

  reply_to = msg['reply_to']
  method = msg['method']
  args = msg['arguments']

  # "get_state" is served by the "state" processor.
  method = "state" if method == "get_state"

  processor = lookup(method)
  if processor
    EM.defer do
      process_in_thread(processor, reply_to, method, args)
    end
  elsif method == "get_task"
    # Array() tolerates missing/nil "arguments" instead of raising on nil.first;
    # a nil id then gets the "unknown agent_task_id" reply.
    handle_get_task(reply_to, Array(args).first)
  elsif method == "shutdown"
    handle_shutdown(reply_to)
  else
    re = RemoteException.new("unknown message #{msg.inspect}")
    publish(reply_to, re.to_hash)
  end
rescue Yajl::ParseError => e
  @logger.info("Failed to parse message: #{json}: #{e.inspect}: #{e.backtrace}")
end

#handle_shutdown(reply_to) ⇒ Object



306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/bosh_agent/handler.rb', line 306

# Handles a "shutdown" request.
#
# It logs which message-bus connection is closing. When
# Bosh::Agent::Config.configure is set, it also registers an at_exit hook
# that runs `sv stop agent`. It then publishes a {:value => "shutdown"}
# reply with a callback that calls #shutdown.
def handle_shutdown(reply_to)
  scheme = URI.parse(Config.mbus).scheme.upcase
  @logger.info("Shutting down #{scheme} connection")

  # We should never come back up again.
  at_exit { `sv stop agent` } if Bosh::Agent::Config.configure

  publish(reply_to, {:value => "shutdown"}) do
    shutdown
  end
end

#kill_main_thread_in(seconds) ⇒ Object



255
256
257
258
259
260
261
# File 'lib/bosh_agent/handler.rb', line 255

# Marks the agent as restarting, which makes #process_in_thread refuse new
# long-running tasks. It then spawns a thread that terminates the main
# thread after +seconds+ and returns that thread.
def kill_main_thread_in(seconds)
  @restarting_agent = true
  Thread.new(seconds) do |delay|
    sleep(delay)
    Thread.main.terminate
  end
end

#log_encryption_error(e) ⇒ Object



362
363
364
# File 'lib/bosh_agent/handler.rb', line 362

# Logs an encryption/decryption failure at info level, including its
# backtrace with one frame per line.
def log_encryption_error(e)
  # The separator must be double-quoted: '\n' would join the frames with a
  # literal backslash-n. A never-raised exception has a nil backtrace.
  backtrace = (e.backtrace || []).join("\n")
  @logger.info("Encryption Error: #{e.inspect} #{backtrace}")
end

#lookup(method) ⇒ Object



65
66
67
# File 'lib/bosh_agent/handler.rb', line 65

# Returns the processor class registered under +method+, or nil if none is.
def lookup(method)
  @processors.fetch(method, nil)
end

#lookup_encryption_handler(arg) ⇒ Object



320
321
322
323
324
325
326
327
328
329
330
# File 'lib/bosh_agent/handler.rb', line 320

# Finds the encryption handler for a message.
#
# With :session_id, it returns the handler cached for that session,
# creating one from the agent id and credentials on first use. Otherwise,
# with :reply_to, it returns the handler recorded for that reply subject,
# or nil if there is none. With neither key it returns nil.
def lookup_encryption_handler(arg)
  session_id = arg[:session_id]
  if session_id
    @sessions[session_id] ||= Bosh::Core::EncryptionHandler.new(@agent_id, @credentials)
    return @sessions[session_id]
  end

  reply_to = arg[:reply_to]
  @session_reply_map[reply_to] if reply_to
end

#on_connect ⇒ Object



109
110
111
112
113
# File 'lib/bosh_agent/handler.rb', line 109

# NATS connect callback. It subscribes to "agent.<agent_id>", routing each
# raw message to #handle_message, and resets the connect-failure counter
# that #start uses to bound its retries.
def on_connect
  @nats.subscribe("agent.#{@agent_id}") do |raw_msg|
    handle_message(raw_msg)
  end
  @nats_fail_count = 0
end

#post_prepare_network_change ⇒ Object

When there’s a network change on an existing VM, the director sends a prepare_network_change message to the VM agent. After the agent replies to the director with a `true` message, the post_prepare_network_change method is called (via an EM callback).

The post_prepare_network_change method deletes the udev persistent network rules and the agent settings, then restarts the agent so that it picks up the new agent settings (set by director-cpi). For a simple network change (e.g. DNS changes) this is enough, because the restarted agent applies the new network settings. For other network changes (e.g. an IP change), the CPI is responsible for rebooting or recreating the VM if needed.



290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/bosh_agent/handler.rb', line 290

# Publish callback for the "prepare_network_change" reply.
#
# When Bosh::Agent::Config.configure is set, it removes the udev
# persistent-net rules and the agent settings file, each only if present.
# In all cases it then schedules an agent restart after
# KILL_AGENT_THREAD_TIMEOUT_ON_RESTART seconds.
def post_prepare_network_change
  if Bosh::Agent::Config.configure
    udev_file = '/etc/udev/rules.d/70-persistent-net.rules'
    if File.exist?(udev_file)
      @logger.info("deleting 70-persistent-net.rules - again")
      File.delete(udev_file)
    end
    @logger.info("Removing settings.json")
    settings_file = Bosh::Agent::Config.settings_file
    # Without this check, a missing settings file would raise
    # Errno::ENOENT here and the restart below would never be scheduled.
    File.delete(settings_file) if File.exist?(settings_file)
  end

  @logger.info("Restarting agent to prepare for a network change")
  kill_main_thread_in(KILL_AGENT_THREAD_TIMEOUT_ON_RESTART)
end

#process(processor, args) ⇒ Object



263
264
265
266
267
268
269
270
271
272
273
274
275
# File 'lib/bosh_agent/handler.rb', line 263

# Runs +processor+ with +args+ and converts the outcome into a reply payload:
# - on success, {:value => result};
# - on Bosh::Agent::Error, it logs at info and returns
#   RemoteException.from(e).to_hash;
# - on anything else, it schedules the agent kill
#   (KILL_AGENT_THREAD_TIMEOUT_ON_ERRORS), logs at error, and returns an
#   :exception payload describing the error.
#
# NOTE(review): this rescues Exception, not just StandardError, so even
# non-standard failures still yield a reply and a restart. That is
# presumably intentional.
def process(processor, args)
  {:value => processor.process(args)}
rescue Bosh::Agent::Error => e
  @logger.info("#{e.inspect}: #{e.backtrace}")
  RemoteException.from(e).to_hash
rescue Exception => e
  kill_main_thread_in(KILL_AGENT_THREAD_TIMEOUT_ON_ERRORS)
  @logger.error("#{e.inspect}: #{e.backtrace}")
  {:exception => "#{e.inspect}: #{e.backtrace}"}
end

#process_in_thread(processor, reply_to, method, args) ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/bosh_agent/handler.rb', line 169

# Runs a processor inside the EM.defer worker scheduled by #handle_message
# and publishes its reply.
#
# Long-running processors (those whose long_running? returns true) are
# refused while the agent is restarting or while another long-running task
# is in flight. Otherwise they run via #process_long_running under @lock.
# Other processors run synchronously. Under Config.configure, a
# 'prepare_network_change' reply also triggers #post_prepare_network_change
# from the publish callback. StandardErrors are logged here because nothing
# else would see them in this thread.
def process_in_thread(processor, reply_to, method, args)
  # Check the flag's value, not merely whether it is defined: a processor
  # whose long_running? returns false is handled like any other processor.
  if processor.respond_to?(:long_running?) && processor.long_running?
    if @restarting_agent
      exception = RemoteException.new("restarting agent")
      publish(reply_to, exception.to_hash)
    else
      @lock.synchronize do
        if @long_running_agent_task.empty?
          process_long_running(reply_to, processor, args)
        else
          exception = RemoteException.new("already running long running task")
          publish(reply_to, exception.to_hash)
        end
      end
    end
  else
    payload = process(processor, args)

    if Config.configure && method == 'prepare_network_change'
      # Reconfigure from the publish callback, i.e. after the reply is sent.
      publish(reply_to, payload) { post_prepare_network_change }
    else
      publish(reply_to, payload)
    end
  end
rescue => e
  # since this is running in a thread we're going to be nice and
  # log an error as this would otherwise be lost
  @logger.error("#{processor.to_s}: #{e.message}\n#{e.backtrace.join("\n")}")
end

#process_long_running(reply_to, processor, args) ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
# File 'lib/bosh_agent/handler.rb', line 242

# Runs a long-running processor.
#
# It immediately replies with a "running" state carrying a new
# agent_task_id. It then runs the processor and appends
# [timestamp, agent_task_id, result] to @results for later "get_task" polls.
# The in-flight marker is cleared in an ensure clause, so a failure (for
# example while publishing the acknowledgement) cannot leave the agent
# believing a task is still running and refusing every later long-running
# request. The return value is not meaningful.
def process_long_running(reply_to, processor, args)
  agent_task_id = generate_agent_task_id

  begin
    @long_running_agent_task = [agent_task_id]

    payload = {:value => {:state => "running", :agent_task_id => agent_task_id}}
    publish(reply_to, payload)

    result = process(processor, args)
    # Record the result before clearing the marker, so a "get_task" poll
    # sees either "running" or the result, never "unknown".
    @results << [Time.now.to_i, agent_task_id, result]
  ensure
    @long_running_agent_task = []
  end
end

#publish(reply_to, payload, &blk) ⇒ Object



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/bosh_agent/handler.rb', line 218

# Sends +payload+ to +reply_to+ on the next EM tick, encrypting it first
# when credentials are configured. +blk+ is handed to the NATS publish call.
#
# If the JSON encoding is NATS_MAX_PAYLOAD_SIZE bytes or larger, the reply
# is replaced by a RemoteException built from the unencrypted payload.
# That error reply goes through the same encrypt/encode path as a normal
# reply, so NATS always receives a JSON string, never a Ruby Hash.
def publish(reply_to, payload, &blk)
  @logger.info("reply_to: #{reply_to}: payload: #{payload.inspect}")

  unencrypted = payload
  payload = encrypt(reply_to, payload) if @credentials
  json = Yajl::Encoder.encode(payload)

  if json.bytesize >= NATS_MAX_PAYLOAD_SIZE
    msg = "message > NATS_MAX_PAYLOAD, stored in blobstore"
    exception = RemoteException.new(msg, nil, unencrypted)
    @logger.fatal(msg)
    # Previously the raw Hash was published (and never encrypted); encode
    # it like any other reply.
    error_payload = exception.to_hash
    error_payload = encrypt(reply_to, error_payload) if @credentials
    json = Yajl::Encoder.encode(error_payload)
  end

  EM.next_tick do
    @nats.publish(reply_to, json, &blk)
  end
end

#setup_heartbeats ⇒ Object



115
116
117
118
119
120
121
122
123
# File 'lib/bosh_agent/handler.rb', line 115

# Enables the heartbeat processor every Config.heartbeat_interval seconds.
# If that value (via to_i) is not positive, it logs a warning that
# heartbeats are disabled instead.
def setup_heartbeats
  seconds = Config.heartbeat_interval.to_i
  if seconds <= 0
    @logger.warn("Heartbeats are disabled")
  else
    @hbp.enable(seconds)
    @logger.info("Heartbeats are enabled and will be sent every #{seconds} seconds")
  end
end

#setup_syslog_monitor ⇒ Object



125
126
127
# File 'lib/bosh_agent/handler.rb', line 125

# Starts the syslog monitor with this agent's NATS client and agent id,
# returning whatever SyslogMonitor.start returns.
def setup_syslog_monitor
  monitor = Bosh::Agent::SyslogMonitor
  monitor.start(@nats, @agent_id)
end

#shutdown ⇒ Object



104
105
106
107
# File 'lib/bosh_agent/handler.rb', line 104

# Logs "Exit" and stops NATS. The callback passed to NATS.stop stops
# EventMachine and exits the process.
def shutdown
  @logger.info("Exit")
  NATS.stop do
    EM.stop
    exit
  end
end

#start ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/bosh_agent/handler.rb', line 69

# Runs the agent.
#
# It installs TERM/INT/QUIT traps that call #shutdown. Inside EM.run it
# then:
# - connects to NATS, retrying on ENETUNREACH or a timeout; #on_connect
#   runs once connected;
# - enables heartbeats;
# - when alert processing is enabled and the SMTP port, user and password
#   are all configured, starts the alert processor and the syslog monitor.
#
# A NATS::ConnectError restarts the whole sequence until @nats_fail_count
# reaches MAX_NATS_RETRIES (#on_connect resets the counter). After that it
# logs a fatal message and returns, which lets the agent exit and recover,
# e.g. after credentials change.
def start
  ['TERM', 'INT', 'QUIT'].each { |s| trap(s) { shutdown } }

  EM.run do
    begin
      @nats = NATS.connect(:uri => @nats_uri, :autostart => false) { on_connect }
      Config.nats = @nats
    rescue Errno::ENETUNREACH, Timeout::Error => e
      @logger.info("Unable to talk to nats - retry (#{e.inspect})")
      sleep 0.1
      retry
    end

    setup_heartbeats

    if @process_alerts
      if (@smtp_port.nil? || @smtp_user.nil? || @smtp_password.nil?)
        @logger.error "Cannot start alert processor without having SMTP port, user and password configured"
        @logger.error "Agent will be running but alerts will NOT be properly processed"
      else
        # Never write the SMTP password to the log.
        @logger.debug("SMTP: alert processor on port #{@smtp_port}")
        @processor = Bosh::Agent::AlertProcessor.start("127.0.0.1", @smtp_port, @smtp_user, @smtp_password)
        setup_syslog_monitor
      end
    end
  end
rescue NATS::ConnectError => e
  @nats_fail_count += 1
  @logger.error("NATS connection error: #{e.message}")
  sleep NATS_RECONNECT_SLEEP
  # only retry a few times and then exit which lets the agent recover if we change credentials
  retry if @nats_fail_count < MAX_NATS_RETRIES
  @logger.fatal("Unable to reconnect to NATS after #{MAX_NATS_RETRIES} retries, exiting...")
end