Class: Smith::Messaging::Sender
- Inherits:
-
Object
- Object
- Smith::Messaging::Sender
- Defined in:
- lib/smith/messaging/sender.rb
Instance Method Summary collapse
- #consumer_count(&blk) ⇒ Object
- #counter ⇒ Object
- #delete(&blk) ⇒ Object
-
#initialize(queue_def, opts = {}, &blk) ⇒ Sender
constructor
A new instance of Sender.
- #message_count(&blk) ⇒ Object
-
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
-
#on_reply(opts = {}, &blk) ⇒ Object
Set up a listener that will receive replies from the published messages.
-
#on_reply_error(&blk) ⇒ Object
This gets called if there is a mismatch in the message_id & correlation_id.
-
#on_serialisation_error {|blk| ... } ⇒ Object
Called if there is a problem serialising an ACL.
- #on_timeout(timeout = nil, &blk) ⇒ Object
-
#publish(payload, opts = {}, &blk) ⇒ Object
If reply queue is set the block will be called when the message recipient replies to the message and it is received.
- #queue_name ⇒ Object
- #status(&blk) ⇒ Object
Methods included from Util
#number_of_consumers, #number_of_messages
Methods included from Logger
Constructor Details
#initialize(queue_def, opts = {}, &blk) ⇒ Sender
Returns a new instance of Sender.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/smith/messaging/sender.rb', line 10 def initialize(queue_def, opts={}, &blk) # This is for backward compatibility. @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts) @acl_type_cache = AclTypeCache.instance @reply_container = {} prefetch = option_or_default(@queue_def., :prefetch, Smith.config.agent.prefetch) @options = AmqpOptions.new(@queue_def.) @options.routing_key = @queue_def.normalise @message_counts = Hash.new(0) @exchange_completion = EM::Completion.new @queue_completion = EM::Completion.new @channel_completion = EM::Completion.new open_channel(:prefetch => prefetch) do |channel| @channel_completion.succeed(channel) exchange_type = (opts[:fanout]) ? :fanout : :direct channel.send(exchange_type, @queue_def.normalise, @options.exchange) do |exchange| exchange.on_return do |basic_return, , payload| logger.error { acl_type = @acl_type_cache.get_by_hash(.properties[:type]).to_s rescue "Unknown ACL type" "Cannot deliver message type: #{acl_type}, from exchange: \"#{basic_return.exchange}\", using routing key: \"#{basic_return.routing_key}\"" } end if opts[:fanout] @exchange_completion.succeed(exchange) else channel.queue(@queue_def.normalise, @options.queue) do |queue| queue.bind(exchange, :routing_key => @queue_def.normalise) @queue_completion.succeed(queue) @exchange_completion.succeed(exchange) end end end end blk.call(self) if blk end |
Instance Method Details
#consumer_count(&blk) ⇒ Object
167 168 169 170 171 |
# File 'lib/smith/messaging/sender.rb', line 167 def consumer_count(&blk) status do |_, consumers| blk.call(consumers) end end |
#counter ⇒ Object
173 174 175 |
# File 'lib/smith/messaging/sender.rb', line 173 def counter @message_counts[@queue_def.denormalise] end |
#delete(&blk) ⇒ Object
134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/smith/messaging/sender.rb', line 134 def delete(&blk) @queue_completion.completion do |queue| queue.delete do @exchange_completion.completion do |exchange| exchange.delete do @channel_completion.completion do |channel| channel.close(&blk) end end end end end end |
#message_count(&blk) ⇒ Object
161 162 163 164 165 |
# File 'lib/smith/messaging/sender.rb', line 161 def (&blk) status do || blk.call() end end |
#on_error(chain = false, &blk) ⇒ Object
Define a channel error handler.
178 179 180 181 182 |
# File 'lib/smith/messaging/sender.rb', line 178 def on_error(chain=false, &blk) @channel_completion.completion do |channel| channel.on_error(&blk) end end |
#on_reply(opts = {}, &blk) ⇒ Object
Set up a listener that will receive replies from the published messages. You must publish with intent to reply – tee he.
If you pass in a queue_name the same queue name will get used for every reply. This means that there are no create and teardown costs for each message. If no queue_name is given a random one will be assigned.
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/smith/messaging/sender.rb', line 103 def on_reply(opts={}, &blk) @reply_proc = blk @timeout ||= Timeout.new(Smith.config.smith.timeout, :queue_name => @queue_def.denormalise) reply_queue = opts.clone.delete(:reply_queue_name) { random("#{@queue_def.denormalise}.") } queue_def = QueueDefinition.new(reply_queue, opts.merge(:auto_delete => true, :durable => false)) logger.debug { "reply queue: #{queue_def.denormalise}" } @reply_queue_completion ||= EM::Completion.new.tap do |completion| Receiver.new(queue_def) do |queue| queue.subscribe do |payload, receiver| @reply_container.delete(receiver.correlation_id).tap do |reply| if reply reply[:timeout].cancel_timeout reply[:reply_proc].call(payload, receiver) else receiver.ack if opts[:auto_ack] logger.error { "No reply block for correlation_id: #{receiver.correlation_id}. This is probably a timed out message. Message: #{payload.to_json}" } end end end EM.next_tick do completion.succeed(queue) end end end end |
#on_reply_error(&blk) ⇒ Object
This gets called if there is a mismatch in the message_id & correlation_id.
149 150 151 |
# File 'lib/smith/messaging/sender.rb', line 149 def on_reply_error(&blk) @reply_error = blk end |
#on_serialisation_error {|blk| ... } ⇒ Object
Called if there is a problem serialising an ACL.
93 94 95 |
# File 'lib/smith/messaging/sender.rb', line 93 def on_serialisation_error(&blk) @on_serialisation_error = blk end |
#on_timeout(timeout = nil, &blk) ⇒ Object
86 87 88 |
# File 'lib/smith/messaging/sender.rb', line 86 def on_timeout(timeout=nil, &blk) @timeout = Timeout.new(timeout || Smith.config.smith.timeout, &blk) end |
#publish(payload, opts = {}, &blk) ⇒ Object
If reply queue is set the block will be called when the message recipient replies to the message and it is received.
If a block is passed to this method but the :reply_queue option is not set it will be called when the message has been safely published.
If the :reply_queue is an empty string then a random queue name will be generated.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
# File 'lib/smith/messaging/sender.rb', line 68 def publish(payload, opts={}, &blk) if @reply_queue_completion @reply_queue_completion.completion do |reply_queue| = random logger.verbose { "message_id: #{}" } #### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! #### #### TODO if there is a timeout delete #### #### the proc from the @reply_container. #### #### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! #### @reply_container[] = {:reply_proc => @reply_proc, :timeout => @timeout.clone.tap {|t| t.set_timeout() }} _publish(payload, @options.publish(opts, {:reply_to => reply_queue.queue_name, :message_id => }), &blk) end else _publish(payload, @options.publish(opts), &blk) end end |
#queue_name ⇒ Object
184 185 186 |
# File 'lib/smith/messaging/sender.rb', line 184 def queue_name @queue_def.denormalise end |
#status(&blk) ⇒ Object
153 154 155 156 157 158 159 |
# File 'lib/smith/messaging/sender.rb', line 153 def status(&blk) @queue_completion.completion do |queue| queue.status do |, num_consumers| blk.call(, num_consumers) end end end |