Class: Mafia::ConsumerPool
- Inherits:
-
Object
- Object
- Mafia::ConsumerPool
- Defined in:
- lib/mafia/consumer_pool.rb
Instance Method Summary
- #fetch_consumer(routing_key) ⇒ Object
-
#initialize(config = nil) ⇒ ConsumerPool
constructor
A new instance of ConsumerPool.
- #subscribe ⇒ Object
Constructor Details
#initialize(config = nil) ⇒ ConsumerPool
Returns a new instance of ConsumerPool.
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
# File 'lib/mafia/consumer_pool.rb', line 3
# Builds a consumer pool: starts a Bunny connection, then opens the channel,
# queue and default exchange used by the other methods.
#
# @param config [Hash, nil] connection settings; falls back to Mafia.config
#   when nil. The keys read here are :host, :port, :username, :password,
#   :vhost and :queue.
# @return [ConsumerPool] a new instance of ConsumerPool
def initialize(config = nil)
  @config = config || Mafia.config
  Mafia.logger.info("Mafia consumer pool starting connection to #{Mafia.config_to_url(@config)}")

  # Only the connection-related keys are handed to Bunny; :queue is used below.
  connection_options = @config.slice(:host, :port, :username, :password, :vhost)
  @conn = Bunny.new(connection_options)
  @conn.start
  Mafia.logger.info("Mafia consumer pool started, waiting for rpc events")

  # create a channel and exchange that both client and server know about
  @channel = @conn.create_channel
  @queue = @channel.queue(@config[:queue])
  @exchange = @channel.default_exchange
end
Instance Method Details
#fetch_consumer(routing_key) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/mafia/consumer_pool.rb', line 49
# Picks the consumer that should handle a routing key.
#
# The first entry of Mafia.consumers whose routing_key equals the given key
# wins; when none matches, DefaultConsumer is returned. The decision is
# logged at info level either way.
#
# @param routing_key [Object] key compared (with ==) against each consumer's
#   routing_key
# @return [Object] the matching consumer, or DefaultConsumer
def fetch_consumer(routing_key)
  consumer = Mafia.consumers.find { |candidate| candidate.routing_key == routing_key }

  unless consumer
    Mafia.logger.info("Routing key `#{routing_key}` to default consumer")
    return DefaultConsumer
  end

  Mafia.logger.info("Routing key `#{routing_key}` to consumer #{consumer.name}")
  consumer
end
#subscribe ⇒ Object
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/mafia/consumer_pool.rb', line 19
# Subscribes to the pool's queue (blocking) and answers every delivered
# request.
#
# Each payload is parsed as JSON and read through its 'method', 'params' and
# 'id' keys. The consumer returned by #fetch_consumer for the 'method' value
# is called as `process(method, *params)`. The reply published on the
# exchange is a JSON object with 'id', 'result' and 'jsonrpc' => '2.0', where
# 'result' is [:ok, value] on success or [:error, message] on failure. It is
# routed to properties.reply_to and carries properties.correlation_id.
#
# @return [Object] whatever @queue.subscribe returns
def subscribe
  @queue.subscribe(block: true) do |_delivery_info, properties, payload|
    req_message = JSON.parse(payload)
    method_name = req_message['method']
    params = req_message['params']

    # calculate a result
    klass = fetch_consumer(method_name)
    begin
      # Build a fresh argument list instead of unshifting onto the parsed
      # params, so the request (and the error log below) stays untouched.
      # A request without 'params' is treated as having no parameters.
      args = [method_name].concat(params || [])
      result = [:ok, klass.process(*args)]
    rescue StandardError, ScriptError => e
      # ScriptError is included so NotImplementedError/LoadError raised by a
      # consumer still become an error reply; Interrupt, SystemExit and
      # similar are deliberately left to propagate.
      Mafia.logger.error("error while running #{klass} with args: #{params}: #{e.message}")
      result = [:error, e.message]
    end

    reply = {
      'id' => req_message['id'],
      'result' => result,
      'jsonrpc' => '2.0'
    }

    # enqueue our reply in the return queue
    Mafia.logger.info("publish to return queue #{properties.reply_to}")
    @exchange.publish(JSON.generate(reply),
                      { routing_key: properties.reply_to,
                        correlation_id: properties.correlation_id })
  end
end