Class: Smith::Messaging::Sender
- Inherits:
-
Object
- Object
- Smith::Messaging::Sender
- Defined in:
- lib/smith/messaging/sender.rb
Instance Method Summary collapse
- #consumer_count(&blk) ⇒ Object
- #counter ⇒ Object
- #delete(&blk) ⇒ Object
-
#initialize(queue_def, opts = {}, &blk) ⇒ Sender
constructor
A new instance of Sender.
- #message_count(&blk) ⇒ Object
-
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
-
#on_reply(opts = {}, &blk) ⇒ Object
Set up a listener that will receive replies from the published messages.
-
#on_reply_error(&blk) ⇒ Object
This gets called if there is a mismatch in the message_id & correlation_id.
-
#on_serialisation_error {|blk| ... } ⇒ Object
Called if there is a problem serialising an ACL.
- #on_timeout(timeout = nil, &blk) ⇒ Object
-
#publish(payload, opts = {}, &blk) ⇒ Object
If reply queue is set the block will be called when the message recipient replies to the message and it is received.
- #queue_name ⇒ Object
- #status(&blk) ⇒ Object
Methods included from Util
#number_of_consumers, #number_of_messages
Methods included from Logger
Constructor Details
#initialize(queue_def, opts = {}, &blk) ⇒ Sender
Returns a new instance of Sender.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/smith/messaging/sender.rb', line 10
#
# Builds a sender for the given queue: opens a channel, declares a direct
# exchange and a queue (both named after the normalised queue name), binds
# them together and finally calls +blk+ with the sender itself.
#
# @param queue_def [QueueDefinition, Object] a QueueDefinition, or anything
#   QueueDefinition.new accepts as its first argument (kept for backward
#   compatibility).
# @param opts [Hash] passed to QueueDefinition.new when +queue_def+ is not
#   already a QueueDefinition; ignored otherwise.
# @param blk [Proc] optional; called synchronously with +self+ at the end of
#   the constructor.
def initialize(queue_def, opts={}, &blk)
  # This is for backward compatibility.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance

  # Outstanding replies, keyed by message id (filled in by #publish).
  @reply_container = {}

  # NOTE(review): the accessor name was lost in the extracted listing;
  # `options` is restored here -- confirm against QueueDefinition.
  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  @message_counts = Hash.new(0)

  # The completions let other methods queue work until the broker objects exist.
  @exchange_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @channel_completion = EM::Completion.new

  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)

    channel.direct(@queue_def.normalise, @options.exchange) do |exchange|
      # Log messages that the broker hands back.
      exchange.on_return do |basic_return, metadata, payload|
        # The exchange name is read from basic_return (the listing referenced
        # an undefined `reply_code` local) and the returned payload is what
        # gets parsed (the listing called parse_from_string with no argument).
        logger.error { "#{@acl_type_cache[metadata.type].new.parse_from_string(payload)} returned! Exchange: #{basic_return.exchange}, reply_code: #{basic_return.reply_code}, reply_text: #{basic_return.reply_text}" }
        logger.error { "Properties: #{metadata.properties}" }
      end

      channel.queue(@queue_def.normalise, @options.queue) do |queue|
        queue.bind(exchange, :routing_key => @queue_def.normalise)

        @queue_completion.succeed(queue)
        @exchange_completion.succeed(exchange)
      end
    end
  end

  blk.call(self) if blk
end
Instance Method Details
#consumer_count(&blk) ⇒ Object
157 158 159 160 161 |
# File 'lib/smith/messaging/sender.rb', line 157
#
# Passes the second value reported by #status (the consumer count) to +blk+.
#
# @param blk [Proc] called with the consumer count; required.
def consumer_count(&blk)
  status { |_messages, consumer_total| blk.call(consumer_total) }
end
#counter ⇒ Object
163 164 165 |
# File 'lib/smith/messaging/sender.rb', line 163
#
# Looks up this sender's entry in @message_counts, keyed by the denormalised
# queue name.
#
# @return [Object] the stored value for this queue.
def counter
  queue_key = @queue_def.denormalise
  @message_counts[queue_key]
