Class: RightScale::Agent
- Includes:
- ConsoleHelper, DaemonizeHelper
- Defined in:
- lib/right_agent/agent.rb
Overview
Agent for receiving requests via RightNet and acting upon them by dispatching them to a registered actor. See load_actors for details on how the agent-specific environment is loaded. Operates in either HTTP or AMQP mode for RightNet communication.
Constant Summary
- DEFAULT_OPTIONS =
Default option settings for the agent
{
  :user => 'agent',
  :pass => 'testing',
  :vhost => '/right_net',
  :secure => true,
  :log_level => :info,
  :daemonize => false,
  :console => false,
  :root_dir => Dir.pwd,
  :mode => :amqp,
  :time_to_live => 0,
  :retry_interval => nil,
  :retry_timeout => nil,
  :connect_timeout => 60,
  :reconnect_interval => 60,
  :offline_queueing => false,
  :ping_interval => 0,
  :check_interval => 5 * 60,
  :grace_timeout => 30,
  :prefetch => 1,
  :heartbeat => 0
}
- MAX_ABNORMAL_TERMINATE_DELAY =
Maximum abnormal termination delay for slowing crash cycling
60 * 60
- TERMINATE_BLOCK =
Block to be activated when termination finishes
lambda { EM.stop if EM.reactor_running? }
- TRACE_LEVEL =
Exceptions with restricted error backtrace level. The value :no_trace means no backtrace and no tracking in stats or reporting to Errbit.
{
  RightSupport::Net::NoResult => :no_trace,
  RightScale::HttpExceptions::RequestTimeout => :no_trace,
  RightScale::Exceptions::ConnectivityFailure => :no_trace,
  RightScale::BalancedHttpClient::NotResponding => :no_trace,
  RightAMQP::HABrokerClient::NoConnectedBrokers => :no_trace
}
Instance Attribute Summary
-
#client ⇒ Object
readonly
(RightHttpClient|RightAMQP::HABrokerClient) Client for accessing RightNet/RightApi.
-
#dispatchers ⇒ Object
readonly
(Hash) Dispatcher for each queue for messages received via AMQP.
-
#identity ⇒ Object
readonly
(String) Identity of this agent.
-
#mode ⇒ Object
readonly
(Symbol) RightNet communication mode: :http or :amqp.
-
#options ⇒ Object
readonly
(Hash) Configuration options applied to the agent.
-
#registry ⇒ Object
readonly
(ActorRegistry) Registry for this agent's actors.
-
#request_queue ⇒ Object
readonly
(String) Name of AMQP queue to which requests are to be published.
-
#tags ⇒ Object
(Array) Tag strings published by agent.
Class Method Summary
-
.start(opts = {}) ⇒ Object
Initializes a new agent and establishes an HTTP or AMQP RightNet connection. This must be used inside an EM.run block unless the EventMachine reactor was already started by the server that this application runs on.
Instance Method Summary
-
#connect(host, port, index, priority = nil, force = false) ⇒ Object
Connect to an additional AMQP broker, or reconnect to it if the connection has failed. Subscribes to the identity queue on this broker and updates the config file if this is a new broker. Assumes the agent already has credentials on this broker and that the identity queue exists.
-
#connect_failed(ids) ⇒ Object
There were problems while setting up service for this agent on the given AMQP brokers, so mark these brokers as failed if not currently connected and later, during the periodic status check, attempt to reconnect.
-
#disconnect(host, port, remove = false) ⇒ Object
Disconnect from an AMQP broker and optionally remove it from the configuration. Refuses to do so if it is the last connected broker.
-
#initialize(opts) ⇒ Agent
constructor
Initialize the new agent.
-
#register(actor, prefix = nil) ⇒ Object
Register an actor for this agent.
-
#run ⇒ Object
Put the agent in service. This requires making a RightNet connection via HTTP or AMQP and other initialization, such as loading actors.
-
#self_href ⇒ String, NilClass
Resource href associated with this agent, if any.
-
#stats(options = {}) ⇒ Object
Retrieve statistics about agent operation.
-
#status(&callback) ⇒ Object
Record a callback to be notified of agent status changes. Multiple callbacks are supported.
-
#terminate(reason = nil, exception = nil) ⇒ Object
Gracefully terminate execution by allowing unfinished tasks to complete. Terminates immediately if called a second time. Reports the reason for termination if it is abnormal.
-
#update_configuration(opts) ⇒ Object
Update the agent’s persisted configuration. Note that @options is frozen and therefore not updated.
Methods included from DaemonizeHelper
Methods included from ConsoleHelper
Constructor Details
#initialize(opts) ⇒ Agent
Initialize the new agent
Parameters
- opts(Hash)
-
Configuration options per start method above
Return
- true
-
Always return true
# File 'lib/right_agent/agent.rb', line 178
def initialize(opts)
  set_configuration(opts)
  @tags = []
  @tags << opts[:tag] if opts[:tag]
  @tags.flatten!
  @status_callbacks = []
  @options.freeze
  @last_stat_reset_time = Time.now
  reset_agent_stats
  true
end
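Because the constructor appends opts[:tag] and then flattens, :tag can apparently be either a single string or an array of strings. The following is a minimal sketch of that tag handling in plain Ruby; collect_tags is a hypothetical helper used only for illustration and is not part of RightScale::Agent.

```ruby
# Hypothetical helper that mirrors the tag handling shown in #initialize.
def collect_tags(opts)
  tags = []
  tags << opts[:tag] if opts[:tag]
  # flatten! returns nil when nothing was flattened, so it is not chained
  tags.flatten!
  tags
end

single  = collect_tags(:tag => "role:worker")
several = collect_tags(:tag => ["role:worker", "zone:a"])
none    = collect_tags({})
```

Here single and several both end up as flat arrays of strings, and none is an empty array.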
Instance Attribute Details
#client ⇒ Object (readonly)
(RightHttpClient|RightAMQP::HABrokerClient) Client for accessing RightNet/RightApi
# File 'lib/right_agent/agent.rb', line 49
def client
  @client
end
#dispatchers ⇒ Object (readonly)
(Hash) Dispatcher for each queue for messages received via AMQP
# File 'lib/right_agent/agent.rb', line 43
def dispatchers
  @dispatchers
end
#identity ⇒ Object (readonly)
(String) Identity of this agent
# File 'lib/right_agent/agent.rb', line 37
def identity
  @identity
end
#mode ⇒ Object (readonly)
(Symbol) RightNet communication mode: :http or :amqp
# File 'lib/right_agent/agent.rb', line 52
def mode
  @mode
end
#options ⇒ Object (readonly)
(Hash) Configuration options applied to the agent
# File 'lib/right_agent/agent.rb', line 40
def options
  @options
end
#registry ⇒ Object (readonly)
(ActorRegistry) Registry for this agent's actors
# File 'lib/right_agent/agent.rb', line 46
def registry
  @registry
end
#request_queue ⇒ Object (readonly)
(String) Name of AMQP queue to which requests are to be published
# File 'lib/right_agent/agent.rb', line 55
def request_queue
  @request_queue
end
#tags ⇒ Object
(Array) Tag strings published by agent
# File 'lib/right_agent/agent.rb', line 58
def tags
  @tags
end
Class Method Details
.start(opts = {}) ⇒ Object
Initializes a new agent and establishes an HTTP or AMQP RightNet connection. This must be used inside an EM.run block unless the EventMachine reactor was already started by the server that this application runs on.
Parameters
- opts(Hash)
-
Configuration options:
- :identity(String)
-
Identity of this agent; no default
- :agent_name(String)
-
Local name for this agent
- :root_dir(String)
-
Application root for this agent containing subdirectories actors, certs, and init;
defaults to current working directory
:pid_dir(String):: Path to the directory where the agent stores its process id file (only if daemonized);
defaults to the current working directory
:log_dir(String):: Log directory path; defaults to the platform specific log directory
:log_level(Symbol):: The verbosity of logging -- :debug, :info, :warn, :error or :fatal
:actors(Array):: List of actors to load
:console(Boolean):: true indicates to start interactive console
:daemonize(Boolean):: true indicates to daemonize
:retry_interval(Numeric):: Number of seconds between request retries
:retry_timeout(Numeric):: Maximum number of seconds to retry a request before giving up
:time_to_live(Integer):: Number of seconds before a request expires and is to be ignored
by the receiver, 0 means never expire; defaults to 0
:connect_timeout(Integer):: Number of seconds to wait for an AMQP broker connection to be established
:reconnect_interval(Integer):: Number of seconds between AMQP broker reconnect attempts
:offline_queueing(Boolean):: Whether to queue request if currently not connected to RightNet,
also requires agent invocation of Sender initialize_offline_queue and start_offline_queue methods,
as well as enable_offline_mode and disable_offline_mode as connection status changes
:ping_interval(Integer):: Minimum number of seconds since last message receipt to ping the RightNet
router to check connectivity; defaults to 0 meaning do not ping
:check_interval(Integer):: Number of seconds between publishing stats and checking for AMQP broker
connections that failed during agent launch and then attempting to reconnect
:heartbeat(Integer):: Number of seconds between AMQP connection heartbeats used to keep
connection alive (e.g., when AMQP broker is behind a firewall), nil or 0 means disable
:grace_timeout(Integer):: Maximum number of seconds to wait after last request received before
terminating regardless of whether there are still unfinished requests
:dup_check(Boolean):: Whether to check for and reject duplicate requests, e.g., due to retries
or redelivery by AMQP broker after server failure
:prefetch(Integer):: Maximum number of messages the AMQP broker is to prefetch for this agent
before it receives an ack. Value 1 ensures that only the last unacknowledged message gets
redelivered if the agent crashes. Value 0 means unlimited prefetch.
:airbrake_endpoint(String):: URL for Airbrake for reporting unexpected exceptions to Errbit
:airbrake_api_key(String):: Key for using the Airbrake API access to Errbit
:ready_callback(Proc):: Called once agent is connected to AMQP broker and ready for service (no argument)
:restart_callback(Proc):: Called on each restart vote with votes being initiated by offline queue
exceeding MAX_QUEUED_REQUESTS or by repeated failures to access RightNet when online (no argument)
:services(Symbol):: List of services provided by this agent; defaults to all methods exposed by actors
:secure(Boolean):: true indicates to use security features of RabbitMQ to restrict agents to themselves
:fiber_pool_size(Integer):: Size of fiber pool
:fiber_pool(FiberPool):: Fiber pool configured for use with EventMachine when making HTTP requests
:mode(Symbol):: RightNet communication mode: :http or :amqp; defaults to :amqp
:api_url(String):: Domain name for HTTP access to RightApi server
:account_id(Integer):: Identifier for account owning this agent
:shard_id(Integer):: Identifier for database shard in which this agent is operating
:vhost(String):: AMQP broker virtual host
:user(String):: AMQP broker user
:pass(String):: AMQP broker password
:host(String):: Comma-separated list of AMQP broker hosts; if only one, it is reapplied
to successive ports; if none, defaults to 'localhost'
:port(Integer):: Comma-separated list of AMQP broker ports corresponding to hosts; if only one,
it is incremented and applied to successive hosts; if none, defaults to AMQP::PORT
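The :host and :port pairing rules can be illustrated with a short sketch. It follows the wording of the two option descriptions literally; the actual parsing is done by the AMQP client and may differ in detail. The helper name expand_brokers and the constant ASSUMED_DEFAULT_PORT are hypothetical, and 5672 is assumed as the default only because it is the standard AMQP port.

```ruby
ASSUMED_DEFAULT_PORT = 5672 # assumption: the standard AMQP port

# Hypothetical helper: pair up comma-separated host and port lists.
def expand_brokers(host = nil, port = nil)
  hosts = host.to_s.split(",").map(&:strip).reject(&:empty?)
  ports = port.to_s.split(",").map(&:strip).reject(&:empty?).map(&:to_i)
  hosts = ["localhost"] if hosts.empty?
  ports = [ASSUMED_DEFAULT_PORT] if ports.empty?
  if hosts.size > 1 && ports.size > 1 && hosts.size != ports.size
    raise ArgumentError, "unmatched host and port lists"
  end

  count = [hosts.size, ports.size].max
  Array.new(count) do |i|
    # A single host is reused for every port
    broker_host = hosts.size == 1 ? hosts.first : hosts[i]
    # A single port is incremented for each successive host
    broker_port = ports.size == 1 ? ports.first + i : ports[i]
    { :host => broker_host, :port => broker_port }
  end
end

two_hosts = expand_brokers("broker1,broker2", 5672)
two_ports = expand_brokers("broker1", "5672,5673")
```

In this sketch two_hosts pairs broker1 with 5672 and broker2 with 5673, while two_ports uses broker1 for both ports.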
On start config.yml is read, so it is common to specify options in the YAML file. However, when both Ruby code options and YAML file specify options, Ruby code options take precedence.
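The precedence rule above can be pictured as a chain of hash merges in which later sources win. This is only a sketch under assumptions: effective_options and SAMPLE_DEFAULTS are hypothetical names, the defaults are a small subset of DEFAULT_OPTIONS, and the real agent may combine its sources differently.

```ruby
require "yaml"

# Hypothetical subset of the defaults listed in DEFAULT_OPTIONS.
SAMPLE_DEFAULTS = { :mode => :amqp, :log_level => :info, :connect_timeout => 60 }

# Hypothetical helper: defaults, then YAML file options, then Ruby code options.
def effective_options(yaml_text, code_opts)
  loaded = YAML.safe_load(yaml_text) || {}
  # Symbolize keys so file options line up with the symbol keys used in code
  file_opts = loaded.each_with_object({}) { |(k, v), h| h[k.to_sym] = v }
  SAMPLE_DEFAULTS.merge(file_opts).merge(code_opts)
end

yaml_text = "log_level: debug\nconnect_timeout: 30\n"
opts = effective_options(yaml_text, :connect_timeout => 10)
```

In this sketch :connect_timeout comes from the Ruby code, :log_level from the YAML text (as a string, since plain YAML scalars are not symbols), and :mode from the defaults.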
Return
- agent(Agent)
-
New agent
# File 'lib/right_agent/agent.rb', line 165
def self.start(opts = {})
  agent = new(opts)
  agent.run
  agent
end
Instance Method Details
#connect(host, port, index, priority = nil, force = false) ⇒ Object
Connect to an additional AMQP broker, or reconnect to it if the connection has failed. Subscribes to the identity queue on this broker and updates the config file if this is a new broker. Assumes the agent already has credentials on this broker and that the identity queue exists.
Parameters
- host(String)
-
Host name of broker
- port(Integer)
-
Port number of broker
- index(Integer)
-
Small unique id associated with this broker for use in forming alias
- priority(Integer|nil)
-
Priority position of this broker in list for use
by this agent with nil meaning add to end of list
- force(Boolean)
-
Reconnect even if already connected
Return
- (String|nil)
-
Error message if failed, otherwise nil
# File 'lib/right_agent/agent.rb', line 299
def connect(host, port, index, priority = nil, force = false)
  @connect_request_stats.update("connect b#{index}")
  even_if = " even if already connected" if force
  Log.info("Connecting to broker at host #{host.inspect} port #{port.inspect} " +
           "index #{index.inspect} priority #{priority.inspect}#{even_if}")
  Log.info("Current broker configuration: #{@client.status.inspect}")
  result = nil
  begin
    @client.connect(host, port, index, priority, force) do |id|
      @client.connection_status(:one_off => @options[:connect_timeout], :brokers => [id]) do |status|
        begin
          if status == :connected
            setup_queues([id])
            remaining = 0
            @remaining_queue_setup.each_value { |ids| remaining += ids.size }
            Log.info("[setup] Finished subscribing to queues after reconnecting to broker #{id}") if remaining == 0
            unless update_configuration(:host => @client.hosts, :port => @client.ports)
              Log.warning("Successfully connected to broker #{id} but failed to update config file")
            end
          else
            ErrorTracker.log(self, "Failed to connect to broker #{id}, status #{status.inspect}")
          end
        rescue Exception => e
          ErrorTracker.log(self, "Failed to connect to broker #{id}, status #{status.inspect}", e)
        end
      end
    end
  rescue StandardError => e
    ErrorTracker.log(self, msg = "Failed to connect to broker at host #{host.inspect} and port #{port.inspect}", e)
    result = Log.format(msg, e)
  end
  result
end
#connect_failed(ids) ⇒ Object
There were problems while setting up service for this agent on the given AMQP brokers, so mark these brokers as failed if not currently connected and later, during the periodic status check, attempt to reconnect
Parameters
- ids(Array)
-
Identity of brokers
Return
- (String|nil)
-
Error message if failed, otherwise nil
# File 'lib/right_agent/agent.rb', line 384
def connect_failed(ids)
  aliases = @client.aliases(ids).join(", ")
  @connect_request_stats.update("enroll failed #{aliases}")
  result = nil
  begin
    Log.info("Received indication that service initialization for this agent for brokers #{ids.inspect} has failed")
    connected = @client.connected
    ignored = connected & ids
    Log.info("Not marking brokers #{ignored.inspect} as unusable because currently connected") if ignored
    Log.info("Current broker configuration: #{@client.status.inspect}")
    @client.declare_unusable(ids - ignored)
  rescue StandardError => e
    ErrorTracker.log(self, msg = "Failed handling broker connection failure indication for #{ids.inspect}", e)
    result = Log.format(msg, e)
  end
  result
end
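The decision about which brokers to mark unusable comes down to two array operations. A minimal sketch, assuming plain string ids; partition_failed and the broker ids are hypothetical and stand in for the values returned by the client.

```ruby
# Hypothetical helper: split reported broker ids into those that are left
# alone (still connected) and those that would be declared unusable.
def partition_failed(connected, ids)
  ignored = connected & ids   # reported as failed but currently connected
  unusable = ids - ignored    # reported as failed and not connected
  [ignored, unusable]
end

ignored, unusable = partition_failed(["broker-a", "broker-b"], ["broker-b", "broker-c"])
```

Note that an empty array is truthy in Ruby, so a guard written as `if ignored` passes even when no broker was ignored; `ignored.any?` would distinguish the two cases.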
#disconnect(host, port, remove = false) ⇒ Object
Disconnect from an AMQP broker and optionally remove it from the configuration. Refuses to do so if it is the last connected broker.
Parameters
- host(String)
-
Host name of broker
- port(Integer)
-
Port number of broker
- remove(Boolean)
-
Whether to remove broker from configuration rather than just closing it,
defaults to false
Return
- (String|nil)
-
Error message if failed, otherwise nil
# File 'lib/right_agent/agent.rb', line 344
def disconnect(host, port, remove = false)
  and_remove = " and removing" if remove
  Log.info("Disconnecting#{and_remove} broker at host #{host.inspect} port #{port.inspect}")
  Log.info("Current broker configuration: #{@client.status.inspect}")
  id = RightAMQP::HABrokerClient.identity(host, port)
  @connect_request_stats.update("disconnect #{@client.alias_(id)}")
  connected = @client.connected
  result = e = nil
  if connected.include?(id) && connected.size == 1
    result = "Not disconnecting from #{id} because it is the last connected broker for this agent"
  elsif @client.get(id)
    begin
      if remove
        @client.remove(host, port) do |id|
          unless update_configuration(:host => @client.hosts, :port => @client.ports)
            result = "Successfully disconnected from broker #{id} but failed to update config file"
          end
        end
      else
        @client.close_one(id)
      end
    rescue StandardError => e
      result = Log.format("Failed to disconnect from broker #{id}", e)
    end
  else
    result = "Cannot disconnect from broker #{id} because not configured for this agent"
  end
  ErrorTracker.log(self, result, e) if result
  result
end
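The three outcomes of the guard logic in #disconnect can be isolated in a small sketch. The helper disconnect_decision, its symbols, and the broker ids are hypothetical; the configured list stands in for the @client.get(id) lookup.

```ruby
# Hypothetical helper: decide what #disconnect would do for a broker id,
# given the currently connected ids and all configured ids.
def disconnect_decision(id, connected, configured)
  if connected.include?(id) && connected.size == 1
    :refuse_last_connected
  elsif configured.include?(id)
    :proceed
  else
    :not_configured
  end
end

disconnect_decision("broker-a", ["broker-a"], ["broker-a", "broker-b"])
disconnect_decision("broker-a", ["broker-a", "broker-b"], ["broker-a", "broker-b"])
disconnect_decision("broker-z", ["broker-a"], ["broker-a"])
```

The first call is refused because broker-a is the only connected broker, the second proceeds, and the third is rejected because broker-z is not configured.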
#register(actor, prefix = nil) ⇒ Object
Register an actor for this agent
Parameters
- actor(Actor)
-
Actor to be registered
- prefix(String)
-
Prefix to be used in place of actor’s default_prefix
Return
- (Actor)
-
Actor registered
# File 'lib/right_agent/agent.rb', line 258
def register(actor, prefix = nil)
  @registry.register(actor, prefix)
end
#run ⇒ Object
Put the agent in service. This requires making a RightNet connection via HTTP or AMQP and other initialization, such as loading actors.
Return
- true
-
Always return true
# File 'lib/right_agent/agent.rb', line 196
def run
  Log.init(@identity, @options[:log_path], :print => true)
  Log.level = @options[:log_level] if @options[:log_level]
  RightSupport::Log::Mixin.default_logger = Log
  ErrorTracker.init(self, @options[:agent_name], :shard_id => @options[:shard_id], :trace_level => TRACE_LEVEL,
                    :airbrake_endpoint => @options[:airbrake_endpoint], :airbrake_api_key => @options[:airbrake_api_key])
  @history.update("start")
  now = Time.now
  Log.info("[start] Agent #{@identity} starting; time: #{now.utc}; utc_offset: #{now.utc_offset}")
  @options.each { |k, v| Log.info("- #{k}: #{k.to_s =~ /pass/ ? '****' : (v.respond_to?(:each) ? v.inspect : v)}") }
  begin
    # Capture process id in file after optional daemonize
    pid_file = PidFile.new(@identity)
    pid_file.check
    daemonize(@identity, @options) if @options[:daemonize]
    pid_file.write
    at_exit { pid_file.remove }
    if @mode == :http
      # HTTP is being used for RightNet communication instead of AMQP
      # The code loaded with the actors specific to this application
      # is responsible to call setup_http at the appropriate time
      start_service
    else
      # Initiate AMQP broker connection, wait for connection before proceeding
      # otherwise messages published on failed connection will be lost
      @client = RightAMQP::HABrokerClient.new(Serializer.new(:secure), @options.merge(:exception_stats => ErrorTracker.exception_stats))
      @queues.each { |s| @remaining_queue_setup[s] = @client.all }
      @client.connection_status(:one_off => @options[:connect_timeout]) do |status|
        if status == :connected
          # Need to give EM (on Windows) a chance to respond to the AMQP handshake
          # before doing anything interesting to prevent AMQP handshake from
          # timing-out; delay post-connected activity a second
          EM_S.add_timer(1) { start_service }
        elsif status == :failed
          terminate("failed to connect to any brokers during startup")
        elsif status == :timeout
          terminate("failed to connect to any brokers after #{@options[:connect_timeout]} seconds during startup")
        else
          terminate("broker connect attempt failed unexpectedly with status #{status} during startup")
        end
      end
    end
  rescue PidFile::AlreadyRunning
    EM.stop if EM.reactor_running?
    raise
  rescue StandardError => e
    terminate("failed startup", e)
  end
  true
end
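When #run logs the options at startup, it masks any option whose key contains "pass" and inspects enumerable values. A minimal sketch of that formatting rule, with loggable_option as a hypothetical helper name:

```ruby
# Hypothetical helper mirroring the expression used when #run logs each option.
def loggable_option(key, value)
  # Any key containing "pass" (for example :pass) is masked
  return "****" if key.to_s =~ /pass/
  # Collections are shown via inspect, scalars as-is
  value.respond_to?(:each) ? value.inspect : value
end

loggable_option(:pass, "testing")
loggable_option(:actors, ["files", "jobs"])
loggable_option(:vhost, "/right_net")
```

Because the match is a substring match, any other key that happens to contain "pass" would be masked as well.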
#self_href ⇒ String, NilClass
Resource href associated with this agent, if any
# File 'lib/right_agent/agent.rb', line 265
def self_href
  @client.self_href if @client && @mode == :http
end
#stats(options = {}) ⇒ Object
Retrieve statistics about agent operation
Parameters
- options(Hash)
-
Request options:
- :reset(Boolean)
-
Whether to reset the statistics after getting the current ones
Return
- result(OperationResult)
-
Always returns success
# File 'lib/right_agent/agent.rb', line 475
def stats(options = {})
  now = Time.now
  reset = options[:reset]
  stats = {
    "name" => @agent_name,
    "identity" => @identity,
    "hostname" => Socket.gethostname,
    "memory" => Platform.process.resident_set_size,
    "version" => AgentConfig.protocol_version,
    "agent stats" => agent_stats(reset),
    "receive stats" => dispatcher_stats(reset),
    "send stats" => @sender.stats(reset),
    "last reset time" => @last_stat_reset_time.to_i,
    "stat time" => now.to_i,
    "service uptime" => @history.analyze_service,
    "machine uptime" => Platform.shell.uptime
  }
  stats["revision"] = @revision if @revision
  if @mode == :http
    stats.merge!(@client.stats(reset))
  else
    stats["brokers"] = @client.stats(reset)
  end
  result = OperationResult.success(stats)
  @last_stat_reset_time = now if reset
  result
end
#status(&callback) ⇒ Object
Record a callback to be notified of agent status changes. Multiple callbacks are supported.
Block
Optional block activated when there is a status change, with parameters:
type (Symbol):: Type of client reporting status change: :auth, :api, :router, :broker
state (Symbol):: State of client
Return
- (Hash)
-
Status of various clients
# File 'lib/right_agent/agent.rb', line 279
def status(&callback)
  @status_callbacks << callback if callback
  @status
end
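The callback registration pattern in #status can be tried in isolation with a small stand-in class. StatusBoard and update_status are hypothetical: the source above shows only how callbacks are recorded, not how the agent invokes them, so the notification step here is an assumption based on the documented block parameters (type, state).

```ruby
# Hypothetical stand-in for the status-related part of the agent.
class StatusBoard
  def initialize
    @status = {}
    @status_callbacks = []
  end

  # Same shape as Agent#status: record the block if given, return current status
  def status(&callback)
    @status_callbacks << callback if callback
    @status
  end

  # Assumed notifier: update the status and call every recorded callback
  def update_status(type, state)
    @status[type] = state
    @status_callbacks.each { |callback| callback.call(type, state) }
    true
  end
end

board = StatusBoard.new
board.status { |type, state| puts "#{type} is now #{state}" }
board.update_status(:router, :connected)
```

Calling status without a block simply returns the current status hash, which makes the same method usable for both registration and polling.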
#terminate(reason = nil, exception = nil) ⇒ Object
Gracefully terminate execution by allowing unfinished tasks to complete. Terminates immediately if called a second time. Reports the reason for termination if it is abnormal.
Parameters
- reason(String)
-
Reason for abnormal termination, if any
- exception(Exception|String)
-
Exception or other parenthetical error information, if any
Return
- true
-
Always return true
# File 'lib/right_agent/agent.rb', line 434
def terminate(reason = nil, exception = nil)
  begin
    @history.update("stop") if @history
    ErrorTracker.log(self, "[stop] Terminating because #{reason}", exception) if reason
    if exception.is_a?(Exception)
      h = @history.analyze_service
      if h[:last_crashed]
        delay = [(Time.now.to_i - h[:last_crash_time]) * 2, MAX_ABNORMAL_TERMINATE_DELAY].min
        Log.info("[stop] Delaying termination for #{RightSupport::Stats.elapsed(delay)} to slow crash cycling")
        sleep(delay)
      end
    end
    if @terminating || @client.nil?
      @terminating = true
      @termination_timer.cancel if @termination_timer
      @termination_timer = nil
      Log.info("[stop] Terminating immediately")
      @terminate_callback.call
      @history.update("graceful exit") if @history && @client.nil?
    else
      @terminating = true
      @check_status_timer.cancel if @check_status_timer
      @check_status_timer = nil
      Log.info("[stop] Agent #{@identity} terminating")
      stop_gracefully(@options[:grace_timeout])
    end
  rescue StandardError => e
    ErrorTracker.log(self, "Failed to terminate gracefully", e)
    begin @terminate_callback.call; rescue Exception; end
  end
  true
end
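The crash-cycling delay computed in #terminate is twice the number of seconds since the last crash, capped at MAX_ABNORMAL_TERMINATE_DELAY. A minimal sketch of just that arithmetic, with abnormal_terminate_delay as a hypothetical helper and times given as integer seconds:

```ruby
MAX_ABNORMAL_TERMINATE_DELAY = 60 * 60 # same value as the constant above

# Hypothetical helper isolating the delay arithmetic from #terminate.
def abnormal_terminate_delay(now, last_crash_time)
  [(now - last_crash_time) * 2, MAX_ABNORMAL_TERMINATE_DELAY].min
end

short = abnormal_terminate_delay(1_000, 990)  # crashed 10 seconds ago
capped = abnormal_terminate_delay(10_000, 0)  # crashed long ago
```

With these inputs short is 20 seconds and capped is limited to one hour. In the method itself this delay applies only when an Exception is passed and the service history reports that the previous run crashed.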
#update_configuration(opts) ⇒ Object
Update the agent’s persisted configuration. Note that @options is frozen and therefore not updated.
Parameters
- opts(Hash)
-
Options being updated
Return
- (Boolean)
-
true if successful, otherwise false
# File 'lib/right_agent/agent.rb', line 410
def update_configuration(opts)
  if (cfg = AgentConfig.load_cfg(@agent_name))
    opts.each { |k, v| cfg[k] = v }
    AgentConfig.store_cfg(@agent_name, cfg)
    true
  else
    ErrorTracker.log(self, "Could not access configuration file #{AgentConfig.cfg_file(@agent_name).inspect} for update")
    false
  end
rescue StandardError => e
  ErrorTracker.log(self, "Failed updating configuration file #{AgentConfig.cfg_file(@agent_name).inspect}", e)
  false
end
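The load, merge, store pattern used by #update_configuration can be sketched with plain YAML files. This is an illustration under assumptions, not the AgentConfig implementation: update_yaml_config is a hypothetical helper, the file path is supplied by the caller, and keys are stored as strings for simplicity.

```ruby
require "yaml"
require "tmpdir"

# Hypothetical helper: merge opts into an existing YAML file.
# Returns true on success and false if the file is missing or any step fails.
def update_yaml_config(path, opts)
  return false unless File.exist?(path)
  cfg = YAML.load_file(path) || {}
  opts.each { |k, v| cfg[k.to_s] = v }
  File.write(path, YAML.dump(cfg))
  true
rescue StandardError
  false
end

Dir.mktmpdir do |dir|
  path = File.join(dir, "config.yml")
  File.write(path, "host: broker1\n")
  update_yaml_config(path, :port => 5673)
  puts File.read(path)
end
```

As with the method above, the boolean return value lets callers such as #connect log a warning without treating a failed config update as a failed connection.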