Class: RailwayIpc::Consumer
- Inherits: Object
  - Object
    - RailwayIpc::Consumer
- Includes: Sneakers::Worker
- Defined in: lib/railway_ipc/consumer/consumer.rb
Class Method Summary
- .handle(message_type, with:) ⇒ Object
  rubocop:enable Metrics/MethodLength.
- .inherited(base) ⇒ Object
- .listen_to(queue:, exchange:, options: {}) ⇒ Object
  rubocop:disable Metrics/MethodLength.
Instance Method Summary
- #exchange_name ⇒ Object
- #get_handler(type) ⇒ Object
- #handlers ⇒ Object
- #queue_name ⇒ Object
- #registered_handlers ⇒ Object
- #work_with_params(payload, _delivery_info, metadata) ⇒ Object
  REFACTOR: Long term we should think about not leaking Sneakers methods as part of Railway’s public API since clients can (and do) override them.
Class Method Details
.handle(message_type, with:) ⇒ Object
rubocop:enable Metrics/MethodLength
# File 'lib/railway_ipc/consumer/consumer.rb', line 39

def self.handle(message_type, with:)
  handlers.register(message: message_type, handler: with)
end
.inherited(base) ⇒ Object
# File 'lib/railway_ipc/consumer/consumer.rb', line 7

def self.inherited(base)
  super
  base.instance_eval do
    def handlers
      @handlers ||= RailwayIpc::HandlerStore.new
    end
  end
end
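The effect of the inherited hook above is that every consumer subclass gets its own memoized handler store. The following is a minimal sketch of that pattern in isolation; DemoStore, DemoConsumer, OrdersConsumer, and UsersConsumer are hypothetical stand-ins and are not part of RailwayIpc.

```ruby
# Hypothetical stand-in for RailwayIpc::HandlerStore (the real class is not shown here).
class DemoStore
  attr_reader :registered

  def initialize
    @registered = {}
  end
end

# Hypothetical base class that copies the hook from the listing above.
class DemoConsumer
  def self.inherited(base)
    super
    # `def` inside instance_eval defines a singleton method on `base`, so the
    # memoized @handlers lives on each subclass rather than on DemoConsumer.
    base.instance_eval do
      def handlers
        @handlers ||= DemoStore.new
      end
    end
  end
end

class OrdersConsumer < DemoConsumer; end
class UsersConsumer < DemoConsumer; end

OrdersConsumer.handlers.registered['OrderPlaced'] = :orders_handler

p OrdersConsumer.handlers.registered.keys  # => ["OrderPlaced"]
p UsersConsumer.handlers.registered.keys   # => []
p DemoConsumer.respond_to?(:handlers)      # => false
```

Registrations made on one subclass do not appear on its siblings, and the base class itself does not respond to handlers.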
.listen_to(queue:, exchange:, options: {}) ⇒ Object
rubocop:disable Metrics/MethodLength
# File 'lib/railway_ipc/consumer/consumer.rb', line 18

def self.listen_to(queue:, exchange:, options: {})
  unless options.empty?
    RailwayIpc.logger.info(
      "Overriding configuration for #{queue} with new options",
      feature: 'railway_ipc_consumer',
      options: options
    )
  end

  from_queue queue, {
    exchange: exchange,
    durable: true,
    exchange_type: :fanout,
    arguments: {
      'x-dead-letter-exchange' => 'ipc:errors'
    },
    connection: RailwayIpc.bunny_connection
  }.merge(options)
end
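Because listen_to combines its defaults with the caller's options through Hash#merge, the override is shallow: a caller who passes an arguments hash replaces the default one entirely, including the dead-letter exchange entry. The sketch below isolates that merge step. The queue_options helper and the 'ipc:orders' exchange name are hypothetical, and the connection entry is left out because it needs a live Bunny connection.

```ruby
# Defaults mirrored from the listing above, minus the connection entry.
DEFAULT_QUEUE_OPTIONS = {
  durable: true,
  exchange_type: :fanout,
  arguments: { 'x-dead-letter-exchange' => 'ipc:errors' }
}.freeze

# Hypothetical helper that reproduces only the merge step of listen_to.
def queue_options(exchange:, options: {})
  { exchange: exchange }.merge(DEFAULT_QUEUE_OPTIONS).merge(options)
end

defaults = queue_options(exchange: 'ipc:orders')
custom = queue_options(
  exchange: 'ipc:orders',
  options: { exchange_type: :topic, arguments: { 'x-max-length' => 1000 } }
)

p defaults[:exchange_type]                            # => :fanout
p custom[:exchange_type]                              # => :topic
# Shallow merge: the caller's arguments hash replaces the default one.
p custom[:arguments].key?('x-dead-letter-exchange')   # => false
```

A caller who wants to keep the dead-letter exchange while adding queue arguments would therefore need to repeat that key in the arguments hash they pass.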
Instance Method Details
#exchange_name ⇒ Object
# File 'lib/railway_ipc/consumer/consumer.rb', line 55

def exchange_name
  queue.opts[:exchange]
end
#get_handler(type) ⇒ Object
# File 'lib/railway_ipc/consumer/consumer.rb', line 81

def get_handler(type)
  manifest = handlers.get(type)
  manifest ? manifest.handler.new : nil
end
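The lookup above returns a fresh handler instance for a registered type and nil for an unknown one. The sketch below shows that behavior with hypothetical stand-ins: Manifest, DemoHandlerStore, and OrderPlacedHandler are illustrative only, and the real RailwayIpc::HandlerStore may key and store its entries differently. Here get_handler takes the store as an argument so the example is self-contained.

```ruby
# Hypothetical stand-in for the manifest objects returned by the store.
Manifest = Struct.new(:message, :handler)

# Hypothetical stand-in for RailwayIpc::HandlerStore, keyed by type string.
class DemoHandlerStore
  def initialize
    @manifests = {}
  end

  def register(message:, handler:)
    @manifests[message.to_s] = Manifest.new(message, handler)
  end

  def get(type)
    @manifests[type]
  end
end

class OrderPlacedHandler; end

# Same body as the listing above, with the store passed in explicitly.
def get_handler(store, type)
  manifest = store.get(type)
  manifest ? manifest.handler.new : nil
end

store = DemoHandlerStore.new
store.register(message: 'OrderPlaced', handler: OrderPlacedHandler)

p get_handler(store, 'OrderPlaced').class  # => OrderPlacedHandler
p get_handler(store, 'Unknown')            # => nil
```

Since the handler class is instantiated on every call, handlers in this scheme carry no state from one message to the next.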
#handlers ⇒ Object
# File 'lib/railway_ipc/consumer/consumer.rb', line 43

def handlers
  self.class.handlers
end
#queue_name ⇒ Object
# File 'lib/railway_ipc/consumer/consumer.rb', line 51

def queue_name
  queue.name
end
#registered_handlers ⇒ Object
# File 'lib/railway_ipc/consumer/consumer.rb', line 47

def registered_handlers
  handlers.registered
end
#work_with_params(payload, _delivery_info, metadata) ⇒ Object
REFACTOR: Long term we should think about not leaking Sneakers methods as part of Railway’s public API since clients can (and do) override them. -BN
# File 'lib/railway_ipc/consumer/consumer.rb', line 62

def work_with_params(payload, _delivery_info, metadata)
  headers = metadata.headers || {}
  message_format = headers.fetch('message_format', 'binary_protobuf')

  message = RailwayIpc::IncomingMessage.new(payload, message_format: message_format)
  RailwayIpc::ProcessIncomingMessage.call(self, message)
  ack!
rescue StandardError => e
  RailwayIpc.logger.error(
    e.message,
    feature: 'railway_ipc_consumer',
    exchange: exchange_name,
    queue: queue_name,
    error: e.class,
    payload: payload
  )
  reject!
end
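The first two statements of work_with_params decide the message format: missing headers are treated as an empty hash, and a missing 'message_format' header falls back to 'binary_protobuf'. The sketch below isolates that fallback. The Metadata struct and the message_format_for helper are hypothetical, and 'json_protobuf' is used only as an illustrative header value.

```ruby
# Hypothetical stand-in for the metadata object Sneakers passes to the worker.
Metadata = Struct.new(:headers)

# Hypothetical helper reproducing the header fallback from the listing above.
def message_format_for(metadata)
  headers = metadata.headers || {}
  headers.fetch('message_format', 'binary_protobuf')
end

p message_format_for(Metadata.new(nil))  # => "binary_protobuf"
p message_format_for(Metadata.new({}))   # => "binary_protobuf"
p message_format_for(Metadata.new({ 'message_format' => 'json_protobuf' }))  # => "json_protobuf"
```

Publishers that set no headers at all are therefore handled the same way as publishers that omit only the 'message_format' header.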