Class: Brown::Agent

Inherits:
Object
  • Object
show all
Extended by:
TestHelpers
Defined in:
lib/brown/agent.rb,
lib/brown/test.rb

Overview

A Brown Agent. The whole reason we're here.

An agent is the fundamental unit of work in the Brown universe. Pretty much everything you do to an agent is done on its class; the individual instances of the agent are run by the agent, internally, when reacting to stimuli.

Defined Under Namespace

Classes: AMQPMessage, AMQPMessageMock, AMQPPublisher, Memo, Stimulus

Class Method Summary collapse

Instance Method Summary collapse

Methods included from TestHelpers

amqp_listener?, amqp_listener_with_test, amqp_publisher?, amqp_publisher_with_test, amqp_receive, included, memo?, memo_with_test, reset_memos, stimulus?, timer?, trigger

Class Method Details

.amqp_listener(exchange_name = "", queue_name: nil, amqp_url: "amqp://localhost", concurrency: 1, &blk) {|message| ... } ⇒ Object

Listen for messages from an AMQP broker.

We setup a queue, bound to the exchange specified by the exchange_name argument, and then proceed to hoover up all the messages we can.

The name of the queue that is created by default is derived from the agent class name and the exchange name being bound to. This allows for multiple instances of the same agent, running in separate processes or machines, to share the same queue of messages to process, for throughput or redundancy reasons.