end
#delete(&blk) ⇒ Object
124 125 126 127 128 129 130 131 132 133 134 135 136 |
# File 'lib/smith/messaging/sender.rb', line 124
#
# Tears the sender down in order: deletes the queue, then the exchange, then
# closes the channel. Each step starts from the previous step's callback.
#
# @param blk [Proc] handed to channel.close as its block.
def delete(&blk)
  # Plain procs (not lambdas) so any arguments the callbacks receive are ignored.
  close_channel = proc do
    @channel_completion.completion { |channel| channel.close(&blk) }
  end

  remove_exchange = proc do
    @exchange_completion.completion { |exchange| exchange.delete(&close_channel) }
  end

  @queue_completion.completion { |queue| queue.delete(&remove_exchange) }
end
#message_count(&blk) ⇒ Object
151 152 153 154 155 |
# File 'lib/smith/messaging/sender.rb', line 151
#
# Passes the first value reported by #status (the message count) to +blk+.
# The method name and the block parameter were missing from the extracted
# listing and are restored here.
#
# @param blk [Proc] called with the message count; required.
def message_count(&blk)
  status do |messages|
    blk.call(messages)
  end
end
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
168 169 170 171 172 |
# File 'lib/smith/messaging/sender.rb', line 168
#
# Define a channel error handler: once the channel is available, +blk+ is
# registered through channel.on_error.
#
# @param chain [Boolean] accepted but not used by this method.
# @param blk [Proc] the error handler handed to the channel.
def on_error(chain=false, &blk)
  @channel_completion.completion { |channel| channel.on_error(&blk) }
end
#on_reply(opts = {}, &blk) ⇒ Object
Set up a listener that will receive replies from the published messages. You must publish with intent to reply – tee he.
If you pass in a :reply_queue_name option, that queue name will be used for every reply. This means that there are no queue creation and teardown costs for each message. If no :reply_queue_name is given, the reply queue is named after the sender's queue with ".reply" appended.
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/smith/messaging/sender.rb', line 95
#
# Registers +blk+ as the reply handler and, on the first call only, creates
# the receiver that listens on the reply queue.
#
# Incoming replies are matched against @reply_container by correlation id.
# A match cancels the stored timeout and calls the stored reply proc with
# (payload, receiver). An unmatched reply is logged as an error and is
# acknowledged only when opts[:auto_ack] is set.
#
# @param opts [Hash] :reply_queue_name picks the reply queue name (default:
#   "<queue name>.reply"); the remaining entries are merged with
#   :auto_delete => true and :durable => false and passed to
#   QueueDefinition.new. The caller's hash is not modified.
# @param blk [Proc] stored in @reply_proc; #publish records it per message.
# @return [Object] the reply-queue completion.
def on_reply(opts={}, &blk)
  # Work on a private copy: :reply_queue_name is removed below, and the
  # caller's hash must neither change nor have to be unfrozen.
  opts = opts.dup

  @reply_proc = blk
  @timeout ||= Timeout.new(Smith.config.smith.timeout, :queue_name => @queue_def.denormalise)

  reply_queue_name = opts.delete(:reply_queue_name) || "#{@queue_def.denormalise}.reply"
  queue_def = QueueDefinition.new(reply_queue_name, opts.merge(:auto_delete => true, :durable => false))
  logger.debug { "reply queue: #{queue_def.denormalise}" }

  # Created once; later calls reuse the existing receiver.
  @reply_queue_completion ||= EM::Completion.new.tap do |completion|
    Receiver.new(queue_def) do |queue|
      queue.subscribe do |payload, receiver|
        @reply_container.delete(receiver.correlation_id).tap do |reply|
          if reply
            reply[:timeout].cancel_timeout
            reply[:reply_proc].call(payload, receiver)
          else
            receiver.ack if opts[:auto_ack]
            logger.error { "No reply block for correlation_id: #{receiver.correlation_id}. This is probably a timed out message. Message: #{payload.to_json}" }
          end
        end
      end

      EM.next_tick do
        completion.succeed(queue)
      end
    end
  end
