Class: RightScale::Agent

Inherits:
Object show all
Includes:
ConsoleHelper, DaemonizeHelper
Defined in:
lib/right_agent/agent.rb

Overview

Agent for receiving requests via RightNet and acting upon them by dispatching them to a registered actor to perform. See load_actors for details on how the agent-specific environment is loaded. Operates in either HTTP or AMQP mode for RightNet communication.

Constant Summary collapse

DEFAULT_OPTIONS =

Default option settings for the agent

{
  :user               => 'agent',
  :pass               => 'testing',
  :vhost              => '/right_net',
  :secure             => true,
  :log_level          => :info,
  :daemonize          => false,
  :console            => false,
  :root_dir           => Dir.pwd,
  :mode               => :amqp,
  :time_to_live       => 0,
  :retry_interval     => nil,
  :retry_timeout      => nil,
  :connect_timeout    => 60,
  :reconnect_interval => 60,
  :offline_queueing   => false,
  :ping_interval      => 0,
  :check_interval     => 5 * 60,
  :grace_timeout      => 30,
  :prefetch           => 1,
  :heartbeat          => 0
}
MAX_ABNORMAL_TERMINATE_DELAY =

Maximum abnormal termination delay for slowing crash cycling

60 * 60
TERMINATE_BLOCK =

Block to be activated when finish terminating

lambda { EM.stop if EM.reactor_running? }
TRACE_LEVEL =

Exceptions with restricted error backtrace level. Value :no_trace means no backtrace and no tracking in stats or reporting to Errbit.

