Class: Smith::Messaging::Sender

Inherits:
Object
  • Object
show all
Includes:
Logger, Util
Defined in:
lib/smith/messaging/sender.rb

Instance Method Summary collapse

Methods included from Util

#number_of_consumers, #number_of_messages

Methods included from Logger

included

Constructor Details

#initialize(queue_def, opts = {}, &blk) ⇒ Sender

Returns a new instance of Sender.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/smith/messaging/sender.rb', line 10

# Builds a sender for the queue described by +queue_def+.
#
# Requests a channel and, from its callbacks, declares a direct exchange and
# a queue (both named after the normalised queue definition), binds the
# queue to the exchange and resolves the channel, queue and exchange
# completions that the other methods of this class wait on.
#
# @param queue_def [QueueDefinition, Object] a QueueDefinition, or the value
#   from which one is built together with +opts+ (backward compatibility).
# @param opts [Hash] only used when +queue_def+ is not a QueueDefinition.
# @yield [sender] called with this instance after the channel open has been
#   requested.
def initialize(queue_def, opts={}, &blk)

  # This is for backward compatibility.
  @queue_def = queue_def.is_a?(QueueDefinition) ? queue_def : QueueDefinition.new(queue_def, opts)

  @acl_type_cache = AclTypeCache.instance

  # Pending replies keyed by message id; filled in by #publish.
  @reply_container = {}

  prefetch = option_or_default(@queue_def.options, :prefetch, Smith.config.agent.prefetch)

  @options = AmqpOptions.new(@queue_def.options)
  @options.routing_key = @queue_def.normalise

  @message_counts = Hash.new(0)

  @exchange_completion = EM::Completion.new
  @queue_completion = EM::Completion.new
  @channel_completion = EM::Completion.new

  open_channel(:prefetch => prefetch) do |channel|
    @channel_completion.succeed(channel)
    channel.direct(@queue_def.normalise, @options.exchange) do |exchange|

      # Log any message handed back by the broker. The handler receives the
      # return frame, the message metadata and the raw payload; the payload
      # is decoded with the ACL class looked up from the metadata type.
      exchange.on_return do |basic_return, metadata, payload|
        logger.error { "#{@acl_type_cache[metadata.type].new.parse_from_string(payload)} returned! Exchange: #{basic_return.exchange}, reply_code: #{basic_return.reply_code}, reply_text: #{basic_return.reply_text}" }
        logger.error { "Properties: #{metadata.properties}" }
      end

      channel.queue(@queue_def.normalise, @options.queue) do |queue|
        queue.bind(exchange, :routing_key => @queue_def.normalise)

        # Both completions are resolved only once the queue is bound.
        @queue_completion.succeed(queue)
        @exchange_completion.succeed(exchange)
      end
    end
  end

  blk.call(self) if blk
end

Instance Method Details

#consumer_count(&blk) ⇒ Object



157
158
159
160
161
# File 'lib/smith/messaging/sender.rb', line 157

# Reports the number of consumers on the queue.
#
# Delegates to #status and passes only its second value on to the block.
#
# @yield [consumers] the consumer count reported by #status.
def consumer_count(&blk)
  status { |_queued, consumer_total| blk.call(consumer_total) }
end

#counter ⇒ Object



163
164
165
# File 'lib/smith/messaging/sender.rb', line 163

# Looks up the entry of the message-count table for this sender's
# denormalised queue name.
#
# @return [Object] the stored count for the queue name.
def counter
  queue_key = @queue_def.denormalise
  @message_counts[queue_key]
end

#delete(&blk) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/smith/messaging/sender.rb', line 124

# Tears the sender down in order: deletes the queue, then the exchange,
# then closes the channel. Each step starts from the callback of the
# previous one.
#
# @yield passed to the channel's close call.
def delete(&blk)
  # Last step: close the channel, handing the caller's block to it.
  close_channel = proc do
    @channel_completion.completion { |channel| channel.close(&blk) }
  end

  # Middle step: delete the exchange, then move on to the channel.
  delete_exchange = proc do
    @exchange_completion.completion { |exchange| exchange.delete(&close_channel) }
  end

  # First step: delete the queue, then move on to the exchange.
  @queue_completion.completion { |queue| queue.delete(&delete_exchange) }
end

#message_count(&blk) ⇒ Object



151
152
153
154
155
# File 'lib/smith/messaging/sender.rb', line 151

# Reports the number of messages on the queue.
#
# Delegates to #status and passes only its first value on to the block.
#
# @yield [messages] the message count reported by #status.
def message_count(&blk)
  status { |pending| blk.call(pending) }
end

#on_error(chain = false, &blk) ⇒ Object

Define a channel error handler.



168
169
170
171
172
# File 'lib/smith/messaging/sender.rb', line 168

# Registers +blk+ as the channel's error handler once the channel is
# available.
#
# @param chain [Boolean] accepted but not used by this method.
# @yield forwarded to the channel's on_error.
def on_error(chain=false, &blk)
  @channel_completion.completion { |channel| channel.on_error(&blk) }
end

#on_reply(opts = {}, &blk) ⇒ Object

Set up a listener that will receive replies from the published messages. You must publish with intent to reply – tee he.

If you pass in :reply_queue_name, that queue name is used for every reply. This means that there are no create and teardown costs for each message. If no :reply_queue_name is given, the reply queue is named after this sender's queue with ".reply" appended.



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/smith/messaging/sender.rb', line 95

