Module: Wakame::AMQPClient

Included in:
Agent, Master
Defined in:
lib/wakame/amqp_client.rb

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#amqp_client ⇒ Object (readonly)

Returns the value of attribute amqp_client.



115
116
117
# File 'lib/wakame/amqp_client.rb', line 115

# Reader for @amqp_client: the object returned by AMQP.connect in #connect,
# or nil before #connect has run / after #close has completed.
def amqp_client
  @amqp_client
end

#mq ⇒ Object (readonly)

Returns the value of attribute mq.



115
116
117
# File 'lib/wakame/amqp_client.rb', line 115

# Reader for @mq: the MQ instance created in #connect for the current
# client, or nil before #connect has run / after #close has completed.
def mq
  @mq
end

#queue_subscribers ⇒ Object (readonly)

Returns the value of attribute queue_subscribers.



219
220
221
# File 'lib/wakame/amqp_client.rb', line 219

# Reader for @queue_subscribers: a Hash mapping a queue name to the Array of
# handler blocks registered through #add_subscriber. The Hash is created
# lazily by #add_subscriber / #define_queue, so this returns nil until one of
# them has been called.
def queue_subscribers
  @queue_subscribers
end

Class Method Details

.included(klass) ⇒ Object



13
14
15
16
17
# File 'lib/wakame/amqp_client.rb', line 13

# Hook called by Ruby when this module is included into +klass+.
#
# Extends the including class with ClassMethods so that the methods defined
# there become class-level methods of +klass+.
#
# klass - the Class (or Module) that is including this module.
#
# The return value is not meaningful; Ruby discards the result of the
# +included+ hook.
def self.included(klass)
  klass.extend(ClassMethods)
end

Instance Method Details

#add_subscriber(queue_name, &blk) ⇒ Object



221
222
223
224
225
226
227
228
229
230
# File 'lib/wakame/amqp_client.rb', line 221

# Registers +blk+ as a handler for the queue named +queue_name+.
#
# queue_name - name of a queue that must already be present in @mq.queues.
# blk        - block appended to the handler list kept for that queue in
#              @queue_subscribers.
#
# Returns whatever EM.barrier returns for the registration block.
#
# Raises RuntimeError when +queue_name+ is not a defined queue.
# Raises ArgumentError when no block is given; without this check a nil entry
# would be stored and the failure would only surface later, when the handler
# list for the queue is iterated and each entry is sent #call.
def add_subscriber(queue_name, &blk)
  # @mq can be used here because it is only consulted to check whether the
  # queue name is among the defined queues.
  raise "Undefined queue name : #{queue_name}" unless @mq.queues.has_key?(queue_name)
  raise ArgumentError, "A block is required to subscribe to queue : #{queue_name}" if blk.nil?

  EM.barrier {
    @queue_subscribers ||= {}
    (@queue_subscribers[queue_name] ||= []) << blk
  }
end

#amq ⇒ Object



144
145
146
147
# File 'lib/wakame/amqp_client.rb', line 144

# Returns the object stored under :mq in the current thread's thread-local
# storage (assigned by #connect).
#
# Raises RuntimeError when nothing has been stored for the current thread.
def amq
  channel = Thread.current[:mq]
  raise 'AMQP connection is not established yet' if channel.nil?

  channel
end

#amqp_server_uri ⇒ Object



117
118
119
120
121
122
123
124
# File 'lib/wakame/amqp_client.rb', line 117

# Builds a URI::AMQP from the current client's settings, using :host and
# :port as-is and the :vhost setting as the URI path.
#
# Returns the value produced by URI::AMQP.build.
#
# Raises RuntimeError when there is no client or #connected? is false.
def amqp_server_uri
  unless @amqp_client && connected?
    raise "The connection is not established yet."
  end

  settings = @amqp_client.settings
  URI::AMQP.build(:host => settings[:host],
                  :port => settings[:port],
                  :path => settings[:vhost])
end

#cleanup ⇒ Object



149
150
# File 'lib/wakame/amqp_client.rb', line 149

# Hook invoked from #close's disconnect callback, before the caller's block
# runs and before the connection state is reset. Intentionally empty here.
# NOTE(review): presumably meant to be overridden by including classes --
# confirm against Agent / Master.
def cleanup
end

#close(&blk) ⇒ Object



152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# File 'lib/wakame/amqp_client.rb', line 152

# Closes the current AMQP client, if there is one.
#
# When the client reports that it has closed, #cleanup is run, then the
# given block (if any) is called. Whether or not those raise, @amqp_client,
# @mq and the current thread's :mq slot are reset to nil afterwards.
#
# blk - optional block called after #cleanup once the client has closed.
#
# Returns nil without doing anything (the block is NOT called) when there is
# no client; otherwise returns the result of the client's #close call.
def close(&blk)
  return if @amqp_client.nil?

  finalize = proc {
    begin
      cleanup
      blk.call if blk
    ensure
      # Always drop the connection state, even if cleanup or the block raised.
      @amqp_client = nil
      Thread.current[:mq] = nil
      @mq = nil
    end
  }

  @amqp_client.close { finalize.call }
