Module: Emissary::Operator::AMQP

Defined in:
lib/emissary/operator/amqp.rb

Defined Under Namespace

Classes: InvalidConfig, InvalidExchange

Constant Summary

REQUIRED_KEYS =
[ :uri, :subscriptions ]
VALID_EXCHANGES =
[ :headers, :topic, :direct, :fanout ]
@@queue_count =
1

Instance Attribute Summary

Instance Method Summary

Instance Attribute Details

#not_acked ⇒ Object

Returns the value of attribute not_acked.



33
34
35
# File 'lib/emissary/operator/amqp.rb', line 33

# Reader for @not_acked.
#
# post_init sets it to an empty Hash; subscribe stores the delivery info of
# each received message under the message's uuid, and acknowledge removes
# the entry again.
def not_acked
  @not_acked
end

#subscriptions ⇒ Object

Returns the value of attribute subscriptions.



32
33
34
# File 'lib/emissary/operator/amqp.rb', line 32

# Reader for @subscriptions.
#
# post_init builds it from @config[:subscriptions] as a Hash that maps an
# exchange type to the list of routing keys subscribed on that exchange.
def subscriptions
  @subscriptions
end

Instance Method Details

#acknowledge(message) ⇒ Object



163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/emissary/operator/amqp.rb', line 163

# Acknowledges a previously received message.
#
# The delivery info stored for the message's uuid is removed from @not_acked
# and #ack is called on it. Problems are logged rather than raised:
#
# * a message that is not an Emissary::Message is logged as a warning and
#   skipped (it may not even respond to #uuid),
# * a uuid with no stored delivery info is logged as a warning,
# * any other StandardError is logged as an error with its backtrace.
#
# @param message [Emissary::Message] the message to acknowledge
# @return [Object] whatever the last logger call returns; not meaningful
def acknowledge message
  unless message.kind_of? Emissary::Message
    Emissary.logger.warning "Can't acknowledge message not deriving from Emissary::Message class"
    # Stop here: continuing would call #uuid on an arbitrary object.
    return
  end

  info = @not_acked.delete(message.uuid)
  if info.nil?
    # Nothing is pending under this uuid (never received or already acked).
    Emissary.logger.warning "Message with UUID #{message.uuid} not acknowledged."
    return
  end

  info.ack
  Emissary.logger.debug "Acknowledged Message ID: #{message.uuid}"
rescue StandardError => e
  # StandardError only, so that signals and SystemExit are not swallowed.
  Emissary.logger.error "Error in Emissary::Operator::AMQP#acknowledge: #{e.class.name}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
end

#close ⇒ Object



188
189
190
191
192
193
194
195
196
197
198
199
# File 'lib/emissary/operator/amqp.rb', line 188

# Shuts the operator down: sleeps for one second, removes the queue bindings
# and the subscription through #unsubscribe, then calls ::AMQP.stop.
def close
  # Note: NOT currently supported by current version of RabbitMQ (1.7.x)
  #Emissary.logger.info "Requeueing unacknowledged messages"
  #@not_acked.each { |i| i.reject :requeue => true }

  # Unfortunately, due to the nature of amqp's deferred asynchronous handling of data send/recv
  # and no ability to determine whether our shutdown message was /actually/ sent, we have to resort
  # to sleeping for 1s to ensure our message went out.
  sleep 1 # XXX: HACK HACK HACK - BAD BAD BAD :-(
  unsubscribe
  ::AMQP.stop 
end

#connect ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# File 'lib/emissary/operator/amqp.rb', line 84

# Opens the AMQP connection described by @connect_details and creates the
# channel, the receiving queue (named @queue_name) and the topic, fanout and
# direct exchange handles.
#
# Queue options come from @config (:queue_durable, :queue_auto_delete,
# :queue_exclusive); a nil value selects the default, an explicit false is kept.
#
# Raises ::Emissary::Error::ConnectionError when SSL is requested but
# EM.ssl? reports no SSL support. Returns true.
def connect
  ssl_requested = @connect_details[:ssl]
  if ssl_requested && !EM.ssl?
    raise ::Emissary::Error::ConnectionError,
      "Requested SSL connection but EventMachine not compiled with SSL support - quitting!"
  end

  @message_pool = Queue.new
  @connection   = ::AMQP.connect(@connect_details)
  @channel      = ::MQ.new(@connection)

  # Use the configured value unless it is nil (i.e. not configured).
  setting = lambda do |key, fallback|
    configured = @config[key]
    configured.nil? ? fallback : configured
  end

  @queue_config = {
    :durable     => setting.call(:queue_durable,     false),
    :auto_delete => setting.call(:queue_auto_delete, true),
    :exclusive   => setting.call(:queue_exclusive,   true)
  }
  @queue = ::MQ::Queue.new(@channel, @queue_name, @queue_config)

  # One handle per built-in exchange, keyed by exchange type.
  @exchanges = {}
  [:topic, :fanout, :direct].each do |type|
    @exchanges[type] = ::MQ::Exchange.new(@channel, type, "amq.#{type}")
  end

  true