# Sets up the listener that receives replies to published messages.
#
# The reply block is replaced on every call, but the reply receiver (and its
# completion) is only created on the first call. Incoming replies are
# matched to pending requests by correlation id; a reply with no pending
# entry is logged as an error.
#
# @param opts [Hash] options for the reply queue definition. Recognised
#   here: :reply_queue_name (defaults to "<queue name>.reply") and
#   :auto_ack (ack replies that have no pending entry). The hash passed in
#   is not modified.
# @yield [payload, receiver] called for each reply that matches a pending
#   message.
# @return [Object] the reply queue completion.
def on_reply(opts={}, &blk)
  # Work on a copy so that removing :reply_queue_name below does not strip
  # the key from the caller's hash.
  opts = opts.dup

  @reply_proc = blk

  @timeout ||= Timeout.new(Smith.config.smith.timeout, :queue_name => @queue_def.denormalise)

  reply_queue_name = opts.delete(:reply_queue_name) || "#{@queue_def.denormalise}.reply"
  queue_def = QueueDefinition.new(reply_queue_name, opts.merge(:auto_delete => true, :durable => false))
  logger.debug { "reply queue: #{queue_def.denormalise}" }

  @reply_queue_completion ||= EM::Completion.new.tap do |completion|
    Receiver.new(queue_def) do |queue|
      queue.subscribe do |payload, receiver|
        # Removing the entry means each pending request is answered at most once.
        @reply_container.delete(receiver.correlation_id).tap do |reply|
          if reply
            reply[:timeout].cancel_timeout
            reply[:reply_proc].call(payload, receiver)
          else
            receiver.ack if opts[:auto_ack]
            logger.error { "No reply block for correlation_id: #{receiver.correlation_id}. This is probably a timed out message. Message: #{payload.to_json}" }
          end
        end
      end

      EM.next_tick do
        completion.succeed(queue)
      end
    end
  end
end

#on_reply_error(&blk) ⇒ Object

This gets called if there is a mismatch in the message_id & correlation_id.



139
140
141
# File 'lib/smith/messaging/sender.rb', line 139

# Stores the block to use as the reply-error handler.
#
# @yield the handler to remember; replaces any previous one.
# @return [Proc, nil] the block that was given.
def on_reply_error(&blk)
  blk.tap { |handler| @reply_error = handler }
end

#on_serialisation_error {|blk| ... } ⇒ Object

Called if there is a problem serialising an ACL.

Yields:

  • (blk)

    block to run when there is an error.

Yield Parameters:

  • the (Proc)

    publish callback.



85
86
87
# File 'lib/smith/messaging/sender.rb', line 85

# Stores the block to use as the serialisation-error handler.
#
# @yield the handler to remember; replaces any previous one.
# @return [Proc, nil] the block that was given.
def on_serialisation_error(&blk)
  handler = blk
  @on_serialisation_error = handler
end

#on_timeout(timeout = nil, &blk) ⇒ Object



78
79
80
# File 'lib/smith/messaging/sender.rb', line 78

# Replaces the sender's timeout object.
#
# @param timeout [Object, nil] the timeout value; when nil or false the
#   configured Smith.config.smith.timeout is used instead.
# @yield forwarded to Timeout.new.
# @return [Timeout] the newly created timeout.
def on_timeout(timeout=nil, &blk)
  duration = timeout ? timeout : Smith.config.smith.timeout
  @timeout = Timeout.new(duration, &blk)
end

#publish(payload, opts = {}, &blk) ⇒ Object

If reply queue is set the block will be called when the message recipient replies to the message and it is received.

If a block is passed to this method but the :reply_queue option is not set it will be called when the message has been safely published.

If the :reply_queue is an empty string then a random queue name will be generated.



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/smith/messaging/sender.rb', line 60

# Publishes +payload+.
#
# Without a reply queue the message is published straight away and +blk+ is
# handed to the underlying publish call. With a reply queue (set up by
# #on_reply) the publish waits for the reply queue, registers the current
# reply block and a fresh timeout under a new message id, and publishes
# with :reply_to and :message_id set; +blk+ is not used on that path.
#
# @param payload [Object] the message to publish.
# @param opts [Hash] publish options, merged through @options.publish.
def publish(payload, opts={}, &blk)
  return _publish(payload, @options.publish(opts), &blk) unless @reply_queue_completion

  @reply_queue_completion.completion do |reply_queue|
    message_id = random
    logger.verbose { "message_id: #{message_id}" }

    # Each message gets its own copy of the timeout, armed for its id.
    reply_timeout = @timeout.clone
    reply_timeout.set_timeout(message_id)

    #### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ####
    #### TODO if there is a timeout delete   ####
    #### the proc from the @reply_container. ####
    #### !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! ####
    @reply_container[message_id] = {:reply_proc => @reply_proc, :timeout => reply_timeout}

    reply_headers = {:reply_to => reply_queue.queue_name, :message_id => message_id}
    _publish(payload, @options.publish(opts, reply_headers))
  end
end

#queue_name ⇒ Object



174
175
176
# File 'lib/smith/messaging/sender.rb', line 174

# The denormalised name of the queue this sender publishes to.
#
# @return [Object] whatever the queue definition's denormalise returns.
def queue_name
  definition = @queue_def
  definition.denormalise
end

#status(&blk) ⇒ Object



143
144
145
146
147
148
149
# File 'lib/smith/messaging/sender.rb', line 143

# Asks the queue for its status once the queue is available and passes the
# two reported values on to the block.
#
# @yield [num_messages, num_consumers] the values reported by the queue's
#   status call.
def status(&blk)
  forward_counts = proc { |messages, consumers| blk.call(messages, consumers) }
  @queue_completion.completion { |queue| queue.status(&forward_counts) }
end