{
  RightSupport::Net::NoResult => :no_trace,
  RightScale::HttpExceptions::RequestTimeout => :no_trace,
  RightScale::Exceptions::ConnectivityFailure => :no_trace,
  RightScale::BalancedHttpClient::NotResponding => :no_trace,
  RightAMQP::HABrokerClient::NoConnectedBrokers => :no_trace
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from DaemonizeHelper

#daemonize

Methods included from ConsoleHelper

included, #start_console

Constructor Details

#initialize(opts) ⇒ Agent

Initialize the new agent

Parameters

opts(Hash)

Configuration options per start method above

Return

true

Always return true



178
179
180
181
182
183
184
185
186
187
188
# File 'lib/right_agent/agent.rb', line 178

# Initialize the new agent
#
# === Parameters
# opts(Hash):: Configuration options; see the start class method for the supported keys
#
# === Return
# true:: Always return true
def initialize(opts)
  set_configuration(opts)
  # A tag option may be a single tag or a (possibly nested) list of tags
  tag = opts[:tag]
  @tags = tag ? [tag].flatten : []
  @status_callbacks = []
  @options.freeze
  @last_stat_reset_time = Time.now
  reset_agent_stats
  true
end

Instance Attribute Details

#clientObject (readonly)

(RightHttpClient|RightAMQP::HABrokerClient) Client for accessing RightNet/RightApi



49
50
51
# File 'lib/right_agent/agent.rb', line 49

# (RightHttpClient|RightAMQP::HABrokerClient) Client for accessing RightNet/RightApi
#
# === Return
# Current value of @client
def client
  @client
end

#dispatchersObject (readonly)

(Hash) Dispatcher for each queue for messages received via AMQP



43
44
45
# File 'lib/right_agent/agent.rb', line 43

# (Hash) Dispatcher for each queue for messages received via AMQP
#
# === Return
# Current value of @dispatchers
def dispatchers
  @dispatchers
end

#identityObject (readonly)

(String) Identity of this agent



37
38
39
# File 'lib/right_agent/agent.rb', line 37

# (String) Identity of this agent
#
# === Return
# Current value of @identity
def identity
  @identity
end

#modeObject (readonly)

(Symbol) RightNet communication mode: :http or :amqp



52
53
54
# File 'lib/right_agent/agent.rb', line 52

# (Symbol) RightNet communication mode: :http or :amqp
#
# === Return
# Current value of @mode
def mode
  @mode
end

#optionsObject (readonly)

(Hash) Configuration options applied to the agent



40
41
42
# File 'lib/right_agent/agent.rb', line 40

# (Hash) Configuration options applied to the agent
#
# === Return
# Current value of @options
def options
  @options
end

#registryObject (readonly)

(ActorRegistry) Registry for this agent's actors



46
47
48
# File 'lib/right_agent/agent.rb', line 46

# (ActorRegistry) Registry for this agent's actors
#
# === Return
# Current value of @registry
def registry
  @registry
end

#request_queueObject (readonly)

(String) Name of AMQP queue to which requests are to be published



55
56
57
# File 'lib/right_agent/agent.rb', line 55

# (String) Name of AMQP queue to which requests are to be published
#
# === Return
# Current value of @request_queue
def request_queue
  @request_queue
end

#tagsObject

(Array) Tag strings published by agent



58
59
60
# File 'lib/right_agent/agent.rb', line 58

# (Array) Tag strings published by agent
#
# === Return
# Current value of @tags
def tags
  @tags
end

Class Method Details

.start(opts = {}) ⇒ Object

Initializes a new agent and establishes an HTTP or AMQP RightNet connection. This must be used inside an EM.run block unless the EventMachine reactor was already started by the server that this application runs on.

Parameters

opts(Hash)

Configuration options:

:identity(String)

Identity of this agent; no default

:agent_name(String)

Local name for this agent

:root_dir(String)

Application root for this agent containing subdirectories actors, certs, and init;

  defaults to current working directory
:pid_dir(String):: Path to the directory where the agent stores its process id file (only if daemonized);
  defaults to the current working directory
:log_dir(String):: Log directory path; defaults to the platform specific log directory
:log_level(Symbol):: The verbosity of logging -- :debug, :info, :warn, :error or :fatal
:actors(Array):: List of actors to load
:console(Boolean):: true indicates to start interactive console
:daemonize(Boolean):: true indicates to daemonize
:retry_interval(Numeric):: Number of seconds between request retries
:retry_timeout(Numeric):: Maximum number of seconds to retry request before giving up
:time_to_live(Integer):: Number of seconds before a request expires and is to be ignored
  by the receiver, 0 means never expire; defaults to 0
:connect_timeout(Integer):: Number of seconds to wait for an AMQP broker connection to be established
:reconnect_interval(Integer):: Number of seconds between AMQP broker reconnect attempts
:offline_queueing(Boolean):: Whether to queue request if currently not connected to RightNet,
  also requires agent invocation of Sender initialize_offline_queue and start_offline_queue methods,
  as well as enable_offline_mode and disable_offline_mode as connection status changes
:ping_interval(Integer):: Minimum number of seconds since last message receipt to ping the RightNet
  router to check connectivity; defaults to 0 meaning do not ping
:check_interval(Integer):: Number of seconds between publishing stats and checking for AMQP broker
  connections that failed during agent launch and then attempting to reconnect
:heartbeat(Integer):: Number of seconds between AMQP connection heartbeats used to keep
  connection alive (e.g., when AMQP broker is behind a firewall), nil or 0 means disable
:grace_timeout(Integer):: Maximum number of seconds to wait after last request received before
  terminating regardless of whether there are still unfinished requests
:dup_check(Boolean):: Whether to check for and reject duplicate requests, e.g., due to retries
  or redelivery by AMQP broker after server failure
:prefetch(Integer):: Maximum number of messages the AMQP broker is to prefetch for this agent
  before it receives an ack. Value 1 ensures that only last unacknowledged gets redelivered
  if the agent crashes. Value 0 means unlimited prefetch.
:airbrake_endpoint(String):: URL for Airbrake for reporting unexpected exceptions to Errbit
:airbrake_api_key(String):: Key for using the Airbrake API access to Errbit
:ready_callback(Proc):: Called once agent is connected to AMQP broker and ready for service (no argument)
:restart_callback(Proc):: Called on each restart vote with votes being initiated by offline queue
  exceeding MAX_QUEUED_REQUESTS or by repeated failures to access RightNet when online (no argument)
:services(Symbol):: List of services provided by this agent; defaults to all methods exposed by actors
:secure(Boolean):: true indicates to use security features of RabbitMQ to restrict agents to themselves
:fiber_pool_size(Integer):: Size of fiber pool
:fiber_pool(FiberPool):: Fiber pool configured for use with EventMachine when making HTTP requests
:mode(Symbol):: RightNet communication mode: :http or :amqp; defaults to :amqp
:api_url(String):: Domain name for HTTP access to RightApi server
:account_id(Integer):: Identifier for account owning this agent
:shard_id(Integer):: Identifier for database shard in which this agent is operating
:vhost(String):: AMQP broker virtual host
:user(String):: AMQP broker user
:pass(String):: AMQP broker password
:host(String):: Comma-separated list of AMQP broker hosts; if only one, it is reapplied
  to successive ports; if none, defaults to 'localhost'
:port(Integer):: Comma-separated list of AMQP broker ports corresponding to hosts; if only one,
  it is incremented and applied to successive hosts; if none, defaults to AMQP::HOST

On start config.yml is read, so it is common to specify options in the YAML file. However, when both Ruby code options and YAML file specify options, Ruby code options take precedence.

Return

agent(Agent)

New agent



165
166
167
168
169
# File 'lib/right_agent/agent.rb', line 165

# Create a new agent with the given options, invoke its run method, and hand it back
#
# === Parameters
# opts(Hash):: Configuration options passed unchanged to new; defaults to an empty hash
#
# === Return
# agent(Agent):: New agent
def self.start(opts = {})
  new(opts).tap { |agent| agent.run }
end

Instance Method Details

#connect(host, port, index, priority = nil, force = false) ⇒ Object

Connect to an additional AMQP broker or reconnect it if connection has failed. Subscribe to identity queue on this broker. Update config file if this is a new broker. Assumes agent already has credentials on this broker and that the identity queue exists.

Parameters

host(String)

Host name of broker

port(Integer)

Port number of broker

index(Integer)

Small unique id associated with this broker for use in forming alias

priority(Integer|nil)

Priority position of this broker in list for use

by this agent with nil meaning add to end of list
force(Boolean)

Reconnect even if already connected

Return

(String|nil)

Error message if failed, otherwise nil



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# File 'lib/right_agent/agent.rb', line 299

# Connect to an additional AMQP broker or reconnect it if connection has failed
# Once the broker reports :connected, set up this agent's queues on it and
# persist the client's current host and port lists to the configuration file
#
# === Parameters
# host(String):: Host name of broker
# port(Integer):: Port number of broker
# index(Integer):: Small unique id associated with this broker for use in forming alias
# priority(Integer|nil):: Priority position of this broker in list for use
#   by this agent with nil meaning add to end of list
# force(Boolean):: Reconnect even if already connected
#
# === Return
# (String|nil):: Error message if the connect request itself raised, otherwise nil;
#   failures reported later through the connection status callback are only logged
def connect(host, port, index, priority = nil, force = false)
  @connect_request_stats.update("connect b#{index}")
  forced_note = force ? " even if already connected" : nil
  Log.info("Connecting to broker at host #{host.inspect} port #{port.inspect} " +
           "index #{index.inspect} priority #{priority.inspect}#{forced_note}")
  Log.info("Current broker configuration: #{@client.status.inspect}")
  begin
    @client.connect(host, port, index, priority, force) do |broker_id|
      @client.connection_status(:one_off => @options[:connect_timeout], :brokers => [broker_id]) do |status|
        begin
          if status == :connected
            setup_queues([broker_id])
            # Count the brokers still awaiting setup across all queues
            pending = @remaining_queue_setup.values.inject(0) { |count, ids| count + ids.size }
            Log.info("[setup] Finished subscribing to queues after reconnecting to broker #{broker_id}") if pending.zero?
            saved = update_configuration(:host => @client.hosts, :port => @client.ports)
            Log.warning("Successfully connected to broker #{broker_id} but failed to update config file") unless saved
          else
            ErrorTracker.log(self, "Failed to connect to broker #{broker_id}, status #{status.inspect}")
          end
        rescue Exception => e
          # Nothing may escape the status callback, so every exception is only logged here
          ErrorTracker.log(self, "Failed to connect to broker #{broker_id}, status #{status.inspect}", e)
        end
      end
    end
    nil
  rescue StandardError => e
    message = "Failed to connect to broker at host #{host.inspect} and port #{port.inspect}"
    ErrorTracker.log(self, message, e)
    Log.format(message, e)
  end
end

#connect_failed(ids) ⇒ Object

There were problems while setting up service for this agent on the given AMQP brokers, so mark these brokers as failed if not currently connected and later, during the periodic status check, attempt to reconnect

Parameters

ids(Array)

Identity of brokers

Return

(String|nil)

Error message if failed, otherwise nil



384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
# File 'lib/right_agent/agent.rb', line 384

# There were problems while setting up service for this agent on the given AMQP brokers,
# so mark these brokers as failed if not currently connected and later, during the
# periodic status check, attempt to reconnect
#
# === Parameters
# ids(Array):: Identity of brokers
#
# === Return
# (String|nil):: Error message if failed, otherwise nil
def connect_failed(ids)
  aliases = @client.aliases(ids).join(", ")
  @connect_request_stats.update("enroll failed #{aliases}")
  result = nil
  begin
    Log.info("Received indication that service initialization for this agent for brokers #{ids.inspect} has failed")
    connected = @client.connected
    # Brokers that are currently connected are left alone
    ignored = connected & ids
    # An empty array is truthy in Ruby, so test for emptiness explicitly
    Log.info("Not marking brokers #{ignored.inspect} as unusable because currently connected") unless ignored.empty?
    Log.info("Current broker configuration: #{@client.status.inspect}")
    @client.declare_unusable(ids - ignored)
  rescue StandardError => e
    ErrorTracker.log(self, msg = "Failed handling broker connection failure indication for #{ids.inspect}", e)
    result = Log.format(msg, e)
  end
  result
end

#disconnect(host, port, remove = false) ⇒ Object

Disconnect from an AMQP broker and optionally remove it from the configuration. Refuse to do so if it is the last connected broker.

Parameters

host(String)

Host name of broker

port(Integer)

Port number of broker

remove(Boolean)

Whether to remove broker from configuration rather than just closing it,

defaults to false

Return

(String|nil)

Error message if failed, otherwise nil



344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
# File 'lib/right_agent/agent.rb', line 344

# Disconnect from an AMQP broker and optionally remove it from the configuration
# Refuse to do so if it is the last connected broker
#
# === Parameters
# host(String):: Host name of broker
# port(Integer):: Port number of broker
# remove(Boolean):: Whether to remove broker from configuration rather than just closing it,
#   defaults to false
#
# === Return
# (String|nil):: Error message if failed, otherwise nil
def disconnect(host, port, remove = false)
  removal_note = remove ? " and removing" : nil
  Log.info("Disconnecting#{removal_note} broker at host #{host.inspect} port #{port.inspect}")
  Log.info("Current broker configuration: #{@client.status.inspect}")
  broker_id = RightAMQP::HABrokerClient.identity(host, port)
  @connect_request_stats.update("disconnect #{@client.alias_(broker_id)}")
  active = @client.connected
  failure = error = nil
  if active.include?(broker_id) && active.size == 1
    failure = "Not disconnecting from #{broker_id} because it is the last connected broker for this agent"
  elsif !@client.get(broker_id)
    failure = "Cannot disconnect from broker #{broker_id} because not configured for this agent"
  else
    begin
      if remove
        @client.remove(host, port) do |removed_id|
          saved = update_configuration(:host => @client.hosts, :port => @client.ports)
          failure = "Successfully disconnected from broker #{removed_id} but failed to update config file" unless saved
        end
      else
        @client.close_one(broker_id)
      end
    rescue StandardError => error
      failure = Log.format("Failed to disconnect from broker #{broker_id}", error)
    end
  end
  # Every failure is reported, with the exception attached when there was one
  ErrorTracker.log(self, failure, error) if failure
  failure
end

#register(actor, prefix = nil) ⇒ Object

Register an actor for this agent

Parameters

actor(Actor)

Actor to be registered

prefix(String)

Prefix to be used in place of actor’s default_prefix

Return

(Actor)

Actor registered



258
259
260
# File 'lib/right_agent/agent.rb', line 258

# Register an actor for this agent by delegating to the actor registry
#
# === Parameters
# actor(Actor):: Actor to be registered
# prefix(String):: Prefix to be used in place of actor's default_prefix
#
# === Return
# (Actor):: Actor registered (whatever @registry.register returns)
def register(actor, prefix = nil)
  @registry.register(actor, prefix)
end

#runObject

Put the agent in service. This requires making a RightNet connection via HTTP or AMQP and other initialization like loading actors.

Return

true

Always return true



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
# File 'lib/right_agent/agent.rb', line 196

# Put the agent in service
# Initializes logging and error tracking, logs the startup options, records the
# process id (after optionally daemonizing), and then either starts service directly
# in :http mode or creates the AMQP broker client and starts service once a broker
# connection is reported
#
# === Return
# true:: Always return true
#
# === Raise
# PidFile::AlreadyRunning:: Re-raised after stopping the EventMachine reactor if it is running
def run
  Log.init(@identity, @options[:log_path], :print => true)
  Log.level = @options[:log_level] if @options[:log_level]
  RightSupport::Log::Mixin.default_logger = Log
  ErrorTracker.init(self, @options[:agent_name], :shard_id => @options[:shard_id], :trace_level => TRACE_LEVEL,
                    :airbrake_endpoint => @options[:airbrake_endpoint], :airbrake_api_key => @options[:airbrake_api_key])
  @history.update("start")

  now = Time.now
  # Use getutc rather than utc: Time#utc converts the receiver in place, which would
  # make the utc_offset reported in the same message always 0
  Log.info("[start] Agent #{@identity} starting; time: #{now.getutc}; utc_offset: #{now.utc_offset}")
  # Log each option, masking the value of any option whose name contains "pass"
  @options.each { |k, v| Log.info("-  #{k}: #{k.to_s =~ /pass/ ? '****' : (v.respond_to?(:each) ? v.inspect : v)}") }

  begin
    # Capture process id in file after optional daemonize
    pid_file = PidFile.new(@identity)
    pid_file.check
    daemonize(@identity, @options) if @options[:daemonize]
    pid_file.write
    at_exit { pid_file.remove }

    if @mode == :http
      # HTTP is being used for RightNet communication instead of AMQP
      # The code loaded with the actors specific to this application
      # is responsible to call setup_http at the appropriate time
      start_service
    else
      # Initiate AMQP broker connection, wait for connection before proceeding
      # otherwise messages published on failed connection will be lost
      @client = RightAMQP::HABrokerClient.new(Serializer.new(:secure), @options.merge(:exception_stats => ErrorTracker.exception_stats))
      # Track, per queue, what @client.all returns as still needing queue setup
      @queues.each { |s| @remaining_queue_setup[s] = @client.all }
      @client.connection_status(:one_off => @options[:connect_timeout]) do |status|
        if status == :connected
          # Need to give EM (on Windows) a chance to respond to the AMQP handshake
          # before doing anything interesting to prevent AMQP handshake from
          # timing-out; delay post-connected activity a second
          EM_S.add_timer(1) { start_service }
        elsif status == :failed
          terminate("failed to connect to any brokers during startup")
        elsif status == :timeout
          terminate("failed to connect to any brokers after #{@options[:connect_timeout]} seconds during startup")
        else
          terminate("broker connect attempt failed unexpectedly with status #{status} during startup")
        end
      end
    end
  rescue PidFile::AlreadyRunning
    EM.stop if EM.reactor_running?
    raise
  rescue StandardError => e
    # Any other startup failure terminates the agent with the exception reported
    terminate("failed startup", e)
  end
  true
end

#self_hrefString, NilClass

Resource href associated with this agent, if any

Returns:

  • (String, NilClass)

    href or nil if unknown



265
266
267
# File 'lib/right_agent/agent.rb', line 265

# Resource href associated with this agent, if any
# Only available when a client exists and the agent is in :http mode
#
# === Return
# (String|nil):: href reported by the client, or nil if unknown
def self_href
  return nil unless @client && @mode == :http
  @client.self_href
end

#stats(options = {}) ⇒ Object

Retrieve statistics about agent operation

Parameters:

options(Hash)

Request options:

:reset(Boolean)

Whether to reset the statistics after getting the current ones

Return

result(OperationResult)

Always returns success



475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
# File 'lib/right_agent/agent.rb', line 475

# Retrieve statistics about agent operation
# In :http mode the client's statistics are merged into the top level of the result;
# otherwise they are stored under the "brokers" key
#
# === Parameters
# options(Hash):: Request options:
#   :reset(Boolean):: Whether to reset the statistics after getting the current ones
#
# === Return
# result(OperationResult):: Always returns success
def stats(options = {})
  stat_time = Time.now
  resetting = options[:reset]
  report = {
    "name"            => @agent_name,
    "identity"        => @identity,
    "hostname"        => Socket.gethostname,
    "memory"          => Platform.process.resident_set_size,
    "version"         => AgentConfig.protocol_version,
    "agent stats"     => agent_stats(resetting),
    "receive stats"   => dispatcher_stats(resetting),
    "send stats"      => @sender.stats(resetting),
    "last reset time" => @last_stat_reset_time.to_i,
    "stat time"       => stat_time.to_i,
    "service uptime"  => @history.analyze_service,
    "machine uptime"  => Platform.shell.uptime
  }
  report["revision"] = @revision if @revision
  client_stats = @client.stats(resetting)
  case @mode
  when :http then report.merge!(client_stats)
  else report["brokers"] = client_stats
  end
  # The reset time is advanced only after the result has been built
  OperationResult.success(report).tap { @last_stat_reset_time = stat_time if resetting }
end

#status(&callback) ⇒ Object

Record callback to be notified of agent status changes. Multiple callbacks are supported.

Block

optional block activated when there is a status change with parameters

type (Symbol):: Type of client reporting status change: :auth, :api, :router, :broker
state (Symbol):: State of client

Return

(Hash)

Status of various clients



279
280
281
282
# File 'lib/right_agent/agent.rb', line 279

# Record callback to be notified of agent status changes
# Multiple callbacks are supported
#
# === Block
# optional block activated when there is a status change with parameters
#   type (Symbol):: Type of client reporting status change: :auth, :api, :router, :broker
#   state (Symbol):: State of client
#
# === Return
# (Hash):: Status of various clients
def status(&callback)
  @status_callbacks.push(callback) unless callback.nil?
  @status
end

#terminate(reason = nil, exception = nil) ⇒ Object

Gracefully terminate execution by allowing unfinished tasks to complete. Immediately terminate if called a second time. Report reason for termination if it is abnormal.

Parameters

reason(String)

Reason for abnormal termination, if any

exception(Exception|String)

Exception or other parenthetical error information, if any

Return

true

Always return true



434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
# File 'lib/right_agent/agent.rb', line 434

# Gracefully terminate execution by allowing unfinished tasks to complete
# Immediately terminate if called a second time or if there is no client
# Report reason for termination if it is abnormal
#
# === Parameters
# reason(String):: Reason for abnormal termination, if any
# exception(Exception|String):: Exception or other parenthetical error information, if any
#
# === Return
# true:: Always return true
def terminate(reason = nil, exception = nil)
  begin
    @history.update("stop") if @history
    ErrorTracker.log(self, "[stop] Terminating because #{reason}", exception) if reason
    # History is treated as optional elsewhere in this method, so the crash-cycle
    # delay is skipped rather than failing when there is no history to consult
    if exception.is_a?(Exception) && @history
      h = @history.analyze_service
      if h[:last_crashed]
        # Delay grows with the time since the last crash, up to the maximum
        delay = [(Time.now.to_i - h[:last_crash_time]) * 2, MAX_ABNORMAL_TERMINATE_DELAY].min
        Log.info("[stop] Delaying termination for #{RightSupport::Stats.elapsed(delay)} to slow crash cycling")
        sleep(delay)
      end
    end
    if @terminating || @client.nil?
      @terminating = true
      @termination_timer.cancel if @termination_timer
      @termination_timer = nil
      Log.info("[stop] Terminating immediately")
      @terminate_callback.call
      @history.update("graceful exit") if @history && @client.nil?
    else
      @terminating = true
      @check_status_timer.cancel if @check_status_timer
      @check_status_timer = nil
      Log.info("[stop] Agent #{@identity} terminating")
      stop_gracefully(@options[:grace_timeout])
    end
  rescue StandardError => e
    ErrorTracker.log(self, "Failed to terminate gracefully", e)
    # Best effort: still run the terminate callback, ignoring any failure from it
    begin @terminate_callback.call; rescue Exception; end
  end
  true
end

#update_configuration(opts) ⇒ Object

Update agent’s persisted configuration. Note that @options are frozen and therefore not updated.

Parameters

opts(Hash)

Options being updated

Return

(Boolean)

true if successful, otherwise false



410
411
412
413
414
415
416
417
418
419
420
421
422
# File 'lib/right_agent/agent.rb', line 410

# Update agent's persisted configuration
# Loads the stored configuration for this agent, overlays the given options on it,
# and stores it back; @options itself is not touched by this method
#
# === Parameters
# opts(Hash):: Options being updated
#
# === Return
# (Boolean):: true if successful, otherwise false
def update_configuration(opts)
  begin
    config = AgentConfig.load_cfg(@agent_name)
    unless config
      ErrorTracker.log(self, "Could not access configuration file #{AgentConfig.cfg_file(@agent_name).inspect} for update")
      return false
    end
    opts.each { |key, value| config[key] = value }
    AgentConfig.store_cfg(@agent_name, config)
    true
  rescue StandardError => e
    ErrorTracker.log(self, "Failed updating configuration file #{AgentConfig.cfg_file(@agent_name).inspect}", e)
    false
  end
end