end

#post_init ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/emissary/operator/amqp.rb', line 55

# Derives the connection settings, the normalized subscriptions and the
# per-connection queue name from @config.
#
# Sets:
# * @connect_details - host, ssl flag, user, pass, vhost, port and logging,
#   taken from @config[:uri] and @config[:debug]. Missing user/password fall
#   back to 'guest', an empty path to the '/nimbul' vhost, and a missing port
#   to 5671 (amqps) or 5672.
# * @subscriptions   - Hash of exchange type => routing keys, built from
#   "key:type" strings; a missing or unknown type maps to DEFAULT_EXCHANGE.
# * @queue_name      - identity queue name plus a counter shared through
#   @@queue_count.
# * @not_acked       - empty Hash.
def post_init
  uri = ::URI.parse @config[:uri]
  # to_s keeps a scheme-less URI from blowing up on nil.to_sym.
  ssl = (uri.scheme.to_s == 'amqps')

  # Percent-decode a URI component, passing nil through untouched.
  # URI.decode no longer exists on Ruby >= 3.0; the previous inline
  # `rescue nil` hid that NoMethodError and silently replaced the configured
  # credentials with 'guest'. URI.decode delegated to the default parser's
  # #unescape, so calling that directly yields the same result.
  decode = lambda do |part|
    if part.nil?
      nil
    elsif ::URI.respond_to?(:decode)
      ::URI.decode(part)
    else
      ::URI::DEFAULT_PARSER.unescape(part)
    end
  end

  @connect_details = {
    :host  => uri.host,
    :ssl   => ssl,
    :user  => decode.call(uri.user)     || 'guest',
    :pass  => decode.call(uri.password) || 'guest',
    :vhost => (uri.path.empty? ? '/nimbul' : uri.path),
    :port  => uri.port || (ssl ? 5671 : 5672),
    :logging => !!@config[:debug],
  }

  # normalize the subscriptions
  @subscriptions = @config[:subscriptions].inject({}) do |hash, queue|
    key, type = queue.split(':')
    type = type.to_sym unless type.nil?
    # nil and unrecognised types both fall back to the default exchange
    type = DEFAULT_EXCHANGE unless VALID_EXCHANGES.include?(type)
    (hash[type] ||= []) << key
    hash
  end

  # one unique receiving queue per connection
  @queue_name = "#{Emissary.identity.queue_name}.#{@@queue_count}"
  @@queue_count += 1

  @not_acked = {}
end

#reject(message, opts = { :requeue => true }) ⇒ Object



176
177
178
179
180
181
182
183
184
185
186
# File 'lib/emissary/operator/amqp.rb', line 176

# Placeholder for rejecting a message (by default asking for a requeue).
#
# The method returns true immediately, so none of the code below the early
# return is executed at present.
def reject message, opts = { :requeue => true }
  # currently not implemented in RabbitMQ 1.7.x (coming in versions supporting 0.9.1 AMQP spec)
  return true

  if !message.kind_of?(Emissary::Message)
    Emissary.logger.warning "Unable to reject message not deriving from Emissary::Message class"
  end

  pending = @not_acked.delete(message.uuid)
  pending.reject(opts)
  Emissary.logger.debug "Rejected Message ID: #{message.uuid}"
rescue Exception => err
  Emissary.logger.error "Error in AMQP::Reject: #{err.class.name}: #{err.message}\n\t#{err.backtrace.join("\n\t")}"
end

#send_data(msg) ⇒ Object



153
154
155
156
157
158
159
160
161
# File 'lib/emissary/operator/amqp.rb', line 153

# Publishes a message on the exchange selected by msg.exchange_type, using
# msg.routing_key as the routing key.
#
# The exchange is looked up before the message is stamped, so a message aimed
# at an unknown exchange is not marked as sent. Only a missing exchange is
# reported as InvalidExchange; other errors (for example from encoding the
# message) propagate unchanged instead of being mislabelled.
#
# @param msg the message; must respond to exchange_type, routing_key,
#   originator, recipient and stamp_sent! (whose result must respond to encode)
# @return [Object] the result of the exchange's #publish
# @raise [InvalidExchange] if no exchange is registered for msg.exchange_type
def send_data msg
  Emissary.logger.debug "Sending message through exchange '#{msg.exchange_type.to_s}' with routing key '#{msg.routing_key}'"
  Emissary.logger.debug "Message Originator: #{msg.originator} - Recipient: #{msg.recipient}"

  # @exchanges is nil until #connect has run; treat that like an unknown exchange.
  exchange = @exchanges && @exchanges[msg.exchange_type]
  if exchange.nil?
    raise InvalidExchange, "publish request on invalid exchange '#{msg.exchange_type}' with routing key '#{msg.routing_key}'"
  end

  exchange.publish msg.stamp_sent!.encode, :routing_key => msg.routing_key
