Class: Brown::Sender
- Inherits:
-
Object
- Object
- Brown::Sender
- Defined in:
- lib/brown/sender.rb
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
-
#counter ⇒ Object
def status(&blk) @queue_completion.completion do |queue| queue.status do |num_messages, num_consumers| blk.call(num_messages, num_consumers) if blk end end end.
- #delete(&blk) ⇒ Object
-
#initialize(queue_def, opts = {}) ⇒ Sender
constructor
A new instance of Sender.
-
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
- #publish(payload, opts = {}, &blk) ⇒ Object
- #queue_name ⇒ Object
Methods included from Util
#number_of_consumers, #number_of_messages
Methods included from Logger
#backtrace, #log_level, #logger
Methods included from AmqpErrors
#error_lookup, #error_message, #errors
Constructor Details
#initialize(queue_def, opts = {}) ⇒ Sender
Returns a new instance of Sender.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/brown/sender.rb', line 12 def initialize(queue_def, opts={}) @queue_def = queue_def.is_a?(Brown::QueueDefinition) ? queue_def : Brown::QueueDefinition.new(queue_def, opts) @name = @queue_def.denormalise @reply_container = {} @message_count = 0 @channel_completion = EM::Completion.new open_channel do |channel| logger.debug { "Opening a channel for sending" } @channel_completion.succeed(channel) end end |
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
10 11 12 |
# File 'lib/brown/sender.rb', line 10 def name @name end |
Instance Method Details
#counter ⇒ Object
def status(&blk) @queue_completion.completion do |queue| queue.status do |num_messages, num_consumers| blk.call(num_messages, num_consumers) if blk end end end
def message_count(&blk) status do |messages| blk.call(messages) if blk end end
def consumer_count(&blk) status do |_, consumers| blk.call(consumers) if blk end end
72 73 74 |
# File 'lib/brown/sender.rb', line 72 def counter @message_count end |
#delete(&blk) ⇒ Object
42 43 44 45 46 47 48 |
# File 'lib/brown/sender.rb', line 42 def delete(&blk) queue.delete do @channel_completion.completion do |channel| channel.close(&blk) end end end |
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
77 78 79 80 81 |
# File 'lib/brown/sender.rb', line 77 def on_error(chain=false, &blk) @channel_completion.completion do |channel| channel.on_error(&blk) end end |
#publish(payload, opts = {}, &blk) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/brown/sender.rb', line 28 def publish(payload, opts={}, &blk) logger.debug { "Publishing to: [queue]: #{@queue_def.denormalise}. [options]: #{opts}" } logger.debug { "ACL content: [queue]: #{@queue_def.denormalise}, [metadata type]: #{payload.class}, [message]: #{payload.inspect}" } increment_counter type = Brown::ACLLookup.get_by_type(payload.class) @channel_completion.completion do |channel| logger.debug { "Publishing #{payload.inspect} to queue #{@queue_def.denormalise}" } AMQP::Exchange.default(channel).publish(payload.to_s, opts.merge(:type => type, :routing_key => @queue_def.normalise), &blk) end end |
#queue_name ⇒ Object
83 84 85 |
# File 'lib/brown/sender.rb', line 83 def queue_name @queue_def.denormalise end |