Parameters:

  • exchange_name (#to_s, Array<#to_s>) (defaults to: "")

    the name of the exchange to bind to. You can also specify an array of exchange names, to have all of them put their messages into the one queue. This can be dangerous, because you need to make sure that your message handler can process the different types of messages that might be sent to the different exchangs.

  • queue_name (#to_s) (defaults to: nil)

    the name of the queue to create, if you don't want to use the class-derived default for some reason.

  • amqp_url (#to_s) (defaults to: "amqp://localhost")

    the URL of the AMQP broker to connect to.

  • concurrency (Integer) (defaults to: 1)

    how many messages to process in parallel. The default, 1, means that a message will need to be acknowledged (by calling message.ack) in your worker blk before the broker will consider sending another.

    If your agent is capable of processing more than one message in parallel (because the agent spends a lot of its time waiting for databases or HTTP requests, for example, or perhaps you're running your agents in a Ruby VM which has no GIL) you should increase this value to improve performance. Alternately, if you want/need to batch processing (say, you insert 100 records into a database in a single query) you'll need to increase this to get multiple records at once.

    Setting this to 0 is only for the adventurous. It tells the broker to send your agent messages as fast as it can. You still need to acknowledge the messages as you finish processing them (otherwise the broker will not consider them "delivered") but you will always be sent more messages if there are more to send, even if you never acknowledge any of them. This can get you into an awful lot of trouble if you're not careful, so don't do it just because you can.

  • blk (Proc)

    is called every time a message is received from the queue, and an instance of AMQPMessage will be passed as the sole argument.

Yield Parameters:



239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
# File 'lib/brown/agent.rb', line 239

def amqp_listener(exchange_name = "",
                  queue_name:  nil,
                  amqp_url:    "amqp://localhost",
                  concurrency: 1,
                  &blk
                 )
	listener_uuid = SecureRandom.uuid
	worker_method = "amqp_listener_worker_#{listener_uuid}".to_sym
	queue_memo    = "amqp_listener_queue_#{listener_uuid}".to_sym

	exchange_list = Array === exchange_name ?
	                  exchange_name :
	                  [exchange_name]

	munged_exchange_list = exchange_list.map { |n| n.to_s == "" ? "" : "-#{n.to_s}" }.join
	queue_name ||= self.name.to_s + munged_exchange_list

	memo(queue_memo) do
		begin
			amqp = Bunny.new(amqp_url, logger: logger)
			amqp.start
		rescue Bunny::TCPConnectionFailed
			logger.error { "Failed to connect to #{amqp_url}" }
			sleep 5
			retry
		rescue Bunny::PossibleAuthenticationFailureError
			logger.error { "Authentication failure for #{amqp_url}" }
			sleep 5
			retry
		rescue StandardError => ex
			logger.error { "Unknown error while trying to connect to #{amqp_url}: #{ex.message} (#{ex.class})" }
			sleep 5
			retry
		end

		bind_queue(
		  amqp_session:  amqp,
		  queue_name:    queue_name,
		  exchange_list: exchange_list,
		  concurrency:   concurrency
		)
	end

	define_method(worker_method, &blk)

	stimulate(worker_method) do |worker|
		__send__(queue_memo) do |queue|
			queue.subscribe(manual_ack: true, block: true) do |di, prop, payload|
				worker.call Brown::Agent::AMQPMessage.new(di, prop, payload)
			end
		end
	end
end

.amqp_publisher(name, publisher_opts = {}) ⇒ Object

Declare an AMQP publisher, and create an AMQP exchange to publish to.

On the assumption that you already know how exchanges work, I'll just dive right in.

This method creates an accessor method on your agent named after the symbol you pass in as name, which returns an instance of Brown::Agent::AMQPPublisher. This object, in turn, defines an AMQP exchange when it is created, and has a publish method on it (see Brown::Agent::AMQPPublisher#publish) which sends arbitrary messages to the exchange.

This method is a thin shim around Brown::Agent::AMQPPublisher#initialize; you should read that method's documentation for details of what constitutes valid publisher_opts, and also what exceptions can be raised.

Parameters:

  • name (Symbol)

    the name of the accessor method to call when you want to reference this publisher in your agent code.

  • publisher_opts (Hash) (defaults to: {})

    options which are passed to Brown::Agent::AMQPPublisher#initialize.

See Also:



179
180
181
182
183
# File 'lib/brown/agent.rb', line 179

def amqp_publisher(name, publisher_opts = {})
	opts = { :exchange_name => name }.merge(publisher_opts)

	safe_memo(name) { Brown::Agent::AMQPPublisher.new(opts) }
end

.every(n) { ... } ⇒ Object

Execute a block of code periodically.

This pretty much does what it says on the tin. Every n seconds (where n can be a float) the given block of code is executed.

Don't expect too much precision in the interval; we just sleep between triggers, so there might be a bit of an extra delay between invocations.

Parameters:

  • n (Numeric)

    The amount of time which should elapse between invocations of the block.

Yields:

  • every n seconds.



145
146
147
148
149
150
# File 'lib/brown/agent.rb', line 145

def every(n, &blk)
	method_name = ("every_#{n}__" + SecureRandom.uuid).to_sym
	define_method(method_name, &blk)

	stimulate(method_name) { |worker| sleep n; worker.call }
end

.less_log_detailObject



380
381
382
# File 'lib/brown/agent.rb', line 380

def less_log_detail
	logger.level += 1
end

.logger(l = nil) ⇒ Logger

Get or set the logger for this agent.

Parameters:

  • l (Logger) (defaults to: nil)

Returns:

  • (Logger)


372
373
374
# File 'lib/brown/agent.rb', line 372

def logger(l=nil)
	(@logger = (l || @logger)) || (self == Brown::Agent ? Logger.new($stderr) : Brown::Agent.logger)
end

.logger=(l) ⇒ Object

Set the logger that this agent will use to report problems.

Parameters:

  • l (Logger)


362
363
364
# File 'lib/brown/agent.rb', line 362

def logger=(l)
	@logger = l
end

.memo(name, safe = false, &generator) ⇒ Object

Define a "memo" for this agent.

A "memo" is an object which is common across all instances of a particular agent, and which is (usually) local to that agent. The intended purpose is for anything that is needed for processing stimuli, but which you don't want to recreate for every stimuli. Examples of this sort of thing include connection pools (database, HTTP connections, etc), config files, and caches. Basically, anything that has a non-trivial setup time, or which you want to share across all stimuli processing, should go in a memo.

Because we do everything in threads, and because dealing with locking by hand is a nightmare, access to memos is, by default, protected by a mutex. This means that any time you want to do something with a memo, you call its name and pass a block to do whatever you want to do with the value, like this:

config { |cfg| puts "foo is #{cfg[:foo]}" }

Now, you can, if you want, "leak" this object out of the mutex, with various sorts of assignment. DO NOT SUCCUMB TO THIS TEMPTATION. If you do this, you will risk all sorts of concurrency bugs, where two threads try to read and/or manipulate the object at the same time and all hell breaks loose.

If, and only if, you are 100% confident that the object you want to work with is, in fact, entirely thread-safe (the documentation should mention this), then you can mark a memo object as "safe", either by passing true to memo, or using the handy-dandy safe_memo method. In this case, you can just reference the memo name wherever you like:

safe_memo :db do
  Sequel.connect("postgres:///app_database")
end

#...
db[:foo].where { :baz > 42 }

Note that there is intentionally no way to reassign a memo object. This doesn't mean that memo objects are "read-only", however. The state of the object can be mutated by calling any method on the object that modifies it. If you want more read-only(ish) memos, you probably want to call #freeze on your object when you create it (although all the usual caveats about #freeze still apply).

Parameters:

  • name (Symbol)

    the name of the memo, and hence the name of the method that should be called to retrieve the memo's value.

  • safe (Boolean) (defaults to: false)

    whether or not the object will be "safe" for concurrent access by multiple threads. Do not enable this unless you are completely sure.

Returns:

  • void

See Also:



107
108
109
110
111
112
113
114
115
116
117
118
119
# File 'lib/brown/agent.rb', line 107

def memo(name, safe=false, &generator)
	name = name.to_sym
	@memos ||= {}
	@memos[name] = Brown::Agent::Memo.new(generator, safe)

	define_method(name) do |test=nil, &blk|
		self.class.__send__(name, test, &blk)
	end

	self.singleton_class.__send__(:define_method, name) do |test=nil, &blk|
		memos[name].value(test, &blk)
	end
end

.more_log_detailObject



376
377
378
# File 'lib/brown/agent.rb', line 376

def more_log_detail
	logger.level -= 1
end

.runObject

Start the agent running.

This fires off the stimuli listeners and then waits. If you want to do anything else while this runs, you'll want to fire this in a separate thread.



299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
# File 'lib/brown/agent.rb', line 299

def run
	begin
		# Some memos (AMQPPublisher being the first) work best when
		# initialized when the agent starts up.  At some point in the
		# future, we might implement selective initialisation, but for
		# now we'll take the brutally-simple approach of getting
		# everything.
		(@memos || {}).keys.each { |k| send(k, &->(_){}) }

		@thread_group ||= ThreadGroup.new
		@runner_thread = Thread.current

		(stimuli || {}).each do |s|
			@thread_group.add(
				Thread.new(s) do |s|
					begin
						s.run
					rescue Brown::StopSignal
						# OK then
					end
				end
			)
		end

		@thread_group.list.each do |th|
			begin
				th.join
			rescue Brown::StopSignal
				# OK then
			end
		end
	rescue Exception => ex
		logger.error { "Agent #{self} caught unhandled exception: #{ex.message} (#{ex.class})" }
		logger.info  { ex.backtrace.map { |l| "    #{l}" }.join("\n") }
		stop
	end
end

.safe_memo(name, &generator) ⇒ Object

A variant of memo which is intended for objects which are inherently thread-safe within themselves.

See Also:



126
127
128
# File 'lib/brown/agent.rb', line 126

def safe_memo(name, &generator)
	memo(name, true, &generator)
end

.stimulate(method_name) {|worker| ... } ⇒ Object

Define a generic stimulus for this agent.

This is a fairly low-level method, designed to provide a means for defining stimuli for which there isn't a higher-level, more-specific stimulus definition approach.

When the agent is started (see run), the block you provide will be executed in a dedicated thread. Every time the block finishes, it will be run again. Your block should do whatever it needs to do to detect when a stimuli is available (preferably by blocking somehow, rather than polling, because polling sucks). When your code detects that a stimulus has been received, it should run worker.call, passing in any arguments that are required to process the stimulus. That will then create a new instance of the agent class, and call the specified method_name on that instance, passing in the arguments that were passed to worker.call.

Parameters:

  • method_name (Symbol)

    the name of the method to call when the stimulus is triggered.

Yield Parameters:

  • worker (Proc)

    call this when you want a stimulus processed, passing in anything that the stimulus processing method (as specified by method_name) needs to do its job.

See Also:



39
40
41
42
43
44
45
46
47
48
# File 'lib/brown/agent.rb', line 39

def stimulate(method_name, &blk)
	@stimuli ||= []

	@stimuli << Brown::Agent::Stimulus.new(
	              method_name:  method_name,
	              stimuli_proc: blk,
	              agent_class:  self,
	              logger:       logger
	            )
end

.stopObject

Stop the agent running.

This can either be called in a separate thread, or a signal handler, like so:

Signal.trap("INT") { SomeAgent.stop }

agent.run


346
347
348
349
350
351
352
353
354
355
356
# File 'lib/brown/agent.rb', line 346

def stop
	(@thread_group.list rescue []).each do |th|
		th.raise Brown::FinishSignal.new("agent finish")
	end

	(@thread_group.list rescue []).each do |th|
		th.join
	end

	@runner_thread.raise(Brown::StopSignal.new("Clean stop"))
end

Instance Method Details

#loggerLogger

The logger for this agent.

Returns:

  • (Logger)


427
428
429
# File 'lib/brown/agent.rb', line 427

def logger
	self.class.logger
end