Class: RailwayIpc::Client
- Inherits:
-
Object
- Object
- RailwayIpc::Client
- Defined in:
- lib/railway_ipc/rpc/client/client.rb,
lib/railway_ipc/rpc/client/errors/timeout_error.rb
Defined Under Namespace
Classes: TimeoutError
Instance Attribute Summary collapse
-
#message ⇒ Object
readonly
Returns the value of attribute message.
-
#rabbit_connection ⇒ Object
readonly
Returns the value of attribute rabbit_connection.
-
#request_message ⇒ Object
Returns the value of attribute request_message.
-
#response_message ⇒ Object
Returns the value of attribute response_message.
Class Method Summary collapse
Instance Method Summary collapse
- #await_response(timeout) ⇒ Object
-
#initialize(request_message, opts = { automatic_recovery: false }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter) ⇒ Client
constructor
A new instance of Client.
-
#process_payload(response) ⇒ Object
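Decodes the response payload and returns a successful RailwayIpc::Response when its type is a registered handler type; otherwise raises RailwayIpc::UnhandledMessageError.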
- #registered_handlers ⇒ Object
- #request(timeout = 10) ⇒ Object
-
#setup_rabbit_connection ⇒ Object
Connects the RabbitMQ connection, then creates the exchange and an auto-delete, exclusive queue.
Methods included from RPC::PublishLocationConfigurable
Methods included from RPC::ErrorAdapterConfigurable
rpc_error_adapter, rpc_error_adapter_class
Constructor Details
#initialize(request_message, opts = { automatic_recovery: false }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter) ⇒ Client
Returns a new instance of Client.
24 25 26 27 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 24 def initialize(request_message, opts={ automatic_recovery: false }, rabbit_adapter: RailwayIpc::Rabbitmq::Adapter) @rabbit_connection = rabbit_adapter.new(exchange_name: self.class.exchange_name, options: opts) @request_message = request_message end |
Instance Attribute Details
#message ⇒ Object (readonly)
Returns the value of attribute message.
11 12 13 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 11 def message @message end |
#rabbit_connection ⇒ Object (readonly)
Returns the value of attribute rabbit_connection.
11 12 13 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 11 def rabbit_connection @rabbit_connection end |
#request_message ⇒ Object
Returns the value of attribute request_message.
10 11 12 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 10 def request_message @request_message end |
#response_message ⇒ Object
Returns the value of attribute response_message.
10 11 12 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 10 def response_message @response_message end |
Class Method Details
.handle_response(response_type) ⇒ Object
20 21 22 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 20 def self.handle_response(response_type) RPC::ClientResponseHandlers.instance.register(response_type) end |
.request(message) ⇒ Object
16 17 18 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 16 def self.request(message) new(message).request end |
Instance Method Details
#await_response(timeout) ⇒ Object
68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 68 def await_response(timeout) rabbit_connection.(timeout: timeout) do |_, _, payload| self. = process_payload(payload) end rescue RailwayIpc::Rabbitmq::Adapter::TimeoutError # rubocop:disable Style/RedundantSelf error = self.class.rpc_error_adapter_class.(TimeoutError.new, self.) # rubocop:enable Style/RedundantSelf self. = RailwayIpc::Response.new(error, success: false) rescue StandardError self. = RailwayIpc::Response.new(, success: false) ensure rabbit_connection.disconnect end |
#process_payload(response) ⇒ Object
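Decodes the response payload; for a registered handler type it stores the decoded message, logs 'Handling response', and returns a successful RailwayIpc::Response. For any other type it raises RailwayIpc::UnhandledMessageError.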
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 42 def process_payload(response) decoded_payload = decode_payload(response) case decoded_payload.type when *registered_handlers @message = (decoded_payload).decode(decoded_payload.) RailwayIpc.logger.info( 'Handling response', feature: 'railway_ipc_consumer', exchange: self.class.exchange_name, protobuf: { type: .class, data: } ) RailwayIpc::Response.new(, success: true) else @message = LearnIpc::ErrorMessage.decode(decoded_payload.) raise RailwayIpc::UnhandledMessageError.new("#{self.class} does not know how to handle #{decoded_payload.type}") end end |
#registered_handlers ⇒ Object
37 38 39 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 37 def registered_handlers RailwayIpc::RPC::ClientResponseHandlers.instance.registered end |
#request(timeout = 10) ⇒ Object
29 30 31 32 33 34 35 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 29 def request(timeout=10) setup_rabbit_connection await_response(timeout) end |
#setup_rabbit_connection ⇒ Object
Connects the RabbitMQ connection, then creates the exchange and a queue declared with auto_delete: true and exclusive: true.
61 62 63 64 65 66 |
# File 'lib/railway_ipc/rpc/client/client.rb', line 61 def setup_rabbit_connection rabbit_connection .connect .create_exchange .create_queue(auto_delete: true, exclusive: true) end |