Class: Brown::Agent
- Inherits: Object (hierarchy: Object → Brown::Agent)
- Extended by: TestHelpers
- Defined in: lib/brown/agent.rb, lib/brown/test.rb
Overview
A Brown Agent. The whole reason we're here.
An agent is the fundamental unit of work in the Brown universe. Pretty much everything you do to an agent is done on its class; the individual instances of the agent are run by the agent, internally, when reacting to stimuli.
Defined Under Namespace
Classes: AMQPMessage, AMQPMessageMock, AMQPPublisher, Memo, Stimulus
Class Method Summary
- .amqp_listener(exchange_name = "", queue_name: nil, amqp_url: "amqp://localhost", concurrency: 1, &blk) {|message| ... } ⇒ Object
  Listen for messages from an AMQP broker.
- .amqp_publisher(name, publisher_opts = {}) ⇒ Object
  Declare an AMQP publisher, and create an AMQP exchange to publish to.
- .every(n) { ... } ⇒ Object
  Execute a block of code periodically.
- .less_log_detail ⇒ Object
- .logger(l = nil) ⇒ Logger
  Get or set the logger for this agent.
- .logger=(l) ⇒ Object
  Set the logger that this agent will use to report problems.
- .memo(name, safe = false, &generator) ⇒ Object
  Define a "memo" for this agent.
- .more_log_detail ⇒ Object
- .run ⇒ Object
  Start the agent running.
- .safe_memo(name, &generator) ⇒ Object
  A variant of Agent.memo which is intended for objects which are inherently thread-safe within themselves.
- .stimulate(method_name) {|worker| ... } ⇒ Object
  Define a generic stimulus for this agent.
- .stop ⇒ Object
  Stop the agent running.
Instance Method Summary
- #logger ⇒ Logger
  The logger for this agent.
Methods included from TestHelpers
amqp_listener?, amqp_listener_with_test, amqp_publisher?, amqp_publisher_with_test, amqp_receive, included, memo?, memo_with_test, reset_memos, stimulus?, timer?, trigger
Class Method Details
.amqp_listener(exchange_name = "", queue_name: nil, amqp_url: "amqp://localhost", concurrency: 1, &blk) {|message| ... } ⇒ Object
Listen for messages from an AMQP broker.
We set up a queue, bound to the exchange specified by the exchange_name argument, and then proceed to hoover up all the messages we can.
The name of the queue that is created by default is derived from the agent class name and the exchange name being bound to. This allows for multiple instances of the same agent, running in separate processes or machines, to share the same queue of messages to process, for throughput or redundancy reasons.
# File 'lib/brown/agent.rb', line 239
def amqp_listener(exchange_name = "",
                  queue_name: nil,
                  amqp_url: "amqp://localhost",
                  concurrency: 1,
                  &blk
                 )
  listener_uuid = SecureRandom.uuid
  worker_method = "amqp_listener_worker_#{listener_uuid}".to_sym
  queue_memo    = "amqp_listener_queue_#{listener_uuid}".to_sym

  exchange_list = Array === exchange_name ? exchange_name : [exchange_name]

  munged_exchange_list = exchange_list.map { |n| n.to_s == "" ? "" : "-#{n.to_s}" }.join
  queue_name ||= self.name.to_s + munged_exchange_list

  memo(queue_memo) do
    begin
      amqp = Bunny.new(amqp_url, logger: logger)
      amqp.start
    rescue Bunny::TCPConnectionFailed
      logger.error { "Failed to connect to #{amqp_url}" }
      sleep 5
      retry
    rescue Bunny::PossibleAuthenticationFailureError
      logger.error { "Authentication failure for #{amqp_url}" }
      sleep 5
      retry
    rescue StandardError => ex
      logger.error { "Unknown error while trying to connect to #{amqp_url}: #{ex.message} (#{ex.class})" }
      sleep 5
      retry
    end

    bind_queue(
      amqp_session: amqp,
      queue_name: queue_name,
      exchange_list: exchange_list,
      concurrency: concurrency
    )
  end

  define_method(worker_method, &blk)

  stimulate(worker_method) do |worker|
    __send__(queue_memo) do |queue|
      queue.subscribe(manual_ack: true, block: true) do |di, prop, payload|
        worker.call Brown::Agent::AMQPMessage.new(di, prop, payload)
      end
    end
  end
end
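The default queue name is easiest to see in isolation. The following is a minimal sketch that lifts only the name derivation out of the source above; `default_queue_name` and the `OrderAgent` name are hypothetical, with `agent_name` standing in for `self.name` on the agent class.

```ruby
# Hypothetical helper: reproduces only the queue-name derivation from
# amqp_listener. `agent_name` stands in for `self.name` on the agent class.
def default_queue_name(agent_name, exchange_name = "")
  # Either a single exchange name or a list of them is accepted.
  exchange_list = exchange_name.is_a?(Array) ? exchange_name : [exchange_name]

  # The default (empty-named) exchange adds nothing to the queue name;
  # every other exchange is appended with a leading dash.
  suffix = exchange_list.map { |n| n.to_s == "" ? "" : "-#{n}" }.join

  agent_name.to_s + suffix
end

puts default_queue_name("OrderAgent")                       # OrderAgent
puts default_queue_name("OrderAgent", "orders")             # OrderAgent-orders
puts default_queue_name("OrderAgent", [:orders, :refunds])  # OrderAgent-orders-refunds
```

Because the name depends only on the agent class and the exchanges, two processes running the same agent end up on the same queue unless one of them passes an explicit queue_name.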
.amqp_publisher(name, publisher_opts = {}) ⇒ Object
Declare an AMQP publisher, and create an AMQP exchange to publish to.
On the assumption that you already know how exchanges work, I'll just dive right in.
This method creates an accessor method on your agent named after the symbol you pass in as name, which returns an instance of Brown::Agent::AMQPPublisher. This object, in turn, defines an AMQP exchange when it is created, and has a publish method on it (see Brown::Agent::AMQPPublisher#publish) which sends arbitrary messages to the exchange.
This method is a thin shim around Brown::Agent::AMQPPublisher#initialize; you should read that method's documentation for details of what constitutes valid publisher_opts, and also what exceptions can be raised.
# File 'lib/brown/agent.rb', line 179
def amqp_publisher(name, publisher_opts = {})
  opts = { :exchange_name => name }.merge(publisher_opts)

  safe_memo(name) { Brown::Agent::AMQPPublisher.new(opts) }
end
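One consequence of the merge in the source above is that the exchange name defaults to the accessor name, but anything in publisher_opts wins. A small sketch of just that merge follows; `publisher_options` is a hypothetical helper, and the `:exchange_type` key is used purely for illustration (check Brown::Agent::AMQPPublisher#initialize for the options that are actually valid).

```ruby
# Hypothetical helper: reproduces only the option merge from amqp_publisher.
def publisher_options(name, publisher_opts = {})
  # The accessor name is the default exchange name; explicit options override it.
  { :exchange_name => name }.merge(publisher_opts)
end

# With no options, the exchange is named after the accessor.
p publisher_options(:events)

# An explicit :exchange_name replaces the default; other keys pass through.
# (:exchange_type is an illustrative key, not a confirmed option.)
p publisher_options(:events, :exchange_name => "app.events", :exchange_type => :topic)
```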
.every(n) { ... } ⇒ Object
Execute a block of code periodically.
This pretty much does what it says on the tin. Every n seconds (where n can be a float) the given block of code is executed.
Don't expect too much precision in the interval; we just sleep between triggers, so there might be a bit of an extra delay between invocations.
# File 'lib/brown/agent.rb', line 145
def every(n, &blk)
  method_name = ("every_#{n}__" + SecureRandom.uuid).to_sym
  define_method(method_name, &blk)

  stimulate(method_name) { |worker| sleep n; worker.call }
end
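To make the timing caveat concrete, here is a rough stand-in for the sleep-then-call loop, using only the standard library. `run_periodically` and its arguments are hypothetical names for this sketch; it is not how Brown schedules work internally beyond the `sleep n; worker.call` shape visible in the source above.

```ruby
# Hypothetical stand-in for the loop behind `every`: sleep first, then run
# the block, and repeat. Returns the total wall-clock time taken.
def run_periodically(interval, iterations)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  iterations.times do
    sleep interval   # the first run also waits a full interval
    yield
  end
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

calls = 0
elapsed = run_periodically(0.05, 3) { calls += 1 }

# The nominal time is 3 x 0.05s; the measured time is that plus overhead.
puts format("%d runs in %.3fs (nominal 0.150s)", calls, elapsed)
```

Since each cycle starts a fresh sleep rather than aiming at a fixed wall-clock schedule, any overhead accumulates instead of being corrected on the next cycle.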
.less_log_detail ⇒ Object
# File 'lib/brown/agent.rb', line 380
def less_log_detail
  logger.level += 1
end
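Neither less_log_detail nor more_log_detail carries a description, but their source just moves the logger's level up or down by one. Ruby's Logger numbers its severities from DEBUG (0) up to UNKNOWN (5), so a higher level means fewer messages. A quick standard-library illustration of that arithmetic, independent of Brown:

```ruby
require "logger"

logger = Logger.new($stderr)
logger.level = Logger::INFO          # severity 1

# What less_log_detail does: raise the threshold by one step.
logger.level += 1
puts logger.level == Logger::WARN    # true

# What more_log_detail does, applied twice: lower the threshold again.
logger.level -= 1
logger.level -= 1
puts logger.level == Logger::DEBUG   # true
```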
.logger(l = nil) ⇒ Logger
Get or set the logger for this agent.
# File 'lib/brown/agent.rb', line 372
def logger(l=nil)
  (@logger = (l || @logger)) || (self == Brown::Agent ? Logger.new($stderr) : Brown::Agent.logger)
end
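The one-liner above packs in a fallback: an agent class with no logger of its own uses whatever the base class has. The sketch below copies that expression onto a hypothetical stand-in class (`SketchAgent`, with a `ReportAgent` subclass) so the behaviour can be tried without Brown installed.

```ruby
require "logger"

# Hypothetical stand-in for Brown::Agent, keeping only the class-level
# logger logic from the source above.
class SketchAgent
  def self.logger(l = nil)
    (@logger = (l || @logger)) ||
      (self == SketchAgent ? Logger.new($stderr) : SketchAgent.logger)
  end
end

class ReportAgent < SketchAgent; end

# Give the base class a logger; subclasses without their own inherit it.
SketchAgent.logger(Logger.new($stderr))
puts ReportAgent.logger.equal?(SketchAgent.logger)   # true

# Once a subclass sets its own logger, it stops falling back.
ReportAgent.logger(Logger.new($stdout))
puts ReportAgent.logger.equal?(SketchAgent.logger)   # false
```

The @logger here is a class-level instance variable, so each agent class keeps its own setting rather than sharing one across the hierarchy.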
.logger=(l) ⇒ Object
Set the logger that this agent will use to report problems.
# File 'lib/brown/agent.rb', line 362
def logger=(l)
  @logger = l
end
.memo(name, safe = false, &generator) ⇒ Object
Define a "memo" for this agent.
A "memo" is an object which is common across all instances of a particular agent, and which is (usually) local to that agent. The intended purpose is for anything that is needed for processing stimuli, but which you don't want to recreate for every stimulus. Examples of this sort of thing include connection pools (database, HTTP connections, etc), config files, and caches. Basically, anything that has a non-trivial setup time, or which you want to share across all stimuli processing, should go in a memo.
Because we do everything in threads, and because dealing with locking by hand is a nightmare, access to memos is, by default, protected by a mutex. This means that any time you want to do something with a memo, you call its name and pass a block to do whatever you want to do with the value, like this:
config { |cfg| puts "foo is #{cfg[:foo]}" }
Now, you can, if you want, "leak" this object out of the mutex, with various sorts of assignment. DO NOT SUCCUMB TO THIS TEMPTATION. If you do this, you will risk all sorts of concurrency bugs, where two threads try to read and/or manipulate the object at the same time and all hell breaks loose.
If, and only if, you are 100% confident that the object you want to work with is, in fact, entirely thread-safe (the documentation should mention this), then you can mark a memo object as "safe", either by passing true to memo, or using the handy-dandy safe_memo method. In this case, you can just reference the memo name wherever you like:
safe_memo :db do
  Sequel.connect("postgres:///app_database")
end
#...
db[:foo].where { baz > 42 }
Note that there is intentionally no way to reassign a memo object. This doesn't mean that memo objects are "read-only", however. The state of the object can be mutated by calling any method on the object that modifies it. If you want more read-only(ish) memos, you probably want to call #freeze on your object when you create it (although all the usual caveats about #freeze still apply).
# File 'lib/brown/agent.rb', line 107
def memo(name, safe=false, &generator)
  name = name.to_sym
  @memos ||= {}
  @memos[name] = Brown::Agent::Memo.new(generator, safe)

  define_method(name) do |test=nil, &blk|
    self.class.__send__(name, test, &blk)
  end

  self.singleton_class.__send__(:define_method, name) do |test=nil, &blk|
    memos[name].value(test, &blk)
  end
end
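The mutex-versus-safe distinction can be sketched without Brown at all. `SketchMemo` below is a hypothetical stand-in, not Brown::Agent::Memo itself: it assumes the value is built once on first use, yields it inside a mutex by default, and hands it back directly only when the memo was declared safe.

```ruby
# Hypothetical stand-in for a memo. Assumptions: the generator runs once,
# lazily; unsafe memos are only reachable through a block under a mutex.
class SketchMemo
  def initialize(generator, safe = false)
    @generator = generator
    @safe      = safe
    @mutex     = Mutex.new
    @built     = false
  end

  def value
    # Build the value exactly once, even if several threads race to be first.
    @mutex.synchronize do
      unless @built
        @value = @generator.call
        @built = true
      end
    end

    if @safe
      # Safe memos may be used directly, with or without a block.
      block_given? ? yield(@value) : @value
    else
      raise ArgumentError, "unsafe memos need a block" unless block_given?
      @mutex.synchronize { yield @value }
    end
  end
end

counter = SketchMemo.new(-> { { hits: 0 } })

# Eight threads bump the same counter; the mutex keeps every update intact.
threads = 8.times.map do
  Thread.new { 500.times { counter.value { |c| c[:hits] += 1 } } }
end
threads.each(&:join)

total = counter.value { |c| c[:hits] }
puts total   # 4000
```

Assigning the yielded object to a variable outside the block is exactly the "leak" warned about above: nothing stops you, but the mutex no longer covers what you do with it.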
.more_log_detail ⇒ Object
# File 'lib/brown/agent.rb', line 376
def more_log_detail
  logger.level -= 1
end
.run ⇒ Object
Start the agent running.
This fires off the stimuli listeners and then waits. If you want to do anything else while this runs, you'll want to fire this in a separate thread.
# File 'lib/brown/agent.rb', line 299
def run
  begin
    # Some memos (AMQPPublisher being the first) work best when
    # initialized when the agent starts up.  At some point in the
    # future, we might implement selective initialisation, but for
    # now we'll take the brutally-simple approach of getting
    # everything.
    (@memos || {}).keys.each { |k| send(k, &->(_){}) }

    @thread_group ||= ThreadGroup.new
    @runner_thread = Thread.current

    (stimuli || {}).each do |s|
      @thread_group.add(
        Thread.new(s) do |s|
          begin
            s.run
          rescue Brown::StopSignal
            # OK then
          end
        end
      )
    end

    @thread_group.list.each do |th|
      begin
        th.join
      rescue Brown::StopSignal
        # OK then
      end
    end
  rescue Exception => ex
    logger.error { "Agent #{self} caught unhandled exception: #{ex.message} (#{ex.class})" }
    logger.info  { ex.backtrace.map { |l| "  #{l}" }.join("\n") }
    stop
  end
end
.safe_memo(name, &generator) ⇒ Object
A variant of memo which is intended for objects which are inherently thread-safe within themselves.
# File 'lib/brown/agent.rb', line 126
def safe_memo(name, &generator)
  memo(name, true, &generator)
end
.stimulate(method_name) {|worker| ... } ⇒ Object
Define a generic stimulus for this agent.
This is a fairly low-level method, designed to provide a means for defining stimuli for which there isn't a higher-level, more-specific stimulus definition approach.
When the agent is started (see run), the block you provide will be executed in a dedicated thread. Every time the block finishes, it will be run again. Your block should do whatever it needs to do to detect when a stimulus is available (preferably by blocking somehow, rather than polling, because polling sucks). When your code detects that a stimulus has been received, it should run worker.call, passing in any arguments that are required to process the stimulus. That will then create a new instance of the agent class, and call the specified method_name on that instance, passing in the arguments that were passed to worker.call.
# File 'lib/brown/agent.rb', line 39
def stimulate(method_name, &blk)
  @stimuli ||= []

  @stimuli << Brown::Agent::Stimulus.new(
                method_name:  method_name,
                stimuli_proc: blk,
                agent_class:  self,
                logger:       logger
              )
end
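The moving parts described above (a block that is re-run forever, a blocking wait, and a worker that dispatches to a fresh instance) can be imitated with the standard library. Everything in this sketch is a hypothetical stand-in: `EchoAgent`, `inbox`, and the lambdas are illustration only, and the worker here runs inline, which may not match what Brown::Agent::Stimulus does.

```ruby
# Hypothetical stand-in agent: each stimulus is handled by a new instance.
class EchoAgent
  def self.processed
    @processed ||= Queue.new
  end

  def handle(item)
    # Record the item together with the instance that handled it.
    self.class.processed << [item, self]
  end
end

inbox = Queue.new

# The "stimulus block": blocks until something arrives (no polling), then
# passes it to the worker.
stimulus = lambda do |worker|
  item = inbox.pop
  worker.call(item)
end

# The "worker": create a new agent instance and call the named method on it
# with whatever arguments the stimulus block supplied.
worker = ->(*args) { EchoAgent.new.__send__(:handle, *args) }

# A dedicated thread re-runs the stimulus block every time it finishes.
listener = Thread.new { loop { stimulus.call(worker) } }

%w[a b c].each { |i| inbox << i }
results = 3.times.map { EchoAgent.processed.pop }

listener.kill
listener.join

puts results.map(&:first).inspect             # ["a", "b", "c"]
puts results.map(&:last).uniq.size            # 3
```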
.stop ⇒ Object
Stop the agent running.
This can either be called in a separate thread, or a signal handler, like so:
Signal.trap("INT") { SomeAgent.stop }
SomeAgent.run
# File 'lib/brown/agent.rb', line 346
def stop
  (@thread_group.list rescue []).each do |th|
    th.raise Brown::FinishSignal.new("agent finish")
  end

  (@thread_group.list rescue []).each do |th|
    th.join
  end

  @runner_thread.raise(Brown::StopSignal.new("Clean stop"))
end
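For a feel of how run and stop interact, the sketch below imitates the pattern with plain threads: a blocking "run" parked in a background thread, and a "stop" that raises an exception into each worker and then waits for it. `SketchStop` and the worker loop are hypothetical stand-ins for Brown's signals and stimuli, not the library's own classes.

```ruby
# Hypothetical stand-in for Brown::FinishSignal / Brown::StopSignal.
class SketchStop < StandardError; end

workers = ThreadGroup.new
3.times do
  workers.add(
    Thread.new do
      begin
        loop { sleep 0.01 }   # pretend to wait for stimuli
      rescue SketchStop
        # asked to finish; let the thread end quietly
      end
    end
  )
end

# "run" blocks until every worker finishes, so it goes in its own thread.
runner = Thread.new { workers.list.each(&:join) }

sleep 0.05                    # the main thread is free to do other things

# "stop": signal each worker, then wait for all of them to wind down.
targets = workers.list
targets.each { |th| th.raise(SketchStop, "finish") }
targets.each(&:join)
runner.join

puts targets.none?(&:alive?)  # true
```

The same shape works from a signal handler, as in the Signal.trap example above, since the stop side only raises into threads and joins them.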
Instance Method Details
#logger ⇒ Logger
The logger for this agent.
# File 'lib/brown/agent.rb', line 427
def logger
  self.class.logger
end