end
#on_reply_error(&blk) ⇒ Object
This gets called if there is a mismatch in the message_id & correlation_id.
139 140 141 |
# File 'lib/smith/messaging/sender.rb', line 139
#
# Stores +blk+ in @reply_error. Per the page summary this handler is meant
# for a message_id / correlation_id mismatch; the code that invokes it is not
# part of this listing.
#
# @param blk [Proc] the handler to remember.
# @return [Proc, nil] the stored handler.
def on_reply_error(&blk)
  @reply_error = blk
end
#on_serialisation_error {|blk| ... } ⇒ Object
Called if there is a problem serialising an ACL.
85 86 87 |
# File 'lib/smith/messaging/sender.rb', line 85
#
# Stores +blk+ in @on_serialisation_error. Per the page summary this handler
# is meant for problems serialising an ACL; the code that invokes it is not
# part of this listing.
#
# @param blk [Proc] the handler to remember.
# @return [Proc, nil] the stored handler.
def on_serialisation_error(&blk)
  @on_serialisation_error = blk
end
#on_timeout(timeout = nil, &blk) ⇒ Object
78 79 80 |
# File 'lib/smith/messaging/sender.rb', line 78
#
# Replaces @timeout with a new Timeout built from +timeout+ and +blk+.
#
# @param timeout [Object, nil] the timeout value; when nil (or false),
#   Smith.config.smith.timeout is used instead.
# @param blk [Proc] handed to Timeout.new as its block.
# @return [Object] the new Timeout instance.
def on_timeout(timeout=nil, &blk)
  duration = timeout || Smith.config.smith.timeout
  @timeout = Timeout.new(duration, &blk)
end
#publish(payload, opts = {}, &blk) ⇒ Object
If reply queue is set the block will be called when the message recipient replies to the message and it is received.
If a block is passed to this method but the :reply_queue option is not set it will be called when the message has been safely published.
If the :reply_queue is an empty string then a random queue name will be generated.
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/smith/messaging/sender.rb', line 60
#
# Publishes +payload+ through _publish.
#
# When a reply queue has been set up (see #on_reply), the message is published
# once that queue is available. A fresh message id is generated, the current
# reply proc and an armed clone of @timeout are recorded under that id in
# @reply_container, and :reply_to / :message_id are added to the publish
# options. In this mode +blk+ is not used.
#
# Without a reply queue the message is published straight away and +blk+ is
# handed to _publish.
#
# The message id local was missing from the extracted listing and is
# restored here.
#
# @param payload [Object] the message to send.
# @param opts [Hash] per-message options, combined via @options.publish.
# @param blk [Proc] forwarded to _publish only when no reply queue is set.
def publish(payload, opts={}, &blk)
  if @reply_queue_completion
    @reply_queue_completion.completion do |reply_queue|
      message_id = random
      logger.verbose { "message_id: #{message_id}" }

      # TODO: if there is a timeout delete the proc from the @reply_container.
      # The timeout is cloned so each message gets its own timer.
      @reply_container[message_id] = {:reply_proc => @reply_proc, :timeout => @timeout.clone.tap {|t| t.set_timeout(message_id) }}
      _publish(payload, @options.publish(opts, {:reply_to => reply_queue.queue_name, :message_id => message_id}))
    end
  else
    _publish(payload, @options.publish(opts), &blk)
  end
end
#queue_name ⇒ Object
174 175 176 |
# File 'lib/smith/messaging/sender.rb', line 174
#
# The denormalised name of the queue this sender was created for.
#
# @return [Object] whatever @queue_def.denormalise returns.
def queue_name
  @queue_def.denormalise
end
#status(&blk) ⇒ Object
143 144 145 146 147 148 149 |
# File 'lib/smith/messaging/sender.rb', line 143
#
# Once the queue is available, asks it for its status and passes both reported
# values on to +blk+. The name of the first block parameter was missing from
# the extracted listing and is restored here.
#
# @param blk [Proc] called with (num_messages, num_consumers); required.
def status(&blk)
  @queue_completion.completion do |queue|
    queue.status do |num_messages, num_consumers|
      blk.call(num_messages, num_consumers)
    end
  end
end