Class: Brown::Sender

Inherits:
Object
  • Object
show all
Includes:
Logger, Util
Defined in:
lib/brown/sender.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#number_of_consumers, #number_of_messages

Methods included from Logger

#backtrace, #log_level, #logger

Methods included from AmqpErrors

#error_lookup, #error_message, #errors

Constructor Details

#initialize(queue_def, opts = {}) ⇒ Sender

Returns a new instance of Sender.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/brown/sender.rb', line 12

# Builds a sender bound to a queue definition and starts opening a channel.
#
# @param queue_def [Brown::QueueDefinition, Object] used as-is when it already
#   is a Brown::QueueDefinition; otherwise handed to
#   Brown::QueueDefinition.new together with +opts+
# @param opts [Hash] only forwarded to Brown::QueueDefinition.new when
#   +queue_def+ has to be wrapped
def initialize(queue_def, opts={})
	@queue_def =
		if queue_def.is_a?(Brown::QueueDefinition)
			queue_def
		else
			Brown::QueueDefinition.new(queue_def, opts)
		end

	@name               = @queue_def.denormalise
	@reply_container    = {}
	@message_count      = 0
	@channel_completion = EM::Completion.new

	# The completion is resolved with the channel once open_channel yields it;
	# the other methods attach their work to @channel_completion.
	open_channel do |opened_channel|
		logger.debug { "Opening a channel for sending" }
		@channel_completion.succeed(opened_channel)
	end
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



10
11
12
# File 'lib/brown/sender.rb', line 10

# Read-only accessor for the sender's name.
#
# @return [Object] the current value of @name (the constructor assigns it
#   from @queue_def.denormalise)
def name
  @name
end

Instance Method Details

#counter ⇒ Object

# Asks the queue for its status and forwards both figures to the block.
# NOTE(review): @queue_completion is not assigned anywhere in the visible
# Sender code - confirm where it is expected to come from.
#
# @yield [num_messages, num_consumers] the two values reported by queue.status
def status(&blk)
	@queue_completion.completion do |queue|
		queue.status do |num_messages, num_consumers|
			# The status request is still issued when no block was given.
			next unless blk

			blk.call(num_messages, num_consumers)
		end
	end
end

# Yields only the first value reported by #status (the message count).
#
# @yield [messages] first value passed along by #status
def message_count(&blk)
	status do |messages|
		next unless blk

		blk.call(messages)
	end
end

# Yields only the second value reported by #status (the consumer count).
#
# @yield [consumers] second value passed along by #status
def consumer_count(&blk)
	status do |_messages, consumers|
		next unless blk

		blk.call(consumers)
	end
end



72
73
74
# File 'lib/brown/sender.rb', line 72

# Reader for the sender's message counter.
#
# @return [Object] the current value of @message_count (set to 0 by the
#   constructor; presumably advanced by increment_counter - confirm)
def counter
	@message_count
end

#delete(&blk) ⇒ Object



42
43
44
45
46
47
48
# File 'lib/brown/sender.rb', line 42

# Deletes the queue and afterwards closes the sender's channel.
#
# @yield forwarded to channel.close as its block
# @return [Object] whatever queue.delete returns
def delete(&blk)
	# Runs from queue.delete's block, so the channel is only closed after
	# the delete has been acknowledged through that callback.
	close_channel = proc do
		@channel_completion.completion { |channel| channel.close(&blk) }
	end

	queue.delete(&close_channel)
end

#on_error(chain = false, &blk) ⇒ Object

Define a channel error handler.



77
78
79
80
81
# File 'lib/brown/sender.rb', line 77

# Registers +blk+ as the error handler of the sender's channel once the
# channel completion has fired.
#
# @param chain [Boolean] accepted for callers, but not used by this method
# @yield forwarded unchanged to channel.on_error
def on_error(chain=false, &blk)
	@channel_completion.completion { |channel| channel.on_error(&blk) }
end

#publish(payload, opts = {}, &blk) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/brown/sender.rb', line 28

# Publishes +payload+ to this sender's queue through the default exchange.
#
# Before publishing it calls increment_counter and looks the message type
# up with Brown::ACLLookup.get_by_type(payload.class).
#
# @param payload [Object] message object; sent as payload.to_s
# @param opts [Hash] extra publish options; :type and :routing_key are
#   always overwritten by this method
# @yield forwarded to the exchange's publish call
def publish(payload, opts={}, &blk)
	logger.debug { "Publishing to: [queue]: #{@queue_def.denormalise}. [options]: #{opts}" }
	logger.debug { "ACL content: [queue]: #{@queue_def.denormalise}, [metadata type]: #{payload.class}, [message]: #{payload.inspect}" }

	increment_counter

	type = Brown::ACLLookup.get_by_type(payload.class)

	# The actual publish is attached to the channel completion.
	@channel_completion.completion do |channel|
		logger.debug { "Publishing #{payload.inspect} to queue #{@queue_def.denormalise}" }

		exchange     = AMQP::Exchange.default(channel)
		body         = payload.to_s
		publish_opts = opts.merge(:type => type, :routing_key => @queue_def.normalise)

		exchange.publish(body, publish_opts, &blk)
	end
end

#queue_name ⇒ Object



83
84
85
# File 'lib/brown/sender.rb', line 83

# Name of the queue this sender targets.
#
# @return [Object] the result of calling denormalise on @queue_def
def queue_name
	@queue_def.denormalise
end