Class: RailwayIpc::Consumer
- Inherits:
-
Object
- Object
- RailwayIpc::Consumer
- Includes:
- Sneakers::Worker
- Defined in:
- lib/railway_ipc/consumer/consumer.rb
Class Method Summary collapse
- .handle(message_type, with:) ⇒ Object
- .inherited(base) ⇒ Object
- .listen_to(queue:, exchange:, options: {}) ⇒ Object
Instance Method Summary collapse
- #exchange_name ⇒ Object
- #get_handler(type) ⇒ Object
- #handlers ⇒ Object
- #queue_name ⇒ Object
- #registered_handlers ⇒ Object
- #work(payload) ⇒ Object
Class Method Details
.handle(message_type, with:) ⇒ Object
34 35 36 |
# File 'lib/railway_ipc/consumer/consumer.rb', line 34 def self.handle(, with:) handlers.register(message: , handler: with) end |
.inherited(base) ⇒ Object
7 8 9 10 11 12 13 14 15 |
# File 'lib/railway_ipc/consumer/consumer.rb', line 7 def self.inherited(base) super base.instance_eval do def handlers @handlers ||= RailwayIpc::HandlerStore.new end end end |
.listen_to(queue:, exchange:, options: {}) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/railway_ipc/consumer/consumer.rb', line 17 def self.listen_to(queue:, exchange:, options: {}) unless .empty? RailwayIpc.logger.info( "Overriding configuration for #{queue} with new options", feature: 'railway_ipc_consumer', options: ) end from_queue queue, { exchange: exchange, durable: true, exchange_type: :fanout, connection: RailwayIpc.bunny_connection }.merge() end |
Instance Method Details
#exchange_name ⇒ Object
50 51 52 |
# File 'lib/railway_ipc/consumer/consumer.rb', line 50 def exchange_name queue.opts[:exchange] end |
#get_handler(type) ⇒ Object
70 71 72 73 |
# File 'lib/railway_ipc/consumer/consumer.rb', line 70 def get_handler(type) manifest = handlers.get(type) manifest ? manifest.handler.new : nil end |
#handlers ⇒ Object
38 39 40 |
# File 'lib/railway_ipc/consumer/consumer.rb', line 38 def handlers self.class.handlers end |
#queue_name ⇒ Object
46 47 48 |
# File 'lib/railway_ipc/consumer/consumer.rb', line 46 def queue_name queue.name end |
#registered_handlers ⇒ Object
42 43 44 |
# File 'lib/railway_ipc/consumer/consumer.rb', line 42 def registered_handlers handlers.registered end |
#work(payload) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/railway_ipc/consumer/consumer.rb', line 54 def work(payload) = RailwayIpc::IncomingMessage.new(payload) RailwayIpc::ProcessIncomingMessage.call(self, ) ack! rescue StandardError => e RailwayIpc.logger.error( e., feature: 'railway_ipc_consumer', exchange: exchange_name, queue: queue_name, error: e.class, payload: payload ) raise e end |