end

#connect(*args) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/wakame/amqp_client.rb', line 126

# Opens an AMQP connection and creates the MQ object bound to it.
#
# args - forwarded unchanged to AMQP.connect.
#
# The new client is stored in @amqp_client; the MQ object is stored both in
# @mq and in the current thread's :mq slot. Afterwards
# run_defered_callbacks is invoked, then the given block (if any).
#
# Returns the block's value, or nil when no block is given.
def connect(*args)
  # NOTE(review): #close is only called when #connected? is false, i.e. when
  # @amqp_client is nil -- and #close does nothing in that case. An existing
  # connection is therefore replaced without being closed. Confirm whether
  # `if connected?` was intended before changing it: #close resets the
  # connection state from an asynchronous callback.
  close unless connected?

  @amqp_client = AMQP.connect(*args)

  # Give this client object a reader for its @settings instance variable;
  # #amqp_server_uri relies on it.
  class << @amqp_client
    def settings
      @settings
    end
  end

  channel = MQ.new(@amqp_client)
  Thread.current[:mq] = channel
  @mq = channel

  run_defered_callbacks
  yield if block_given?
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


140
141
142
# File 'lib/wakame/amqp_client.rb', line 140

# Reports whether a client object is currently held.
#
# Returns true when @amqp_client is non-nil, false otherwise. This only
# checks that the reference is set; it does not query the client about the
# state of the underlying connection.
def connected?
  !@amqp_client.nil?
end

#define_queue(name, exchange_name, opts = {}) ⇒ Object



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/wakame/amqp_client.rb', line 199

# Declares a queue, binds it to an exchange and dispatches every received
# message to the handlers registered for the queue via #add_subscriber.
#
# name          - queue name template. Each "%{expr}" is rewritten to
#                 "#{expr}" and the result is evaluated as a double-quoted
#                 string in the context of this object.
# exchange_name - passed as the first argument to the queue's #bind.
# opts          - options Hash handed to both #queue and #bind. String
#                 values are expanded the same way as +name+; other values
#                 are passed through. The caller's Hash is left untouched
#                 (so a shared or frozen Hash can be passed safely).
#
# Returns the result of the #subscribe call.
#
# Raises RuntimeError (from #amq) when no MQ object is set for this thread.
def define_queue(name, exchange_name, opts={})
  # SECURITY NOTE(review): the templates are evaluated as Ruby code via
  # instance_eval. Only trusted, developer-supplied strings must reach this
  # method; confirm that no caller forwards external input.
  name = instance_eval('"' + name.gsub(/%\{/, '#{') + '"')

  # Expand into a fresh Hash instead of writing the results back into +opts+.
  resolved = {}
  opts.each { |k, v|
    resolved[k] = v.is_a?(String) ? instance_eval('"' + v.gsub(/%\{/, '#{') + '"') : v
  }

  @queue_subscribers ||= {}

  q = amq.queue(name, resolved)
  q.bind(exchange_name, resolved).subscribe { |data|
    # Handlers are looked up on every message, so subscribers added after
    # the queue was defined are honoured.
    handlers = queue_subscribers[name]
    handlers.each { |handler| handler.call(data) } unless handlers.nil?
  }
end

#publish_to(name, *args) ⇒ Object

When you want to broadcast the data to all bound queues:

publish_to('exchange name', 'data')
publish_to('exchange name', '*', 'data')

When you want to send the data to keyed queue(s):

publish_to('exchange name', 'group.1', 'data')


174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/wakame/amqp_client.rb', line 174

# Publishes data to the exchange registered under +name+.
#
# For a :topic exchange:
#   publish_to('exchange name', 'data')             # routing key '*'
#   publish_to('exchange name', 'group.1', 'data')  # routing key 'group.1'
# For any other exchange type the first extra argument is the data and the
# routing key is nil.
#
# When the current thread has no :mq object set, the publish is deferred
# with EM.next_tick; otherwise it is performed immediately.
#
# Returns the result of the exchange's #publish when performed immediately,
# otherwise the result of EM.next_tick.
#
# Raises RuntimeError (at publish time) when no exchange is registered
# under +name+.
def publish_to(name, *args)
  deliver = lambda {
    exchange = amq.exchanges[name]
    raise("Undefined exchange name : #{name}") unless exchange

    routing_key, payload =
      if exchange.type != :topic
        [nil, args[0]]
      elsif args.size == 1
        ['*', args[0]]
      else
        [args[0].to_s, args[1]]
      end

    exchange.publish(payload, :key=>routing_key)
  }

  return deliver.call unless Thread.current[:mq].nil?

  EM.next_tick { deliver.call }
end