end

#status ⇒ Object



201
202
# File 'lib/emissary/operator/amqp.rb', line 201

# Placeholder with an empty body; always returns nil.
def status
end

#subscribe ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/emissary/operator/amqp.rb', line 111

# Binds the receiving queue to every routing key in @subscriptions, plus the
# identity's own queue name on the direct exchange, and then starts consuming
# with explicit acknowledgements.
#
# Each delivery is decoded into an Emissary::Message; a payload that fails
# with InvalidMessageFormat is replaced by a fresh Emissary::Message carrying
# the error. The delivery info is remembered in @not_acked under the
# message's uuid before the message is handed to #receive.
def subscribe
  @subscriptions.each_pair do |exchange, routing_keys|
    routing_keys.each do |routing_key|
      Emissary.logger.debug "Subscribing To Key: '#{routing_key}' on Exchange '#{exchange}'"
      @queue.bind(@exchanges[exchange], :key => routing_key)
    end
  end

  # Messages addressed to us by name arrive through the direct exchange.
  own_key = Emissary.identity.queue_name
  @queue.bind(@exchanges[:direct], :key => own_key)

  @queue.subscribe(:ack => true) do |info, payload|
    message =
      begin
        Emissary::Message.decode(payload).stamp_received!
      rescue ::Emissary::Error::InvalidMessageFormat => format_error
        placeholder = Emissary::Message.new
        placeholder.errors << format_error
        placeholder
      end

    @not_acked[message.uuid] = info
    Emissary.logger.debug "Received through '#{info.exchange}' and routing key '#{info.routing_key}'"

    receive message
  end
end

#unsubscribe ⇒ Object



138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/emissary/operator/amqp.rb', line 138

# Reverses #subscribe: unbinds the queue from every routing key listed in
# @subscriptions, unbinds the identity's own queue name from the direct
# exchange, and finally cancels the queue subscription.
#
# Returns whatever @queue.unsubscribe returns.
def unsubscribe
  log = Emissary.logger

  @subscriptions.each_pair do |exchange, routing_keys|
    routing_keys.each do |routing_key|
      log.info "Unsubscribing from '#{routing_key}' on Exchange '#{exchange}'"
      @queue.unbind(@exchanges[exchange], :key => routing_key)
    end
  end

  log.info "Unsubscribing from my own queue."
  @queue.unbind(@exchanges[:direct], :key => Emissary.identity.queue_name)

  # Cancelling alone would be enough; the explicit unbinds above are kept for tidiness.
  log.info "Cancelling all subscriptions."
  @queue.unsubscribe
end

#validate_config! ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/emissary/operator/amqp.rb', line 37

# Validates the operator configuration returned by #config.
#
# Checks that the config is a Hash, that every key in REQUIRED_KEYS is
# present, that the :uri scheme is 'amqp' or 'amqps', and that the URI's
# user, password, host and path parts are not nil. All problems found are
# collected and reported together.
#
# @return [true] when the configuration is valid
# @raise [RuntimeError] with one problem per line when validation fails
def validate_config!
  # Without a Hash none of the key checks below can run, so report at once
  # instead of failing later with a NoMethodError.
  raise 'config not a hash!' unless config.is_a? Hash

  errors = []
  REQUIRED_KEYS.each do |key|
    errors << "missing required option '#{key}'" unless config.has_key? key
  end

  # Only inspect the URI when one was supplied; the missing key has already
  # been reported above.
  if config.has_key? :uri
    begin
      u = ::URI.parse(config[:uri].to_s)

      # \A and \z anchor the whole string; to_s covers a URI without a scheme.
      unless u.scheme.to_s =~ /\Aamqps?\z/
        errors << "URI scheme /must/ be one of 'amqp' or 'amqps'"
      end

      [ :user, :password, :host, :path ].each do |part|
        errors << "invalid value 'nil' for URI part [#{part}]" if u.respond_to?(part) && u.send(part).nil?
      end
    rescue ::URI::InvalidURIError => e
      # Report an unparsable URI alongside the other problems.
      errors << "invalid URI: #{e.message}"
    end
  end

  raise errors.join("\n") unless errors.empty?
  true
end