Class: RailwayIpc::Server
- Inherits:
-
Object
- Object
- RailwayIpc::Server
- Defined in:
- lib/railway_ipc/rpc/server/server.rb
Instance Attribute Summary collapse
-
#message ⇒ Object
readonly
Returns the value of attribute message.
-
#responder ⇒ Object
readonly
Returns the value of attribute responder.
Class Method Summary collapse
Instance Method Summary collapse
-
#handle_request(payload) ⇒ Object
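Runs #work on the payload and replies to the sender with the encoded response; if a StandardError is raised, logs it and replies with an error response built by the configured RPC error adapter. (The source disables rubocop Metrics/AbcSize and Metrics/MethodLength for this method.)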
-
#initialize(_queue, _pool, opts = { automatic_recovery: true }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter) ⇒ Server
constructor
A new instance of Server.
- #run ⇒ Object
- #stop ⇒ Object
-
#work(payload) ⇒ Object
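Decodes the payload, dispatches it to the responder registered for its type, and raises RailwayIpc::UnhandledMessageError for unregistered types; errors are logged and re-raised. (The source disables rubocop Metrics/AbcSize and Metrics/MethodLength for this method.)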
Methods included from RPC::ErrorAdapterConfigurable
rpc_error_adapter, rpc_error_adapter_class
Methods included from RPC::MessageObservationConfigurable
exchange_name, listen_to, queue_name
Constructor Details
#initialize(_queue, _pool, opts = { automatic_recovery: true }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter) ⇒ Server
Returns a new instance of Server.
17 18 19 20 21 22 23 |
# File 'lib/railway_ipc/rpc/server/server.rb', line 17
#
# Builds the RabbitMQ connection wrapper for this server and stores it in
# @rabbit_connection. Nothing is connected here; see #run.
#
# @param _queue [Object] accepted but not used by this constructor
# @param _pool [Object] accepted but not used by this constructor
# @param opts [Hash] passed to the adapter as +options:+
# @param rabbit_adapter [Class] adapter class to instantiate; it must accept
#   +queue_name:+, +exchange_name:+ and +options:+ keyword arguments
def initialize(_queue, _pool, opts={ automatic_recovery: true }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter)
  @rabbit_connection = rabbit_adapter.new(
    queue_name: self.class.queue_name,
    exchange_name: self.class.exchange_name,
    options: opts
  )
end
Instance Attribute Details
#message ⇒ Object (readonly)
Returns the value of attribute message.
11 12 13 |
# File 'lib/railway_ipc/rpc/server/server.rb', line 11
#
# Read-only accessor for the +message+ attribute.
#
# @return [Object] the current value of @message (nil until assigned)
def message
  @message
end
#responder ⇒ Object (readonly)
Returns the value of attribute responder.
11 12 13 |
# File 'lib/railway_ipc/rpc/server/server.rb', line 11
#
# Read-only accessor for the +responder+ attribute.
#
# @return [Object] the current value of @responder (nil until assigned)
def responder
  @responder
end
Class Method Details
.respond_to(message_type, with:) ⇒ Object
13 14 15 |
# File 'lib/railway_ipc/rpc/server/server.rb', line 13
#
# Registers +with+ as the handler for +message_type+ in the shared
# RailwayIpc::RPC::ServerResponseHandlers instance.
#
# @param message_type [Object] the message type to respond to
# @param with [Object] the handler to register for that message type
# @return [Object] whatever ServerResponseHandlers#register returns
def self.respond_to(message_type, with:)
  RailwayIpc::RPC::ServerResponseHandlers.instance.register(handler: with, message: message_type)
end
Instance Method Details
#handle_request(payload) ⇒ Object
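Runs #work on the payload and replies to the sender with the encoded response; if a StandardError is raised, logs it and replies with an error response built by the configured RPC error adapter. (The source disables rubocop Metrics/AbcSize and Metrics/MethodLength for this method.)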
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/railway_ipc/rpc/server/server.rb', line 67
#
# Processes one request via #work and, if a response was produced, sends it
# back through the RabbitMQ connection to the message's +reply_to+.
#
# When #work raises a StandardError the error is logged and an error response
# is obtained from the class's RPC error adapter, which is then sent instead.
#
# NOTE(review): this listing was rebuilt from a documentation extract that had
# lost several identifiers. The +message+ receivers are inferred from the
# +message+ attribute reader; the adapter method name +error_message+ is an
# assumption -- confirm against the original source.
#
# @param payload [Object] raw payload handed on to #work
def handle_request(payload)
  response = work(payload)
rescue StandardError => e
  RailwayIpc.logger.error(
    'Error responding to message.',
    exception: e,
    feature: 'railway_ipc_consumer',
    exchange: self.class.exchange_name,
    queue: self.class.queue_name,
    protobuf: { type: message.class, data: message }
  )
  # NOTE(review): assumed method name, see header comment.
  response = self.class.rpc_error_adapter_class.error_message(e, message)
ensure
  # NOTE(review): #work assigns @message only after the payload has been
  # decoded, so message may still be nil here if decoding itself failed --
  # confirm that case is handled as intended.
  if response
    rabbit_connection.reply(
      RailwayIpc::Rabbitmq::Payload.encode(response), message.reply_to
    )
  end
end
#run ⇒ Object
25 26 27 28 29 30 31 32 |
# File 'lib/railway_ipc/rpc/server/server.rb', line 25
#
# Brings the server up: connects, creates the exchange, creates a durable
# queue, binds the queue to the exchange, and then subscribes to the queue.
# Each connection step is chained on the return value of the previous one.
#
# @return [Object] whatever subscribe_to_queue returns
def run
  rabbit_connection
    .connect
    .create_exchange
    .create_queue(durable: true)
    .bind_queue_to_exchange
  subscribe_to_queue
end
#stop ⇒ Object
34 35 36 |
# File 'lib/railway_ipc/rpc/server/server.rb', line 34
#
# Shuts the server down by disconnecting the RabbitMQ connection.
#
# @return [Object] whatever the connection's #disconnect returns
def stop
  rabbit_connection.disconnect
end
#work(payload) ⇒ Object
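Decodes the payload, dispatches it to the responder registered for its type, and raises RailwayIpc::UnhandledMessageError for unregistered types; errors are logged and re-raised. (The source disables rubocop Metrics/AbcSize and Metrics/MethodLength for this method.)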
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/railway_ipc/rpc/server/server.rb', line 40
#
# Decodes a raw payload and dispatches it to the responder registered for its
# message type, storing the decoded message in @message along the way.
#
# For a type with no registered handler, the payload is decoded as a
# LearnIpc::ErrorMessage (so @message is still populated) and
# RailwayIpc::UnhandledMessageError is raised. Any StandardError is logged
# and then re-raised to the caller.
#
# NOTE(review): this listing was rebuilt from a documentation extract that had
# lost several identifiers. +get_message_class+, +decoded_payload.message+,
# +responder.respond(message)+ and +e.message+ are reconstructions -- confirm
# against the original source.
#
# @param payload [Object] raw payload to decode
# @return [Object] whatever the responder's #respond returns
# @raise [RailwayIpc::UnhandledMessageError] when the type has no handler
def work(payload)
  decoded_payload = RailwayIpc::Rabbitmq::Payload.decode(payload)
  case decoded_payload.type
  when *registered_handlers
    # This local shadows the +responder+ attribute reader; @responder is not
    # assigned in this method.
    responder = get_responder(decoded_payload)
    @message = get_message_class(decoded_payload).decode(decoded_payload.message)
    responder.respond(message)
  else
    @message = LearnIpc::ErrorMessage.decode(decoded_payload.message)
    raise RailwayIpc::UnhandledMessageError,
          "#{self.class} does not know how to handle #{decoded_payload.type}"
  end
rescue StandardError => e
  RailwayIpc.logger.error(
    e.message,
    feature: 'railway_ipc_consumer',
    exchange: self.class.exchange_name,
    queue: self.class.queue_name,
    error: e.class,
    payload: payload
  )
  # Re-raise the same exception so the caller can build an error response.
  raise
end