Class: Smith::Messaging::Sender

Inherits:
Object
  • Object
show all
Includes:
Logger, Util
Defined in:
lib/smith/messaging/sender.rb

Instance Method Summary collapse

Methods included from Util

#number_of_consumers, #number_of_messages

Methods included from Logger

included

Constructor Details

#initialize(queue_def, opts = {}, &blk) ⇒ Sender

Returns a new instance of Sender.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/smith/messaging/sender.rb', line 10

# Builds a sender for a queue and starts declaring its AMQP resources.
#
# A channel is opened, an exchange named after the normalised queue name is
# declared (fanout when opts[:fanout] is truthy, direct otherwise) and, for
# direct exchanges, a queue of the same name is declared and bound to it.
# The channel, exchange and queue are published through EM::Completion
# objects so that other methods can wait for them.
#
# @param queue_def [QueueDefinition, Object] a QueueDefinition, or a value
#   that is passed together with +opts+ to QueueDefinition.new
# @param opts [Hash] options; :fanout selects the exchange type in this method
# @yield [sender] called with self as the last step of the constructor
def initialize(queue_def, opts={}, &blk)

  # This is for backward compatibility.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance

  # Pending reply handlers, keyed by message id (filled in by #publish).
  @reply_container = {}

  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  @message_counts = Hash.new(0)

  @exchange_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @channel_completion = EM::Completion.new

  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)
    exchange_type = (opts[:fanout]) ? :fanout : :direct

    channel.send(exchange_type, @queue_def.normalise, @options.exchange) do |exchange|

      # Log messages the broker hands back as undeliverable. The callback
      # receives the return frame, the message metadata and the payload.
      exchange.on_return do |basic_return, metadata, payload|
        logger.error {
          # Best effort: any failure while resolving the ACL type falls back
          # to a placeholder so the log line is still produced.
          acl_type = @acl_type_cache.get_by_hash(metadata.properties[:type]).to_s rescue "Unknown ACL type"
          "Cannot deliver message type: #{acl_type}, from exchange: \"#{basic_return.exchange}\", using routing key: \"#{basic_return.routing_key}\""
        }
      end

      if opts[:fanout]
        # No queue is declared for a fanout exchange, so @queue_completion
        # is not succeeded on this path.
        @exchange_completion.succeed(exchange)
      else
        channel.queue(@queue_def.normalise, @options.queue) do |queue|
          queue.bind(exchange, :routing_key => @queue_def.normalise)

          @queue_completion.succeed(queue)
          @exchange_completion.succeed(exchange)
        end
      end
    end
  end

  blk.call(self) if blk
end

Instance Method Details

#consumer_count(&blk) ⇒ Object



167
168
169
170
171
# File 'lib/smith/messaging/sender.rb', line 167

# Passes the consumer count reported by #status to the given block.
#
# @yield [consumers] the second value yielded by #status
def consumer_count(&blk)
  status { |_messages, consumer_total| blk.call(consumer_total) }
end

#counter ⇒ Object



173
174
175
# File 'lib/smith/messaging/sender.rb', line 173

# Returns the entry of @message_counts stored under this sender's
# denormalised queue name.
def counter
  queue_key = @queue_def.denormalise
  @message_counts[queue_key]
end

#delete(&blk) ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/smith/messaging/sender.rb', line 134

# Tears down this sender's AMQP resources in order: the queue is deleted,
# then the exchange, and finally the channel is closed.
#
# Each step waits on the matching completion object and only starts once the
# previous step has called back.
#
# @yield passed to channel.close
def delete(&blk)
  close_channel = proc do
    @channel_completion.completion { |channel| channel.close(&blk) }
  end

  delete_exchange = proc do
    @exchange_completion.completion { |exchange| exchange.delete(&close_channel) }
  end

  @queue_completion.completion { |queue| queue.delete(&delete_exchange) }
end

#message_count(&blk) ⇒ Object



161
162
163
164
165
# File 'lib/smith/messaging/sender.rb', line 161

# Passes the message count reported by #status to the given block.
#
# @yield [messages] the first value yielded by #status
def message_count(&blk)
  status { |queued| blk.call(queued) }
end

#on_error(chain = false, &blk) ⇒ Object

Define a channel error handler.



178
179
180
181
182
# File 'lib/smith/messaging/sender.rb', line 178

# Registers a channel error handler once the channel is available.
#
# @param chain [Boolean] accepted but not used by this method
# @yield handed to channel.on_error as its block
def on_error(chain=false, &blk)
  @channel_completion.completion { |ch| ch.on_error(&blk) }
end

#on_reply(opts = {}, &blk) ⇒ Object

Set up a listener that will receive replies from the published messages. You must publish with intent to reply – tee he.

If you pass in a :reply_queue_name option, that queue name is used for every reply. This means that there are no create and teardown costs for each message. If no :reply_queue_name is given, a random one will be assigned.



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/smith/messaging/sender.rb', line 103

# Registers the block that handles replies and sets up the reply listener.
#
# The block is remembered in @reply_proc and picked up by later #publish
# calls. The receiver on the reply queue is created only on the first call;
# later calls reuse the memoised completion.
#
# @param opts [Hash] :reply_queue_name fixes the reply queue name (a random
#   name prefixed with the denormalised queue name is used otherwise);
#   :auto_ack acks replies that have no registered handler
# @yield [payload, receiver] called for each reply with a known correlation id
# @return [EM::Completion] completion that succeeds with the reply queue
def on_reply(opts={}, &blk)
  @reply_proc = blk
  @timeout ||= Timeout.new(Smith.config.smith.timeout, :queue_name => @queue_def.denormalise)

  # The lookup deletes from a copy, so opts itself is left untouched.
  reply_queue = opts.clone.delete(:reply_queue_name) { random("#{@queue_def.denormalise}.") }
  reply_options = opts.merge(:auto_delete => true, :durable => false)

  queue_def = QueueDefinition.new(reply_queue, reply_options)
  logger.debug { "reply queue: #{queue_def.denormalise}" }

  @reply_queue_completion ||= begin
    completion = EM::Completion.new

    Receiver.new(queue_def) do |queue|
      queue.subscribe do |payload, receiver|
        # Each pending reply is handled at most once: it is removed as it is looked up.
        pending = @reply_container.delete(receiver.correlation_id)

        if pending
          pending[:timeout].cancel_timeout
          pending[:reply_proc].call(payload, receiver)
        else
          receiver.ack if opts[:auto_ack]
          logger.error { "No reply block for correlation_id: #{receiver.correlation_id}. This is probably a timed out message. Message: #{payload.to_json}" }
        end

        pending
      end

      EM.next_tick { completion.succeed(queue) }
    end

    completion
  end
end

#on_reply_error(&blk) ⇒ Object

This gets called if there is a mismatch in the message_id & correlation_id.



149
150
151
# File 'lib/smith/messaging/sender.rb', line 149

# Stores the given block in @reply_error.
#
# NOTE(review): nothing in the visible part of this file invokes
# @reply_error; presumably it is called elsewhere on a message_id /
# correlation_id mismatch — confirm.
#
# @yield the handler to remember
def on_reply_error(&blk)
  @reply_error = blk
end

#on_serialisation_error {|blk| ... } ⇒ Object

Called if there is a problem serialising an ACL.

Yields:

  • (blk)

    block to run when there is an error.

Yield Parameters:

  • (Proc)

    the publish callback.



93
94
95
# File 'lib/smith/messaging/sender.rb', line 93

# Stores the given block in @on_serialisation_error.
#
# NOTE(review): the code that invokes this handler is not visible here;
# presumably it runs when serialising an ACL fails during publishing —
# confirm.
#
# @yield the handler to remember
def on_serialisation_error(&blk)
  @on_serialisation_error = blk
end

#on_timeout(timeout = nil, &blk) ⇒ Object



86
87
88
# File 'lib/smith/messaging/sender.rb', line 86

# Replaces the reply timeout used by this sender.
#
# @param timeout [Object, nil] timeout value; Smith.config.smith.timeout is
#   used when nil (or false)
# @yield handed to Timeout.new as its block
# @return [Timeout] the newly created timeout, also stored in @timeout
def on_timeout(timeout=nil, &blk)
  duration = timeout || Smith.config.smith.timeout
  @timeout = Timeout.new(duration, &blk)
end

#publish(payload, opts = {}, &blk) ⇒ Object

If reply queue is set the block will be called when the message recipient replies to the message and it is received.

If a block is passed to this method but the :reply_queue option is not set it will be called when the message has been safely published.

If the :reply_queue is an empty string then a random queue name will be generated.



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/smith/messaging/sender.rb', line 68

# Publishes a payload, wiring up reply handling when #on_reply has been used.
#
# Without a reply queue the payload goes straight to _publish. With one, the
# publish waits for the reply queue, generates a message id, records the
# reply handler and a per-message timeout under that id, and publishes with
# :reply_to and :message_id set.
#
# @param payload [Object] the message to publish
# @param opts [Hash] publish options merged through @options.publish
# @yield handed to _publish unchanged
def publish(payload, opts={}, &blk)
  return _publish(payload, @options.publish(opts), &blk) unless @reply_queue_completion

  @reply_queue_completion.completion do |reply_queue|
    message_id = random
    logger.verbose { "message_id: #{message_id}" }

    #### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ####
    #### TODO if there is a timeout delete   ####
    #### the proc from the @reply_container. ####
    #### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ####
    handler = @reply_proc
    # Every message gets its own copy of the timeout, armed with its id.
    message_timeout = @timeout.clone
    message_timeout.set_timeout(message_id)
    @reply_container[message_id] = {:reply_proc => handler, :timeout => message_timeout}

    reply_headers = {:reply_to => reply_queue.queue_name, :message_id => message_id}
    _publish(payload, @options.publish(opts, reply_headers), &blk)
  end
end

#queue_name ⇒ Object



184
185
186
# File 'lib/smith/messaging/sender.rb', line 184

# Returns the denormalised name of this sender's queue, as given by
# @queue_def.denormalise.
def queue_name
  @queue_def.denormalise
end

#status(&blk) ⇒ Object



153
154
155
156
157
158
159
# File 'lib/smith/messaging/sender.rb', line 153

# Asks the queue for its status once the queue is available.
#
# @yield [num_messages, num_consumers] the two values reported by queue.status
def status(&blk)
  @queue_completion.completion do |queue|
    queue.status { |message_total, consumer_total| blk.call(message_total, consumer_total) }
  end
end